spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andrew Ash <and...@andrewash.com>
Subject Re: [0.9.0] Possible deadlock in shutdown hook?
Date Fri, 07 Feb 2014 07:49:40 GMT
I think we can enumerate all current threads with the ThreadMXBean, filter
to those threads with the name of executor pool in them, and interrupt them.

http://docs.oracle.com/javase/6/docs/api/java/lang/management/ManagementFactory.html#getThreadMXBean%28%29

The executor threads are currently named according to the pattern "Executor
task launch worker-X"


On Thu, Feb 6, 2014 at 11:45 PM, Tathagata Das
<tathagata.das1565@gmail.com>wrote:

> That definitely sound more reliable. Worth trying out if there is a
> reliable way of reproducing the deadlock-like scenario.
>
> TD
>
>
> On Thu, Feb 6, 2014 at 11:38 PM, Matei Zaharia <matei.zaharia@gmail.com
> >wrote:
>
> > I don't think we necessarily want to do this through the DAGScheduler
> > because the worker might also shut down due to some unusual termination
> > condition, like the driver node crashing. Can't we do it at the top of
> the
> > shutdown hook instead? If all the threads are in the same thread pool it
> > might be possible to interrupt or stop the whole pool.
> >
> > Matei
> >
> > On Feb 6, 2014, at 11:30 PM, Andrew Ash <andrew@andrewash.com> wrote:
> >
> > > That's genius.  Of course when a worker is told to shutdown it should
> > > interrupt its worker threads -- I think that would address this issue.
> > >
> > > Are you thinking to put
> > >
> > > running.map(_.jobId).foreach { handleJobCancellation }
> > >
> > > at the top of the StopDAGScheduler block?
> > >
> > >
> > > On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das
> > > <tathagata.das1565@gmail.com>wrote:
> > >
> > >> Its highly likely that the executor with the threadpool that runs the
> > tasks
> > >> are the only set of threads that writes to disk. The tasks are
> designed
> > to
> > >> be interrupted when the corresponding job is cancelled. So a
> reasonably
> > >> simple way could be to actually cancel the currently active jobs,
> which
> > >> would send the signal to the worker to stop the tasks. Currently, the
> > >> DAGScheduler<
> > >>
> >
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610
> > >>> does
> > >> not seem to actually cancel the jobs, only mark them as failed. So it
> > >> may be a simple addition.
> > >>
> > >> There may be some complications with the external spilling of shuffle
> > data
> > >> to disk not stopping immediately when the task is marked for killing.
> > Gotta
> > >> try it out.
> > >>
> > >> TD
> > >>
> > >> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <andrew@andrewash.com>
> > wrote:
> > >>
> > >>> There is probably just one threadpool that has task threads -- is it
> > >>> possible to enumerate and interrupt just those?  We may need to keep
> > >> string
> > >>> a reference to that threadpool through to the shutdown thread to make
> > >> that
> > >>> happen.
> > >>>
> > >>>
> > >>> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <
> mridul@gmail.com
> > >>>> wrote:
> > >>>
> > >>>> Ideally, interrupting the thread writing to disk should be
> sufficient
> > >>>> - though since we are in middle of shutdown when this is happening,
> it
> > >>>> is best case effort anyway.
> > >>>> Identifying which threads to interrupt will be interesting since
> most
> > >>>> of them are driven by threadpool's and we cant list all threads
and
> > >>>> interrupt all of them !
> > >>>>
> > >>>>
> > >>>> Regards,
> > >>>> Mridul
> > >>>>
> > >>>>
> > >>>> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <andrew@andrewash.com>
> > >> wrote:
> > >>>>> I think the solution where we stop the writing threads and
then let
> > >> the
> > >>>>> deleting threads completely clean up is the best option since
the
> > >> final
> > >>>>> state doesn't have half-deleted temp dirs scattered across
the
> > >> cluster.
> > >>>>>
> > >>>>> How feasible do you think it'd be to interrupt the other threads?
> > >>>>>
> > >>>>>
> > >>>>> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <
> > >> mridul@gmail.com
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Looks like a pathological corner case here - where the
the delete
> > >>>>>> thread is not getting run while the OS is busy prioritizing
the
> > >> thread
> > >>>>>> writing data (probably with heavy gc too).
> > >>>>>> Ideally, the delete thread would list files, remove them
and then
> > >> fail
> > >>>>>> when it tries to remove the non empty directory (since
other
> thread
> > >>>>>> might be creating more in parallel).
> > >>>>>>
> > >>>>>>
> > >>>>>> Regards,
> > >>>>>> Mridul
> > >>>>>>
> > >>>>>>
> > >>>>>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <andrew@andrewash.com>
> > >>>> wrote:
> > >>>>>>> Got a repro locally on my MBP (the other was on a CentOS
> machine).
> > >>>>>>>
> > >>>>>>> Build spark, run a master and a worker with the sbin/start-all.sh
> > >>>> script,
> > >>>>>>> then run this in a shell:
> > >>>>>>>
> > >>>>>>> import org.apache.spark.storage.StorageLevel._
> > >>>>>>> val s = sc.parallelize(1 to
> > >>> 1000000000).persist(MEMORY_AND_DISK_SER);
> > >>>>>>> s.count
> > >>>>>>>
> > >>>>>>> After about a minute, this line appears in the shell
logging
> > >> output:
> > >>>>>>>
> > >>>>>>> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing
> > >>> BlockManager
> > >>>>>>> BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895,
0) with no
> > >> recent
> > >>>>>> heart
> > >>>>>>> beats: 57510ms exceeds 45000ms
> > >>>>>>>
> > >>>>>>> Ctrl-C the shell.  In jps there is now a worker, a
master, and a
> > >>>>>>> CoarseGrainedExecutorBackend.
> > >>>>>>>
> > >>>>>>> Run jstack on the CGEBackend JVM, and I got the attached
> > >>> stacktraces.
> > >>>> I
> > >>>>>>> waited around for 15min then kill -9'd the JVM and
restarted the
> > >>>> process.
> > >>>>>>>
> > >>>>>>> I wonder if what's happening here is that the threads
that are
> > >>> spewing
> > >>>>>> data
> > >>>>>>> to disk (as that parallelize and persist would do)
can write to
> > >> disk
> > >>>>>> faster
> > >>>>>>> than the cleanup threads can delete from disk.
> > >>>>>>>
> > >>>>>>> What do you think of that theory?
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Andrew
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan
<
> > >>> mridul@gmail.com
> > >>>>>
> > >>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>> shutdown hooks should not take 15 mins are you
mentioned !
> > >>>>>>>> On the other hand, how busy was your disk when
this was
> > >> happening ?
> > >>>>>>>> (either due to spark or something else ?)
> > >>>>>>>>
> > >>>>>>>> It might just be that there was a lot of stuff
to remove ?
> > >>>>>>>>
> > >>>>>>>> Regards,
> > >>>>>>>> Mridul
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <
> andrew@andrewash.com
> > >>>
> > >>>>>> wrote:
> > >>>>>>>>> Hi Spark devs,
> > >>>>>>>>>
> > >>>>>>>>> Occasionally when hitting Ctrl-C in the scala
spark shell on
> > >>> 0.9.0
> > >>>> one
> > >>>>>>>>> of
> > >>>>>>>>> my workers goes dead in the spark master UI.
 I'm using the
