How does MapWithStateRDD distribute the data


Soumitra Johri
Hi,

I am running a streaming job with 4 executors and 16 cores so that each executor has two cores to work with. The input Kafka topic has 4 partitions.
With this configuration I expected the MapWithStateRDD to be evenly distributed across all executors. However, I see that its data is distributed across only two executors, and sometimes it all goes to a single executor.

How can this be explained? I am pretty sure there is some math behind this behavior.

I am using a standard standalone Spark 1.6.2 cluster.

Thanks
Soumitra
Re: How does MapWithStateRDD distribute the data

Cody Koeninger-2
Are you using KafkaUtils.createDirectStream?

On Wed, Aug 3, 2016 at 9:42 AM, Soumitra Johri
<[hidden email]> wrote:

> Hi,
>
> I am running a streaming job with 4 executors and 16 cores so that each
> executor has two cores to work with. The input Kafka topic has 4 partitions.
> With this given configuration I was expecting MapWithStateRDD to be evenly
> distributed across all executors, however I see that it uses only two
> executors on which MapWithStateRDD data is distributed. Sometimes the data
> goes only to one executor.
>
> How can this be explained? I am pretty sure there is some math behind
> this behavior.
>
> I am using the standard standalone 1.6.2 cluster.
>
> Thanks
> Soumitra


Re: How does MapWithStateRDD distribute the data

coolcoolkid
Hello,

I have run into a situation just like the one described above. I am running a Spark Streaming application with 2 executors, each with 16 cores and 10G of memory, and the input Kafka topic has 64 partitions.

My code looks like this:
--------------------------------------------
KafkaUtils.createDirectStream(...)
...
.map(s => (k, v))
.mapWithState(...numPartitions(32))
...
.foreachRDD(_.foreachPartition(output))
--------------------------------------------

I was also expecting the 32 partitions of the MapWithStateRDD to be distributed evenly between the two executors, but it turned out that all 32 were on one of them.
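
For context on the partitioning side only: as far as I know, numPartitions(32) on a StateSpec results in a HashPartitioner with 32 partitions, which assigns each key to a partition by taking its hashCode modulo the partition count. The sketch below reimplements that arithmetic in plain Scala. The object name, the helper names and the sample keys are made up for illustration, and this is not Spark's own code. It only shows how keys spread over partitions. It does not show which executor each partition is placed on, which is the part I cannot explain.

```scala
// Sketch only: mirrors the key-to-partition arithmetic of a hash partitioner.
// It says nothing about which executor a partition ends up on.
object HashPartitionSketch {
  // Non-negative remainder, because hashCode can be negative.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Partition index for a key; null keys go to partition 0.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 32 // same value as numPartitions(32) in my code
    // Hypothetical keys, for illustration only.
    val keys = (1 to 1000).map(i => s"user-$i")
    // Count how many keys land in each partition.
    val counts = keys
      .groupBy(k => partitionFor(k, numPartitions))
      .map { case (partition, ks) => partition -> ks.size }
    counts.toSeq.sortBy(_._1).foreach { case (partition, n) =>
      println(s"partition $partition: $n keys")
    }
  }
}
```

If the keys hash reasonably well, every one of the 32 partitions should receive some keys, so the skew I am seeing seems to come from where the partitions are scheduled rather than from how keys are assigned to them.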

I noticed that you replied 'Are you using KafkaUtils.createDirectStream?' and I was wondering whether the Kafka direct stream leads to this situation. Or is there something else?

Thanks a lot!