[pyspark] dataframe map_partition

peng yu
R has a nice map-partition function, `dapply`, which lets the user pass rows to a UDF.

I'm wondering why we don't have that in Python.

I'm trying to get a map-partition function that supports pandas_udf.

thanks!
Re: [pyspark] dataframe map_partition

Sean Owen-2
Are you looking for @pandas_udf in Python, or just mapPartitions? Those already exist.

Re: [pyspark] dataframe map_partition

peng yu
I'm looking for a mapPartitions(pandas_udf) on a pyspark DataFrame:

```
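# note: PandasUDFType.MAP is hypothetical -- it does not exist in pyspark;
# this is the desired API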
@pandas_udf(df.schema, PandasUDFType.MAP)
def do_nothing(pandas_df):
    return pandas_df


new_df = df.mapPartition(do_nothing)
```
pandas_udf only supports SCALAR or GROUPED_MAP. Why not support a plain MAP?

Re: [pyspark] dataframe map_partition

Sean Owen-2
Are you looking for SCALAR? That lets you map one row to one row, but does it more efficiently in batch. What are you trying to do?
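
For reference, a minimal SCALAR sketch looks like this (the DataFrame `df` and column `x` are made up for illustration):

```
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType(), PandasUDFType.SCALAR)
def plus_one(v):
    # v arrives as a pandas.Series covering one Arrow batch of rows
    return v + 1

new_df = df.withColumn("x_plus_one", plus_one("x"))
```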

Re: [pyspark] dataframe map_partition

peng yu
It is very similar to SCALAR, but with SCALAR the output can't be a struct/row, and the input has to be a pd.Series, which doesn't hold a whole row.

I'm doing TensorFlow batch inference in Spark: https://github.com/yupbank/tf-spark-serving/blob/master/tss/serving.py#L108

There I have to do a groupBy just to be able to use the apply function; I'm wondering why not enable apply directly on the DataFrame? The current workaround looks roughly like the sketch below.
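
A rough sketch of that workaround (the constant `_key` column is made up for illustration, not the linked code):

```
from pyspark.sql.functions import lit, pandas_udf, PandasUDFType

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def run_batch(pandas_df):
    # the whole group arrives as a single pandas DataFrame; run
    # batch inference here, then drop the artificial key column
    return pandas_df.drop(columns=["_key"])

# give every row the same key so apply() sees the data in large groups
new_df = df.withColumn("_key", lit(0)).groupBy("_key").apply(run_batch)
```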

Re: [pyspark] dataframe map_partition

Sean Owen-2
Are you just applying a function to every row in the DataFrame? You don't need pandas at all. Just get the RDD of Row from it, map a UDF that makes another Row, and go back to a DataFrame. Or make a UDF that operates on all columns and returns a new value. mapPartitions is also available if you want to transform an iterator of Row into another iterator of Row.
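
For example (a sketch; the transform is a placeholder and `spark` is the active session):

```
def transform_partition(rows):
    # rows is an iterator of Row objects; transform each and yield it back
    for row in rows:
        yield row  # replace with a real per-row transformation

new_df = spark.createDataFrame(df.rdd.mapPartitions(transform_partition), df.schema)
```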

Re: [pyspark] dataframe map_partition

peng yu
pandas/arrow is for memory efficiency, and mapPartitions is only available on RDDs; sure, I can do everything with RDDs.

But I thought that was the whole point of having pandas_udf: so my program runs faster and consumes less memory?

Re: [pyspark] dataframe map_partition

peng yu
And in this case I'm actually benefiting from Arrow's columnar support: I can pass the whole data block to TensorFlow and obtain the block of predictions all at once.


Re: [pyspark] dataframe map_partition

Sean Owen-2
Maybe; it depends on what you're doing. It sounds like you are trying to do row-at-a-time mapping, even on a pandas DataFrame. Is what you're doing vectorized? If not, pandas may not help much. Can't you just make the pandas Series into a DataFrame if you want, and a single column back into a Series?
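
Something along these lines (a sketch; the column names `a` and `b` and the sum are made up):

```
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType(), PandasUDFType.SCALAR)
def predict(a, b):
    # one pandas.Series arrives per input column; stitch them into a DataFrame
    batch = pd.DataFrame({"a": a, "b": b})
    # run a vectorized computation, then return a single column as a Series
    return batch.sum(axis=1)

new_df = df.withColumn("pred", predict("a", "b"))
```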

Re: [pyspark] dataframe map_partition

peng yu
Right now I'm using the columns-at-a-time mapping: https://github.com/yupbank/tf-spark-serving/blob/master/tss/utils.py#L129
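
Roughly, that means packing the feature columns into a single array column so one scalar pandas_udf sees the whole batch (a sketch, not the linked code; the column names and the sum are made up):

```
import numpy as np
import pandas as pd
from pyspark.sql.functions import array, pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType(), PandasUDFType.SCALAR)
def predict(features):
    # features is a pandas.Series of lists; stack it into a 2-D batch
    batch = np.stack(features.values)
    return pd.Series(batch.sum(axis=1))  # stand-in for the model call

new_df = df.withColumn("pred", predict(array("a", "b", "c")))
```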



Re: [pyspark] dataframe map_partition

Bryan Cutler
Hi Peng,

I just added support for scalar Pandas UDFs to return a StructType as a pandas DataFrame in https://issues.apache.org/jira/browse/SPARK-23836. Is that the functionality you are looking for?
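
For example (a sketch against that new behavior; the field names are made up):

```
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("label string, score double", PandasUDFType.SCALAR)
def predict(v):
    # return a pandas DataFrame whose columns match the struct fields
    return pd.DataFrame({"label": (v > 0).map({True: "pos", False: "neg"}),
                         "score": v.abs()})

new_df = df.withColumn("pred", predict("x"))
```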

Bryan

Re: [pyspark] dataframe map_partition

peng yu
Yeah, that seems like what I wanted. Does the scalar Pandas UDF support a StructType input too?

Re: [pyspark] dataframe map_partition

Li Jin
Hi,

Pandas UDFs support struct-type input. Note, however, that the struct values will be turned into Python dicts, because pandas itself has no native struct type.
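
For example (a sketch; the struct column `s` and its fields are made up):

```
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType(), PandasUDFType.SCALAR)
def extract_score(s):
    # s is a pandas.Series of dicts, e.g. {"label": "pos", "score": 0.9}
    return s.map(lambda d: d["score"])

new_df = df.withColumn("score", extract_score("s"))
```
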
Re: [pyspark] dataframe map_partition

peng yu
Cool, thanks for letting me know. But why not support dapply (http://spark.apache.org/docs/2.0.0/api/R/dapply.html) as R does, so we can just pass in a pandas DataFrame?

Re: [pyspark] dataframe map_partition

Hyukjin Kwon
Because dapply in R and scalar Pandas UDFs in Python are similar and cover each other. FWIW, this sounds somewhat like SPARK-26413 and SPARK-26412.
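
A sketch of the partition-level pandas mapping SPARK-26412 asks for (this later landed in Spark 3.0 as DataFrame.mapInPandas):

```
def run_batches(batches):
    # batches is an iterator of pandas DataFrames, one per Arrow batch
    for pandas_df in batches:
        yield pandas_df  # e.g. run batch inference here

new_df = df.mapInPandas(run_batches, schema=df.schema)
```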

