AQE effectiveness

AQE effectiveness

Koert Kuipers
we tend to have spark.sql.shuffle.partitions set very high by default, simply because some jobs need it to be high and it's easier to set a high default than to have people tune it manually per job. the main downsides are lots of part files, which puts pressure on the driver, and dynamic allocation becoming troublesome when every aggregation requires thousands of tasks... even the simplest aggregation on tiny data will demand all the resources on the cluster.
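
for illustration, the kind of defaults i mean (the values here are made up, just to show the shape; the adaptive settings exist in spark 3.0+):

# spark-defaults.conf (illustrative values)
spark.sql.shuffle.partitions                     4096
spark.sql.adaptive.enabled                       true
spark.sql.adaptive.coalescePartitions.enabled    true
spark.sql.adaptive.advisoryPartitionSizeInBytes  64m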

because of these issues AQE appeals a lot to me: by automatically coalescing the reducer partitions it avoids them. so we have AQE turned on by default. every once in a while i scan through our spark AMs and logs to see how it's doing. i mostly look for stages whose task count equals spark.sql.shuffle.partitions, which to me is a sign that AQE isn't being effective. unfortunately this seems to be the case for the majority of stages. i suspect it has to do with caching/persisting, which we use frequently. a simple reproduction is below.

any idea why caching/persisting would interfere with AQE?

best, koert

$ hadoop fs -text fruits.csv
fruit,color,quantity
apple,red,5
grape,blue,50
pear,green,3

# works well using AQE, uses 1 to 3 tasks per job
$ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf spark.sql.adaptive.enabled=true
scala> val data = spark.read.format("csv").option("header", true).load("fruits.csv").persist()
scala> data.groupBy("fruit").count().write.format("csv").save("out")

# does not work well using AQE, uses 200 tasks (i.e. spark.sql.shuffle.partitions) for certain jobs. the only difference is when persist is called.
$ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf spark.sql.adaptive.enabled=true
scala> val data = spark.read.format("csv").option("header", true).load("fruits.csv").groupBy("fruit").count().persist()
scala> data.write.format("csv").save("out")

Re: AQE effectiveness

Maryann Xue
AQE has been deliberately turned off for cached plans so that the `outputPartitioning` of the cached relation won't be changed by AQE partition coalescing or skew-join optimization, and so that the outputPartitioning can potentially be reused by relations built on top of the cache.
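
A sketch of the kind of reuse this protects (a hypothetical spark-shell session with AQE off and the default spark.sql.shuffle.partitions, reusing the fruits.csv example): the cached aggregate is hash-partitioned on the grouping key, so a second aggregation on the same key needs no new Exchange; if AQE coalesced the cached plan, that partitioning guarantee would be gone and the downstream aggregation would have to shuffle again.

scala> val counts = spark.read.format("csv").option("header", true).load("fruits.csv").groupBy("fruit").count().persist()
scala> counts.count()   // materializes the cache; the cached plan is hash-partitioned on "fruit"
scala> counts.groupBy("fruit").max("count").explain()   // with AQE off, no Exchange appears above the InMemoryTableScan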

On second thought, we should probably add a config there and enable AQE by default.


Thanks,
Maryann


Re: AQE effectiveness

Koert Kuipers
i see. it makes sense to maximize re-use of cached data. i didn't realize we have two potentially conflicting goals here.



Re: AQE effectiveness

Maryann Xue
No. The worst case of enabling AQE on cached data is not losing the opportunity to use/reuse the cache, but rather just an extra shuffle, if the outputPartitioning happens to match without AQE and not to match with AQE. The chance of this happening is rather low.
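
To make that worst case concrete (a hypothetical session reusing the fruits.csv example, with broadcast joins disabled so the join actually depends on partitioning): without AQE, both cached sides are hash-partitioned on "fruit" with the same partition count and the join plan needs no extra Exchange; if AQE coalesced the cached plans, one or both sides might have to shuffle again.

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
scala> val counts = spark.read.format("csv").option("header", true).load("fruits.csv").groupBy("fruit").count().persist()
scala> val maxQty = spark.read.format("csv").option("header", true).load("fruits.csv").groupBy("fruit").agg(Map("quantity" -> "max")).persist()
scala> counts.join(maxQty, "fruit").explain()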


Re: AQE effectiveness

Maryann Xue
It would break the CachedTableSuite test "A cached table preserves the partitioning and ordering of its cached SparkPlan" if AQE were turned on.

Anyway, the chance of this outputPartitioning being useful is rather low and should not justify turning off AQE for the SQL cache.

On Thu, Aug 20, 2020 at 10:54 PM Koert Kuipers <[hidden email]> wrote:
in our in-house spark version i changed this without trouble, and it didn't even break any tests.
just some minor changes in CacheManager, it seems.
