Fwd: A question about rdd transformation

Lionel Luffy
Adding the dev list. Can anyone help with the question below?

Thanks & Best Regards,
LL

---------- Forwarded message ----------
From: Lionel Luffy <[hidden email]>
Date: Fri, Jun 23, 2017 at 11:20 AM
Subject: Re: A question about rdd transformation
To: [hidden email]


I have now found that the root cause is a Wrapper class among the AnyRef values that is not Serializable. However, even after changing it to implement Serializable, 'rows' still does not return any data. Any suggestions?
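One thing that may be worth checking here: marking a class Serializable is not enough if one of its non-transient fields is itself not serializable. The sketch below tests this with plain Java serialization, outside Spark. Handle, BadWrapper, GoodWrapper and TransientWrapper are hypothetical stand-ins, not the actual Wrapper class from this thread, and the check only covers the default Java serializer.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Hypothetical classes, for illustration only.
  class Handle(val id: Int)                                   // not Serializable
  class BadWrapper(val handle: Handle) extends Serializable   // Serializable, but holds a Handle
  class GoodWrapper(val id: Int) extends Serializable         // only serializable fields
  class TransientWrapper(h: Handle) extends Serializable {
    @transient val handle: Handle = h                         // skipped during serialization
  }

  /** Returns true if Java serialization can write the whole object graph. */
  def isJavaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val candidates: Seq[AnyRef] = Seq(
      new BadWrapper(new Handle(1)),
      new GoodWrapper(1),
      new TransientWrapper(new Handle(1)))
    // Print one line per candidate with the result of the check.
    candidates.foreach { c =>
      println(s"${c.getClass.getSimpleName}: ${isJavaSerializable(c)}")
    }
  }
}
```

Running the same check on a sample of the actual Array[AnyRef] values before calling partitionBy would show whether any other element type is still affected. Note that a transient field comes back as null after deserialization, so it only helps for state that can be rebuilt on the executor.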

On Fri, Jun 23, 2017 at 10:56 AM, Lionel Luffy <[hidden email]> wrote:
Hi there,
I'm trying to do the following, but it always fails with java.io.NotSerializableException in the shuffle task.
I've checked that Array is serializable. How can I get the data of rdd inside newRDD?

step 1: val rdd: RDD[(AnyRef, Array[AnyRef])] = {......}

step 2: rdd
          .partitionBy(partitioner)
          .map(_._2)

step 3: pass the result of step 2 to newRDD as prev:
class newRDD[K, V](
    xxx,
    xxx,
    xxx,
    prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {

  override protected def getPartitions: Array[Partition] = {...}

  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {...
    // read the parent partition produced by step 2
    val rows = firstParent[Array[AnyRef]].iterator(split, context)

  }

}


Thanks,
LL


Re: A question about rdd transformation

cloud0fan
The exception message should include the lineage of the un-serializable object. Can you post that too?
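For reference, a minimal sketch of where that information comes from in plain Java serialization: the message of a NotSerializableException names the class that could not be written. Connection and Row are hypothetical classes used only for illustration, and the sketch runs outside Spark, so it does not reproduce the exact text Spark prints.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationTrace {
  // Hypothetical classes, for illustration only.
  class Connection(val host: String)                              // not Serializable
  class Row(val key: String, val conn: Connection) extends Serializable

  /** Attempts Java serialization and describes the outcome as text. */
  def describeFailure(value: AnyRef): String = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      "serializable"
    } catch {
      // The exception message carries the name of the offending class.
      case e: NotSerializableException => s"${e.getClass.getName}: ${e.getMessage}"
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val rows: Array[AnyRef] = Array(new Row("k1", new Connection("localhost")))
    println(describeFailure(rows))
  }
}
```

On HotSpot-based JVMs, starting the process with -Dsun.io.serialization.extendedDebugInfo=true should additionally add the chain of fields leading to the offending object, which is usually the most useful part to post.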
