[DISCUSS] PySpark Window UDF

[DISCUSS] PySpark Window UDF

Li Jin
Hi All,

I have been looking into leveraging the Arrow and pandas UDF work we have done so far to support window UDFs in PySpark. I have done some investigation and believe there is a way to implement PySpark window UDFs efficiently.

The basic idea is that instead of passing each window to Python separately, we can pass a "batch of windows" as an Arrow record batch of rows plus begin/end indices for each window (the indices are computed on the Java side), then roll over the begin/end indices in Python and apply the UDF to each slice.
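
To make that concrete, here is a minimal sketch of the Python-side loop. The function name and signature are mine, for illustration only; the real plumbing lives in the Arrow serializer:

import pandas as pd

def apply_windowed_udf(values, begin, end, udf):
    # One shared batch of rows; each window is just a pair of
    # begin/end indices into it. Apply the UDF to each slice,
    # producing one result per input row.
    return pd.Series([udf(values.iloc[b:e]) for b, e in zip(begin, end)])

# Example: rolling windows over four rows
values = pd.Series([1.0, 2.0, 3.0, 4.0])
begin = [0, 0, 1, 2]
end = [1, 2, 3, 4]
print(apply_windowed_udf(values, begin, end, lambda s: s.mean()))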

I have written up my investigation in more detail here:
https://docs.google.com/document/d/14EjeY5z4-NC27-SmIP9CsMPCANeTcvxN44a7SIJtZPc/edit#
I think this is a pretty promising approach and hope to get some feedback from the community. Let's discuss! :)

Li

Re: [DISCUSS] PySpark Window UDF

Li Jin
Hello again!

I recently built a proof-of-concept implementation of the proposal above. The results are pretty exciting, so I want to share my findings with the community. I implemented two variants of the pandas window UDF: one that takes a pandas.Series as input and one that takes a numpy array as input. I benchmarked a rolling mean over 1M doubles, and here are the results:

Spark SQL window function: 20s
Pandas variant: ~60s
Numpy variant: 10s
Numpy variant with numba: 4s
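
To make the two variants concrete, the UDF bodies differ only in the type each window arrives as. A rough sketch of the rolling-mean UDF bodies (the UDF bodies only, not the Spark registration API, which is what the proposal adds):

import numpy as np
import pandas as pd

def mean_pandas(window: pd.Series) -> float:
    # pandas variant: every window slice is wrapped in a pandas.Series
    return window.mean()

def mean_numpy(window: np.ndarray) -> float:
    # numpy variant: windows are raw array slices (cheap views),
    # avoiding per-window Series construction
    return window.mean()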

You can see the benchmark code here:
https://gist.github.com/icexelloss/845beb3d0d6bfc3d51b3c7419edf0dcb
I think the results are quite exciting because:
(1) the numpy variant even outperforms the built-in Spark SQL window function
(2) the numpy variant with numba gives the best performance, as well as the flexibility to let users write window functions in pure Python

The pandas variant is not bad either (about 1.5x faster than the existing UDF with collect_list), but the numpy variant clearly has much better performance.
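
For the numba case, here is a minimal sketch of what a compiled rolling mean over the begin/end indices can look like (my illustration, not the benchmark code; see the gist above for the actual benchmark):

import numpy as np
from numba import jit

@jit(nopython=True)
def rolling_mean(values, begin, end, out):
    # The whole loop over windows is compiled, so per-window
    # Python overhead disappears entirely.
    for i in range(len(begin)):
        total = 0.0
        for j in range(begin[i], end[i]):
            total += values[j]
        out[i] = total / (end[i] - begin[i])

values = np.array([1.0, 2.0, 3.0, 4.0])
begin = np.array([0, 0, 1, 2])
end = np.array([1, 2, 3, 4])
out = np.empty(4)
rolling_mean(values, begin, end, out)
print(out)  # [1.0, 1.5, 2.5, 3.5]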
 
So far all pandas UDFs interact with pandas data structures rather than numpy data structures, but the window UDF results might be a good reason to open up numpy variants of pandas UDFs. What do people think? I'd love to hear the community's feedback.
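
Much of the gap between the pandas and numpy variants likely comes from per-window overhead: constructing a pandas.Series for every window slice is far more expensive than taking a numpy view. A quick way to see the difference (my illustration; exact timings will vary):

import timeit
import numpy as np
import pandas as pd

a = np.random.rand(1_000_000)

# numpy slice: a cheap view, no copy, no index construction
print(timeit.timeit(lambda: a[10:100].mean(), number=10_000))

# wrapping the same slice in a Series adds index and object
# overhead on every single window
print(timeit.timeit(lambda: pd.Series(a[10:100]).mean(), number=10_000))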


Links:
You can reproduce the benchmark with the numpy variant using this branch:
https://github.com/icexelloss/spark/tree/window-udf-numpy

PR link:
https://github.com/apache/spark/pull/22305

Re: [DISCUSS] PySpark Window UDF

Wes McKinney
hi Li,

These results are very cool. I'm excited to see you continuing to push this effort forward.

- Wes

Re: [DISCUSS] PySpark Window UDF

Felix Cheung
Definitely!
The numba numbers are amazing.

Re: [DISCUSS] PySpark Window UDF

Li Jin
Thanks Wes and Felix!

I have finished the initial development work and the PR is in a good state for review (I have pinged a couple of people to review it as well). I am excited to work with the community to push this forward.

Li
