spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: Gradual slow down of the Streaming job (getCallSite at DStream.scala:294)
Date Mon, 05 Jan 2015 19:15:22 GMT
From the screenshots it's clear that the 30-second GC is the culprit. If one
of the nodes is stuck in GC, that delays the whole stage/job/batch. Here are
some more questions.

- What are the storage levels used in the input streams and/or persisted
streams?

- Could you go to the log of the executor that suffered the GC and see what
the usage of the block manager is? You should find log4j lines saying
"MemoryStore: added xxx MB, free yy MB". Also, if you can print GC lines, do
check the generation sizes.

- Do you see the problem if you are not doing the window operation, that is,
if you do a simple filter, map, etc.? If not, can you try doing
reduceByKeyAndWindow without the inverse "_ - _" function (the other version
of reduceByKeyAndWindow, sketched below) and see if the problem still persists?
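
To be concrete, here is a rough sketch of what I mean by an explicit storage
level and by the other reduceByKeyAndWindow. It borrows ssc, zkQuorum, group,
topicMap and groupedWords from the code below, and MEMORY_AND_DISK_SER is
only an example, not a recommendation.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

// Explicit storage level on the Kafka receiver
// (the default for createStream is MEMORY_AND_DISK_SER_2)
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
  StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

// The other version of reduceByKeyAndWindow: no inverse "_ - _" function,
// so each 30-second window is recomputed rather than updated incrementally
val windowedWordCounts =
  groupedWords.reduceByKeyAndWindow((a: Int, b: Int) => a + b,
    Seconds(30), Seconds(10))
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~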

TD
On Jan 5, 2015 6:35 AM, "Gerard Maas" <gerard.maas@gmail.com> wrote:

> Hi RK,
>
> Have you tried observing 'processing time'? You can derive it from the
> Streaming metrics by computing (lastReceivedBatch_processEndTime -
> lastReceivedBatch_processStartTime).
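>
> For reference, a minimal listener sketch that logs the same numbers per
> batch (assuming you can register it on the StreamingContext):
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
>
> // Logs processing time and scheduling delay (ms) as each batch completes
> ssc.addStreamingListener(new StreamingListener {
>   override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
>     val info = batch.batchInfo
>     println(s"batch ${info.batchTime}: " +
>       s"processing=${info.processingDelay.getOrElse(-1L)} ms, " +
>       s"scheduling=${info.schedulingDelay.getOrElse(-1L)} ms")
>   }
> })
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>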
> I've observed the same issue and it seems to be a sudden failure in an
> otherwise fine-running job. After several hours of execution, the job
> starts degrading in performance, ultimately resulting in failure.
>
> Here's a chart of 'processing time' over time. Each data point is the
> processing time of the 10-second job interval.
>
>
> [image: Inline image]
> We made a lot of progress in understanding the factors affecting streaming
> job performance, which I summarized in this post:
> http://www.virdata.com/tuning-spark/
> Yet, this one is still puzzling us. I'm currently adding GC times to the
> metrics, to see if I find a correlation.
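>
> In case it helps, one way to sample GC time inside a JVM (just a sketch
> using the standard GC MXBeans; the Spark UI also shows per-task GC time):
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> import java.lang.management.ManagementFactory
> import scala.collection.JavaConverters._
>
> // Cumulative GC time (ms) of this JVM since startup; sample it periodically
> // and diff consecutive samples to get GC time per interval.
> def totalGcTimeMs(): Long =
>   ManagementFactory.getGarbageCollectorMXBeans.asScala
>     .map(_.getCollectionTime).sum
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~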
>
> -kr, Gerard.
>
>
>
>
> On Thu, Jan 1, 2015 at 8:36 PM, RK <prk001@yahoo.com.invalid> wrote:
>
>> TD, here are the processing & GC stats after running the job for 11 hours.
>>
>> [image: Inline image]
>>
>> I sorted all stages in descending order of duration. The getCallSite call
>> that occurred most recently tops the list at 35s, even though the
>> getCallSite operation at the start of the job completed in 2s.
>> [image: Inline image]
>>
>> All the map operations stay under 1 second throughout 11 hours.
>> [image: Inline image]
>>
>> Here is more information about the longest-running getCallSite stage.
>> [image: Inline image]
>>
>> [image: Inline image]
>>
>> [image: Inline image]
>>
>> Thanks,
>> RK
>>
>>
>>
>>   On Tuesday, December 30, 2014 6:03 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>
>> Which version of Spark Streaming are you using?
>>
>> When the batch processing time increases to 15-20 seconds, could you
>> compare the task times with the task times from when the application
>> was just launched? Basically, is the increase from 6 seconds to 15-20
>> seconds caused by an increase in computation or by GC, etc.?
>>
>> On Tue, Dec 30, 2014 at 1:41 PM, RK <prk001@yahoo.com.invalid> wrote:
>> > Here is the code for my streaming job.
>> >
>> > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>> > val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
>> > sparkConf.set("spark.serializer",
>> > "org.apache.spark.serializer.KryoSerializer")
>> > sparkConf.set("spark.default.parallelism", "100")
>> > sparkConf.set("spark.shuffle.consolidateFiles", "true")
>> > sparkConf.set("spark.speculation", "true")
>> > sparkConf.set("spark.speculation.interval", "5000")
>> > sparkConf.set("spark.speculation.quantile", "0.9")
>> > sparkConf.set("spark.speculation.multiplier", "3")
>> > sparkConf.set("spark.mesos.coarse", "true")
>> > sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc
>> > -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
>> > sparkConf.set("spark.shuffle.manager", "SORT")
>> >
>> > val ssc = new StreamingContext(sparkConf, Seconds(10))
>> > ssc.checkpoint(checkpointDir)
>> >
>> > val topics = "trace"
>> > val numThreads = 1
>> > val topicMap = topics.split(",").map((_,numThreads)).toMap
>> >
>> > val kafkaPartitions = 20
>> > val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
>> >  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
>> > }
>> >
>> > val lines = ssc.union(kafkaDStreams)
>> > val words = lines.map(line => doSomething_1(line))
>> > val filteredWords = words.filter(word => word != "test")
>> > val groupedWords = filteredWords.map(word => (word, 1))
>> >
>> > val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _,
>> > Seconds(30), Seconds(10))
>> > val windowedWordsFiltered =
>> >   windowedWordCounts.filter { case (word, count) => count > 50 }
>> > val finalResult = windowedWordsFiltered.foreachRDD(words =>
>> > doSomething_2(words))
>> >
>> > ssc.start()
>> > ssc.awaitTermination()
>> > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>> >
>> > I am running this job on a 9-slave AWS EC2 cluster where each slave node
>> > has 32 vCPUs & 60GB of memory.
>> >
>> > When I start this job, the processing time is usually around 5 - 6 seconds
>> > for the 10-second batch and the scheduling delay is around 0 seconds or a
>> > few ms. However, after the job has run for 6 - 8 hours, the processing time
>> > increases to 15 - 20 seconds and the scheduling delay grows to 4 - 6 hours.
>> >
>> > When I look at the completed stages, I see that the time taken for
>> > getCallSite at DStream.scala:294 keeps increasing as time passes by. It
>> > goes from around 2 seconds to more than a few minutes.
>> >
>> > Clicking on +details next to this stage description shows the following
>> > execution trace.
>> > org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
>> > scala.Option.orElse(Option.scala:257)
>> > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
>> > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>> > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>> > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>> > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>> > scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>> > org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>> > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
>> > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
>> > scala.util.Try$.apply(Try.scala:161)
>> > org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
>> > org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>> >
>> > When I click on one of these slow stages that executed after 6 - 8 hours,
>> > I find the following information for the individual tasks inside:
>> > - All tasks seem to execute with PROCESS_LOCAL locality.
>> > - Quite a few of these tasks seem to spend anywhere between 30 - 80% of
>> > their time in GC, although when I look at the total memory usage on each
>> > of the slave nodes under the executors information, I see that the usage
>> > is only around 200MB out of the 20GB available.
>> >
>> > Even after a few hours, the map stages (val groupedWords =
>> > filteredWords.map(word => (word, 1))) show times consistent with those at
>> > the start of the job, which seems to indicate that this code is fine.
>> > Also, the number of waiting batches stays at 0 or 1 even after 8 to 10
>> > hours.
>> >
>> > Based on the fact that map is as fast as at the start of the job and that
>> > there are no waiting batches, I am assuming that the getCallSite stages
>> > correspond to getting data out of Kafka. Is this correct or not?
>> > If my assumption is correct, is there anything I could do to optimize
>> > receiving data from Kafka?
>> > If not, which part of my code needs to be optimized to reduce the
>> > scheduling delay?
>> >
>> > Thanks,
>> > RK
>>
>> >
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>
>
