spark-user mailing list archives

From "K. Shankari" <shank...@eecs.berkeley.edu>
Subject Re: Spark worker processes hang while processing small 2MB dataset
Date Tue, 03 Dec 2013 04:32:41 GMT
I compiled against 0.8.0-incubating. And I am running against
0.8.0-incubating.

Thanks,
Shankari


On Mon, Dec 2, 2013 at 8:18 PM, Patrick Wendell <pwendell@gmail.com> wrote:

> The traces just show that your cluster is trying to shut down and
> getting hung somewhere while shutting down, so unfortunately that
> doesn't tell us much. The key question is why everything wants to shut
> down in the first place.
>
> One thing I was wondering was whether you'd compiled against an older
> version of Spark, so there are communication issues on all of the
> slaves and they are getting into a weird state.
>
> What version did you compile your program against?
>
> On Mon, Dec 2, 2013 at 8:03 PM, K. Shankari <shankari@eecs.berkeley.edu>
> wrote:
> > After I kill -9 the worker processes and start them up again, I can
> > connect using the spark shell and access my HDFS data using it. I haven't
> > yet tried to shuffle from the spark-shell, though, which is the part that
> > is giving me problems in my program.
> >
> > I also did some more debugging:
> > The stacktrace where the worker is waiting for the UNIX process is this one:
> >
> > "Thread-9" daemon prio=10 tid=0x00007f596c063000 nid=0x3cb0 in Object.wait() [0x00007f599b35e000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >   at java.lang.Object.wait(Native Method)
> >   - waiting on <0x00000007e025e150> (a java.lang.UNIXProcess)
> >   at java.lang.Object.wait(Object.java:503)
> >   at java.lang.UNIXProcess.waitFor(UNIXProcess.java:210)
> >   - locked <0x00000007e025e150> (a java.lang.UNIXProcess)
> >   at org.apache.spark.deploy.worker.ExecutorRunner$$anon$2.run(ExecutorRunner.scala:69)
> >
> > which corresponds to this code:
> >
> >  66       override def run() {
> >  67         if (process != null) {
> >  68           logInfo("Shutdown hook killing child process.")
> >  69           process.destroy()
> >  70           process.waitFor()
> >  71         }
> >  72       }
> >
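> > As a minimal sketch of that hang (not the actual Spark code; it assumes a
> > POSIX shell is available), the same behaviour shows up with any child
> > process that ignores SIGTERM: Process.destroy() only delivers SIGTERM, and
> > waitFor() then blocks until the child actually exits, which matches the
> > Object.wait() on java.lang.UNIXProcess in the trace above.
> >
> > object DestroyHangDemo extends App {
> >   // Hypothetical child: a shell that traps (ignores) SIGTERM and sleeps.
> >   val process = new ProcessBuilder(
> >     "bash", "-c", """trap "" TERM; sleep 600""").start()
> >
> >   Thread.sleep(1000)  // give the shell time to install the trap
> >   process.destroy()   // sends SIGTERM, which the child ignores
> >   println("Waiting for the child to exit...")
> >   process.waitFor()   // blocks here, like ExecutorRunner's shutdown hook
> >   println("Reached only after the child exits or is killed with kill -9")
> > }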
> >
> > Looking at the associated log file, I see that the launched process was:
> >
> > 13/12/02 06:47:20 INFO worker.ExecutorRunner: Launch command: "java" "-cp"
> > ":/root/ephemeral-hdfs/conf:/root/ephemeral-hdfs/conf:/root/spark/conf:/root/spark/assembly/target/scala-2.9.3/spark-assembly_2.9.3-0.8.0-incubating-hadoop1.0.4.jar"
> > "-Djava.library.path=/root/ephemeral-hdfs/lib/native/"
> > "-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
> > "-Dspark.akka.timeout=90000" "-Dspark.storage.blockManagerHeartBeatMs=90000"
> > "-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
> > "-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
> > "-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
> > "-Dspark.akka.timeout=90000" "-Dspark.storage.blockManagerHeartBeatMs=90000"
> > "-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
> > "-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
> > "-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
> > "-Dspark.akka.timeout=90000" "-Dspark.storage.blockManagerHeartBeatMs=90000"
> > "-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
> > "-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
> > "-Xms6154M" "-Xmx6154M"
> > "org.apache.spark.executor.StandaloneExecutorBackend"
> > "akka://spark@<master-host-name>:38832/user/StandaloneScheduler" "0"
> > "<slave-host-name>" "2"
> >
> > And sure enough, the process is still running.
> >
> > logs]$ jps
> > 1510 DataNode
> > 16795 Jps
> > 12446 StandaloneExecutorBackend
> > 12382 Worker
> >
> > But I am not able to call jstack on it because it is not responding:
> > logs]$ jstack 12446
> > 12446: Unable to open socket file: target process not responding or
> > HotSpot VM not loaded
> > The -F option can be used when the target process is not responding
> >
> > Its logs don't have any errors, but seem to indicate issues while fetching
> > blocks using the block manager.
> >
> > On the master, I see the following:
> >
> > 13/12/02 06:47:33 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 0 to <slave-ip>:39992
> > 13/12/02 06:49:19 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, <slave-ip>, 46556, 0) with no recent heart beats: 51656ms exceeds 45000ms
> > 13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager <slave-ip>:46556 with 3.8 GB RAM
> > 13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_9_0 in memory on <slave-ip>:46556 (size: 166.4 KB, free: 3.8 GB)
> > 13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_9_1 in memory on <slave-ip>:46556 (size: 166.4 KB, free: 3.8 GB)
> > 13/12/02 06:51:19 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, <slave-ip>, 46556, 0) with no recent heart beats: 81974ms exceeds 45000ms
> >
> > And on the slave, I see:
> > 13/12/02 06:47:33 INFO spark.MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
> > 13/12/02 06:47:33 INFO spark.MapOutputTracker: Doing the fetch; tracker actor = Actor[akka://spark@master-ip.ec2.internal:38832/user/MapOutputTracker]
> > 13/12/02 06:47:33 INFO spark.MapOutputTracker: Got the output locations
> > 13/12/02 06:47:33 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-zero-bytes blocks out of 2 blocks
> > 13/12/02 06:47:33 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in  10 ms
> > 13/12/02 06:47:37 INFO network.ConnectionManager: Accepted connection from [other-slave.ec2.internal/10.116.46.231]
> > 13/12/02 06:47:44 INFO network.SendingConnection: Initiating connection to [other-slave.internal/10.116.46.231:32807]
> > 13/12/02 06:47:44 INFO network.SendingConnection: Connected to [other-slave.internal/10.116.46.231:32807], 1 messages pending
> > *** this is where we get the warning on the master, but it doesn't look
> > like we have started any block fetches by that time ***
> > 13/12/02 06:49:28 INFO storage.BlockManager: BlockManager reregistering with master
> > 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Trying to register BlockManager
> > 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Registered BlockManager
> > 13/12/02 06:49:28 INFO storage.BlockManager: Reporting 7 blocks to the master.
> > 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Updated info of block rdd_9_0
> > 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Updated info of block rdd_9_1
> > *** this is where we get the second warning on the master ***
> >
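> > As a rough sketch of the kind of expiry check behind those "no recent
> > heart beats" warnings (this is not the actual BlockManagerMasterActor
> > source; the names below are made up): the master records the last
> > heartbeat time per block manager and drops any whose silence exceeds the
> > configured timeout, which is why a stalled executor first gets removed and
> > then shows up again as a fresh registration.
> >
> > import scala.collection.mutable
> >
> > case class BlockManagerId(executorId: String, host: String, port: Int)
> >
> > class HeartbeatMonitor(timeoutMs: Long) {
> >   private val lastHeartbeat = mutable.Map[BlockManagerId, Long]()
> >
> >   // Called whenever a block manager heartbeats (or re-registers).
> >   def heartbeat(id: BlockManagerId): Unit =
> >     lastHeartbeat(id) = System.currentTimeMillis()
> >
> >   // Called periodically; returns the block managers that should be dropped.
> >   def expire(): Seq[BlockManagerId] = {
> >     val now = System.currentTimeMillis()
> >     val expired = lastHeartbeat.collect {
> >       case (id, last) if now - last > timeoutMs =>
> >         println(s"Removing BlockManager $id with no recent heart beats: " +
> >           s"${now - last}ms exceeds ${timeoutMs}ms")
> >         id
> >     }.toSeq
> >     expired.foreach(lastHeartbeat.remove)
> >     expired
> >   }
> > }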
> > Thanks,
> > Shankari
> >
> >
> > On Mon, Dec 2, 2013 at 7:41 PM, Patrick Wendell <pwendell@gmail.com> wrote:
> >>
> >> This is indeed a bit strange. One question - when you launch the ec2
> >> cluster can you run the spark shell? Can you access your HDFS data
> >> using the Spark shell?
> >>
> >> Also, what version of Spark is your application compiled against? Is
> >> it exactly the same version that is on the ec2 cluster?
> >>
> >> On Mon, Dec 2, 2013 at 4:24 PM, K. Shankari <shankari@eecs.berkeley.edu>
> >> wrote:
> >> > So this is pretty weird, and my debugging hasn't made much progress, so
> >> > I thought I'd ask for help.
> >> >
> >> > I have a medium-sized dataset, but I am developing my code against a
> >> > much smaller version since it is faster to work with. I have deployed a
> >> > cluster on EC2 using the default scripts and the AMI, and loaded my
> >> > simple dataset into the ephemeral HDFS.
> >> >
> >> > When I ssh into the master node and run my application using "local",
> >> > everything works. However, if I just change to using distributed Spark
> >> > (spark://...:7077), then everything starts up, but the shuffle
> >> > operations start failing with the error:
> >> >
> >> > 13/12/02 06:47:33 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 0 to ip-10-38-11-59.ec2.internal:39992
> >> > ...
> >> > 13/12/02 06:49:19 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-10-38-11-59.ec2.internal, 46556, 0) with no recent heart beats: 51656ms exceeds 45000ms
> >> >
> >> > I looked around the prior documentation and put the extra options into
> >> > SPARK_JAVA_OPTS, and increased them to 90000 from the 30000 in the
> >> > example, but I get the same error:
> >> >
> >> > -Dspark.worker.timeout=90000 -Dspark.akka.timeout=90000
> >> > -Dspark.storage.blockManagerHeartBeatMs=90000
> >> > -Dspark.akka.retry.wait=90000
> >> > -Dspark.akka.frameSize=30000 -Dsun.rmi.dgc.server.gcInterval=3600000
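> >> >
> >> > (For reference, a minimal sketch of setting the same properties
> >> > programmatically on the driver side; the master URL and app name below
> >> > are placeholders. In Spark 0.8 these are plain Java system properties,
> >> > so they must be set before the SparkContext is created; the executors
> >> > launched by the workers pick them up through SPARK_JAVA_OPTS, as the
> >> > executor launch command quoted above shows.)
> >> >
> >> > import org.apache.spark.SparkContext
> >> >
> >> > object TimeoutSettings {
> >> >   def main(args: Array[String]): Unit = {
> >> >     // Mirrors the SPARK_JAVA_OPTS list above; must run before the
> >> >     // SparkContext is constructed.
> >> >     System.setProperty("spark.worker.timeout", "90000")
> >> >     System.setProperty("spark.akka.timeout", "90000")
> >> >     System.setProperty("spark.storage.blockManagerHeartBeatMs", "90000")
> >> >     System.setProperty("spark.akka.retry.wait", "90000")
> >> >     System.setProperty("spark.akka.frameSize", "30000")
> >> >
> >> >     val sc = new SparkContext("spark://<master-host-name>:7077", "TimeoutDemo")
> >> >     // ... job code ...
> >> >     sc.stop()
> >> >   }
> >> > }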
> >> >
> >> > I don't think that this is a memory issue because the dataset fits on
> >> > the master alone and I am successfully able to run my program with
> >> > "local". I checked the stdout and stderr on the worker, and one of the
> >> > few times that there was a stacktrace, it was from here:
> >> >
> >> > 148   private def askDriverWithReply[T](message: Any): T = {
> >> > 149     // TODO: Consider removing multiple attempts
> >> > 150     if (driverActor == null) {
> >> > 151       throw new SparkException("Error sending message to BlockManager as driverActor is null " +
> >> > 152         "[message = " + message + "]")
> >> > 153     }
> >> > 154     var attempts = 0
> >> > 155     var lastException: Exception = null
> >> > 156     while (attempts < AKKA_RETRY_ATTEMPTS) {
> >> > 157       attempts += 1
> >> > 158       try {
> >> > 159         val future = driverActor.ask(message)(timeout)
> >> > 160         val result = Await.result(future, timeout)
> >> > 161         if (result == null) {
> >> > 162           throw new SparkException("BlockManagerMaster returned null")
> >> > 163         }
> >> > 164         return result.asInstanceOf[T]
> >> > 165       } catch {
> >> > 166         case ie: InterruptedException => throw ie
> >> > 167         case e: Exception =>
> >> > 168           lastException = e
> >> > 169           logWarning("Error sending message to BlockManagerMaster in " + attempts + "attempts", e)
> >> > 170       }
> >> > 171       Thread.sleep(AKKA_RETRY_INTERVAL_MS)
> >> > 172     }
> >> > 173
> >> > 174     throw new SparkException(
> >> > 175       "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
> >> > 176   }
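> >> >
> >> > (As a small illustration of the failure mode that feeds this retry loop,
> >> > written against a current Akka "classic" API rather than the Akka 2.0.x
> >> > bundled with Spark 0.8, and with made-up actor names: ask() returns a
> >> > Future, and if the driver-side actor never replies within the timeout,
> >> > Await.result throws, which is what gets counted as a failed attempt.)
> >> >
> >> > import akka.actor.{Actor, ActorSystem, Props}
> >> > import akka.pattern.ask
> >> > import akka.util.Timeout
> >> > import scala.concurrent.Await
> >> > import scala.concurrent.duration._
> >> > import scala.util.{Failure, Success, Try}
> >> >
> >> > // Stand-in for a driver-side actor that has stopped responding.
> >> > class SilentActor extends Actor {
> >> >   def receive = { case _ => () } // never replies
> >> > }
> >> >
> >> > object AskTimeoutDemo extends App {
> >> >   val system = ActorSystem("demo")
> >> >   val driverActor = system.actorOf(Props[SilentActor](), "driver")
> >> >   implicit val timeout: Timeout = Timeout(2.seconds)
> >> >
> >> >   Try(Await.result(driverActor ? "heartbeat", timeout.duration)) match {
> >> >     case Success(reply) => println("Got reply: " + reply)
> >> >     case Failure(e)     => println("Attempt failed: " + e.getClass.getSimpleName)
> >> >   }
> >> >   system.terminate()
> >> > }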
> >> >
> >> > Most of the time, though, the worker just hangs. I can't run anything
> >> > else against that master because there are no resources available. When
> >> > I try to stop the workers using stop-slaves.sh, they don't stop. The
> >> > only way to recover the cluster is to use "kill -9 <pid>" on the worker
> >> > processes, which does work.
> >> >
> >> > So I took a quick look at one of the hung worker processes using jstack.
> >> > The output is attached. As you can see, I have tried to shut down the
> >> > process multiple times, and the SIGTERM handlers are stuck waiting. I
> >> > think that this may be the reason that the workers are not responsive
> >> > for block handling as well.
> >> >
> >> > If I read this correctly, the handler is waiting on 0x00000007e025d468,
> >> > which is locked by spark.deploy.worker.ExecutorRunner$$anon$2. And that
> >> > is waiting for some UNIXProcess?
> >> >
> >> > Thanks,
> >> > Shankari
> >
> >
>
