Hi all,

 

I am running a Spark Streaming application with ReliableKafkaReceiver (Spark 1.2.0). I keep getting the following exception:

 

15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
        at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
        at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
        at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
        at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
        at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
        at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)

 

After the exception, the ReliableKafkaReceiver stays in ACTIVE status but stops receiving data from Kafka, and the Kafka message handler thread is stuck in the BLOCKED state:

 

Thread 92: KafkaMessageHandler-0 (BLOCKED)
org.apache.spark.streaming.receiver.BlockGenerator.addDataWithCallback(BlockGenerator.scala:123)
org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeMessageAndMetadata(ReliableKafkaReceiver.scala:185)
org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:247)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
java.util.concurrent.FutureTask.run(FutureTask.java:262)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

 

Sometimes when the exception is thrown, I also see HDFS warning messages like these:

15/01/12 01:08:07 WARN hdfs.DFSClient: Slow ReadProcessor read fields took 30533ms (threshold=30000ms); ack: seqno: 113 status: SUCCESS status: SUCCESS downstreamAckTimeNanos: 30524893062, targets: [172.20.xxx.xxx:50010, 172.20.xxx.xxx:50010]
15/01/12 01:08:07 WARN hdfs.DFSClient: Slow waitForAckedSeqno took 30526ms (threshold=30000ms)
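
These slow-write warnings seem to line up with the timeout: the HDFS ack took about 30.5 s, while, if I am reading the 1.2.0 ReceivedBlockHandler source correctly, WriteAheadLogBasedBlockHandler.storeBlock only waits spark.streaming.receiver.blockStoreTimeout seconds (default 30) for the write-ahead-log write to finish. As a stopgap I am thinking of raising that timeout; a minimal sketch, assuming I have the property name right:

import org.apache.spark.SparkConf

// Sketch only: the property name and its default (30, in seconds) come from my reading
// of ReceivedBlockHandler.scala in 1.2.0. This raises the wait past the ~30.5 s HDFS
// ack times seen in the warnings above.
val conf = new SparkConf()
  .setAppName("kafka-wal-receiver")  // placeholder app name
  .set("spark.streaming.receiver.blockStoreTimeout", "60")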

 

I never had this problem with the plain KafkaReceiver in the past. What causes this exception, and how can I solve it?
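
For reference, the stream is created roughly like this. It is a simplified sketch of my job, not the exact code: the ZooKeeper quorum, group id, and topic are placeholders. The write-ahead log is enabled, which, as far as I understand, is what makes createStream use ReliableKafkaReceiver in 1.2.0.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("kafka-wal-receiver")                              // placeholder
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // turns on the WAL-backed (reliable) receiver

val ssc = new StreamingContext(conf, Seconds(10))

// Receiver-based Kafka stream. With the WAL on, a non-replicated storage level is
// used since the log already provides durability.
val stream = KafkaUtils.createStream(
  ssc,
  "zk1:2181,zk2:2181",     // placeholder ZooKeeper quorum
  "my-consumer-group",     // placeholder consumer group
  Map("my-topic" -> 1),    // placeholder topic -> receiver thread count
  StorageLevel.MEMORY_AND_DISK_SER)

stream.foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))  // stand-in for the real processing

ssc.start()
ssc.awaitTermination()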

 

Thanks in advance,

Max