spark-user mailing list archives

From Snehal Nagmote <nagmote.sne...@gmail.com>
Subject Re: [Kafka-Spark-Consumer] Spark-Streaming Job Fails due to Futures timed out
Date Tue, 09 Jun 2015 19:32:21 GMT
Hi Dibyendu,

Thank you for your reply.

I am using the Kafka consumer from
https://github.com/dibbhatt/kafka-spark-consumer, which is built against
spark-core and spark-streaming *1.2.2*.

The Spark cluster I am running the application on is *1.3.1*. I will test it
with the latest changes.

Yes, the underlying BlockManager gives an error, retries 3 times, and then
dies. The problem is that even when it dies, I can't restart the application
because its status still says "running".

It looks related to this issue:
https://issues.apache.org/jira/browse/SPARK-5220
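
For what it's worth, the 30 seconds in "Futures timed out after [30 seconds]"
matches the default Akka ask timeout in Spark 1.3.x. As a stopgap I may try
raising it, for example in spark-defaults.conf. This is only a guess on my
part and would not fix the underlying BlockManager failure:

```
# Raise the driver ask timeout from its 30s default (Spark 1.x setting, in seconds).
spark.akka.askTimeout      120
# Timeout for remote actor lookups, also 30s by default.
spark.akka.lookupTimeout   120
```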


Thanks,
Snehal
On 8 June 2015 at 20:56, Dibyendu Bhattacharya <
dibyendu.bhattachary@gmail.com> wrote:

> Seems to be related to this JIRA :
> https://issues.apache.org/jira/browse/SPARK-3612 ?
>
>
>
> On Tue, Jun 9, 2015 at 7:39 AM, Dibyendu Bhattacharya <
> dibyendu.bhattachary@gmail.com> wrote:
>
>> Hi Snehal
>>
>> Are you running the latest Kafka consumer from GitHub/spark-packages? If
>> not, can you pull the latest changes? This low-level receiver will keep
>> retrying if the underlying BlockManager gives an error. Do you see those
>> retry cycles in the log? If so, there is an issue writing blocks to the
>> BlockManager, and Spark is not able to recover from that failure, but the
>> receiver keeps trying.
>>
>> Which version of Spark are you using?
>>
>> Dibyendu
>> On Jun 9, 2015 5:14 AM, "Snehal Nagmote" <nagmote.snehal@gmail.com>
>> wrote:
>>
>>> All,
>>>
>>> I am using the Kafka Spark Consumer
>>> https://github.com/dibbhatt/kafka-spark-consumer in a Spark Streaming
>>> job.
>>>
>>> After the streaming job runs for a few hours, all executors exit, but I
>>> still see the application's status on the Spark UI as "running".
>>>
>>> Does anyone know the cause of this exception and how to fix it?
>>>
>>>
>>>  WARN  [sparkDriver-akka.actor.default-dispatcher-17:Logging$class@71] - Error reported by receiver for stream 7: Error While Store for Partition Partition{host=dal-kafka-broker01.bfd.walmart.com:9092, partition=27} - org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(2, dfw-searcher.com, 33621),input-7-1433793457165,StorageLevel(false, true, false, false, 1),10492,0,0)]
>>> 	at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>>> 	at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
>>> 	at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
>>> 	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
>>> 	at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
>>> 	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:812)
>>> 	at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
>>> 	at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:71)
>>> 	at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:161)
>>> 	at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:136)
>>> 	at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152)
>>> 	at consumer.kafka.PartitionManager.next(PartitionManager.java:215)
>>> 	at consumer.kafka.KafkaConsumer.createStream(KafkaConsumer.java:75)
>>> 	at consumer.kafka.KafkaConsumer.run(KafkaConsumer.java:108)
>>> 	at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>> 	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>> 	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>> 	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>> 	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>> 	at scala.concurrent.Await$.result(package.scala:107)
>>> 	at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>>
>>> 	... 14 more
>>> WARN  [sparkDriver-akka.actor.default-dispatcher-30:Logging$class@92] - Error sending message [message = UpdateBlockInfo(BlockManagerId(<driver>, dfw-searcher.com, 57286),broadcast_10665_piece0,StorageLevel(false, false, false, false, 1),0,0,0)] in 2 attempts
>>> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>> 	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>> 	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>> 	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>> 	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
>>> 	at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
>>> 	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
>>> 	at scala.concurrent.Await$.result(package.scala:107)
>>> 	at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>> 	at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
>>> 	at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
>>> 	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
>>> 	at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
>>> 	at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1104)
>>> 	at org.apache.spark.storage.BlockManager$$anonfun$removeBroadcast$2.apply(BlockManager.scala:1081)
>>> 	at org.apache.spark.storage.BlockManager$$anonfun$removeBroadcast$2.apply(BlockManager.scala:1081)
>>> 	at scala.collection.immutable.Set$Set2.foreach(Set.scala:94)
>>> 	at org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1081)
>>> 	at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcI$sp(BlockManagerSlaveActor.scala:63)
>>> 	at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply(BlockManagerSlaveActor.scala:63)
>>> 	at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply(BlockManagerSlaveActor.scala:63)
>>> 	at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
>>> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> Thanks,
>>>
>>> Snehal
>>>
>>
>
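
For anyone reading this later, some context on where the failure sits: the
`Receiver.store(...)` call in the trace hands data to Spark's
ReceiverSupervisorImpl, which writes the block through the BlockManager and
then reports it to the driver; it is that report (`AkkaUtils.askWithReply`)
that times out. A minimal custom receiver, shown only to illustrate the
documented Spark 1.x Receiver API (the real receiver in kafka-spark-consumer
is far more involved), looks roughly like this:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Minimal sketch of the Spark 1.x Receiver API. In kafka-spark-consumer,
// PartitionManager.next() ultimately calls store(), which is where the
// BlockManager write (and the driver report that times out) happens.
class SketchReceiver(storageLevel: StorageLevel)
    extends Receiver[String](storageLevel) {

  @volatile private var stopped = false

  def onStart(): Unit = {
    // onStart() must not block; spawn a thread that fetches and stores.
    new Thread("sketch-receiver") {
      override def run(): Unit = {
        while (!stopped && !isStopped()) {
          // store() writes the data via the BlockManager and reports the
          // block to the driver -- the "Futures timed out" error in the
          // trace above surfaces from this path.
          store("record")
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = { stopped = true }
}
```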
