[DISCUSS] SPIP: FunctionCatalog

classic Classic list List threaded Threaded
56 messages Options
123
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

cloud0fan
I think there is one agreement between us: we need both the individual-parameters and row-parameter APIs(your SupportsInvoke proposal and my VarargsScalarFunction proposal). IIUC the argument now is how to compose these 2 APIs.

Your proposal is to put the row-parameter API in the base ScalaFunction interface, with an optional SupportsInvoke interface for the individual-parameters API. I don't like it because it promotes the row-parameter API and forces users to implement it, even if the users want to only use the individual-parameters API.

My proposal is to leave the choice to the users. They can pick one from ScalarFunction0ScalarFunction1, ..., VarargsScalarFunction.

More replies below:

> We agree that ScalarFunction and AggregateFunction can optionally define methods for Spark to directly call in codegen

I don't think we agree with it. Whatever UDF API we choose at the end (either individual-parameters or row-parameter), both non-codegen and codegen code paths should just call these Java methods from the UDF API. It doesn't make sense to have different UDF APIs for non-codegen and codegen.

> The second option is to introduce 9 or more interfaces to break out the fields of the input row, and an additional Object[] variation for varargs:

My initial idea is to not have these 9 interfaces and fully rely on Java reflection. We can do some benchmark, if reflection is not that slow, I think we don't need to add these 9 interfaces. Preso UDF API takes the same approach. And one correction: my proposal is to use InternalRow for varargs UDF, not Object[].

> Spark will need additional code to call the right method based on input, so it will either have 10 wrapper classes or a big match statement

You can take a look at the Spark ScalaUDF expression. It has a big match statement for the non-codegen path, but the codegen path is much simpler because we can generate the exact Java code to call the specific UDF. I don't think it's a big problem, or we can use reflection in the non-codegen path to avoid the big match statement.

> Spark is always going to essentially call the raw interface with no specific type parameters. As a result, incorrect types (like String) will compile but fail at runtime with ClassCastException.

You seem to keep ignoring my proposal that we can check the UDF function signature at the analysis phase to make sure it matches the input types. And with codegen Spark can call the specific function to avoid boxing issues. If you missed my previous example, here is what the generated code looks like:

double input1 = ...;
double input2 = ...;
DoubleAdd udf = ...;
double res = udf.call(input1, input2);

> I think that the InternalRow option is easier to build against because it provides at least some type checking when accessing values from the input row.

I have a different opinion about this. If the input is string type but the UDF implementation calls `row.getLong(0)`, it returns wrong data, which is very bad. With the individual-parameters approach, if you implement UDF with `def call(input: Long)` but the input is string type, analyzer can detect it and fail the query.

On Thu, Feb 25, 2021 at 6:48 AM Ryan Blue <[hidden email]> wrote:

How functions are called is a really big element of this effort. I don’t want to get in a position where we’ve started committing changes without clear agreement on something so fundamental to the proposal. I’d like to make sure we’re in agreement with a vote on the SPIP before committing anything. That is, after all, the point of the SPIPs.

If people think it would help to have an alternative API in a PR, then that’s fine with me.

Since that PR suggestion is intended to make it easier to understand the technical details, I’ll try to summarize where we’re at now:

  • We agree on the scope of adding FunctionCatalog to load functions
  • We agree with the FunctionCatalog methods and the function binding approach
  • We agree that a bound function will be either a ScalarFunction or an AggregateFunction (plus the mix-in PartialAggregateFunction)
  • We agree that values should be passed should be Spark’s internal representation to avoid translation
  • We agree that ScalarFunction and AggregateFunction can optionally define methods for Spark to directly call in codegen

The disagreement is about how to call functions when codegen isn’t used or when the function needs to support variable-length argument lists. There are two options:

The first option is for each function to have a method that accepts an InternalRow, from the proposed SPIP:

interface ScalarFunction extends BoundFunction<R> {
  R produceResult(InternalRow input);
}
interface AggregateFunction<S> extends BoundFunction<R> {
  S update(S state, InternalRow input);
  ...
}

The second option is to introduce 9 or more interfaces to break out the fields of the input row, and an additional Object[] variation for varargs:

interface ScalarFunction1<T1> extends BoundFunction<R> {
  R produceResult(T1 one);
}
interface ScalarFunction2<T1, T2> extends BoundFunction<R> {
  R produceResult(T1 one, T2 two);
}
... 8 more ScalarFunction interfaces
interface ScalarFunctionVarargs extends BoundFunction<R> {
  R produceResult(Object[] args);
}
interface AggregateFunction<S, T1> extends BoundFunction<R> {
  S update(S state, T1 one);
}
interface AggregateFunction<S, T1, T2> extends BoundFunction<R> {
  S update(S state, T1 one, T2 two);
}
... 8 more AggregateFunction interfaces
interface AggregateFunctionVarargs<S> extends BoundFunction<R> {
  S update(S state, Object[] args);
}

Because this is for the non-invoke case, the two options have roughly the same performance characteristics.

The first option has some advantages:

  • It is simpler: there are few interfaces and Spark will always find the right method
  • Accessing a value returns a concrete type, so it is less error-prone. I’ve given an example where this helps identify a problem with an invoke method.

The second option’s advantage is that users have values broken out into arguments. That is, if I understand Wenchen correctly here: “I don’t like the SupportsInvoke approach as it still promotes the row-parameter API. I think the individual-parameters API is better for UDF developers.”

Disadvantages with the second option:

  • There are 20+ more interfaces in the API
  • Spark will need additional code to call the right method based on input, so it will either have 10 wrapper classes or a big match statement that calls each interface separately (see UDFRegistration).
  • Spark is always going to essentially call the raw interface with no specific type parameters. As a result, incorrect types (like String) will compile but fail at runtime with ClassCastException.
  • The varargs case will result in casting to expected types, which could also fail with ClassCastException

I think that the InternalRow option is easier to build against because it provides at least some type checking when accessing values from the input row. You get compile-time checks when using the wrong type like this: String val = input.getString(0); won’t compile.

Another important thing to note is that although the original idea was to keep the individual parameter approach simple, Wenchen has already suggested passing arrays as Java arrays, like UTF8String[]. This adds to the complexity of the overall solution and requires matching multiple types. How would Spark know to pass UTF8String[] or ArrayData?

If anyone disagrees with that summary, please point out where it’s incorrect. But barring a major misunderstanding, I think the choice is clear: the simpler approach that provides additional compile-time safety is the right way to go.


On Tue, Feb 23, 2021 at 1:48 AM Wenchen Fan <[hidden email]> wrote:
+1, as I already proposed we can move forward with PRs

> To move forward, how about we implement the function loading and binding first? Then we can have PRs for both the individual-parameters (I can take it) and row-parameter approaches, if we still can't reach a consensus at that time and need to see all the details.

Ryan, can we focus on the function loading and binding part and get it committed first? I can also fork your branch and put everything together, but that might be too big to review.

On Tue, Feb 23, 2021 at 4:35 PM Dongjoon Hyun <[hidden email]> wrote:
I've been still supporting Ryan's SPIP (original PR and its extension proposal discussed here) because of its simplicity.

According to this email thread context, I also understand the different perspectives like Hyukjin's concerns about having multiple ways and Wenchen's proposal and rationales.

It looks like we need more discussion to reach an agreement. And the technical details become more difficult to track because this is an email thread.

Although Ryan initially suggested discussing this on Apache email thread instead of the PR, can we have a PR to discuss?

Especially, Wenchen, could you make your PR based on Ryan's PR?

If we collect the scattered ideas into a single PR, that would be greatly helpful not only for further discussions, but also when we go on a vote on Ryan's PR or Wenchen's PR.

Bests,
Dongjoon.


On Mon, Feb 22, 2021 at 1:16 AM Wenchen Fan <[hidden email]> wrote:
Hi Walaa,

Thanks for sharing this! The type signature stuff is already covered by the unbound UDF API, which specifies the input and output data types. The problem is how to check the method signature of the bound UDF. As you said, Java has type erasure and we can't check `List<String>` for example.

My initial proposal is to do nothing and simply pass the Spark ArrayData, MapData, InternalRow to the UDF. This requires the UDF developers to ensure the type is matched, as they need to call something like `array.getLong(index)` with the corrected type name. It's as worse as the row-parameter version but seems fine as it only happens with nested types. And the type check is still done for the first level (the method signature must use ArrayData/MapData/InternalRow at least).

We can allow more types in the future to make the type check better. It might be too detailed for this discussion thread but just put a few thoughts:
1. Java array doesn't do type erasure. We can use UTF8String[] for example if the input type is array of string.
2. For struct type, we can allow Java beans/Scala case classes if the field name and type match the type signature.
3. For map type, it's actually struct<keys: array<key_type>, values: array<value_type>>, so we can also allow Java beans/Scala case classes here.

The general idea is to use stuff that can retain nested type information at compile-time, i.e. array, java bean, case classes.

Thanks,
Wenchen



On Mon, Feb 22, 2021 at 3:47 PM Walaa Eldin Moustafa <[hidden email]> wrote:
Wenchen, in Transport, users provide the input parameter signatures and output parameter signature as part of the API. Compile-time checks are done by parsing the type signatures and matching them to the type tree received at compile-time. This also helps with inferring the concrete output type.

The specification in the UDF API looks like this:

  @Override
  public List<String> getInputParameterSignatures() {
    return ImmutableList.of(
        "ARRAY(K)",
        "ARRAY(V)"
    );
  }

  @Override
  public String getOutputParameterSignature() {
    return "MAP(K,V)";
  }

The benefits of this type of type signature specification as opposed to inferring types from Java type signatures given in the Java method are:
  • For nested types, Java type erasure eliminates the information about nested types, so for something like my_function(List<String> a1, List<Integer> a2), the value of both a1.class or a2.class is just a List. However, we are planning to work around this in a future version in the case of Array and Map types. Struct types are discussed in the next point.
  • Without pre-code-generation there is no single Java type signature from which we can capture the Struct info. However, Struct info can be expressed in type signatures of the above type, e.g., ROW(FirstName VARCHAR, LastName VARCHAR).
When a Transport UDF represents a Spark UDF, the type signatures are matched against Spark native types, i.e., org.apache.spark.sql.types.{ArrayType, MapType, StructType}, and primitive types. The function that parses/compiles type signatures is found in AbstractTypeInference. This class represents the generic component that is common between all supported engines. Its Spark-specific extension is in SparkTypeInference. In the above example, at compile time, if the first Array happens to be of String element type, and the second Array happens to be of Integer element type, the UDF will communicate to the Spark analyzer that the output should be of type MapData<String, Integer> (i.e., based on what was seen in the input at compile time). The whole UDF becomes a Spark Expression at the end of the day.

Thanks,
Walaa.


On Sun, Feb 21, 2021 at 7:26 PM Wenchen Fan <[hidden email]> wrote:
I think I have made it clear that it's simpler for the UDF developers to deal with the input parameters directly, instead of getting them from a row, as you need to provide the index and type (e.g. row.getLong(0)). It's also coherent with the existing Spark Scala/Java UDF APIs, so that Spark users will be more familiar with the individual-parameters API.

And I have explained it already that we can use reflection to make sure the defined methods have the right types at query-compilation time. It's better than leaving this problem to the UDF developers and asking them to ensure the inputs are gotten from the row correctly with index and type. If there are people from Presto/Transport, it will be great if you can share how Presto/Transport do this check.

I don't like 22 additional interfaces too, but if you look at the examples I gave, the current Spark Java UDF only has 9 interfaces, and Transport has 8. I think it's good enough and people can use VarargsScalarFunction if they need to take more parameters or varargs. It resolves your concern about doing reflection in the non-codegen execution path that leads to bad performance, it also serves for documentation purpose as people can easily see the number of UDF inputs and their types by a quick glance.

As I said, we need to investigate how to avoid boxing. Since you are asking the question now, I spent sometime to think about it. I think the DoubleAdd example is the way to go. For non-codegen code path, we can just call the interface method. For the codegen code path, the generated Java code would look like (omit the null check logic):

double input1 = ...;
double input2 = ...;
DoubleAdd udf = ...;
double res = udf.call(input1, input2);

Which invokes the primitive version automatically. AFAIK this is also how Scala supports primitive type parameter (generate an extra non-boxing version of the method). If the UDF doesn't have the primtive version method, this code will just call the boxed version and still works.

I don't like the SupportsInvoke approach as it still promotes the row-parameter API. I think the individual-parameters API is better for UDF developers. Can other people share your opinions about the API?

On Sat, Feb 20, 2021 at 5:32 AM Ryan Blue <[hidden email]> wrote:

I don’t see any benefit to more complexity with 22 additional interfaces, instead of simply passing an InternalRow. Why not use a single interface with InternalRow? Maybe you could share your motivation?

That would also result in strange duplication, where the ScalarFunction2 method is just the boxed version:

class DoubleAdd implements ScalarFunction2<Double, Double, Double> {
  @Override
  Double produceResult(Double left, Double right) {
    return left + right;
  }

  double produceResult(double left, double right) {
    return left + right;
  }
}

This would work okay, but would be awkward if you wanted to use the same implementation for any number of arguments, like a sum method that adds all of the arguments together and returns the result. It also isn’t great for varargs, since it is basically the same as the invoke case.

The combination of an InternalRow method and the invoke method seems to be a good way to handle this to me. What is wrong with it? And, how would you solve the problem when implementations define methods with the wrong types? The InternalRow approach helps implementations catch that problem (as demonstrated above) and also provides a fallback when there is a but preventing the invoke optimization from working. That seems like a good approach to me.


On Thu, Feb 18, 2021 at 11:31 PM Wenchen Fan <[hidden email]> wrote:
If people have such a big concern about reflection, we can follow the current Spark Java UDF and Transport, and create ScalarFuncion0[R]ScalarFuncion1[T1, R], etc. to avoid reflection. But we may need to investigate how to avoid boxing with this API design.

To put a detailed proposal: let's have ScalarFuncion0ScalarFuncion1, ..., ScalarFuncion9 and VarargsScalarFunction. At execution time, if Spark sees ScalarFuncion0-9, pass the input columns to the UDF directly, one column one parameter. So string type input is UTF8String, array type input is ArrayData. If Spark sees VarargsScalarFunction, wrap the input columns with InternalRow and pass it to the UDF.

In general, if VarargsScalarFunction is implemented, the UDF should not implement ScalarFuncion0-9. We can also define a priority order to allow this. I don't have a strong preference here.

What do you think?

On Fri, Feb 19, 2021 at 1:24 PM Walaa Eldin Moustafa <[hidden email]> wrote:
I agree with Ryan on the questions around the expressivity of the Invoke method. It is not clear to me how the Invoke method can be used to declare UDFs with type-parameterized parameters. For example: a UDF to get the Nth element of an array (regardless of the Array element type) or a UDF to merge two Arrays (of generic types) to a Map.

Also, to address Wenchen's InternalRow question, can we create a number of Function classes, each corresponding to a number of input parameter length (e.g., ScalarFunction1, ScalarFunction2, etc)?

Thanks,
Walaa.
 

On Thu, Feb 18, 2021 at 6:07 PM Ryan Blue <[hidden email]> wrote:

I agree with you that it is better in many cases to directly call a method. But it it not better in all cases, which is why I don’t think it is the right general-purpose choice.

First, if codegen isn’t used for some reason, the reflection overhead is really significant. That gets much better when you have an interface to call. That’s one reason I’d use this pattern:

class DoubleAdd implements ScalarFunction<Double>, SupportsInvoke {
  Double produceResult(InternalRow row) {
    return produceResult(row.getDouble(0), row.getDouble(1));
  }

  double produceResult(double left, double right) {
    return left + right;
  }
}

There’s little overhead to adding the InternalRow variation, but we could call it in eval to avoid the reflect overhead. To the point about UDF developers, I think this is a reasonable cost.

Second, I think usability is better and helps avoid runtime issues. Here’s an example:

class StrLen implements ScalarFunction<Integer>, SupportsInvoke {
  Integer produceResult(InternalRow row) {
    return produceResult(row.getString(0));
  }

  Integer produceResult(String str) {
    return str.length();
  }
}

See the bug? I forgot to use UTF8String instead of String. With the InternalRow method, I get a compiler warning because getString produces UTF8String that can’t be passed to produceResult(String). If I decided to implement length separately, then we could still run the InternalRow version and log a warning. The code would be slightly slower, but wouldn’t fail.

There are similar situations with varargs where it’s better to call methods that produce concrete types than to cast from Object to some expected type.

I think that using invoke is a great extension to the proposal, but I don’t think that it should be the only way to call functions. By all means, let’s work on it in parallel and use it where possible. But I think we do need the fallback of using InternalRow and that it isn’t a usability problem to include it.

Oh, and one last thought is that we already have users that call Dataset.map and use InternalRow. This would allow converting that code directly to a UDF.

I think we’re closer to agreeing here than it actually looks. Hopefully you’ll agree that having the InternalRow method isn’t a big usability problem.


On Wed, Feb 17, 2021 at 11:51 PM Wenchen Fan <[hidden email]> wrote:
I don't see any objections to the rest of the proposal (loading functions from the catalog, function binding stuff, etc.) and I assume everyone is OK with it. We can commit that part first.

Currently, the discussion focuses on the `ScalarFunction` API, where I think it's better to directly take the input columns as the UDF parameter, instead of wrapping the input columns with InternalRow and taking the InternalRow as the UDF parameter. It's not only for better performance, but also for ease of use. For example, it's easier for the UDF developer to write `input1 + input2` than `inputRow.getLong(0) + inputRow.getLong(1)`, as they don't need to specify the type and index by themselves (getLong(0)) which is error-prone.

It does push more work to the Spark side, but I think it's worth it if implementing UDF gets easier. I don't think the work is very challenging, as we can leverage the infra we built for the expression encoder.

I think it's also important to look at the UDF API from the user's perspective (UDF developers). How do you like the UDF API without considering how Spark can support it? Do you prefer the individual-parameters version or the row-parameter version?

To move forward, how about we implement the function loading and binding first? Then we can have PRs for both the individual-parameters (I can take it) and row-parameter approaches, if we still can't reach a consensus at that time and need to see all the details.

On Thu, Feb 18, 2021 at 4:48 AM Ryan Blue <[hidden email]> wrote:
Thanks, Hyukjin. I think that's a fair summary. And I agree with the idea that we should focus on what Spark will do by default.

I think we should focus on the proposal, for two reasons: first, there is a straightforward path to incorporate Wenchen's suggestion via `SupportsInvoke`, and second, the proposal is more complete: it defines a solution for many concerns like loading a function and finding out what types to use -- not just how to call code -- and supports more use cases like varargs functions. I think we can continue to discuss the rest of the proposal and be confident that we can support an invoke code path where it makes sense.

Does everyone agree? If not, I think we would need to solve a lot of the challenges that I initially brought up with the invoke idea. It seems like a good way to call a function, but needs a real proposal behind it if we don't use it via `SupportsInvoke` in the current proposal.

On Tue, Feb 16, 2021 at 11:07 PM Hyukjin Kwon <[hidden email]> wrote:

Just to make sure we don’t move past, I think we haven’t decided yet:

  • if we’ll replace the current proposal to Wenchen’s approach as the default
  • if we want to have Wenchen’s approach as an optional mix-in on the top of Ryan’s proposal (SupportsInvoke)

From what I read, some people pointed out it as a replacement. Please correct me if I misread this discussion thread.

As Dongjoon pointed out, it would be good to know rough ETA to make sure making progress in this, and people can compare more easily.


FWIW, there’s the saying I like in the zen of Python:

There should be one— and preferably only one —obvious way to do it.

If multiple approaches have the way for developers to do the (almost) same thing, I would prefer to avoid it.

In addition, I would prefer to focus on what Spark does by default first.


2021년 2월 17일 (수) 오후 2:33, Dongjoon Hyun <[hidden email]>님이 작성:
Hi, Wenchen.

This thread seems to get enough attention. Also, I'm expecting more and more if we have this on the `master` branch because we are developing together.

    > Spark SQL has many active contributors/committers and this thread doesn't get much attention yet.

So, what's your ETA from now?

    > I think the problem here is we were discussing some very detailed things without actual code.
    > I'll implement my idea after the holiday and then we can have more effective discussions.
    > We can also do benchmarks and get some real numbers.
    > In the meantime, we can continue to discuss other parts of this proposal, and make a prototype if possible.

I'm looking forward to seeing your PR. I hope we can conclude this thread and have at least one implementation in the `master` branch this month (February).
If you need more time (one month or longer), why don't we have Ryan's suggestion in the `master` branch first and benchmark with your PR later during Apache Spark 3.2 timeframe.

Bests,
Dongjoon.


On Tue, Feb 16, 2021 at 9:26 AM Ryan Blue <[hidden email]> wrote:
Andrew,

The proposal already includes an API for aggregate functions and I think we would want to implement those right away.

Processing ColumnBatch is something we can easily extend the interfaces to support, similar to Wenchen's suggestion. The important thing right now is to agree on some basic functionality: how to look up functions and what the simple API should be. Like the TableCatalog interfaces, we will layer on more support through optional interfaces like `SupportsInvoke` or `SupportsColumnBatch`.

On Tue, Feb 16, 2021 at 9:00 AM Andrew Melo <[hidden email]> wrote:
Hello Ryan,

This proposal looks very interesting. Would future goals for this
functionality include both support for aggregation functions, as well
as support for processing ColumnBatch-es (instead of Row/InternalRow)?

Thanks
Andrew

On Mon, Feb 15, 2021 at 12:44 PM Ryan Blue <[hidden email]> wrote:
>
> Thanks for the positive feedback, everyone. It sounds like there is a clear path forward for calling functions. Even without a prototype, the `invoke` plans show that Wenchen's suggested optimization can be done, and incorporating it as an optional extension to this proposal solves many of the unknowns.
>
> With that area now understood, is there any discussion about other parts of the proposal, besides the function call interface?
>
> On Fri, Feb 12, 2021 at 10:40 PM Chao Sun <[hidden email]> wrote:
>>
>> This is an important feature which can unblock several other projects including bucket join support for DataSource v2, complete support for enforcing DataSource v2 distribution requirements on the write path, etc. I like Ryan's proposals which look simple and elegant, with nice support on function overloading and variadic arguments. On the other hand, I think Wenchen made a very good point about performance. Overall, I'm excited to see active discussions on this topic and believe the community will come to a proposal with the best of both sides.
>>
>> Chao
>>
>> On Fri, Feb 12, 2021 at 7:58 PM Hyukjin Kwon <[hidden email]> wrote:
>>>
>>> +1 for Liang-chi's.
>>>
>>> Thanks Ryan and Wenchen for leading this.
>>>
>>>
>>> 2021년 2월 13일 (토) 오후 12:18, Liang-Chi Hsieh <[hidden email]>님이 작성:
>>>>
>>>> Basically I think the proposal makes sense to me and I'd like to support the
>>>> SPIP as it looks like we have strong need for the important feature.
>>>>
>>>> Thanks Ryan for working on this and I do also look forward to Wenchen's
>>>> implementation. Thanks for the discussion too.
>>>>
>>>> Actually I think the SupportsInvoke proposed by Ryan looks a good
>>>> alternative to me. Besides Wenchen's alternative implementation, is there a
>>>> chance we also have the SupportsInvoke for comparison?
>>>>
>>>>
>>>> John Zhuge wrote
>>>> > Excited to see our Spark community rallying behind this important feature!
>>>> >
>>>> > The proposal lays a solid foundation of minimal feature set with careful
>>>> > considerations for future optimizations and extensions. Can't wait to see
>>>> > it leading to more advanced functionalities like views with shared custom
>>>> > functions, function pushdown, lambda, etc. It has already borne fruit from
>>>> > the constructive collaborations in this thread. Looking forward to
>>>> > Wenchen's prototype and further discussions including the SupportsInvoke
>>>> > extension proposed by Ryan.
>>>> >
>>>> >
>>>> > On Fri, Feb 12, 2021 at 4:35 PM Owen O'Malley &lt;
>>>>
>>>> > owen.omalley@
>>>>
>>>> > &gt;
>>>> > wrote:
>>>> >
>>>> >> I think this proposal is a very good thing giving Spark a standard way of
>>>> >> getting to and calling UDFs.
>>>> >>
>>>> >> I like having the ScalarFunction as the API to call the UDFs. It is
>>>> >> simple, yet covers all of the polymorphic type cases well. I think it
>>>> >> would
>>>> >> also simplify using the functions in other contexts like pushing down
>>>> >> filters into the ORC & Parquet readers although there are a lot of
>>>> >> details
>>>> >> that would need to be considered there.
>>>> >>
>>>> >> .. Owen
>>>> >>
>>>> >>
>>>> >> On Fri, Feb 12, 2021 at 11:07 PM Erik Krogen &lt;
>>>>
>>>> > ekrogen@.com
>>>>
>>>> > &gt;
>>>> >> wrote:
>>>> >>
>>>> >>> I agree that there is a strong need for a FunctionCatalog within Spark
>>>> >>> to
>>>> >>> provide support for shareable UDFs, as well as make movement towards
>>>> >>> more
>>>> >>> advanced functionality like views which themselves depend on UDFs, so I
>>>> >>> support this SPIP wholeheartedly.
>>>> >>>
>>>> >>> I find both of the proposed UDF APIs to be sufficiently user-friendly
>>>> >>> and
>>>> >>> extensible. I generally think Wenchen's proposal is easier for a user to
>>>> >>> work with in the common case, but has greater potential for confusing
>>>> >>> and
>>>> >>> hard-to-debug behavior due to use of reflective method signature
>>>> >>> searches.
>>>> >>> The merits on both sides can hopefully be more properly examined with
>>>> >>> code,
>>>> >>> so I look forward to seeing an implementation of Wenchen's ideas to
>>>> >>> provide
>>>> >>> a more concrete comparison. I am optimistic that we will not let the
>>>> >>> debate
>>>> >>> over this point unreasonably stall the SPIP from making progress.
>>>> >>>
>>>> >>> Thank you to both Wenchen and Ryan for your detailed consideration and
>>>> >>> evaluation of these ideas!
>>>> >>> ------------------------------
>>>> >>> *From:* Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> *Sent:* Wednesday, February 10, 2021 6:06 PM
>>>> >>> *To:* Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt;
>>>> >>> *Cc:* Holden Karau &lt;
>>>>
>>>> > holden@
>>>>
>>>> > &gt;; Hyukjin Kwon <
>>>> >>>
>>>>
>>>> > gurwls223@
>>>>
>>>> >>; Spark Dev List &lt;
>>>>
>>>> > dev@.apache
>>>>
>>>> > &gt;; Wenchen Fan
>>>> >>> &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt;
>>>> >>> *Subject:* Re: [DISCUSS] SPIP: FunctionCatalog
>>>> >>>
>>>> >>> BTW, I forgot to add my opinion explicitly in this thread because I was
>>>> >>> on the PR before this thread.
>>>> >>>
>>>> >>> 1. The `FunctionCatalog API` PR was made on May 9, 2019 and has been
>>>> >>> there for almost two years.
>>>> >>> 2. I already gave my +1 on that PR last Saturday because I agreed with
>>>> >>> the latest updated design docs and AS-IS PR.
>>>> >>>
>>>> >>> And, the rest of the progress in this thread is also very satisfying to
>>>> >>> me.
>>>> >>> (e.g. Ryan's extension suggestion and Wenchen's alternative)
>>>> >>>
>>>> >>> To All:
>>>> >>> Please take a look at the design doc and the PR, and give us some
>>>> >>> opinions.
>>>> >>> We really need your participation in order to make DSv2 more complete.
>>>> >>> This will unblock other DSv2 features, too.
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:58 AM Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> wrote:
>>>> >>>
>>>> >>> Hi, Ryan.
>>>> >>>
>>>> >>> We didn't move past anything (both yours and Wenchen's). What Wenchen
>>>> >>> suggested is double-checking the alternatives with the implementation to
>>>> >>> give more momentum to our discussion.
>>>> >>>
>>>> >>> Your new suggestion about optional extention also sounds like a new
>>>> >>> reasonable alternative to me.
>>>> >>>
>>>> >>> We are still discussing this topic together and I hope we can make a
>>>> >>> conclude at this time (for Apache Spark 3.2) without being stucked like
>>>> >>> last time.
>>>> >>>
>>>> >>> I really appreciate your leadership in this dicsussion and the moving
>>>> >>> direction of this discussion looks constructive to me. Let's give some
>>>> >>> time
>>>> >>> to the alternatives.
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:14 AM Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> I don’t think we should so quickly move past the drawbacks of this
>>>> >>> approach. The problems are significant enough that using invoke is not
>>>> >>> sufficient on its own. But, I think we can add it as an optional
>>>> >>> extension
>>>> >>> to shore up the weaknesses.
>>>> >>>
>>>> >>> Here’s a summary of the drawbacks:
>>>> >>>
>>>> >>>    - Magic function signatures are error-prone
>>>> >>>    - Spark would need considerable code to help users find what went
>>>> >>>    wrong
>>>> >>>    - Spark would likely need to coerce arguments (e.g., String,
>>>> >>>    Option[Int]) for usability
>>>> >>>    - It is unclear how Spark will find the Java Method to call
>>>> >>>    - Use cases that require varargs fall back to casting; users will
>>>> >>>    also get this wrong (cast to String instead of UTF8String)
>>>> >>>    - The non-codegen path is significantly slower
>>>> >>>
>>>> >>> The benefit of invoke is to avoid moving data into a row, like this:
>>>> >>>
>>>> >>> -- using invoke
>>>> >>> int result = udfFunction(x, y)
>>>> >>>
>>>> >>> -- using row
>>>> >>> udfRow.update(0, x); -- actual: values[0] = x;
>>>> >>> udfRow.update(1, y);
>>>> >>> int result = udfFunction(udfRow);
>>>> >>>
>>>> >>> And, again, that won’t actually help much in cases that require varargs.
>>>> >>>
>>>> >>> I suggest we add a new marker trait for BoundMethod called
>>>> >>> SupportsInvoke.
>>>> >>> If that interface is implemented, then Spark will look for a method that
>>>> >>> matches the expected signature based on the bound input type. If it
>>>> >>> isn’t
>>>> >>> found, Spark can print a warning and fall back to the InternalRow call:
>>>> >>> “Cannot find udfFunction(int, int)”.
>>>> >>>
>>>> >>> This approach allows the invoke optimization, but solves many of the
>>>> >>> problems:
>>>> >>>
>>>> >>>    - The method to invoke is found using the proposed load and bind
>>>> >>>    approach
>>>> >>>    - Magic function signatures are optional and do not cause runtime
>>>> >>>    failures
>>>> >>>    - Because this is an optional optimization, Spark can be more strict
>>>> >>>    about types
>>>> >>>    - Varargs cases can still use rows
>>>> >>>    - Non-codegen can use an evaluation method rather than falling back
>>>> >>>    to slow Java reflection
>>>> >>>
>>>> >>> This seems like a good extension to me; this provides a plan for
>>>> >>> optimizing the UDF call to avoid building a row, while the existing
>>>> >>> proposal covers the other cases well and addresses how to locate these
>>>> >>> function calls.
>>>> >>>
>>>> >>> This also highlights that the approach used in DSv2 and this proposal is
>>>> >>> working: start small and use extensions to layer on more complex
>>>> >>> support.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 9:04 AM Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> wrote:
>>>> >>>
>>>> >>> Thank you all for making a giant move forward for Apache Spark 3.2.0.
>>>> >>> I'm really looking forward to seeing Wenchen's implementation.
>>>> >>> That would be greatly helpful to make a decision!
>>>> >>>
>>>> >>> > I'll implement my idea after the holiday and then we can have
>>>> >>> more effective discussions. We can also do benchmarks and get some real
>>>> >>> numbers.
>>>> >>> > FYI: the Presto UDF API
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fprestodb.io%2Fdocs%2Fcurrent%2Fdevelop%2Ffunctions.html&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067978066%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=iMWmHqqXPcT7EK%2Bovyzhy%2BZpU6Llih%2BwdZD53wvobmc%3D&amp;reserved=0&gt;
>>>> >>> also
>>>> >>> takes individual parameters instead of the row parameter. I think this
>>>> >>> direction at least worth a try so that we can see the performance
>>>> >>> difference. It's also mentioned in the design doc as an alternative
>>>> >>> (Trino).
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>>
>>>> >>> On Tue, Feb 9, 2021 at 10:18 PM Wenchen Fan &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> FYI: the Presto UDF API
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fprestodb.io%2Fdocs%2Fcurrent%2Fdevelop%2Ffunctions.html&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=ZSBCR7yx3PpwL4KY9V73JG42Z02ZodqkjxC0LweHt1g%3D&amp;reserved=0&gt;
>>>> >>> also takes individual parameters instead of the row parameter. I think
>>>> >>> this
>>>> >>> direction at least worth a try so that we can see the performance
>>>> >>> difference. It's also mentioned in the design doc as an alternative
>>>> >>> (Trino).
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:18 AM Wenchen Fan &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Hi Holden,
>>>> >>>
>>>> >>> As Hyukjin said, following existing designs is not the principle of DS
>>>> >>> v2
>>>> >>> API design. We should make sure the DS v2 API makes sense. AFAIK we
>>>> >>> didn't
>>>> >>> fully follow the catalog API design from Hive and I believe Ryan also
>>>> >>> agrees with it.
>>>> >>>
>>>> >>> I think the problem here is we were discussing some very detailed things
>>>> >>> without actual code. I'll implement my idea after the holiday and then
>>>> >>> we
>>>> >>> can have more effective discussions. We can also do benchmarks and get
>>>> >>> some
>>>> >>> real numbers.
>>>> >>>
>>>> >>> In the meantime, we can continue to discuss other parts of this
>>>> >>> proposal,
>>>> >>> and make a prototype if possible. Spark SQL has many active
>>>> >>> contributors/committers and this thread doesn't get much attention yet.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 6:17 AM Hyukjin Kwon &lt;
>>>>
>>>> > gurwls223@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Just dropping a few lines. I remember that one of the goals in DSv2 is
>>>> >>> to
>>>> >>> correct the mistakes we made in the current Spark codes.
>>>> >>> It would not have much point if we will happen to just follow and mimic
>>>> >>> what Spark currently does. It might just end up with another copy of
>>>> >>> Spark
>>>> >>> APIs, e.g. Expression (internal) APIs. I sincerely would like to avoid
>>>> >>> this
>>>> >>> I do believe we have been stuck mainly due to trying to come up with a
>>>> >>> better design. We already have an ugly picture of the current Spark APIs
>>>> >>> to
>>>> >>> draw a better bigger picture.
>>>> >>>
>>>> >>>
>>>> >>> 2021년 2월 10일 (수) 오전 3:28, Holden Karau &lt;
>>>>
>>>> > holden@
>>>>
>>>> > &gt;님이 작성:
>>>> >>>
>>>> >>> I think this proposal is a good set of trade-offs and has existed in the
>>>> >>> community for a long period of time. I especially appreciate how the
>>>> >>> design
>>>> >>> is focused on a minimal useful component, with future optimizations
>>>> >>> considered from a point of view of making sure it's flexible, but actual
>>>> >>> concrete decisions left for the future once we see how this API is used.
>>>> >>> I
>>>> >>> think if we try and optimize everything right out of the gate, we'll
>>>> >>> quickly get stuck (again) and not make any progress.
>>>> >>>
>>>> >>> On Mon, Feb 8, 2021 at 10:46 AM Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Hi everyone,
>>>> >>>
>>>> >>> I'd like to start a discussion for adding a FunctionCatalog interface to
>>>> >>> catalog plugins. This will allow catalogs to expose functions to Spark,
>>>> >>> similar to how the TableCatalog interface allows a catalog to expose
>>>> >>> tables. The proposal doc is available here:
>>>> >>> https://docs.google.com/document/d/1PLBieHIlxZjmoUB0ERF-VozCRJ0xw2j3qKvUNWpWA2U/edit
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fdocs.google.com%2Fdocument%2Fd%2F1PLBieHIlxZjmoUB0ERF-VozCRJ0xw2j3qKvUNWpWA2U%2Fedit&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=Kyth8%2FhNUZ6GXG2FsgcknZ7t7s0%2BpxnDMPyxvsxLLqE%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> Here's a high-level summary of some of the main design choices:
>>>> >>> * Adds the ability to list and load functions, not to create or modify
>>>> >>> them in an external catalog
>>>> >>> * Supports scalar, aggregate, and partial aggregate functions
>>>> >>> * Uses load and bind steps for better error messages and simpler
>>>> >>> implementations
>>>> >>> * Like the DSv2 table read and write APIs, it uses InternalRow to pass
>>>> >>> data
>>>> >>> * Can be extended using mix-in interfaces to add vectorization, codegen,
>>>> >>> and other future features
>>>> >>>
>>>> >>> There is also a PR with the proposed API:
>>>> >>> https://github.com/apache/spark/pull/24559/files
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fspark%2Fpull%2F24559%2Ffiles&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=t3ZCqffdsrmCY3X%2FT8x1oMjMcNUiQ0wQNk%2ByAXQx1Io%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> Let's discuss the proposal here rather than on that PR, to get better
>>>> >>> visibility. Also, please take the time to read the proposal first. That
>>>> >>> really helps clear up misconceptions.
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Ryan Blue
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Twitter: https://twitter.com/holdenkarau
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Ftwitter.com%2Fholdenkarau&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067997978%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=fVfSPIyazuUYv8VLfNu%2BUIHdc3ePM1AAKKH%2BlnIicF8%3D&amp;reserved=0&gt;
>>>> >>> Books (Learning Spark, High Performance Spark, etc.):
>>>> >>> https://amzn.to/2MaRAG9
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Famzn.to%2F2MaRAG9&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067997978%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=NbRl9kK%2B6Wy0jWmDnztYp3JCPNLuJvmFsLHUrXzEhlk%3D&amp;reserved=0&gt;
>>>> >>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fwww.youtube.com%2Fuser%2Fholdenkarau&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060068007935%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=OWXOBELzO3hBa2JI%2FOSBZ3oNyLq0yr%2FGXMkNn7bqYDM%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> --
>>>> >>> Ryan Blue
>>>> >>>
>>>> >>>
>>>> >
>>>> > --
>>>> > John Zhuge
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: [hidden email]
>>>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]



