SortMergeJoinExec: Utilizing child partitioning when joining

SortMergeJoinExec: Utilizing child partitioning when joining

Brett Marcott
Hi all,

I found this JIRA for an issue I ran into recently:

My initial idea for a fix is to change SortMergeJoinExec's (and ShuffledHashJoinExec's) requiredChildDistribution.

At a minimum, if all of the conditions below are met, we could require only a subset of the join keys for partitioning:
- the left and right children's output partitionings are HashPartitioning with the same numPartitions
- the left and right partition expressions cover the same subset (by index) of their respective join keys

If that subset of keys is returned by requiredChildDistribution, then EnsureRequirements.ensureDistributionAndOrdering would not add a shuffle stage, and the children's existing partitioning would be reused.
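
To make this concrete, here is a rough, hypothetical sketch of what the change could look like inside SortMergeJoinExec. HashPartitioning and HashClusteredDistribution are the existing Spark classes, but the matching logic below is only an illustration of the two conditions above, not a tested patch:

override def requiredChildDistribution: Seq[Distribution] = {
  // Find the join-key indices that both children are already hash-partitioned on,
  // in the same order and with the same number of partitions.
  val reusableKeyIndices = (left.outputPartitioning, right.outputPartitioning) match {
    case (HashPartitioning(lExprs, lParts), HashPartitioning(rExprs, rParts)) if lParts == rParts =>
      val lIdx = lExprs.map(e => leftKeys.indexWhere(_.semanticEquals(e)))
      val rIdx = rExprs.map(e => rightKeys.indexWhere(_.semanticEquals(e)))
      if (lIdx.nonEmpty && !lIdx.contains(-1) && lIdx == rIdx) Some(lIdx) else None
    case _ => None
  }
  reusableKeyIndices match {
    // Both sides are partitioned on the same subset of the join keys: require only
    // that subset, so EnsureRequirements sees the children as already satisfying it.
    case Some(idx) =>
      HashClusteredDistribution(idx.map(leftKeys)) ::
        HashClusteredDistribution(idx.map(rightKeys)) :: Nil
    // Otherwise fall back to today's behaviour: require all join keys.
    case None =>
      HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
  }
}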

1. Thoughts on this approach?

2. Could someone help explain why the different join types have different output partitionings in SortMergeJoinExec.outputPartitioning?

Thanks,
Brett


Re: SortMergeJoinExec: Utilizing child partitioning when joining

Long, Andrew

“Thoughts on this approach?”

Just to warn you, this is a hazardous optimization without cardinality information. Removing columns from the hash exchange reduces entropy, potentially resulting in skew. Also keep in mind that if you reduce the number of columns on one side of the join, you need to do it on the other. This will require you to rewrite EnsureRequirements or add a special case to detect this.

As a word of warning, there are a whole bunch of subtle things that EnsureRequirements is doing, and it's really easy to unintentionally create performance regressions while making improvements in other areas.

 

“Could someone help explain why the different join types have different output partitionings?”

Long story short, when a join happens the join exec zips together the partitions of the left and right side, so that one partition of the join has the elements of both the left and the right. In the case of an inner join this means that the resulting RDD is now partitioned by both the left join keys and the right join keys. I'd suggest taking a look at the join execs and how they build the result RDD from the partitions of the left and right RDDs (see doExecute(…)). Left/right outer does look surprising though.

 

You should see something like…

 

left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
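
And the outputPartitioning side of the question looks roughly like this (paraphrased from memory; check SortMergeJoinExec in the current source for the exact code):

override def outputPartitioning: Partitioning = joinType match {
  // Inner: every output row carries both sides' keys, so the result is
  // partitioned by the left keys and by the right keys.
  case _: InnerLike =>
    PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
  // Outer: the non-preserved side may be null-padded, so only the preserved
  // side's partitioning still holds.
  case LeftOuter => left.outputPartitioning
  case RightOuter => right.outputPartitioning
  case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
  case LeftExistence(_) => left.outputPartitioning
  case x => throw new IllegalArgumentException(s"Unexpected join type $x")
}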

 

 

Cheers Andrew

 


Re: SortMergeJoinExec: Utilizing child partitioning when joining

Brett Marcott
Thanks for the response Andrew.

1. The approach
The approach I mentioned will not introduce any new skew, so it should only worsen performance if the user was relying on the shuffle to fix skew they had before.
The user can address this either by not introducing their own skewed partitioning in the first place, or by repartitioning again with less skew before the join.
Today the user cannot change the partitioning without rewriting the join condition in a hacky way: joinkey1 >= joinkey2 && joinkey1 <= joinkey2
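
To illustrate the hack (a purely hypothetical example with DataFrames left and right joined on k1 and k2, where the data is already partitioned on k1 only): because the k2 predicate is no longer an equality, the planner stops treating k2 as an equi-join key, so only k1 drives the required partitioning.

// Hypothetical: keep k1 as the equi-join key and express the k2 equality as two
// range predicates, at the cost of readability and a less efficient extra condition.
val joined = left.join(right,
  left("k1") === right("k1") &&
    left("k2") >= right("k2") && left("k2") <= right("k2"))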

The condition I mentioned in my original mail ensures that the same keys on the left and right form their respective subsets:
      the left and right partition expressions cover the same subset (by index) of their respective join keys

I don't believe EnsureRequirements itself will require any changes, only what the execs report as their required child distributions.

2. output partitionings
Yeah, I got as far as you mentioned, but at first I didn't get why for outer joins only one side's partitioning is used.
Now, however, I think it makes sense: for outer joins you may be introducing nulls for at least one side, which makes that side's partitioning invalid, right? For example, in a left outer join, unmatched left rows get nulls in the right-side columns, so the output can no longer be considered hash-partitioned on the right keys.

Warm New Year Regards,
Brett


Re: SortMergeJoinExec: Utilizing child partitioning when joining

Brett Marcott
1. Where can I find information on how to run standard performance tests/benchmarks?
2. For a new major Spark version, are performance degradations to existing queries not allowed even when they are fixable by new, equivalent queries?


Re: SortMergeJoinExec: Utilizing child partitioning when joining

Long, Andrew

“Where can I find information on how to run standard performance tests/benchmarks?”

 

The gold standard is spark-sql-perf and in particular the TPC-DS benchmark. Most of the big optimization teams are using this as the primary benchmark. One word of warning: most groups have also extended this with entirely new types of benchmarks which are not in open source, but the core TPC-DS benchmark will get you most of the way there.

 

https://aws.amazon.com/blogs/big-data/amazon-emr-introduces-emr-runtime-for-apache-spark/

https://github.com/databricks/spark-sql-perf

http://www.tpc.org/tpcds/
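
If you just want a quick, hand-rolled sanity check before setting up spark-sql-perf, something along these lines works. This is not spark-sql-perf's API, just plain Spark; it assumes the TPC-DS tables are registered as views and the query SQL is in a hypothetical queries: Map[String, String]:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("tpcds-timing-sketch").getOrCreate()

// Run each query once and record wall-clock time in milliseconds.
val timingsMs: Map[String, Double] = queries.map { case (name, sql) =>
  val start = System.nanoTime()
  spark.sql(sql).collect()   // TPC-DS result sets are small; otherwise write to a sink instead
  name -> (System.nanoTime() - start) / 1e6
}

timingsMs.toSeq.sortBy(_._1).foreach { case (q, ms) => println(f"$q%-8s $ms%10.1f ms") }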

 

“For a new major Spark version, are performance degradations to existing queries not allowed even when they are fixable by new, equivalent queries?”

 

The general rule of thumb for my group (which is NOT Databricks) is: as long as the geomean of the TPC-DS results improves and you don't break any existing queries, you're fine. For example, regressing a couple of queries by 5% is fine, BUT causing a query that would previously have run to crash is not OK. Additionally, we have a sample of user queries and ETL processes that we try not to break either.
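
A loose way to formalize that rule of thumb (my own sketch, not an official policy; it assumes per-query runtimes in milliseconds, so an improved geomean means it went down, and borrows the ~5% per-query tolerance from the example above):

// Geometric mean of per-query runtimes; lower is better.
def geomean(runtimesMs: Seq[Double]): Double =
  math.exp(runtimesMs.map(math.log).sum / runtimesMs.size)

// Accept a change if the overall geomean does not regress and no single query
// regresses by more than ~5% (crashes of previously working queries are a hard fail).
def acceptable(before: Map[String, Double], after: Map[String, Double]): Boolean = {
  val geomeanOk = geomean(after.values.toSeq) <= geomean(before.values.toSeq)
  val perQueryOk = before.forall { case (q, t) => after.get(q).exists(_ <= t * 1.05) }
  geomeanOk && perQueryOk
}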

 

Cheers Andrew

 
