Hi Max, 

Could you try the Kafka low-level consumer I have written? It is also available as a Spark package. It is a reliable consumer as well, with no data loss on receiver failure. I have tested it with Spark 1.2 with spark.streaming.receiver.writeAheadLog.enable set to "true", and the consumer pulls messages from Kafka and stores them in the WAL.
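
For reference, the WAL side of this is plain Spark configuration. Here is a minimal sketch in Scala (the app name, batch interval, and checkpoint path are just placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("KafkaWALExample") // placeholder app name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(10))
// The WAL is written under the checkpoint directory, so point it at
// reliable storage such as HDFS (example path below)
ssc.checkpoint("hdfs:///user/spark/checkpoint")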

I have also tested this consumer, with the WAL feature enabled, against a large Kafka backlog (around 6 million messages), and it pulls everything through without any issue.

Note that with the WAL feature enabled, throughput will take a hit, since every received block is also written to the log before it is acknowledged.

You can find this consumer here : https://github.com/dibbhatt/kafka-spark-consumer

Here is its entry on Spark Packages: http://spark-packages.org/package/5
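
To give you an idea of the wiring, launching the receivers looks roughly like the sketch below in Scala. Treat the property keys and values as illustrative and check the README in the repo for the exact, current names (this also assumes the ssc StreamingContext from the earlier snippet):

import java.util.Properties
import org.apache.spark.storage.StorageLevel
import consumer.kafka.client.ReceiverLauncher

val props = new Properties()
// Illustrative keys/values -- the README documents the exact names
props.put("zookeeper.hosts", "zkhost1,zkhost2")
props.put("zookeeper.port", "2181")
props.put("kafka.topic", "your-topic")

// Typically one receiver per Kafka partition for a big backlog
val numberOfReceivers = 3
val kafkaStream = ReceiverLauncher.launch(ssc, props, numberOfReceivers,
  StorageLevel.MEMORY_ONLY)

The returned stream can then be processed like any other DStream.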

If you run into any issues configuring it, feel free to reach out to me.

Regards, 
Dibyendu

On Tue, Jan 13, 2015 at 1:40 AM, Max Xu <Max.Xu@twosigma.com> wrote:

Hi all,

 

I am running a Spark Streaming application with ReliableKafkaReceiver (Spark 1.2.0). I have constantly been getting the following exception:

 

15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
        at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
        at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
        at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
        at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
        at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
        at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)

 

After the exception, the ReliableKafkaReceiver stayed in ACTIVE status but stopped receiving data from Kafka. The Kafka message handler thread was in the BLOCKED state:

 

Thread 92: KafkaMessageHandler-0 (BLOCKED)
org.apache.spark.streaming.receiver.BlockGenerator.addDataWithCallback(BlockGenerator.scala:123)
org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeMessageAndMetadata(ReliableKafkaReceiver.scala:185)
org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:247)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
java.util.concurrent.FutureTask.run(FutureTask.java:262)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

 

Sometimes when the exception was thrown, I also saw warning messages like these:

15/01/12 01:08:07 WARN hdfs.DFSClient: Slow ReadProcessor read fields took 30533ms (threshold=30000ms); ack: seqno: 113 status: SUCCESS status: SUCCESS downstreamAckTimeNanos: 30524893062, targets: [172.20.xxx.xxx:50010, 172.20.xxx.xxx:50010]
15/01/12 01:08:07 WARN hdfs.DFSClient: Slow waitForAckedSeqno took 30526ms (threshold=30000ms)

 

In the past, I never had such problems with KafkaReceiver. What causes this exception, and how can I solve it?

 

Thanks in advance,

Max