--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

Chao Sun
Correct me if I'm wrong, but it appears we've basically agreed upon the APIs proposed in the SPIP (forget the naming part):

interface ScalarFunction extends BoundFunction<R> { 
  R produceResult(InternalRow args); 

interface AggregateFunction<S, R> extends BoundFunction<R> { 
  S update(S state, InternalRow input); 
}

together with the rest of the design such as FunctionCatalog and binding process. 

The argument at the moment seems to be whether we want to have SupportsInvoke or [Scalar|Aggregate]FunctionN alongside these, is that correct?
 In order to move this forward, perhaps we can merge the PR as it is (maybe we'll need a vote?) and proceed to discuss these topics? We can also then present separate PRs on top of it, which can help a lot for people within this thread to provide comments.

WDYT?

Best,
Chao

On Wed, Feb 24, 2021 at 10:45 PM Wenchen Fan <[hidden email]> wrote:
I think there is one agreement between us: we need both the individual-parameters and row-parameter APIs(your SupportsInvoke proposal and my VarargsScalarFunction proposal). IIUC the argument now is how to compose these 2 APIs.

Your proposal is to put the row-parameter API in the base ScalaFunction interface, with an optional SupportsInvoke interface for the individual-parameters API. I don't like it because it promotes the row-parameter API and forces users to implement it, even if the users want to only use the individual-parameters API.

My proposal is to leave the choice to the users. They can pick one from ScalarFunction0ScalarFunction1, ..., VarargsScalarFunction.

More replies below:

> We agree that ScalarFunction and AggregateFunction can optionally define methods for Spark to directly call in codegen

I don't think we agree with it. Whatever UDF API we choose at the end (either individual-parameters or row-parameter), both non-codegen and codegen code paths should just call these Java methods from the UDF API. It doesn't make sense to have different UDF APIs for non-codegen and codegen.

> The second option is to introduce 9 or more interfaces to break out the fields of the input row, and an additional Object[] variation for varargs:

My initial idea is to not have these 9 interfaces and fully rely on Java reflection. We can do some benchmark, if reflection is not that slow, I think we don't need to add these 9 interfaces. Preso UDF API takes the same approach. And one correction: my proposal is to use InternalRow for varargs UDF, not Object[].

> Spark will need additional code to call the right method based on input, so it will either have 10 wrapper classes or a big match statement

You can take a look at the Spark ScalaUDF expression. It has a big match statement for the non-codegen path, but the codegen path is much simpler because we can generate the exact Java code to call the specific UDF. I don't think it's a big problem, or we can use reflection in the non-codegen path to avoid the big match statement.

> Spark is always going to essentially call the raw interface with no specific type parameters. As a result, incorrect types (like String) will compile but fail at runtime with ClassCastException.

You seem to keep ignoring my proposal that we can check the UDF function signature at the analysis phase to make sure it matches the input types. And with codegen Spark can call the specific function to avoid boxing issues. If you missed my previous example, here is what the generated code looks like:

double input1 = ...;
double input2 = ...;
DoubleAdd udf = ...;
double res = udf.call(input1, input2);

> I think that the InternalRow option is easier to build against because it provides at least some type checking when accessing values from the input row.

I have a different opinion about this. If the input is string type but the UDF implementation calls `row.getLong(0)`, it returns wrong data, which is very bad. With the individual-parameters approach, if you implement UDF with `def call(input: Long)` but the input is string type, analyzer can detect it and fail the query.

On Thu, Feb 25, 2021 at 6:48 AM Ryan Blue <[hidden email]> wrote:

How functions are called is a really big element of this effort. I don’t want to get in a position where we’ve started committing changes without clear agreement on something so fundamental to the proposal. I’d like to make sure we’re in agreement with a vote on the SPIP before committing anything. That is, after all, the point of the SPIPs.

If people think it would help to have an alternative API in a PR, then that’s fine with me.

Since that PR suggestion is intended to make it easier to understand the technical details, I’ll try to summarize where we’re at now:

  • We agree on the scope of adding FunctionCatalog to load functions
  • We agree with the FunctionCatalog methods and the function binding approach
  • We agree that a bound function will be either a ScalarFunction or an AggregateFunction (plus the mix-in PartialAggregateFunction)
  • We agree that values should be passed should be Spark’s internal representation to avoid translation
  • We agree that ScalarFunction and AggregateFunction can optionally define methods for Spark to directly call in codegen

The disagreement is about how to call functions when codegen isn’t used or when the function needs to support variable-length argument lists. There are two options:

The first option is for each function to have a method that accepts an InternalRow, from the proposed SPIP:

interface ScalarFunction extends BoundFunction<R> {
  R produceResult(InternalRow input);
}
interface AggregateFunction<S> extends BoundFunction<R> {
  S update(S state, InternalRow input);
  ...
}

The second option is to introduce 9 or more interfaces to break out the fields of the input row, and an additional Object[] variation for varargs:

interface ScalarFunction1<T1> extends BoundFunction<R> {
  R produceResult(T1 one);
}
interface ScalarFunction2<T1, T2> extends BoundFunction<R> {
  R produceResult(T1 one, T2 two);
}
... 8 more ScalarFunction interfaces
interface ScalarFunctionVarargs extends BoundFunction<R> {
  R produceResult(Object[] args);
}
interface AggregateFunction<S, T1> extends BoundFunction<R> {
  S update(S state, T1 one);
}
interface AggregateFunction<S, T1, T2> extends BoundFunction<R> {
  S update(S state, T1 one, T2 two);
}
... 8 more AggregateFunction interfaces
interface AggregateFunctionVarargs<S> extends BoundFunction<R> {
  S update(S state, Object[] args);
}

Because this is for the non-invoke case, the two options have roughly the same performance characteristics.

The first option has some advantages:

  • It is simpler: there are few interfaces and Spark will always find the right method
  • Accessing a value returns a concrete type, so it is less error-prone. I’ve given an example where this helps identify a problem with an invoke method.

The second option’s advantage is that users have values broken out into arguments. That is, if I understand Wenchen correctly here: “I don’t like the SupportsInvoke approach as it still promotes the row-parameter API. I think the individual-parameters API is better for UDF developers.”

Disadvantages with the second option:

  • There are 20+ more interfaces in the API
  • Spark will need additional code to call the right method based on input, so it will either have 10 wrapper classes or a big match statement that calls each interface separately (see UDFRegistration).
  • Spark is always going to essentially call the raw interface with no specific type parameters. As a result, incorrect types (like String) will compile but fail at runtime with ClassCastException.
  • The varargs case will result in casting to expected types, which could also fail with ClassCastException

I think that the InternalRow option is easier to build against because it provides at least some type checking when accessing values from the input row. You get compile-time checks when using the wrong type like this: String val = input.getString(0); won’t compile.

Another important thing to note is that although the original idea was to keep the individual parameter approach simple, Wenchen has already suggested passing arrays as Java arrays, like UTF8String[]. This adds to the complexity of the overall solution and requires matching multiple types. How would Spark know to pass UTF8String[] or ArrayData?

If anyone disagrees with that summary, please point out where it’s incorrect. But barring a major misunderstanding, I think the choice is clear: the simpler approach that provides additional compile-time safety is the right way to go.


On Tue, Feb 23, 2021 at 1:48 AM Wenchen Fan <[hidden email]> wrote:
+1, as I already proposed we can move forward with PRs

> To move forward, how about we implement the function loading and binding first? Then we can have PRs for both the individual-parameters (I can take it) and row-parameter approaches, if we still can't reach a consensus at that time and need to see all the details.

Ryan, can we focus on the function loading and binding part and get it committed first? I can also fork your branch and put everything together, but that might be too big to review.

On Tue, Feb 23, 2021 at 4:35 PM Dongjoon Hyun <[hidden email]> wrote:
I've been still supporting Ryan's SPIP (original PR and its extension proposal discussed here) because of its simplicity.

According to this email thread context, I also understand the different perspectives like Hyukjin's concerns about having multiple ways and Wenchen's proposal and rationales.

It looks like we need more discussion to reach an agreement. And the technical details become more difficult to track because this is an email thread.

Although Ryan initially suggested discussing this on Apache email thread instead of the PR, can we have a PR to discuss?

Especially, Wenchen, could you make your PR based on Ryan's PR?

If we collect the scattered ideas into a single PR, that would be greatly helpful not only for further discussions, but also when we go on a vote on Ryan's PR or Wenchen's PR.

Bests,
Dongjoon.


On Mon, Feb 22, 2021 at 1:16 AM Wenchen Fan <[hidden email]> wrote:
Hi Walaa,

Thanks for sharing this! The type signature stuff is already covered by the unbound UDF API, which specifies the input and output data types. The problem is how to check the method signature of the bound UDF. As you said, Java has type erasure and we can't check `List<String>` for example.

My initial proposal is to do nothing and simply pass the Spark ArrayData, MapData, InternalRow to the UDF. This requires the UDF developers to ensure the type is matched, as they need to call something like `array.getLong(index)` with the corrected type name. It's as worse as the row-parameter version but seems fine as it only happens with nested types. And the type check is still done for the first level (the method signature must use ArrayData/MapData/InternalRow at least).

We can allow more types in the future to make the type check better. It might be too detailed for this discussion thread but just put a few thoughts:
1. Java array doesn't do type erasure. We can use UTF8String[] for example if the input type is array of string.
2. For struct type, we can allow Java beans/Scala case classes if the field name and type match the type signature.
3. For map type, it's actually struct<keys: array<key_type>, values: array<value_type>>, so we can also allow Java beans/Scala case classes here.

The general idea is to use stuff that can retain nested type information at compile-time, i.e. array, java bean, case classes.

Thanks,
Wenchen



On Mon, Feb 22, 2021 at 3:47 PM Walaa Eldin Moustafa <[hidden email]> wrote:
Wenchen, in Transport, users provide the input parameter signatures and output parameter signature as part of the API. Compile-time checks are done by parsing the type signatures and matching them to the type tree received at compile-time. This also helps with inferring the concrete output type.

The specification in the UDF API looks like this:

  @Override
  public List<String> getInputParameterSignatures() {
    return ImmutableList.of(
        "ARRAY(K)",
        "ARRAY(V)"
    );
  }

  @Override
  public String getOutputParameterSignature() {
    return "MAP(K,V)";
  }

The benefits of this type of type signature specification as opposed to inferring types from Java type signatures given in the Java method are:
  • For nested types, Java type erasure eliminates the information about nested types, so for something like my_function(List<String> a1, List<Integer> a2), the value of both a1.class or a2.class is just a List. However, we are planning to work around this in a future version in the case of Array and Map types. Struct types are discussed in the next point.
  • Without pre-code-generation there is no single Java type signature from which we can capture the Struct info. However, Struct info can be expressed in type signatures of the above type, e.g., ROW(FirstName VARCHAR, LastName VARCHAR).
When a Transport UDF represents a Spark UDF, the type signatures are matched against Spark native types, i.e., org.apache.spark.sql.types.{ArrayType, MapType, StructType}, and primitive types. The function that parses/compiles type signatures is found in AbstractTypeInference. This class represents the generic component that is common between all supported engines. Its Spark-specific extension is in SparkTypeInference. In the above example, at compile time, if the first Array happens to be of String element type, and the second Array happens to be of Integer element type, the UDF will communicate to the Spark analyzer that the output should be of type MapData<String, Integer> (i.e., based on what was seen in the input at compile time). The whole UDF becomes a Spark Expression at the end of the day.

Thanks,
Walaa.


On Sun, Feb 21, 2021 at 7:26 PM Wenchen Fan <[hidden email]> wrote:
I think I have made it clear that it's simpler for the UDF developers to deal with the input parameters directly, instead of getting them from a row, as you need to provide the index and type (e.g. row.getLong(0)). It's also coherent with the existing Spark Scala/Java UDF APIs, so that Spark users will be more familiar with the individual-parameters API.

And I have explained it already that we can use reflection to make sure the defined methods have the right types at query-compilation time. It's better than leaving this problem to the UDF developers and asking them to ensure the inputs are gotten from the row correctly with index and type. If there are people from Presto/Transport, it will be great if you can share how Presto/Transport do this check.

I don't like 22 additional interfaces too, but if you look at the examples I gave, the current Spark Java UDF only has 9 interfaces, and Transport has 8. I think it's good enough and people can use VarargsScalarFunction if they need to take more parameters or varargs. It resolves your concern about doing reflection in the non-codegen execution path that leads to bad performance, it also serves for documentation purpose as people can easily see the number of UDF inputs and their types by a quick glance.

As I said, we need to investigate how to avoid boxing. Since you are asking the question now, I spent sometime to think about it. I think the DoubleAdd example is the way to go. For non-codegen code path, we can just call the interface method. For the codegen code path, the generated Java code would look like (omit the null check logic):

double input1 = ...;
double input2 = ...;
DoubleAdd udf = ...;
double res = udf.call(input1, input2);

Which invokes the primitive version automatically. AFAIK this is also how Scala supports primitive type parameter (generate an extra non-boxing version of the method). If the UDF doesn't have the primtive version method, this code will just call the boxed version and still works.

I don't like the SupportsInvoke approach as it still promotes the row-parameter API. I think the individual-parameters API is better for UDF developers. Can other people share your opinions about the API?

On Sat, Feb 20, 2021 at 5:32 AM Ryan Blue <[hidden email]> wrote:

I don’t see any benefit to more complexity with 22 additional interfaces, instead of simply passing an InternalRow. Why not use a single interface with InternalRow? Maybe you could share your motivation?

That would also result in strange duplication, where the ScalarFunction2 method is just the boxed version:

class DoubleAdd implements ScalarFunction2<Double, Double, Double> {
  @Override
  Double produceResult(Double left, Double right) {
    return left + right;
  }

  double produceResult(double left, double right) {
    return left + right;
  }
}

This would work okay, but would be awkward if you wanted to use the same implementation for any number of arguments, like a sum method that adds all of the arguments together and returns the result. It also isn’t great for varargs, since it is basically the same as the invoke case.

The combination of an InternalRow method and the invoke method seems to be a good way to handle this to me. What is wrong with it? And, how would you solve the problem when implementations define methods with the wrong types? The InternalRow approach helps implementations catch that problem (as demonstrated above) and also provides a fallback when there is a but preventing the invoke optimization from working. That seems like a good approach to me.


On Thu, Feb 18, 2021 at 11:31 PM Wenchen Fan <[hidden email]> wrote:
If people have such a big concern about reflection, we can follow the current Spark Java UDF and Transport, and create ScalarFuncion0[R]ScalarFuncion1[T1, R], etc. to avoid reflection. But we may need to investigate how to avoid boxing with this API design.

To put a detailed proposal: let's have ScalarFuncion0ScalarFuncion1, ..., ScalarFuncion9 and VarargsScalarFunction. At execution time, if Spark sees ScalarFuncion0-9, pass the input columns to the UDF directly, one column one parameter. So string type input is UTF8String, array type input is ArrayData. If Spark sees VarargsScalarFunction, wrap the input columns with InternalRow and pass it to the UDF.

In general, if VarargsScalarFunction is implemented, the UDF should not implement ScalarFuncion0-9. We can also define a priority order to allow this. I don't have a strong preference here.

What do you think?

On Fri, Feb 19, 2021 at 1:24 PM Walaa Eldin Moustafa <[hidden email]> wrote:
I agree with Ryan on the questions around the expressivity of the Invoke method. It is not clear to me how the Invoke method can be used to declare UDFs with type-parameterized parameters. For example: a UDF to get the Nth element of an array (regardless of the Array element type) or a UDF to merge two Arrays (of generic types) to a Map.

Also, to address Wenchen's InternalRow question, can we create a number of Function classes, each corresponding to a number of input parameter length (e.g., ScalarFunction1, ScalarFunction2, etc)?

Thanks,
Walaa.
 

On Thu, Feb 18, 2021 at 6:07 PM Ryan Blue <[hidden email]> wrote:

I agree with you that it is better in many cases to directly call a method. But it it not better in all cases, which is why I don’t think it is the right general-purpose choice.

First, if codegen isn’t used for some reason, the reflection overhead is really significant. That gets much better when you have an interface to call. That’s one reason I’d use this pattern:

class DoubleAdd implements ScalarFunction<Double>, SupportsInvoke {
  Double produceResult(InternalRow row) {
    return produceResult(row.getDouble(0), row.getDouble(1));
  }

  double produceResult(double left, double right) {
    return left + right;
  }
}

There’s little overhead to adding the InternalRow variation, but we could call it in eval to avoid the reflect overhead. To the point about UDF developers, I think this is a reasonable cost.

Second, I think usability is better and helps avoid runtime issues. Here’s an example:

class StrLen implements ScalarFunction<Integer>, SupportsInvoke {
  Integer produceResult(InternalRow row) {
    return produceResult(row.getString(0));
  }

  Integer produceResult(String str) {
    return str.length();
  }
}

See the bug? I forgot to use UTF8String instead of String. With the InternalRow method, I get a compiler warning because getString produces UTF8String that can’t be passed to produceResult(String). If I decided to implement length separately, then we could still run the InternalRow version and log a warning. The code would be slightly slower, but wouldn’t fail.

There are similar situations with varargs where it’s better to call methods that produce concrete types than to cast from Object to some expected type.

I think that using invoke is a great extension to the proposal, but I don’t think that it should be the only way to call functions. By all means, let’s work on it in parallel and use it where possible. But I think we do need the fallback of using InternalRow and that it isn’t a usability problem to include it.

Oh, and one last thought is that we already have users that call Dataset.map and use InternalRow. This would allow converting that code directly to a UDF.

I think we’re closer to agreeing here than it actually looks. Hopefully you’ll agree that having the InternalRow method isn’t a big usability problem.


On Wed, Feb 17, 2021 at 11:51 PM Wenchen Fan <[hidden email]> wrote:
I don't see any objections to the rest of the proposal (loading functions from the catalog, function binding stuff, etc.) and I assume everyone is OK with it. We can commit that part first.

Currently, the discussion focuses on the `ScalarFunction` API, where I think it's better to directly take the input columns as the UDF parameter, instead of wrapping the input columns with InternalRow and taking the InternalRow as the UDF parameter. It's not only for better performance, but also for ease of use. For example, it's easier for the UDF developer to write `input1 + input2` than `inputRow.getLong(0) + inputRow.getLong(1)`, as they don't need to specify the type and index by themselves (getLong(0)) which is error-prone.

It does push more work to the Spark side, but I think it's worth it if implementing UDF gets easier. I don't think the work is very challenging, as we can leverage the infra we built for the expression encoder.

I think it's also important to look at the UDF API from the user's perspective (UDF developers). How do you like the UDF API without considering how Spark can support it? Do you prefer the individual-parameters version or the row-parameter version?

To move forward, how about we implement the function loading and binding first? Then we can have PRs for both the individual-parameters (I can take it) and row-parameter approaches, if we still can't reach a consensus at that time and need to see all the details.

On Thu, Feb 18, 2021 at 4:48 AM Ryan Blue <[hidden email]> wrote:
Thanks, Hyukjin. I think that's a fair summary. And I agree with the idea that we should focus on what Spark will do by default.

I think we should focus on the proposal, for two reasons: first, there is a straightforward path to incorporate Wenchen's suggestion via `SupportsInvoke`, and second, the proposal is more complete: it defines a solution for many concerns like loading a function and finding out what types to use -- not just how to call code -- and supports more use cases like varargs functions. I think we can continue to discuss the rest of the proposal and be confident that we can support an invoke code path where it makes sense.

Does everyone agree? If not, I think we would need to solve a lot of the challenges that I initially brought up with the invoke idea. It seems like a good way to call a function, but needs a real proposal behind it if we don't use it via `SupportsInvoke` in the current proposal.

On Tue, Feb 16, 2021 at 11:07 PM Hyukjin Kwon <[hidden email]> wrote:

Just to make sure we don’t move past, I think we haven’t decided yet:

  • if we’ll replace the current proposal to Wenchen’s approach as the default
  • if we want to have Wenchen’s approach as an optional mix-in on the top of Ryan’s proposal (SupportsInvoke)

From what I read, some people pointed out it as a replacement. Please correct me if I misread this discussion thread.

As Dongjoon pointed out, it would be good to know rough ETA to make sure making progress in this, and people can compare more easily.


FWIW, there’s the saying I like in the zen of Python:

There should be one— and preferably only one —obvious way to do it.

If multiple approaches have the way for developers to do the (almost) same thing, I would prefer to avoid it.

In addition, I would prefer to focus on what Spark does by default first.


2021년 2월 17일 (수) 오후 2:33, Dongjoon Hyun <[hidden email]>님이 작성:
Hi, Wenchen.

This thread seems to get enough attention. Also, I'm expecting more and more if we have this on the `master` branch because we are developing together.

    > Spark SQL has many active contributors/committers and this thread doesn't get much attention yet.

So, what's your ETA from now?

    > I think the problem here is we were discussing some very detailed things without actual code.
    > I'll implement my idea after the holiday and then we can have more effective discussions.
    > We can also do benchmarks and get some real numbers.
    > In the meantime, we can continue to discuss other parts of this proposal, and make a prototype if possible.

I'm looking forward to seeing your PR. I hope we can conclude this thread and have at least one implementation in the `master` branch this month (February).
If you need more time (one month or longer), why don't we have Ryan's suggestion in the `master` branch first and benchmark with your PR later during Apache Spark 3.2 timeframe.

Bests,
Dongjoon.


On Tue, Feb 16, 2021 at 9:26 AM Ryan Blue <[hidden email]> wrote:
Andrew,

The proposal already includes an API for aggregate functions and I think we would want to implement those right away.

Processing ColumnBatch is something we can easily extend the interfaces to support, similar to Wenchen's suggestion. The important thing right now is to agree on some basic functionality: how to look up functions and what the simple API should be. Like the TableCatalog interfaces, we will layer on more support through optional interfaces like `SupportsInvoke` or `SupportsColumnBatch`.

On Tue, Feb 16, 2021 at 9:00 AM Andrew Melo <[hidden email]> wrote:
Hello Ryan,

This proposal looks very interesting. Would future goals for this
functionality include both support for aggregation functions, as well
as support for processing ColumnBatch-es (instead of Row/InternalRow)?

Thanks
Andrew

On Mon, Feb 15, 2021 at 12:44 PM Ryan Blue <[hidden email]> wrote:
>
> Thanks for the positive feedback, everyone. It sounds like there is a clear path forward for calling functions. Even without a prototype, the `invoke` plans show that Wenchen's suggested optimization can be done, and incorporating it as an optional extension to this proposal solves many of the unknowns.
>
> With that area now understood, is there any discussion about other parts of the proposal, besides the function call interface?
>
> On Fri, Feb 12, 2021 at 10:40 PM Chao Sun <[hidden email]> wrote:
>>
>> This is an important feature which can unblock several other projects including bucket join support for DataSource v2, complete support for enforcing DataSource v2 distribution requirements on the write path, etc. I like Ryan's proposals which look simple and elegant, with nice support on function overloading and variadic arguments. On the other hand, I think Wenchen made a very good point about performance. Overall, I'm excited to see active discussions on this topic and believe the community will come to a proposal with the best of both sides.
>>
>> Chao
>>
>> On Fri, Feb 12, 2021 at 7:58 PM Hyukjin Kwon <[hidden email]> wrote:
>>>
>>> +1 for Liang-chi's.
>>>
>>> Thanks Ryan and Wenchen for leading this.
>>>
>>>
>>> 2021년 2월 13일 (토) 오후 12:18, Liang-Chi Hsieh <[hidden email]>님이 작성:
>>>>
>>>> Basically I think the proposal makes sense to me and I'd like to support the
>>>> SPIP as it looks like we have strong need for the important feature.
>>>>
>>>> Thanks Ryan for working on this and I do also look forward to Wenchen's
>>>> implementation. Thanks for the discussion too.
>>>>
>>>> Actually I think the SupportsInvoke proposed by Ryan looks a good
>>>> alternative to me. Besides Wenchen's alternative implementation, is there a
>>>> chance we also have the SupportsInvoke for comparison?
>>>>
>>>>
>>>> John Zhuge wrote
>>>> > Excited to see our Spark community rallying behind this important feature!
>>>> >
>>>> > The proposal lays a solid foundation of minimal feature set with careful
>>>> > considerations for future optimizations and extensions. Can't wait to see
>>>> > it leading to more advanced functionalities like views with shared custom
>>>> > functions, function pushdown, lambda, etc. It has already borne fruit from
>>>> > the constructive collaborations in this thread. Looking forward to
>>>> > Wenchen's prototype and further discussions including the SupportsInvoke
>>>> > extension proposed by Ryan.
>>>> >
>>>> >
>>>> > On Fri, Feb 12, 2021 at 4:35 PM Owen O'Malley &lt;
>>>>
>>>> > owen.omalley@
>>>>
>>>> > &gt;
>>>> > wrote:
>>>> >
>>>> >> I think this proposal is a very good thing giving Spark a standard way of
>>>> >> getting to and calling UDFs.
>>>> >>
>>>> >> I like having the ScalarFunction as the API to call the UDFs. It is
>>>> >> simple, yet covers all of the polymorphic type cases well. I think it
>>>> >> would
>>>> >> also simplify using the functions in other contexts like pushing down
>>>> >> filters into the ORC & Parquet readers although there are a lot of
>>>> >> details
>>>> >> that would need to be considered there.
>>>> >>
>>>> >> .. Owen
>>>> >>
>>>> >>
>>>> >> On Fri, Feb 12, 2021 at 11:07 PM Erik Krogen &lt;
>>>>
>>>> > ekrogen@.com
>>>>
>>>> > &gt;
>>>> >> wrote:
>>>> >>
>>>> >>> I agree that there is a strong need for a FunctionCatalog within Spark
>>>> >>> to
>>>> >>> provide support for shareable UDFs, as well as make movement towards
>>>> >>> more
>>>> >>> advanced functionality like views which themselves depend on UDFs, so I
>>>> >>> support this SPIP wholeheartedly.
>>>> >>>
>>>> >>> I find both of the proposed UDF APIs to be sufficiently user-friendly
>>>> >>> and
>>>> >>> extensible. I generally think Wenchen's proposal is easier for a user to
>>>> >>> work with in the common case, but has greater potential for confusing
>>>> >>> and
>>>> >>> hard-to-debug behavior due to use of reflective method signature
>>>> >>> searches.
>>>> >>> The merits on both sides can hopefully be more properly examined with
>>>> >>> code,
>>>> >>> so I look forward to seeing an implementation of Wenchen's ideas to
>>>> >>> provide
>>>> >>> a more concrete comparison. I am optimistic that we will not let the
>>>> >>> debate
>>>> >>> over this point unreasonably stall the SPIP from making progress.
>>>> >>>
>>>> >>> Thank you to both Wenchen and Ryan for your detailed consideration and
>>>> >>> evaluation of these ideas!
>>>> >>> ------------------------------
>>>> >>> *From:* Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> *Sent:* Wednesday, February 10, 2021 6:06 PM
>>>> >>> *To:* Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt;
>>>> >>> *Cc:* Holden Karau &lt;
>>>>
>>>> > holden@
>>>>
>>>> > &gt;; Hyukjin Kwon <
>>>> >>>
>>>>
>>>> > gurwls223@
>>>>
>>>> >>; Spark Dev List &lt;
>>>>
>>>> > dev@.apache
>>>>
>>>> > &gt;; Wenchen Fan
>>>> >>> &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt;
>>>> >>> *Subject:* Re: [DISCUSS] SPIP: FunctionCatalog
>>>> >>>
>>>> >>> BTW, I forgot to add my opinion explicitly in this thread because I was
>>>> >>> on the PR before this thread.
>>>> >>>
>>>> >>> 1. The `FunctionCatalog API` PR was made on May 9, 2019 and has been
>>>> >>> there for almost two years.
>>>> >>> 2. I already gave my +1 on that PR last Saturday because I agreed with
>>>> >>> the latest updated design docs and AS-IS PR.
>>>> >>>
>>>> >>> And, the rest of the progress in this thread is also very satisfying to
>>>> >>> me.
>>>> >>> (e.g. Ryan's extension suggestion and Wenchen's alternative)
>>>> >>>
>>>> >>> To All:
>>>> >>> Please take a look at the design doc and the PR, and give us some
>>>> >>> opinions.
>>>> >>> We really need your participation in order to make DSv2 more complete.
>>>> >>> This will unblock other DSv2 features, too.
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:58 AM Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> wrote:
>>>> >>>
>>>> >>> Hi, Ryan.
>>>> >>>
>>>> >>> We didn't move past anything (both yours and Wenchen's). What Wenchen
>>>> >>> suggested is double-checking the alternatives with the implementation to
>>>> >>> give more momentum to our discussion.
>>>> >>>
>>>> >>> Your new suggestion about optional extention also sounds like a new
>>>> >>> reasonable alternative to me.
>>>> >>>
>>>> >>> We are still discussing this topic together and I hope we can make a
>>>> >>> conclude at this time (for Apache Spark 3.2) without being stucked like
>>>> >>> last time.
>>>> >>>
>>>> >>> I really appreciate your leadership in this dicsussion and the moving
>>>> >>> direction of this discussion looks constructive to me. Let's give some
>>>> >>> time
>>>> >>> to the alternatives.
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:14 AM Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> I don’t think we should so quickly move past the drawbacks of this
>>>> >>> approach. The problems are significant enough that using invoke is not
>>>> >>> sufficient on its own. But, I think we can add it as an optional
>>>> >>> extension
>>>> >>> to shore up the weaknesses.
>>>> >>>
>>>> >>> Here’s a summary of the drawbacks:
>>>> >>>
>>>> >>>    - Magic function signatures are error-prone
>>>> >>>    - Spark would need considerable code to help users find what went
>>>> >>>    wrong
>>>> >>>    - Spark would likely need to coerce arguments (e.g., String,
>>>> >>>    Option[Int]) for usability
>>>> >>>    - It is unclear how Spark will find the Java Method to call
>>>> >>>    - Use cases that require varargs fall back to casting; users will
>>>> >>>    also get this wrong (cast to String instead of UTF8String)
>>>> >>>    - The non-codegen path is significantly slower
>>>> >>>
>>>> >>> The benefit of invoke is to avoid moving data into a row, like this:
>>>> >>>
>>>> >>> -- using invoke
>>>> >>> int result = udfFunction(x, y)
>>>> >>>
>>>> >>> -- using row
>>>> >>> udfRow.update(0, x); -- actual: values[0] = x;
>>>> >>> udfRow.update(1, y);
>>>> >>> int result = udfFunction(udfRow);
>>>> >>>
>>>> >>> And, again, that won’t actually help much in cases that require varargs.
>>>> >>>
>>>> >>> I suggest we add a new marker trait for BoundMethod called
>>>> >>> SupportsInvoke.
>>>> >>> If that interface is implemented, then Spark will look for a method that
>>>> >>> matches the expected signature based on the bound input type. If it
>>>> >>> isn’t
>>>> >>> found, Spark can print a warning and fall back to the InternalRow call:
>>>> >>> “Cannot find udfFunction(int, int)”.
>>>> >>>
>>>> >>> This approach allows the invoke optimization, but solves many of the
>>>> >>> problems:
>>>> >>>
>>>> >>>    - The method to invoke is found using the proposed load and bind
>>>> >>>    approach
>>>> >>>    - Magic function signatures are optional and do not cause runtime
>>>> >>>    failures
>>>> >>>    - Because this is an optional optimization, Spark can be more strict
>>>> >>>    about types
>>>> >>>    - Varargs cases can still use rows
>>>> >>>    - Non-codegen can use an evaluation method rather than falling back
>>>> >>>    to slow Java reflection
>>>> >>>
>>>> >>> This seems like a good extension to me; this provides a plan for
>>>> >>> optimizing the UDF call to avoid building a row, while the existing
>>>> >>> proposal covers the other cases well and addresses how to locate these
>>>> >>> function calls.
>>>> >>>
>>>> >>> This also highlights that the approach used in DSv2 and this proposal is
>>>> >>> working: start small and use extensions to layer on more complex
>>>> >>> support.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 9:04 AM Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> wrote:
>>>> >>>
>>>> >>> Thank you all for making a giant move forward for Apache Spark 3.2.0.
>>>> >>> I'm really looking forward to seeing Wenchen's implementation.
>>>> >>> That would be greatly helpful to make a decision!
>>>> >>>
>>>> >>> > I'll implement my idea after the holiday and then we can have
>>>> >>> more effective discussions. We can also do benchmarks and get some real
>>>> >>> numbers.
>>>> >>> > FYI: the Presto UDF API
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fprestodb.io%2Fdocs%2Fcurrent%2Fdevelop%2Ffunctions.html&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067978066%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=iMWmHqqXPcT7EK%2Bovyzhy%2BZpU6Llih%2BwdZD53wvobmc%3D&amp;reserved=0&gt;
>>>> >>> also
>>>> >>> takes individual parameters instead of the row parameter. I think this
>>>> >>> direction at least worth a try so that we can see the performance
>>>> >>> difference. It's also mentioned in the design doc as an alternative
>>>> >>> (Trino).
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>>
>>>> >>> On Tue, Feb 9, 2021 at 10:18 PM Wenchen Fan &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> FYI: the Presto UDF API
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fprestodb.io%2Fdocs%2Fcurrent%2Fdevelop%2Ffunctions.html&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=ZSBCR7yx3PpwL4KY9V73JG42Z02ZodqkjxC0LweHt1g%3D&amp;reserved=0&gt;
>>>> >>> also takes individual parameters instead of the row parameter. I think
>>>> >>> this
>>>> >>> direction at least worth a try so that we can see the performance
>>>> >>> difference. It's also mentioned in the design doc as an alternative
>>>> >>> (Trino).
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:18 AM Wenchen Fan &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Hi Holden,
>>>> >>>
>>>> >>> As Hyukjin said, following existing designs is not the principle of DS
>>>> >>> v2
>>>> >>> API design. We should make sure the DS v2 API makes sense. AFAIK we
>>>> >>> didn't
>>>> >>> fully follow the catalog API design from Hive and I believe Ryan also
>>>> >>> agrees with it.
>>>> >>>
>>>> >>> I think the problem here is we were discussing some very detailed things
>>>> >>> without actual code. I'll implement my idea after the holiday and then
>>>> >>> we
>>>> >>> can have more effective discussions. We can also do benchmarks and get
>>>> >>> some
>>>> >>> real numbers.
>>>> >>>
>>>> >>> In the meantime, we can continue to discuss other parts of this
>>>> >>> proposal,
>>>> >>> and make a prototype if possible. Spark SQL has many active
>>>> >>> contributors/committers and this thread doesn't get much attention yet.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 6:17 AM Hyukjin Kwon &lt;
>>>>
>>>> > gurwls223@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Just dropping a few lines. I remember that one of the goals in DSv2 is
>>>> >>> to
>>>> >>> correct the mistakes we made in the current Spark codes.
>>>> >>> It would not have much point if we will happen to just follow and mimic
>>>> >>> what Spark currently does. It might just end up with another copy of
>>>> >>> Spark
>>>> >>> APIs, e.g. Expression (internal) APIs. I sincerely would like to avoid
>>>> >>> this
>>>> >>> I do believe we have been stuck mainly due to trying to come up with a
>>>> >>> better design. We already have an ugly picture of the current Spark APIs
>>>> >>> to
>>>> >>> draw a better bigger picture.
>>>> >>>
>>>> >>>
>>>> >>> 2021년 2월 10일 (수) 오전 3:28, Holden Karau &lt;
>>>>
>>>> > holden@
>>>>
>>>> > &gt;님이 작성:
>>>> >>>
>>>> >>> I think this proposal is a good set of trade-offs and has existed in the
>>>> >>> community for a long period of time. I especially appreciate how the
>>>> >>> design
>>>> >>> is focused on a minimal useful component, with future optimizations
>>>> >>> considered from a point of view of making sure it's flexible, but actual
>>>> >>> concrete decisions left for the future once we see how this API is used.
>>>> >>> I
>>>> >>> think if we try and optimize everything right out of the gate, we'll
>>>> >>> quickly get stuck (again) and not make any progress.
>>>> >>>
>>>> >>> On Mon, Feb 8, 2021 at 10:46 AM Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Hi everyone,
>>>> >>>
>>>> >>> I'd like to start a discussion for adding a FunctionCatalog interface to
>>>> >>> catalog plugins. This will allow catalogs to expose functions to Spark,
>>>> >>> similar to how the TableCatalog interface allows a catalog to expose
>>>> >>> tables. The proposal doc is available here:
>>>> >>> https://docs.google.com/document/d/1PLBieHIlxZjmoUB0ERF-VozCRJ0xw2j3qKvUNWpWA2U/edit
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fdocs.google.com%2Fdocument%2Fd%2F1PLBieHIlxZjmoUB0ERF-VozCRJ0xw2j3qKvUNWpWA2U%2Fedit&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=Kyth8%2FhNUZ6GXG2FsgcknZ7t7s0%2BpxnDMPyxvsxLLqE%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> Here's a high-level summary of some of the main design choices:
>>>> >>> * Adds the ability to list and load functions, not to create or modify
>>>> >>> them in an external catalog
>>>> >>> * Supports scalar, aggregate, and partial aggregate functions
>>>> >>> * Uses load and bind steps for better error messages and simpler
>>>> >>> implementations
>>>> >>> * Like the DSv2 table read and write APIs, it uses InternalRow to pass
>>>> >>> data
>>>> >>> * Can be extended using mix-in interfaces to add vectorization, codegen,
>>>> >>> and other future features
>>>> >>>
>>>> >>> There is also a PR with the proposed API:
>>>> >>> https://github.com/apache/spark/pull/24559/files
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fspark%2Fpull%2F24559%2Ffiles&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=t3ZCqffdsrmCY3X%2FT8x1oMjMcNUiQ0wQNk%2ByAXQx1Io%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> Let's discuss the proposal here rather than on that PR, to get better
>>>> >>> visibility. Also, please take the time to read the proposal first. That
>>>> >>> really helps clear up misconceptions.
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Ryan Blue
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Twitter: https://twitter.com/holdenkarau
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Ftwitter.com%2Fholdenkarau&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067997978%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=fVfSPIyazuUYv8VLfNu%2BUIHdc3ePM1AAKKH%2BlnIicF8%3D&amp;reserved=0&gt;
>>>> >>> Books (Learning Spark, High Performance Spark, etc.):
>>>> >>> https://amzn.to/2MaRAG9
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Famzn.to%2F2MaRAG9&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067997978%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=NbRl9kK%2B6Wy0jWmDnztYp3JCPNLuJvmFsLHUrXzEhlk%3D&amp;reserved=0&gt;
>>>> >>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fwww.youtube.com%2Fuser%2Fholdenkarau&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060068007935%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=OWXOBELzO3hBa2JI%2FOSBZ3oNyLq0yr%2FGXMkNn7bqYDM%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> --
>>>> >>> Ryan Blue
>>>> >>>
>>>> >>>
>>>> >
>>>> > --
>>>> > John Zhuge
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: [hidden email]
>>>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]



