New Optimizer Hint


ptkool
Hello, 

I am in the process of putting together a PR that introduces a new hint called NO_COLLAPSE. This hint is essentially identical to Oracle's NO_MERGE hint. 

Let me first give an example of why I am proposing this. 

# spark is a SparkSession; user_agent_details is a UDF that returns a struct (definition not shown)
df1 = spark.createDataFrame([(1, "abc")], ["id", "user_agent"])
df2 = df1.withColumn("ua", user_agent_details(df1["user_agent"]))
df3 = df2.select(df2["ua"].device_form_factor.alias("c1"), df2["ua"].browser_version.alias("c2"))
df3.explain(True)

== Parsed Logical Plan == 
'Project [ua#85[device_form_factor] AS c1#90, ua#85[browser_version] AS c2#91] 
+- Project [id#80L, user_agent#81, UDF(user_agent#81) AS ua#85] 
   +- LogicalRDD [id#80L, user_agent#81] 

== Analyzed Logical Plan == 
c1: string, c2: string 
Project [ua#85.device_form_factor AS c1#90, ua#85.browser_version AS c2#91] 
+- Project [id#80L, user_agent#81, UDF(user_agent#81) AS ua#85] 
   +- LogicalRDD [id#80L, user_agent#81] 

== Optimized Logical Plan == 
Project [UDF(user_agent#81).device_form_factor AS c1#90, UDF(user_agent#81).browser_version AS c2#91] 
+- LogicalRDD [id#80L, user_agent#81] 

== Physical Plan == 
*Project [UDF(user_agent#81).device_form_factor AS c1#90, UDF(user_agent#81).browser_version AS c2#91] 
+- Scan ExistingRDD[id#80L,user_agent#81] 

user_agent_details is a user-defined function that returns a struct. As the optimized and physical plans show, the UDF call has been inlined into both output columns, so it is evaluated twice per row (once for each extracted field), which can be expensive. This is caused by the CollapseProject optimizer rule, which merges adjacent projections by substituting the aliased expression (ua) wherever it is referenced.
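To make the duplication concrete, here is a minimal sketch of what collapsing two adjacent projections does, using a toy expression model. The Col/Udf/Field classes and the collapse function are hypothetical and are not Catalyst's API; they only mirror the shape of the plans above. Merging substitutes the definition of ua into every expression that references it, so one UDF call in the lower projection becomes two in the merged one.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple, Union


# Toy expression nodes (hypothetical; these are not Spark's Catalyst classes).
@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Udf:
    arg: "Expr"


@dataclass(frozen=True)
class Field:
    child: "Expr"
    field: str


Expr = Union[Col, Udf, Field]
ProjectList = List[Tuple[str, Expr]]  # (output alias, expression)


def substitute(expr: Expr, aliases: Dict[str, Expr]) -> Expr:
    """Replace references to lower-projection aliases with their definitions."""
    if isinstance(expr, Col):
        return aliases.get(expr.name, expr)
    if isinstance(expr, Udf):
        return Udf(substitute(expr.arg, aliases))
    return Field(substitute(expr.child, aliases), expr.field)


def collapse(upper: ProjectList, lower: ProjectList) -> ProjectList:
    """Merge Project(upper, Project(lower, child)) into a single Project over child."""
    aliases = dict(lower)
    return [(name, substitute(e, aliases)) for name, e in upper]


def count_udfs(expr: Expr) -> int:
    if isinstance(expr, Udf):
        return 1 + count_udfs(expr.arg)
    if isinstance(expr, Field):
        return count_udfs(expr.child)
    return 0


lower = [("id", Col("id")), ("user_agent", Col("user_agent")),
         ("ua", Udf(Col("user_agent")))]
upper = [("c1", Field(Col("ua"), "device_form_factor")),
         ("c2", Field(Col("ua"), "browser_version"))]

before = sum(count_udfs(e) for _, e in lower + upper)
after = sum(count_udfs(e) for _, e in collapse(upper, lower))
print(before, after)  # 1 2
```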

I'm proposing a hint that prevents the optimizer from collapsing adjacent projections. A new function, no_collapse, would be introduced for this purpose. Consider the following example and the query plan it generates.

# F is pyspark.sql.functions; no_collapse is the proposed function and does not exist in Spark yet
df1 = spark.createDataFrame([(1, "abc")], ["id", "user_agent"])
df2 = F.no_collapse(df1.withColumn("ua", user_agent_details(df1["user_agent"])))
df3 = df2.select(df2["ua"].device_form_factor.alias("c1"), df2["ua"].browser_version.alias("c2"))
df3.explain(True)

== Parsed Logical Plan == 
'Project [ua#69[device_form_factor] AS c1#75, ua#69[browser_version] AS c2#76] 
+- NoCollapseHint 
   +- Project [id#64L, user_agent#65, UDF(user_agent#65) AS ua#69] 
      +- LogicalRDD [id#64L, user_agent#65] 

== Analyzed Logical Plan == 
c1: string, c2: string 
Project [ua#69.device_form_factor AS c1#75, ua#69.browser_version AS c2#76] 
+- NoCollapseHint 
   +- Project [id#64L, user_agent#65, UDF(user_agent#65) AS ua#69] 
      +- LogicalRDD [id#64L, user_agent#65] 

== Optimized Logical Plan == 
Project [ua#69.device_form_factor AS c1#75, ua#69.browser_version AS c2#76] 
+- NoCollapseHint 
   +- Project [UDF(user_agent#65) AS ua#69] 
      +- LogicalRDD [id#64L, user_agent#65] 

== Physical Plan == 
*Project [ua#69.device_form_factor AS c1#75, ua#69.browser_version AS c2#76] 
+- *Project [UDF(user_agent#65) AS ua#69] 
   +- Scan ExistingRDD[id#64L,user_agent#65] 

As can be seen from the query plan, the user-defined function is now evaluated once per row. 
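A minimal sketch of how a plan-level NoCollapseHint could interact with a CollapseProject-style rule, under a toy model. Scan, Project, NoCollapseHint and the helper functions here are hypothetical and only mirror the shape of the plans above. The rule merges a Project sitting directly on another Project but stops at the hint node, and the hint is dropped before physical planning, which matches the two *Project operators in the physical plan. Column pruning of the inner projection is not modeled.

```python
from dataclasses import dataclass
from typing import List, Tuple, Union

# Expressions as nested tuples (hypothetical toy model, not Catalyst):
#   ("col", name) | ("udf", arg) | ("field", child, name)


@dataclass
class Scan:
    columns: List[str]


@dataclass
class Project:
    exprs: List[Tuple[str, tuple]]  # (alias, expression)
    child: "Plan"


@dataclass
class NoCollapseHint:
    child: "Plan"


Plan = Union[Scan, Project, NoCollapseHint]


def substitute(expr, aliases):
    if expr[0] == "col":
        return aliases.get(expr[1], expr)
    if expr[0] == "udf":
        return ("udf", substitute(expr[1], aliases))
    return ("field", substitute(expr[1], aliases), expr[2])


def collapse_project(plan):
    """Toy CollapseProject: merge Project(Project(x)) bottom-up, but never across a hint."""
    if isinstance(plan, Scan):
        return plan
    if isinstance(plan, NoCollapseHint):
        return NoCollapseHint(collapse_project(plan.child))
    child = collapse_project(plan.child)
    if isinstance(child, Project):
        aliases = dict(child.exprs)
        return Project([(a, substitute(e, aliases)) for a, e in plan.exprs], child.child)
    return Project(plan.exprs, child)


def strip_hints(plan):
    """Remove hint nodes before physical planning."""
    if isinstance(plan, NoCollapseHint):
        return strip_hints(plan.child)
    if isinstance(plan, Project):
        return Project(plan.exprs, strip_hints(plan.child))
    return plan


def count_udfs(expr):
    if expr[0] == "udf":
        return 1 + count_udfs(expr[1])
    if expr[0] == "field":
        return count_udfs(expr[1])
    return 0


def udf_calls_per_row(plan):
    if isinstance(plan, Scan):
        return 0
    if isinstance(plan, NoCollapseHint):
        return udf_calls_per_row(plan.child)
    return sum(count_udfs(e) for _, e in plan.exprs) + udf_calls_per_row(plan.child)


scan = Scan(["id", "user_agent"])
inner = Project([("id", ("col", "id")),
                 ("user_agent", ("col", "user_agent")),
                 ("ua", ("udf", ("col", "user_agent")))], scan)
outer_exprs = [("c1", ("field", ("col", "ua"), "device_form_factor")),
               ("c2", ("field", ("col", "ua"), "browser_version"))]

without_hint = collapse_project(Project(outer_exprs, inner))
with_hint = strip_hints(collapse_project(Project(outer_exprs, NoCollapseHint(inner))))
print(udf_calls_per_row(without_hint), udf_calls_per_row(with_hint))  # 2 1
```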

I would like to get some feedback on this proposal. 

Thanks.

Re: New Optimizer Hint

Herman van Hövell tot Westerflier-2
Hi Michael,

This sounds like a good idea. Can you open a JIRA to track this?

My initial feedback on your proposal is that you might want to express no_collapse at the expression level rather than at the plan level.

HTH
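One possible reading of the expression-level suggestion, sketched with a toy tuple-based model: instead of a plan node, a hypothetical no_collapse marker wraps the aliased expression, and the collapse rule declines to merge when the upper projection references a marked alias. Neither the marker nor these helpers are Spark APIs; a real implementation would also need to strip the marker before evaluation.

```python
# Expressions as nested tuples (hypothetical toy model, not Catalyst):
#   ("col", name) | ("udf", arg) | ("field", child, name) | ("no_collapse", child)
# "no_collapse" here is a hypothetical expression-level marker, not a Spark API.


def references(expr):
    """Names of the input columns an expression reads."""
    if expr[0] == "col":
        return {expr[1]}
    return set().union(*(references(e) for e in expr[1:] if isinstance(e, tuple)))


def substitute(expr, aliases):
    if expr[0] == "col":
        return aliases.get(expr[1], expr)
    return (expr[0],) + tuple(
        substitute(e, aliases) if isinstance(e, tuple) else e for e in expr[1:]
    )


def collapse(upper, lower):
    """Return the resulting projection lists, outermost first."""
    marked = {alias for alias, e in lower if e[0] == "no_collapse"}
    used = set().union(*(references(e) for _, e in upper))
    if marked & used:
        return [upper, lower]  # keep both projections so the marked alias is computed once
    aliases = dict(lower)
    return [[(alias, substitute(e, aliases)) for alias, e in upper]]


def count_udfs(expr):
    own = 1 if expr[0] == "udf" else 0
    return own + sum(count_udfs(e) for e in expr[1:] if isinstance(e, tuple))


def total_udfs(projections):
    return sum(count_udfs(e) for plist in projections for _, e in plist)


ua_call = ("udf", ("col", "user_agent"))
upper = [("c1", ("field", ("col", "ua"), "device_form_factor")),
         ("c2", ("field", ("col", "ua"), "browser_version"))]
plain = [("user_agent", ("col", "user_agent")), ("ua", ua_call)]
marked = [("user_agent", ("col", "user_agent")), ("ua", ("no_collapse", ua_call))]

print(total_udfs(collapse(upper, plain)), total_udfs(collapse(upper, marked)))  # 2 1
```

The appeal of the expression-level form is that the intent travels with the expression itself rather than depending on where a hint node ends up in the plan.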

On Thu, Apr 20, 2017 at 3:31 PM, Michael Styles <[hidden email]> wrote:




--

Herman van Hövell

Software Engineer

Databricks Inc.

[hidden email]

+31 6 420 590 27

databricks.com

http://databricks.com


Join Databricks at Spark Summit 2017 in San Francisco, the world's largest event for the Apache Spark community.

Re: New Optimizer Hint

rxin
Doesn't common subexpression elimination address this issue as well?
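A minimal sketch of the idea behind this question, using a toy tuple-based evaluator rather than Spark's code-generation path: common subexpression elimination evaluates a repeated subexpression, such as the UDF call, once per row and reuses the result, even after CollapseProject has duplicated it. The user_agent_details stand-in is hypothetical, and this toy memoizes every subexpression per row, which is a simplification; whether Spark's own subexpression elimination covers this particular UDF and plan is not something the sketch establishes.

```python
from collections import Counter

calls = Counter()


def user_agent_details(user_agent):
    """Hypothetical stand-in for the thread's UDF; counts its own invocations."""
    calls["user_agent_details"] += 1
    return {"device_form_factor": "desktop", "browser_version": "1.0"}


def evaluate(expr, row, cache):
    """Evaluate a tuple expression; if cache is a dict, reuse results within the row."""
    if cache is not None and expr in cache:
        return cache[expr]
    kind = expr[0]
    if kind == "col":
        value = row[expr[1]]
    elif kind == "udf":
        value = user_agent_details(evaluate(expr[1], row, cache))
    else:  # ("field", child, name)
        value = evaluate(expr[1], row, cache)[expr[2]]
    if cache is not None:
        cache[expr] = value
    return value


def run_projection(project_list, rows, eliminate_common_subexpressions):
    results = []
    for row in rows:
        cache = {} if eliminate_common_subexpressions else None  # fresh cache per row
        results.append({alias: evaluate(e, row, cache) for alias, e in project_list})
    return results


# The collapsed projection from the optimized plan in the original post.
ua_call = ("udf", ("col", "user_agent"))
collapsed = [("c1", ("field", ua_call, "device_form_factor")),
             ("c2", ("field", ua_call, "browser_version"))]
rows = [{"id": 1, "user_agent": "abc"}]

run_projection(collapsed, rows, eliminate_common_subexpressions=False)
print(calls["user_agent_details"])  # 2
calls.clear()
run_projection(collapsed, rows, eliminate_common_subexpressions=True)
print(calls["user_agent_details"])  # 1
```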

On Thu, Apr 20, 2017 at 6:40 AM Herman van Hövell tot Westerflier <[hidden email]> wrote: