Optimizer rule ConvertToLocalRelation causes expressions to be eager-evaluated in Planning phase


Li Jin
Hi All,

Sorry for the long email title. I am a bit surprised to find that the current optimizer rule "ConvertToLocalRelation" causes expressions to be eagerly evaluated in the planning phase. This can be demonstrated with the following code:

scala> val myUDF = udf((x: String) => { println("UDF evaled"); "result" })

myUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))


scala> val df = Seq(("test", 1)).toDF("s", "i").select(myUDF($"s"))

df: org.apache.spark.sql.DataFrame = [UDF(s): string]


scala> println(df.queryExecution.optimizedPlan)

UDF evaled

LocalRelation [UDF(s)#9]


This is somewhat unexpected to me because of Spark's lazy execution model.

I am wondering whether this behavior is by design.

Thanks!
Li
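
To make the reported behavior concrete, here is a minimal toy model in plain Scala (no Spark dependency). `Plan`, `LocalRel`, `Project`, `ToyOptimizer` and `ToyDemo` are hypothetical names chosen for illustration, not Catalyst classes, and the model is a simplification rather than Spark's implementation. It captures the idea as described in this thread: a projection sitting directly on in-memory rows can be computed while the plan is being optimized, so a side effect inside the projected function (like the `println` in the UDF above) fires at that point.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark's Catalyst classes.
sealed trait Plan
// In-memory rows, similar in spirit to Spark's LocalRelation.
final case class LocalRel(rows: Seq[Seq[String]]) extends Plan
// A projection applying one function per output column to each input row.
final case class Project(exprs: Seq[Seq[String] => String], child: Plan) extends Plan

object ToyOptimizer {
  // Folds Project(LocalRel) into a new LocalRel by evaluating every
  // expression on every row immediately, i.e. during "optimization".
  def convertToLocalRelation(plan: Plan): Plan = plan match {
    case Project(exprs, LocalRel(rows)) =>
      LocalRel(rows.map(row => exprs.map(f => f(row))))
    case other => other
  }
}

object ToyDemo {
  // Counts how often the "UDF" body runs.
  var evals = 0

  val myUdf: Seq[String] => String = row => { evals += 1; println("UDF evaled"); "result" }

  def main(args: Array[String]): Unit = {
    val plan = Project(Seq(myUdf), LocalRel(Seq(Seq("test", "1"))))
    println(s"before optimizing: evals = $evals") // 0: building the plan runs nothing
    val optimized = ToyOptimizer.convertToLocalRelation(plan)
    println(s"after optimizing: evals = $evals") // 1: the rule evaluated the UDF
    println(optimized)
  }
}
```

Building `plan` runs nothing; only the call to `convertToLocalRelation` invokes the function, once per local row.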



Re: Optimizer rule ConvertToLocalRelation causes expressions to be eager-evaluated in Planning phase

rxin
But from the user's perspective, optimization is not run, right? So it is still lazy.


On Fri, Jun 8, 2018 at 12:35 PM Li Jin <[hidden email]> wrote:
This is somewhat unexpected to me because of Spark's lazy execution model.
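
Reynold's point can be checked directly. The sketch below assumes a Scala project with spark-sql on the classpath (the thread concerns Spark 2.3) running in local mode; `LazyUntilOptimized` and `build` are made-up names. It builds the same query as in the original message, prints a marker, and only then forces the optimized plan, so the marker is expected to appear before "UDF evaled".

```scala
// Assumes spark-sql (e.g. 2.3.x, as in this thread) on the classpath; local mode.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf

object LazyUntilOptimized {
  // Same query as in the thread; defining it only analyzes the plan.
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val myUDF = udf((x: String) => { println("UDF evaled"); "result" })
    Seq(("test", 1)).toDF("s", "i").select(myUDF($"s"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("lazy-until-optimized").getOrCreate()
    val df = build(spark)
    println("DataFrame defined") // expected before any "UDF evaled" line
    // optimizedPlan is computed on first access; this is where the thread
    // observed ConvertToLocalRelation evaluating the UDF on the driver.
    println(df.queryExecution.optimizedPlan)
    spark.stop()
  }
}
```

Replacing the last `println` with an action such as `df.collect()` would also force the plan to be optimized.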



Re: Optimizer rule ConvertToLocalRelation causes expressions to be eager-evaluated in Planning phase

Li Jin
I see. Thanks for the clarification. It's not a big issue, but I am surprised my UDF can be executed in the planning phase. If my UDF does something expensive, that could get weird.



On Fri, Jun 8, 2018 at 3:44 PM, Reynold Xin <[hidden email]> wrote:
But from the user's perspective, optimization is not run, right? So it is still lazy.



Re: Optimizer rule ConvertToLocalRelation causes expressions to be eager-evaluated in Planning phase

Li Jin
Sorry, I am confused now... My UDF gets executed for each row anyway (because I am adding a column and want to execute the UDF on each row). Is the difference that, with the "ConvertToLocalRelation" optimization, it gets executed for each row on the driver during the optimization stage?

On Fri, Jun 8, 2018 at 3:57 PM, Herman van Hövell tot Westerflier <[hidden email]> wrote:
But that is still cheaper than executing that expensive UDF for each row in your dataset, right?

On Fri, Jun 8, 2018 at 9:51 PM Li Jin <[hidden email]> wrote:
I see. Thanks for the clarification. It's not a big issue, but I am surprised my UDF can be executed in the planning phase. If my UDF does something expensive, that could get weird.



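
Li's follow-up question can be pictured with another plain-Scala toy (hypothetical names, no Spark dependency, and no attempt to model Spark's real costs). Under the reading discussed in this thread, folding changes where and when the per-row function runs (on the driver during optimization, versus on executors during execution), not how many times it runs over the local rows.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model with hypothetical names (no Spark): contrasts WHERE a per-row
// function runs with and without plan-time folding, not HOW OFTEN it runs.
object WhereItRuns {
  // One (phase, input) entry per invocation of the "UDF".
  val log: ArrayBuffer[(String, String)] = ArrayBuffer.empty

  // Builds a "UDF" that records which phase invoked it.
  private def udfIn(phase: String): String => String = s => {
    log += ((phase, s))
    "result"
  }

  // With folding: every local row is evaluated while the plan is optimized (driver).
  def foldAtOptimizeTime(rows: Seq[String]): Seq[String] = rows.map(udfIn("driver/optimize"))

  // Without folding: the same rows are evaluated when the job runs (executors).
  def evaluateAtRunTime(rows: Seq[String]): Seq[String] = rows.map(udfIn("executor/run"))

  def main(args: Array[String]): Unit = {
    val rows = Seq("a", "b", "c")
    val folded = foldAtOptimizeTime(rows)
    val atRunTime = evaluateAtRunTime(rows)
    println(folded == atRunTime) // same results either way
    // Each path invoked the function once per row (3 times here).
    println(log.count(_._1 == "driver/optimize"))
    println(log.count(_._1 == "executor/run"))
  }
}
```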





Re: Optimizer rule ConvertToLocalRelation causes expressions to be eager-evaluated in Planning phase

Xiao Li
For stateful/non-deterministic UDFs, we do not evaluate them in the optimizer stage. For deterministic UDFs, each invocation should return the same result. 

Before the Spark 2.3 release, we assumed all UDFs were deterministic and stateless. In the recent Spark 2.3 release, we allow users to mark the determinism of UDFs. Thus, in your case, if you want to execute the UDF for each row, please define the UDF as a non-deterministic one.
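
Following this suggestion, a UDF can be marked non-deterministic with `asNondeterministic()`, available on `UserDefinedFunction` since Spark 2.3.0. A sketch, assuming spark-sql 2.3+ on the classpath; `NondeterministicUdf` and `build` are hypothetical names. The reply above says such UDFs are not evaluated in the optimizer stage; rather than assert that for every Spark version, the sketch prints the optimized plan so you can check whether the UDF still ends up folded into a `LocalRelation`.

```scala
// Assumes spark-sql 2.3+ on the classpath (asNondeterministic() was added in 2.3.0).
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf

object NondeterministicUdf {
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // Same UDF as in the thread, but flagged as non-deterministic so the
    // optimizer cannot assume repeated calls return the same value.
    val myUDF = udf((x: String) => { println("UDF evaled"); "result" }).asNondeterministic()
    Seq(("test", 1)).toDF("s", "i").select(myUDF($"s"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("nondeterministic-udf").getOrCreate()
    val df = build(spark)
    // Check on your Spark version whether the UDF is still folded into a
    // LocalRelation or kept in a Project that runs at execution time.
    println(df.queryExecution.optimizedPlan)
    spark.stop()
  }
}
```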

In the next release, we might introduce an interface that lets users tell the optimizer not to run specific rules. That might help in your case too.
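
Assuming Spark 2.4 or later, a mechanism of this kind is the SQL config `spark.sql.optimizer.excludedRules`, which takes a comma-separated list of fully qualified optimizer rule names; it did not exist in Spark 2.3 when this thread was written. A sketch that excludes `ConvertToLocalRelation` for one session; `ExcludeConvertToLocalRelation`, `RuleName` and `build` are hypothetical names.

```scala
// Assumes spark-sql 2.4+ on the classpath: spark.sql.optimizer.excludedRules does not exist in 2.3.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf

object ExcludeConvertToLocalRelation {
  // Fully qualified name of the Catalyst rule discussed in this thread.
  val RuleName = "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation"

  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // Runtime SQL conf for this session; set it before the plan is optimized.
    spark.conf.set("spark.sql.optimizer.excludedRules", RuleName)
    val myUDF = udf((x: String) => { println("UDF evaled"); "result" })
    Seq(("test", 1)).toDF("s", "i").select(myUDF($"s"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("exclude-rule").getOrCreate()
    val df = build(spark)
    // With the rule excluded, the optimized plan should keep a Project over the
    // LocalRelation, so the UDF runs when the query executes rather than here.
    println(df.queryExecution.optimizedPlan)
    spark.stop()
  }
}
```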
 

2018-06-08 13:22 GMT-07:00 Li Jin <[hidden email]>:
Sorry, I am confused now... My UDF gets executed for each row anyway (because I am adding a column and want to execute the UDF on each row). Is the difference that, with the "ConvertToLocalRelation" optimization, it gets executed for each row on the driver during the optimization stage?