--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

xkrogen
> Correct me if I'm wrong, but it appears we've basically agreed upon the APIs proposed in the SPIP (forget the naming part):

I don't think that's the case. Wenchen's proposal is that the primary API is one discovered via reflection which detects methods by their types. The InternalRow API would be the add-on for supporting varargs. This is in opposition to Ryan's proposal which says the primary API is the InternalRow API, with a reflective API being the add-on. This is important because of Wenchen's point about forcing users to implement the InternalRow API even if they prefer the reflective API.

> > I think that the InternalRow option is easier to build against because it provides at least some type checking when accessing values from the input row.
> I have a different opinion about this. If the input is string type but the UDF implementation calls `row.getLong(0)`, it returns wrong data, which is very bad. With the individual-parameters approach, if you implement UDF with `def call(input: Long)` but the input is string type, analyzer can detect it and fail the query.

I am in agreement with Wenchen on this point. I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety. I believe that Wenchen's proposal will provide stronger query-compile-time safety (i.e. fewer runtime issues) at the expense of less Java-compile-time safety, which seems like a good tradeoff. This also pushes more complexity onto the Spark implementation side for the purposes of reflectively discovering methods and applying casts as necessary, but again, I see this as a good tradeoff for providing what seems to me to be a more user-friendly (albeit slightly more "magical") API.

The biggest questions to me are whether the Spark-side implementation for the reflective API will become too complex to implement well (one of the strengths of the InternalRow API is its simplicity), and whether type erasure will hurt the ability to do reasonable reflective discovery on complex types. To resolve these, I would love to see a POC of Wenchen's proposal.

I am supportive of moving forward with committing a version of the PR that does not include the UDF APIs to make concrete progress towards this SPIP while the discussion plays out, but I also feel Ryan's concern that the API is too integral to the SPIP to move forward without it is reasonable. At this time I'm not supportive of merging the PR as-is because I do not think the API debate has been reasonably settled and it will inevitably be harder to change later rather than getting right the first time.

Really appreciate the active and productive discussion on both sides here!
Thanks,
Erik

On Fri, Feb 26, 2021 at 3:38 PM Chao Sun <[hidden email]> wrote:
Correct me if I'm wrong, but it appears we've basically agreed upon the APIs proposed in the SPIP (forget the naming part):

