spark-user mailing list archives

From Tim Smith <secs...@gmail.com>
Subject Streaming job concurrency (was: Re: Streaming scheduling delay)
Date Sat, 14 Feb 2015 23:19:53 GMT
I think I might have found the culprit here. I am going to make some code
changes to test, but meanwhile, I would appreciate it if someone can chime in
on whether I am on the right track. I think I also figured out why dStream
partitioning was providing better throughput while possibly introducing other
issues with shuffling data around the cluster.

To recap, I have 23 kafka nodes, the spark app reads from one topic with 50
partitions and writes out to 24 topics, again, each with 50 partitions. Time
and again, I tested and found that the write-out-to-kafka stage is what slows
down the entire batch. If I cut out the write to kafka, then processing/total
delay is very comfortably within the batch window time. The obvious culprit,
one would think, would be something to do with how the kafkaWriter network
object is created and handled, or too much network load.

Here's how I setup a run of the app:
- 100 executors, each with 4 cores and 2GB of RAM (spread over 23 physical
servers)
- 5 receivers/dStreams, each with one thread (I should give each 10 threads,
but it does fine with a single thread; we have no issues consuming from
kafka, the single thread manages just fine multiplexing over the 10
partitions that kafka allocates to it)
- spark.streaming.blockInterval 500
- spark batch window is set to 10 seconds

Now, with a 10 second batch window and a 500ms blockInterval, each receiver
produces 20 blocks per batch, so each dStream's batch turns into 20 tasks.
Follow so far?
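
For reference, the setup above boils down to roughly this (a minimal sketch;
the app name, zkQuorum, groupId and topic are placeholders, and the last
comment is just the 10s/500ms arithmetic spelled out):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch only: 10 second batch window, 500ms block interval, 5 receivers,
// each started with a single consumer thread for the input topic.
val conf = new SparkConf()
  .setAppName("kafka-streaming-app")                  // name is illustrative
  .set("spark.streaming.blockInterval", "500")        // milliseconds in Spark 1.2
val ssc = new StreamingContext(conf, Seconds(10))
val kInStreams = (1 to 5).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> 1),
    StorageLevel.MEMORY_AND_DISK_SER)
}
// Per receiver: 10,000ms batch / 500ms blockInterval = 20 blocks = 20 tasks.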

Now looking at the SparkUI, the first thing I noticed (when I stopped fiddling
with Spark and Kafka params) is that only one job is active at a time and
it is at a foreachRDD statement. Why is only one job active at a time when
we created 5 receivers??!! We want five receivers/dStreams to be processed
in parallel. Right? I think the answer had been staring at me for the
longest time but I was just on the wrong track, investigating the
gazillion other knobs/dials/theories. And it is an extremely simple yet
stupid mistake. Look at this code:
val kInStreams = …..create n streams

for (k <- kInStreams)
    {
     …. apply a transformation function using map() ….
     ….. foreachRDD (. .. foreachPartition (write out to kafka) ) …..
    }

---->>>"for vs foreach"!!!<<<----

I took a perfectly parallel system and hobbled it with a sequential
processing construct. Spark zooms along in parallel until it hits the "for"
and then says ok, I will process each dStream, one at a time. D'oh!
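
Concretely, the body of that loop looks roughly like this (pieced together
from the fuller code quoted further down in this thread; myfunc, configMap,
otherConf, propsMap and KafkaOutputService are from my app, and I have left
out the repartition/persist calls I have since removed):

// One transformation and one output action per dStream, driven by the loop.
for (k <- kInStreams) {
  val outdata = k.map(x => myfunc(x._2, configMap))
  outdata.foreachRDD { rdd =>
    rdd.foreachPartition { rec =>
      // Still creates a new producer per partition; the pooling fix is pending.
      val writer = new KafkaOutputService(
        otherConf("kafkaProducerTopic").toString, propsMap)
      writer.output(rec)
    }
  }
}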

Drilling further into the SparkUI, I noticed that we have only one active
job at any given point in time, each job has only one Stage and all 20
tasks in the stage are running on the same worker node. I also noticed that
out of the 20 tasks, 16 would get done real quick and then 4 would take
longer. Presumably, the 4 tasks are associated with writing out to kafka.

So we ended up with exactly the opposite of what we wanted. Instead of
having all worker nodes process all the inbound dStreams in parallel, we
only have one active worker node at any given point in time.

What dStream partitioning was doing, presumably, was taking each dStream
inside the for loop and spreading its load to other nodes. That is why a
high number of dStream partitions seemed to work fine in terms of
throughput, but they introduced un-necessary shuffle in the cluster and
shuffle-related bugs reared their head.

Update1: As I was writing this email, I thought I'd make the quick code
change from "for" -> "foreach" and test. I quickly made the code change, pushed
out the jar, ran it and hit the SparkUI, excitedly expecting to see as many
jobs launched as the number of receivers/dStreams. And ..... drum roll
...... #fail. Nope, SparkUI still showed just one active job. Quick
googling for "spark only one job active" brought up the undocumented
"spark.streaming.concurrentJobs" parameter! Testing it now.

Update2: Yeah baby!!! That did it :))) I set job concurrency to 20 and my
throughput is through the roof now. Because messages in the kafka
partitions aren't exactly evenly distributed, some jobs/receivers get more
and some get less, so some jobs take longer but most get done quickly.
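
For anyone following along, that setting is just another conf entry (a hedged
sketch; it can go on the SparkConf or be passed with --conf at spark-submit
time, the value 20 matches what I used above, and the parameter is
undocumented as of Spark 1.2, so treat it with care):

// Allow up to 20 streaming jobs (roughly one per dStream here) to be
// scheduled concurrently instead of strictly one at a time.
val conf = new SparkConf().set("spark.streaming.concurrentJobs", "20")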

I am still going to clean up the code related to connection pooling and
monitor this app for stability. I don't know what other Pandora's box
this undocumented feature opens up, but I am going to now enjoy my long
weekend :D
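
For the pooling clean-up, the rough idea (per Cody's and Gerard's suggestions
below) is a per-JVM holder. Something along these lines, untested;
KafkaOutputService, otherConf and propsMap are from my existing code, and the
Map[String, String] type for the props is an assumption:

// Untested sketch: a Scala object is initialized once per executor JVM, so
// every task on that executor reuses one KafkaOutputService instead of
// creating a new producer for every partition it writes.
object KafkaWriterPool {
  @volatile private var writer: KafkaOutputService = _
  def get(topic: String, props: Map[String, String]): KafkaOutputService = {
    if (writer == null) {
      synchronized {
        if (writer == null) writer = new KafkaOutputService(topic, props)
      }
    }
    writer
  }
}

// Inside foreachPartition this would replace "new KafkaOutputService(...)":
// val writer = KafkaWriterPool.get(otherConf("kafkaProducerTopic").toString, propsMap)
// writer.output(rec)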

Stay tuned for more updates on stability or other issues that crop up :)

Thanks,

Tim






On Fri, Feb 13, 2015 at 2:54 PM, Tim Smith <secsubs@gmail.com> wrote:

> First, thanks everyone for helping me clear up the code and troubleshoot
> the pipeline. Going inline now.
>
> On Fri, Feb 13, 2015 at 2:21 AM, Gerard Maas <gerard.maas@gmail.com>
> wrote:
>
>> Hi Tim,
>>
>> Combining the advice in this thread, I think the job is creating too many
>> [Spark] partitions and the overhead of creating a Kafka producer for each
>> partition is killing performance.  Could you send a snapshot of the Spark
>> UI/Stages page?
>>
>
> Attaching snapshots from the SparkUI. How do I look up how many Spark
> partitions are created and how many tasks are pending at any given point? I
> re-tested the app after disabling writeToKafka and replacing it with
> rdd.count() and I think it is pretty clear that it is the whole writing out to
> kafka that's slowing down the batches.
>
>
> I made other small optimizations to test different knobs/dials, so now, I
> can control these by passing runtime args to the app:
> - RDD persistence (turns persistence on/off for all RDDs before the
> "outdata" RDD). Doesn't seem to have any measurable impact on performance.
> - kafka consumer threads
> - producer compression - don't see any impact
> - WriteToKafka or do rdd.count - to test app performance sans writeToKafka
>
> In addition, I have removed the dStream partitioning logic and commented
> out a few "logInfo" statements I had in the kafka writer class.
>
> After all these changes, my app isn't crashing anymore with input block missing
> errors, but total delay is enormous and throughput is poor. In contrast,
> with dStream partitioning, I had stability issues and inconsistent total
> delay results, but when the app did work, the output was equal to the input.
>
>
>
>> How are your input Kafka topics partitioned ? (I mean at kafka level - I
>> know "partitions" get confusing when we mix Spark and Kafka)
>>
>
> I have 23 kafka nodes, each with 5 disks. There are 25 topics, each with
> 50 partitions. Out of the 25 topics, one is read by the Spark Streaming app
> and then it writes out to the 24 other topics depending on the input
> message. The one topic that Spark reads from is populated by rsyslog using
> the default partitioner and a null key. In the kafka producer in my app, I am
> using the default partitioner and a random number between 0 and 99 to
> populate the partitioning key (I could leave it null but a downstream flume
> consumer throws NPE errors if the kafka topic has a null key).
> producer.send(v.map(m => new KeyedMessage[String,String](topic,
> r.nextInt(100).toString, m)): _*)
>
>
>
>
>> Here's how I'd restructure the job:
>>
>> - Introduce a Producer factory as suggested by Cody, which pools
>> producers per JVM (a Scala object is a great vessel for this)
>>
> Agreed. Working on this.
>
>
>> - *DO NOT* process the kafka consumers independently. This multiplies the
>> number of tasks by n consumers - Union them instead
>>
> Won't "union" introduce un-necessary shuffle? In my app, each incoming
> message is processed independent of other messages so am I better off
> controlling # of Spark tasks by other methods?
>
>
>
>> - *DO NOT* repartition. Use spark.streaming.blockInterval to control the
>> nr of partitions. Try to keep the nr of partitions low to minimize the per
>> task scheduling overhead on Spark  but high enough that they don't become
>> too big to process.
>>
> Yes, already took out "repartition". Going to play with "blockInterval"
> to control the number of spark partitions.
>
>
>
>>
>> Applying these tips to the code, the job should look like this:
>>
>> val kInStreams = (1 to otherConf("inStreamCount").toInt).map{_ =>
>> KafkaUtils.createStream(ssc,otherConf("kafkaConsumerZk").toString,otherConf("kafkaConsumerGroupId").toString,
>> Map(otherConf("kafkaConsumerTopic").toString ->
>> (##kafkaPartitions/inStreamCount##)), StorageLevel.MEMORY_AND_DISK_SER) }
>> val kInStream = ssc.union(kInStreams)
>>
>> val producerProps = otherConf("kafkaProducerTopic").toString
>>
>> val kOutStream = kInStream.map(x=>myfunc(x._2,configMap))
>> kOutStream.foreachRDD{ rdd =>
>>     rdd.foreachPartition{ rec =>
>>         val writer =
>> KafkaOutputServicePool.getProducer().send(producerProps, propsMap)
>>         writer.output(rec)
>>     }
>> }
>>
>> Compare the Spark UI/Stages page before and after and you should see a
>> strong decrease in the amount of Stages being processed by Spark and hence
>> the time it takes to process the data in each Streaming interval.
>>
> I see two active jobs and one stage only per active job. Total number of
> tasks for all stages is ~30 (depending on how I configure my app launch).
>
> Some more dial/knobs I have set for the app:
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.rdd.compress true
> spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec
> spark.akka.threads 128
> spark.akka.frameSize 500
> spark.task.maxFailures 64
> spark.scheduler.mode FIFO
> spark.yarn.executor.memoryOverhead 1024
> spark.yarn.driver.memoryOverhead 4096
> spark.shuffle.consolidateFiles true
> spark.default.parallelism 528
> spark.executor.logs.rolling.strategy size
> spark.executor.logs.rolling.size.maxBytes 2000000000
> spark.executor.logs.rolling.maxRetainedFiles 1
> spark.eventLog.enabled true
> spark.streaming.receiver.maxRate 2000
> spark.streaming.unpersist true
> spark.locality.wait 500
>
>
>
>>
>> I hope you see something like this:
>>
>
> Working to get in the recommended changes. Thanks again. I will report
> back soon.
>
> - Tim
>
>
>
>
>>
>> kr, Gerard.
>>
>>
>> On Fri, Feb 13, 2015 at 8:44 AM, Saisai Shao <sai.sai.shao@gmail.com>
>> wrote:
>>
>>> Yes, you can try it. For example, if you have a cluster of 10 executors and
>>> 60 Kafka partitions, you can try to choose 10 receivers * 2 consumer
>>> threads, so each thread will consume 3 partitions ideally; if you increase
>>> the threads to 6, each thread will consume 1 partition ideally. What I
>>> think is important is that each executor will have a receiver, so the data
>>> will be distributed to each executor.
>>>
>>> If you have a large cluster where even the number of executors is more than
>>> the Kafka partitions, maybe you need to increase the Kafka partitions to
>>> increase the parallelism, otherwise some of the computation resources may
>>> be idle.
>>>
>>> Besides, if executors * consumers > Kafka partitions, the extra consumers
>>> beyond the partition count will be idle, since each partition can only be
>>> consumed by one consumer.
>>>
>>> We have an in-house benchmark cluster with such a deploy criterion; I'm not
>>> sure if it works for you, but you can try it.
>>>
>>> Thanks
>>> Saisai
>>>
>>> 2015-02-13 15:19 GMT+08:00 Tim Smith <secsubs@gmail.com>:
>>>
>>>> Hi Saisai,
>>>>
>>>> If I understand correctly, you are suggesting that I control parallelism
>>>> by having the number of consumers/executors at least 1:1 with the number
>>>> of kafka partitions. For example, if I have 50 partitions for a kafka topic
>>>> then either have:
>>>> - 25 or more executors, 25 receivers, each receiver set to 2 consumer
>>>> threads per topic, or,
>>>> - 50 or more executors, 50 receivers, each receiver set to 1 consumer
>>>> thread per topic
>>>>
>>>> Actually, both executors and total consumers can be more than the
>>>> number of kafka partitions (some will probably sit idle).
>>>>
>>>> But do away with dStream partitioning altogether.
>>>>
>>>> Right?
>>>>
>>>> Thanks,
>>>>
>>>> - Tim
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Feb 12, 2015 at 11:03 PM, Saisai Shao <sai.sai.shao@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Tim,
>>>>>
>>>>> I think maybe you can try this way:
>>>>>
>>>>> Create a Receiver per executor and specify a thread count per topic larger
>>>>> than 1; the total number of consumer threads will be: total consumers =
>>>>> (receiver number) * (thread number). Make sure this total consumer count is
>>>>> less than or equal to the Kafka partition number. In this case, I think the
>>>>> parallelism is enough and received blocks are distributed to each executor,
>>>>> so you don't need to repartition to increase the parallelism.
>>>>>
>>>>> Besides, with Kafka's high-level API, Kafka partitions may not be
>>>>> equally distributed to all the receivers, so some tasks may process more
>>>>> data than other tasks. Another way: you can try DirectKafkaInputDStream in
>>>>> Spark 1.3, which will be more balanced because each Kafka partition maps
>>>>> to a Spark partition.
>>>>>
>>>>>
>>>>> Besides "set partition count to 1 for each dStream" means
>>>>> dstream.repartition(1) ? If so I think it will still introduce shuffle and
>>>>> move all the data into one partition.
>>>>>
>>>>> Thanks
>>>>> Saisai
>>>>>
>>>>> 2015-02-13 13:54 GMT+08:00 Tim Smith <secsubs@gmail.com>:
>>>>>
>>>>>> TD - I will try count() and report back. Meanwhile, attached is the
>>>>>> entire driver log that includes the error logs about missing blocks.
>>>>>>
>>>>>> Cody - Let me research a bit about how to do connection pooling.
>>>>>> Sorry, I am not really a programmer. I did see the connection pooling
>>>>>> advice in the Spark Streaming Programming guide as an optimization but
>>>>>> wasn't sure how to implement it. But do you think it will have a
>>>>>> significant impact on performance?
>>>>>>
>>>>>> Saisai - I think, ideally, I'd rather not do any dStream
>>>>>> partitioning. Instead have 1 receiver for each kafka partition (so in this
>>>>>> case 23 receivers for 23 kafka partitions) and then have as many or more
>>>>>> executors to handle processing of the dStreams. Right? Trouble is, I tried
>>>>>> this approach and it didn't work. Even if I set 23 receivers and set the
>>>>>> partition count to 1 for each dStream (effectively, no stream splitting),
>>>>>> my performance is extremely poor/laggy. Should I modify my code to remove
>>>>>> dStream partitioning altogether and then try setting as many receivers as
>>>>>> kafka partitions?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao <sai.sai.shao@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Tim,
>>>>>>>
>>>>>>> I think this code will still introduce shuffle even when you call
>>>>>>> repartition on each input stream. Actually this style of implementation
>>>>>>> will generate more jobs (one job per input stream) than unioning into one
>>>>>>> stream with DStream.union(), and union normally has no special
>>>>>>> overhead as I understand it.
>>>>>>>
>>>>>>> Also, as Cody said, creating a Producer per partition could be a
>>>>>>> potential overhead; a producer pool or sharing the Producer per executor
>>>>>>> might be better :).
>>>>>>>
>>>>>>>
>>>>>>>          // Process stream from each receiver separately
>>>>>>>          // (do not attempt to merge all streams and then
>>>>>>> re-partition, this causes un-necessary and high amount of shuffle in the
>>>>>>> job)
>>>>>>>          for (k <- kInStreams)
>>>>>>>                 {
>>>>>>>                  // Re-partition stream from each receiver across
>>>>>>> all compute nodes to spread out processing load and allows per partition
>>>>>>> processing
>>>>>>>                  // and, set persistence level to spill to disk
>>>>>>> along with serialization
>>>>>>>                  val kInMsgParts =
>>>>>>> k.repartition(otherConf("dStreamPartitions").toInt).
>>>>>>>
>>>>>>> 2015-02-13 13:27 GMT+08:00 Cody Koeninger <cody@koeninger.org>:
>>>>>>>
>>>>>>>> outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>>>>>>>>
>>>>>>>>      val writer = new
>>>>>>>> KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>>>>>>>>
>>>>>>>>      writer.output(rec)
>>>>>>>>
>>>>>>>>     }) )
>>>>>>>>
>>>>>>>>
>>>>>>>> So this is creating a new kafka producer for every new output
>>>>>>>> partition, right?  Have you tried pooling the producers?
>>>>>>>>
>>>>>>>> On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith <secsubs@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> 1) Yes, if I disable writing out to kafka and replace it with some
>>>>>>>>> very light weight action like rdd.take(1), the app is stable.
>>>>>>>>>
>>>>>>>>> 2) The partitions I spoke of in the previous mail are the number
>>>>>>>>> of partitions I create from each dStream. But yes, since I do processing
>>>>>>>>> and writing out, per partition, each dStream partition ends up getting
>>>>>>>>> written to a kafka partition. Flow is, broadly:
>>>>>>>>> 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions
>>>>>>>>> (150 partitions) -> Apply some transformation logic to each partition ->
>>>>>>>>> write out each partition to kafka (kafka has 23 partitions). Let me
>>>>>>>>> increase the number of partitions on the kafka side and see if that helps.
>>>>>>>>>
>>>>>>>>> Here's what the main block of code looks like (I added persistence
>>>>>>>>> back):
>>>>>>>>>
>>>>>>>>> val kInStreams = (1 to otherConf("inStreamCount").toInt).map{_ =>
>>>>>>>>> KafkaUtils.createStream(ssc,otherConf("kafkaConsumerZk").toString,otherConf("kafkaConsumerGroupId").toString,
>>>>>>>>> Map(otherConf("kafkaConsumerTopic").toString -> 1),
>>>>>>>>> StorageLevel.MEMORY_AND_DISK_SER) }
>>>>>>>>>
>>>>>>>>> if (!configMap.keySet.isEmpty)
>>>>>>>>>         {
>>>>>>>>>          // Process stream from each receiver separately
>>>>>>>>>          // (do not attempt to merge all streams and then
>>>>>>>>> re-partition, this causes un-necessary and high amount of shuffle in the
>>>>>>>>> job)
>>>>>>>>>          for (k <- kInStreams)
>>>>>>>>>                 {
>>>>>>>>>                  // Re-partition stream from each receiver across
>>>>>>>>> all compute nodes to spread out processing load and allows per partition
>>>>>>>>> processing
>>>>>>>>>                  // and, set persistence level to spill to disk
>>>>>>>>> along with serialization
>>>>>>>>>                  val kInMsgParts =
>>>>>>>>> k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>>>>
>>>>>>>>>                  val outdata =
>>>>>>>>> kInMsgParts.map(x=>myfunc(x._2,configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>>>>
>>>>>>>>>                  // Write each transformed partition to Kafka via
>>>>>>>>> the "writer" object
>>>>>>>>>                  outdata.foreachRDD( rdd =>
>>>>>>>>> rdd.foreachPartition(rec => {
>>>>>>>>>
>>>>>>>>>        val writer = new
>>>>>>>>> KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>>>>>>>>>
>>>>>>>>>        writer.output(rec)
>>>>>>>>>
>>>>>>>>>       }) )
>>>>>>>>>                 }
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Here's the life-cycle of a lost block:
>>>>>>>>>
>>>>>>>>> 15/02/12 16:26:12 INFO BlockManagerInfo: Added
>>>>>>>>> input-4-1423758372200 in memory on nodedn1-23-acme.com:34526
>>>>>>>>> (size: 164.7 KB, free: 379.5 MB)
>>>>>>>>> 15/02/12 16:26:12 INFO BlockManagerInfo: Added
>>>>>>>>> input-4-1423758372200 in memory on nodedn1-22-acme.com:42084
>>>>>>>>> (size: 164.7 KB, free: 366.5 MB)
>>>>>>>>> 15/02/12 16:31:21 INFO BlockManagerInfo: Added
>>>>>>>>> input-4-1423758372200 on disk on nodedn1-22-acme.com:42084 (size:
>>>>>>>>> 164.7 KB)
>>>>>>>>> 15/02/12 16:31:23 INFO BlockManagerInfo: Added
>>>>>>>>> input-4-1423758372200 on disk on nodedn1-23-acme.com:34526 (size:
>>>>>>>>> 164.7 KB)
>>>>>>>>> 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage
>>>>>>>>> 16291.0 (TID 1042569, nodedn1-13-acme.com): java.lang.Exception:
>>>>>>>>> Could not compute split, block input-4-1423758372200 not found
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage
>>>>>>>>> 16291.0 (TID 1042575) on executor nodedn1-21-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 1]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage
>>>>>>>>> 16291.0 (TID 1042581) on executor nodedn1-21-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 2]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage
>>>>>>>>> 16291.0 (TID 1042584) on executor nodedn1-20-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 3]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage
>>>>>>>>> 16291.0 (TID 1042586) on executor nodedn1-11-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 4]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage
>>>>>>>>> 16291.0 (TID 1042589) on executor nodedn1-14-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 5]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage
>>>>>>>>> 16291.0 (TID 1042594) on executor nodedn1-15-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 6]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage
>>>>>>>>> 16291.0 (TID 1042597) on executor nodedn1-20-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 7]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.8 in stage
>>>>>>>>> 16291.0 (TID 1042600) on executor nodedn1-17-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 8]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.9 in stage
>>>>>>>>> 16291.0 (TID 1042606) on executor nodedn1-20-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 9]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.10 in stage
>>>>>>>>> 16291.0 (TID 1042609) on executor nodedn1-19-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 10]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.11 in stage
>>>>>>>>> 16291.0 (TID 1042611) on executor nodedn1-14-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 11]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.12 in stage
>>>>>>>>> 16291.0 (TID 1042615) on executor nodedn1-22-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 12]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.13 in stage
>>>>>>>>> 16291.0 (TID 1042618) on executor nodedn1-19-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 13]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.14 in stage
>>>>>>>>> 16291.0 (TID 1042622) on executor nodedn1-21-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 14]
>>>>>>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.15 in stage
>>>>>>>>> 16291.0 (TID 1042625) on executor nodedn1-16-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 15]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.16 in stage
>>>>>>>>> 16291.0 (TID 1042628) on executor nodedn1-20-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 16]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.17 in stage
>>>>>>>>> 16291.0 (TID 1042633) on executor nodedn1-16-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 17]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.18 in stage
>>>>>>>>> 16291.0 (TID 1042636) on executor nodedn1-17-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 18]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.19 in stage
>>>>>>>>> 16291.0 (TID 1042640) on executor nodedn1-17-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 19]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.20 in stage
>>>>>>>>> 16291.0 (TID 1042642) on executor nodedn1-16-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 20]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.21 in stage
>>>>>>>>> 16291.0 (TID 1042645) on executor nodedn1-16-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 21]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.22 in stage
>>>>>>>>> 16291.0 (TID 1042648) on executor nodedn1-14-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 22]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.23 in stage
>>>>>>>>> 16291.0 (TID 1042654) on executor nodedn1-23-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 23]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.24 in stage
>>>>>>>>> 16291.0 (TID 1042660) on executor nodedn1-22-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 24]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.25 in stage
>>>>>>>>> 16291.0 (TID 1042662) on executor nodedn1-21-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 25]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.26 in stage
>>>>>>>>> 16291.0 (TID 1042663) on executor nodedn1-14-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 26]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.27 in stage
>>>>>>>>> 16291.0 (TID 1042665) on executor nodedn1-17-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 27]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.28 in stage
>>>>>>>>> 16291.0 (TID 1042667) on executor nodedn1-19-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 28]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.29 in stage
>>>>>>>>> 16291.0 (TID 1042671) on executor nodedn1-14-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 29]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.30 in stage
>>>>>>>>> 16291.0 (TID 1042672) on executor nodedn1-12-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 30]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.31 in stage
>>>>>>>>> 16291.0 (TID 1042674) on executor nodedn1-14-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 31]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.32 in stage
>>>>>>>>> 16291.0 (TID 1042677) on executor nodedn1-19-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 32]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.33 in stage
>>>>>>>>> 16291.0 (TID 1042680) on executor nodedn1-22-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 33]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.34 in stage
>>>>>>>>> 16291.0 (TID 1042681) on executor nodedn1-20-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 34]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.35 in stage
>>>>>>>>> 16291.0 (TID 1042682) on executor nodedn1-17-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 35]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.36 in stage
>>>>>>>>> 16291.0 (TID 1042687) on executor nodedn1-20-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 36]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.37 in stage
>>>>>>>>> 16291.0 (TID 1042689) on executor nodedn1-17-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 37]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.38 in stage
>>>>>>>>> 16291.0 (TID 1042690) on executor nodedn1-17-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 38]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.39 in stage
>>>>>>>>> 16291.0 (TID 1042693) on executor nodedn1-22-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 39]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.40 in stage
>>>>>>>>> 16291.0 (TID 1042697) on executor nodedn1-22-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 40]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.41 in stage
>>>>>>>>> 16291.0 (TID 1042700) on executor nodedn1-13-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 41]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.42 in stage
>>>>>>>>> 16291.0 (TID 1042706) on executor nodedn1-15-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 42]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.43 in stage
>>>>>>>>> 16291.0 (TID 1042710) on executor nodedn1-14-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 43]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.44 in stage
>>>>>>>>> 16291.0 (TID 1042713) on executor nodedn1-17-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 44]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.45 in stage
>>>>>>>>> 16291.0 (TID 1042716) on executor nodedn1-19-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 45]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.46 in stage
>>>>>>>>> 16291.0 (TID 1042718) on executor nodedn1-15-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 46]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.47 in stage
>>>>>>>>> 16291.0 (TID 1042720) on executor nodedn1-12-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 47]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.48 in stage
>>>>>>>>> 16291.0 (TID 1042724) on executor nodedn1-12-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 48]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.49 in stage
>>>>>>>>> 16291.0 (TID 1042729) on executor nodedn1-16-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 49]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.50 in stage
>>>>>>>>> 16291.0 (TID 1042730) on executor nodedn1-21-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 50]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.51 in stage
>>>>>>>>> 16291.0 (TID 1042733) on executor nodedn1-21-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 51]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.52 in stage
>>>>>>>>> 16291.0 (TID 1042736) on executor nodedn1-19-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 52]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.53 in stage
>>>>>>>>> 16291.0 (TID 1042740) on executor nodedn1-20-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 53]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.54 in stage
>>>>>>>>> 16291.0 (TID 1042743) on executor nodedn1-22-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 54]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.55 in stage
>>>>>>>>> 16291.0 (TID 1042745) on executor nodedn1-18-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 55]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.56 in stage
>>>>>>>>> 16291.0 (TID 1042754) on executor nodedn1-17-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 56]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.57 in stage
>>>>>>>>> 16291.0 (TID 1042758) on executor nodedn1-17-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 57]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.58 in stage
>>>>>>>>> 16291.0 (TID 1042762) on executor nodedn1-12-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 58]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.59 in stage
>>>>>>>>> 16291.0 (TID 1042766) on executor nodedn1-23-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 59]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.60 in stage
>>>>>>>>> 16291.0 (TID 1042774) on executor nodedn1-20-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 60]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.61 in stage
>>>>>>>>> 16291.0 (TID 1042779) on executor nodedn1-13-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 61]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.62 in stage
>>>>>>>>> 16291.0 (TID 1042789) on executor nodedn1-20-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 62]
>>>>>>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.63 in stage
>>>>>>>>> 16291.0 (TID 1042793) on executor nodedn1-15-acme.com:
>>>>>>>>> java.lang.Exception (Could not compute split, block input-4-1423758372200
>>>>>>>>> not found) [duplicate 63]
>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>>> Task 54 in stage 16291.0 failed 64 times, most recent failure: Lost task
>>>>>>>>> 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com):
>>>>>>>>> java.lang.Exception: Could not compute split, block input-4-1423758372200
>>>>>>>>> not found
>>>>>>>>> Exception in thread "main" org.apache.spark.SparkException: Job
>>>>>>>>> aborted due to stage failure: Task 54 in stage 16291.0 failed 64 times,
>>>>>>>>> most recent failure: Lost task 54.63 in stage 16291.0 (TID 1042793,
>>>>>>>>> nodedn1-15-acme.com): java.lang.Exception: Could not compute
>>>>>>>>> split, block input-4-1423758372200 not found
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks for looking into it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Feb 12, 2015 at 8:10 PM, Tathagata Das <
>>>>>>>>> tdas@databricks.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Tim,
>>>>>>>>>>
>>>>>>>>>> Let me get the key points.
>>>>>>>>>> 1. If you are not writing back to Kafka, the delay is stable?
>>>>>>>>>> That is, instead of "foreachRDD { // write to kafka }"  if you do
>>>>>>>>>> "dstream.count", then the delay is stable. Right?
>>>>>>>>>> 2. If so, then Kafka is the bottleneck. Is it the number of
>>>>>>>>>> partitions that you spoke of in the second mail that determines the
>>>>>>>>>> parallelism in writes? Is it stable with 30 partitions?
>>>>>>>>>>
>>>>>>>>>> Regarding the block exception, could you give me a trace of info
>>>>>>>>>> level logging that leads to this error? Basically I want to trace the
>>>>>>>>>> lifecycle of the block.
>>>>>>>>>>
>>>>>>>>>> TD
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith <secsubs@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Gerard,
>>>>>>>>>>>
>>>>>>>>>>> Great write-up and really good guidance in there.
>>>>>>>>>>>
>>>>>>>>>>> I have to be honest, I don't know why but setting # of
>>>>>>>>>>> partitions for each dStream to a low number (5-10) just causes the app to
>>>>>>>>>>> choke/crash. Setting it to 20 gets the app going but with not so great
>>>>>>>>>>> delays. Bump it up to 30 and I start winning the war where processing time
>>>>>>>>>>> is consistently below batch time window (20 seconds) except for a batch
>>>>>>>>>>> every few batches where the compute time spikes 10x the usual.
>>>>>>>>>>>
>>>>>>>>>>> Following your guide, I took out some "logInfo" statements I had
>>>>>>>>>>> in the app but didn't seem to make much difference :(
>>>>>>>>>>>
>>>>>>>>>>> With a higher time window (20 seconds), I got the app to run
>>>>>>>>>>> stably for a few hours but then ran into the dreaded "java.lang.Exception:
>>>>>>>>>>> Could not compute split, block input-0-1423761240800 not found". Wonder if
>>>>>>>>>>> I need to add RDD persistence back?
>>>>>>>>>>>
>>>>>>>>>>> Also, I am reaching out to Virdata with some ProServ inquiries.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas <
>>>>>>>>>>> gerard.maas@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Tim,
>>>>>>>>>>>>
>>>>>>>>>>>> From this: " There are 5 kafka receivers and each incoming
>>>>>>>>>>>> stream is split into 40 partitions"  I suspect that you're
>>>>>>>>>>>> creating too many tasks for Spark to process on time.
>>>>>>>>>>>> Could you try some of the 'knobs' I describe here to see if
>>>>>>>>>>>> that would help?
>>>>>>>>>>>>
>>>>>>>>>>>> http://www.virdata.com/tuning-spark/
>>>>>>>>>>>>
>>>>>>>>>>>> -kr, Gerard.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith <secsubs@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Just read the thread "Are these numbers abnormal for spark
>>>>>>>>>>>>> streaming?" and I think I am seeing similar results - that is - increasing
>>>>>>>>>>>>> the window seems to be the trick here. I will have to monitor for a few
>>>>>>>>>>>>> hours/days before I can conclude (there are so many knobs/dials).
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith <secsubs@gmail.com
>>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Spark 1.2 (have been seeing this behaviour since 1.0), I
>>>>>>>>>>>>>> have a streaming app that consumes data from Kafka and writes it back to
>>>>>>>>>>>>>> Kafka (different topic). My big problem has been Total Delay. While
>>>>>>>>>>>>>> execution time is usually < the window size (in seconds), the total delay ranges
>>>>>>>>>>>>>> from minutes to hours (and keeps going up).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For a little while, I thought I had solved the issue by
>>>>>>>>>>>>>> bumping up the driver memory. Then I expanded my Kafka cluster to add more
>>>>>>>>>>>>>> nodes and the issue came up again. I tried a few things to smoke out the
>>>>>>>>>>>>>> issue and something tells me the driver is the bottleneck again:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) From my app, I took out the entire write-out-to-kafka
>>>>>>>>>>>>>> piece. Sure enough, execution, scheduling delay and hence total delay fell
>>>>>>>>>>>>>> to sub second. This assured me that whatever processing I do before writing
>>>>>>>>>>>>>> back to kafka isn't the bottleneck.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2) In my app, I had RDD persistence set at different points
>>>>>>>>>>>>>> but my code wasn't really re-using any RDDs so I took out all explicit
>>>>>>>>>>>>>> persist() statements. And added, "spar...unpersist" to "true" in the
>>>>>>>>>>>>>> context. After this, it doesn't seem to matter how much memory I give my
>>>>>>>>>>>>>> executor, the total delay seems to be in the same range. I tried per
>>>>>>>>>>>>>> executor memory from 2G to 12G with no change in total delay so executors
>>>>>>>>>>>>>> aren't memory starved. Also, in the SparkUI, under the Executors tab, all
>>>>>>>>>>>>>> executors show 0/1060MB used when per executor memory is set to 2GB, for
>>>>>>>>>>>>>> example.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3) Input rate in the kafka consumer restricts spikes in
>>>>>>>>>>>>>> incoming data.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 4) Tried FIFO and FAIR but didn't make any difference.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 5) Adding executors beyond a certain point seems useless (I
>>>>>>>>>>>>>> guess excess ones just sit idle).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> At any given point in time, the SparkUI shows only one batch
>>>>>>>>>>>>>> pending processing. So with just one batch pending processing, why would
>>>>>>>>>>>>>> the scheduling delay run into minutes/hours if execution time is within the
>>>>>>>>>>>>>> batch window duration? There aren't any failed stages or jobs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Right now, I have 100 executors (I have tried setting
>>>>>>>>>>>>>> executors from 50-150), each with 2GB and 4 cores and the driver running
>>>>>>>>>>>>>> with 16GB. There are 5 kafka receivers and each incoming stream is split
>>>>>>>>>>>>>> into 40 partitions. Per receiver, input rate is restricted to 20000
>>>>>>>>>>>>>> messages per second.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can anyone help me with clues or areas to look into, for
>>>>>>>>>>>>>> troubleshooting the issue?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> One nugget I found buried in the code says:
>>>>>>>>>>>>>> "The scheduler delay includes the network delay to send the
>>>>>>>>>>>>>> task to the worker machine and to send back the result (but not the time to
>>>>>>>>>>>>>> fetch the task result, if it needed to be fetched from the block manager on
>>>>>>>>>>>>>> the worker)."
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Could this be an issue with the driver being a bottleneck? All
>>>>>>>>>>>>>> the executors posting their logs/stats to the driver?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Tim
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
