Sort-merge join improvement

Sort-merge join improvement

pzecevic
Hello everybody,

We (at the University of Zagreb and the University of Washington) have implemented an optimization of Spark's sort-merge join (SMJ) that has improved the performance of our jobs considerably, and we would like to know whether the Spark community thinks it would be useful to include it in the main distribution.

The problem we are solving is the case where you have two big tables partitioned by a column X but also sorted by a column Y (within partitions), and you need to calculate an expensive function on the joined rows. During a sort-merge join, Spark cross-joins all rows that have the same X value and calculates the function on each resulting pair. If the two tables have a large number of rows per X value, this can result in a huge number of calculations.

Our optimization lets you reduce the number of matching rows per X value by using a range condition on the Y columns of the two tables. Something like:

... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d
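
For concreteness, here is a minimal, self-contained sketch of such a query. The tables, columns and the distance d are illustrative placeholders, not taken from our actual workloads:

import org.apache.spark.sql.SparkSession

object RangeJoinExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("smj-range-example")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Two illustrative tables of (x, y) pairs, e.g. (object id, timestamp).
    Seq((1, 10.0), (1, 20.0), (2, 15.0)).toDF("x", "y").createOrReplaceTempView("t1")
    Seq((1, 12.0), (1, 90.0), (2, 14.0)).toDF("x", "y").createOrReplaceTempView("t2")

    val d = 5.0
    // Equi-join on x plus a symmetric range condition on y.
    spark.sql(
      s"""SELECT t1.x, t1.y AS y1, t2.y AS y2
         |FROM t1 JOIN t2
         |ON t1.x = t2.x AND t1.y BETWEEN t2.y - $d AND t2.y + $d""".stripMargin).show()

    spark.stop()
  }
}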

As SMJ is currently implemented, these extra conditions have no influence on the number of rows (per X value) being checked, because they are evaluated in the same block as the function itself.

Our optimization changes the sort-merge join so that, when these extra conditions are specified, a queue is used instead of the ExternalAppendOnlyUnsafeRowArray class. The queue acts as a moving window over the values from the right relation as the left row changes. You could call this a combination of an equi-join and a theta join (we call it a "sort-merge inner range join").
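
To make the idea concrete, here is a simplified, hypothetical sketch (not the actual PR code) of how the window can be maintained for a single equi-join group, i.e. for rows sharing one X value, with both sides sorted ascending by Y:

import scala.collection.mutable

// Sketch of the moving-window idea for one equi-join group.
// left and right hold the Y values of the group's rows, sorted ascending;
// d is the half-width of the range condition.
def rangeJoinGroup(left: Seq[Double], right: Seq[Double], d: Double): Seq[(Double, Double)] = {
  val window = mutable.Queue.empty[Double] // right rows currently within range
  val out = mutable.Buffer.empty[(Double, Double)]
  var ri = 0                               // index of the next right row to consider
  for (ly <- left) {
    // Evict right rows that fell below the lower bound; since ly only
    // grows, they can never match a later left row either.
    while (window.nonEmpty && window.head < ly - d) window.dequeue()
    // Admit right rows up to the upper bound ly + d.
    while (ri < right.length && right(ri) <= ly + d) {
      if (right(ri) >= ly - d) window.enqueue(right(ri))
      ri += 1
    }
    // Every row left in the window satisfies the range condition for ly.
    window.foreach(ry => out += ((ly, ry)))
  }
  out.toSeq
}

// Example: rangeJoinGroup(Seq(1.0, 5.0), Seq(0.5, 4.2, 9.9), 1.0)
// returns Seq((1.0, 0.5), (5.0, 4.2)).

Because both sides are sorted by Y, each right row is enqueued and dequeued at most once per group, so the per-group work is proportional to the inputs plus the actual matches rather than to the full cross-join.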

Potential use cases for this are joins based on spatial or temporal distance calculations.

The optimization is triggered automatically when an equi-join expression is present AND lower and upper range conditions on a secondary column are specified. If the tables aren't already sorted by both columns, the appropriate sorts are added.
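
As an illustration, a DataFrame query of the following shape (hypothetical data and names again) contains both the equi-join expression and the two range bounds, so it should satisfy the trigger conditions described above; explain() can be used to inspect the resulting physical plan, including any sorts the planner adds:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val t1 = Seq((1, 10.0), (2, 15.0)).toDF("x", "y")
val t2 = Seq((1, 12.0), (2, 40.0)).toDF("x", "y")
val d = 5.0

// Equi-join on x plus lower AND upper bounds on the secondary column y.
val joined = t1.as("t1").join(t2.as("t2"),
  $"t1.x" === $"t2.x" &&
  $"t1.y" >= $"t2.y" - d &&
  $"t1.y" <= $"t2.y" + d)

joined.explain() // shows the chosen join strategy and any added sorts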


We have several questions:

1. Do you see any other way to optimize queries like these (eliminate
unnecessary calculations) without changing the sort-merge join algorithm?

2. We believe there is a more general pattern here and that this could
help in other similar situations where secondary sorting is available.
Would you agree?

3. Would you like us to open a JIRA ticket and create a pull request?

Thanks,

Petar Zecevic



Re: Sort-merge join improvement

pzecevic
As instructed offline, I opened a JIRA for this:

https://issues.apache.org/jira/browse/SPARK-24020

I will create a pull request soon.

Re: Sort-merge join improvement

pzecevic
Hi,

The PR tests completed successfully (https://github.com/apache/spark/pull/21109).

Can you please review the patch and merge it upstream if you think it's OK?

Thanks,

Petar

Re: Sort-merge join improvement

pzecevic
Based on some of the review comments, I put additional effort into fixing the case where whole-stage codegen is turned off.

A sort-merge join with the additional range conditions is now roughly 10x faster than the non-optimized SMJ (it can be more or less, depending on the exact use case), whether whole-stage codegen is turned on or off.

Merging this would help us tremendously, and I believe it can be useful in other applications, too.

Can you please review the patch (https://github.com/apache/spark/pull/21109) and merge it?

Thank you,

Petar Zecevic

Re: Sort-merge join improvement

pzecevic
Hi,

We went through a round of reviews on this PR. The performance improvements can be substantial, and unit and performance tests are included.

One remark was that the amount of changed code is large, but I don't see how to reduce it while keeping the performance improvements. Besides, all the new code is well contained in separate classes (except where it was necessary to change existing ones).

So I believe this is ready to be merged.

Can one of the committers please take another look at this and accept the PR?

Thank you,

Petar Zecevic