spark-user mailing list archives

From RK <prk...@yahoo.com.INVALID>
Subject Re: Gradual slow down of the Streaming job (getCallSite at DStream.scala:294)
Date Thu, 01 Jan 2015 19:36:48 GMT
TD, here are the processing & GC stats after running the job for 11 hours.


I sorted all stages in descending order of duration. The getCallSite stage that ran most recently tops
the list at 35s, even though the getCallSite stages at the start of the job completed in 2s.

All the map operations stay under 1 second throughout 11 hours.
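
These numbers were read off the Spark UI; for watching the trend over an 11-hour run, the batch-level
timings can also be logged with a StreamingListener. A minimal sketch, assuming a Spark 1.x build and
the ssc from the job code quoted below (the println is only for illustration):

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Print scheduling delay and processing time for every completed batch so the
// gradual slowdown can be followed without keeping the web UI open.
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"batch ${info.batchTime}: " +
      s"scheduling delay = ${info.schedulingDelay.getOrElse(-1L)} ms, " +
      s"processing time = ${info.processingDelay.getOrElse(-1L)} ms")
  }
})
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~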

Here is more information about the longest-running getCallSite stage.

Thanks,
RK

On Tuesday, December 30, 2014 6:03 PM, Tathagata Das <tathagata.das1565@gmail.com> wrote:

Which version of Spark Streaming are you using?

When the batch processing time increases to 15-20 seconds, could you
compare the task times with the task times when the application has
just launched? Basically, is the increase from 6 seconds to 15-20
seconds caused by an increase in computation or by GC, etc.?
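
For example, per-task run time and GC time can be logged with a SparkListener and compared between
the start of the run and the slow period. A minimal sketch, assuming a Spark 1.x build and the ssc
from the job code below (the println is only for illustration):

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Log run time vs. GC time for every finished task; if run time stays flat
// while GC time grows, the slowdown is coming from garbage collection rather
// than from extra computation.
ssc.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for failed tasks, hence the Option wrapper
    Option(taskEnd.taskMetrics).foreach { m =>
      println(s"stage ${taskEnd.stageId} task ${taskEnd.taskInfo.taskId}: " +
        s"run time = ${m.executorRunTime} ms, GC time = ${m.jvmGCTime} ms")
    }
  }
})
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~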

On Tue, Dec 30, 2014 at 1:41 PM, RK <prk001@yahoo.com.invalid> wrote:
> Here is the code for my streaming job.
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
> sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> sparkConf.set("spark.default.parallelism", "100")
> sparkConf.set("spark.shuffle.consolidateFiles", "true")
> sparkConf.set("spark.speculation", "true")
> sparkConf.set("spark.speculation.interval", "5000")
> sparkConf.set("spark.speculation.quantile", "0.9")
> sparkConf.set("spark.speculation.multiplier", "3")
> sparkConf.set("spark.mesos.coarse", "true")
> sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
> sparkConf.set("spark.shuffle.manager", "SORT")
>
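> // 10-second batch interval; checkpointing is required because the windowed
> // reduce below uses an inverse function.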
> val ssc = new StreamingContext(sparkConf, Seconds(10))
> ssc.checkpoint(checkpointDir)
>
> val topics = "trace"
> val numThreads = 1
> val topicMap = topics.split(",").map((_,numThreads)).toMap
>
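> // One receiver (input DStream) per Kafka partition; the 20 streams are unioned below.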
> val kafkaPartitions = 20
> val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
>  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
> }
>
> val lines = ssc.union(kafkaDStreams)
> val words = lines.map(line => doSomething_1(line))
> val filteredWords = words.filter(word => word != "test")
> val groupedWords = filteredWords.map(word => (word, 1))
>
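> // 30-second window sliding every 10 seconds; the second argument (_ - _) is the inverse reduce function.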
> val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
> val windowedWordsFiltered = windowedWordCounts.filter { case (word, count) => count > 50 }
> val finalResult = windowedWordsFiltered.foreachRDD(words => doSomething_2(words))
>
> ssc.start()
> ssc.awaitTermination()
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
> I am running this job on a 9-slave AWS EC2 cluster where each slave node has
> 32 vCPUs & 60GB of memory.
>
> When I start this job, the processing time is usually around 5 - 6 seconds
> for the 10-second batch and the scheduling delay is around 0 seconds or a
> few ms. However, after the job has run for 6 - 8 hours, the processing time
> increases to 15 - 20 seconds, but the scheduling delay increases to 4 - 6
> hours.
>
> When I look at the completed stages, I see that the time taken for
> getCallSite at DStream.scala:294 keeps increasing over time. It goes
> from around 2 seconds to more than a few minutes.
>
> Clicking on +details next to this stage description shows the following
> execution trace.
> org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
> scala.Option.orElse(Option.scala:257)
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> scala.util.Try$.apply(Try.scala:161)
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>
> When I click on one of these slow stages that executed after 6 - 8 hours, I
> find the following information for individual tasks inside.
> - All tasks seem to execute with PROCESS_LOCAL locality.
> - Quite a few of these tasks seem to spend anywhere between 30 - 80% of
> their time in GC. However, when I look at the total memory usage on each of
> the slave nodes under the executors information, I see that the usage is only
> around 200MB out of the 20GB available.
>
> Even after a few hours, the map stages (val groupedWords =
> filteredWords.map(word => (word, 1))) seem to have times consistent with
> those at the start of the job, which seems to indicate that this code is fine.
> Also, the number of waiting batches is either 0 or 1 even after 8 to 10 hours.
>
> Based on the fact that map is as fast as at the start of the job and
> that there are no waiting batches, I am assuming that the getCallSite stages
> correspond to getting data out of Kafka. Is this correct or not?
> If my assumption is correct, is there anything that I could do to optimize
> receiving data from Kafka?
> If not, which part of my code needs to be optimized to reduce the scheduling
> delay?
>
> Thanks,
> RK
>
