spark-user mailing list archives

From Gerard Maas <gerard.m...@gmail.com>
Subject Re: Gradual slow down of the Streaming job (getCallSite at DStream.scala:294)
Date Mon, 05 Jan 2015 14:35:00 GMT
Hi RK,

Have you tried observing 'processing time'? You can derive it from the
Streaming metrics as (lastReceivedBatch_processEndTime -
lastReceivedBatch_processStartTime).
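
If it helps, here is a minimal sketch of computing the same per-batch figure
inside the job itself with a StreamingListener (the listener API and
addStreamingListener are standard Spark Streaming; the ProcessingTimeListener
name and the println are just illustrative):

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Reports how long each completed batch took to process, i.e. the same value as
// lastReceivedBatch_processEndTime - lastReceivedBatch_processStartTime.
class ProcessingTimeListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val processingTimeMs = info.processingDelay.getOrElse(-1L)
    println(s"Batch ${info.batchTime}: processing time = ${processingTimeMs} ms")
  }
}

// Register it on the StreamingContext before calling ssc.start():
// ssc.addStreamingListener(new ProcessingTimeListener)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
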
I've observed the same issue in an otherwise fine-running job: after several
hours of execution, the job suddenly starts degrading in performance,
ultimately resulting in failure.

Here's a chart of 'processing time' over time. Each data point is the
processing time of the 10-second job interval.


We made a lot of progress in understanding the factors that affect streaming
job performance, which I summarized in this post:
http://www.virdata.com/tuning-spark/
Yet this one is still puzzling us. I'm currently adding GC times to the
metrics to see if I can find a correlation.
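
In case it is useful to anyone trying the same, here is a rough sketch of one
way to read cumulative GC time from inside the driver or an executor, using
the standard GarbageCollectorMXBean JMX API (the GcTime object is just an
illustrative helper; how you push the number into your metrics system is up
to you):

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._

object GcTime {
  // Sum of accumulated collection time (ms) across all garbage collectors in this JVM.
  // Sampling this periodically and diffing consecutive samples gives GC time per interval.
  def totalGcMillis(): Long =
    ManagementFactory.getGarbageCollectorMXBeans.asScala
      .map(_.getCollectionTime)
      .filter(_ >= 0) // -1 means the collector does not report collection time
      .sum
}

// Example usage: measure GC time spent while one batch is processed.
// val before = GcTime.totalGcMillis()
// ... batch work ...
// val gcMs = GcTime.totalGcMillis() - before
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
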

-kr, Gerard.




On Thu, Jan 1, 2015 at 8:36 PM, RK <prk001@yahoo.com.invalid> wrote:

> TD, here are the processing & GC stats after running the job for 11 hours.
>
> [image: Inline image]
>
> Sorted all stages in descending order of duration. The most recent
> getCallSite call tops the list at 35s, even though the getCallSite
> operation at the start of the job completed in 2s.
> [image: Inline image]
>
> All the map operations stay under 1 second throughout 11 hours.
> [image: Inline image]
>
> Here is more information about the longest-running getCallSite stage.
> [image: Inline image]
>
> [image: Inline image]
>
> [image: Inline image]
>
> Thanks,
> RK
>
>
>
>   On Tuesday, December 30, 2014 6:03 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>
> Which version of Spark Streaming are you using?
>
> When the batch processing time increases to 15-20 seconds, could you
> compare the task times against the task times from when the application
> was just launched? Basically, is the increase from 6 seconds to 15-20
> seconds caused by an increase in computation or by GC pauses, etc.?
>
> On Tue, Dec 30, 2014 at 1:41 PM, RK <prk001@yahoo.com.invalid> wrote:
> > Here is the code for my streaming job.
> >
> > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> > val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
> > sparkConf.set("spark.serializer",
> > "org.apache.spark.serializer.KryoSerializer")
> > sparkConf.set("spark.default.parallelism", "100")
> > sparkConf.set("spark.shuffle.consolidateFiles", "true")
> > sparkConf.set("spark.speculation", "true")
> > sparkConf.set("spark.speculation.interval", "5000")
> > sparkConf.set("spark.speculation.quantile", "0.9")
> > sparkConf.set("spark.speculation.multiplier", "3")
> > sparkConf.set("spark.mesos.coarse", "true")
> > sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc
> > -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
> > sparkConf.set("spark.shuffle.manager", "SORT")
> >
> > val ssc = new StreamingContext(sparkConf, Seconds(10))
> > ssc.checkpoint(checkpointDir)
> >
> > val topics = "trace"
> > val numThreads = 1
> > val topicMap = topics.split(",").map((_,numThreads)).toMap
> >
> > val kafkaPartitions = 20
> > val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
> >  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
> > }
> >
> > val lines = ssc.union(kafkaDStreams)
> > val words = lines.map(line => doSomething_1(line))
> > val filteredWords = words.filter(word => word != "test")
> > val groupedWords = filteredWords.map(word => (word, 1))
> >
> > val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _,
> > Seconds(30), Seconds(10))
> > val windowedWordsFiltered = windowedWordCounts.filter { case (word, count) => count > 50 }
> > val finalResult = windowedWordsFiltered.foreachRDD(words => doSomething_2(words))
> >
> > ssc.start()
> > ssc.awaitTermination()
> > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> >
> > I am running this job on a 9-slave AWS EC2 cluster where each slave node
> > has 32 vCPUs & 60GB of memory.
> >
> > When I start this job, the processing time is usually around 5 - 6 seconds
> > for the 10-second batch and the scheduling delay is around 0 seconds or a
> > few ms. However, after the job runs for 6 - 8 hours, the processing time
> > increases to 15 - 20 seconds and the scheduling delay grows to 4 - 6 hours.
> >
> > When I look at the completed stages, I see that the time taken for
> > getCallSite at DStream.scala:294 keeps increasing as time passes by. It
> > goes from around 2 seconds to more than a few minutes.
> >
> > Clicking on +details next to this stage description shows the following
> > execution trace.
> > org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
> > scala.Option.orElse(Option.scala:257)
> > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
> > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> > scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> > org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> > scala.util.Try$.apply(Try.scala:161)
> > org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
> > org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
> >
> > When I click on one of these slow stages that executed after 6 - 8 hours,
> > I find the following information for the individual tasks inside:
> > - All tasks seem to execute with PROCESS_LOCAL locality.
> > - Quite a few of these tasks seem to spend anywhere between 30 - 80% of
> > their time in GC, although, when I look at the total memory usage on each
> > of the slave nodes under the executors information, I see that the usage
> > is only around 200MB out of the 20GB available.
> >
> > Even after a few hours, the map stages (val groupedWords =
> > filteredWords.map(word => (word, 1))) show times consistent with the start
> > of the job, which seems to indicate that this code is fine. Also, the
> > number of waiting batches stays at 0 or 1 even after 8 to 10 hours.
> >
> > Based on the fact that map is as fast as at the start of the job and that
> > there are no waiting batches, I am assuming that the getCallSite stages
> > correspond to getting data out of Kafka. Is this correct or not?
> > If my assumption is correct, is there anything that I could do to optimize
> > receiving data from Kafka?
> > If not, which part of my code needs to be optimized to reduce the
> > scheduling delay?
> >
> > Thanks,
> > RK
>
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
