spark-user mailing list archives

From Manoj Samel <manojsamelt...@gmail.com>
Subject Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged
Date Mon, 23 Mar 2015 21:15:12 GMT
Found the cause of the error above: the setting for spark_shuffle was incomplete.
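
For anyone hitting the same loop, here is a minimal sketch of a check for the settings that dynamic allocation on YARN depends on. It assumes the Spark and YARN properties have already been loaded into plain dicts. The function name and the sample yarn-site values are hypothetical, and this message does not say which part of the spark_shuffle setting was actually missing.

```python
# Class that the YARN NodeManager loads for Spark's external shuffle service.
SHUFFLE_CLASS = "org.apache.spark.network.yarn.YarnShuffleService"


def missing_prerequisites(spark_conf, yarn_conf):
    """Return a list of problems that would stop dynamic allocation on YARN.

    Both arguments are plain dicts of property name -> string value.
    This is an illustrative helper, not part of Spark.
    """
    problems = []
    if spark_conf.get("spark.dynamicAllocation.enabled", "false").lower() != "true":
        return problems  # dynamic allocation is off, nothing to check

    if spark_conf.get("spark.shuffle.service.enabled", "false").lower() != "true":
        problems.append("spark.shuffle.service.enabled must be true")

    # yarn.nodemanager.aux-services is a comma-separated list of service names.
    raw_services = yarn_conf.get("yarn.nodemanager.aux-services", "")
    services = [s.strip() for s in raw_services.split(",") if s.strip()]
    if "spark_shuffle" not in services:
        problems.append("yarn.nodemanager.aux-services must include spark_shuffle")

    class_key = "yarn.nodemanager.aux-services.spark_shuffle.class"
    if yarn_conf.get(class_key) != SHUFFLE_CLASS:
        problems.append(class_key + " must be " + SHUFFLE_CLASS)

    return problems


# Spark settings as posted in the original message further down the thread.
spark_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.initialExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "10",
    "spark.dynamicAllocation.executorIdleTimeout": "60",
    "spark.shuffle.service.enabled": "true",
}

# Hypothetical incomplete yarn-site.xml: service listed, class property absent.
yarn_conf = {"yarn.nodemanager.aux-services": "mapreduce_shuffle,spark_shuffle"}

for problem in missing_prerequisites(spark_conf, yarn_conf):
    print(problem)
```

The YARN properties have to be set on every NodeManager, with the Spark YARN shuffle jar on the NodeManager classpath, and the NodeManagers restarted afterwards.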

Now it is able to request and get additional executors. The issue is that
once they are released, it cannot proceed with the next query.

The environment is CDH 5.3.2 (Hadoop 2.5) with Kerberos and Spark 1.3.

After the idle timeout, releasing the executors produces a stack trace under
WARN and returns to the prompt (in spark-shell):

15/03/23 20:55:50 INFO YarnClientSchedulerBackend: Requesting to kill
executor(s) 2
15/03/23 20:55:50 INFO ExecutorAllocationManager: Removing executor 2
because it has been idle for 60 seconds (new desired total will be 6)
15/03/23 20:55:50 INFO YarnClientSchedulerBackend: Requesting to kill
executor(s) 5
15/03/23 20:55:50 INFO ExecutorAllocationManager: Removing executor 5
because it has been idle for 60 seconds (new desired total will be 5)
15/03/23 20:55:50 INFO YarnClientSchedulerBackend: Requesting to kill
executor(s) 1
15/03/23 20:55:51 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkExecutor@xxxl:47358] has failed, address is now
gated for [5000] ms. Reason is: [Disassociated].
15/03/23 20:55:51 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkExecutor@yyy:51807] has failed, address is now
gated for [5000] ms. Reason is: [Disassociated].
15/03/23 20:55:52 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkExecutor@zzz:54623] has failed, address is now
gated for [5000] ms. Reason is: [Disassociated].
15/03/23 20:56:20 WARN AkkaUtils: Error sending message [message =
KillExecutors(ArrayBuffer(1))] in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:171)
at
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1$$anonfun$applyOrElse$4.apply$mcV$sp(YarnSchedulerBackend.scala:136)
at
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1$$anonfun$applyOrElse$4.apply(YarnSchedulerBackend.scala:136)
at
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1$$anonfun$applyOrElse$4.apply(YarnSchedulerBackend.scala:136)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/03/23 20:56:20 WARN AkkaUtils: Error sending message [message =
KillExecutors(ArrayBuffer(1))] in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:171)
at
org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doKillExecutors(YarnSchedulerBackend.scala:68)
at
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors(CoarseGrainedSchedulerBackend.scala:375)
at org.apache.spark.SparkContext.killExecutors(SparkContext.scala:1173)
at
org.apache.spark.ExecutorAllocationClient$class.killExecutor(ExecutorAllocationClient.scala:49)
at org.apache.spark.SparkContext.killExecutor(SparkContext.scala:1186)
at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$removeExecutor(ExecutorAllocationManager.scala:353)
at
org.apache.spark.ExecutorAllocationManager$$anonfun$org$apache$spark$ExecutorAllocationManager$$schedule$1.apply(ExecutorAllocationManager.scala:237)
at
org.apache.spark.ExecutorAllocationManager$$anonfun$org$apache$spark$ExecutorAllocationManager$$schedule$1.apply(ExecutorAllocationManager.scala:234)
at
scala.collection.mutable.MapLike$$anonfun$retain$2.apply(MapLike.scala:213)
at
scala.collection.mutable.MapLike$$anonfun$retain$2.apply(MapLike.scala:212)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at scala.collection.mutable.MapLike$class.retain(MapLike.scala:212)
at scala.collection.mutable.AbstractMap.retain(Map.scala:91)
at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:234)
at
org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:191)

The next query hangs with the following output:

......
15/03/23 20:57:27 INFO MemoryStore: Block broadcast_4_piece0 stored as
bytes in memory (estimated size 4.5 KB, free 264.9 MB)
15/03/23 20:57:27 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory
on xxx:45903 (size: 4.5 KB, free: 265.1 MB)
15/03/23 20:57:27 INFO BlockManagerMaster: Updated info of block
broadcast_4_piece0
15/03/23 20:57:27 INFO SparkContext: Created broadcast 4 from broadcast at
DAGScheduler.scala:839
15/03/23 20:57:27 INFO DAGScheduler: Submitting 10 missing tasks from Stage
2 (MapPartitionsRDD[11] at mapPartitions at Exchange.scala:64)
15/03/23 20:57:27 INFO YarnScheduler: Adding task set 2.0 with 10 tasks
15/03/23 20:57:56 WARN BlockManagerMasterActor: Removing BlockManager
BlockManagerId(2, yyy, 37231) with no recent heart beats: 131006ms exceeds
120000ms
15/03/23 20:57:56 WARN BlockManagerMasterActor: Removing BlockManager
BlockManagerId(5, zzz, 34437) with no recent heart beats: 131329ms exceeds
120000ms
15/03/23 20:57:56 WARN BlockManagerMasterActor: Removing BlockManager
BlockManagerId(1, bbb, 52747) with no recent heart beats: 128199ms exceeds
120000ms
15/03/23 20:57:56 INFO BlockManagerMasterActor: Removing block manager
BlockManagerId(2, yyy, 37231)
15/03/23 20:57:56 INFO BlockManagerMasterActor: Removing block manager
BlockManagerId(1, bbb, 52747)
15/03/23 20:57:56 INFO BlockManagerMasterActor: Removing block manager
BlockManagerId(5, zzz, 34437)
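
The heartbeat gaps in the warnings above can be turned back into approximate last-heartbeat times. A small sketch, using only values copied from the log (the helper name is made up, and the log timestamps have one-second resolution, so the results are approximate):

```python
from datetime import datetime, timedelta

# Spark log timestamps use a two-digit year: yy/MM/dd HH:mm:ss.
LOG_FORMAT = "%y/%m/%d %H:%M:%S"


def last_heartbeat(removed_at, silent_ms):
    """Estimate when a block manager last sent a heartbeat.

    removed_at: timestamp of the "Removing BlockManager" warning.
    silent_ms:  the "no recent heart beats" gap reported in that warning.
    """
    return datetime.strptime(removed_at, LOG_FORMAT) - timedelta(milliseconds=silent_ms)


# Time of the "Requesting to kill executor(s)" lines in the first log excerpt.
kill_requested = datetime.strptime("15/03/23 20:55:50", LOG_FORMAT)

# Executor id -> silent interval in milliseconds, from the warnings above.
silent_intervals = {2: 131006, 5: 131329, 1: 128199}

for executor_id, silent_ms in silent_intervals.items():
    heartbeat = last_heartbeat("15/03/23 20:57:56", silent_ms)
    print(executor_id, heartbeat.time(), heartbeat < kill_requested)
```

All three estimates fall a few seconds before the kill requests logged at 20:55:50, which is consistent with the executors having exited at that point while their block managers stayed registered until the 120000 ms timeout expired.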



On Sat, Mar 21, 2015 at 6:51 AM, Ted Yu <yuzhihong@gmail.com> wrote:

> bq. Requesting 1 new executor(s) because tasks are backlogged
>
> 1 executor was requested.
>
> Which Hadoop release are you using?
>
> Can you check the resource manager log to see if there is some clue?
>
> Thanks
>
> On Fri, Mar 20, 2015 at 4:17 PM, Manoj Samel <manojsameltech@gmail.com>
> wrote:
>
>> Forgot to add - the cluster is idle otherwise so there should be no
>> resource issues. Also the configuration works when not using Dynamic
>> allocation.
>>
>> On Fri, Mar 20, 2015 at 4:15 PM, Manoj Samel <manojsameltech@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Running Spark 1.3 with secured Hadoop.
>>>
>>> Spark-shell with Yarn client mode runs without issue when not using
>>> Dynamic Allocation.
>>>
>>> When Dynamic Allocation is turned on, the shell comes up, but the same
>>> SQL etc. causes it to loop.
>>>
>>> spark.dynamicAllocation.enabled=true
>>> spark.dynamicAllocation.initialExecutors=1
>>> spark.dynamicAllocation.maxExecutors=10
>>> # Set IdleTime low for testing
>>> spark.dynamicAllocation.executorIdleTimeout=60
>>> spark.shuffle.service.enabled=true
>>>
>>> Following is the start of the messages and then it keeps looping with
>>> "Requesting 0 new executors"
>>>
>>> 15/03/20 22:52:42 INFO storage.BlockManagerMaster: Updated info of block
>>> broadcast_1_piece0
>>> 15/03/20 22:52:42 INFO spark.SparkContext: Created broadcast 1 from
>>> broadcast at DAGScheduler.scala:839
>>> 15/03/20 22:52:42 INFO scheduler.DAGScheduler: Submitting 1 missing
>>> tasks from Stage 0 (MapPartitionsRDD[3] at mapPartitions at
>>> Exchange.scala:100)
>>> 15/03/20 22:52:42 INFO cluster.YarnScheduler: Adding task set 0.0 with 1
>>> tasks
>>> 15/03/20 22:52:47 INFO spark.ExecutorAllocationManager: Requesting 1 new
>>> executor(s) because tasks are backlogged (new desired total will be 1)
>>> 15/03/20 22:52:52 INFO spark.ExecutorAllocationManager: Requesting 0 new
>>> executor(s) because tasks are backlogged (new desired total will be 1)
>>> 15/03/20 22:52:57 WARN cluster.YarnScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient resources
>>> 15/03/20 22:52:57 INFO spark.ExecutorAllocationManager: Requesting 0 new
>>> executor(s) because tasks are backlogged (new desired total will be 1)
>>> 15/03/20 22:53:02 INFO spark.ExecutorAllocationManager: Requesting 0 new
>>> executor(s) because tasks are backlogged (new desired total will be 1)
>>> 15/03/20 22:53:07 INFO spark.ExecutorAllocationManager: Requesting 0 new
>>> executor(s) because tasks are backlogged (new desired total will be 1)
>>> 15/03/20 22:53:12 INFO spark.ExecutorAllocationManager: Requesting 0 new
>>> executor(s) because tasks are backlogged (new desired total will be 1)
>>> 15/03/20 22:53:12 WARN cluster.YarnScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient resources
>>> 15/03/20 22:53:17 INFO spark.ExecutorAllocationManager: Requesting 0 new
>>> executor(s) because tasks are backlogged (new desired total will be 1)
>>> 15/03/20 22:53:22 INFO spark.ExecutorAllocationManager: Requesting 0 new
>>> executor(s) because tasks are backlogged (new desired total will be 1)
>>> 15/03/20 22:53:27 INFO spark.ExecutorAllocationManager: Requesting 0 new
>>> executor(s) because tasks are backlogged (new desired total will be 1)
>>>
>>
>>
>
