Need to order iterator values in spark dataframe

Need to order iterator values in spark dataframe

Ranjan, Abhinav

Hi,

I have a dataframe which has data like:

key | code | code_value
1   | c1   | 11
1   | c2   | 12
1   | c2   | 9
1   | c3   | 12
1   | c2   | 13
1   | c2   | 14
1   | c4   | 12
1   | c2   | 15
1   | c1   | 12


I need to group the data by key and then apply some custom logic to every value I get from grouping. So I did this:

Let's suppose it is in a dataframe df.

case class key_class(key: String, code: String, code_value: String)


df
  .as[key_class]
  .groupByKey(_.key)
  .mapGroups { (key, groupedValues) =>
    val status = groupedValues.map { row =>
      // do some custom logic on row
      "SUCCESS"
    }.toList
    status
  }
  .toDF("status")


The issue with the above approach is that the values I get after applying groupByKey are not sorted/ordered. I want the values to be sorted by the column 'code'.

There are a few ways to do this:

1. Collect them into a list and then sort it ==> this will result in OOM if the iterator is too big.

2. Somehow apply a secondary sort, but the problem with that approach is that I have to keep track of the key change.

3. sortWithinPartitions cannot be applied because groupBy will mess up the order.

4. Another approach is:

df
  .as[key_class]
  .sort("key", "code")
  .map {
    // do stuff here
  }

but here also I have to keep track of the key change within the map function, and sometimes this also overflows if the keys are skewed.
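For reference, the key-change tracking that options 2 and 4 boil down to looks roughly like this (an illustrative sketch only, reusing the key_class above; note that after a global sort a single key can still straddle two partitions, which is exactly why this is fragile):

```scala
df
  .as[key_class]
  .sort("key", "code")
  .mapPartitions { rows =>
    var currentKey: String = null
    rows.map { r =>
      // reset per-key state whenever the key changes
      val keyChanged = r.key != currentKey
      currentKey = r.key
      // do stuff here, using keyChanged to flush/restart the custom logic
      "SUCCESS"
    }
  }
```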

   

So is there any way in which I can get the values sorted after grouping them by a key?


Thanks,

Abhinav

Re: Need to order iterator values in spark dataframe

Enrico Minack

Abhinav,

you can repartition by your key, then sortWithinPartitions, and then groupByKey. Since the data are already hash-partitioned by key, Spark should not shuffle the data and hence not change the sort within each partition:

    ds.repartition($"key").sortWithinPartitions($"code").groupBy($"key")
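Applied to the mapGroups pipeline from the original post, that would look roughly like this (a sketch; the sort order surviving groupByKey relies on both steps using the same hash partitioning, which is an implementation detail rather than a documented guarantee):

```scala
df
  .as[key_class]
  .repartition($"key")             // hash-partition by key
  .sortWithinPartitions($"code")   // order each partition by code
  .groupByKey(_.key)               // should reuse the existing partitioning
  .mapGroups { (key, groupedValues) =>
    // groupedValues should now arrive ordered by code
    groupedValues.map(row => "SUCCESS").toList
  }
  .toDF("status")
```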

Enrico


On 26.03.20 at 17:53, Ranjan, Abhinav wrote:



Re: Need to order iterator values in spark dataframe

Zahid Rahman

I believe I logged an issue first and I should get a response first.
I was ignored.

Regards

Did you know there are 8 million people in kashmir locked up in their homes by the Hindutwa (Indians)
for 8 months.
Now the whole planet is locked up in their homes.
You didn't take notice of them either.
you ignored them.

¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}


On Thu, 26 Mar 2020 at 17:24, Enrico Minack <[hidden email]> wrote:



Re: Need to order iterator values in spark dataframe

Ranjan, Abhinav
In reply to this post by Enrico Minack

Enrico,

The below solution works but there is a little glitch.

It works fine in spark-shell but fails for skewed keys when run via spark-submit.

Looking at the execution plan, the partition count is the same for both repartition and groupByKey and is driven by the setting "spark.sql.shuffle.partitions",

like: Exchange hashpartitioning(value#143, 200)

Any ideas on why skewed keys give wrong output while the same code gives correct results in spark-shell?


--Abhinav
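One thing worth checking in the spark-submit run is whether the plan really contains a single Exchange: if repartition and groupByKey disagree on the partition count, a second shuffle will destroy the per-partition sort. A sketch of making the count explicit, which takes spark.sql.shuffle.partitions out of the equation (400 here is an arbitrary illustrative value):

```scala
val result = df
  .as[key_class]
  .repartition(400, $"key")        // explicit count, independent of spark.sql.shuffle.partitions
  .sortWithinPartitions($"code")
  .groupByKey(_.key)
  .mapGroups { (key, rows) => rows.map(_ => "SUCCESS").toList }

// inspect the physical plan: ideally exactly one Exchange hashpartitioning(key, 400)
result.explain(true)
```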
