spark-issues mailing list archives

From "Max Xu (JIRA)" <>
Subject [jira] [Commented] (SPARK-5220) keepPushingBlocks in BlockGenerator terminated when an exception occurs, which causes the block pushing thread to terminate and blocks receiver
Date Wed, 14 Jan 2015 13:54:34 GMT


Max Xu commented on SPARK-5220:

Current situation:
1. The TimeoutException is thrown inside ReliableKafkaReceiver; the receiver logs the error and
does nothing else.
2. The block pushing thread in BlockGenerator is terminated by the exception.
3. ReliableKafkaReceiver stays ACTIVE but goes rogue afterward, because the blocksForPushing queue
is full and no block pushing thread is left to drain it (see the sketch after this list).
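
For reference, the failure mode reduces to the following pattern (a simplified sketch in the spirit
of BlockGenerator, not the actual Spark source; the queue bound of 10 matches the
spark.streaming.blockQueueSize default): the pushing loop catches the exception only to log it, so
the single pushing thread exits and nothing drains the bounded queue.

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Simplified sketch of the BlockGenerator pattern (not the actual Spark source).
object BlockPushingSketch {
  case class Block(id: Long)

  // Bounded queue (spark.streaming.blockQueueSize defaults to 10).
  val blocksForPushing = new ArrayBlockingQueue[Block](10)

  // May throw, e.g. TimeoutException when the WAL write to HDFS is slow.
  def pushBlock(block: Block): Unit = ()

  def keepPushingBlocks(): Unit = {
    try {
      while (true) {
        val block = blocksForPushing.poll(100, TimeUnit.MILLISECONDS)
        if (block != null) pushBlock(block)
      }
    } catch {
      case e: Exception =>
        // The loop exits here: the pushing thread dies, nothing drains the
        // queue, and the producer eventually blocks in blocksForPushing.put.
        println(s"Error in block pushing thread: $e")
    }
  }

  val blockPushingThread = new Thread(new Runnable {
    def run(): Unit = keepPushingBlocks()
  })
}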

So when the exception occurs, what the user sees is a streaming application that keeps running
but sits there doing nothing. (We use a supervisor to restart failed apps, but if the app keeps
running while doing nothing, we also need a separate monitor watching the data flow to kill it.)
Can ReliableKafkaReceiver handle the TimeoutException, for example by retrying the block push when
it is thrown, by restarting itself, or at least by failing the streaming application?
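
A hypothetical sketch of the first option (not a committed fix; `push` stands in for the real
storeBlockAndCommitOffset call): retry a timed-out push a bounded number of times, then escalate
instead of letting the pushing thread die silently.

import java.util.concurrent.TimeoutException

object PushRetrySketch {
  // Retry a timed-out push a few times before escalating.
  def pushWithRetry(push: () => Unit, maxRetries: Int = 3): Unit = {
    var attempt = 0
    var done = false
    while (!done) {
      try {
        push()
        done = true
      } catch {
        case _: TimeoutException if attempt < maxRetries =>
          attempt += 1 // transient HDFS slowness: retry the same block
        case e: TimeoutException =>
          // Out of retries: escalate, e.g. via Receiver.restart(msg, e) or by
          // stopping the StreamingContext so a supervisor sees the app fail.
          throw e
      }
    }
  }
}

For the second option, Receiver.restart(...) is asynchronous (the supervisor stops and relaunches
the receiver), and since ReliableKafkaReceiver sets up its BlockGenerator in onStart, a restart
should also give it a fresh pushing thread.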

> keepPushingBlocks in BlockGenerator terminated when an exception occurs, which causes
the block pushing thread to terminate and blocks receiver  
> -------------------------------------------------------------------------------------------------------------------------------------------------
>                 Key: SPARK-5220
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Max Xu
> I am running a Spark streaming application with ReliableKafkaReceiver. It uses BlockGenerator
to push blocks to BlockManager. However, writing WALs to HDFS may time out, which causes keepPushingBlocks
in BlockGenerator to terminate.
> 15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>         at scala.concurrent.Await$.result(package.scala:107)
>         at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
>         at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
>         at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
>         at org.apache.spark.streaming.receiver.BlockGenerator$$anon$
> Then the block pushing thread is done and no subsequent blocks can be pushed into the BlockManager.
In turn, this blocks the receiver from receiving new data.
> So when the TimeoutException happens while running my app, the ReliableKafkaReceiver stays
in ACTIVE status but doesn't do anything at all. The application goes rogue.
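
The timeout itself comes from the blocking wait on the WAL-write future inside
WriteAheadLogBasedBlockHandler.storeBlock, visible at the top of the stack trace. Schematically
(a sketch of that pattern, with a hypothetical writeToLog; the 30-second bound matches the logged
message):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object WalTimeoutSketch {
  // The WAL write runs as a Future and storeBlock blocks on it with a fixed
  // timeout. If HDFS is slow, Await.result throws TimeoutException, which
  // propagates up through onPushBlock into keepPushingBlocks and kills the
  // pushing thread.
  def storeBlockSketch(writeToLog: () => Unit): Unit = {
    val walWriteFuture = Future { writeToLog() } // asynchronous HDFS write
    Await.result(walWriteFuture, 30.seconds)     // "Futures timed out after [30 seconds]"
  }
}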