interface ScalarFunction extends BoundFunction<R> { 
  R produceResult(InternalRow args); 

interface AggregateFunction<S, R> extends BoundFunction<R> { 
  S update(S state, InternalRow input); 
}

together with the rest of the design such as FunctionCatalog and binding process. 

The argument at the moment seems to be whether we want to have SupportsInvoke or [Scalar|Aggregate]FunctionN alongside these, is that correct?
 In order to move this forward, perhaps we can merge the PR as it is (maybe we'll need a vote?) and proceed to discuss these topics? We can also then present separate PRs on top of it, which can help a lot for people within this thread to provide comments.

WDYT?

Best,
Chao

On Wed, Feb 24, 2021 at 10:45 PM Wenchen Fan <[hidden email]> wrote:
I think there is one agreement between us: we need both the individual-parameters and row-parameter APIs(your SupportsInvoke proposal and my VarargsScalarFunction proposal). IIUC the argument now is how to compose these 2 APIs.

Your proposal is to put the row-parameter API in the base ScalaFunction interface, with an optional SupportsInvoke interface for the individual-parameters API. I don't like it because it promotes the row-parameter API and forces users to implement it, even if the users want to only use the individual-parameters API.

My proposal is to leave the choice to the users. They can pick one from ScalarFunction0ScalarFunction1, ..., VarargsScalarFunction.

More replies below:

> We agree that ScalarFunction and AggregateFunction can optionally define methods for Spark to directly call in codegen

I don't think we agree with it. Whatever UDF API we choose at the end (either individual-parameters or row-parameter), both non-codegen and codegen code paths should just call these Java methods from the UDF API. It doesn't make sense to have different UDF APIs for non-codegen and codegen.

> The second option is to introduce 9 or more interfaces to break out the fields of the input row, and an additional Object[] variation for varargs:

My initial idea is to not have these 9 interfaces and fully rely on Java reflection. We can do some benchmark, if reflection is not that slow, I think we don't need to add these 9 interfaces. Preso UDF API takes the same approach. And one correction: my proposal is to use InternalRow for varargs UDF, not Object[].

> Spark will need additional code to call the right method based on input, so it will either have 10 wrapper classes or a big match statement

You can take a look at the Spark ScalaUDF expression. It has a big match statement for the non-codegen path, but the codegen path is much simpler because we can generate the exact Java code to call the specific UDF. I don't think it's a big problem, or we can use reflection in the non-codegen path to avoid the big match statement.

> Spark is always going to essentially call the raw interface with no specific type parameters. As a result, incorrect types (like String) will compile but fail at runtime with ClassCastException.

You seem to keep ignoring my proposal that we can check the UDF function signature at the analysis phase to make sure it matches the input types. And with codegen Spark can call the specific function to avoid boxing issues. If you missed my previous example, here is what the generated code looks like:

double input1 = ...;
double input2 = ...;
DoubleAdd udf = ...;
double res = udf.call(input1, input2);

> I think that the InternalRow option is easier to build against because it provides at least some type checking when accessing values from the input row.

I have a different opinion about this. If the input is string type but the UDF implementation calls `row.getLong(0)`, it returns wrong data, which is very bad. With the individual-parameters approach, if you implement UDF with `def call(input: Long)` but the input is string type, analyzer can detect it and fail the query.

On Thu, Feb 25, 2021 at 6:48 AM Ryan Blue <[hidden email]> wrote:

How functions are called is a really big element of this effort. I don’t want to get in a position where we’ve started committing changes without clear agreement on something so fundamental to the proposal. I’d like to make sure we’re in agreement with a vote on the SPIP before committing anything. That is, after all, the point of the SPIPs.

If people think it would help to have an alternative API in a PR, then that’s fine with me.

Since that PR suggestion is intended to make it easier to understand the technical details, I’ll try to summarize where we’re at now:

  • We agree on the scope of adding FunctionCatalog to load functions
  • We agree with the FunctionCatalog methods and the function binding approach
  • We agree that a bound function will be either a ScalarFunction or an AggregateFunction (plus the mix-in PartialAggregateFunction)
  • We agree that values should be passed should be Spark’s internal representation to avoid translation
  • We agree that ScalarFunction and AggregateFunction can optionally define methods for Spark to directly call in codegen

The disagreement is about how to call functions when codegen isn’t used or when the function needs to support variable-length argument lists. There are two options:

The first option is for each function to have a method that accepts an InternalRow, from the proposed SPIP:

interface ScalarFunction extends BoundFunction<R> {
  R produceResult(InternalRow input);
}
interface AggregateFunction<S> extends BoundFunction<R> {
  S update(S state, InternalRow input);
  ...
}

The second option is to introduce 9 or more interfaces to break out the fields of the input row, and an additional Object[] variation for varargs:

interface ScalarFunction1<T1> extends BoundFunction<R> {
  R produceResult(T1 one);
}
interface ScalarFunction2<T1, T2> extends BoundFunction<R> {
  R produceResult(T1 one, T2 two);
}
... 8 more ScalarFunction interfaces
interface ScalarFunctionVarargs extends BoundFunction<R> {
  R produceResult(Object[] args);
}
interface AggregateFunction<S, T1> extends BoundFunction<R> {
  S update(S state, T1 one);
}
interface AggregateFunction<S, T1, T2> extends BoundFunction<R> {
  S update(S state, T1 one, T2 two);
}
... 8 more AggregateFunction interfaces
interface AggregateFunctionVarargs<S> extends BoundFunction<R> {
  S update(S state, Object[] args);
}

Because this is for the non-invoke case, the two options have roughly the same performance characteristics.

The first option has some advantages:

  • It is simpler: there are few interfaces and Spark will always find the right method
  • Accessing a value returns a concrete type, so it is less error-prone. I’ve given an example where this helps identify a problem with an invoke method.

The second option’s advantage is that users have values broken out into arguments. That is, if I understand Wenchen correctly here: “I don’t like the SupportsInvoke approach as it still promotes the row-parameter API. I think the individual-parameters API is better for UDF developers.”

Disadvantages with the second option:

  • There are 20+ more interfaces in the API
  • Spark will need additional code to call the right method based on input, so it will either have 10 wrapper classes or a big match statement that calls each interface separately (see UDFRegistration).
  • Spark is always going to essentially call the raw interface with no specific type parameters. As a result, incorrect types (like String) will compile but fail at runtime with ClassCastException.
  • The varargs case will result in casting to expected types, which could also fail with ClassCastException

I think that the InternalRow option is easier to build against because it provides at least some type checking when accessing values from the input row. You get compile-time checks when using the wrong type like this: String val = input.getString(0); won’t compile.

Another important thing to note is that although the original idea was to keep the individual parameter approach simple, Wenchen has already suggested passing arrays as Java arrays, like UTF8String[]. This adds to the complexity of the overall solution and requires matching multiple types. How would Spark know to pass UTF8String[] or ArrayData?

If anyone disagrees with that summary, please point out where it’s incorrect. But barring a major misunderstanding, I think the choice is clear: the simpler approach that provides additional compile-time safety is the right way to go.


On Tue, Feb 23, 2021 at 1:48 AM Wenchen Fan <[hidden email]> wrote:
+1, as I already proposed we can move forward with PRs

> To move forward, how about we implement the function loading and binding first? Then we can have PRs for both the individual-parameters (I can take it) and row-parameter approaches, if we still can't reach a consensus at that time and need to see all the details.

Ryan, can we focus on the function loading and binding part and get it committed first? I can also fork your branch and put everything together, but that might be too big to review.

On Tue, Feb 23, 2021 at 4:35 PM Dongjoon Hyun <[hidden email]> wrote:
I've been still supporting Ryan's SPIP (original PR and its extension proposal discussed here) because of its simplicity.

According to this email thread context, I also understand the different perspectives like Hyukjin's concerns about having multiple ways and Wenchen's proposal and rationales.

It looks like we need more discussion to reach an agreement. And the technical details become more difficult to track because this is an email thread.

Although Ryan initially suggested discussing this on Apache email thread instead of the PR, can we have a PR to discuss?

Especially, Wenchen, could you make your PR based on Ryan's PR?

If we collect the scattered ideas into a single PR, that would be greatly helpful not only for further discussions, but also when we go on a vote on Ryan's PR or Wenchen's PR.

Bests,
Dongjoon.


On Mon, Feb 22, 2021 at 1:16 AM Wenchen Fan <[hidden email]> wrote:
Hi Walaa,

Thanks for sharing this! The type signature stuff is already covered by the unbound UDF API, which specifies the input and output data types. The problem is how to check the method signature of the bound UDF. As you said, Java has type erasure and we can't check `List<String>` for example.

My initial proposal is to do nothing and simply pass the Spark ArrayData, MapData, InternalRow to the UDF. This requires the UDF developers to ensure the type is matched, as they need to call something like `array.getLong(index)` with the corrected type name. It's as worse as the row-parameter version but seems fine as it only happens with nested types. And the type check is still done for the first level (the method signature must use ArrayData/MapData/InternalRow at least).

We can allow more types in the future to make the type check better. It might be too detailed for this discussion thread but just put a few thoughts:
1. Java array doesn't do type erasure. We can use UTF8String[] for example if the input type is array of string.
2. For struct type, we can allow Java beans/Scala case classes if the field name and type match the type signature.
3. For map type, it's actually struct<keys: array<key_type>, values: array<value_type>>, so we can also allow Java beans/Scala case classes here.

The general idea is to use stuff that can retain nested type information at compile-time, i.e. array, java bean, case classes.

Thanks,
Wenchen



On Mon, Feb 22, 2021 at 3:47 PM Walaa Eldin Moustafa <[hidden email]> wrote:
Wenchen, in Transport, users provide the input parameter signatures and output parameter signature as part of the API. Compile-time checks are done by parsing the type signatures and matching them to the type tree received at compile-time. This also helps with inferring the concrete output type.

The specification in the UDF API looks like this:

  @Override
  public List<String> getInputParameterSignatures() {
    return ImmutableList.of(
        "ARRAY(K)",
        "ARRAY(V)"
    );
  }

  @Override
  public String getOutputParameterSignature() {
    return "MAP(K,V)";
  }

The benefits of this type of type signature specification as opposed to inferring types from Java type signatures given in the Java method are:
  • For nested types, Java type erasure eliminates the information about nested types, so for something like my_function(List<String> a1, List<Integer> a2), the value of both a1.class or a2.class is just a List. However, we are planning to work around this in a future version in the case of Array and Map types. Struct types are discussed in the next point.
  • Without pre-code-generation there is no single Java type signature from which we can capture the Struct info. However, Struct info can be expressed in type signatures of the above type, e.g., ROW(FirstName VARCHAR, LastName VARCHAR).
When a Transport UDF represents a Spark UDF, the type signatures are matched against Spark native types, i.e., org.apache.spark.sql.types.{ArrayType, MapType, StructType}, and primitive types. The function that parses/compiles type signatures is found in AbstractTypeInference. This class represents the generic component that is common between all supported engines. Its Spark-specific extension is in SparkTypeInference. In the above example, at compile time, if the first Array happens to be of String element type, and the second Array happens to be of Integer element type, the UDF will communicate to the Spark analyzer that the output should be of type MapData<String, Integer> (i.e., based on what was seen in the input at compile time). The whole UDF becomes a Spark Expression at the end of the day.

Thanks,
Walaa.


On Sun, Feb 21, 2021 at 7:26 PM Wenchen Fan <[hidden email]> wrote:
I think I have made it clear that it's simpler for the UDF developers to deal with the input parameters directly, instead of getting them from a row, as you need to provide the index and type (e.g. row.getLong(0)). It's also coherent with the existing Spark Scala/Java UDF APIs, so that Spark users will be more familiar with the individual-parameters API.

And I have explained it already that we can use reflection to make sure the defined methods have the right types at query-compilation time. It's better than leaving this problem to the UDF developers and asking them to ensure the inputs are gotten from the row correctly with index and type. If there are people from Presto/Transport, it will be great if you can share how Presto/Transport do this check.

I don't like 22 additional interfaces too, but if you look at the examples I gave, the current Spark Java UDF only has 9 interfaces, and Transport has 8. I think it's good enough and people can use VarargsScalarFunction if they need to take more parameters or varargs. It resolves your concern about doing reflection in the non-codegen execution path that leads to bad performance, it also serves for documentation purpose as people can easily see the number of UDF inputs and their types by a quick glance.

As I said, we need to investigate how to avoid boxing. Since you are asking the question now, I spent sometime to think about it. I think the DoubleAdd example is the way to go. For non-codegen code path, we can just call the interface method. For the codegen code path, the generated Java code would look like (omit the null check logic):

double input1 = ...;
double input2 = ...;
DoubleAdd udf = ...;
double res = udf.call(input1, input2);

Which invokes the primitive version automatically. AFAIK this is also how Scala supports primitive type parameter (generate an extra non-boxing version of the method). If the UDF doesn't have the primtive version method, this code will just call the boxed version and still works.

I don't like the SupportsInvoke approach as it still promotes the row-parameter API. I think the individual-parameters API is better for UDF developers. Can other people share your opinions about the API?

On Sat, Feb 20, 2021 at 5:32 AM Ryan Blue <[hidden email]> wrote:

I don’t see any benefit to more complexity with 22 additional interfaces, instead of simply passing an InternalRow. Why not use a single interface with InternalRow? Maybe you could share your motivation?

That would also result in strange duplication, where the ScalarFunction2 method is just the boxed version:

class DoubleAdd implements ScalarFunction2<Double, Double, Double> {
  @Override
  Double produceResult(Double left, Double right) {
    return left + right;
  }

  double produceResult(double left, double right) {
    return left + right;
  }
}

This would work okay, but would be awkward if you wanted to use the same implementation for any number of arguments, like a sum method that adds all of the arguments together and returns the result. It also isn’t great for varargs, since it is basically the same as the invoke case.

The combination of an InternalRow method and the invoke method seems to be a good way to handle this to me. What is wrong with it? And, how would you solve the problem when implementations define methods with the wrong types? The InternalRow approach helps implementations catch that problem (as demonstrated above) and also provides a fallback when there is a but preventing the invoke optimization from working. That seems like a good approach to me.


On Thu, Feb 18, 2021 at 11:31 PM Wenchen Fan <[hidden email]> wrote:
If people have such a big concern about reflection, we can follow the current Spark Java UDF and Transport, and create ScalarFuncion0[R]ScalarFuncion1[T1, R], etc. to avoid reflection. But we may need to investigate how to avoid boxing with this API design.

To put a detailed proposal: let's have ScalarFuncion0ScalarFuncion1, ..., ScalarFuncion9 and VarargsScalarFunction. At execution time, if Spark sees ScalarFuncion0-9, pass the input columns to the UDF directly, one column one parameter. So string type input is UTF8String, array type input is ArrayData. If Spark sees VarargsScalarFunction, wrap the input columns with InternalRow and pass it to the UDF.

In general, if VarargsScalarFunction is implemented, the UDF should not implement ScalarFuncion0-9. We can also define a priority order to allow this. I don't have a strong preference here.

What do you think?

On Fri, Feb 19, 2021 at 1:24 PM Walaa Eldin Moustafa <[hidden email]> wrote:
I agree with Ryan on the questions around the expressivity of the Invoke method. It is not clear to me how the Invoke method can be used to declare UDFs with type-parameterized parameters. For example: a UDF to get the Nth element of an array (regardless of the Array element type) or a UDF to merge two Arrays (of generic types) to a Map.

Also, to address Wenchen's InternalRow question, can we create a number of Function classes, each corresponding to a number of input parameter length (e.g., ScalarFunction1, ScalarFunction2, etc)?

Thanks,
Walaa.
 

On Thu, Feb 18, 2021 at 6:07 PM Ryan Blue <[hidden email]> wrote:

I agree with you that it is better in many cases to directly call a method. But it it not better in all cases, which is why I don’t think it is the right general-purpose choice.

First, if codegen isn’t used for some reason, the reflection overhead is really significant. That gets much better when you have an interface to call. That’s one reason I’d use this pattern:

class DoubleAdd implements ScalarFunction<Double>, SupportsInvoke {
  Double produceResult(InternalRow row) {
    return produceResult(row.getDouble(0), row.getDouble(1));
  }

  double produceResult(double left, double right) {
    return left + right;
  }
}

There’s little overhead to adding the InternalRow variation, but we could call it in eval to avoid the reflect overhead. To the point about UDF developers, I think this is a reasonable cost.

Second, I think usability is better and helps avoid runtime issues. Here’s an example:

class StrLen implements ScalarFunction<Integer>, SupportsInvoke {
  Integer produceResult(InternalRow row) {
    return produceResult(row.getString(0));
  }

  Integer produceResult(String str) {
    return str.length();
  }
}

See the bug? I forgot to use UTF8String instead of String. With the InternalRow method, I get a compiler warning because getString produces UTF8String that can’t be passed to produceResult(String). If I decided to implement length separately, then we could still run the InternalRow version and log a warning. The code would be slightly slower, but wouldn’t fail.

There are similar situations with varargs where it’s better to call methods that produce concrete types than to cast from Object to some expected type.

I think that using invoke is a great extension to the proposal, but I don’t think that it should be the only way to call functions. By all means, let’s work on it in parallel and use it where possible. But I think we do need the fallback of using InternalRow and that it isn’t a usability problem to include it.

Oh, and one last thought is that we already have users that call Dataset.map and use InternalRow. This would allow converting that code directly to a UDF.

I think we’re closer to agreeing here than it actually looks. Hopefully you’ll agree that having the InternalRow method isn’t a big usability problem.


On Wed, Feb 17, 2021 at 11:51 PM Wenchen Fan <[hidden email]> wrote:
I don't see any objections to the rest of the proposal (loading functions from the catalog, function binding stuff, etc.) and I assume everyone is OK with it. We can commit that part first.

Currently, the discussion focuses on the `ScalarFunction` API, where I think it's better to directly take the input columns as the UDF parameter, instead of wrapping the input columns with InternalRow and taking the InternalRow as the UDF parameter. It's not only for better performance, but also for ease of use. For example, it's easier for the UDF developer to write `input1 + input2` than `inputRow.getLong(0) + inputRow.getLong(1)`, as they don't need to specify the type and index by themselves (getLong(0)) which is error-prone.

It does push more work to the Spark side, but I think it's worth it if implementing UDF gets easier. I don't think the work is very challenging, as we can leverage the infra we built for the expression encoder.

I think it's also important to look at the UDF API from the user's perspective (UDF developers). How do you like the UDF API without considering how Spark can support it? Do you prefer the individual-parameters version or the row-parameter version?

To move forward, how about we implement the function loading and binding first? Then we can have PRs for both the individual-parameters (I can take it) and row-parameter approaches, if we still can't reach a consensus at that time and need to see all the details.

On Thu, Feb 18, 2021 at 4:48 AM Ryan Blue <[hidden email]> wrote:
Thanks, Hyukjin. I think that's a fair summary. And I agree with the idea that we should focus on what Spark will do by default.

I think we should focus on the proposal, for two reasons: first, there is a straightforward path to incorporate Wenchen's suggestion via `SupportsInvoke`, and second, the proposal is more complete: it defines a solution for many concerns like loading a function and finding out what types to use -- not just how to call code -- and supports more use cases like varargs functions. I think we can continue to discuss the rest of the proposal and be confident that we can support an invoke code path where it makes sense.

Does everyone agree? If not, I think we would need to solve a lot of the challenges that I initially brought up with the invoke idea. It seems like a good way to call a function, but needs a real proposal behind it if we don't use it via `SupportsInvoke` in the current proposal.

On Tue, Feb 16, 2021 at 11:07 PM Hyukjin Kwon <[hidden email]> wrote:

Just to make sure we don’t move past, I think we haven’t decided yet:

  • if we’ll replace the current proposal to Wenchen’s approach as the default
  • if we want to have Wenchen’s approach as an optional mix-in on the top of Ryan’s proposal (SupportsInvoke)

From what I read, some people pointed out it as a replacement. Please correct me if I misread this discussion thread.

As Dongjoon pointed out, it would be good to know rough ETA to make sure making progress in this, and people can compare more easily.


FWIW, there’s the saying I like in the zen of Python:

There should be one— and preferably only one —obvious way to do it.

If multiple approaches have the way for developers to do the (almost) same thing, I would prefer to avoid it.

In addition, I would prefer to focus on what Spark does by default first.


2021년 2월 17일 (수) 오후 2:33, Dongjoon Hyun <[hidden email]>님이 작성:
Hi, Wenchen.

This thread seems to get enough attention. Also, I'm expecting more and more if we have this on the `master` branch because we are developing together.

    > Spark SQL has many active contributors/committers and this thread doesn't get much attention yet.

So, what's your ETA from now?

    > I think the problem here is we were discussing some very detailed things without actual code.
    > I'll implement my idea after the holiday and then we can have more effective discussions.
    > We can also do benchmarks and get some real numbers.
    > In the meantime, we can continue to discuss other parts of this proposal, and make a prototype if possible.

I'm looking forward to seeing your PR. I hope we can conclude this thread and have at least one implementation in the `master` branch this month (February).
If you need more time (one month or longer), why don't we have Ryan's suggestion in the `master` branch first and benchmark with your PR later during Apache Spark 3.2 timeframe.

Bests,
Dongjoon.


On Tue, Feb 16, 2021 at 9:26 AM Ryan Blue <[hidden email]> wrote:
Andrew,

The proposal already includes an API for aggregate functions and I think we would want to implement those right away.

Processing ColumnBatch is something we can easily extend the interfaces to support, similar to Wenchen's suggestion. The important thing right now is to agree on some basic functionality: how to look up functions and what the simple API should be. Like the TableCatalog interfaces, we will layer on more support through optional interfaces like `SupportsInvoke` or `SupportsColumnBatch`.

On Tue, Feb 16, 2021 at 9:00 AM Andrew Melo <[hidden email]> wrote:
Hello Ryan,

This proposal looks very interesting. Would future goals for this
functionality include both support for aggregation functions, as well
as support for processing ColumnBatch-es (instead of Row/InternalRow)?

Thanks
Andrew

On Mon, Feb 15, 2021 at 12:44 PM Ryan Blue <[hidden email]> wrote:
>
> Thanks for the positive feedback, everyone. It sounds like there is a clear path forward for calling functions. Even without a prototype, the `invoke` plans show that Wenchen's suggested optimization can be done, and incorporating it as an optional extension to this proposal solves many of the unknowns.
>
> With that area now understood, is there any discussion about other parts of the proposal, besides the function call interface?
>
> On Fri, Feb 12, 2021 at 10:40 PM Chao Sun <[hidden email]> wrote:
>>
>> This is an important feature which can unblock several other projects including bucket join support for DataSource v2, complete support for enforcing DataSource v2 distribution requirements on the write path, etc. I like Ryan's proposals which look simple and elegant, with nice support on function overloading and variadic arguments. On the other hand, I think Wenchen made a very good point about performance. Overall, I'm excited to see active discussions on this topic and believe the community will come to a proposal with the best of both sides.
>>
>> Chao
>>
>> On Fri, Feb 12, 2021 at 7:58 PM Hyukjin Kwon <[hidden email]> wrote:
>>>
>>> +1 for Liang-chi's.
>>>
>>> Thanks Ryan and Wenchen for leading this.
>>>
>>>
>>> 2021년 2월 13일 (토) 오후 12:18, Liang-Chi Hsieh <[hidden email]>님이 작성:
>>>>
>>>> Basically I think the proposal makes sense to me and I'd like to support the
>>>> SPIP as it looks like we have strong need for the important feature.
>>>>
>>>> Thanks Ryan for working on this and I do also look forward to Wenchen's
>>>> implementation. Thanks for the discussion too.
>>>>
>>>> Actually I think the SupportsInvoke proposed by Ryan looks a good
>>>> alternative to me. Besides Wenchen's alternative implementation, is there a
>>>> chance we also have the SupportsInvoke for comparison?
>>>>
>>>>
>>>> John Zhuge wrote
>>>> > Excited to see our Spark community rallying behind this important feature!
>>>> >
>>>> > The proposal lays a solid foundation of minimal feature set with careful
>>>> > considerations for future optimizations and extensions. Can't wait to see
>>>> > it leading to more advanced functionalities like views with shared custom
>>>> > functions, function pushdown, lambda, etc. It has already borne fruit from
>>>> > the constructive collaborations in this thread. Looking forward to
>>>> > Wenchen's prototype and further discussions including the SupportsInvoke
>>>> > extension proposed by Ryan.
>>>> >
>>>> >
>>>> > On Fri, Feb 12, 2021 at 4:35 PM Owen O'Malley &lt;
>>>>
>>>> > owen.omalley@
>>>>
>>>> > &gt;
>>>> > wrote:
>>>> >
>>>> >> I think this proposal is a very good thing giving Spark a standard way of
>>>> >> getting to and calling UDFs.
>>>> >>
>>>> >> I like having the ScalarFunction as the API to call the UDFs. It is
>>>> >> simple, yet covers all of the polymorphic type cases well. I think it
>>>> >> would
>>>> >> also simplify using the functions in other contexts like pushing down
>>>> >> filters into the ORC & Parquet readers although there are a lot of
>>>> >> details
>>>> >> that would need to be considered there.
>>>> >>
>>>> >> .. Owen
>>>> >>
>>>> >>
>>>> >> On Fri, Feb 12, 2021 at 11:07 PM Erik Krogen &lt;
>>>>
>>>> > ekrogen@.com
>>>>
>>>> > &gt;
>>>> >> wrote:
>>>> >>
>>>> >>> I agree that there is a strong need for a FunctionCatalog within Spark
>>>> >>> to
>>>> >>> provide support for shareable UDFs, as well as make movement towards
>>>> >>> more
>>>> >>> advanced functionality like views which themselves depend on UDFs, so I
>>>> >>> support this SPIP wholeheartedly.
>>>> >>>
>>>> >>> I find both of the proposed UDF APIs to be sufficiently user-friendly
>>>> >>> and
>>>> >>> extensible. I generally think Wenchen's proposal is easier for a user to
>>>> >>> work with in the common case, but has greater potential for confusing
>>>> >>> and
>>>> >>> hard-to-debug behavior due to use of reflective method signature
>>>> >>> searches.
>>>> >>> The merits on both sides can hopefully be more properly examined with
>>>> >>> code,
>>>> >>> so I look forward to seeing an implementation of Wenchen's ideas to
>>>> >>> provide
>>>> >>> a more concrete comparison. I am optimistic that we will not let the
>>>> >>> debate
>>>> >>> over this point unreasonably stall the SPIP from making progress.
>>>> >>>
>>>> >>> Thank you to both Wenchen and Ryan for your detailed consideration and
>>>> >>> evaluation of these ideas!
>>>> >>> ------------------------------
>>>> >>> *From:* Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> *Sent:* Wednesday, February 10, 2021 6:06 PM
>>>> >>> *To:* Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt;
>>>> >>> *Cc:* Holden Karau &lt;
>>>>
>>>> > holden@
>>>>
>>>> > &gt;; Hyukjin Kwon <
>>>> >>>
>>>>
>>>> > gurwls223@
>>>>
>>>> >>; Spark Dev List &lt;
>>>>
>>>> > dev@.apache
>>>>
>>>> > &gt;; Wenchen Fan
>>>> >>> &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt;
>>>> >>> *Subject:* Re: [DISCUSS] SPIP: FunctionCatalog
>>>> >>>
>>>> >>> BTW, I forgot to add my opinion explicitly in this thread because I was
>>>> >>> on the PR before this thread.
>>>> >>>
>>>> >>> 1. The `FunctionCatalog API` PR was made on May 9, 2019 and has been
>>>> >>> there for almost two years.
>>>> >>> 2. I already gave my +1 on that PR last Saturday because I agreed with
>>>> >>> the latest updated design docs and AS-IS PR.
>>>> >>>
>>>> >>> And, the rest of the progress in this thread is also very satisfying to
>>>> >>> me.
>>>> >>> (e.g. Ryan's extension suggestion and Wenchen's alternative)
>>>> >>>
>>>> >>> To All:
>>>> >>> Please take a look at the design doc and the PR, and give us some
>>>> >>> opinions.
>>>> >>> We really need your participation in order to make DSv2 more complete.
>>>> >>> This will unblock other DSv2 features, too.
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:58 AM Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> wrote:
>>>> >>>
>>>> >>> Hi, Ryan.
>>>> >>>
>>>> >>> We didn't move past anything (both yours and Wenchen's). What Wenchen
>>>> >>> suggested is double-checking the alternatives with the implementation to
>>>> >>> give more momentum to our discussion.
>>>> >>>
>>>> >>> Your new suggestion about optional extention also sounds like a new
>>>> >>> reasonable alternative to me.
>>>> >>>
>>>> >>> We are still discussing this topic together and I hope we can make a
>>>> >>> conclude at this time (for Apache Spark 3.2) without being stucked like
>>>> >>> last time.
>>>> >>>
>>>> >>> I really appreciate your leadership in this dicsussion and the moving
>>>> >>> direction of this discussion looks constructive to me. Let's give some
>>>> >>> time
>>>> >>> to the alternatives.
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:14 AM Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> I don’t think we should so quickly move past the drawbacks of this
>>>> >>> approach. The problems are significant enough that using invoke is not
>>>> >>> sufficient on its own. But, I think we can add it as an optional
>>>> >>> extension
>>>> >>> to shore up the weaknesses.
>>>> >>>
>>>> >>> Here’s a summary of the drawbacks:
>>>> >>>
>>>> >>>    - Magic function signatures are error-prone
>>>> >>>    - Spark would need considerable code to help users find what went
>>>> >>>    wrong
>>>> >>>    - Spark would likely need to coerce arguments (e.g., String,
>>>> >>>    Option[Int]) for usability
>>>> >>>    - It is unclear how Spark will find the Java Method to call
>>>> >>>    - Use cases that require varargs fall back to casting; users will
>>>> >>>    also get this wrong (cast to String instead of UTF8String)
>>>> >>>    - The non-codegen path is significantly slower
>>>> >>>
>>>> >>> The benefit of invoke is to avoid moving data into a row, like this:
>>>> >>>
>>>> >>> -- using invoke
>>>> >>> int result = udfFunction(x, y)
>>>> >>>
>>>> >>> -- using row
>>>> >>> udfRow.update(0, x); -- actual: values[0] = x;
>>>> >>> udfRow.update(1, y);
>>>> >>> int result = udfFunction(udfRow);
>>>> >>>
>>>> >>> And, again, that won’t actually help much in cases that require varargs.
>>>> >>>
>>>> >>> I suggest we add a new marker trait for BoundMethod called
>>>> >>> SupportsInvoke.
>>>> >>> If that interface is implemented, then Spark will look for a method that
>>>> >>> matches the expected signature based on the bound input type. If it
>>>> >>> isn’t
>>>> >>> found, Spark can print a warning and fall back to the InternalRow call:
>>>> >>> “Cannot find udfFunction(int, int)”.
>>>> >>>
>>>> >>> This approach allows the invoke optimization, but solves many of the
>>>> >>> problems:
>>>> >>>
>>>> >>>    - The method to invoke is found using the proposed load and bind
>>>> >>>    approach
>>>> >>>    - Magic function signatures are optional and do not cause runtime
>>>> >>>    failures
>>>> >>>    - Because this is an optional optimization, Spark can be more strict
>>>> >>>    about types
>>>> >>>    - Varargs cases can still use rows
>>>> >>>    - Non-codegen can use an evaluation method rather than falling back
>>>> >>>    to slow Java reflection
>>>> >>>
>>>> >>> This seems like a good extension to me; this provides a plan for
>>>> >>> optimizing the UDF call to avoid building a row, while the existing
>>>> >>> proposal covers the other cases well and addresses how to locate these
>>>> >>> function calls.
>>>> >>>
>>>> >>> This also highlights that the approach used in DSv2 and this proposal is
>>>> >>> working: start small and use extensions to layer on more complex
>>>> >>> support.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 9:04 AM Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> wrote:
>>>> >>>
>>>> >>> Thank you all for making a giant move forward for Apache Spark 3.2.0.
>>>> >>> I'm really looking forward to seeing Wenchen's implementation.
>>>> >>> That would be greatly helpful to make a decision!
>>>> >>>
>>>> >>> > I'll implement my idea after the holiday and then we can have
>>>> >>> more effective discussions. We can also do benchmarks and get some real
>>>> >>> numbers.
>>>> >>> > FYI: the Presto UDF API
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fprestodb.io%2Fdocs%2Fcurrent%2Fdevelop%2Ffunctions.html&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067978066%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=iMWmHqqXPcT7EK%2Bovyzhy%2BZpU6Llih%2BwdZD53wvobmc%3D&amp;reserved=0&gt;
>>>> >>> also
>>>> >>> takes individual parameters instead of the row parameter. I think this
>>>> >>> direction at least worth a try so that we can see the performance
>>>> >>> difference. It's also mentioned in the design doc as an alternative
>>>> >>> (Trino).
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>>
>>>> >>> On Tue, Feb 9, 2021 at 10:18 PM Wenchen Fan &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> FYI: the Presto UDF API
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fprestodb.io%2Fdocs%2Fcurrent%2Fdevelop%2Ffunctions.html&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=ZSBCR7yx3PpwL4KY9V73JG42Z02ZodqkjxC0LweHt1g%3D&amp;reserved=0&gt;
>>>> >>> also takes individual parameters instead of the row parameter. I think
>>>> >>> this
>>>> >>> direction at least worth a try so that we can see the performance
>>>> >>> difference. It's also mentioned in the design doc as an alternative
>>>> >>> (Trino).
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:18 AM Wenchen Fan &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Hi Holden,
>>>> >>>
>>>> >>> As Hyukjin said, following existing designs is not the principle of DS
>>>> >>> v2
>>>> >>> API design. We should make sure the DS v2 API makes sense. AFAIK we
>>>> >>> didn't
>>>> >>> fully follow the catalog API design from Hive and I believe Ryan also
>>>> >>> agrees with it.
>>>> >>>
>>>> >>> I think the problem here is we were discussing some very detailed things
>>>> >>> without actual code. I'll implement my idea after the holiday and then
>>>> >>> we
>>>> >>> can have more effective discussions. We can also do benchmarks and get
>>>> >>> some
>>>> >>> real numbers.
>>>> >>>
>>>> >>> In the meantime, we can continue to discuss other parts of this
>>>> >>> proposal,
>>>> >>> and make a prototype if possible. Spark SQL has many active
>>>> >>> contributors/committers and this thread doesn't get much attention yet.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 6:17 AM Hyukjin Kwon &lt;
>>>>
>>>> > gurwls223@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Just dropping a few lines. I remember that one of the goals in DSv2 is
>>>> >>> to
>>>> >>> correct the mistakes we made in the current Spark codes.
>>>> >>> It would not have much point if we will happen to just follow and mimic
>>>> >>> what Spark currently does. It might just end up with another copy of
>>>> >>> Spark
>>>> >>> APIs, e.g. Expression (internal) APIs. I sincerely would like to avoid
>>>> >>> this
>>>> >>> I do believe we have been stuck mainly due to trying to come up with a
>>>> >>> better design. We already have an ugly picture of the current Spark APIs
>>>> >>> to
>>>> >>> draw a better bigger picture.
>>>> >>>
>>>> >>>
>>>> >>> 2021년 2월 10일 (수) 오전 3:28, Holden Karau &lt;
>>>>
>>>> > holden@
>>>>
>>>> > &gt;님이 작성:
>>>> >>>
>>>> >>> I think this proposal is a good set of trade-offs and has existed in the
>>>> >>> community for a long period of time. I especially appreciate how the
>>>> >>> design
>>>> >>> is focused on a minimal useful component, with future optimizations
>>>> >>> considered from a point of view of making sure it's flexible, but actual
>>>> >>> concrete decisions left for the future once we see how this API is used.
>>>> >>> I
>>>> >>> think if we try and optimize everything right out of the gate, we'll
>>>> >>> quickly get stuck (again) and not make any progress.
>>>> >>>
>>>> >>> On Mon, Feb 8, 2021 at 10:46 AM Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Hi everyone,
>>>> >>>
>>>> >>> I'd like to start a discussion for adding a FunctionCatalog interface to
>>>> >>> catalog plugins. This will allow catalogs to expose functions to Spark,
>>>> >>> similar to how the TableCatalog interface allows a catalog to expose
>>>> >>> tables. The proposal doc is available here:
>>>> >>> https://docs.google.com/document/d/1PLBieHIlxZjmoUB0ERF-VozCRJ0xw2j3qKvUNWpWA2U/edit
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fdocs.google.com%2Fdocument%2Fd%2F1PLBieHIlxZjmoUB0ERF-VozCRJ0xw2j3qKvUNWpWA2U%2Fedit&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=Kyth8%2FhNUZ6GXG2FsgcknZ7t7s0%2BpxnDMPyxvsxLLqE%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> Here's a high-level summary of some of the main design choices:
>>>> >>> * Adds the ability to list and load functions, not to create or modify
>>>> >>> them in an external catalog
>>>> >>> * Supports scalar, aggregate, and partial aggregate functions
>>>> >>> * Uses load and bind steps for better error messages and simpler
>>>> >>> implementations
>>>> >>> * Like the DSv2 table read and write APIs, it uses InternalRow to pass
>>>> >>> data
>>>> >>> * Can be extended using mix-in interfaces to add vectorization, codegen,
>>>> >>> and other future features
>>>> >>>
>>>> >>> There is also a PR with the proposed API:
>>>> >>> https://github.com/apache/spark/pull/24559/files
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fspark%2Fpull%2F24559%2Ffiles&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=t3ZCqffdsrmCY3X%2FT8x1oMjMcNUiQ0wQNk%2ByAXQx1Io%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> Let's discuss the proposal here rather than on that PR, to get better
>>>> >>> visibility. Also, please take the time to read the proposal first. That
>>>> >>> really helps clear up misconceptions.
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Ryan Blue
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Twitter: https://twitter.com/holdenkarau
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Ftwitter.com%2Fholdenkarau&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067997978%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=fVfSPIyazuUYv8VLfNu%2BUIHdc3ePM1AAKKH%2BlnIicF8%3D&amp;reserved=0&gt;
>>>> >>> Books (Learning Spark, High Performance Spark, etc.):
>>>> >>> https://amzn.to/2MaRAG9
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Famzn.to%2F2MaRAG9&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067997978%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=NbRl9kK%2B6Wy0jWmDnztYp3JCPNLuJvmFsLHUrXzEhlk%3D&amp;reserved=0&gt;
>>>> >>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fwww.youtube.com%2Fuser%2Fholdenkarau&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060068007935%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=OWXOBELzO3hBa2JI%2FOSBZ3oNyLq0yr%2FGXMkNn7bqYDM%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> --
>>>> >>> Ryan Blue
>>>> >>>
>>>> >>>
>>>> >
>>>> > --
>>>> > John Zhuge
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: [hidden email]
>>>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]



--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

Chao Sun
I don't think that's the case. Wenchen's proposal is that the primary API is one discovered via reflection which detects methods by their types.

Hmm, I thought we've already moved on from the reflection proposal. quote: "If people have such a big concern about reflection, we can follow the current Spark Java UDF and Transport, and create ScalarFuncion0[R]ScalarFuncion1[T1, R], etc. to avoid reflection."

I think with the ScalarFunction/ScalarFunction0/../ScalarFunctionN approach, it is up to the UnboundFunction.bind to decide which implementation is to be returned based on the input type?

Chao


On Fri, Feb 26, 2021 at 4:48 PM xkrogen <[hidden email]> wrote:
> Correct me if I'm wrong, but it appears we've basically agreed upon the APIs proposed in the SPIP (forget the naming part):

I don't think that's the case. Wenchen's proposal is that the primary API is one discovered via reflection which detects methods by their types. The InternalRow API would be the add-on for supporting varargs. This is in opposition to Ryan's proposal which says the primary API is the InternalRow API, with a reflective API being the add-on. This is important because of Wenchen's point about forcing users to implement the InternalRow API even if they prefer the reflective API.

> > I think that the InternalRow option is easier to build against because it provides at least some type checking when accessing values from the input row.
> I have a different opinion about this. If the input is string type but the UDF implementation calls `row.getLong(0)`, it returns wrong data, which is very bad. With the individual-parameters approach, if you implement UDF with `def call(input: Long)` but the input is string type, analyzer can detect it and fail the query.

I am in agreement with Wenchen on this point. I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety. I believe that Wenchen's proposal will provide stronger query-compile-time safety (i.e. fewer runtime issues) at the expense of less Java-compile-time safety, which seems like a good tradeoff. This also pushes more complexity onto the Spark implementation side for the purposes of reflectively discovering methods and applying casts as necessary, but again, I see this as a good tradeoff for providing what seems to me to be a more user-friendly (albeit slightly more "magical") API.

The biggest questions to me are whether the Spark-side implementation for the reflective API will become too complex to implement well (one of the strengths of the InternalRow API is its simplicity), and whether type erasure will hurt the ability to do reasonable reflective discovery on complex types. To resolve these, I would love to see a POC of Wenchen's proposal.

I am supportive of moving forward with committing a version of the PR that does not include the UDF APIs to make concrete progress towards this SPIP while the discussion plays out, but I also feel Ryan's concern that the API is too integral to the SPIP to move forward without it is reasonable. At this time I'm not supportive of merging the PR as-is because I do not think the API debate has been reasonably settled and it will inevitably be harder to change later rather than getting right the first time.

Really appreciate the active and productive discussion on both sides here!
Thanks,
Erik

On Fri, Feb 26, 2021 at 3:38 PM Chao Sun <[hidden email]> wrote:
Correct me if I'm wrong, but it appears we've basically agreed upon the APIs proposed in the SPIP (forget the naming part):

