[Spark Core]: Adding support for size based partition coalescing


[Spark Core]: Adding support for size based partition coalescing

mhawes
Hi all, I'm sending this here before creating a JIRA issue, in an effort to start
a discussion :)

Problem:

We have a situation where we end up with a very large number (O(10K)) of
partitions, with very little data in most of them but a lot of data in some.
This not only causes slow execution, it also runs us into issues like
SPARK-12837 <https://issues.apache.org/jira/browse/SPARK-12837>.
Naturally, the solution is to coalesce these partitions; however, it's
impossible for us to know upfront the size and number of partitions we're
going to get, and this can change on each run. If we naively coalesce to,
say, 50 partitions, we're likely to end up with very poorly distributed data
and potentially executor OOMs, because coalesce tries to balance the number
of partitions rather than partition size.
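
To make that concrete, here's roughly what the naive approach looks like (the
input path and numbers are made up for illustration):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    // Hypothetical skewed input: ~10K partitions, most tiny, a few very large.
    val df = spark.read.parquet("/path/to/skewed/input")

    // Naive fix: coalesce to a fixed count. coalesce() groups parent partitions
    // to balance the *number* of partitions per group, not their byte size, so
    // a handful of the 50 output partitions can still hold most of the data.
    val coalesced = df.coalesce(50)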

Proposed Solution:

I was therefore thinking about contributing a feature whereby a user could
specify a target partition size and Spark would aim to get as close to it as
possible. Something along these lines has been partially attempted in
SPARK-14042 <https://issues.apache.org/jira/browse/SPARK-14042>, but it falls
short because (AFAIK) there is no generic way of getting access to the
partition sizes before query execution. I would therefore suggest that we do
something similar to AQE's post-shuffle partition coalescing, whereby we allow
the user to trigger an adaptive coalesce even without a shuffle. The optimal
partitioning would be calculated dynamically at runtime, so we don't pay the
price of a shuffle but can still get reasonable partition sizes.
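
For reference, the AQE behaviour I'm referring to already exists, but it only
applies to shuffle outputs (the config keys are real; the values are just
examples):

    // AQE's post-shuffle coalescing: after a shuffle, Spark looks at the actual
    // map output sizes and merges small shuffle partitions up to the advisory
    // size. The proposal is essentially to offer the same size-aware coalescing
    // without having to pay for a shuffle first.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")  // example target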

What do people think? Happy to clarify things upon request. :)

Best,
Matt





Re: [Spark Core]: Adding support for size based partition coalescing

Pol Santamaria
Hi Matt,

I have encountered the same issue several times, so I totally agree with you that it would be a useful addition to Spark. I frequently work around the imbalance by coding a custom partitioner, which is far from ideal since it means dropping down to RDDs. I don't know the Spark code base well enough to judge the complexity or feasibility, though.
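
For what it's worth, the workaround I end up writing looks roughly like this
(just a sketch; the up-front countByKey pass and the shuffle from partitionBy
are exactly the parts I'd rather avoid):

    import org.apache.spark.Partitioner

    // Rough sketch: pack keys into a fixed number of bins using per-key record
    // counts gathered up front (an extra pass over the data).
    class SizeAwarePartitioner(keyWeights: Map[Any, Long], override val numPartitions: Int)
        extends Partitioner {

      // Greedy bin packing: assign the heaviest keys first to the lightest bin so far.
      private val assignment: Map[Any, Int] = {
        val loads = Array.fill(numPartitions)(0L)
        keyWeights.toSeq.sortBy(-_._2).map { case (key, weight) =>
          val bin = loads.indexOf(loads.min)
          loads(bin) += weight
          key -> bin
        }.toMap
      }

      override def getPartition(key: Any): Int =
        if (key == null) 0
        else assignment.getOrElse(key, math.abs(key.hashCode) % numPartitions)
    }

    // Usage (note the extra countByKey job and the shuffle from partitionBy):
    // val weights = rdd.countByKey().toMap[Any, Long]
    // val balanced = rdd.partitionBy(new SizeAwarePartitioner(weights, 50))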

Bests,
Pol Santamaria



Re: [Spark Core]: Adding support for size based partition coalescing

mhawes
Hi Pol, I had considered repartitioning, but the main issue there is that it
triggers a shuffle and could significantly slow down the query/application as
a result. Thanks for the alternative suggestion though :)
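
Just to illustrate the difference (assuming an active SparkSession `spark`):

    val df = spark.range(0, 1000000L)
    df.repartition(50).explain()  // physical plan contains an Exchange, i.e. a full shuffle
    df.coalesce(50).explain()     // only a Coalesce node, no Exchange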





Re: [Spark Core]: Adding support for size based partition coalescing

angers.zhu
Hi all,

If you need it, I can raise a PR adding a SizeBasedCoalescer.



Re: [Spark Core]: Adding support for size based partition coalescing

mhawes
Hi angers.zhu,

Thanks for pointing me towards that PR. I think the main issue there is that
the coalesce operation requires an additional computation, which in this case
is undesirable. It also approximates the row size rather than directly using
the partition size, so it has the potential to produce executor OOMs or
suboptimal partitionings.

I'm thinking of something along the lines of what CoalesceShufflePartitions
<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala>
does during adaptive query execution. That class uses the `mapStats` from the
shuffle stages to find out the partition sizes, and I'm wondering if we can do
something similar "on demand", i.e. a user could request an adaptively run
coalesce by calling `dataset.adaptiveCoalesce()` or similar.
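
To make the shape of the API concrete, I'm imagining something like the
following. To be clear, this is purely hypothetical; neither the method nor
the parameter exists in Spark today:

    import org.apache.spark.sql.Dataset

    object AdaptiveCoalesceSketch {
      // Hypothetical user-facing API. The idea is that Spark would gather
      // per-partition size statistics at runtime (much like AQE reads mapStats
      // after a shuffle) and merge small partitions up to the target size.
      implicit class AdaptiveCoalesceOps[T](ds: Dataset[T]) {
        def adaptiveCoalesce(targetPartitionSizeBytes: Long = 128L * 1024 * 1024): Dataset[T] =
          ??? // would be implemented inside Spark using runtime size statistics
      }
    }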

Does that make sense? I'm hoping one of the Spark core contributors can weigh
in on the feasibility of this, and on whether this is how they would think
about solving the issue described in my first post.





Re: [Spark Core]: Adding support for size based partition coalescing

mhawes
Okay, from looking closer at some of the code, I'm not sure that what I'm
asking for in terms of adaptive execution makes much sense, since adaptive
optimisation can only happen between stages, i.e. optimising future /stages/
based on the results of previous stages. An "on-demand" adaptive coalesce
therefore doesn't make much sense, as it wouldn't necessarily occur at a
stage boundary.

However, I think my original question still stands: how do we /dynamically/
deal with poorly partitioned data without incurring a shuffle or extra
computation?

I think the only thing that's changed is that I no longer have any good
ideas on how to do it :/





Re: [Spark Core]: Adding support for size based partition coalescing

German Schiavon
Hi!

Have you tried spark.sql.files.maxRecordsPerFile?

As a workaround, you could work out roughly how many rows add up to 128 MB for your data and then set that number in that property.
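
Roughly like this, assuming the usual `spark` session and a DataFrame `df`
(the row count and output path are made up; you'd have to measure what
~128 MB corresponds to for your own data):

    // Workaround: cap the number of records written per file, so no single file
    // (and hence no single read partition later) gets huge. 1,000,000 is just an
    // example; measure how many of your rows make up ~128 MB.
    spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000L)
    df.write.parquet("/path/to/output")  // hypothetical output path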

Best

