How to force sorted merge join to broadcast join

How to force sorted merge join to broadcast join

zhangliyun
Hi all,
I want to ask a question about broadcast join in Spark SQL.


```
select A.*,
       B.nsf_cards_ratio * 1.00 / A.nsf_on_entry as nsf_ratio_to_pop
from B
left join A
  on trim(A.country) = trim(B.cntry_code);
```
Here A is a small table with only 8 rows, but somehow the statistics for table A seem to be wrong.

A join B ends up as a sort merge join, although the join key (trim(A.country) = trim(B.cntry_code)) only has a few distinct values (nearly 21 countries). Is there any way to force Spark SQL to use a broadcast join? I cannot simply enlarge spark.sql.autoBroadcastJoinThreshold, because I do not know what size Spark SQL estimates for the table.

I tried to print the physical plan, but it does not show the table size, so I do not know how large spark.sql.autoBroadcastJoinThreshold would have to be to turn the sort merge join into a broadcast join.
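(To be concrete, the setting I am referring to, with what I believe is its 10 MB default, would be changed roughly like this; this is only a sketch, since I still do not know which value would be large enough for Spark's estimate of table A:)

```
// Sketch only (Scala / spark-shell). The default threshold is 10 MB
// (10485760 bytes); setting it to -1 disables automatic broadcast entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "104857600")  // e.g. 100 MB

// or, from the spark-sql command line:
// SET spark.sql.autoBroadcastJoinThreshold=104857600;
```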


```
== Parsed Logical Plan ==
'Project [ArrayBuffer(cc_base_part1).*, (('cc_base_part1.nsf_cards_ratio * 1.00) / 'cc_rank_agg.nsf_on_entry) AS nsf_ratio_to_pop#369]
+- 'Join LeftOuter, ('trim('cc_base_part1.country) = 'trim('cc_rank_agg.cntry_code))
   :- 'UnresolvedRelation `cc_base_part1`
   +- 'UnresolvedRelation `cc_rank_agg`

== Analyzed Logical Plan ==
cust_id: string, country: string, cc_id: decimal(38,0), bin_hmac: string, credit_card_created_date: string, card_usage: smallint, cc_category: string, cc_size: string, nsf_risk: string, nsf_cards_ratio: decimal(18,2), dt: string, nsf_ratio_to_pop: decimal(38,6)
Project [cust_id#372, country#373, cc_id#374, bin_hmac#375, credit_card_created_date#376, card_usage#377, cc_category#378, cc_size#379, nsf_risk#380, nsf_cards_ratio#381, dt#382, CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(nsf_cards_ratio#381 as decimal(18,2))) * promote_precision(cast(1.00 as decimal(18,2)))), DecimalType(22,4)) as decimal(38,16))) / promote_precision(cast(nsf_on_entry#386 as decimal(38,16)))), DecimalType(38,6)) AS nsf_ratio_to_pop#369]
+- Join LeftOuter, (trim(country#373, None) = trim(cntry_code#383, None))
   :- SubqueryAlias cc_base_part1
   :  +- HiveTableRelation `fpv365h`.`cc_base_part1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [cust_id#372, country#373, cc_id#374, bin_hmac#375, credit_card_created_date#376, card_usage#377, cc_category#378, cc_size#379, nsf_risk#380, nsf_cards_ratio#381], [dt#382]
   +- SubqueryAlias cc_rank_agg
      +- HiveTableRelation `fpv365h`.`cc_rank_agg`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [cntry_code#383, num_tot_cards#384L, num_nsf_cards#385L, nsf_on_entry#386], [dt#387]



```

Does Spark have any command to show the table size when printing the physical plan? I would appreciate any help with this.


Best regards

Kelly Zhang


 

Re: How to force sorted merge join to broadcast join

Rubén Berenguel
Hi, I hope this answers your question.

You can hint the broadcast in SQL as detailed here: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins-broadcast.html (thanks Jacek :) ).
I would also recommend creating a temporary table or view that already applies the trimming you use in the join, for clarity. Keep in mind that the DataFrame methods are often more powerful and readable than raw Spark SQL (as is the case for broadcasting), although this comes down to personal preference.
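A minimal sketch of both forms (Scala / spark-shell; the table names A and B are the placeholders from your query, so adjust them to the real tables):

```
import org.apache.spark.sql.functions.{broadcast, trim}

// SQL form: the BROADCAST hint asks the planner to broadcast the hinted relation.
val withSqlHint = spark.sql("""
  SELECT /*+ BROADCAST(A) */
         A.*,
         B.nsf_cards_ratio * 1.00 / A.nsf_on_entry AS nsf_ratio_to_pop
  FROM B
  LEFT JOIN A
    ON trim(A.country) = trim(B.cntry_code)
""")

// DataFrame form: wrap the small side in broadcast() before joining.
val a = spark.table("A")
val b = spark.table("B")
val joined = b.join(broadcast(a), trim(a("country")) === trim(b("cntry_code")), "left")
```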

Regards, 

Ruben

-- 
Rubén Berenguel

Re: Re: How to force sorted merge join to broadcast join

zhangliyun
Thanks! After using the syntax provided in the link, select /*+ BROADCAST(A) */ ..., I got what I wanted.
But besides using queryExecution.stringWithStats (DataFrame API) to show the table statistics, is there any way to show the table statistics with explain xxx on the Spark SQL command line?
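(What I mean by the DataFrame-side approach is roughly this sketch, assuming the two tables from the plan above exist in the metastore:)

```
import org.apache.spark.sql.functions.{col, trim}

// Sketch: stringWithStats renders the optimized plan with the size and
// row-count estimates attached to each operator.
val df = spark.table("fpv365h.cc_base_part1")
  .join(spark.table("fpv365h.cc_rank_agg"),
        trim(col("country")) === trim(col("cntry_code")), "left")
println(df.queryExecution.stringWithStats)
```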

Best Regards
Kelly



Re: Re: How to force sorted merge join to broadcast join

Rubén Berenguel
I think there is no way of doing that (at least I don't remember one right now). The closest thing I can think of is running the SQL "ANALYZE TABLE table_name COMPUTE STATISTICS" to compute the statistics independently of any query (this also feeds the cost-based optimiser, if I remember correctly), but whether the computed statistics can then be displayed escapes me right now.
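(A sketch of what I mean, using the table name from the thread as a placeholder; if I remember correctly, DESCRIBE EXTENDED should list a Statistics row once the table has been analysed:)

```
// Sketch: compute table-level statistics, then inspect the table metadata.
spark.sql("ANALYZE TABLE fpv365h.cc_base_part1 COMPUTE STATISTICS")

// DESCRIBE EXTENDED lists the table metadata; after ANALYZE it should include
// a Statistics entry (sizeInBytes and, where available, rowCount).
spark.sql("DESCRIBE EXTENDED fpv365h.cc_base_part1").show(100, truncate = false)
```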

R

-- 
Rubén Berenguel

Re: Re: How to force sorted merge join to broadcast join

cloud0fan
You can try an EXPLAIN COST query and see if it works for you.
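(For example, a sketch using the placeholder table names from the thread; EXPLAIN COST prints the optimized logical plan annotated with Statistics(sizeInBytes=..., rowCount=...) on Spark versions that support it. From the spark-sql command line you would simply prefix the query with EXPLAIN COST.)

```
// Sketch (Scala / spark-shell): table names are placeholders from the thread.
spark.sql("""
  EXPLAIN COST
  SELECT A.*, B.nsf_cards_ratio * 1.00 / A.nsf_on_entry AS nsf_ratio_to_pop
  FROM B
  LEFT JOIN A
    ON trim(A.country) = trim(B.cntry_code)
""").show(truncate = false)
```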
