UDAFs have an inefficiency problem


UDAFs have an inefficiency problem

Erik Erlandson-2
I describe some of the details here:

The short version of the story is that the aggregating data structures (UDTs) used by UDAFs are serialized to a Row object, and deserialized again, for every row in a DataFrame.
Cheers,
Erik
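To make the pattern concrete, here is a minimal sketch of a UDAF over a UDT-typed buffer, using the public UserDefinedAggregateFunction API. TDigest and TDigestUDT (and their empty/insert/merge methods) are hypothetical stand-ins, not real Spark classes; the point is the pair of Row-to-object conversions inside update(), which run once per input row:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Sketch only: TDigest / TDigestUDT are hypothetical stand-ins for an
// aggregating sketch object and its registered UserDefinedType.
class TDigestUDAF extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("x", DoubleType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("td", new TDigestUDT) :: Nil)
  def dataType: DataType = new TDigestUDT
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = TDigest.empty()           // object -> UDT row: serialize
  }

  // Called once per input row:
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val td = buffer.getAs[TDigest](0)     // UDT row -> object: deserialize
    td.insert(input.getDouble(0))
    buffer(0) = td                        // object -> UDT row: serialize again
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[TDigest](0).merge(buffer2.getAs[TDigest](0))
  }

  def evaluate(buffer: Row): Any = buffer.getAs[TDigest](0)
}
```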


Re: UDAFs have an inefficiency problem

rxin
Yes, this is known, and it is an issue for performance. Do you have any thoughts on how to fix this?



Re: UDAFs have an inefficiency problem

Erik Erlandson-2

At a high level, some candidate strategies are:
1. "Fix" the logic in ScalaUDAF (possibly in conjunction with modifications to the UDAF trait itself) so that the update method can do the right thing.
2. Expose TypedImperativeAggregate to users for defining their own aggregators, since it already does the right thing (see the sketch after this list).
3. As a workaround, allow users to define their own subclasses of DataType. That would essentially allow one to define the sqlType of the UDT to be the aggregating object itself, making ser/de a no-op. I tried doing this and it compiles, but Spark's internals only consider a predefined universe of DataType classes.
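For concreteness, here is a rough sketch of what option 2 could look like if TypedImperativeAggregate were exposed. This is the internal catalyst API (2.4-era shape), and TDigest, with its empty/insert/merge/toBytes/fromBytes methods, is again a hypothetical stand-in:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
import org.apache.spark.sql.types.{BinaryType, DataType}

// Sketch only: TDigest and its methods are hypothetical stand-ins.
case class TDigestAggregate(
    child: Expression,
    override val mutableAggBufferOffset: Int = 0,
    override val inputAggBufferOffset: Int = 0)
  extends TypedImperativeAggregate[TDigest] {

  override def createAggregationBuffer(): TDigest = TDigest.empty()

  // Called per input row, but operating on the live JVM object: no Row conversion.
  override def update(buffer: TDigest, input: InternalRow): TDigest = {
    buffer.insert(child.eval(input).asInstanceOf[Double])
    buffer
  }

  override def merge(buffer: TDigest, other: TDigest): TDigest = buffer.merge(other)

  override def eval(buffer: TDigest): Any = buffer.toBytes

  // Ser/de happens here, only at partial-aggregation (shuffle) boundaries.
  override def serialize(buffer: TDigest): Array[Byte] = buffer.toBytes
  override def deserialize(bytes: Array[Byte]): TDigest = TDigest.fromBytes(bytes)

  override def children: Seq[Expression] = child :: Nil
  override def nullable: Boolean = false
  override def dataType: DataType = BinaryType

  override def withNewMutableAggBufferOffset(newOffset: Int): TDigestAggregate =
    copy(mutableAggBufferOffset = newOffset)
  override def withNewInputAggBufferOffset(newOffset: Int): TDigestAggregate =
    copy(inputAggBufferOffset = newOffset)
}
```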

All of these options are likely to have implications for the Catalyst internals. I'm not sure whether those are minor or more substantial.



Re: UDAFs have an inefficiency problem

rxin
They are unfortunately all pretty substantial (which is why this problem exists) ...




Re: UDAFs have an inefficiency problem

Erik Erlandson-2

BTW, if this is known, is there an existing JIRA I should link to?



Re: UDAFs have an inefficiency problem

rxin
Not that I know of. We did do some work to make it faster in the lower-cardinality case: https://issues.apache.org/jira/browse/SPARK-17949





Re: UDAFs have an inefficiency problem

Erik Erlandson-2

On the PR review, there were questions about adding a new aggregating class, and whether or not Aggregator[IN, BUF, OUT] could be used. I added a proof-of-concept solution, based on enhancing Aggregator, to the pull request:

I wrote up my findings on the PR, but the gist is that Aggregator is a feasible option; however, it does not provide *total* feature parity with UDAF. Note that the PR now includes two candidate solutions, for comparison purposes, as well as an extra test file (tdigest.scala). Eventually one of these solutions will be removed, depending on which option is selected.

I'm pushing this forward now with the goal of getting a solution into the upcoming 3.0 branch cut.
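For reference, here is a minimal sketch of the Aggregator[IN, BUF, OUT] shape under discussion (TDigest and its methods are again hypothetical stand-ins). The buffer stays a plain JVM object between rows; encoder-based ser/de runs only at partial-aggregation boundaries, which is what makes this shape attractive relative to the UDAF path:

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Sketch only: TDigest and its methods are hypothetical stand-ins.
object TDigestAggregator extends Aggregator[Double, TDigest, TDigest] {
  def zero: TDigest = TDigest.empty()
  // Called per input row, on the live JVM object: no per-row ser/de.
  def reduce(b: TDigest, x: Double): TDigest = { b.insert(x); b }
  def merge(b1: TDigest, b2: TDigest): TDigest = b1.merge(b2)
  def finish(b: TDigest): TDigest = b
  // Encoder-based ser/de runs only at shuffle boundaries.
  def bufferEncoder: Encoder[TDigest] = Encoders.kryo[TDigest]
  def outputEncoder: Encoder[TDigest] = Encoders.kryo[TDigest]
}

// Typed Dataset usage, which already works today:
//   val ds: Dataset[Double] = ...
//   val td = ds.select(TDigestAggregator.toColumn).first()
```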
