Join Strategies

Marco Gaido
Hi dev,

I have a question about how join strategies are defined.

I see that CartesianProductExec is used only for InnerJoin, while for other kinds of joins BroadcastNestedLoopJoinExec is used.
For reference:
https://github.com/apache/spark/blob/cd9f49a2aed3799964976ead06080a0f7044a0c3/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L260

Could you kindly explain why this is done? It doesn't seem like a great choice to me, since BroadcastNestedLoopJoinExec can fail with OOM.

Thanks,
Marco

Re: Join Strategies

Herman van Hövell tot Westerflier
Hey Marco,

A Cartesian product is an inner join by definition :). The current Cartesian product operator does not support outer joins, so we use the only operator that does: BroadcastNestedLoopJoinExec. This is far from great, and it does have the potential to OOM; however, there are some safety nets in the driver that should start complaining before you actually OOM.
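A Cartesian product is an inner join in the sense that an inner join with an arbitrary condition is just the Cartesian product filtered by that condition. A left outer join must also emit every left row that matched nothing, padded with NULL. A filtered Cartesian product alone never produces those padded rows. The following is a minimal plain-Python sketch of that difference (no Spark involved). The function names are hypothetical, and `None` stands in for SQL NULL.

```python
from itertools import product

def inner_join(left, right, cond):
    # Inner join = Cartesian product filtered by the join condition.
    return [(l, r) for l, r in product(left, right) if cond(l, r)]

def left_outer_join(left, right, cond):
    # Nested loop: for each left row, scan every right row.
    out = []
    for l in left:
        matched = False
        for r in right:
            if cond(l, r):
                out.append((l, r))
                matched = True
        if not matched:
            # Left rows with no match are kept, padded with None (SQL NULL).
            out.append((l, None))
    return out

left = [1, 5, 9]
right = [3, 4]
cond = lambda l, r: l < r  # a non-equi condition, so no key-based partitioning

print(inner_join(left, right, cond))       # [(1, 3), (1, 4)]
print(left_outer_join(left, right, cond))  # [(1, 3), (1, 4), (5, None), (9, None)]
```

The extra `(5, None)` and `(9, None)` rows are exactly what an operator that only enumerates and filters pairs cannot produce.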

An outer non-equi join is pretty hard to do in a distributed setting. This is caused by two things:
  • There is no way to partition the data in such a way that you can exploit some locality (i.e. know that all rows with the same key are in one partition), unless you use only one partition or use some clever index.
  • You need to keep track of records that do not match the join condition if you are doing a full outer join, or an outer join in which the streamed side is not the side whose unmatched rows must be preserved (i.e. the broadcast/build side is the outer side). This is the number one source of complexity in the current join implementations. If you can partition your data, then you can track and emit unmatched rows as part of processing each partition. If you cannot (and you have more than one partition), then you need to send the unmatched rows (in some form) back to the driver and figure out which records have actually not been matched (see BroadcastNestedLoopJoinExec for example).
It is definitely doable to implement such a join; however, I have not seen many JIRAs or user requests for it.
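The bookkeeping described in the second point can be sketched as a small plain-Python simulation of a full outer nested-loop join. There is no Spark here, and every function name is hypothetical. Lists stand in for the partitions of the streamed side, a list stands in for the broadcast side, and Python ints act as per-partition bitmasks recording which broadcast rows matched. The final OR over all masks plays the role of the "driver" step. This follows the general idea described in the reply and is not a copy of Spark's actual code. The band condition `abs(s - b) <= 1` is an assumed example of a non-equi predicate.

```python
from functools import reduce

def join_partition(stream_part, broadcast, cond):
    """Runs on one simulated executor: nested loop over one stream partition
    against the full broadcast side. Returns matched pairs, unmatched stream
    rows padded with None, and a bitmask of broadcast rows matched here."""
    rows = []
    matched_mask = 0  # bit i set => broadcast[i] matched some row in this partition
    for s in stream_part:
        found = False
        for i, b in enumerate(broadcast):
            if cond(s, b):
                rows.append((s, b))
                matched_mask |= 1 << i
                found = True
        if not found:
            # Unmatched stream-side rows can be emitted locally, per partition.
            rows.append((s, None))
    return rows, matched_mask

def full_outer_nested_loop_join(stream_partitions, broadcast, cond):
    results = [join_partition(p, broadcast, cond) for p in stream_partitions]
    rows = [row for part_rows, _ in results for row in part_rows]
    # A broadcast row is unmatched only if NO partition matched it, so the
    # per-partition masks must be combined globally (the "driver" step).
    global_mask = reduce(lambda a, b: a | b, (m for _, m in results), 0)
    for i, b in enumerate(broadcast):
        if not global_mask & (1 << i):
            rows.append((None, b))
    return rows

partitions = [[1, 5], [9]]        # streamed side, split into two partitions
broadcast = [2, 4, 7]             # broadcast (build) side
cond = lambda s, b: abs(s - b) <= 1

print(full_outer_nested_loop_join(partitions, broadcast, cond))
# [(1, 2), (5, 4), (9, None), (None, 7)]
```

Deciding broadcast-side misses locally would be wrong. The partition `[9]` matches neither `2` nor `4` on its own, so per-partition emission would produce spurious `(None, 2)` and `(None, 4)` rows. Only the combined mask shows that `7` is the sole unmatched broadcast row.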

HTH

Herman


In reply to Marco Gaido's message of Sat, Jan 13, 2018 at 6:41 AM.