[DISCUSS] Expensive deterministic UDFs


[DISCUSS] Expensive deterministic UDFs

Enrico Minack

Hi all,

Running an expensive deterministic UDF that returns a complex type, followed by multiple references to that result, causes Spark to evaluate the UDF multiple times per row. This has been reported and discussed before: SPARK-18748, SPARK-17728

    // for illustration, f stands for some expensive deterministic function, e.g.:
    val f: Int => Array[Int] = id => Array(id, id + 1)
    val udfF = udf(f)
    df
      .select($"id", udfF($"id").as("array"))
      .select($"array"(0).as("array0"), $"array"(1).as("array1"))

A common approach to make Spark evaluate the UDF only once is to cache the intermediate result right after projecting the UDF:

    df
      .select($"id", udfF($"id").as("array"))
      .cache()
      .select($"array"(0).as("array0"), $"array"(1).as("array1"))

There are scenarios where this intermediate result is too big for the cluster to cache. Caching solely to force a single evaluation is also bad design.

The best approach is to mark the UDF as non-deterministic. Then Spark optimizes the query such that the UDF is called only once per row, which is exactly what you want.

    val udfF = udf(f).asNondeterministic()

However, declaring a UDF non-deterministic when it clearly is deterministic is counter-intuitive and makes your code harder to read.

Spark should provide a better way to flag the UDF. Calling it expensive would be a better name here.

    val udfF = udf(f).asExpensive()

I understand that deterministic is a notion that Expression provides, and that there is no equivalent notion of expensive understood by the optimizer. However, asExpensive() could simply set ScalaUDF.udfDeterministic = deterministic && !expensive, which implements the best available approach behind a better name.
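
For illustration, a user-side sketch of what the proposed method could look like today; asExpensive() is hypothetical and does not exist in Spark, the shim below simply delegates to the existing asNondeterministic():

    import org.apache.spark.sql.expressions.UserDefinedFunction

    // Hypothetical shim (asExpensive() does not exist in Spark): a readable
    // alias that falls back to the existing non-deterministic flag.
    object ExpensiveUdfSyntax {
      implicit class ExpensiveUdf(val underlying: UserDefinedFunction) extends AnyVal {
        def asExpensive(): UserDefinedFunction = underlying.asNondeterministic()
      }
    }

    import ExpensiveUdfSyntax._
    val udfF = udf(f).asExpensive()   // behaves exactly like udf(f).asNondeterministic()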

What are your thoughts on asExpensive()?

Regards,
Enrico

Re: [DISCUSS] Expensive deterministic UDFs

Sean Owen-2
Interesting, what does non-deterministic do except have this effect?
Aside from the naming, it could be a fine use of this flag if that's
all it effectively does. I'm not sure I'd introduce another flag with
the same semantics just over naming. If anything, 'expensive' also
isn't the right word; it's more like 'try not to evaluate multiple times'.

Why isn't caching the answer? I realize it's big, but you can cache to
disk. This may be faster than whatever plan reordering has to happen
to evaluate once.
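
For example (a sketch, reusing the DataFrame and UDF from the first mail),
a disk-only persist avoids keeping the large intermediate result in memory:

    // Sketch: persist the projected UDF result to disk instead of memory.
    import org.apache.spark.storage.StorageLevel

    df.select($"id", udfF($"id").as("array"))
      .persist(StorageLevel.DISK_ONLY)
      .select($"array"(0).as("array0"), $"array"(1).as("array1"))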

Usually I'd say: can you redesign your UDF and code to be more
efficient too? Or use a bigger cluster if that's really what you need.

At first look, no, I don't think this Spark-side naming workaround
for your use case is worthwhile. There are existing, better solutions.


Re: [DISCUSS] Expensive deterministic UDFs

Rubén Berenguel
That was very interesting, thanks Enrico.

Sean, IIRC it also prevents push down of the UDF in Catalyst in some cases.

Regards,

Ruben


Re: [DISCUSS] Expensive deterministic UDFs

cloud0fan
We really need some documentation that defines what non-deterministic means. AFAIK, non-deterministic expressions may produce different results for the same input row, depending on which input rows have already been processed.

The optimizer tries its best not to change the input sequence of non-deterministic expressions. For example, `df.select(..., nonDeterministicExpr).filter...` can't do filter pushdown. One exception is filter conditions: for `df.filter(nonDeterministic && cond)`, Spark still pushes down `cond` even if that may change the input sequence seen by the first condition. This is to respect the SQL semantic that filter conditions ANDed together are order-insensitive. Users should write `df.filter(nonDeterministic).filter(cond)` to guarantee the order.
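
A small sketch of that difference (rand() standing in for a non-deterministic condition, an ordinary column `id`, and spark.implicits._ in scope):

    import org.apache.spark.sql.functions.rand

    // ANDed together: Spark may still push `id > 3` further down,
    // changing which rows the rand() condition sees.
    df.filter(rand() > 0.5 && $"id" > 3)

    // Separate filters: the evaluation order is guaranteed.
    df.filter(rand() > 0.5).filter($"id" > 3)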

For this particular problem, I think it's not only about UDFs, but a general problem with how Spark collapses Projects.
For example, Spark optimizes `df.select('a * 5 as 'b).select('b + 2, 'b + 3)` to `df.select('a * 5 + 2, 'a * 5 + 3)` and executes 'a * 5 twice.
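
A quick way to see this (sketch, assuming spark.implicits._ is in scope):

    // Sketch: CollapseProject merges the two selects and duplicates 'a * 5.
    val df = spark.range(5).toDF("a")
    df.select($"a" * 5 as "b").select($"b" + 2, $"b" + 3).explain(true)
    // Optimized plan, roughly: Project [((a * 5) + 2) AS ..., ((a * 5) + 3) AS ...]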

I think we should revisit this optimization and think about when we can collapse.

Re: [DISCUSS] Expensive deterministic UDFs

Enrico Minack
I agree that 'non-deterministic' is the right term for what it currently does: mark an expression as non-deterministic (returning different values for the same input, e.g. rand()), and the optimizer does its best not to break semantics when moving expressions around.

In the case of expensive deterministic UDFs, or any expensive deterministic expression, the optimizer should not multiply the effort. Even in the case of cheap expressions like a * 5, where the performance impact is comparably small, it simply should not execute things twice. So this is not about expensive deterministic expressions, but about deterministic expressions that are referenced multiple times.

Pushing such expressions into the other expressions that reference them is useful when it simplifies those expressions, e.g. df.withColumn("b", not($"a")).where(not($"b")) will eliminate the double negation of a.
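
A sketch of that simplification (assuming a boolean column a and spark.implicits._):

    import org.apache.spark.sql.functions.not

    // Sketch: after the filter is pushed below the projection, the optimizer
    // simplifies NOT(NOT(a)) to just a.
    val df = Seq(true, false, true).toDF("a")
    df.withColumn("b", not($"a")).where(not($"b")).explain(true)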

So if expressions are referenced multiple times, they should not be collapsed, unless the referencing expressions get simplified; and then the simplification must pay off against evaluating the referenced expression twice. This needs some kind of cost model, or at least heuristics.

In the case of UDFs, I think they should never be collapsed, because they cannot be used to simplify other expressions (can they?). They should rather be materialised as close to the first reference as possible. If executing the UDF and referencing it multiple times happen in the same stage, hence in the same generated code, we end up with the perfect situation where the materialised result of each call is held in memory and processed by all referencing expressions.

Marking UDFs as expensive is not the right approach here, I agree; they should simply not be executed multiple times.

Enrico

