spark-dev mailing list archives

From vdukic <>
Subject [Spark Core] How do Spark workers exchange data in standalone mode?
Date Mon, 27 Feb 2017 08:24:41 GMT
Hello All,

I want to know more about data exchange between Spark workers in
standalone mode. Every time a task reads the result of another task,
I want to log that event.

Information I need:
    source task / stage
    destination task / stage
    size of the data transfer

So far I've managed to do something similar by changing two methods in
Spark Core:

To log which task produced which partition / block, I added

logError(s"""PRODUCED SORT:
        |BlockId: ${blockId.shuffleId} ${blockId.mapId}
        |PartitionId: ${context.partitionId()}
        |TaskAttemptId: ${context.taskAttemptId()}
        |StageId: ${context.stageId()}""".stripMargin)

To log which task consumed which partition / block, I added

blockIds.foreach { blockId =>
  logError(s"""CONSUMED:
           |BlockId: ${blockId}
           |PartitionId: ${context.partitionId()}
           |TaskAttemptId: ${context.taskAttemptId()}
           |StageId: ${context.stageId()}
           |Address: ${address}
           |Size: ${sizeMap(blockId)}""".stripMargin)
}

Using these two changes, I managed to partially reconstruct the
communication graph, but two problems remain:
1. I cannot match every PRODUCED log with a corresponding CONSUMED log.
2. The amount of data (the "Size" field) does not match the real traffic
numbers I measured at the OS level. It does, however, match the Shuffle
Read/Write numbers shown by the Spark History Server.

I've found an article that explains data exchange in Apache Flink to a
certain extent. Is there something similar for Spark?

