[DISCUSS][SQL] Control the number of output files

John Zhuge
Hi all,

Many Spark users in my company are asking for a way to control the number of output files in Spark SQL. There are use cases both to reduce and to increase the number. The users prefer not to use the repartition(n) or coalesce(n, shuffle) functions, which require them to write and deploy Scala/Java/Python code.

Could we introduce a query hint for this purpose (similar to Broadcast Join Hints)?

    /*+ COALESCE(n, shuffle) */

In general, is a query hint the best way to bring DataFrame functionality to SQL without extending the SQL syntax? Any suggestions are highly appreciated.
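
For illustration, a SQL-only user could then write something like this (a sketch of the proposed, not-yet-merged syntax; table names are made up):

    // Sketch of the proposed hint in use, shown via spark.sql only so the
    // snippet is runnable; a SQL-only user would issue the statement directly.
    // Syntax is not final and table names are hypothetical.
    spark.sql("""
      INSERT OVERWRITE TABLE compacted_events
      SELECT /*+ COALESCE(10) */ * FROM events
    """)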

This requirement is not the same as SPARK-6221, which asked for auto-merging of output files.

Thanks,
John Zhuge

Re: [DISCUSS][SQL] Control the number of output files

rxin
Seems like a good idea in general. Do other systems have similar concepts? In general, it'd be easier if we could follow an existing convention, if there is one.


Re: [DISCUSS][SQL] Control the number of output files

Mark Hamstra
See some of the related discussion under https://github.com/apache/spark/pull/21589

It feels to me like we need some kind of user-code mechanism for signaling policy preferences to Spark. This could also include ways to signal scheduling policy, such as scheduling pools and/or barrier scheduling. Some of those scheduling policies operate at inherently different levels currently -- e.g., scheduling pools at the Job level (really, the thread-local level in the current implementation) and barrier scheduling at the Stage level -- so it is not completely obvious how to unify all of these policy options/preferences/mechanisms, or whether that is even possible. Still, I think it is worth considering such things at a fairly high level of abstraction and trying to unify and simplify before making things more complex with multiple policy mechanisms.

Re: [DISCUSS][SQL] Control the number of output files

saurfang
Has there been any discussion of simply supporting Hive's merge-small-files configuration? It adds one additional stage that inspects the size of each output file, recomputes the desired parallelism to reach a target size, and runs a map-only coalesce before committing the final files. Since, AFAIK, Spark SQL already stages the final output commit, it seems feasible to respect this Hive config.
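
For reference, the Hive knobs in question are roughly the following; Spark SQL does not honor them today, so this is only a sketch of the Hive-side configuration (the values are examples, not recommendations):

    // Hive's merge-small-files settings, issued through Spark's SQL interface.
    // Spark SQL currently ignores these; shown only to illustrate the config.
    spark.sql("SET hive.merge.mapfiles=true")                 // merge outputs of map-only jobs
    spark.sql("SET hive.merge.mapredfiles=true")              // merge outputs of map-reduce jobs
    spark.sql("SET hive.merge.smallfiles.avgsize=134217728")  // merge when avg output file < 128 MB
    spark.sql("SET hive.merge.size.per.task=268435456")       // target ~256 MB per merged file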



Re: [DISCUSS][SQL] Control the number of output files

saurfang
Sorry, I see https://issues.apache.org/jira/browse/SPARK-6221 was referenced in John's email. Can you elaborate on how your requirement is different? In my experience it is usually driven by the need to decrease the final output parallelism without compromising compute parallelism (i.e., to prevent too many small files from being persisted on HDFS). The requirement is often pretty ballpark and does not demand a precise number of partitions, so setting the desired output size to, say, 32-64 MB usually gives a good enough result. I'm curious why SPARK-6221 was marked as Won't Fix.


Re: [DISCUSS][SQL] Control the number of output files

lukas nalezenec
In reply to this post by saurfang
Hi,
Yes, this feature is planned: Spark should soon be able to repartition output by size.
Lukas



Re: [DISCUSS][SQL] Control the number of output files

John Zhuge
In reply to this post by saurfang
Thanks for the comment, Forest. What I am asking for is to make whatever repartition/coalesce functionality the DataFrame API offers available to SQL users as well.

Agreed that reducing the final number of output files based on file size would be very nice to have. Lukas indicated this is planned.
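
Concretely, the DataFrame-side operations in question look like this (paths and table names are made up):

    // The DataFrame functionality to expose to SQL users; today it requires
    // writing and deploying Scala/Java/Python code. Names are hypothetical.
    val df = spark.table("events")

    // coalesce: narrow dependency, can only decrease the partition count
    df.coalesce(8).write.parquet("/warehouse/events_few_files")

    // repartition: full shuffle, can also increase the partition count
    df.repartition(200).write.parquet("/warehouse/events_many_files")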

--
John Zhuge

Re: [DISCUSS][SQL] Control the number of output files

rxin
John,

Do you want to create a ticket and submit a patch for this? If there is a coalesce hint, inject a coalesce logical node. Pretty simple.
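
Roughly, such a rule could look like the following minimal sketch against Spark 2.x catalyst APIs (the implementation that was eventually merged may differ in detail):

    // Sketch: rewrite a COALESCE or REPARTITION hint into a Repartition
    // logical node. Assumes Spark 2.x catalyst APIs; details may differ
    // from the rule that was eventually merged.
    import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, UnresolvedHint}
    import org.apache.spark.sql.catalyst.rules.Rule

    object ResolveCoalesceHints extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
        // COALESCE(n): no shuffle, like Dataset.coalesce(n)
        case UnresolvedHint(name, Seq(IntegerLiteral(n)), child)
            if name.equalsIgnoreCase("COALESCE") =>
          Repartition(n, shuffle = false, child)
        // REPARTITION(n): full shuffle, like Dataset.repartition(n)
        case UnresolvedHint(name, Seq(IntegerLiteral(n)), child)
            if name.equalsIgnoreCase("REPARTITION") =>
          Repartition(n, shuffle = true, child)
      }
    }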


Re: [DISCUSS][SQL] Control the number of output files

John Zhuge
Filed https://issues.apache.org/jira/browse/SPARK-24940. Will upload a patch shortly.

SPARK-20857 introduced a generic SQL hint framework in 2.2.0.

Re: [DISCUSS][SQL] Control the number of output files

Koert Kuipers
In reply to this post by lukas nalezenec
lukas,
what is the jira ticket for this? i would like to follow its activity.
thanks!
koert

Re: [DISCUSS][SQL] Control the number of output files

Xiao Li
FYI, the new hints have been merged. They will be available in the upcoming release (Spark 2.4). 
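
For example (table names are made up):

    // The merged hints in use (Spark 2.4+). Table names are hypothetical.
    // Reduce the number of output files without a shuffle:
    spark.sql("INSERT OVERWRITE TABLE daily_agg SELECT /*+ COALESCE(8) */ * FROM staging")
    // Force a shuffle, which can also increase parallelism:
    spark.sql("SELECT /*+ REPARTITION(200) */ * FROM staging")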

John Zhuge, thanks for your work! Really appreciate it! Please submit more PRs and help the community improve Spark. : ) 

Xiao

Re: [DISCUSS][SQL] Control the number of output files

John Zhuge
In reply to this post by Xiao Li
Great help from the community!

--
John Zhuge

Re: [DISCUSS][SQL] Control the number of output files

Koert Kuipers
In reply to this post by Koert Kuipers
i went through the jiras targeting 2.4.0 trying to find a feature where spark would coalesce/repartition by size (so merge small files automatically), but didn't find it.
can someone point me to it?
thank you.
best,
koert