> > >>>> standalone
> > >>>>>>>>> cluster and didn't ever see this while using
0.8.0 so I think
> > >> it
> > >>>> may
> > >>>>>> be
> > >>>>>>>>> a
> > >>>>>>>>> regression.
> > >>>>>>>>>
> > >>>>>>>>> When I prod on the hung CoarseGrainedExecutorBackend
JVM with
> > >>>> jstack
> > >>>>>> and
> > >>>>>>>>> jmap -heap, it doesn't respond unless I add
the -F force flag.
> > >>> The
> > >>>>>> heap
> > >>>>>>>>> isn't full, but there are some interesting
bits in the jstack.
> > >>>> Poking
> > >>>>>>>>> around a little, I think there may be some
kind of deadlock in
> > >>> the
> > >>>>>>>>> shutdown
> > >>>>>>>>> hooks.
> > >>>>>>>>>
> > >>>>>>>>> Below are the threads I think are most interesting:
> > >>>>>>>>>
> > >>>>>>>>> Thread 14308: (state = BLOCKED)
> > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212
(Interpreted
> > >>>> frame)
> > >>>>>>>>> - java.lang.Runtime.exit(int) @bci=14, line=109
(Interpreted
> > >>>> frame)
> > >>>>>>>>> - java.lang.System.exit(int) @bci=4, line=962
(Interpreted
> > >>> frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
> > >>>>>>>>> scala.Function1) @bci=352, line=81 (Interpreted
frame)
> > >>>>>>>>> - akka.actor.ActorCell.receiveMessage(java.lang.Object)
> > >> @bci=25,
> > >>>>>>>>> line=498
> > >>>>>>>>> (Interpreted frame)
> > >>>>>>>>> - akka.actor.ActorCell.invoke(akka.dispatch.Envelope)
@bci=39,
> > >>>>>> line=456
> > >>>>>>>>> (Interpreted frame)
> > >>>>>>>>> - akka.dispatch.Mailbox.processMailbox(int,
long) @bci=24,
> > >>>> line=237
> > >>>>>>>>> (Interpreted frame)
> > >>>>>>>>> - akka.dispatch.Mailbox.run() @bci=20, line=219
(Interpreted
> > >>>> frame)
> > >>>>>>>>> -
> > >>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
> > >>>>>>>>> @bci=4, line=386 (Interpreted frame)
> > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinTask.doExec()
@bci=10,
> > >>>> line=260
> > >>>>>>>>> (Compiled frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
> > >>>>>>>>> @bci=10, line=1339 (Compiled frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
> > >>>>>>>>> @bci=11, line=1979 (Compiled frame)
> > >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinWorkerThread.run()
> > >> @bci=14,
> > >>>>>>>>> line=107
> > >>>>>>>>> (Interpreted frame)
> > >>>>>>>>>
> > >>>>>>>>> Thread 3865: (state = BLOCKED)
> > >>>>>>>>> - java.lang.Object.wait(long) @bci=0 (Interpreted
frame)
> > >>>>>>>>> - java.lang.Thread.join(long) @bci=38, line=1280
(Interpreted
> > >>>> frame)
> > >>>>>>>>> - java.lang.Thread.join() @bci=2, line=1354
(Interpreted
> > >> frame)
> > >>>>>>>>> - java.lang.ApplicationShutdownHooks.runHooks()
@bci=87,
> > >>> line=106
> > >>>>>>>>> (Interpreted frame)
> > >>>>>>>>> - java.lang.ApplicationShutdownHooks$1.run()
@bci=0, line=46
> > >>>>>>>>> (Interpreted
> > >>>>>>>>> frame)
> > >>>>>>>>> - java.lang.Shutdown.runHooks() @bci=39, line=123
(Interpreted
> > >>>> frame)
> > >>>>>>>>> - java.lang.Shutdown.sequence() @bci=26, line=167
(Interpreted
> > >>>> frame)
> > >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212
(Interpreted
> > >>>> frame)
> > >>>>>>>>> - java.lang.Terminator$1.handle(sun.misc.Signal)
@bci=8,
> > >> line=52
> > >>>>>>>>> (Interpreted frame)
> > >>>>>>>>> - sun.misc.Signal$1.run() @bci=8, line=212
(Interpreted frame)
> > >>>>>>>>> - java.lang.Thread.run() @bci=11, line=744
(Interpreted frame)
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Thread 3987: (state = BLOCKED)
> > >>>>>>>>> - java.io.UnixFileSystem.list(java.io.File)
@bci=0
> > >> (Interpreted
> > >>>>>> frame)
> > >>>>>>>>> - java.io.File.list() @bci=29, line=1116 (Interpreted
frame)
> > >>>>>>>>> - java.io.File.listFiles() @bci=1, line=1201
(Compiled frame)
> > >>>>>>>>> - org.apache.spark.util.Utils$.listFilesSafely(java.io.File)
> > >>>> @bci=1,
> > >>>>>>>>> line=466 (Interpreted frame)
> > >>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > >>>>>> @bci=9,
> > >>>>>>>>> line=478 (Compiled frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
> > >>>>>>>>> @bci=4, line=479 (Compiled frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
> > >>>>>>>>> @bci=5, line=478 (Compiled frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled
frame)
> > >>>>>>>>> -
> > >> scala.collection.mutable.WrappedArray.foreach(scala.Function1)
> > >>>>>>>>> @bci=2,
> > >>>>>>>>> line=34 (Compiled frame)
> > >>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
> > >>>>>> @bci=19,
> > >>>>>>>>> line=478 (Interpreted frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
> > >>>>>>>>> @bci=14, line=141 (Interpreted frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
> > >>>>>>>>> @bci=5, line=139 (Interpreted frame)
> > >>>>>>>>> -
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> > >>>>>>>>> scala.Function1) @bci=22, line=33 (Compiled
frame)
> > >>>>>>>>> -
> > >>> scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> > >>>>>>>>> @bci=2,
> > >>>>>>>>> line=108 (Interpreted frame)
> > >>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1.run()
> > >>> @bci=39,
> > >>>>>>>>> line=139 (Interpreted frame)
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> I think what happened here is that thread 14308
received the
> > >> akka
> > >>>>>>>>> "shutdown" message and called System.exit().
 This started
> > >> thread
> > >>>>>> 3865,
> > >>>>>>>>> which is the JVM shutting itself down.  Part
of that process is
> > >>>>>> running
> > >>>>>>>>> the
> > >>>>>>>>> shutdown hooks, so it started thread 3987.
 That thread is the
> > >>>>>> shutdown
> > >>>>>>>>> hook from addShutdownHook() in DiskBlockManager.scala,
which
> > >>> looks
> > >>>>>> like
> > >>>>>>>>> this:
> > >>>>>>>>>
> > >>>>>>>>>  private def addShutdownHook() {
> > >>>>>>>>>    localDirs.foreach(localDir =>
> > >>>>>>>>> Utils.registerShutdownDeleteDir(localDir))
> > >>>>>>>>>    Runtime.getRuntime.addShutdownHook(new Thread("delete
Spark
> > >>>> local
> > >>>>>>>>> dirs") {
> > >>>>>>>>>      override def run() {
> > >>>>>>>>>        logDebug("Shutdown hook called")
> > >>>>>>>>>        localDirs.foreach { localDir =>
> > >>>>>>>>>          try {
> > >>>>>>>>>            if (!Utils.hasRootAsShutdownDeleteDir(localDir))
> > >>>>>>>>> Utils.deleteRecursively(localDir)
> > >>>>>>>>>          } catch {
> > >>>>>>>>>            case t: Throwable =>
> > >>>>>>>>>              logError("Exception while deleting
local spark
> > >> dir:
> > >>>> " +
> > >>>>>>>>> localDir, t)
> > >>>>>>>>>          }
> > >>>>>>>>>        }
> > >>>>>>>>>
> > >>>>>>>>>        if (shuffleSender != null) {
> > >>>>>>>>>          shuffleSender.stop()
> > >>>>>>>>>        }
> > >>>>>>>>>      }
> > >>>>>>>>>    })
> > >>>>>>>>>  }
> > >>>>>>>>>
> > >>>>>>>>> It goes through and deletes the directories
recursively.  I was
> > >>>>>> thinking
> > >>>>>>>>> there might be some issues with concurrently-running
shutdown
> > >>> hooks
> > >>>>>>>>> deleting things out from underneath each other
(shutdown hook
> > >>>> javadocs
> > >>>>>>>>> say
> > >>>>>>>>> they're all started in parallel if multiple
hooks are added)
> > >>>> causing
> > >>>>>> the
> > >>>>>>>>> File.list() in that last thread to take quite
some time.
> > >>>>>>>>>
> > >>>>>>>>> While I was looking through the stacktrace
the JVM finally
> > >> exited
> > >>>>>> (after
> > >>>>>>>>> 15-20min at least) so I won't be able to debug
more until this
> > >>> bug
> > >>>>>>>>> strikes
> > >>>>>>>>> again.
> > >>>>>>>>>
> > >>>>>>>>> Any ideas on what might be going on here?
> > >>>>>>>>>
> > >>>>>>>>> Thanks!
> > >>>>>>>>> Andrew
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message