interface ScalarFunction extends BoundFunction<R> { 
  R produceResult(InternalRow args); 

interface AggregateFunction<S, R> extends BoundFunction<R> { 
  S update(S state, InternalRow input); 
}

together with the rest of the design such as FunctionCatalog and binding process. 

The argument at the moment seems to be whether we want to have SupportsInvoke or [Scalar|Aggregate]FunctionN alongside these, is that correct?
 In order to move this forward, perhaps we can merge the PR as it is (maybe we'll need a vote?) and proceed to discuss these topics? We can also then present separate PRs on top of it, which can help a lot for people within this thread to provide comments.

WDYT?

Best,
Chao

On Wed, Feb 24, 2021 at 10:45 PM Wenchen Fan <[hidden email]> wrote:
I think there is one agreement between us: we need both the individual-parameters and row-parameter APIs(your SupportsInvoke proposal and my VarargsScalarFunction proposal). IIUC the argument now is how to compose these 2 APIs.

Your proposal is to put the row-parameter API in the base ScalaFunction interface, with an optional SupportsInvoke interface for the individual-parameters API. I don't like it because it promotes the row-parameter API and forces users to implement it, even if the users want to only use the individual-parameters API.

My proposal is to leave the choice to the users. They can pick one from ScalarFunction0ScalarFunction1, ..., VarargsScalarFunction.

More replies below:

> We agree that ScalarFunction and AggregateFunction can optionally define methods for Spark to directly call in codegen

I don't think we agree with it. Whatever UDF API we choose at the end (either individual-parameters or row-parameter), both non-codegen and codegen code paths should just call these Java methods from the UDF API. It doesn't make sense to have different UDF APIs for non-codegen and codegen.

> The second option is to introduce 9 or more interfaces to break out the fields of the input row, and an additional Object[] variation for varargs:

My initial idea is to not have these 9 interfaces and fully rely on Java reflection. We can do some benchmark, if reflection is not that slow, I think we don't need to add these 9 interfaces. Preso UDF API takes the same approach. And one correction: my proposal is to use InternalRow for varargs UDF, not Object[].

> Spark will need additional code to call the right method based on input, so it will either have 10 wrapper classes or a big match statement

You can take a look at the Spark ScalaUDF expression. It has a big match statement for the non-codegen path, but the codegen path is much simpler because we can generate the exact Java code to call the specific UDF. I don't think it's a big problem, or we can use reflection in the non-codegen path to avoid the big match statement.

> Spark is always going to essentially call the raw interface with no specific type parameters. As a result, incorrect types (like String) will compile but fail at runtime with ClassCastException.

You seem to keep ignoring my proposal that we can check the UDF function signature at the analysis phase to make sure it matches the input types. And with codegen Spark can call the specific function to avoid boxing issues. If you missed my previous example, here is what the generated code looks like:

double input1 = ...;
double input2 = ...;
DoubleAdd udf = ...;
double res = udf.call(input1, input2);

> I think that the InternalRow option is easier to build against because it provides at least some type checking when accessing values from the input row.

I have a different opinion about this. If the input is string type but the UDF implementation calls `row.getLong(0)`, it returns wrong data, which is very bad. With the individual-parameters approach, if you implement UDF with `def call(input: Long)` but the input is string type, analyzer can detect it and fail the query.

On Thu, Feb 25, 2021 at 6:48 AM Ryan Blue <[hidden email]> wrote:

How functions are called is a really big element of this effort. I don’t want to get in a position where we’ve started committing changes without clear agreement on something so fundamental to the proposal. I’d like to make sure we’re in agreement with a vote on the SPIP before committing anything. That is, after all, the point of the SPIPs.

If people think it would help to have an alternative API in a PR, then that’s fine with me.

Since that PR suggestion is intended to make it easier to understand the technical details, I’ll try to summarize where we’re at now:

  • We agree on the scope of adding FunctionCatalog to load functions
  • We agree with the FunctionCatalog methods and the function binding approach
  • We agree that a bound function will be either a ScalarFunction or an AggregateFunction (plus the mix-in PartialAggregateFunction)
  • We agree that values should be passed should be Spark’s internal representation to avoid translation
  • We agree that ScalarFunction and AggregateFunction can optionally define methods for Spark to directly call in codegen

The disagreement is about how to call functions when codegen isn’t used or when the function needs to support variable-length argument lists. There are two options:

The first option is for each function to have a method that accepts an InternalRow, from the proposed SPIP:

interface ScalarFunction extends BoundFunction<R> {
  R produceResult(InternalRow input);
}
interface AggregateFunction<S> extends BoundFunction<R> {
  S update(S state, InternalRow input);
  ...
}

The second option is to introduce 9 or more interfaces to break out the fields of the input row, and an additional Object[] variation for varargs:

interface ScalarFunction1<T1> extends BoundFunction<R> {
  R produceResult(T1 one);
}
interface ScalarFunction2<T1, T2> extends BoundFunction<R> {
  R produceResult(T1 one, T2 two);
}
... 8 more ScalarFunction interfaces
interface ScalarFunctionVarargs extends BoundFunction<R> {
  R produceResult(Object[] args);
}
interface AggregateFunction<S, T1> extends BoundFunction<R> {
  S update(S state, T1 one);
}
interface AggregateFunction<S, T1, T2> extends BoundFunction<R> {
  S update(S state, T1 one, T2 two);
}
... 8 more AggregateFunction interfaces
interface AggregateFunctionVarargs<S> extends BoundFunction<R> {
  S update(S state, Object[] args);
}

Because this is for the non-invoke case, the two options have roughly the same performance characteristics.

The first option has some advantages:

  • It is simpler: there are few interfaces and Spark will always find the right method
  • Accessing a value returns a concrete type, so it is less error-prone. I’ve given an example where this helps identify a problem with an invoke method.

The second option’s advantage is that users have values broken out into arguments. That is, if I understand Wenchen correctly here: “I don’t like the SupportsInvoke approach as it still promotes the row-parameter API. I think the individual-parameters API is better for UDF developers.”

Disadvantages with the second option:

  • There are 20+ more interfaces in the API
  • Spark will need additional code to call the right method based on input, so it will either have 10 wrapper classes or a big match statement that calls each interface separately (see UDFRegistration).
  • Spark is always going to essentially call the raw interface with no specific type parameters. As a result, incorrect types (like String) will compile but fail at runtime with ClassCastException.
  • The varargs case will result in casting to expected types, which could also fail with ClassCastException

I think that the InternalRow option is easier to build against because it provides at least some type checking when accessing values from the input row. You get compile-time checks when using the wrong type like this: String val = input.getString(0); won’t compile.

Another important thing to note is that although the original idea was to keep the individual parameter approach simple, Wenchen has already suggested passing arrays as Java arrays, like UTF8String[]. This adds to the complexity of the overall solution and requires matching multiple types. How would Spark know to pass UTF8String[] or ArrayData?

If anyone disagrees with that summary, please point out where it’s incorrect. But barring a major misunderstanding, I think the choice is clear: the simpler approach that provides additional compile-time safety is the right way to go.


On Tue, Feb 23, 2021 at 1:48 AM Wenchen Fan <[hidden email]> wrote:
+1, as I already proposed we can move forward with PRs

> To move forward, how about we implement the function loading and binding first? Then we can have PRs for both the individual-parameters (I can take it) and row-parameter approaches, if we still can't reach a consensus at that time and need to see all the details.

Ryan, can we focus on the function loading and binding part and get it committed first? I can also fork your branch and put everything together, but that might be too big to review.

On Tue, Feb 23, 2021 at 4:35 PM Dongjoon Hyun <[hidden email]> wrote:
I've been still supporting Ryan's SPIP (original PR and its extension proposal discussed here) because of its simplicity.

According to this email thread context, I also understand the different perspectives like Hyukjin's concerns about having multiple ways and Wenchen's proposal and rationales.

It looks like we need more discussion to reach an agreement. And the technical details become more difficult to track because this is an email thread.

Although Ryan initially suggested discussing this on Apache email thread instead of the PR, can we have a PR to discuss?

Especially, Wenchen, could you make your PR based on Ryan's PR?

If we collect the scattered ideas into a single PR, that would be greatly helpful not only for further discussions, but also when we go on a vote on Ryan's PR or Wenchen's PR.

Bests,
Dongjoon.


On Mon, Feb 22, 2021 at 1:16 AM Wenchen Fan <[hidden email]> wrote:
Hi Walaa,

Thanks for sharing this! The type signature stuff is already covered by the unbound UDF API, which specifies the input and output data types. The problem is how to check the method signature of the bound UDF. As you said, Java has type erasure and we can't check `List<String>` for example.

My initial proposal is to do nothing and simply pass the Spark ArrayData, MapData, InternalRow to the UDF. This requires the UDF developers to ensure the type is matched, as they need to call something like `array.getLong(index)` with the corrected type name. It's as worse as the row-parameter version but seems fine as it only happens with nested types. And the type check is still done for the first level (the method signature must use ArrayData/MapData/InternalRow at least).

We can allow more types in the future to make the type check better. It might be too detailed for this discussion thread but just put a few thoughts:
1. Java array doesn't do type erasure. We can use UTF8String[] for example if the input type is array of string.
2. For struct type, we can allow Java beans/Scala case classes if the field name and type match the type signature.
3. For map type, it's actually struct<keys: array<key_type>, values: array<value_type>>, so we can also allow Java beans/Scala case classes here.

The general idea is to use stuff that can retain nested type information at compile-time, i.e. array, java bean, case classes.

Thanks,
Wenchen



On Mon, Feb 22, 2021 at 3:47 PM Walaa Eldin Moustafa <[hidden email]> wrote:
Wenchen, in Transport, users provide the input parameter signatures and output parameter signature as part of the API. Compile-time checks are done by parsing the type signatures and matching them to the type tree received at compile-time. This also helps with inferring the concrete output type.

The specification in the UDF API looks like this:

  @Override
  public List<String> getInputParameterSignatures() {
    return ImmutableList.of(
        "ARRAY(K)",
        "ARRAY(V)"
    );
  }

  @Override
  public String getOutputParameterSignature() {
    return "MAP(K,V)";
  }

The benefits of this type of type signature specification as opposed to inferring types from Java type signatures given in the Java method are:
  • For nested types, Java type erasure eliminates the information about nested types, so for something like my_function(List<String> a1, List<Integer> a2), the value of both a1.class or a2.class is just a List. However, we are planning to work around this in a future version in the case of Array and Map types. Struct types are discussed in the next point.
  • Without pre-code-generation there is no single Java type signature from which we can capture the Struct info. However, Struct info can be expressed in type signatures of the above type, e.g., ROW(FirstName VARCHAR, LastName VARCHAR).
When a Transport UDF represents a Spark UDF, the type signatures are matched against Spark native types, i.e., org.apache.spark.sql.types.{ArrayType, MapType, StructType}, and primitive types. The function that parses/compiles type signatures is found in AbstractTypeInference. This class represents the generic component that is common between all supported engines. Its Spark-specific extension is in SparkTypeInference. In the above example, at compile time, if the first Array happens to be of String element type, and the second Array happens to be of Integer element type, the UDF will communicate to the Spark analyzer that the output should be of type MapData<String, Integer> (i.e., based on what was seen in the input at compile time). The whole UDF becomes a Spark Expression at the end of the day.

Thanks,
Walaa.


On Sun, Feb 21, 2021 at 7:26 PM Wenchen Fan <[hidden email]> wrote:
I think I have made it clear that it's simpler for the UDF developers to deal with the input parameters directly, instead of getting them from a row, as you need to provide the index and type (e.g. row.getLong(0)). It's also coherent with the existing Spark Scala/Java UDF APIs, so that Spark users will be more familiar with the individual-parameters API.

And I have explained it already that we can use reflection to make sure the defined methods have the right types at query-compilation time. It's better than leaving this problem to the UDF developers and asking them to ensure the inputs are gotten from the row correctly with index and type. If there are people from Presto/Transport, it will be great if you can share how Presto/Transport do this check.

I don't like 22 additional interfaces too, but if you look at the examples I gave, the current Spark Java UDF only has 9 interfaces, and Transport has 8. I think it's good enough and people can use VarargsScalarFunction if they need to take more parameters or varargs. It resolves your concern about doing reflection in the non-codegen execution path that leads to bad performance, it also serves for documentation purpose as people can easily see the number of UDF inputs and their types by a quick glance.

As I said, we need to investigate how to avoid boxing. Since you are asking the question now, I spent sometime to think about it. I think the DoubleAdd example is the way to go. For non-codegen code path, we can just call the interface method. For the codegen code path, the generated Java code would look like (omit the null check logic):

double input1 = ...;
double input2 = ...;
DoubleAdd udf = ...;
double res = udf.call(input1, input2);

Which invokes the primitive version automatically. AFAIK this is also how Scala supports primitive type parameter (generate an extra non-boxing version of the method). If the UDF doesn't have the primtive version method, this code will just call the boxed version and still works.

I don't like the SupportsInvoke approach as it still promotes the row-parameter API. I think the individual-parameters API is better for UDF developers. Can other people share your opinions about the API?

On Sat, Feb 20, 2021 at 5:32 AM Ryan Blue <[hidden email]> wrote:

I don’t see any benefit to more complexity with 22 additional interfaces, instead of simply passing an InternalRow. Why not use a single interface with InternalRow? Maybe you could share your motivation?

That would also result in strange duplication, where the ScalarFunction2 method is just the boxed version:

class DoubleAdd implements ScalarFunction2<Double, Double, Double> {
  @Override
  Double produceResult(Double left, Double right) {
    return left + right;
  }

  double produceResult(double left, double right) {
    return left + right;
  }
}

This would work okay, but would be awkward if you wanted to use the same implementation for any number of arguments, like a sum method that adds all of the arguments together and returns the result. It also isn’t great for varargs, since it is basically the same as the invoke case.

The combination of an InternalRow method and the invoke method seems to be a good way to handle this to me. What is wrong with it? And, how would you solve the problem when implementations define methods with the wrong types? The InternalRow approach helps implementations catch that problem (as demonstrated above) and also provides a fallback when there is a but preventing the invoke optimization from working. That seems like a good approach to me.


On Thu, Feb 18, 2021 at 11:31 PM Wenchen Fan <[hidden email]> wrote:
If people have such a big concern about reflection, we can follow the current Spark Java UDF and Transport, and create ScalarFuncion0[R]ScalarFuncion1[T1, R], etc. to avoid reflection. But we may need to investigate how to avoid boxing with this API design.

To put a detailed proposal: let's have ScalarFuncion0ScalarFuncion1, ..., ScalarFuncion9 and VarargsScalarFunction. At execution time, if Spark sees ScalarFuncion0-9, pass the input columns to the UDF directly, one column one parameter. So string type input is UTF8String, array type input is ArrayData. If Spark sees VarargsScalarFunction, wrap the input columns with InternalRow and pass it to the UDF.

In general, if VarargsScalarFunction is implemented, the UDF should not implement ScalarFuncion0-9. We can also define a priority order to allow this. I don't have a strong preference here.

What do you think?

On Fri, Feb 19, 2021 at 1:24 PM Walaa Eldin Moustafa <[hidden email]> wrote:
I agree with Ryan on the questions around the expressivity of the Invoke method. It is not clear to me how the Invoke method can be used to declare UDFs with type-parameterized parameters. For example: a UDF to get the Nth element of an array (regardless of the Array element type) or a UDF to merge two Arrays (of generic types) to a Map.

Also, to address Wenchen's InternalRow question, can we create a number of Function classes, each corresponding to a number of input parameter length (e.g., ScalarFunction1, ScalarFunction2, etc)?

Thanks,
Walaa.
 

On Thu, Feb 18, 2021 at 6:07 PM Ryan Blue <[hidden email]> wrote:

I agree with you that it is better in many cases to directly call a method. But it it not better in all cases, which is why I don’t think it is the right general-purpose choice.

First, if codegen isn’t used for some reason, the reflection overhead is really significant. That gets much better when you have an interface to call. That’s one reason I’d use this pattern:

class DoubleAdd implements ScalarFunction<Double>, SupportsInvoke {
  Double produceResult(InternalRow row) {
    return produceResult(row.getDouble(0), row.getDouble(1));
  }

  double produceResult(double left, double right) {
    return left + right;
  }
}

There’s little overhead to adding the InternalRow variation, but we could call it in eval to avoid the reflect overhead. To the point about UDF developers, I think this is a reasonable cost.

Second, I think usability is better and helps avoid runtime issues. Here’s an example:

class StrLen implements ScalarFunction<Integer>, SupportsInvoke {
  Integer produceResult(InternalRow row) {
    return produceResult(row.getString(0));
  }

  Integer produceResult(String str) {
    return str.length();
  }
}

See the bug? I forgot to use UTF8String instead of String. With the InternalRow method, I get a compiler warning because getString produces UTF8String that can’t be passed to produceResult(String). If I decided to implement length separately, then we could still run the InternalRow version and log a warning. The code would be slightly slower, but wouldn’t fail.

There are similar situations with varargs where it’s better to call methods that produce concrete types than to cast from Object to some expected type.

I think that using invoke is a great extension to the proposal, but I don’t think that it should be the only way to call functions. By all means, let’s work on it in parallel and use it where possible. But I think we do need the fallback of using InternalRow and that it isn’t a usability problem to include it.

Oh, and one last thought is that we already have users that call Dataset.map and use InternalRow. This would allow converting that code directly to a UDF.

I think we’re closer to agreeing here than it actually looks. Hopefully you’ll agree that having the InternalRow method isn’t a big usability problem.


On Wed, Feb 17, 2021 at 11:51 PM Wenchen Fan <[hidden email]> wrote:
I don't see any objections to the rest of the proposal (loading functions from the catalog, function binding stuff, etc.) and I assume everyone is OK with it. We can commit that part first.

Currently, the discussion focuses on the `ScalarFunction` API, where I think it's better to directly take the input columns as the UDF parameter, instead of wrapping the input columns with InternalRow and taking the InternalRow as the UDF parameter. It's not only for better performance, but also for ease of use. For example, it's easier for the UDF developer to write `input1 + input2` than `inputRow.getLong(0) + inputRow.getLong(1)`, as they don't need to specify the type and index by themselves (getLong(0)) which is error-prone.

It does push more work to the Spark side, but I think it's worth it if implementing UDF gets easier. I don't think the work is very challenging, as we can leverage the infra we built for the expression encoder.

I think it's also important to look at the UDF API from the user's perspective (UDF developers). How do you like the UDF API without considering how Spark can support it? Do you prefer the individual-parameters version or the row-parameter version?

To move forward, how about we implement the function loading and binding first? Then we can have PRs for both the individual-parameters (I can take it) and row-parameter approaches, if we still can't reach a consensus at that time and need to see all the details.

On Thu, Feb 18, 2021 at 4:48 AM Ryan Blue <[hidden email]> wrote:
Thanks, Hyukjin. I think that's a fair summary. And I agree with the idea that we should focus on what Spark will do by default.

I think we should focus on the proposal, for two reasons: first, there is a straightforward path to incorporate Wenchen's suggestion via `SupportsInvoke`, and second, the proposal is more complete: it defines a solution for many concerns like loading a function and finding out what types to use -- not just how to call code -- and supports more use cases like varargs functions. I think we can continue to discuss the rest of the proposal and be confident that we can support an invoke code path where it makes sense.

Does everyone agree? If not, I think we would need to solve a lot of the challenges that I initially brought up with the invoke idea. It seems like a good way to call a function, but needs a real proposal behind it if we don't use it via `SupportsInvoke` in the current proposal.

On Tue, Feb 16, 2021 at 11:07 PM Hyukjin Kwon <[hidden email]> wrote:

Just to make sure we don’t move past, I think we haven’t decided yet:

  • if we’ll replace the current proposal to Wenchen’s approach as the default
  • if we want to have Wenchen’s approach as an optional mix-in on the top of Ryan’s proposal (SupportsInvoke)

From what I read, some people pointed out it as a replacement. Please correct me if I misread this discussion thread.

As Dongjoon pointed out, it would be good to know rough ETA to make sure making progress in this, and people can compare more easily.


FWIW, there’s the saying I like in the zen of Python:

There should be one— and preferably only one —obvious way to do it.

If multiple approaches have the way for developers to do the (almost) same thing, I would prefer to avoid it.

In addition, I would prefer to focus on what Spark does by default first.


2021년 2월 17일 (수) 오후 2:33, Dongjoon Hyun <[hidden email]>님이 작성:
Hi, Wenchen.

This thread seems to get enough attention. Also, I'm expecting more and more if we have this on the `master` branch because we are developing together.

    > Spark SQL has many active contributors/committers and this thread doesn't get much attention yet.

So, what's your ETA from now?

    > I think the problem here is we were discussing some very detailed things without actual code.
    > I'll implement my idea after the holiday and then we can have more effective discussions.
    > We can also do benchmarks and get some real numbers.
    > In the meantime, we can continue to discuss other parts of this proposal, and make a prototype if possible.

I'm looking forward to seeing your PR. I hope we can conclude this thread and have at least one implementation in the `master` branch this month (February).
If you need more time (one month or longer), why don't we have Ryan's suggestion in the `master` branch first and benchmark with your PR later during Apache Spark 3.2 timeframe.

Bests,
Dongjoon.


On Tue, Feb 16, 2021 at 9:26 AM Ryan Blue <[hidden email]> wrote:
Andrew,

The proposal already includes an API for aggregate functions and I think we would want to implement those right away.

Processing ColumnBatch is something we can easily extend the interfaces to support, similar to Wenchen's suggestion. The important thing right now is to agree on some basic functionality: how to look up functions and what the simple API should be. Like the TableCatalog interfaces, we will layer on more support through optional interfaces like `SupportsInvoke` or `SupportsColumnBatch`.

On Tue, Feb 16, 2021 at 9:00 AM Andrew Melo <[hidden email]> wrote:
Hello Ryan,

This proposal looks very interesting. Would future goals for this
functionality include both support for aggregation functions, as well
as support for processing ColumnBatch-es (instead of Row/InternalRow)?

Thanks
Andrew

On Mon, Feb 15, 2021 at 12:44 PM Ryan Blue <[hidden email]> wrote:
>
> Thanks for the positive feedback, everyone. It sounds like there is a clear path forward for calling functions. Even without a prototype, the `invoke` plans show that Wenchen's suggested optimization can be done, and incorporating it as an optional extension to this proposal solves many of the unknowns.
>
> With that area now understood, is there any discussion about other parts of the proposal, besides the function call interface?
>
> On Fri, Feb 12, 2021 at 10:40 PM Chao Sun <[hidden email]> wrote:
>>
>> This is an important feature which can unblock several other projects including bucket join support for DataSource v2, complete support for enforcing DataSource v2 distribution requirements on the write path, etc. I like Ryan's proposals which look simple and elegant, with nice support on function overloading and variadic arguments. On the other hand, I think Wenchen made a very good point about performance. Overall, I'm excited to see active discussions on this topic and believe the community will come to a proposal with the best of both sides.
>>
>> Chao
>>
>> On Fri, Feb 12, 2021 at 7:58 PM Hyukjin Kwon <[hidden email]> wrote:
>>>
>>> +1 for Liang-chi's.
>>>
>>> Thanks Ryan and Wenchen for leading this.
>>>
>>>
>>> 2021년 2월 13일 (토) 오후 12:18, Liang-Chi Hsieh <[hidden email]>님이 작성:
>>>>
>>>> Basically I think the proposal makes sense to me and I'd like to support the
>>>> SPIP as it looks like we have strong need for the important feature.
>>>>
>>>> Thanks Ryan for working on this and I do also look forward to Wenchen's
>>>> implementation. Thanks for the discussion too.
>>>>
>>>> Actually I think the SupportsInvoke proposed by Ryan looks a good
>>>> alternative to me. Besides Wenchen's alternative implementation, is there a
>>>> chance we also have the SupportsInvoke for comparison?
>>>>
>>>>
>>>> John Zhuge wrote
>>>> > Excited to see our Spark community rallying behind this important feature!
>>>> >
>>>> > The proposal lays a solid foundation of minimal feature set with careful
>>>> > considerations for future optimizations and extensions. Can't wait to see
>>>> > it leading to more advanced functionalities like views with shared custom
>>>> > functions, function pushdown, lambda, etc. It has already borne fruit from
>>>> > the constructive collaborations in this thread. Looking forward to
>>>> > Wenchen's prototype and further discussions including the SupportsInvoke
>>>> > extension proposed by Ryan.
>>>> >
>>>> >
>>>> > On Fri, Feb 12, 2021 at 4:35 PM Owen O'Malley &lt;
>>>>
>>>> > owen.omalley@
>>>>
>>>> > &gt;
>>>> > wrote:
>>>> >
>>>> >> I think this proposal is a very good thing giving Spark a standard way of
>>>> >> getting to and calling UDFs.
>>>> >>
>>>> >> I like having the ScalarFunction as the API to call the UDFs. It is
>>>> >> simple, yet covers all of the polymorphic type cases well. I think it
>>>> >> would
>>>> >> also simplify using the functions in other contexts like pushing down
>>>> >> filters into the ORC & Parquet readers although there are a lot of
>>>> >> details
>>>> >> that would need to be considered there.
>>>> >>
>>>> >> .. Owen
>>>> >>
>>>> >>
>>>> >> On Fri, Feb 12, 2021 at 11:07 PM Erik Krogen &lt;
>>>>
>>>> > ekrogen@.com
>>>>
>>>> > &gt;
>>>> >> wrote:
>>>> >>
>>>> >>> I agree that there is a strong need for a FunctionCatalog within Spark
>>>> >>> to
>>>> >>> provide support for shareable UDFs, as well as make movement towards
>>>> >>> more
>>>> >>> advanced functionality like views which themselves depend on UDFs, so I
>>>> >>> support this SPIP wholeheartedly.
>>>> >>>
>>>> >>> I find both of the proposed UDF APIs to be sufficiently user-friendly
>>>> >>> and
>>>> >>> extensible. I generally think Wenchen's proposal is easier for a user to
>>>> >>> work with in the common case, but has greater potential for confusing
>>>> >>> and
>>>> >>> hard-to-debug behavior due to use of reflective method signature
>>>> >>> searches.
>>>> >>> The merits on both sides can hopefully be more properly examined with
>>>> >>> code,
>>>> >>> so I look forward to seeing an implementation of Wenchen's ideas to
>>>> >>> provide
>>>> >>> a more concrete comparison. I am optimistic that we will not let the
>>>> >>> debate
>>>> >>> over this point unreasonably stall the SPIP from making progress.
>>>> >>>
>>>> >>> Thank you to both Wenchen and Ryan for your detailed consideration and
>>>> >>> evaluation of these ideas!
>>>> >>> ------------------------------
>>>> >>> *From:* Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> *Sent:* Wednesday, February 10, 2021 6:06 PM
>>>> >>> *To:* Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt;
>>>> >>> *Cc:* Holden Karau &lt;
>>>>
>>>> > holden@
>>>>
>>>> > &gt;; Hyukjin Kwon <
>>>> >>>
>>>>
>>>> > gurwls223@
>>>>
>>>> >>; Spark Dev List &lt;
>>>>
>>>> > dev@.apache
>>>>
>>>> > &gt;; Wenchen Fan
>>>> >>> &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt;
>>>> >>> *Subject:* Re: [DISCUSS] SPIP: FunctionCatalog
>>>> >>>
>>>> >>> BTW, I forgot to add my opinion explicitly in this thread because I was
>>>> >>> on the PR before this thread.
>>>> >>>
>>>> >>> 1. The `FunctionCatalog API` PR was made on May 9, 2019 and has been
>>>> >>> there for almost two years.
>>>> >>> 2. I already gave my +1 on that PR last Saturday because I agreed with
>>>> >>> the latest updated design docs and AS-IS PR.
>>>> >>>
>>>> >>> And, the rest of the progress in this thread is also very satisfying to
>>>> >>> me.
>>>> >>> (e.g. Ryan's extension suggestion and Wenchen's alternative)
>>>> >>>
>>>> >>> To All:
>>>> >>> Please take a look at the design doc and the PR, and give us some
>>>> >>> opinions.
>>>> >>> We really need your participation in order to make DSv2 more complete.
>>>> >>> This will unblock other DSv2 features, too.
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:58 AM Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> wrote:
>>>> >>>
>>>> >>> Hi, Ryan.
>>>> >>>
>>>> >>> We didn't move past anything (both yours and Wenchen's). What Wenchen
>>>> >>> suggested is double-checking the alternatives with the implementation to
>>>> >>> give more momentum to our discussion.
>>>> >>>
>>>> >>> Your new suggestion about optional extention also sounds like a new
>>>> >>> reasonable alternative to me.
>>>> >>>
>>>> >>> We are still discussing this topic together and I hope we can make a
>>>> >>> conclude at this time (for Apache Spark 3.2) without being stucked like
>>>> >>> last time.
>>>> >>>
>>>> >>> I really appreciate your leadership in this dicsussion and the moving
>>>> >>> direction of this discussion looks constructive to me. Let's give some
>>>> >>> time
>>>> >>> to the alternatives.
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:14 AM Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> I don’t think we should so quickly move past the drawbacks of this
>>>> >>> approach. The problems are significant enough that using invoke is not
>>>> >>> sufficient on its own. But, I think we can add it as an optional
>>>> >>> extension
>>>> >>> to shore up the weaknesses.
>>>> >>>
>>>> >>> Here’s a summary of the drawbacks:
>>>> >>>
>>>> >>>    - Magic function signatures are error-prone
>>>> >>>    - Spark would need considerable code to help users find what went
>>>> >>>    wrong
>>>> >>>    - Spark would likely need to coerce arguments (e.g., String,
>>>> >>>    Option[Int]) for usability
>>>> >>>    - It is unclear how Spark will find the Java Method to call
>>>> >>>    - Use cases that require varargs fall back to casting; users will
>>>> >>>    also get this wrong (cast to String instead of UTF8String)
>>>> >>>    - The non-codegen path is significantly slower
>>>> >>>
>>>> >>> The benefit of invoke is to avoid moving data into a row, like this:
>>>> >>>
>>>> >>> -- using invoke
>>>> >>> int result = udfFunction(x, y)
>>>> >>>
>>>> >>> -- using row
>>>> >>> udfRow.update(0, x); -- actual: values[0] = x;
>>>> >>> udfRow.update(1, y);
>>>> >>> int result = udfFunction(udfRow);
>>>> >>>
>>>> >>> And, again, that won’t actually help much in cases that require varargs.
>>>> >>>
>>>> >>> I suggest we add a new marker trait for BoundMethod called
>>>> >>> SupportsInvoke.
>>>> >>> If that interface is implemented, then Spark will look for a method that
>>>> >>> matches the expected signature based on the bound input type. If it
>>>> >>> isn’t
>>>> >>> found, Spark can print a warning and fall back to the InternalRow call:
>>>> >>> “Cannot find udfFunction(int, int)”.
>>>> >>>
>>>> >>> This approach allows the invoke optimization, but solves many of the
>>>> >>> problems:
>>>> >>>
>>>> >>>    - The method to invoke is found using the proposed load and bind
>>>> >>>    approach
>>>> >>>    - Magic function signatures are optional and do not cause runtime
>>>> >>>    failures
>>>> >>>    - Because this is an optional optimization, Spark can be more strict
>>>> >>>    about types
>>>> >>>    - Varargs cases can still use rows
>>>> >>>    - Non-codegen can use an evaluation method rather than falling back
>>>> >>>    to slow Java reflection
>>>> >>>
>>>> >>> This seems like a good extension to me; this provides a plan for
>>>> >>> optimizing the UDF call to avoid building a row, while the existing
>>>> >>> proposal covers the other cases well and addresses how to locate these
>>>> >>> function calls.
>>>> >>>
>>>> >>> This also highlights that the approach used in DSv2 and this proposal is
>>>> >>> working: start small and use extensions to layer on more complex
>>>> >>> support.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 9:04 AM Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> wrote:
>>>> >>>
>>>> >>> Thank you all for making a giant move forward for Apache Spark 3.2.0.
>>>> >>> I'm really looking forward to seeing Wenchen's implementation.
>>>> >>> That would be greatly helpful to make a decision!
>>>> >>>
>>>> >>> > I'll implement my idea after the holiday and then we can have
>>>> >>> more effective discussions. We can also do benchmarks and get some real
>>>> >>> numbers.
>>>> >>> > FYI: the Presto UDF API
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fprestodb.io%2Fdocs%2Fcurrent%2Fdevelop%2Ffunctions.html&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067978066%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=iMWmHqqXPcT7EK%2Bovyzhy%2BZpU6Llih%2BwdZD53wvobmc%3D&amp;reserved=0&gt;
>>>> >>> also
>>>> >>> takes individual parameters instead of the row parameter. I think this
>>>> >>> direction at least worth a try so that we can see the performance
>>>> >>> difference. It's also mentioned in the design doc as an alternative
>>>> >>> (Trino).
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>>
>>>> >>> On Tue, Feb 9, 2021 at 10:18 PM Wenchen Fan &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> FYI: the Presto UDF API
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fprestodb.io%2Fdocs%2Fcurrent%2Fdevelop%2Ffunctions.html&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=ZSBCR7yx3PpwL4KY9V73JG42Z02ZodqkjxC0LweHt1g%3D&amp;reserved=0&gt;
>>>> >>> also takes individual parameters instead of the row parameter. I think
>>>> >>> this
>>>> >>> direction at least worth a try so that we can see the performance
>>>> >>> difference. It's also mentioned in the design doc as an alternative
>>>> >>> (Trino).
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:18 AM Wenchen Fan &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Hi Holden,
>>>> >>>
>>>> >>> As Hyukjin said, following existing designs is not the principle of DS
>>>> >>> v2
>>>> >>> API design. We should make sure the DS v2 API makes sense. AFAIK we
>>>> >>> didn't
>>>> >>> fully follow the catalog API design from Hive and I believe Ryan also
>>>> >>> agrees with it.
>>>> >>>
>>>> >>> I think the problem here is we were discussing some very detailed things
>>>> >>> without actual code. I'll implement my idea after the holiday and then
>>>> >>> we
>>>> >>> can have more effective discussions. We can also do benchmarks and get
>>>> >>> some
>>>> >>> real numbers.
>>>> >>>
>>>> >>> In the meantime, we can continue to discuss other parts of this
>>>> >>> proposal,
>>>> >>> and make a prototype if possible. Spark SQL has many active
>>>> >>> contributors/committers and this thread doesn't get much attention yet.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 6:17 AM Hyukjin Kwon &lt;
>>>>
>>>> > gurwls223@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Just dropping a few lines. I remember that one of the goals in DSv2 is
>>>> >>> to
>>>> >>> correct the mistakes we made in the current Spark codes.
>>>> >>> It would not have much point if we will happen to just follow and mimic
>>>> >>> what Spark currently does. It might just end up with another copy of
>>>> >>> Spark
>>>> >>> APIs, e.g. Expression (internal) APIs. I sincerely would like to avoid
>>>> >>> this
>>>> >>> I do believe we have been stuck mainly due to trying to come up with a
>>>> >>> better design. We already have an ugly picture of the current Spark APIs
>>>> >>> to
>>>> >>> draw a better bigger picture.
>>>> >>>
>>>> >>>
>>>> >>> 2021년 2월 10일 (수) 오전 3:28, Holden Karau &lt;
>>>>
>>>> > holden@
>>>>
>>>> > &gt;님이 작성:
>>>> >>>
>>>> >>> I think this proposal is a good set of trade-offs and has existed in the
>>>> >>> community for a long period of time. I especially appreciate how the
>>>> >>> design
>>>> >>> is focused on a minimal useful component, with future optimizations
>>>> >>> considered from a point of view of making sure it's flexible, but actual
>>>> >>> concrete decisions left for the future once we see how this API is used.
>>>> >>> I
>>>> >>> think if we try and optimize everything right out of the gate, we'll
>>>> >>> quickly get stuck (again) and not make any progress.
>>>> >>>
>>>> >>> On Mon, Feb 8, 2021 at 10:46 AM Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Hi everyone,
>>>> >>>
>>>> >>> I'd like to start a discussion for adding a FunctionCatalog interface to
>>>> >>> catalog plugins. This will allow catalogs to expose functions to Spark,
>>>> >>> similar to how the TableCatalog interface allows a catalog to expose
>>>> >>> tables. The proposal doc is available here:
>>>> >>> https://docs.google.com/document/d/1PLBieHIlxZjmoUB0ERF-VozCRJ0xw2j3qKvUNWpWA2U/edit
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fdocs.google.com%2Fdocument%2Fd%2F1PLBieHIlxZjmoUB0ERF-VozCRJ0xw2j3qKvUNWpWA2U%2Fedit&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=Kyth8%2FhNUZ6GXG2FsgcknZ7t7s0%2BpxnDMPyxvsxLLqE%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> Here's a high-level summary of some of the main design choices:
>>>> >>> * Adds the ability to list and load functions, not to create or modify
>>>> >>> them in an external catalog
>>>> >>> * Supports scalar, aggregate, and partial aggregate functions
>>>> >>> * Uses load and bind steps for better error messages and simpler
>>>> >>> implementations
>>>> >>> * Like the DSv2 table read and write APIs, it uses InternalRow to pass
>>>> >>> data
>>>> >>> * Can be extended using mix-in interfaces to add vectorization, codegen,
>>>> >>> and other future features
>>>> >>>
>>>> >>> There is also a PR with the proposed API:
>>>> >>> https://github.com/apache/spark/pull/24559/files
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fspark%2Fpull%2F24559%2Ffiles&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=t3ZCqffdsrmCY3X%2FT8x1oMjMcNUiQ0wQNk%2ByAXQx1Io%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> Let's discuss the proposal here rather than on that PR, to get better
>>>> >>> visibility. Also, please take the time to read the proposal first. That
>>>> >>> really helps clear up misconceptions.
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Ryan Blue
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Twitter: https://twitter.com/holdenkarau
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Ftwitter.com%2Fholdenkarau&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067997978%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=fVfSPIyazuUYv8VLfNu%2BUIHdc3ePM1AAKKH%2BlnIicF8%3D&amp;reserved=0&gt;
>>>> >>> Books (Learning Spark, High Performance Spark, etc.):
>>>> >>> https://amzn.to/2MaRAG9
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Famzn.to%2F2MaRAG9&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067997978%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=NbRl9kK%2B6Wy0jWmDnztYp3JCPNLuJvmFsLHUrXzEhlk%3D&amp;reserved=0&gt;
>>>> >>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fwww.youtube.com%2Fuser%2Fholdenkarau&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060068007935%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=OWXOBELzO3hBa2JI%2FOSBZ3oNyLq0yr%2FGXMkNn7bqYDM%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> --
>>>> >>> Ryan Blue
>>>> >>>
>>>> >>>
>>>> >
>>>> > --
>>>> > John Zhuge
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: [hidden email]
>>>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]



--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

cloud0fan
I don't have a strong opinion about "magical methods" vs. ScalarFuncion0[R]ScalarFuncion1[T1, R]. Some people like more Java compile-time type-safety, some people hate too many additional APIs. But whatever the choice we make, we can always do analysis type checks using reflection to check UDF method signature.

My main point is, we should not make the row-parameter API the primary API. We can either let users pick one from ScalarFunction0ScalarFunction1, ..., VarargsScalarFunction., or use the "magical methods" as the primary API.

Again, I'm supportive to merge the PR without the UDF API first, and implement the function loading/binding later. Then we can start POC of different UDF API proposals. I'm also OK if people want to wait and finalize the API discussion first.

On Sat, Feb 27, 2021 at 9:10 AM Chao Sun <[hidden email]> wrote:
I don't think that's the case. Wenchen's proposal is that the primary API is one discovered via reflection which detects methods by their types.

Hmm, I thought we've already moved on from the reflection proposal. quote: "If people have such a big concern about reflection, we can follow the current Spark Java UDF and Transport, and create ScalarFuncion0[R]ScalarFuncion1[T1, R], etc. to avoid reflection."

I think with the ScalarFunction/ScalarFunction0/../ScalarFunctionN approach, it is up to the UnboundFunction.bind to decide which implementation is to be returned based on the input type?

Chao


On Fri, Feb 26, 2021 at 4:48 PM xkrogen <[hidden email]> wrote:
> Correct me if I'm wrong, but it appears we've basically agreed upon the APIs proposed in the SPIP (forget the naming part):

I don't think that's the case. Wenchen's proposal is that the primary API is one discovered via reflection which detects methods by their types. The InternalRow API would be the add-on for supporting varargs. This is in opposition to Ryan's proposal which says the primary API is the InternalRow API, with a reflective API being the add-on. This is important because of Wenchen's point about forcing users to implement the InternalRow API even if they prefer the reflective API.

> > I think that the InternalRow option is easier to build against because it provides at least some type checking when accessing values from the input row.
> I have a different opinion about this. If the input is string type but the UDF implementation calls `row.getLong(0)`, it returns wrong data, which is very bad. With the individual-parameters approach, if you implement UDF with `def call(input: Long)` but the input is string type, analyzer can detect it and fail the query.

I am in agreement with Wenchen on this point. I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety. I believe that Wenchen's proposal will provide stronger query-compile-time safety (i.e. fewer runtime issues) at the expense of less Java-compile-time safety, which seems like a good tradeoff. This also pushes more complexity onto the Spark implementation side for the purposes of reflectively discovering methods and applying casts as necessary, but again, I see this as a good tradeoff for providing what seems to me to be a more user-friendly (albeit slightly more "magical") API.

The biggest questions to me are whether the Spark-side implementation for the reflective API will become too complex to implement well (one of the strengths of the InternalRow API is its simplicity), and whether type erasure will hurt the ability to do reasonable reflective discovery on complex types. To resolve these, I would love to see a POC of Wenchen's proposal.

I am supportive of moving forward with committing a version of the PR that does not include the UDF APIs to make concrete progress towards this SPIP while the discussion plays out, but I also feel Ryan's concern that the API is too integral to the SPIP to move forward without it is reasonable. At this time I'm not supportive of merging the PR as-is because I do not think the API debate has been reasonably settled and it will inevitably be harder to change later rather than getting right the first time.

Really appreciate the active and productive discussion on both sides here!
Thanks,
Erik

On Fri, Feb 26, 2021 at 3:38 PM Chao Sun <[hidden email]> wrote:
Correct me if I'm wrong, but it appears we've basically agreed upon the APIs proposed in the SPIP (forget the naming part):

