[Spark Core] How do Spark workers exchange data in standalone mode?

Hello All,

I want to know more about how data is exchanged between Spark workers in
standalone mode. Every time a task reads the result of another task,
I want to log that event.

Information I need:
    source task / stage
    destination task / stage
    size of the data transfer
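A minimal sketch of one record per transfer event that holds the information listed above. `TransferEvent` and `toDotEdge` are hypothetical names, not Spark APIs; rendering each event as a Graphviz DOT edge is only one convenient way to draw the resulting communication graph.

```scala
// Hypothetical record for one shuffle transfer; not part of Spark.
final case class TransferEvent(
    srcStage: Int,  // stage of the task that wrote the block
    srcTask: Long,  // task attempt ID of the writer
    dstStage: Int,  // stage of the task that read the block
    dstTask: Long,  // task attempt ID of the reader
    bytes: Long     // size of the transferred block
) {
  // Render the event as a Graphviz DOT edge labelled with the byte count.
  // Node IDs look like "s<stage>_t<task>" so they are valid unquoted DOT IDs.
  def toDotEdge: String =
    s"s${srcStage}_t${srcTask} -> s${dstStage}_t${dstTask} [label=${bytes}];"
}
```

For example, `TransferEvent(1, 12L, 2, 40L, 1024L).toDotEdge` yields `s1_t12 -> s2_t40 [label=1024];`.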

So far I've managed to do something similar by changing two methods in
Spark Core:

To find out which task produced which partition/block, I added the following log statement:

logError(s"""PRODUCED SORT:
        |BlockId: ${blockId.shuffleId} ${blockId.mapId}
        |PartitionId: ${context.partitionId()}
        |TaskAttemptId: ${context.taskAttemptId()}
        |StageId: ${context.stageId()}
        |""".stripMargin)
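A minimal sketch of turning one PRODUCED SORT message back into a record, assuming the multi-line message body (without the logger's timestamp prefix) has already been pulled out of the executor log. `ProducedEntry` and `ProducedParser` are hypothetical names; the field keys simply follow the log template above.

```scala
import scala.util.Try

// Hypothetical record for one PRODUCED SORT entry (not a Spark class).
// mapId is a Long so that either an Int or a Long map ID fits.
final case class ProducedEntry(
    shuffleId: Int,
    mapId: Long,
    partitionId: Int,
    taskAttemptId: Long,
    stageId: Int
)

object ProducedParser {
  // Turn "Key: value" lines into a map; lines without ": " (such as the
  // "PRODUCED SORT:" header) are skipped.
  private def fields(message: String): Map[String, String] =
    message
      .split("\n")
      .map(_.trim.split(": ", 2))
      .collect { case Array(k, v) => k.trim -> v.trim }
      .toMap

  // Returns None if a field is missing or malformed.
  def parse(message: String): Option[ProducedEntry] = {
    val f = fields(message)
    Try {
      val block = f("BlockId").split(" ") // "<shuffleId> <mapId>"
      ProducedEntry(
        shuffleId = block(0).toInt,
        mapId = block(1).toLong,
        partitionId = f("PartitionId").toInt,
        taskAttemptId = f("TaskAttemptId").toLong,
        stageId = f("StageId").toInt
      )
    }.toOption
  }
}
```

For example, parsing `"PRODUCED SORT:\nBlockId: 0 3\nPartitionId: 3\nTaskAttemptId: 12\nStageId: 1"` gives `Some(ProducedEntry(0, 3, 3, 12, 1))`, and any message missing a field gives `None`.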

To find out which task consumed which partition/block, I added the following to ShuffleBlockFetcherIterator.scala#ShuffleBlockFetcherIterator#sendRequest:

blockIds.foreach { blockId =>
  logError(s"""CONSUMED:
           |BlockId: ${blockId}
           |PartitionId: ${context.partitionId()}
           |TaskAttemptId: ${context.taskAttemptId()}
           |StageId: ${context.stageId()}
           |Address: ${address}
           |Size: ${sizeMap(blockId)}
           |""".stripMargin)
}
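A minimal sketch of joining the two logs into producer-to-consumer edges, assuming both have already been parsed into the hypothetical `Written` and `Fetched` records below, and that the consumer's `BlockId` is the name Spark prints for a shuffle block, `shuffle_<shuffleId>_<mapId>_<reduceId>`. The join key is `(shuffleId, mapId)`, which is what the PRODUCED log records; block names that do not fit the pattern are returned as unmatched rather than guessed.

```scala
// Hypothetical, already-parsed log records (not Spark classes).
final case class Written(shuffleId: Int, mapId: Long, taskAttemptId: Long, stageId: Int)
final case class Fetched(blockId: String, taskAttemptId: Long, stageId: Int, size: Long)
final case class Edge(srcStage: Int, srcTask: Long, dstStage: Int, dstTask: Long, bytes: Long)

object ShuffleGraph {
  // A shuffle block name has the form "shuffle_<shuffleId>_<mapId>_<reduceId>".
  private val ShuffleBlockName = """shuffle_(\d+)_(\d+)_(\d+)""".r

  // Pair every fetched block with the task that wrote it, keyed by (shuffleId, mapId).
  // Returns the matched edges and the fetched entries left without a writer.
  def join(written: Seq[Written], fetched: Seq[Fetched]): (Seq[Edge], Seq[Fetched]) = {
    // If the same map output was logged more than once (e.g. a retried task),
    // the last entry wins.
    val writers: Map[(Int, Long), Written] =
      written.map(w => (w.shuffleId, w.mapId) -> w).toMap

    val paired: Seq[(Fetched, Option[Written])] = fetched.map { f =>
      val writer = f.blockId match {
        case ShuffleBlockName(s, m, _) => writers.get((s.toInt, m.toLong))
        case _                         => None // any other name is reported, not guessed
      }
      f -> writer
    }

    val edges = paired.collect { case (f, Some(w)) =>
      Edge(w.stageId, w.taskAttemptId, f.stageId, f.taskAttemptId, f.size)
    }
    val unmatched = paired.collect { case (f, None) => f }
    (edges, unmatched)
  }
}
```

Keeping the unmatched entries, instead of dropping them, makes it easier to see which fetches could not be attributed to a producer.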

Using these two changes, I managed to partially reconstruct the
communication graph, but there are a couple of problems:
1. I cannot match up all of the PRODUCED and CONSUMED log entries.
2. The amount of data (field "size") does not match the real traffic numbers
that I got from the OS. On the other hand, it does match the Shuffle
Read/Write numbers on the Spark History Server.
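A minimal sketch of aggregating matched edges so the logged sizes can be set side by side with per-stage Shuffle Read/Write figures or with OS counters. `StageEdge` and `StageTraffic` are hypothetical names; the code only sums what was logged and says nothing about where any difference comes from.

```scala
// Hypothetical matched edge: one block moved from srcStage to dstStage.
final case class StageEdge(srcStage: Int, dstStage: Int, bytes: Long)

object StageTraffic {
  // Sum the logged block sizes for each (source stage, destination stage) pair.
  def perStagePair(edges: Seq[StageEdge]): Map[(Int, Int), Long] =
    edges
      .groupBy(e => (e.srcStage, e.dstStage))
      .map { case (pair, es) => pair -> es.map(_.bytes).sum }

  // Sum the logged block sizes read by each destination stage.
  def perDestinationStage(edges: Seq[StageEdge]): Map[Int, Long] =
    edges
      .groupBy(_.dstStage)
      .map { case (stage, es) => stage -> es.map(_.bytes).sum }
}
```

For example, edges `(1 -> 2, 100)`, `(1 -> 2, 50)` and `(0 -> 2, 10)` give `Map((1, 2) -> 150, (0, 2) -> 10)` per stage pair and `Map(2 -> 160)` per destination stage.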

I've found an article that explains data exchange in Apache Flink to a
certain extent. Is there something similar for Spark?