spark-user mailing list archives

From Richard Marscher <rmarsc...@localytics.com>
Subject Re: Spark driver hangs on start of job
Date Thu, 02 Jul 2015 15:11:13 GMT
Ah I see, glad that simple patch works for your problem. That seems to be a
different underlying problem from the one we have been experiencing. In our
case, the executors fail properly; it's just that every replacement executor
runs into exactly the same issue. So we enter a death spiral of thousands of
failed executors, none of which can connect to the driver. Meanwhile, the
driver just sits in the "zombie" state doing nothing while it waits for
executors to respond. That is why my solution is geared towards handling
things gracefully on the driver side.


On Thu, Jul 2, 2015 at 4:37 AM, Sjoerd Mulder <sjoerdmulder@gmail.com>
wrote:

> Hi Richard,
>
> I have actually applied the following fix to our 1.4.0 version, and it
> seems to resolve the zombies :)
>
> https://github.com/apache/spark/pull/7077/files
>
> Sjoerd
>
> 2015-06-26 20:08 GMT+02:00 Richard Marscher <rmarscher@localytics.com>:
>
>> Hi,
>>
>> We are on 1.3.1 right now, so in case there are differences in the Spark
>> files I'll walk through the logic of what we did and post a couple of gists
>> at the end. We haven't committed to forking Spark for our own deployments
>> yet, so right now we shadow some Spark classes in our application code with
>> our own versions of those classes. Keep in mind I am not a Spark committer,
>> so the following is offered on a best-effort basis; it is working for us,
>> but someone more knowledgeable about the Spark codebase might see a pitfall
>> in my solution or a better solution.
>>
>> --
>>
>> First, let's start with the root issue in TaskSchedulerImpl. You will
>> find the code that prints the "Initial job has not accepted any resources"
>> warning inside the "submitTasks" function. Spark creates a separate thread
>> that checks, every "STARVATION_TIMEOUT" milliseconds, whether the submitted
>> task set has been launched. That thread only logs the warning and takes no
>> other action. I will come back to this part of the code in a moment.
>>
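A minimal Scala sketch of the warn-only starvation check described above, assuming (to the best of my knowledge of the Spark 1.x sources) that it is a java.util.TimerTask scheduled at a fixed rate. StarvationCheck and its fields are illustrative names, not Spark's. A 15 000 ms timeout would be consistent with the 15-second spacing of the WARN lines in the driver log further down, but treat that value as an assumption.

```scala
import java.util.{Timer, TimerTask}

// Hypothetical model of the starvation check scheduled by submitTasks.
// Nothing here calls Spark; StarvationCheck is an illustrative name.
class StarvationCheck(log: String => Unit) {
  // Set by resourceOffers once at least one task has been launched.
  @volatile var hasLaunchedTask = false
  // Counts how many times the warning was logged (only the timer thread writes it).
  @volatile var warnings = 0

  def start(timer: Timer, starvationTimeoutMs: Long): Unit =
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        if (!hasLaunchedTask) {
          warnings += 1
          // Warn-only: the task set is never failed, so the driver can wait forever.
          log("Initial job has not accepted any resources; check your cluster UI " +
            "to ensure that workers are registered and have sufficient resources")
        } else {
          this.cancel() // a task launched, so the check retires itself
        }
      }
    }, starvationTimeoutMs, starvationTimeoutMs)
}
```

Because the task only logs, a task set that never receives a usable offer keeps the driver waiting indefinitely, which matches the repeated WARN lines in the log.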
>> The code that sets the "hasLaunchedTask" flag (which retires the
>> starvation thread, because the task set is now being worked on by the
>> cluster) is in the "resourceOffers" function. The various Spark scheduler
>> backend classes periodically call this function in TaskSchedulerImpl until
>> cluster resources have been assigned to the task set.
>>
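The flag update in resourceOffers can be modelled in isolation. This is a hypothetical, heavily simplified stand-in: WorkerOffer and OfferSketch are made-up names, and real offers also carry hosts, locality preferences and CPUs-per-task settings that are ignored here.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of TaskSchedulerImpl.resourceOffers.
final case class WorkerOffer(executorId: String, cores: Int)

class OfferSketch(pendingTasks: Seq[String]) {
  @volatile var hasLaunchedTask = false
  private val queue = mutable.Queue.empty[String] ++= pendingTasks

  // Called periodically by a scheduler backend with the currently free cores.
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[(String, String)] = synchronized {
    val tasks = mutable.ArrayBuffer.empty[(String, String)]
    for (offer <- offers; _ <- 0 until offer.cores) {
      // One task per free core, until the pending queue is drained.
      if (queue.nonEmpty) tasks += ((offer.executorId, queue.dequeue()))
    }
    // The flag that also lets the starvation timer retire itself.
    if (tasks.size > 0) { hasLaunchedTask = true }
    tasks.toList
  }
}
```

An empty offer round leaves hasLaunchedTask false, which is exactly the state in which the starvation warning keeps firing.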
>> To start signaling the zombied scenario, I created a new flag:
>> "@volatile private var hasZombied = false". In our experience, we always
>> get the resources in resourceOffers before the starvation thread first
>> runs; if resources haven't been allocated by then, we always end up in the
>> zombie scenario. So I added a conditional before the
>> "if(tasks.size > 0) { hasLaunchedTask = true }" block. The conditional
>> checks "if(!hasLaunchedTask && hasZombied) { dagScheduler.ourCustomFunction() }".
>> I'll explain that DAGScheduler call in a moment.
>>
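A sketch of the added conditional, assuming the real method keeps its existing `if(tasks.size > 0) { hasLaunchedTask = true }` update right after it. ZombieAwareOffers and onZombie are hypothetical names; onZombie stands in for dagScheduler.ourCustomFunction(), and the tasks parameter stands in for whatever the real method scheduled from the offers.

```scala
// Hypothetical sketch of the modified check inside resourceOffers.
class ZombieAwareOffers(onZombie: () => Unit) {
  @volatile var hasLaunchedTask = false
  // Set to true by the starvation timer once it has given up waiting.
  @volatile var hasZombied = false

  def resourceOffers(tasks: Seq[String]): Seq[String] = synchronized {
    // New conditional, placed just before the existing hasLaunchedTask update.
    if (!hasLaunchedTask && hasZombied) {
      onZombie()
    }
    if (tasks.size > 0) { hasLaunchedTask = true }
    tasks
  }
}
```

In this sketch onZombie fires on every offer round until a task launches, so whatever it calls should be safe to invoke more than once.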
>> The last detail here is to add code inside the starvation thread block,
>> after it logs the warning: set "hasZombied" to true and then call
>> "this.cancel()" to stop the starvation thread from running again. With
>> this, we have all the steps needed inside TaskSchedulerImpl to signal the
>> zombied condition.
>>
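A sketch of the modified starvation check, again assuming a java.util.TimerTask. ZombieFlaggingCheck and the runs counter are hypothetical; runs only exists to show that cancel() stops further executions.

```scala
import java.util.{Timer, TimerTask}

// Hypothetical sketch: after the first warning the check raises hasZombied
// and cancels itself instead of repeating.
class ZombieFlaggingCheck {
  @volatile var hasLaunchedTask = false
  @volatile var hasZombied = false
  @volatile var runs = 0 // how many times the timer task actually ran

  def start(timer: Timer, starvationTimeoutMs: Long): Unit =
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        runs += 1
        if (!hasLaunchedTask) {
          System.err.println("WARN Initial job has not accepted any resources")
          hasZombied = true // picked up by the next resourceOffers call
        }
        this.cancel() // either way, this task never runs again
      }
    }, starvationTimeoutMs, starvationTimeoutMs)
}
```

TimerTask.cancel() lets an execution that is already running finish but prevents any further ones, so the warning is logged at most once per task set.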
>> Back to the custom function. DAGScheduler has references to the
>> appropriate Spark listeners that can propagate errors to the task set and,
>> more importantly, back to your application code. If you look at the
>> DAGScheduler class, you will find a function called
>> "cleanUpAfterSchedulerStop()". This function does everything we want,
>> except that it is hard-coded to a specific exception: "val error = new
>> SparkException(...)". What I did was copy it into another function that
>> uses a custom exception I created to signal the zombie, something like
>> SparkTaskResourceAllocationZombieError. Now call this function within the
>> conditional block in TaskSchedulerImpl.resourceOffers, and you should see
>> your exception propagate out to your application code so you can take
>> appropriate action.
>>
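A sketch of the custom DAGScheduler function. Only the exception name comes from the mail; JobListener, ActiveJob, DagSchedulerSketch, failJobsWithZombieError and PromiseListener are hypothetical stand-ins for Spark internals. The sketch covers just the job-listener notification and leaves out any stage or listener-bus bookkeeping the real cleanUpAfterSchedulerStop() may perform. PromiseListener models the application side: code waiting on its future sees the zombie exception instead of hanging.

```scala
import scala.collection.mutable
import scala.concurrent.Promise

class SparkTaskResourceAllocationZombieError(msg: String) extends Exception(msg)

// Hypothetical stand-ins for Spark's job bookkeeping.
trait JobListener { def jobFailed(error: Exception): Unit }
final case class ActiveJob(jobId: Int, listener: JobListener)

class DagSchedulerSketch {
  val activeJobs = mutable.LinkedHashSet.empty[ActiveJob]

  // Same shape as the copy-and-modify described above: fail every active job,
  // but with the zombie exception instead of a hard-coded SparkException.
  def failJobsWithZombieError(): Unit = {
    for (job <- activeJobs) {
      val error = new SparkTaskResourceAllocationZombieError(
        s"Job ${job.jobId} was never allocated cluster resources")
      job.listener.jobFailed(error)
    }
    activeJobs.clear() // sketch-only: drop failed jobs so they are not failed twice
  }
}

// Application-facing side: the caller waits on result.future and sees the failure.
class PromiseListener extends JobListener {
  val result = Promise[Unit]()
  def jobFailed(error: Exception): Unit = result.tryFailure(error)
}
```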
>> In our case, we are submitting Spark applications programmatically from a
>> Scala application service on an EC2 instance to a Spark Standalone cluster
>> in EC2. Whenever we see this error, the application service EC2 instance is
>> unable to get resources from the cluster even when attempting subsequent
>> Spark applications for a long period of time (it eventually recovers hours
>> or days later but that is not useful for us). So in our case we need to
>> reschedule the failed Spark application on another EC2 application instance
>> and shut down this current EC2 instance because it can no longer get
>> cluster resources. Your use case may be different, but at least action can
>> be taken at an application level.
>>
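At the application level, the reschedule-and-shut-down handling could be sketched as follows, assuming the custom exception reaches the caller of the Spark job unwrapped. submit, runJob, reschedule and shutdownThisInstance are hypothetical hooks, not part of Spark or any EC2 SDK.

```scala
import scala.util.{Failure, Success, Try}

// Same hypothetical exception as in the scheduler patch; in a real build it
// would be shared between the shadowed Spark classes and the application.
class SparkTaskResourceAllocationZombieError(msg: String) extends Exception(msg)

sealed trait Outcome
case object Completed extends Outcome
case object Rescheduled extends Outcome

object ZombieHandling {
  // runJob: the Spark application itself; reschedule: whatever queues the work
  // on another instance; shutdownThisInstance: whatever terminates this one.
  def submit(runJob: () => Unit,
             reschedule: () => Unit,
             shutdownThisInstance: () => Unit): Outcome =
    Try(runJob()) match {
      case Success(_) => Completed
      case Failure(_: SparkTaskResourceAllocationZombieError) =>
        reschedule()           // hand the application to a healthy instance
        shutdownThisInstance() // this instance can no longer get resources
        Rescheduled
      case Failure(other) => throw other // unrelated failures are not masked
    }
}
```

Only the zombie exception triggers the reschedule-and-shutdown path; other failures are rethrown so they are not mistaken for a resource-allocation problem.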
>> Here is some source code. You should be able to locate most of my
>> additions by searching for comments starting with "// Localytics Code".
>> TaskSchedulerImpl gist:
>> https://gist.github.com/rmarsch/e5d298e582ab75957957
>> DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7
>>
>> Regards,
>> Richard
>>
>> On Fri, Jun 26, 2015 at 12:08 PM, Sjoerd Mulder <sjoerdmulder@gmail.com>
>> wrote:
>>
>>> Hi Richard,
>>>
>>> I would like to find a workaround to get out of the zombie situation,
>>> since we're planning for production :)
>>>
>>> If you could share the workaround or point directions that would be
>>> great!
>>>
>>> Sjoerd
>>>
>>> 2015-06-26 16:53 GMT+02:00 Richard Marscher <rmarscher@localytics.com>:
>>>
>>>> We've seen this issue in production as well. We aren't sure what causes
>>>> it either, but we have just recently shaded some of the Spark code in
>>>> TaskSchedulerImpl so that Spark bubbles up an exception in this situation
>>>> instead of turning into a zombie. If you are interested I can go into more
>>>> detail about that. Otherwise, I'm also keen to find out more about how
>>>> this might be happening.
>>>>
>>>> On Fri, Jun 26, 2015 at 8:28 AM, Sjoerd Mulder <sjoerdmulder@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a really annoying issue that I cannot replicate consistently,
>>>>> but it still happens roughly once every 100 submissions (it's a job that
>>>>> runs every 3 minutes).
>>>>> Already reported an issue for this:
>>>>> https://issues.apache.org/jira/browse/SPARK-8592
>>>>>
>>>>> Here are the thread dumps of the driver and the executor:
>>>>> https://docs.google.com/document/d/1x7ZwUzlvRqeJQ12FoGhpLV1zqDAmVsaF2HYhzkPNBKQ
>>>>>
>>>>> Is there any direction I should look into?
>>>>>
>>>>> Spark 1.4.0
>>>>> Java 1.8.0_45 (Oracle Corporation)
>>>>> Scala 2.11.6
>>>>>
>>>>> I already tried to resolve the NPE by not logging the ActorRef. This
>>>>> makes the NPE go away :)
>>>>>
>>>>> But I expect the root cause lies deeper, since the driver still hangs
>>>>> with the "*WARN TaskSchedulerImpl: Initial job has not accepted any
>>>>> resources; check your cluster UI to ensure that workers are registered
>>>>> and have sufficient resources*" messages. Yet there are enough resources
>>>>> available in the cluster; it has plenty of CPU and memory left.
>>>>>
>>>>> Logs from Driver:
>>>>>
>>>>> 15/06/26 11:58:19 INFO Remoting: Starting remoting
>>>>> 15/06/26 11:58:19 INFO Remoting: Remoting started; listening on
>>>>> addresses :[akka.tcp://sparkDriver@172.17.0.123:51415]
>>>>> 15/06/26 11:58:19 INFO Utils: Successfully started service
>>>>> 'sparkDriver' on port 51415.
>>>>> 15/06/26 11:58:20 INFO SparkEnv: Registering MapOutputTracker
>>>>> 15/06/26 11:58:20 INFO SparkEnv: Registering BlockManagerMaster
>>>>> 15/06/26 11:58:20 INFO DiskBlockManager: Created local directory at
>>>>> /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/blockmgr-92b1e974-53bb-45a3-b918-916759e14630
>>>>> 15/06/26 11:58:20 INFO MemoryStore: MemoryStore started with capacity
>>>>> 265.1 MB
>>>>> 15/06/26 11:58:20 INFO HttpFileServer: HTTP File server directory is
>>>>> /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/httpd-f5894293-33aa-4eaa-9740-4a36c054b6c8
>>>>> 15/06/26 11:58:20 INFO HttpServer: Starting HTTP Server
>>>>> 15/06/26 11:58:20 INFO Utils: Successfully started service 'HTTP file
>>>>> server' on port 33176.
>>>>> 15/06/26 11:58:20 INFO SparkEnv: Registering OutputCommitCoordinator
>>>>> 15/06/26 11:58:20 INFO Utils: Successfully started service 'SparkUI'
>>>>> on port 4040.
>>>>> 15/06/26 11:58:20 INFO SparkUI: Started SparkUI at
>>>>> http://172.17.0.123:4040
>>>>> 15/06/26 11:58:20 INFO SparkContext: Added JAR
>>>>> file:/opt/jar/spark/spark-job-1.0-SNAPSHOT.jar at
>>>>> http://172.17.0.123:33176/jars/spark-job-1.0-SNAPSHOT.jar with
>>>>> timestamp 1435319900257
>>>>> 15/06/26 11:58:20 INFO AppClient$ClientActor: Connecting to master
>>>>> akka.tcp://sparkMaster@172.17.42.1:7077/user/Master...
>>>>> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Connected to Spark
>>>>> cluster with app ID app-20150626115820-0917
>>>>> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor added:
>>>>> app-20150626115820-0917/0 on worker-20150625133752-10.0.7.171-47050 (
>>>>> 10.0.7.171:47050) with 1 cores
>>>>> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Granted executor
>>>>> ID app-20150626115820-0917/0 on hostPort 10.0.7.171:47050 with 1
>>>>> cores, 2.0 GB RAM
>>>>> 15/06/26 11:58:20 INFO TaskSchedulerImpl: Starting speculative
>>>>> execution thread
>>>>> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
>>>>> app-20150626115820-0917/0 is now LOADING
>>>>> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
>>>>> app-20150626115820-0917/0 is now RUNNING
>>>>> 15/06/26 11:58:20 INFO Utils: Successfully started service
>>>>> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52000.
>>>>> 15/06/26 11:58:20 INFO NettyBlockTransferService: Server created on
>>>>> 52000
>>>>> 15/06/26 11:58:20 INFO BlockManagerMaster: Trying to register
>>>>> BlockManager
>>>>> 15/06/26 11:58:20 INFO BlockManagerMasterEndpoint: Registering block
>>>>> manager 172.17.0.123:52000 with 265.1 MB RAM, BlockManagerId(driver,
>>>>> 172.17.0.123, 52000)
>>>>> 15/06/26 11:58:20 INFO BlockManagerMaster: Registered BlockManager
>>>>> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: SchedulerBackend
>>>>> is ready for scheduling beginning after reached
>>>>> minRegisteredResourcesRatio: 0.0
>>>>> 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
>>>>> 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
>>>>> 15/06/26 11:58:24 INFO SparkContext: Starting job: map at
>>>>> SparkProductEventAggregator.scala:144
>>>>> 15/06/26 11:58:24 INFO Version: Elasticsearch Hadoop v2.1.0.rc1
>>>>> [5cc3f53084]
>>>>> 15/06/26 11:58:24 INFO ScalaEsRowRDD: Reading from
>>>>> [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
>>>>> 15/06/26 11:58:24 INFO ScalaEsRowRDD: Discovered mapping
>>>>> {675d42c8-9823-4d3c-8e86-5aa611d38770=[REMOVED]} for
>>>>> [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
>>>>> 15/06/26 11:58:24 INFO DAGScheduler: Registering RDD 5 (map at
>>>>> SparkProductEventAggregator.scala:144)
>>>>> 15/06/26 11:58:24 INFO DAGScheduler: Got job 0 (map at
>>>>> SparkProductEventAggregator.scala:144) with 200 output partitions
>>>>> (allowLocal=false)
>>>>> 15/06/26 11:58:24 INFO DAGScheduler: Final stage: ResultStage 1(map at
>>>>> SparkProductEventAggregator.scala:144)
>>>>> 15/06/26 11:58:24 INFO DAGScheduler: Parents of final stage:
>>>>> List(ShuffleMapStage 0)
>>>>> 15/06/26 11:58:24 INFO DAGScheduler: Missing parents:
>>>>> List(ShuffleMapStage 0)
>>>>> 15/06/26 11:58:24 INFO DAGScheduler: Submitting ShuffleMapStage 0
>>>>> (MapPartitionsRDD[5] at map at SparkProductEventAggregator.scala:144),
>>>>> which has no missing parents
>>>>> 15/06/26 11:58:24 INFO MemoryStore: ensureFreeSpace(12384) called with
>>>>> curMem=0, maxMem=278019440
>>>>> 15/06/26 11:58:24 INFO MemoryStore: Block broadcast_0 stored as values
>>>>> in memory (estimated size 12.1 KB, free 265.1 MB)
>>>>> 15/06/26 11:58:24 INFO MemoryStore: ensureFreeSpace(5542) called with
>>>>> curMem=12384, maxMem=278019440
>>>>> 15/06/26 11:58:24 INFO MemoryStore: Block broadcast_0_piece0 stored as
>>>>> bytes in memory (estimated size 5.4 KB, free 265.1 MB)
>>>>> 15/06/26 11:58:24 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>>>>> memory on 172.17.0.123:52000 (size: 5.4 KB, free: 265.1 MB)
>>>>> 15/06/26 11:58:24 INFO SparkContext: Created broadcast 0 from
>>>>> broadcast at DAGScheduler.scala:874
>>>>> 15/06/26 11:58:24 INFO DAGScheduler: Submitting 5 missing tasks from
>>>>> ShuffleMapStage 0 (MapPartitionsRDD[5] at map at
>>>>> SparkProductEventAggregator.scala:144)
>>>>> 15/06/26 11:58:24 INFO TaskSchedulerImpl: Adding task set 0.0 with 5
>>>>> tasks
>>>>> 15/06/26 11:58:39 WARN TaskSchedulerImpl: Initial job has not accepted
>>>>> any resources; check your cluster UI to ensure that workers are registered
>>>>> and have sufficient resources
>>>>> 15/06/26 11:58:54 WARN TaskSchedulerImpl: Initial job has not accepted
>>>>> any resources; check your cluster UI to ensure that workers are registered
>>>>> and have sufficient resources
>>>>> 15/06/26 11:59:09 WARN TaskSchedulerImpl: Initial job has not accepted
>>>>> any resources; check your cluster UI to ensure that workers are registered
>>>>> and have sufficient resources
>>>>>
>>>>>
>>>>> Sjoerd
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
