spark-issues mailing list archives

From "Max Xu (JIRA)" <>
Subject [jira] [Commented] (SPARK-5220) keepPushingBlocks in BlockGenerator terminated when an exception occurs, which causes the block pushing thread to terminate and blocks receiver
Date Wed, 14 Jan 2015 13:35:34 GMT


Max Xu commented on SPARK-5220:

Hi Jerry, my point is that keepPushingBlocks in BlockGenerator should not terminate because
of any exception thrown by the receiver. With the current implementation, if an exception is
thrown, keepPushingBlocks terminates even when the generator has not been stopped, and so does
the block pushing thread. It is not a matter of speed; the thread has finished. No subsequent
blocks can be pushed to the BlockManager, because after the exception there is no block pushing
thread anymore. This behavior contradicts its intended design.
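For illustration, here is a self-contained sketch of the loop shape in question. It is a
paraphrase, not the actual Spark source, and the failing pushBlock is a hypothetical stand-in;
what it demonstrates is that when the try/catch wraps the entire while loop, the first
exception ends the loop, and with it the thread:

    import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

    object PushLoopSketch {
      @volatile private var stopped = false
      private val blocksForPushing = new ArrayBlockingQueue[Array[Byte]](10)

      // Hypothetical stand-in for BlockGenerator.pushBlock: it fails on the
      // first block, simulating a transient WAL write timeout.
      private def pushBlock(block: Array[Byte]): Unit =
        throw new java.util.concurrent.TimeoutException("WAL write timed out")

      // Same shape as keepPushingBlocks: the try/catch sits OUTSIDE the
      // loop, so one exception from pushBlock ends the loop permanently.
      private def keepPushingBlocks(): Unit = {
        try {
          while (!stopped) {
            Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
              case Some(block) => pushBlock(block)
              case None => // queue empty, poll again
            }
          }
        } catch {
          case e: Exception =>
            println(s"Error in block pushing thread: $e") // thread dies here
        }
      }

      def main(args: Array[String]): Unit = {
        val t = new Thread(new Runnable { def run(): Unit = keepPushingBlocks() })
        t.start()
        blocksForPushing.put(Array[Byte](1)) // the first block kills the thread
        t.join() // returns right after the failure; nothing drains the queue
      }
    }

One way to get the behavior argued for above would be to move the try/catch inside the while
body, so a failed push is reported but the loop keeps polling as long as the generator has not
been stopped.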

> keepPushingBlocks in BlockGenerator terminated when an exception occurs, which causes the block pushing thread to terminate and blocks receiver
> -------------------------------------------------------------------------------------------------------------------------------------------------
>                 Key: SPARK-5220
>                 URL: https://issues.apache.org/jira/browse/SPARK-5220
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Max Xu
> I am running a Spark streaming application with ReliableKafkaReceiver. It uses BlockGenerator
to push blocks to the BlockManager. However, writing WALs to HDFS may time out, which causes
keepPushingBlocks in BlockGenerator to terminate.
> 15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>         at scala.concurrent.Await$.result(package.scala:107)
>         at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
>         at
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
>         at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
>         at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
>         at org.apache.spark.streaming.receiver.BlockGenerator$$anon$
> Then the block pushing thread is done and no subsequent blocks can be pushed into the
BlockManager. In turn, this blocks the receiver from receiving new data.
> So when the TimeoutException happens while my app is running, the ReliableKafkaReceiver stays
in ACTIVE status but doesn't do anything at all. The application effectively hangs.
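
For reference, the TimeoutException in the quoted trace comes from a blocking wait on the
write ahead log future inside WriteAheadLogBasedBlockHandler.storeBlock. Below is a minimal
sketch of that pattern with hypothetical names and shortened durations (the real handler waits
30 seconds); only the Await.result-on-a-future shape and the exception type are taken from the
trace:

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    object WalTimeoutSketch {
      implicit val ec: ExecutionContext = ExecutionContext.global

      // Hypothetical stand-in for the HDFS write ahead log write; under
      // load the write can stall longer than the caller is willing to wait.
      def writeToLog(blockBytes: Array[Byte]): Future[Unit] =
        Future { Thread.sleep(5000) } // simulate a stalled HDFS write

      def main(args: Array[String]): Unit = {
        val walWrite = writeToLog(Array[Byte](1, 2, 3))
        // Await.result throws java.util.concurrent.TimeoutException when
        // the future does not complete in time. In the reported trace the
        // equivalent exception escapes storeBlock, propagates up through
        // pushBlock, and terminates the block pushing thread.
        Await.result(walWrite, 1.second)
      }
    }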
