spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Adrian Tanase <>
Subject Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka
Date Tue, 06 Oct 2015 19:33:46 GMT
Also check if the Kafka cluster is still balanced. Maybe one of the brokers manages too many
partitions, all the work will stay on that executor unless you repartition right after kakfka
(and I'm not saying you should).

Sent from my iPhone

On 06 Oct 2015, at 22:17, Cody Koeninger <<>>

I'm not clear on what you're measuring.  Can you post relevant code snippets including the
measurement code?

As far as kafka metrics, nothing currently.  There is an info-level log message every time
a kafka rdd iterator is instantiated,<>(s"Computing topic ${part.topic}, partition ${part.partition}
" +

      s"offsets ${part.fromOffset} -> ${part.untilOffset}")

If you log once you're done with an iterator you should be able to see the delta.

The other thing to try is reduce the number of parts involved in the job to isolate it ...
first thing I'd do there is take cassandra out of the equation.

On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <<>>
Hi Cody,

The job is doing ETL from Kafka records to Cassandra. After a single filtering stage on Spark,
the 'TL' part is done using the dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.

We have metrics on the executor work which we collect and add together, indicated here by
'local computation'.  As you can see, we also measure how much it cost us to measure :-)
See how 'local work'  times are comparable.  What's not visible is the task scheduling and
consuming the data from Kafka which becomes part of the 'spark computation' part.

The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...

Are there metrics available somehow on the Kafka reading time?

        Slow Task       Fast Task
local computation       347.6   281.53
spark computation       6930    263
metric collection       70      138
wall clock process      7000    401
total records processed 4297    5002

(time in ms)

kr, Gerard.

On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <<>>
Can you say anything more about what the job is doing?

First thing I'd do is try to get some metrics on the time taken by your code on the executors
(e.g. when processing the iterator) to see if it's consistent between the two situations.

On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <<>>

We recently migrated our streaming jobs to the direct kafka receiver. Our initial migration
went quite fine but now we are seeing a weird zig-zag performance pattern we cannot explain.
In alternating fashion, one task takes about 1 second to finish and the next takes 7sec for
a stable streaming rate.

Here are comparable metrics for two successive tasks:


Executor ID     Address Task Time       Total Tasks     Failed Tasks    Succeeded Tasks
20151006-044141-2408867082-5050-21047-S0        dnode-3.hdfs.private:36863      22 s    3
      0       3
20151006-044141-2408867082-5050-21047-S1        dnode-0.hdfs.private:43812      40 s    11
     0       11
20151006-044141-2408867082-5050-21047-S4        dnode-5.hdfs.private:59945      49 s    10
     0       10

Executor ID     Address Task Time       Total Tasks     Failed Tasks    Succeeded Tasks
20151006-044141-2408867082-5050-21047-S0        dnode-3.hdfs.private:36863      0.6 s   4
      0       4
20151006-044141-2408867082-5050-21047-S1        dnode-0.hdfs.private:43812      1 s     9
      0       9
20151006-044141-2408867082-5050-21047-S4        dnode-5.hdfs.private:59945      1 s     11
     0       11
We have some custom metrics that measure wall-clock time of execution of certain blocks of
the job, like the time it takes to do the local computations (RDD.foreachPartition closure)
vs total time.
The difference between the slow and fast executing task is on the 'spark computation time'
which is wall-clock for the task scheduling (DStream.foreachRDD closure)

Slow task:

local computation time: 347.60968499999996, spark computation time: 6930, metric collection:
70, total process: 7000, total_records: 4297

Fast task:
local computation time: 281.539042, spark computation time: 263, metric collection: 138, total
process: 401, total_records: 5002

We are currently running Spark 1.4.1. The load and the work to be done is stable -this is
on a dev env with that stuff under control.

Any ideas what this behavior could be?

thanks in advance,  Gerard.

View raw message