Avoiding unnecessary sort in FileFormatWriter/DynamicPartitionDataWriter

Avoiding unnecessary sort in FileFormatWriter/DynamicPartitionDataWriter

JOAQUIN GUANTER GONZALBEZ

Hello,

 

I have observed that if a DataFrame is saved with partitioning columns in Parquet, a sort is performed in FileFormatWriter (see https://github.com/apache/spark/blob/v3.0.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L152) because DynamicPartitionDataWriter only supports having a single file open at a time (see https://github.com/apache/spark/blob/v3.0.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala#L170-L171). I think it would be possible to avoid this sort (which is a major bottleneck for some of my scenarios) if DynamicPartitionDataWriter could have multiple files open at the same time and write each piece of data to its corresponding file.
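For reference, a minimal way to hit this code path (a sketch assuming a SparkSession named `spark`; the dataset and output path are illustrative):

    // Sketch: a dynamic-partition Parquet write. Because the incoming rows
    // are not already ordered by the partition column, FileFormatWriter wraps
    // the plan in a local SortExec on `a` before DynamicPartitionDataWriter
    // consumes the rows.
    val df = spark.range(0, 1000000L)
      .selectExpr("id % 100 AS a", "id AS b")

    df.write
      .partitionBy("a")            // dynamic partition column
      .parquet("/tmp/partitioned") // illustrative output path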

 

Would that change be a welcome PR for the project or is there any major problem that I am not considering that would prevent removing this sort?

 

Thanks,

Ximo.

 

 

 

 

Some more detail about the problem, in case I didn't explain myself clearly: suppose we have a DataFrame that we want to partition by column A:

 

Column A | Column B
-------- | --------
4        | A
1        | B
2        | C

 

The current behavior will first sort the dataframe:

 

Column A | Column B
-------- | --------
1        | B
2        | C
4        | A

 

This way DynamicPartitionDataWriter only needs a single file open at a time, since all the data for a given partition is adjacent and can be iterated over sequentially. To process the first row, DynamicPartitionDataWriter opens a file at /columnA=1/part-r-00000-<uuid>.parquet and writes the data. When processing the second row it sees that the row belongs to a different partition, closes the first file, opens a new file at /columnA=2/part-r-00000-<uuid>.parquet, and so on.
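In sketch form (simplified types standing in for Spark's internals; this mirrors the behavior described above rather than the actual Spark code):

    // Current behavior over sorted input: a single writer at a time, closed
    // and reopened whenever the partition value changes.
    trait Writer {
      def write(row: Seq[Any]): Unit
      def close(): Unit
    }

    def writeSorted(rows: Iterator[(String, Seq[Any])],
                    openWriter: String => Writer): Unit = {
      var current: Option[(String, Writer)] = None
      for ((partition, row) <- rows) {
        if (!current.exists(_._1 == partition)) {
          current.foreach(_._2.close()) // partition changed: close the old file
          current = Some((partition, openWriter(partition)))
        }
        current.get._2.write(row)
      }
      current.foreach(_._2.close())
    }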

 

My proposed change would be for DynamicPartitionDataWriter to keep as many open files as there are partitions, closing them all once all data has been processed.
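A minimal sketch of that idea, reusing the simplified `Writer` stand-in from the previous sketch (illustrative only, not an actual patch):

    import scala.collection.mutable

    // Proposal: keep one open writer per partition value, route each row to
    // its partition's writer, and close everything once the input is drained.
    class MultiPartitionWriter(openWriter: String => Writer) {
      private val writers = mutable.Map.empty[String, Writer]

      def write(partitionValue: String, row: Seq[Any]): Unit =
        writers.getOrElseUpdate(partitionValue, openWriter(partitionValue)).write(row)

      def closeAll(): Unit = {
        writers.values.foreach(_.close())
        writers.clear()
      }
    }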




Re: Avoiding unnecessary sort in FileFormatWriter/DynamicPartitionDataWriter

rxin
The issue is memory overhead. Writing files creates a lot of buffering (especially in columnar formats like Parquet/ORC). Even a few file handles and their buffers per task can easily OOM the entire process.



Re: Avoiding unnecessary sort in FileFormatWriter/DynamicPartitionDataWriter

Cheng Su-2

Hi,

 

Just for context: I created the JIRA for this around two years ago (https://issues.apache.org/jira/browse/SPARK-26164, with a stale PR that was never merged: https://github.com/apache/spark/pull/23163). I recently discussed it with Wenchen again, and it looks like it might be reasonable to:

 

  1. Open multiple writers in parallel to write partitions/buckets.
  2. If the number of writers exceeds a pre-defined threshold (controlled by a config), sort the rest of the input rows and fall back to the current single-writer mode for the write.

 

This approach uses the number of writers as a proxy for memory usage, which I agree is quite rudimentary. But given that memory usage from writers is not visible to Spark today, it seems to me there's no other good way to model the memory usage of a write. Internally we did the same thing, but our internal ORC is customized to cooperate with our internal Spark on memory usage, so we don't see many OOM issues (on the non-vectorized code path).

 

The config can be disabled by default to stay consistent with current behavior, and users can opt in to the non-sort mode if they benefit from not sorting large amounts of data.
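A rough sketch of steps 1 and 2 above (all names here are invented for illustration, and a real implementation would spill the overflow rows through Spark's external sorter rather than buffering them on the heap):

    import scala.collection.mutable

    trait Writer {
      def write(row: Seq[Any]): Unit
      def close(): Unit
    }

    // Concurrent writers up to a configured cap; rows seen after the cap is
    // hit are set aside, sorted by partition, and written one file at a
    // time, i.e. the current sort-based mode as a fallback.
    class FallbackPartitionWriter(maxWriters: Int, openWriter: String => Writer) {
      private val writers  = mutable.Map.empty[String, Writer]
      private val overflow = mutable.ArrayBuffer.empty[(String, Seq[Any])]

      def write(partition: String, row: Seq[Any]): Unit =
        writers.get(partition) match {
          case Some(w) => w.write(row)
          case None if writers.size < maxWriters =>
            val w = openWriter(partition)
            writers(partition) = w
            w.write(row)
          case None =>
            overflow += ((partition, row)) // over threshold: defer to fallback
        }

      def finish(): Unit = {
        writers.values.foreach(_.close())
        writers.clear()
        // Fallback: with rows sorted by partition, one writer at a time suffices.
        var open: Option[(String, Writer)] = None
        for ((partition, row) <- overflow.sortBy(_._1)) {
          if (!open.exists(_._1 == partition)) {
            open.foreach(_._2.close())
            open = Some((partition, openWriter(partition)))
          }
          open.get._2.write(row)
        }
        open.foreach(_._2.close())
      }
    }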

 

Does this sound good as a plan? I'd like to get more opinions on it. Thanks.

 

Cheng Su

 

Re: Avoiding unnecessary sort in FileFormatWriter/DynamicPartitionDataWriter

kalyan
Hi Cheng,

Is there somewhere I can get more details on this, or could you give a couple of lines explaining it?

> But given that memory usage from writers is not visible to Spark today, it seems to me there's no other good way to model the memory usage of a write.

regards
kalyan.

Re: Avoiding unnecessary sort in FileFormatWriter/DynamicPartitionDataWriter

Cheng Su-2

Hi Kalyan,

 

> Is there somewhere I can get more details on this, or could you give a couple of lines explaining it?
>
> But given that memory usage from writers is not visible to Spark today, it seems to me there's no other good way to model the memory usage of a write.

 

The data source v1 write path models each writer as a subclass of the `OutputWriter` class [1], which only defines `write()` and `close()` methods and provides no way to get the writer's memory usage. An `OutputWriter` implementation can use memory arbitrarily without going through Spark's memory management. For example, `TextOutputWriter` [2] writes each row straight to the output stream without any buffering or batching, whereas `OrcOutputWriter`/`OrcMapreduceRecordWriter` [3] buffers multiple rows into a vectorized columnar batch in memory before persisting them to the ORC file.

 

[1]: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala#L274

[2]: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOutputWriter.scala#L40

[3]: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala#L54

and https://github.com/apache/orc/blob/master/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java#L53-L56
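For a quick picture, the contract in [1] boils down to the following (a paraphrase, not the full class):

    import org.apache.spark.sql.catalyst.InternalRow

    // Data source v1 writer contract, reduced to the two methods discussed
    // above. Nothing here reports buffered bytes, so Spark has no way to
    // account for memory an implementation holds between write() calls.
    abstract class OutputWriter {
      /** Persist a single row. Implementations may buffer arbitrarily
       *  (e.g. ORC accumulates rows into a columnar batch first). */
      def write(row: InternalRow): Unit

      /** Flush any buffered data and release resources. */
      def close(): Unit
    }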

 

Cheng Su

 

RE: Avoiding unnecessary sort in FileFormatWriter/DynamicPartitionDataWriter

JOAQUIN GUANTER GONZALBEZ
In reply to this post by Cheng Su-2

> If the number of writers exceeds a pre-defined threshold (controlled by a config), sort the rest of the input rows and fall back to the current single-writer mode for the write.
>
> The config can be disabled by default to stay consistent with current behavior, and users can opt in to the non-sort mode if they benefit from not sorting large amounts of data.

 

With both of those points in place, I think the plan is very reasonable: it wouldn't affect anyone who isn't actively tuning Spark, and it gives those of us who are hitting this sort the tools to improve performance in our scenarios.

 

Cheers,

Ximo.

 

Re: Avoiding unnecessary sort in FileFormatWriter/DynamicPartitionDataWriter

Cheng Su-2

Thanks, Ximo. On our side, we see similar cases in production as well, and we added this feature internally a couple of years ago. If there's no objection, let me submit a new PR (mostly rebasing https://github.com/apache/spark/pull/23163 onto the latest master and trying to improve the code structure).

 

Thanks,

Cheng Su

 