interface ScalarFunction extends BoundFunction<R> { 
  R produceResult(InternalRow args); 

interface AggregateFunction<S, R> extends BoundFunction<R> { 
  S update(S state, InternalRow input); 
}

together with the rest of the design such as FunctionCatalog and binding process. 

The argument at the moment seems to be whether we want to have SupportsInvoke or [Scalar|Aggregate]FunctionN alongside these, is that correct?
 In order to move this forward, perhaps we can merge the PR as it is (maybe we'll need a vote?) and proceed to discuss these topics? We can also then present separate PRs on top of it, which can help a lot for people within this thread to provide comments.

WDYT?

Best,
Chao

On Wed, Feb 24, 2021 at 10:45 PM Wenchen Fan <[hidden email]> wrote:
I think there is one agreement between us: we need both the individual-parameters and row-parameter APIs(your SupportsInvoke proposal and my VarargsScalarFunction proposal). IIUC the argument now is how to compose these 2 APIs.

Your proposal is to put the row-parameter API in the base ScalaFunction interface, with an optional SupportsInvoke interface for the individual-parameters API. I don't like it because it promotes the row-parameter API and forces users to implement it, even if the users want to only use the individual-parameters API.

My proposal is to leave the choice to the users. They can pick one from ScalarFunction0ScalarFunction1, ..., VarargsScalarFunction.

More replies below:

> We agree that ScalarFunction and AggregateFunction can optionally define methods for Spark to directly call in codegen

I don't think we agree with it. Whatever UDF API we choose at the end (either individual-parameters or row-parameter), both non-codegen and codegen code paths should just call these Java methods from the UDF API. It doesn't make sense to have different UDF APIs for non-codegen and codegen.

> The second option is to introduce 9 or more interfaces to break out the fields of the input row, and an additional Object[] variation for varargs:

My initial idea is to not have these 9 interfaces and fully rely on Java reflection. We can do some benchmark, if reflection is not that slow, I think we don't need to add these 9 interfaces. Preso UDF API takes the same approach. And one correction: my proposal is to use InternalRow for varargs UDF, not Object[].

> Spark will need additional code to call the right method based on input, so it will either have 10 wrapper classes or a big match statement

You can take a look at the Spark ScalaUDF expression. It has a big match statement for the non-codegen path, but the codegen path is much simpler because we can generate the exact Java code to call the specific UDF. I don't think it's a big problem, or we can use reflection in the non-codegen path to avoid the big match statement.

> Spark is always going to essentially call the raw interface with no specific type parameters. As a result, incorrect types (like String) will compile but fail at runtime with ClassCastException.

You seem to keep ignoring my proposal that we can check the UDF function signature at the analysis phase to make sure it matches the input types. And with codegen Spark can call the specific function to avoid boxing issues. If you missed my previous example, here is what the generated code looks like:

double input1 = ...;
double input2 = ...;
DoubleAdd udf = ...;
double res = udf.call(input1, input2);

> I think that the InternalRow option is easier to build against because it provides at least some type checking when accessing values from the input row.

I have a different opinion about this. If the input is string type but the UDF implementation calls `row.getLong(0)`, it returns wrong data, which is very bad. With the individual-parameters approach, if you implement UDF with `def call(input: Long)` but the input is string type, analyzer can detect it and fail the query.

On Thu, Feb 25, 2021 at 6:48 AM Ryan Blue <[hidden email]> wrote:

How functions are called is a really big element of this effort. I don’t want to get in a position where we’ve started committing changes without clear agreement on something so fundamental to the proposal. I’d like to make sure we’re in agreement with a vote on the SPIP before committing anything. That is, after all, the point of the SPIPs.

If people think it would help to have an alternative API in a PR, then that’s fine with me.

Since that PR suggestion is intended to make it easier to understand the technical details, I’ll try to summarize where we’re at now:

  • We agree on the scope of adding FunctionCatalog to load functions
  • We agree with the FunctionCatalog methods and the function binding approach
  • We agree that a bound function will be either a ScalarFunction or an AggregateFunction (plus the mix-in PartialAggregateFunction)
  • We agree that values should be passed should be Spark’s internal representation to avoid translation
  • We agree that ScalarFunction and AggregateFunction can optionally define methods for Spark to directly call in codegen

The disagreement is about how to call functions when codegen isn’t used or when the function needs to support variable-length argument lists. There are two options:

The first option is for each function to have a method that accepts an InternalRow, from the proposed SPIP:

interface ScalarFunction extends BoundFunction<R> {
  R produceResult(InternalRow input);
}
interface AggregateFunction<S> extends BoundFunction<R> {
  S update(S state, InternalRow input);
  ...
}

The second option is to introduce 9 or more interfaces to break out the fields of the input row, and an additional Object[] variation for varargs:

interface ScalarFunction1<T1> extends BoundFunction<R> {
  R produceResult(T1 one);
}
interface ScalarFunction2<T1, T2> extends BoundFunction<R> {
  R produceResult(T1 one, T2 two);
}
... 8 more ScalarFunction interfaces
interface ScalarFunctionVarargs extends BoundFunction<R> {
  R produceResult(Object[] args);
}
interface AggregateFunction<S, T1> extends BoundFunction<R> {
  S update(S state, T1 one);
}
interface AggregateFunction<S, T1, T2> extends BoundFunction<R> {
  S update(S state, T1 one, T2 two);
}
... 8 more AggregateFunction interfaces
interface AggregateFunctionVarargs<S> extends BoundFunction<R> {
  S update(S state, Object[] args);
}

Because this is for the non-invoke case, the two options have roughly the same performance characteristics.

The first option has some advantages:

  • It is simpler: there are few interfaces and Spark will always find the right method
  • Accessing a value returns a concrete type, so it is less error-prone. I’ve given an example where this helps identify a problem with an invoke method.

The second option’s advantage is that users have values broken out into arguments. That is, if I understand Wenchen correctly here: “I don’t like the SupportsInvoke approach as it still promotes the row-parameter API. I think the individual-parameters API is better for UDF developers.”

Disadvantages with the second option:

  • There are 20+ more interfaces in the API
  • Spark will need additional code to call the right method based on input, so it will either have 10 wrapper classes or a big match statement that calls each interface separately (see UDFRegistration).
  • Spark is always going to essentially call the raw interface with no specific type parameters. As a result, incorrect types (like String) will compile but fail at runtime with ClassCastException.
  • The varargs case will result in casting to expected types, which could also fail with ClassCastException

I think that the InternalRow option is easier to build against because it provides at least some type checking when accessing values from the input row. You get compile-time checks when using the wrong type like this: String val = input.getString(0); won’t compile.

Another important thing to note is that although the original idea was to keep the individual parameter approach simple, Wenchen has already suggested passing arrays as Java arrays, like UTF8String[]. This adds to the complexity of the overall solution and requires matching multiple types. How would Spark know to pass UTF8String[] or ArrayData?

If anyone disagrees with that summary, please point out where it’s incorrect. But barring a major misunderstanding, I think the choice is clear: the simpler approach that provides additional compile-time safety is the right way to go.


On Tue, Feb 23, 2021 at 1:48 AM Wenchen Fan <[hidden email]> wrote:
+1, as I already proposed we can move forward with PRs

> To move forward, how about we implement the function loading and binding first? Then we can have PRs for both the individual-parameters (I can take it) and row-parameter approaches, if we still can't reach a consensus at that time and need to see all the details.

Ryan, can we focus on the function loading and binding part and get it committed first? I can also fork your branch and put everything together, but that might be too big to review.

On Tue, Feb 23, 2021 at 4:35 PM Dongjoon Hyun <[hidden email]> wrote:
I've been still supporting Ryan's SPIP (original PR and its extension proposal discussed here) because of its simplicity.

According to this email thread context, I also understand the different perspectives like Hyukjin's concerns about having multiple ways and Wenchen's proposal and rationales.

It looks like we need more discussion to reach an agreement. And the technical details become more difficult to track because this is an email thread.

Although Ryan initially suggested discussing this on Apache email thread instead of the PR, can we have a PR to discuss?

Especially, Wenchen, could you make your PR based on Ryan's PR?

If we collect the scattered ideas into a single PR, that would be greatly helpful not only for further discussions, but also when we go on a vote on Ryan's PR or Wenchen's PR.

Bests,
Dongjoon.


On Mon, Feb 22, 2021 at 1:16 AM Wenchen Fan <[hidden email]> wrote:
Hi Walaa,

Thanks for sharing this! The type signature stuff is already covered by the unbound UDF API, which specifies the input and output data types. The problem is how to check the method signature of the bound UDF. As you said, Java has type erasure and we can't check `List<String>` for example.

My initial proposal is to do nothing and simply pass the Spark ArrayData, MapData, InternalRow to the UDF. This requires the UDF developers to ensure the type is matched, as they need to call something like `array.getLong(index)` with the corrected type name. It's as worse as the row-parameter version but seems fine as it only happens with nested types. And the type check is still done for the first level (the method signature must use ArrayData/MapData/InternalRow at least).

We can allow more types in the future to make the type check better. It might be too detailed for this discussion thread but just put a few thoughts:
1. Java array doesn't do type erasure. We can use UTF8String[] for example if the input type is array of string.
2. For struct type, we can allow Java beans/Scala case classes if the field name and type match the type signature.
3. For map type, it's actually struct<keys: array<key_type>, values: array<value_type>>, so we can also allow Java beans/Scala case classes here.

The general idea is to use stuff that can retain nested type information at compile-time, i.e. array, java bean, case classes.

Thanks,
Wenchen



On Mon, Feb 22, 2021 at 3:47 PM Walaa Eldin Moustafa <[hidden email]> wrote:
Wenchen, in Transport, users provide the input parameter signatures and output parameter signature as part of the API. Compile-time checks are done by parsing the type signatures and matching them to the type tree received at compile-time. This also helps with inferring the concrete output type.

The specification in the UDF API looks like this:

  @Override
  public List<String> getInputParameterSignatures() {
    return ImmutableList.of(
        "ARRAY(K)",
        "ARRAY(V)"
    );
  }

  @Override
  public String getOutputParameterSignature() {
    return "MAP(K,V)";
  }

The benefits of this type of type signature specification as opposed to inferring types from Java type signatures given in the Java method are:
  • For nested types, Java type erasure eliminates the information about nested types, so for something like my_function(List<String> a1, List<Integer> a2), the value of both a1.class or a2.class is just a List. However, we are planning to work around this in a future version in the case of Array and Map types. Struct types are discussed in the next point.
  • Without pre-code-generation there is no single Java type signature from which we can capture the Struct info. However, Struct info can be expressed in type signatures of the above type, e.g., ROW(FirstName VARCHAR, LastName VARCHAR).
When a Transport UDF represents a Spark UDF, the type signatures are matched against Spark native types, i.e., org.apache.spark.sql.types.{ArrayType, MapType, StructType}, and primitive types. The function that parses/compiles type signatures is found in AbstractTypeInference. This class represents the generic component that is common between all supported engines. Its Spark-specific extension is in SparkTypeInference. In the above example, at compile time, if the first Array happens to be of String element type, and the second Array happens to be of Integer element type, the UDF will communicate to the Spark analyzer that the output should be of type MapData<String, Integer> (i.e., based on what was seen in the input at compile time). The whole UDF becomes a Spark Expression at the end of the day.

Thanks,
Walaa.


On Sun, Feb 21, 2021 at 7:26 PM Wenchen Fan <[hidden email]> wrote:
I think I have made it clear that it's simpler for the UDF developers to deal with the input parameters directly, instead of getting them from a row, as you need to provide the index and type (e.g. row.getLong(0)). It's also coherent with the existing Spark Scala/Java UDF APIs, so that Spark users will be more familiar with the individual-parameters API.

And I have explained it already that we can use reflection to make sure the defined methods have the right types at query-compilation time. It's better than leaving this problem to the UDF developers and asking them to ensure the inputs are gotten from the row correctly with index and type. If there are people from Presto/Transport, it will be great if you can share how Presto/Transport do this check.

I don't like 22 additional interfaces too, but if you look at the examples I gave, the current Spark Java UDF only has 9 interfaces, and Transport has 8. I think it's good enough and people can use VarargsScalarFunction if they need to take more parameters or varargs. It resolves your concern about doing reflection in the non-codegen execution path that leads to bad performance, it also serves for documentation purpose as people can easily see the number of UDF inputs and their types by a quick glance.

As I said, we need to investigate how to avoid boxing. Since you are asking the question now, I spent sometime to think about it. I think the DoubleAdd example is the way to go. For non-codegen code path, we can just call the interface method. For the codegen code path, the generated Java code would look like (omit the null check logic):

double input1 = ...;
double input2 = ...;
DoubleAdd udf = ...;
double res = udf.call(input1, input2);

Which invokes the primitive version automatically. AFAIK this is also how Scala supports primitive type parameter (generate an extra non-boxing version of the method). If the UDF doesn't have the primtive version method, this code will just call the boxed version and still works.

I don't like the SupportsInvoke approach as it still promotes the row-parameter API. I think the individual-parameters API is better for UDF developers. Can other people share your opinions about the API?

On Sat, Feb 20, 2021 at 5:32 AM Ryan Blue <[hidden email]> wrote:

I don’t see any benefit to more complexity with 22 additional interfaces, instead of simply passing an InternalRow. Why not use a single interface with InternalRow? Maybe you could share your motivation?

That would also result in strange duplication, where the ScalarFunction2 method is just the boxed version:

class DoubleAdd implements ScalarFunction2<Double, Double, Double> {
  @Override
  Double produceResult(Double left, Double right) {
    return left + right;
  }

  double produceResult(double left, double right) {
    return left + right;
  }
}

This would work okay, but would be awkward if you wanted to use the same implementation for any number of arguments, like a sum method that adds all of the arguments together and returns the result. It also isn’t great for varargs, since it is basically the same as the invoke case.

The combination of an InternalRow method and the invoke method seems to be a good way to handle this to me. What is wrong with it? And, how would you solve the problem when implementations define methods with the wrong types? The InternalRow approach helps implementations catch that problem (as demonstrated above) and also provides a fallback when there is a but preventing the invoke optimization from working. That seems like a good approach to me.


On Thu, Feb 18, 2021 at 11:31 PM Wenchen Fan <[hidden email]> wrote:
If people have such a big concern about reflection, we can follow the current Spark Java UDF and Transport, and create ScalarFuncion0[R]ScalarFuncion1[T1, R], etc. to avoid reflection. But we may need to investigate how to avoid boxing with this API design.

To put a detailed proposal: let's have ScalarFuncion0ScalarFuncion1, ..., ScalarFuncion9 and VarargsScalarFunction. At execution time, if Spark sees ScalarFuncion0-9, pass the input columns to the UDF directly, one column one parameter. So string type input is UTF8String, array type input is ArrayData. If Spark sees VarargsScalarFunction, wrap the input columns with InternalRow and pass it to the UDF.

In general, if VarargsScalarFunction is implemented, the UDF should not implement ScalarFuncion0-9. We can also define a priority order to allow this. I don't have a strong preference here.

What do you think?

On Fri, Feb 19, 2021 at 1:24 PM Walaa Eldin Moustafa <[hidden email]> wrote:
I agree with Ryan on the questions around the expressivity of the Invoke method. It is not clear to me how the Invoke method can be used to declare UDFs with type-parameterized parameters. For example: a UDF to get the Nth element of an array (regardless of the Array element type) or a UDF to merge two Arrays (of generic types) to a Map.

Also, to address Wenchen's InternalRow question, can we create a number of Function classes, each corresponding to a number of input parameter length (e.g., ScalarFunction1, ScalarFunction2, etc)?

Thanks,
Walaa.
 

On Thu, Feb 18, 2021 at 6:07 PM Ryan Blue <[hidden email]> wrote:

I agree with you that it is better in many cases to directly call a method. But it it not better in all cases, which is why I don’t think it is the right general-purpose choice.

First, if codegen isn’t used for some reason, the reflection overhead is really significant. That gets much better when you have an interface to call. That’s one reason I’d use this pattern:

class DoubleAdd implements ScalarFunction<Double>, SupportsInvoke {
  Double produceResult(InternalRow row) {
    return produceResult(row.getDouble(0), row.getDouble(1));
  }

  double produceResult(double left, double right) {
    return left + right;
  }
}

There’s little overhead to adding the InternalRow variation, but we could call it in eval to avoid the reflect overhead. To the point about UDF developers, I think this is a reasonable cost.

Second, I think usability is better and helps avoid runtime issues. Here’s an example:

class StrLen implements ScalarFunction<Integer>, SupportsInvoke {
  Integer produceResult(InternalRow row) {
    return produceResult(row.getString(0));
  }

  Integer produceResult(String str) {
    return str.length();
  }
}

See the bug? I forgot to use UTF8String instead of String. With the InternalRow method, I get a compiler warning because getString produces UTF8String that can’t be passed to produceResult(String). If I decided to implement length separately, then we could still run the InternalRow version and log a warning. The code would be slightly slower, but wouldn’t fail.

There are similar situations with varargs where it’s better to call methods that produce concrete types than to cast from Object to some expected type.

I think that using invoke is a great extension to the proposal, but I don’t think that it should be the only way to call functions. By all means, let’s work on it in parallel and use it where possible. But I think we do need the fallback of using InternalRow and that it isn’t a usability problem to include it.

Oh, and one last thought is that we already have users that call Dataset.map and use InternalRow. This would allow converting that code directly to a UDF.

I think we’re closer to agreeing here than it actually looks. Hopefully you’ll agree that having the InternalRow method isn’t a big usability problem.


On Wed, Feb 17, 2021 at 11:51 PM Wenchen Fan <[hidden email]> wrote:
I don't see any objections to the rest of the proposal (loading functions from the catalog, function binding stuff, etc.) and I assume everyone is OK with it. We can commit that part first.

Currently, the discussion focuses on the `ScalarFunction` API, where I think it's better to directly take the input columns as the UDF parameter, instead of wrapping the input columns with InternalRow and taking the InternalRow as the UDF parameter. It's not only for better performance, but also for ease of use. For example, it's easier for the UDF developer to write `input1 + input2` than `inputRow.getLong(0) + inputRow.getLong(1)`, as they don't need to specify the type and index by themselves (getLong(0)) which is error-prone.

It does push more work to the Spark side, but I think it's worth it if implementing UDF gets easier. I don't think the work is very challenging, as we can leverage the infra we built for the expression encoder.

I think it's also important to look at the UDF API from the user's perspective (UDF developers). How do you like the UDF API without considering how Spark can support it? Do you prefer the individual-parameters version or the row-parameter version?

To move forward, how about we implement the function loading and binding first? Then we can have PRs for both the individual-parameters (I can take it) and row-parameter approaches, if we still can't reach a consensus at that time and need to see all the details.

On Thu, Feb 18, 2021 at 4:48 AM Ryan Blue <[hidden email]> wrote:
Thanks, Hyukjin. I think that's a fair summary. And I agree with the idea that we should focus on what Spark will do by default.

I think we should focus on the proposal, for two reasons: first, there is a straightforward path to incorporate Wenchen's suggestion via `SupportsInvoke`, and second, the proposal is more complete: it defines a solution for many concerns like loading a function and finding out what types to use -- not just how to call code -- and supports more use cases like varargs functions. I think we can continue to discuss the rest of the proposal and be confident that we can support an invoke code path where it makes sense.

Does everyone agree? If not, I think we would need to solve a lot of the challenges that I initially brought up with the invoke idea. It seems like a good way to call a function, but needs a real proposal behind it if we don't use it via `SupportsInvoke` in the current proposal.

On Tue, Feb 16, 2021 at 11:07 PM Hyukjin Kwon <[hidden email]> wrote:

Just to make sure we don’t move past, I think we haven’t decided yet:

  • if we’ll replace the current proposal to Wenchen’s approach as the default
  • if we want to have Wenchen’s approach as an optional mix-in on the top of Ryan’s proposal (SupportsInvoke)

From what I read, some people pointed out it as a replacement. Please correct me if I misread this discussion thread.

As Dongjoon pointed out, it would be good to know rough ETA to make sure making progress in this, and people can compare more easily.


FWIW, there’s the saying I like in the zen of Python:

There should be one— and preferably only one —obvious way to do it.

If multiple approaches have the way for developers to do the (almost) same thing, I would prefer to avoid it.

In addition, I would prefer to focus on what Spark does by default first.


2021년 2월 17일 (수) 오후 2:33, Dongjoon Hyun <[hidden email]>님이 작성:
Hi, Wenchen.

This thread seems to get enough attention. Also, I'm expecting more and more if we have this on the `master` branch because we are developing together.

    > Spark SQL has many active contributors/committers and this thread doesn't get much attention yet.

So, what's your ETA from now?

    > I think the problem here is we were discussing some very detailed things without actual code.
    > I'll implement my idea after the holiday and then we can have more effective discussions.
    > We can also do benchmarks and get some real numbers.
    > In the meantime, we can continue to discuss other parts of this proposal, and make a prototype if possible.

I'm looking forward to seeing your PR. I hope we can conclude this thread and have at least one implementation in the `master` branch this month (February).
If you need more time (one month or longer), why don't we have Ryan's suggestion in the `master` branch first and benchmark with your PR later during Apache Spark 3.2 timeframe.

Bests,
Dongjoon.


On Tue, Feb 16, 2021 at 9:26 AM Ryan Blue <[hidden email]> wrote:
Andrew,

The proposal already includes an API for aggregate functions and I think we would want to implement those right away.

Processing ColumnBatch is something we can easily extend the interfaces to support, similar to Wenchen's suggestion. The important thing right now is to agree on some basic functionality: how to look up functions and what the simple API should be. Like the TableCatalog interfaces, we will layer on more support through optional interfaces like `SupportsInvoke` or `SupportsColumnBatch`.

On Tue, Feb 16, 2021 at 9:00 AM Andrew Melo <[hidden email]> wrote:
Hello Ryan,

This proposal looks very interesting. Would future goals for this
functionality include both support for aggregation functions, as well
as support for processing ColumnBatch-es (instead of Row/InternalRow)?

Thanks
Andrew

On Mon, Feb 15, 2021 at 12:44 PM Ryan Blue <[hidden email]> wrote:
>
> Thanks for the positive feedback, everyone. It sounds like there is a clear path forward for calling functions. Even without a prototype, the `invoke` plans show that Wenchen's suggested optimization can be done, and incorporating it as an optional extension to this proposal solves many of the unknowns.
>
> With that area now understood, is there any discussion about other parts of the proposal, besides the function call interface?
>
> On Fri, Feb 12, 2021 at 10:40 PM Chao Sun <[hidden email]> wrote:
>>
>> This is an important feature which can unblock several other projects including bucket join support for DataSource v2, complete support for enforcing DataSource v2 distribution requirements on the write path, etc. I like Ryan's proposals which look simple and elegant, with nice support on function overloading and variadic arguments. On the other hand, I think Wenchen made a very good point about performance. Overall, I'm excited to see active discussions on this topic and believe the community will come to a proposal with the best of both sides.
>>
>> Chao
>>
>> On Fri, Feb 12, 2021 at 7:58 PM Hyukjin Kwon <[hidden email]> wrote:
>>>
>>> +1 for Liang-chi's.
>>>
>>> Thanks Ryan and Wenchen for leading this.
>>>
>>>
>>> 2021년 2월 13일 (토) 오후 12:18, Liang-Chi Hsieh <[hidden email]>님이 작성:
>>>>
>>>> Basically I think the proposal makes sense to me and I'd like to support the
>>>> SPIP as it looks like we have strong need for the important feature.
>>>>
>>>> Thanks Ryan for working on this and I do also look forward to Wenchen's
>>>> implementation. Thanks for the discussion too.
>>>>
>>>> Actually I think the SupportsInvoke proposed by Ryan looks a good
>>>> alternative to me. Besides Wenchen's alternative implementation, is there a
>>>> chance we also have the SupportsInvoke for comparison?
>>>>
>>>>
>>>> John Zhuge wrote
>>>> > Excited to see our Spark community rallying behind this important feature!
>>>> >
>>>> > The proposal lays a solid foundation of minimal feature set with careful
>>>> > considerations for future optimizations and extensions. Can't wait to see
>>>> > it leading to more advanced functionalities like views with shared custom
>>>> > functions, function pushdown, lambda, etc. It has already borne fruit from
>>>> > the constructive collaborations in this thread. Looking forward to
>>>> > Wenchen's prototype and further discussions including the SupportsInvoke
>>>> > extension proposed by Ryan.
>>>> >
>>>> >
>>>> > On Fri, Feb 12, 2021 at 4:35 PM Owen O'Malley &lt;
>>>>
>>>> > owen.omalley@
>>>>
>>>> > &gt;
>>>> > wrote:
>>>> >
>>>> >> I think this proposal is a very good thing giving Spark a standard way of
>>>> >> getting to and calling UDFs.
>>>> >>
>>>> >> I like having the ScalarFunction as the API to call the UDFs. It is
>>>> >> simple, yet covers all of the polymorphic type cases well. I think it
>>>> >> would
>>>> >> also simplify using the functions in other contexts like pushing down
>>>> >> filters into the ORC & Parquet readers although there are a lot of
>>>> >> details
>>>> >> that would need to be considered there.
>>>> >>
>>>> >> .. Owen
>>>> >>
>>>> >>
>>>> >> On Fri, Feb 12, 2021 at 11:07 PM Erik Krogen &lt;
>>>>
>>>> > ekrogen@.com
>>>>
>>>> > &gt;
>>>> >> wrote:
>>>> >>
>>>> >>> I agree that there is a strong need for a FunctionCatalog within Spark
>>>> >>> to
>>>> >>> provide support for shareable UDFs, as well as make movement towards
>>>> >>> more
>>>> >>> advanced functionality like views which themselves depend on UDFs, so I
>>>> >>> support this SPIP wholeheartedly.
>>>> >>>
>>>> >>> I find both of the proposed UDF APIs to be sufficiently user-friendly
>>>> >>> and
>>>> >>> extensible. I generally think Wenchen's proposal is easier for a user to
>>>> >>> work with in the common case, but has greater potential for confusing
>>>> >>> and
>>>> >>> hard-to-debug behavior due to use of reflective method signature
>>>> >>> searches.
>>>> >>> The merits on both sides can hopefully be more properly examined with
>>>> >>> code,
>>>> >>> so I look forward to seeing an implementation of Wenchen's ideas to
>>>> >>> provide
>>>> >>> a more concrete comparison. I am optimistic that we will not let the
>>>> >>> debate
>>>> >>> over this point unreasonably stall the SPIP from making progress.
>>>> >>>
>>>> >>> Thank you to both Wenchen and Ryan for your detailed consideration and
>>>> >>> evaluation of these ideas!
>>>> >>> ------------------------------
>>>> >>> *From:* Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> *Sent:* Wednesday, February 10, 2021 6:06 PM
>>>> >>> *To:* Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt;
>>>> >>> *Cc:* Holden Karau &lt;
>>>>
>>>> > holden@
>>>>
>>>> > &gt;; Hyukjin Kwon <
>>>> >>>
>>>>
>>>> > gurwls223@
>>>>
>>>> >>; Spark Dev List &lt;
>>>>
>>>> > dev@.apache
>>>>
>>>> > &gt;; Wenchen Fan
>>>> >>> &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt;
>>>> >>> *Subject:* Re: [DISCUSS] SPIP: FunctionCatalog
>>>> >>>
>>>> >>> BTW, I forgot to add my opinion explicitly in this thread because I was
>>>> >>> on the PR before this thread.
>>>> >>>
>>>> >>> 1. The `FunctionCatalog API` PR was made on May 9, 2019 and has been
>>>> >>> there for almost two years.
>>>> >>> 2. I already gave my +1 on that PR last Saturday because I agreed with
>>>> >>> the latest updated design docs and AS-IS PR.
>>>> >>>
>>>> >>> And, the rest of the progress in this thread is also very satisfying to
>>>> >>> me.
>>>> >>> (e.g. Ryan's extension suggestion and Wenchen's alternative)
>>>> >>>
>>>> >>> To All:
>>>> >>> Please take a look at the design doc and the PR, and give us some
>>>> >>> opinions.
>>>> >>> We really need your participation in order to make DSv2 more complete.
>>>> >>> This will unblock other DSv2 features, too.
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:58 AM Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> wrote:
>>>> >>>
>>>> >>> Hi, Ryan.
>>>> >>>
>>>> >>> We didn't move past anything (both yours and Wenchen's). What Wenchen
>>>> >>> suggested is double-checking the alternatives with the implementation to
>>>> >>> give more momentum to our discussion.
>>>> >>>
>>>> >>> Your new suggestion about optional extention also sounds like a new
>>>> >>> reasonable alternative to me.
>>>> >>>
>>>> >>> We are still discussing this topic together and I hope we can make a
>>>> >>> conclude at this time (for Apache Spark 3.2) without being stucked like
>>>> >>> last time.
>>>> >>>
>>>> >>> I really appreciate your leadership in this dicsussion and the moving
>>>> >>> direction of this discussion looks constructive to me. Let's give some
>>>> >>> time
>>>> >>> to the alternatives.
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:14 AM Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> I don’t think we should so quickly move past the drawbacks of this
>>>> >>> approach. The problems are significant enough that using invoke is not
>>>> >>> sufficient on its own. But, I think we can add it as an optional
>>>> >>> extension
>>>> >>> to shore up the weaknesses.
>>>> >>>
>>>> >>> Here’s a summary of the drawbacks:
>>>> >>>
>>>> >>>    - Magic function signatures are error-prone
>>>> >>>    - Spark would need considerable code to help users find what went
>>>> >>>    wrong
>>>> >>>    - Spark would likely need to coerce arguments (e.g., String,
>>>> >>>    Option[Int]) for usability
>>>> >>>    - It is unclear how Spark will find the Java Method to call
>>>> >>>    - Use cases that require varargs fall back to casting; users will
>>>> >>>    also get this wrong (cast to String instead of UTF8String)
>>>> >>>    - The non-codegen path is significantly slower
>>>> >>>
>>>> >>> The benefit of invoke is to avoid moving data into a row, like this:
>>>> >>>
>>>> >>> -- using invoke
>>>> >>> int result = udfFunction(x, y)
>>>> >>>
>>>> >>> -- using row
>>>> >>> udfRow.update(0, x); -- actual: values[0] = x;
>>>> >>> udfRow.update(1, y);
>>>> >>> int result = udfFunction(udfRow);
>>>> >>>
>>>> >>> And, again, that won’t actually help much in cases that require varargs.
>>>> >>>
>>>> >>> I suggest we add a new marker trait for BoundMethod called
>>>> >>> SupportsInvoke.
>>>> >>> If that interface is implemented, then Spark will look for a method that
>>>> >>> matches the expected signature based on the bound input type. If it
>>>> >>> isn’t
>>>> >>> found, Spark can print a warning and fall back to the InternalRow call:
>>>> >>> “Cannot find udfFunction(int, int)”.
>>>> >>>
>>>> >>> This approach allows the invoke optimization, but solves many of the
>>>> >>> problems:
>>>> >>>
>>>> >>>    - The method to invoke is found using the proposed load and bind
>>>> >>>    approach
>>>> >>>    - Magic function signatures are optional and do not cause runtime
>>>> >>>    failures
>>>> >>>    - Because this is an optional optimization, Spark can be more strict
>>>> >>>    about types
>>>> >>>    - Varargs cases can still use rows
>>>> >>>    - Non-codegen can use an evaluation method rather than falling back
>>>> >>>    to slow Java reflection
>>>> >>>
>>>> >>> This seems like a good extension to me; this provides a plan for
>>>> >>> optimizing the UDF call to avoid building a row, while the existing
>>>> >>> proposal covers the other cases well and addresses how to locate these
>>>> >>> function calls.
>>>> >>>
>>>> >>> This also highlights that the approach used in DSv2 and this proposal is
>>>> >>> working: start small and use extensions to layer on more complex
>>>> >>> support.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 9:04 AM Dongjoon Hyun &lt;
>>>>
>>>> > dongjoon.hyun@
>>>>
>>>> > &gt;
>>>> >>> wrote:
>>>> >>>
>>>> >>> Thank you all for making a giant move forward for Apache Spark 3.2.0.
>>>> >>> I'm really looking forward to seeing Wenchen's implementation.
>>>> >>> That would be greatly helpful to make a decision!
>>>> >>>
>>>> >>> > I'll implement my idea after the holiday and then we can have
>>>> >>> more effective discussions. We can also do benchmarks and get some real
>>>> >>> numbers.
>>>> >>> > FYI: the Presto UDF API
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fprestodb.io%2Fdocs%2Fcurrent%2Fdevelop%2Ffunctions.html&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067978066%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=iMWmHqqXPcT7EK%2Bovyzhy%2BZpU6Llih%2BwdZD53wvobmc%3D&amp;reserved=0&gt;
>>>> >>> also
>>>> >>> takes individual parameters instead of the row parameter. I think this
>>>> >>> direction at least worth a try so that we can see the performance
>>>> >>> difference. It's also mentioned in the design doc as an alternative
>>>> >>> (Trino).
>>>> >>>
>>>> >>> Bests,
>>>> >>> Dongjoon.
>>>> >>>
>>>> >>>
>>>> >>> On Tue, Feb 9, 2021 at 10:18 PM Wenchen Fan &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> FYI: the Presto UDF API
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fprestodb.io%2Fdocs%2Fcurrent%2Fdevelop%2Ffunctions.html&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=ZSBCR7yx3PpwL4KY9V73JG42Z02ZodqkjxC0LweHt1g%3D&amp;reserved=0&gt;
>>>> >>> also takes individual parameters instead of the row parameter. I think
>>>> >>> this
>>>> >>> direction at least worth a try so that we can see the performance
>>>> >>> difference. It's also mentioned in the design doc as an alternative
>>>> >>> (Trino).
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 10:18 AM Wenchen Fan &lt;
>>>>
>>>> > cloud0fan@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Hi Holden,
>>>> >>>
>>>> >>> As Hyukjin said, following existing designs is not the principle of DS
>>>> >>> v2
>>>> >>> API design. We should make sure the DS v2 API makes sense. AFAIK we
>>>> >>> didn't
>>>> >>> fully follow the catalog API design from Hive and I believe Ryan also
>>>> >>> agrees with it.
>>>> >>>
>>>> >>> I think the problem here is we were discussing some very detailed things
>>>> >>> without actual code. I'll implement my idea after the holiday and then
>>>> >>> we
>>>> >>> can have more effective discussions. We can also do benchmarks and get
>>>> >>> some
>>>> >>> real numbers.
>>>> >>>
>>>> >>> In the meantime, we can continue to discuss other parts of this
>>>> >>> proposal,
>>>> >>> and make a prototype if possible. Spark SQL has many active
>>>> >>> contributors/committers and this thread doesn't get much attention yet.
>>>> >>>
>>>> >>> On Wed, Feb 10, 2021 at 6:17 AM Hyukjin Kwon &lt;
>>>>
>>>> > gurwls223@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Just dropping a few lines. I remember that one of the goals in DSv2 is
>>>> >>> to
>>>> >>> correct the mistakes we made in the current Spark codes.
>>>> >>> It would not have much point if we will happen to just follow and mimic
>>>> >>> what Spark currently does. It might just end up with another copy of
>>>> >>> Spark
>>>> >>> APIs, e.g. Expression (internal) APIs. I sincerely would like to avoid
>>>> >>> this
>>>> >>> I do believe we have been stuck mainly due to trying to come up with a
>>>> >>> better design. We already have an ugly picture of the current Spark APIs
>>>> >>> to
>>>> >>> draw a better bigger picture.
>>>> >>>
>>>> >>>
>>>> >>> 2021년 2월 10일 (수) 오전 3:28, Holden Karau &lt;
>>>>
>>>> > holden@
>>>>
>>>> > &gt;님이 작성:
>>>> >>>
>>>> >>> I think this proposal is a good set of trade-offs and has existed in the
>>>> >>> community for a long period of time. I especially appreciate how the
>>>> >>> design
>>>> >>> is focused on a minimal useful component, with future optimizations
>>>> >>> considered from a point of view of making sure it's flexible, but actual
>>>> >>> concrete decisions left for the future once we see how this API is used.
>>>> >>> I
>>>> >>> think if we try and optimize everything right out of the gate, we'll
>>>> >>> quickly get stuck (again) and not make any progress.
>>>> >>>
>>>> >>> On Mon, Feb 8, 2021 at 10:46 AM Ryan Blue &lt;
>>>>
>>>> > blue@
>>>>
>>>> > &gt; wrote:
>>>> >>>
>>>> >>> Hi everyone,
>>>> >>>
>>>> >>> I'd like to start a discussion for adding a FunctionCatalog interface to
>>>> >>> catalog plugins. This will allow catalogs to expose functions to Spark,
>>>> >>> similar to how the TableCatalog interface allows a catalog to expose
>>>> >>> tables. The proposal doc is available here:
>>>> >>> https://docs.google.com/document/d/1PLBieHIlxZjmoUB0ERF-VozCRJ0xw2j3qKvUNWpWA2U/edit
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fdocs.google.com%2Fdocument%2Fd%2F1PLBieHIlxZjmoUB0ERF-VozCRJ0xw2j3qKvUNWpWA2U%2Fedit&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=Kyth8%2FhNUZ6GXG2FsgcknZ7t7s0%2BpxnDMPyxvsxLLqE%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> Here's a high-level summary of some of the main design choices:
>>>> >>> * Adds the ability to list and load functions, not to create or modify
>>>> >>> them in an external catalog
>>>> >>> * Supports scalar, aggregate, and partial aggregate functions
>>>> >>> * Uses load and bind steps for better error messages and simpler
>>>> >>> implementations
>>>> >>> * Like the DSv2 table read and write APIs, it uses InternalRow to pass
>>>> >>> data
>>>> >>> * Can be extended using mix-in interfaces to add vectorization, codegen,
>>>> >>> and other future features
>>>> >>>
>>>> >>> There is also a PR with the proposed API:
>>>> >>> https://github.com/apache/spark/pull/24559/files
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fspark%2Fpull%2F24559%2Ffiles&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067988024%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=t3ZCqffdsrmCY3X%2FT8x1oMjMcNUiQ0wQNk%2ByAXQx1Io%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> Let's discuss the proposal here rather than on that PR, to get better
>>>> >>> visibility. Also, please take the time to read the proposal first. That
>>>> >>> really helps clear up misconceptions.
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Ryan Blue
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Twitter: https://twitter.com/holdenkarau
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Ftwitter.com%2Fholdenkarau&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067997978%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=fVfSPIyazuUYv8VLfNu%2BUIHdc3ePM1AAKKH%2BlnIicF8%3D&amp;reserved=0&gt;
>>>> >>> Books (Learning Spark, High Performance Spark, etc.):
>>>> >>> https://amzn.to/2MaRAG9
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Famzn.to%2F2MaRAG9&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060067997978%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=NbRl9kK%2B6Wy0jWmDnztYp3JCPNLuJvmFsLHUrXzEhlk%3D&amp;reserved=0&gt;
>>>> >>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>> >>> &lt;https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fwww.youtube.com%2Fuser%2Fholdenkarau&amp;data=04%7C01%7Cekrogen%40linkedin.com%7C0ccf8c15abd74dfc974f08d8ce31ae4d%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C637486060068007935%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=OWXOBELzO3hBa2JI%2FOSBZ3oNyLq0yr%2FGXMkNn7bqYDM%3D&amp;reserved=0&gt;
>>>> >>>
>>>> >>> --
>>>> >>> Ryan Blue
>>>> >>>
>>>> >>>
>>>> >
>>>> > --
>>>> > John Zhuge
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: [hidden email]
>>>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]



--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

Ryan Blue

Thanks for adding your perspective, Erik!

If the input is string type but the UDF implementation calls row.getLong(0), it returns wrong data

I think this is misleading. It is true for UnsafeRow, but there is no reason why InternalRow should return incorrect values.

The implementation in GenericInternalRow would throw a ClassCastException. I don’t think that using a row is a bad option simply because UnsafeRow is unsafe.

It’s unlikely that UnsafeRow would be used to pass the data. The implementation would evaluate each argument expression and set the result in a generic row, then pass that row to the UDF. We can use whatever implementation we choose to provide better guarantees than unsafe.

I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety.

I don’t think I agree with this. A failure at query analysis time vs runtime still requires going back to a separate project, fixing something, and rebuilding. The time needed to fix a problem goes up significantly vs. compile-time checks. And that is even worse if the UDF is maintained by someone else.

I think we also need to consider how common it would be that a use case can have the query-compile-time checks. Going through this in more detail below makes me think that it is unlikely that these checks would be used often because of the limitations of using an interface with type erasure.

I believe that Wenchen’s proposal will provide stronger query-compile-time safety

The proposal could have better safety for each argument, assuming that we detect failures by looking at the parameter types using reflection in the analyzer. But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

As Erik pointed out, type erasure also limits the effectiveness. You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>. You can handle those cases using InternalRow or you can handle them using VarargScalarFunction<Object>. That forces many use cases into varargs with Object, where you don’t get any of the proposed analyzer benefits and lose compile-time checks. The only time the additional checks (if implemented) would help is when only one set of argument types is needed because implementing ScalarFunction<Object, Object> defeats the purpose.

It’s worth noting that safety for the magic methods would be identical between the two options, so the trade-off to consider is for varargs and non-codegen cases. Combining the limitations discussed, this has better safety guarantees only if you need just one set of types for each number of arguments and are using the non-codegen path. Since varargs is one of the primary reasons to use this API, then I don’t think that it is a good idea to use Object[] instead of InternalRow.

--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

cloud0fan
Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing. And this is a runtime failure, which is absolutely worse than query-compile-time checks. Also, don't forget my previous point: users need to specify the type and index such as row.getLong(0), which is error-prone.

> But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

I'd say this is a must-have if we go with the individual-parameters approach. The Scala UDF today checks the method signature at compile-time, thanks to the Scala type tag. The Java UDF today doesn't check and is hard to use.

> You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>.

Can you elaborate? We have function binding and we can use different UDFs for different input types. If we do want to reuse one UDF for different types, using "magical methods" solves the problem:
class MyUDF {
  def call(i: Int): Int = ...
  def call(l: Long): Long = ...
}

On the other side, I don't think the row-parameter approach can solve this problem. The best I can think of is:
class MyUDF implement ScalaFunction[Object] {
  def call(row: InternalRow): Object = {
    if (int input) row.getInt(0) ... else row.getLong(0) ...
  }
}

This is worse because: 1) it needs to do if-else to check different input types. 2) the return type can only be Object and cause boxing issues.

I agree that Object[] is worse than InternalRow. But I can't think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.


On Tue, Mar 2, 2021 at 3:36 AM Ryan Blue <[hidden email]> wrote:

Thanks for adding your perspective, Erik!

If the input is string type but the UDF implementation calls row.getLong(0), it returns wrong data

I think this is misleading. It is true for UnsafeRow, but there is no reason why InternalRow should return incorrect values.

The implementation in GenericInternalRow would throw a ClassCastException. I don’t think that using a row is a bad option simply because UnsafeRow is unsafe.

It’s unlikely that UnsafeRow would be used to pass the data. The implementation would evaluate each argument expression and set the result in a generic row, then pass that row to the UDF. We can use whatever implementation we choose to provide better guarantees than unsafe.

I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety.

I don’t think I agree with this. A failure at query analysis time vs runtime still requires going back to a separate project, fixing something, and rebuilding. The time needed to fix a problem goes up significantly vs. compile-time checks. And that is even worse if the UDF is maintained by someone else.

I think we also need to consider how common it would be that a use case can have the query-compile-time checks. Going through this in more detail below makes me think that it is unlikely that these checks would be used often because of the limitations of using an interface with type erasure.

I believe that Wenchen’s proposal will provide stronger query-compile-time safety

The proposal could have better safety for each argument, assuming that we detect failures by looking at the parameter types using reflection in the analyzer. But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

As Erik pointed out, type erasure also limits the effectiveness. You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>. You can handle those cases using InternalRow or you can handle them using VarargScalarFunction<Object>. That forces many use cases into varargs with Object, where you don’t get any of the proposed analyzer benefits and lose compile-time checks. The only time the additional checks (if implemented) would help is when only one set of argument types is needed because implementing ScalarFunction<Object, Object> defeats the purpose.

It’s worth noting that safety for the magic methods would be identical between the two options, so the trade-off to consider is for varargs and non-codegen cases. Combining the limitations discussed, this has better safety guarantees only if you need just one set of types for each number of arguments and are using the non-codegen path. Since varargs is one of the primary reasons to use this API, then I don’t think that it is a good idea to use Object[] instead of InternalRow.

--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

Ryan Blue

Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing

The question is not whether to use the magic functions, which would not need boxing. The question here is whether to use multiple ScalarFunction interfaces. Those interfaces would require boxing or using Object[] so there isn’t a benefit.

If we do want to reuse one UDF for different types, using “magical methods” solves the problem

Yes, that’s correct. We agree that magic methods are a good option for this.

Again, the question we need to decide is whether to use InternalRow or interfaces like ScalarFunction2 for non-codegen. The option to use multiple interfaces is limited by type erasure because you can only have one set of type parameters. If you wanted to support both ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long> you’d have to fall back to ScalarFunction2<Object, Object> and cast.

The point is that type erasure will commonly lead either to many different implementation classes (one for each type combination) or will lead to parameterizing by Object, which defeats the purpose.

The alternative adds safety because correct types are produced by calls like getLong(0). Yes, this depends on the implementation making the correct calls, but it is better than using Object and casting.

I can’t think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.

I think this is addressed by the type erasure discussion above. A simple Plus method would require Object or 12 implementations for 2 arguments and 4 numeric types.

And basically all varargs cases would need to use Object[]. Consider a UDF to create a map that requires string keys and some consistent type for values. This would be easy with the InternalRow API because you can use getString(pos) and get(pos + 1, valueType) to get the key/value pairs. Use of UTF8String vs String will be checked at compile time.

I agree that Object[] is worse than InternalRow

Yes, and if we are always using Object because of type erasure or using magic methods to get better performance, the utility of the parameterized interfaces is very limited.

Because we want to expose the magic functions, the use of ScalarFunction2 and similar is extremely limited because it is only for non-codegen. Varargs is by far the more common case. The InternalRow interface is also a very simple way to get started and ensures that Spark can always find the right method after the function is bound to input types.


On Tue, Mar 2, 2021 at 6:35 AM Wenchen Fan <[hidden email]> wrote:
Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing. And this is a runtime failure, which is absolutely worse than query-compile-time checks. Also, don't forget my previous point: users need to specify the type and index such as row.getLong(0), which is error-prone.

> But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

I'd say this is a must-have if we go with the individual-parameters approach. The Scala UDF today checks the method signature at compile-time, thanks to the Scala type tag. The Java UDF today doesn't check and is hard to use.

> You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>.

Can you elaborate? We have function binding and we can use different UDFs for different input types. If we do want to reuse one UDF for different types, using "magical methods" solves the problem:
class MyUDF {
  def call(i: Int): Int = ...
  def call(l: Long): Long = ...
}

On the other side, I don't think the row-parameter approach can solve this problem. The best I can think of is:
class MyUDF implement ScalaFunction[Object] {
  def call(row: InternalRow): Object = {
    if (int input) row.getInt(0) ... else row.getLong(0) ...
  }
}

This is worse because: 1) it needs to do if-else to check different input types. 2) the return type can only be Object and cause boxing issues.

I agree that Object[] is worse than InternalRow. But I can't think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.


On Tue, Mar 2, 2021 at 3:36 AM Ryan Blue <[hidden email]> wrote:

Thanks for adding your perspective, Erik!

If the input is string type but the UDF implementation calls row.getLong(0), it returns wrong data

I think this is misleading. It is true for UnsafeRow, but there is no reason why InternalRow should return incorrect values.

The implementation in GenericInternalRow would throw a ClassCastException. I don’t think that using a row is a bad option simply because UnsafeRow is unsafe.

It’s unlikely that UnsafeRow would be used to pass the data. The implementation would evaluate each argument expression and set the result in a generic row, then pass that row to the UDF. We can use whatever implementation we choose to provide better guarantees than unsafe.

I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety.

I don’t think I agree with this. A failure at query analysis time vs runtime still requires going back to a separate project, fixing something, and rebuilding. The time needed to fix a problem goes up significantly vs. compile-time checks. And that is even worse if the UDF is maintained by someone else.

I think we also need to consider how common it would be that a use case can have the query-compile-time checks. Going through this in more detail below makes me think that it is unlikely that these checks would be used often because of the limitations of using an interface with type erasure.

I believe that Wenchen’s proposal will provide stronger query-compile-time safety

The proposal could have better safety for each argument, assuming that we detect failures by looking at the parameter types using reflection in the analyzer. But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

As Erik pointed out, type erasure also limits the effectiveness. You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>. You can handle those cases using InternalRow or you can handle them using VarargScalarFunction<Object>. That forces many use cases into varargs with Object, where you don’t get any of the proposed analyzer benefits and lose compile-time checks. The only time the additional checks (if implemented) would help is when only one set of argument types is needed because implementing ScalarFunction<Object, Object> defeats the purpose.

It’s worth noting that safety for the magic methods would be identical between the two options, so the trade-off to consider is for varargs and non-codegen cases. Combining the limitations discussed, this has better safety guarantees only if you need just one set of types for each number of arguments and are using the non-codegen path. Since varargs is one of the primary reasons to use this API, then I don’t think that it is a good idea to use Object[] instead of InternalRow.

--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

Dongjoon Hyun-2
Hi, All.

We shared many opinions in different perspectives.
However, we didn't reach a consensus even on a partial merge by excluding something
(on the PR by me, on this mailing thread by Wenchen).

For the following claims, we have another alternative to mitigate it.

    > I don't like it because it promotes the row-parameter API and forces users to implement it, even if the users want to only use the individual-parameters API.

Why don't we merge the AS-IS PR by adding something instead of excluding something?

    - R produceResult(InternalRow input);
    + default R produceResult(InternalRow input) throws Exception {
    +   throw new UnsupportedOperationException();
    + }

By providing the default implementation, it will not *forcing users to implement it* technically.
And, we can provide a document about our expected usage properly.
What do you think?

Bests,
Dongjoon.



On Wed, Mar 3, 2021 at 10:28 AM Ryan Blue <[hidden email]> wrote:

Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing

The question is not whether to use the magic functions, which would not need boxing. The question here is whether to use multiple ScalarFunction interfaces. Those interfaces would require boxing or using Object[] so there isn’t a benefit.

If we do want to reuse one UDF for different types, using “magical methods” solves the problem

Yes, that’s correct. We agree that magic methods are a good option for this.

Again, the question we need to decide is whether to use InternalRow or interfaces like ScalarFunction2 for non-codegen. The option to use multiple interfaces is limited by type erasure because you can only have one set of type parameters. If you wanted to support both ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long> you’d have to fall back to ScalarFunction2<Object, Object> and cast.

The point is that type erasure will commonly lead either to many different implementation classes (one for each type combination) or will lead to parameterizing by Object, which defeats the purpose.

The alternative adds safety because correct types are produced by calls like getLong(0). Yes, this depends on the implementation making the correct calls, but it is better than using Object and casting.

I can’t think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.

I think this is addressed by the type erasure discussion above. A simple Plus method would require Object or 12 implementations for 2 arguments and 4 numeric types.

And basically all varargs cases would need to use Object[]. Consider a UDF to create a map that requires string keys and some consistent type for values. This would be easy with the InternalRow API because you can use getString(pos) and get(pos + 1, valueType) to get the key/value pairs. Use of UTF8String vs String will be checked at compile time.

I agree that Object[] is worse than InternalRow

Yes, and if we are always using Object because of type erasure or using magic methods to get better performance, the utility of the parameterized interfaces is very limited.

Because we want to expose the magic functions, the use of ScalarFunction2 and similar is extremely limited because it is only for non-codegen. Varargs is by far the more common case. The InternalRow interface is also a very simple way to get started and ensures that Spark can always find the right method after the function is bound to input types.


On Tue, Mar 2, 2021 at 6:35 AM Wenchen Fan <[hidden email]> wrote:
Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing. And this is a runtime failure, which is absolutely worse than query-compile-time checks. Also, don't forget my previous point: users need to specify the type and index such as row.getLong(0), which is error-prone.

> But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

I'd say this is a must-have if we go with the individual-parameters approach. The Scala UDF today checks the method signature at compile-time, thanks to the Scala type tag. The Java UDF today doesn't check and is hard to use.

> You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>.

Can you elaborate? We have function binding and we can use different UDFs for different input types. If we do want to reuse one UDF for different types, using "magical methods" solves the problem:
class MyUDF {
  def call(i: Int): Int = ...
  def call(l: Long): Long = ...
}

On the other side, I don't think the row-parameter approach can solve this problem. The best I can think of is:
class MyUDF implement ScalaFunction[Object] {
  def call(row: InternalRow): Object = {
    if (int input) row.getInt(0) ... else row.getLong(0) ...
  }
}

This is worse because: 1) it needs to do if-else to check different input types. 2) the return type can only be Object and cause boxing issues.

I agree that Object[] is worse than InternalRow. But I can't think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.


On Tue, Mar 2, 2021 at 3:36 AM Ryan Blue <[hidden email]> wrote:

Thanks for adding your perspective, Erik!

If the input is string type but the UDF implementation calls row.getLong(0), it returns wrong data

I think this is misleading. It is true for UnsafeRow, but there is no reason why InternalRow should return incorrect values.

The implementation in GenericInternalRow would throw a ClassCastException. I don’t think that using a row is a bad option simply because UnsafeRow is unsafe.

It’s unlikely that UnsafeRow would be used to pass the data. The implementation would evaluate each argument expression and set the result in a generic row, then pass that row to the UDF. We can use whatever implementation we choose to provide better guarantees than unsafe.

I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety.

I don’t think I agree with this. A failure at query analysis time vs runtime still requires going back to a separate project, fixing something, and rebuilding. The time needed to fix a problem goes up significantly vs. compile-time checks. And that is even worse if the UDF is maintained by someone else.

I think we also need to consider how common it would be that a use case can have the query-compile-time checks. Going through this in more detail below makes me think that it is unlikely that these checks would be used often because of the limitations of using an interface with type erasure.

I believe that Wenchen’s proposal will provide stronger query-compile-time safety

The proposal could have better safety for each argument, assuming that we detect failures by looking at the parameter types using reflection in the analyzer. But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

As Erik pointed out, type erasure also limits the effectiveness. You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>. You can handle those cases using InternalRow or you can handle them using VarargScalarFunction<Object>. That forces many use cases into varargs with Object, where you don’t get any of the proposed analyzer benefits and lose compile-time checks. The only time the additional checks (if implemented) would help is when only one set of argument types is needed because implementing ScalarFunction<Object, Object> defeats the purpose.

It’s worth noting that safety for the magic methods would be identical between the two options, so the trade-off to consider is for varargs and non-codegen cases. Combining the limitations discussed, this has better safety guarantees only if you need just one set of types for each number of arguments and are using the non-codegen path. Since varargs is one of the primary reasons to use this API, then I don’t think that it is a good idea to use Object[] instead of InternalRow.

--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

Ryan Blue

Good point, Dongjoon. I think we can probably come to some compromise here:

  • Remove SupportsInvoke since it isn’t really needed. We should always try to find the right method to invoke in the codegen path.
  • Add a default implementation of produceResult so that implementations don’t have to use it. If they don’t implement it and a magic function can’t be found, then it will throw UnsupportedOperationException

This is assuming that we can agree not to introduce all of the ScalarFunction interface variations, which would have limited utility because of type erasure.

Does that sound like a good plan to everyone? If so, I’ll update the SPIP doc so we can move forward.


On Wed, Mar 3, 2021 at 4:36 PM Dongjoon Hyun <[hidden email]> wrote:
Hi, All.

We shared many opinions in different perspectives.
However, we didn't reach a consensus even on a partial merge by excluding something
(on the PR by me, on this mailing thread by Wenchen).

For the following claims, we have another alternative to mitigate it.

    > I don't like it because it promotes the row-parameter API and forces users to implement it, even if the users want to only use the individual-parameters API.

Why don't we merge the AS-IS PR by adding something instead of excluding something?

    - R produceResult(InternalRow input);
    + default R produceResult(InternalRow input) throws Exception {
    +   throw new UnsupportedOperationException();
    + }

By providing the default implementation, it will not *forcing users to implement it* technically.
And, we can provide a document about our expected usage properly.
What do you think?

Bests,
Dongjoon.



On Wed, Mar 3, 2021 at 10:28 AM Ryan Blue <[hidden email]> wrote:

Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing

The question is not whether to use the magic functions, which would not need boxing. The question here is whether to use multiple ScalarFunction interfaces. Those interfaces would require boxing or using Object[] so there isn’t a benefit.

If we do want to reuse one UDF for different types, using “magical methods” solves the problem

Yes, that’s correct. We agree that magic methods are a good option for this.

Again, the question we need to decide is whether to use InternalRow or interfaces like ScalarFunction2 for non-codegen. The option to use multiple interfaces is limited by type erasure because you can only have one set of type parameters. If you wanted to support both ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long> you’d have to fall back to ScalarFunction2<Object, Object> and cast.

The point is that type erasure will commonly lead either to many different implementation classes (one for each type combination) or will lead to parameterizing by Object, which defeats the purpose.

The alternative adds safety because correct types are produced by calls like getLong(0). Yes, this depends on the implementation making the correct calls, but it is better than using Object and casting.

I can’t think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.

I think this is addressed by the type erasure discussion above. A simple Plus method would require Object or 12 implementations for 2 arguments and 4 numeric types.

And basically all varargs cases would need to use Object[]. Consider a UDF to create a map that requires string keys and some consistent type for values. This would be easy with the InternalRow API because you can use getString(pos) and get(pos + 1, valueType) to get the key/value pairs. Use of UTF8String vs String will be checked at compile time.

I agree that Object[] is worse than InternalRow

Yes, and if we are always using Object because of type erasure or using magic methods to get better performance, the utility of the parameterized interfaces is very limited.

Because we want to expose the magic functions, the use of ScalarFunction2 and similar is extremely limited because it is only for non-codegen. Varargs is by far the more common case. The InternalRow interface is also a very simple way to get started and ensures that Spark can always find the right method after the function is bound to input types.


On Tue, Mar 2, 2021 at 6:35 AM Wenchen Fan <[hidden email]> wrote:
Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing. And this is a runtime failure, which is absolutely worse than query-compile-time checks. Also, don't forget my previous point: users need to specify the type and index such as row.getLong(0), which is error-prone.

> But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

I'd say this is a must-have if we go with the individual-parameters approach. The Scala UDF today checks the method signature at compile-time, thanks to the Scala type tag. The Java UDF today doesn't check and is hard to use.

> You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>.

Can you elaborate? We have function binding and we can use different UDFs for different input types. If we do want to reuse one UDF for different types, using "magical methods" solves the problem:
class MyUDF {
  def call(i: Int): Int = ...
  def call(l: Long): Long = ...
}

On the other side, I don't think the row-parameter approach can solve this problem. The best I can think of is:
class MyUDF implement ScalaFunction[Object] {
  def call(row: InternalRow): Object = {
    if (int input) row.getInt(0) ... else row.getLong(0) ...
  }
}

This is worse because: 1) it needs to do if-else to check different input types. 2) the return type can only be Object and cause boxing issues.

I agree that Object[] is worse than InternalRow. But I can't think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.


On Tue, Mar 2, 2021 at 3:36 AM Ryan Blue <[hidden email]> wrote:

Thanks for adding your perspective, Erik!

If the input is string type but the UDF implementation calls row.getLong(0), it returns wrong data

I think this is misleading. It is true for UnsafeRow, but there is no reason why InternalRow should return incorrect values.

The implementation in GenericInternalRow would throw a ClassCastException. I don’t think that using a row is a bad option simply because UnsafeRow is unsafe.

It’s unlikely that UnsafeRow would be used to pass the data. The implementation would evaluate each argument expression and set the result in a generic row, then pass that row to the UDF. We can use whatever implementation we choose to provide better guarantees than unsafe.

I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety.

I don’t think I agree with this. A failure at query analysis time vs runtime still requires going back to a separate project, fixing something, and rebuilding. The time needed to fix a problem goes up significantly vs. compile-time checks. And that is even worse if the UDF is maintained by someone else.

I think we also need to consider how common it would be that a use case can have the query-compile-time checks. Going through this in more detail below makes me think that it is unlikely that these checks would be used often because of the limitations of using an interface with type erasure.

I believe that Wenchen’s proposal will provide stronger query-compile-time safety

The proposal could have better safety for each argument, assuming that we detect failures by looking at the parameter types using reflection in the analyzer. But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

As Erik pointed out, type erasure also limits the effectiveness. You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>. You can handle those cases using InternalRow or you can handle them using VarargScalarFunction<Object>. That forces many use cases into varargs with Object, where you don’t get any of the proposed analyzer benefits and lose compile-time checks. The only time the additional checks (if implemented) would help is when only one set of argument types is needed because implementing ScalarFunction<Object, Object> defeats the purpose.

It’s worth noting that safety for the magic methods would be identical between the two options, so the trade-off to consider is for varargs and non-codegen cases. Combining the limitations discussed, this has better safety guarantees only if you need just one set of types for each number of arguments and are using the non-codegen path. Since varargs is one of the primary reasons to use this API, then I don’t think that it is a good idea to use Object[] instead of InternalRow.

--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

cloud0fan
+1 to this proposal. If people don't like the ScalarFunction0,1, ... variants and prefer the "magical methods", then we can have a single ScalarFunction interface which has the row-parameter API (with a default implementation to fail) and documents to describe the "magical methods" (which can be done later).

I'll start the PR review this week to check the naming, doc, etc.

Thanks all for the discussion here and let's move forward!

On Thu, Mar 4, 2021 at 9:53 AM Ryan Blue <[hidden email]> wrote:

Good point, Dongjoon. I think we can probably come to some compromise here:

  • Remove SupportsInvoke since it isn’t really needed. We should always try to find the right method to invoke in the codegen path.
  • Add a default implementation of produceResult so that implementations don’t have to use it. If they don’t implement it and a magic function can’t be found, then it will throw UnsupportedOperationException

This is assuming that we can agree not to introduce all of the ScalarFunction interface variations, which would have limited utility because of type erasure.

Does that sound like a good plan to everyone? If so, I’ll update the SPIP doc so we can move forward.


On Wed, Mar 3, 2021 at 4:36 PM Dongjoon Hyun <[hidden email]> wrote:
Hi, All.

We shared many opinions in different perspectives.
However, we didn't reach a consensus even on a partial merge by excluding something
(on the PR by me, on this mailing thread by Wenchen).

For the following claims, we have another alternative to mitigate it.

    > I don't like it because it promotes the row-parameter API and forces users to implement it, even if the users want to only use the individual-parameters API.

Why don't we merge the AS-IS PR by adding something instead of excluding something?

    - R produceResult(InternalRow input);
    + default R produceResult(InternalRow input) throws Exception {
    +   throw new UnsupportedOperationException();
    + }

By providing the default implementation, it will not *forcing users to implement it* technically.
And, we can provide a document about our expected usage properly.
What do you think?

Bests,
Dongjoon.



On Wed, Mar 3, 2021 at 10:28 AM Ryan Blue <[hidden email]> wrote:

Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing

The question is not whether to use the magic functions, which would not need boxing. The question here is whether to use multiple ScalarFunction interfaces. Those interfaces would require boxing or using Object[] so there isn’t a benefit.

If we do want to reuse one UDF for different types, using “magical methods” solves the problem

Yes, that’s correct. We agree that magic methods are a good option for this.

Again, the question we need to decide is whether to use InternalRow or interfaces like ScalarFunction2 for non-codegen. The option to use multiple interfaces is limited by type erasure because you can only have one set of type parameters. If you wanted to support both ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long> you’d have to fall back to ScalarFunction2<Object, Object> and cast.

The point is that type erasure will commonly lead either to many different implementation classes (one for each type combination) or will lead to parameterizing by Object, which defeats the purpose.

The alternative adds safety because correct types are produced by calls like getLong(0). Yes, this depends on the implementation making the correct calls, but it is better than using Object and casting.

I can’t think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.

I think this is addressed by the type erasure discussion above. A simple Plus method would require Object or 12 implementations for 2 arguments and 4 numeric types.

And basically all varargs cases would need to use Object[]. Consider a UDF to create a map that requires string keys and some consistent type for values. This would be easy with the InternalRow API because you can use getString(pos) and get(pos + 1, valueType) to get the key/value pairs. Use of UTF8String vs String will be checked at compile time.

I agree that Object[] is worse than InternalRow

Yes, and if we are always using Object because of type erasure or using magic methods to get better performance, the utility of the parameterized interfaces is very limited.

Because we want to expose the magic functions, the use of ScalarFunction2 and similar is extremely limited because it is only for non-codegen. Varargs is by far the more common case. The InternalRow interface is also a very simple way to get started and ensures that Spark can always find the right method after the function is bound to input types.


On Tue, Mar 2, 2021 at 6:35 AM Wenchen Fan <[hidden email]> wrote:
Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing. And this is a runtime failure, which is absolutely worse than query-compile-time checks. Also, don't forget my previous point: users need to specify the type and index such as row.getLong(0), which is error-prone.

> But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

I'd say this is a must-have if we go with the individual-parameters approach. The Scala UDF today checks the method signature at compile-time, thanks to the Scala type tag. The Java UDF today doesn't check and is hard to use.

> You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>.

Can you elaborate? We have function binding and we can use different UDFs for different input types. If we do want to reuse one UDF for different types, using "magical methods" solves the problem:
class MyUDF {
  def call(i: Int): Int = ...
  def call(l: Long): Long = ...
}

On the other side, I don't think the row-parameter approach can solve this problem. The best I can think of is:
class MyUDF implement ScalaFunction[Object] {
  def call(row: InternalRow): Object = {
    if (int input) row.getInt(0) ... else row.getLong(0) ...
  }
}

This is worse because: 1) it needs to do if-else to check different input types. 2) the return type can only be Object and cause boxing issues.

I agree that Object[] is worse than InternalRow. But I can't think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.


On Tue, Mar 2, 2021 at 3:36 AM Ryan Blue <[hidden email]> wrote:

Thanks for adding your perspective, Erik!

If the input is string type but the UDF implementation calls row.getLong(0), it returns wrong data

I think this is misleading. It is true for UnsafeRow, but there is no reason why InternalRow should return incorrect values.

The implementation in GenericInternalRow would throw a ClassCastException. I don’t think that using a row is a bad option simply because UnsafeRow is unsafe.

It’s unlikely that UnsafeRow would be used to pass the data. The implementation would evaluate each argument expression and set the result in a generic row, then pass that row to the UDF. We can use whatever implementation we choose to provide better guarantees than unsafe.

I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety.

I don’t think I agree with this. A failure at query analysis time vs runtime still requires going back to a separate project, fixing something, and rebuilding. The time needed to fix a problem goes up significantly vs. compile-time checks. And that is even worse if the UDF is maintained by someone else.

I think we also need to consider how common it would be that a use case can have the query-compile-time checks. Going through this in more detail below makes me think that it is unlikely that these checks would be used often because of the limitations of using an interface with type erasure.

I believe that Wenchen’s proposal will provide stronger query-compile-time safety

The proposal could have better safety for each argument, assuming that we detect failures by looking at the parameter types using reflection in the analyzer. But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

As Erik pointed out, type erasure also limits the effectiveness. You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>. You can handle those cases using InternalRow or you can handle them using VarargScalarFunction<Object>. That forces many use cases into varargs with Object, where you don’t get any of the proposed analyzer benefits and lose compile-time checks. The only time the additional checks (if implemented) would help is when only one set of argument types is needed because implementing ScalarFunction<Object, Object> defeats the purpose.

It’s worth noting that safety for the magic methods would be identical between the two options, so the trade-off to consider is for varargs and non-codegen cases. Combining the limitations discussed, this has better safety guarantees only if you need just one set of types for each number of arguments and are using the non-codegen path. Since varargs is one of the primary reasons to use this API, then I don’t think that it is a good idea to use Object[] instead of InternalRow.

--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

John Zhuge
+1 Good plan to move forward.

Thank you all for the constructive and comprehensive discussions in this thread! Decisions on this important feature will have ramifications for years to come.

On Wed, Mar 3, 2021 at 7:42 PM Wenchen Fan <[hidden email]> wrote:
+1 to this proposal. If people don't like the ScalarFunction0,1, ... variants and prefer the "magical methods", then we can have a single ScalarFunction interface which has the row-parameter API (with a default implementation to fail) and documents to describe the "magical methods" (which can be done later).

I'll start the PR review this week to check the naming, doc, etc.

Thanks all for the discussion here and let's move forward!

On Thu, Mar 4, 2021 at 9:53 AM Ryan Blue <[hidden email]> wrote:

Good point, Dongjoon. I think we can probably come to some compromise here:

  • Remove SupportsInvoke since it isn’t really needed. We should always try to find the right method to invoke in the codegen path.
  • Add a default implementation of produceResult so that implementations don’t have to use it. If they don’t implement it and a magic function can’t be found, then it will throw UnsupportedOperationException

This is assuming that we can agree not to introduce all of the ScalarFunction interface variations, which would have limited utility because of type erasure.

Does that sound like a good plan to everyone? If so, I’ll update the SPIP doc so we can move forward.


On Wed, Mar 3, 2021 at 4:36 PM Dongjoon Hyun <[hidden email]> wrote:
Hi, All.

We shared many opinions in different perspectives.
However, we didn't reach a consensus even on a partial merge by excluding something
(on the PR by me, on this mailing thread by Wenchen).

For the following claims, we have another alternative to mitigate it.

    > I don't like it because it promotes the row-parameter API and forces users to implement it, even if the users want to only use the individual-parameters API.

Why don't we merge the AS-IS PR by adding something instead of excluding something?

    - R produceResult(InternalRow input);
    + default R produceResult(InternalRow input) throws Exception {
    +   throw new UnsupportedOperationException();
    + }

By providing the default implementation, it will not *forcing users to implement it* technically.
And, we can provide a document about our expected usage properly.
What do you think?

Bests,
Dongjoon.



On Wed, Mar 3, 2021 at 10:28 AM Ryan Blue <[hidden email]> wrote:

Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing

The question is not whether to use the magic functions, which would not need boxing. The question here is whether to use multiple ScalarFunction interfaces. Those interfaces would require boxing or using Object[] so there isn’t a benefit.

If we do want to reuse one UDF for different types, using “magical methods” solves the problem

Yes, that’s correct. We agree that magic methods are a good option for this.

Again, the question we need to decide is whether to use InternalRow or interfaces like ScalarFunction2 for non-codegen. The option to use multiple interfaces is limited by type erasure because you can only have one set of type parameters. If you wanted to support both ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long> you’d have to fall back to ScalarFunction2<Object, Object> and cast.

The point is that type erasure will commonly lead either to many different implementation classes (one for each type combination) or will lead to parameterizing by Object, which defeats the purpose.

The alternative adds safety because correct types are produced by calls like getLong(0). Yes, this depends on the implementation making the correct calls, but it is better than using Object and casting.

I can’t think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.

I think this is addressed by the type erasure discussion above. A simple Plus method would require Object or 12 implementations for 2 arguments and 4 numeric types.

And basically all varargs cases would need to use Object[]. Consider a UDF to create a map that requires string keys and some consistent type for values. This would be easy with the InternalRow API because you can use getString(pos) and get(pos + 1, valueType) to get the key/value pairs. Use of UTF8String vs String will be checked at compile time.

I agree that Object[] is worse than InternalRow

Yes, and if we are always using Object because of type erasure or using magic methods to get better performance, the utility of the parameterized interfaces is very limited.

Because we want to expose the magic functions, the use of ScalarFunction2 and similar is extremely limited because it is only for non-codegen. Varargs is by far the more common case. The InternalRow interface is also a very simple way to get started and ensures that Spark can always find the right method after the function is bound to input types.


On Tue, Mar 2, 2021 at 6:35 AM Wenchen Fan <[hidden email]> wrote:
Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing. And this is a runtime failure, which is absolutely worse than query-compile-time checks. Also, don't forget my previous point: users need to specify the type and index such as row.getLong(0), which is error-prone.

> But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

I'd say this is a must-have if we go with the individual-parameters approach. The Scala UDF today checks the method signature at compile-time, thanks to the Scala type tag. The Java UDF today doesn't check and is hard to use.

> You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>.

Can you elaborate? We have function binding and we can use different UDFs for different input types. If we do want to reuse one UDF for different types, using "magical methods" solves the problem:
class MyUDF {
  def call(i: Int): Int = ...
  def call(l: Long): Long = ...
}

On the other side, I don't think the row-parameter approach can solve this problem. The best I can think of is:
class MyUDF implement ScalaFunction[Object] {
  def call(row: InternalRow): Object = {
    if (int input) row.getInt(0) ... else row.getLong(0) ...
  }
}

This is worse because: 1) it needs to do if-else to check different input types. 2) the return type can only be Object and cause boxing issues.

I agree that Object[] is worse than InternalRow. But I can't think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.


On Tue, Mar 2, 2021 at 3:36 AM Ryan Blue <[hidden email]> wrote:

Thanks for adding your perspective, Erik!

If the input is string type but the UDF implementation calls row.getLong(0), it returns wrong data

I think this is misleading. It is true for UnsafeRow, but there is no reason why InternalRow should return incorrect values.

The implementation in GenericInternalRow would throw a ClassCastException. I don’t think that using a row is a bad option simply because UnsafeRow is unsafe.

It’s unlikely that UnsafeRow would be used to pass the data. The implementation would evaluate each argument expression and set the result in a generic row, then pass that row to the UDF. We can use whatever implementation we choose to provide better guarantees than unsafe.

I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety.

I don’t think I agree with this. A failure at query analysis time vs runtime still requires going back to a separate project, fixing something, and rebuilding. The time needed to fix a problem goes up significantly vs. compile-time checks. And that is even worse if the UDF is maintained by someone else.

I think we also need to consider how common it would be that a use case can have the query-compile-time checks. Going through this in more detail below makes me think that it is unlikely that these checks would be used often because of the limitations of using an interface with type erasure.

I believe that Wenchen’s proposal will provide stronger query-compile-time safety

The proposal could have better safety for each argument, assuming that we detect failures by looking at the parameter types using reflection in the analyzer. But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

As Erik pointed out, type erasure also limits the effectiveness. You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>. You can handle those cases using InternalRow or you can handle them using VarargScalarFunction<Object>. That forces many use cases into varargs with Object, where you don’t get any of the proposed analyzer benefits and lose compile-time checks. The only time the additional checks (if implemented) would help is when only one set of argument types is needed because implementing ScalarFunction<Object, Object> defeats the purpose.

It’s worth noting that safety for the magic methods would be identical between the two options, so the trade-off to consider is for varargs and non-codegen cases. Combining the limitations discussed, this has better safety guarantees only if you need just one set of types for each number of arguments and are using the non-codegen path. Since varargs is one of the primary reasons to use this API, then I don’t think that it is a good idea to use Object[] instead of InternalRow.

--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
John Zhuge
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

Erik Krogen
+1 on Dongjoon's proposal. This is a very nice compromise between the reflective/magic-method approach and the InternalRow approach, providing a lot of flexibility for our users, and allowing for the more complicated reflection-based approach to evolve at its own pace, since you can always fall back to InternalRow for situations which aren't yet supported by reflection.

We can even consider having Spark code detect that you haven't overridden the default produceResult (IIRC this is discoverable via reflection), and raise an error at query analysis time instead of at runtime when it can't find a reflective method or an overridden produceResult.

I'm very pleased we have found a compromise that everyone seems happy with! Big thanks to everyone who participated.

On Wed, Mar 3, 2021 at 8:34 PM John Zhuge <[hidden email]> wrote:
+1 Good plan to move forward.

Thank you all for the constructive and comprehensive discussions in this thread! Decisions on this important feature will have ramifications for years to come.

On Wed, Mar 3, 2021 at 7:42 PM Wenchen Fan <[hidden email]> wrote:
+1 to this proposal. If people don't like the ScalarFunction0,1, ... variants and prefer the "magical methods", then we can have a single ScalarFunction interface which has the row-parameter API (with a default implementation to fail) and documents to describe the "magical methods" (which can be done later).

I'll start the PR review this week to check the naming, doc, etc.

Thanks all for the discussion here and let's move forward!

On Thu, Mar 4, 2021 at 9:53 AM Ryan Blue <[hidden email]> wrote:

Good point, Dongjoon. I think we can probably come to some compromise here:

  • Remove SupportsInvoke since it isn’t really needed. We should always try to find the right method to invoke in the codegen path.
  • Add a default implementation of produceResult so that implementations don’t have to use it. If they don’t implement it and a magic function can’t be found, then it will throw UnsupportedOperationException

This is assuming that we can agree not to introduce all of the ScalarFunction interface variations, which would have limited utility because of type erasure.

Does that sound like a good plan to everyone? If so, I’ll update the SPIP doc so we can move forward.


On Wed, Mar 3, 2021 at 4:36 PM Dongjoon Hyun <[hidden email]> wrote:
Hi, All.

We shared many opinions in different perspectives.
However, we didn't reach a consensus even on a partial merge by excluding something
(on the PR by me, on this mailing thread by Wenchen).

For the following claims, we have another alternative to mitigate it.

    > I don't like it because it promotes the row-parameter API and forces users to implement it, even if the users want to only use the individual-parameters API.

Why don't we merge the AS-IS PR by adding something instead of excluding something?

    - R produceResult(InternalRow input);
    + default R produceResult(InternalRow input) throws Exception {
    +   throw new UnsupportedOperationException();
    + }

By providing the default implementation, it will not *forcing users to implement it* technically.
And, we can provide a document about our expected usage properly.
What do you think?

Bests,
Dongjoon.



On Wed, Mar 3, 2021 at 10:28 AM Ryan Blue <[hidden email]> wrote:

Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing

The question is not whether to use the magic functions, which would not need boxing. The question here is whether to use multiple ScalarFunction interfaces. Those interfaces would require boxing or using Object[] so there isn’t a benefit.

If we do want to reuse one UDF for different types, using “magical methods” solves the problem

Yes, that’s correct. We agree that magic methods are a good option for this.

Again, the question we need to decide is whether to use InternalRow or interfaces like ScalarFunction2 for non-codegen. The option to use multiple interfaces is limited by type erasure because you can only have one set of type parameters. If you wanted to support both ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long> you’d have to fall back to ScalarFunction2<Object, Object> and cast.

The point is that type erasure will commonly lead either to many different implementation classes (one for each type combination) or will lead to parameterizing by Object, which defeats the purpose.

The alternative adds safety because correct types are produced by calls like getLong(0). Yes, this depends on the implementation making the correct calls, but it is better than using Object and casting.

I can’t think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.

I think this is addressed by the type erasure discussion above. A simple Plus method would require Object or 12 implementations for 2 arguments and 4 numeric types.

And basically all varargs cases would need to use Object[]. Consider a UDF to create a map that requires string keys and some consistent type for values. This would be easy with the InternalRow API because you can use getString(pos) and get(pos + 1, valueType) to get the key/value pairs. Use of UTF8String vs String will be checked at compile time.

I agree that Object[] is worse than InternalRow

Yes, and if we are always using Object because of type erasure or using magic methods to get better performance, the utility of the parameterized interfaces is very limited.

Because we want to expose the magic functions, the use of ScalarFunction2 and similar is extremely limited because it is only for non-codegen. Varargs is by far the more common case. The InternalRow interface is also a very simple way to get started and ensures that Spark can always find the right method after the function is bound to input types.


On Tue, Mar 2, 2021 at 6:35 AM Wenchen Fan <[hidden email]> wrote:
Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing. And this is a runtime failure, which is absolutely worse than query-compile-time checks. Also, don't forget my previous point: users need to specify the type and index such as row.getLong(0), which is error-prone.

> But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

I'd say this is a must-have if we go with the individual-parameters approach. The Scala UDF today checks the method signature at compile-time, thanks to the Scala type tag. The Java UDF today doesn't check and is hard to use.

> You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>.

Can you elaborate? We have function binding and we can use different UDFs for different input types. If we do want to reuse one UDF for different types, using "magical methods" solves the problem:
class MyUDF {
  def call(i: Int): Int = ...
  def call(l: Long): Long = ...
}

On the other side, I don't think the row-parameter approach can solve this problem. The best I can think of is:
class MyUDF implement ScalaFunction[Object] {
  def call(row: InternalRow): Object = {
    if (int input) row.getInt(0) ... else row.getLong(0) ...
  }
}

This is worse because: 1) it needs to do if-else to check different input types. 2) the return type can only be Object and cause boxing issues.

I agree that Object[] is worse than InternalRow. But I can't think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.


On Tue, Mar 2, 2021 at 3:36 AM Ryan Blue <[hidden email]> wrote:

Thanks for adding your perspective, Erik!

If the input is string type but the UDF implementation calls row.getLong(0), it returns wrong data

I think this is misleading. It is true for UnsafeRow, but there is no reason why InternalRow should return incorrect values.

The implementation in GenericInternalRow would throw a ClassCastException. I don’t think that using a row is a bad option simply because UnsafeRow is unsafe.

It’s unlikely that UnsafeRow would be used to pass the data. The implementation would evaluate each argument expression and set the result in a generic row, then pass that row to the UDF. We can use whatever implementation we choose to provide better guarantees than unsafe.

I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety.

I don’t think I agree with this. A failure at query analysis time vs runtime still requires going back to a separate project, fixing something, and rebuilding. The time needed to fix a problem goes up significantly vs. compile-time checks. And that is even worse if the UDF is maintained by someone else.

I think we also need to consider how common it would be that a use case can have the query-compile-time checks. Going through this in more detail below makes me think that it is unlikely that these checks would be used often because of the limitations of using an interface with type erasure.

I believe that Wenchen’s proposal will provide stronger query-compile-time safety

The proposal could have better safety for each argument, assuming that we detect failures by looking at the parameter types using reflection in the analyzer. But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

As Erik pointed out, type erasure also limits the effectiveness. You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>. You can handle those cases using InternalRow or you can handle them using VarargScalarFunction<Object>. That forces many use cases into varargs with Object, where you don’t get any of the proposed analyzer benefits and lose compile-time checks. The only time the additional checks (if implemented) would help is when only one set of argument types is needed because implementing ScalarFunction<Object, Object> defeats the purpose.

It’s worth noting that safety for the magic methods would be identical between the two options, so the trade-off to consider is for varargs and non-codegen cases. Combining the limitations discussed, this has better safety guarantees only if you need just one set of types for each number of arguments and are using the non-codegen path. Since varargs is one of the primary reasons to use this API, then I don’t think that it is a good idea to use Object[] instead of InternalRow.

--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
John Zhuge
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

Ryan Blue
Okay, great. I'll update the SPIP doc and call a vote in the next day or two.

On Thu, Mar 4, 2021 at 8:26 AM Erik Krogen <[hidden email]> wrote:
+1 on Dongjoon's proposal. This is a very nice compromise between the reflective/magic-method approach and the InternalRow approach, providing a lot of flexibility for our users, and allowing for the more complicated reflection-based approach to evolve at its own pace, since you can always fall back to InternalRow for situations which aren't yet supported by reflection.

We can even consider having Spark code detect that you haven't overridden the default produceResult (IIRC this is discoverable via reflection), and raise an error at query analysis time instead of at runtime when it can't find a reflective method or an overridden produceResult.

I'm very pleased we have found a compromise that everyone seems happy with! Big thanks to everyone who participated.

On Wed, Mar 3, 2021 at 8:34 PM John Zhuge <[hidden email]> wrote:
+1 Good plan to move forward.

Thank you all for the constructive and comprehensive discussions in this thread! Decisions on this important feature will have ramifications for years to come.

On Wed, Mar 3, 2021 at 7:42 PM Wenchen Fan <[hidden email]> wrote:
+1 to this proposal. If people don't like the ScalarFunction0,1, ... variants and prefer the "magical methods", then we can have a single ScalarFunction interface which has the row-parameter API (with a default implementation to fail) and documents to describe the "magical methods" (which can be done later).

I'll start the PR review this week to check the naming, doc, etc.

Thanks all for the discussion here and let's move forward!

On Thu, Mar 4, 2021 at 9:53 AM Ryan Blue <[hidden email]> wrote:

Good point, Dongjoon. I think we can probably come to some compromise here:

  • Remove SupportsInvoke since it isn’t really needed. We should always try to find the right method to invoke in the codegen path.
  • Add a default implementation of produceResult so that implementations don’t have to use it. If they don’t implement it and a magic function can’t be found, then it will throw UnsupportedOperationException

This is assuming that we can agree not to introduce all of the ScalarFunction interface variations, which would have limited utility because of type erasure.

Does that sound like a good plan to everyone? If so, I’ll update the SPIP doc so we can move forward.


On Wed, Mar 3, 2021 at 4:36 PM Dongjoon Hyun <[hidden email]> wrote:
Hi, All.

We shared many opinions in different perspectives.
However, we didn't reach a consensus even on a partial merge by excluding something
(on the PR by me, on this mailing thread by Wenchen).

For the following claims, we have another alternative to mitigate it.

    > I don't like it because it promotes the row-parameter API and forces users to implement it, even if the users want to only use the individual-parameters API.

Why don't we merge the AS-IS PR by adding something instead of excluding something?

    - R produceResult(InternalRow input);
    + default R produceResult(InternalRow input) throws Exception {
    +   throw new UnsupportedOperationException();
    + }

By providing the default implementation, it will not *forcing users to implement it* technically.
And, we can provide a document about our expected usage properly.
What do you think?

Bests,
Dongjoon.



On Wed, Mar 3, 2021 at 10:28 AM Ryan Blue <[hidden email]> wrote:

Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing

The question is not whether to use the magic functions, which would not need boxing. The question here is whether to use multiple ScalarFunction interfaces. Those interfaces would require boxing or using Object[] so there isn’t a benefit.

If we do want to reuse one UDF for different types, using “magical methods” solves the problem

Yes, that’s correct. We agree that magic methods are a good option for this.

Again, the question we need to decide is whether to use InternalRow or interfaces like ScalarFunction2 for non-codegen. The option to use multiple interfaces is limited by type erasure because you can only have one set of type parameters. If you wanted to support both ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long> you’d have to fall back to ScalarFunction2<Object, Object> and cast.

The point is that type erasure will commonly lead either to many different implementation classes (one for each type combination) or will lead to parameterizing by Object, which defeats the purpose.

The alternative adds safety because correct types are produced by calls like getLong(0). Yes, this depends on the implementation making the correct calls, but it is better than using Object and casting.

I can’t think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.

I think this is addressed by the type erasure discussion above. A simple Plus method would require Object or 12 implementations for 2 arguments and 4 numeric types.

And basically all varargs cases would need to use Object[]. Consider a UDF to create a map that requires string keys and some consistent type for values. This would be easy with the InternalRow API because you can use getString(pos) and get(pos + 1, valueType) to get the key/value pairs. Use of UTF8String vs String will be checked at compile time.

I agree that Object[] is worse than InternalRow

Yes, and if we are always using Object because of type erasure or using magic methods to get better performance, the utility of the parameterized interfaces is very limited.

Because we want to expose the magic functions, the use of ScalarFunction2 and similar is extremely limited because it is only for non-codegen. Varargs is by far the more common case. The InternalRow interface is also a very simple way to get started and ensures that Spark can always find the right method after the function is bound to input types.


On Tue, Mar 2, 2021 at 6:35 AM Wenchen Fan <[hidden email]> wrote:
Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing. And this is a runtime failure, which is absolutely worse than query-compile-time checks. Also, don't forget my previous point: users need to specify the type and index such as row.getLong(0), which is error-prone.

> But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

I'd say this is a must-have if we go with the individual-parameters approach. The Scala UDF today checks the method signature at compile-time, thanks to the Scala type tag. The Java UDF today doesn't check and is hard to use.

> You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>.

Can you elaborate? We have function binding and we can use different UDFs for different input types. If we do want to reuse one UDF for different types, using "magical methods" solves the problem:
class MyUDF {
  def call(i: Int): Int = ...
  def call(l: Long): Long = ...
}

On the other side, I don't think the row-parameter approach can solve this problem. The best I can think of is:
class MyUDF implement ScalaFunction[Object] {
  def call(row: InternalRow): Object = {
    if (int input) row.getInt(0) ... else row.getLong(0) ...
  }
}

This is worse because: 1) it needs to do if-else to check different input types. 2) the return type can only be Object and cause boxing issues.

I agree that Object[] is worse than InternalRow. But I can't think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.


On Tue, Mar 2, 2021 at 3:36 AM Ryan Blue <[hidden email]> wrote:

Thanks for adding your perspective, Erik!

If the input is string type but the UDF implementation calls row.getLong(0), it returns wrong data

I think this is misleading. It is true for UnsafeRow, but there is no reason why InternalRow should return incorrect values.

The implementation in GenericInternalRow would throw a ClassCastException. I don’t think that using a row is a bad option simply because UnsafeRow is unsafe.

It’s unlikely that UnsafeRow would be used to pass the data. The implementation would evaluate each argument expression and set the result in a generic row, then pass that row to the UDF. We can use whatever implementation we choose to provide better guarantees than unsafe.

I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety.

I don’t think I agree with this. A failure at query analysis time vs runtime still requires going back to a separate project, fixing something, and rebuilding. The time needed to fix a problem goes up significantly vs. compile-time checks. And that is even worse if the UDF is maintained by someone else.

I think we also need to consider how common it would be that a use case can have the query-compile-time checks. Going through this in more detail below makes me think that it is unlikely that these checks would be used often because of the limitations of using an interface with type erasure.

I believe that Wenchen’s proposal will provide stronger query-compile-time safety

The proposal could have better safety for each argument, assuming that we detect failures by looking at the parameter types using reflection in the analyzer. But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

As Erik pointed out, type erasure also limits the effectiveness. You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>. You can handle those cases using InternalRow or you can handle them using VarargScalarFunction<Object>. That forces many use cases into varargs with Object, where you don’t get any of the proposed analyzer benefits and lose compile-time checks. The only time the additional checks (if implemented) would help is when only one set of argument types is needed because implementing ScalarFunction<Object, Object> defeats the purpose.

It’s worth noting that safety for the magic methods would be identical between the two options, so the trade-off to consider is for varargs and non-codegen cases. Combining the limitations discussed, this has better safety guarantees only if you need just one set of types for each number of arguments and are using the non-codegen path. Since varargs is one of the primary reasons to use this API, then I don’t think that it is a good idea to use Object[] instead of InternalRow.

--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
John Zhuge


--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

Chao Sun
+1 on Dongjoon's proposal. Great to see this is getting moved forward and thanks everyone for the insightful discussion!



On Thu, Mar 4, 2021 at 8:58 AM Ryan Blue <[hidden email]> wrote:
Okay, great. I'll update the SPIP doc and call a vote in the next day or two.

On Thu, Mar 4, 2021 at 8:26 AM Erik Krogen <[hidden email]> wrote:
+1 on Dongjoon's proposal. This is a very nice compromise between the reflective/magic-method approach and the InternalRow approach, providing a lot of flexibility for our users, and allowing for the more complicated reflection-based approach to evolve at its own pace, since you can always fall back to InternalRow for situations which aren't yet supported by reflection.

We can even consider having Spark code detect that you haven't overridden the default produceResult (IIRC this is discoverable via reflection), and raise an error at query analysis time instead of at runtime when it can't find a reflective method or an overridden produceResult.

I'm very pleased we have found a compromise that everyone seems happy with! Big thanks to everyone who participated.

On Wed, Mar 3, 2021 at 8:34 PM John Zhuge <[hidden email]> wrote:
+1 Good plan to move forward.

Thank you all for the constructive and comprehensive discussions in this thread! Decisions on this important feature will have ramifications for years to come.

On Wed, Mar 3, 2021 at 7:42 PM Wenchen Fan <[hidden email]> wrote:
+1 to this proposal. If people don't like the ScalarFunction0,1, ... variants and prefer the "magical methods", then we can have a single ScalarFunction interface which has the row-parameter API (with a default implementation to fail) and documents to describe the "magical methods" (which can be done later).

I'll start the PR review this week to check the naming, doc, etc.

Thanks all for the discussion here and let's move forward!

On Thu, Mar 4, 2021 at 9:53 AM Ryan Blue <[hidden email]> wrote:

Good point, Dongjoon. I think we can probably come to some compromise here:

  • Remove SupportsInvoke since it isn’t really needed. We should always try to find the right method to invoke in the codegen path.
  • Add a default implementation of produceResult so that implementations don’t have to use it. If they don’t implement it and a magic function can’t be found, then it will throw UnsupportedOperationException

This is assuming that we can agree not to introduce all of the ScalarFunction interface variations, which would have limited utility because of type erasure.

Does that sound like a good plan to everyone? If so, I’ll update the SPIP doc so we can move forward.


On Wed, Mar 3, 2021 at 4:36 PM Dongjoon Hyun <[hidden email]> wrote:
Hi, All.

We shared many opinions in different perspectives.
However, we didn't reach a consensus even on a partial merge by excluding something
(on the PR by me, on this mailing thread by Wenchen).

For the following claims, we have another alternative to mitigate it.

    > I don't like it because it promotes the row-parameter API and forces users to implement it, even if the users want to only use the individual-parameters API.

Why don't we merge the AS-IS PR by adding something instead of excluding something?

    - R produceResult(InternalRow input);
    + default R produceResult(InternalRow input) throws Exception {
    +   throw new UnsupportedOperationException();
    + }

By providing the default implementation, it will not *forcing users to implement it* technically.
And, we can provide a document about our expected usage properly.
What do you think?

Bests,
Dongjoon.



On Wed, Mar 3, 2021 at 10:28 AM Ryan Blue <[hidden email]> wrote:

Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing

The question is not whether to use the magic functions, which would not need boxing. The question here is whether to use multiple ScalarFunction interfaces. Those interfaces would require boxing or using Object[] so there isn’t a benefit.

If we do want to reuse one UDF for different types, using “magical methods” solves the problem

Yes, that’s correct. We agree that magic methods are a good option for this.

Again, the question we need to decide is whether to use InternalRow or interfaces like ScalarFunction2 for non-codegen. The option to use multiple interfaces is limited by type erasure because you can only have one set of type parameters. If you wanted to support both ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long> you’d have to fall back to ScalarFunction2<Object, Object> and cast.

The point is that type erasure will commonly lead either to many different implementation classes (one for each type combination) or will lead to parameterizing by Object, which defeats the purpose.

The alternative adds safety because correct types are produced by calls like getLong(0). Yes, this depends on the implementation making the correct calls, but it is better than using Object and casting.

I can’t think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.

I think this is addressed by the type erasure discussion above. A simple Plus method would require Object or 12 implementations for 2 arguments and 4 numeric types.

And basically all varargs cases would need to use Object[]. Consider a UDF to create a map that requires string keys and some consistent type for values. This would be easy with the InternalRow API because you can use getString(pos) and get(pos + 1, valueType) to get the key/value pairs. Use of UTF8String vs String will be checked at compile time.

I agree that Object[] is worse than InternalRow

Yes, and if we are always using Object because of type erasure or using magic methods to get better performance, the utility of the parameterized interfaces is very limited.

Because we want to expose the magic functions, the use of ScalarFunction2 and similar is extremely limited because it is only for non-codegen. Varargs is by far the more common case. The InternalRow interface is also a very simple way to get started and ensures that Spark can always find the right method after the function is bound to input types.


On Tue, Mar 2, 2021 at 6:35 AM Wenchen Fan <[hidden email]> wrote:
Yes, GenericInternalRow is safe if when type mismatches, with the cost of using Object[], and primitive types need to do boxing. And this is a runtime failure, which is absolutely worse than query-compile-time checks. Also, don't forget my previous point: users need to specify the type and index such as row.getLong(0), which is error-prone.

> But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

I'd say this is a must-have if we go with the individual-parameters approach. The Scala UDF today checks the method signature at compile-time, thanks to the Scala type tag. The Java UDF today doesn't check and is hard to use.

> You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>.

Can you elaborate? We have function binding and we can use different UDFs for different input types. If we do want to reuse one UDF for different types, using "magical methods" solves the problem:
class MyUDF {
  def call(i: Int): Int = ...
  def call(l: Long): Long = ...
}

On the other side, I don't think the row-parameter approach can solve this problem. The best I can think of is:
class MyUDF implement ScalaFunction[Object] {
  def call(row: InternalRow): Object = {
    if (int input) row.getInt(0) ... else row.getLong(0) ...
  }
}

This is worse because: 1) it needs to do if-else to check different input types. 2) the return type can only be Object and cause boxing issues.

I agree that Object[] is worse than InternalRow. But I can't think of real use cases that will force the individual-parameters approach to use Object instead of concrete types.


On Tue, Mar 2, 2021 at 3:36 AM Ryan Blue <[hidden email]> wrote:

Thanks for adding your perspective, Erik!

If the input is string type but the UDF implementation calls row.getLong(0), it returns wrong data

I think this is misleading. It is true for UnsafeRow, but there is no reason why InternalRow should return incorrect values.

The implementation in GenericInternalRow would throw a ClassCastException. I don’t think that using a row is a bad option simply because UnsafeRow is unsafe.

It’s unlikely that UnsafeRow would be used to pass the data. The implementation would evaluate each argument expression and set the result in a generic row, then pass that row to the UDF. We can use whatever implementation we choose to provide better guarantees than unsafe.

I think we should consider query-compile-time checks as nearly-as-good as Java-compile-time checks for the purposes of safety.

I don’t think I agree with this. A failure at query analysis time vs runtime still requires going back to a separate project, fixing something, and rebuilding. The time needed to fix a problem goes up significantly vs. compile-time checks. And that is even worse if the UDF is maintained by someone else.

I think we also need to consider how common it would be that a use case can have the query-compile-time checks. Going through this in more detail below makes me think that it is unlikely that these checks would be used often because of the limitations of using an interface with type erasure.

I believe that Wenchen’s proposal will provide stronger query-compile-time safety

The proposal could have better safety for each argument, assuming that we detect failures by looking at the parameter types using reflection in the analyzer. But we don’t do that for any of the similar UDFs today so I’m skeptical that this would actually be a high enough priority to implement.

As Erik pointed out, type erasure also limits the effectiveness. You can’t implement ScalarFunction2<Integer, Integer> and ScalarFunction2<Long, Long>. You can handle those cases using InternalRow or you can handle them using VarargScalarFunction<Object>. That forces many use cases into varargs with Object, where you don’t get any of the proposed analyzer benefits and lose compile-time checks. The only time the additional checks (if implemented) would help is when only one set of argument types is needed because implementing ScalarFunction<Object, Object> defeats the purpose.

It’s worth noting that safety for the magic methods would be identical between the two options, so the trade-off to consider is for varargs and non-codegen cases. Combining the limitations discussed, this has better safety guarantees only if you need just one set of types for each number of arguments and are using the non-codegen path. Since varargs is one of the primary reasons to use this API, then I don’t think that it is a good idea to use Object[] instead of InternalRow.

--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


--
John Zhuge


--
Ryan Blue
Software Engineer
Netflix
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] SPIP: FunctionCatalog

Liang-Chi Hsieh

Yeah, in short this is a great compromise approach and I do like to see this
proposal move forward to next step. This discussion is valuable.


Chao Sun wrote
> +1 on Dongjoon's proposal. Great to see this is getting moved forward and
> thanks everyone for the insightful discussion!
>
>
>
> On Thu, Mar 4, 2021 at 8:58 AM Ryan Blue &lt;

> rblue@

> &gt; wrote:
>
>> Okay, great. I'll update the SPIP doc and call a vote in the next day or
>> two.





--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

123