spark-user mailing list archives

From "K. Shankari" <shank...@eecs.berkeley.edu>
Subject Re: Spark worker processes hang while processing small 2MB dataset
Date Tue, 03 Dec 2013 04:03:09 GMT
After I kill -9 the worker processes and start them up again, I can connect
using the spark-shell and access my HDFS data through it. I haven't yet tried
to run a shuffle from the spark-shell, though, which is the part that is
giving me problems in my program.
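
(For reference, a minimal shuffle smoke test that could be run from the
spark-shell might look like the sketch below; the HDFS path and the
comma-separated key are placeholders, not my real dataset.)

// Hypothetical shuffle test for the spark-shell; sc is predefined there.
val lines = sc.textFile("hdfs:///path/to/small-dataset")
val pairs = lines.map(line => (line.split(",")(0), 1))
val counts = pairs.reduceByKey(_ + _)   // reduceByKey forces a shuffle over the pairs
counts.count()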

I also did some more debugging:
The stack trace where the worker is waiting on the Unix process is this one:
"Thread-9" daemon prio=10 tid=0x00007f596c063000 nid=0x3cb0 in
Object.wait() [0x00007f599b35e000]
   java.lang.Thread.State: WAITING (on object monitor)
  at java.lang.Object.wait(Native Method)
  - waiting on <0x00000007e025e150> (a java.lang.UNIXProcess)
  at java.lang.Object.wait(Object.java:503)
  at java.lang.UNIXProcess.waitFor(UNIXProcess.java:210)
  - locked <0x00000007e025e150> (a java.lang.UNIXProcess)
  at
org.apache.spark.deploy.worker.ExecutorRunner$$anon$2.run(ExecutorRunner.scala:69)

which corresponds to this code:

 66       override def run() {
 67         if (process != null) {
 68           logInfo("Shutdown hook killing child process.")
 69           process.destroy()
 70           process.waitFor()
 71         }
 72       }
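
(As a side note, this hook follows the standard destroy()/waitFor() pattern
shown in the illustrative sketch below - not Spark code - where destroy() only
sends SIGTERM, so if the child process never exits, waitFor() blocks the hook
forever. That matches the jstack output above.)

// Illustrative sketch only, not Spark code: the destroy()/waitFor() pattern
// used by the shutdown hook. "sleep" stands in for an arbitrary child process.
val process = new ProcessBuilder("sleep", "600").start()
Runtime.getRuntime.addShutdownHook(new Thread {
  override def run() {
    process.destroy()   // sends SIGTERM to the child
    process.waitFor()   // blocks indefinitely if the child never exits
  }
})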


Looking at the associated log file, I see that the launch command for the executor was:

13/12/02 06:47:20 INFO worker.ExecutorRunner: Launch command: "java" "-cp"
":/root/ephemeral-hdfs/conf:/root/ephemeral-hdfs/conf:/root/spark/conf:/root/spark/assembly/target/scala-2.9.3/spark-assembly_2.9.3-0.8.0-incubating-hadoop1.0.4.jar"
"-Djava.library.path=/root/ephemeral-hdfs/lib/native/"
"-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
"-Dspark.akka.timeout=90000"
"-Dspark.storage.blockManagerHeartBeatMs=90000"
"-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
"-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
"-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
"-Dspark.akka.timeout=90000"
"-Dspark.storage.blockManagerHeartBeatMs=90000"
"-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
"-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
"-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
"-Dspark.akka.timeout=90000"
"-Dspark.storage.blockManagerHeartBeatMs=90000"
"-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
"-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
"-Xms6154M" "-Xmx6154M"
"org.apache.spark.executor.StandaloneExecutorBackend"
"akka://spark@<master-host-name>:38832/user/StandaloneScheduler"
"0" "<slave-host-name>" "2"

And sure enough, the process is still running.

logs]$ jps
1510 DataNode
16795 Jps
12446 StandaloneExecutorBackend
12382 Worker

But I am not able to call jstack on it because it is not responding:
logs]$ jstack 12446
12446: Unable to open socket file: target process not responding or HotSpot
VM not loaded
The -F option can be used when the target process is not responding

Its logs don't show any errors, but they do seem to indicate problems
fetching blocks through the block manager.

On the master, I see the following:

13/12/02 06:47:33 INFO spark.MapOutputTrackerActor: Asked to send map
output locations for shuffle 0 to <slave-ip>:39992
13/12/02 06:49:19 WARN storage.BlockManagerMasterActor: Removing
BlockManager BlockManagerId(0, <slave-ip>, 46556, 0) with no recent heart
beats: 51656ms exceeds 45000ms
13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
Registering block manager <slave-ip>:46556 with 3.8 GB RAM
13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
Added rdd_9_0 in memory on <slave-ip>:46556 (size: 166.4 KB, free: 3.8 GB)
13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
Added rdd_9_1 in memory on <slave-ip>:46556 (size: 166.4 KB, free: 3.8 GB)
13/12/02 06:51:19 WARN storage.BlockManagerMasterActor: Removing
BlockManager BlockManagerId(0, <slave-ip>, 46556, 0) with no recent heart
beats: 81974ms exceeds 45000ms

And on the slave, I see:
13/12/02 06:47:33 INFO spark.MapOutputTracker: Don't have map outputs for
shuffle 0, fetching them
13/12/02 06:47:33 INFO spark.MapOutputTracker: Doing the fetch; tracker
actor = Actor[akka://spark@master-ip.ec2.internal
:38832/user/MapOutputTracker]
13/12/02 06:47:33 INFO spark.MapOutputTracker: Got the output locations
13/12/02 06:47:33 INFO
storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2
non-zero-bytes blocks out of 2 blocks
13/12/02 06:47:33 INFO
storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote
gets in  10 ms
13/12/02 06:47:37 INFO network.ConnectionManager: Accepted connection from
[other-slave.ec2.internal/10.116.46.231]
13/12/02 06:47:44 INFO network.SendingConnection: Initiating connection to
[other-slave.internal/10.116.46.231:32807]
13/12/02 06:47:44 INFO network.SendingConnection: Connected to
[other-slave.internal/10.116.46.231:32807], 1 messages pending
*** this is where we get the first warning on the master, but it doesn't look
like we had started any block fetches by that time ***
13/12/02 06:49:28 INFO storage.BlockManager: BlockManager reregistering
with master
13/12/02 06:49:28 INFO storage.BlockManagerMaster: Trying to register
BlockManager
13/12/02 06:49:28 INFO storage.BlockManagerMaster: Registered BlockManager
13/12/02 06:49:28 INFO storage.BlockManager: Reporting 7 blocks to the
master.
13/12/02 06:49:28 INFO storage.BlockManagerMaster: Updated info of block
rdd_9_0
13/12/02 06:49:28 INFO storage.BlockManagerMaster: Updated info of block
rdd_9_1
*** this is where we get the second warning on the master ***
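
(For completeness: the timeout properties in the launch command above come from
SPARK_JAVA_OPTS in spark-env.sh. A roughly equivalent way to set them from
application code - assuming Spark 0.8's System-property based configuration, and
untested on my side - would be a sketch like this:)

// Hypothetical alternative to SPARK_JAVA_OPTS: set the properties before the
// SparkContext is created; Spark 0.8 reads its configuration from System properties.
System.setProperty("spark.worker.timeout", "90000")
System.setProperty("spark.akka.timeout", "90000")
System.setProperty("spark.storage.blockManagerHeartBeatMs", "90000")
System.setProperty("spark.akka.retry.wait", "90000")
System.setProperty("spark.akka.frameSize", "30000")
val sc = new org.apache.spark.SparkContext("spark://<master-host-name>:7077", "MyApp")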

Thanks,
Shankari


On Mon, Dec 2, 2013 at 7:41 PM, Patrick Wendell <pwendell@gmail.com> wrote:

> This is indeed a bit strange. One question - when you launch the EC2
> cluster, can you run the spark shell? Can you access your HDFS data
> using the Spark shell?
>
> Also, what version of Spark is your application compiled against? Is
> it exactly the same version that is on the EC2 cluster?
>
> On Mon, Dec 2, 2013 at 4:24 PM, K. Shankari <shankari@eecs.berkeley.edu>
> wrote:
> > So this is pretty weird, and my debugging hasn't made much progress, so I
> > thought I'd ask for help.
> >
> > I have a medium-sized dataset, but I am developing my code against a much
> > smaller version since it is faster to work with. I have deployed a cluster
> > on EC2 using the default scripts and the AMI, and loaded my simple dataset
> > into the ephemeral HDFS.
> >
> > When I ssh into the master node and run my application using "local",
> > everything works. However, if I just change to using distributed Spark
> > (spark://...:7077), then everything starts up, but the shuffle operations
> > start failing with the error:
> > 13/12/02 06:47:33 INFO spark.MapOutputTrackerActor: Asked to send map output
> > locations for shuffle 0 to ip-10-38-11-59.ec2.internal:39992
> > ...
> > 13/12/02 06:49:19 WARN storage.BlockManagerMasterActor: Removing
> > BlockManager BlockManagerId(0, ip-10-38-11-59.ec2.internal, 46556, 0) with
> > no recent heart beats: 51656ms exceeds 45000ms
> >
> > I looked through the prior documentation and put the extra options into
> > SPARK_JAVA_OPTS, increasing them to 90000 from the 30000 in the example,
> > but I get the same error:
> > -Dspark.worker.timeout=90000 -Dspark.akka.timeout=90000
> > -Dspark.storage.blockManagerHeartBeatMs=90000 -Dspark.akka.retry.wait=90000
> > -Dspark.akka.frameSize=30000 -Dsun.rmi.dgc.server.gcInterval=3600000
> >
> > I don't think that this is a memory issue, because the dataset fits on the
> > master alone and I am successfully able to run my program with "local". I
> > checked the stdout and stderr on the worker, and one of the few times that
> > there was a stack trace, it was from here:
> >
> > private def askDriverWithReply[T](message: Any): T = {
> >   // TODO: Consider removing multiple attempts
> >   if (driverActor == null) {
> >     throw new SparkException("Error sending message to BlockManager as driverActor is null " +
> >       "[message = " + message + "]")
> >   }
> >   var attempts = 0
> >   var lastException: Exception = null
> >   while (attempts < AKKA_RETRY_ATTEMPTS) {
> >     attempts += 1
> >     try {
> >       val future = driverActor.ask(message)(timeout)
> >       val result = Await.result(future, timeout)
> >       if (result == null) {
> >         throw new SparkException("BlockManagerMaster returned null")
> >       }
> >       return result.asInstanceOf[T]
> >     } catch {
> >       case ie: InterruptedException => throw ie
> >       case e: Exception =>
> >         lastException = e
> >         logWarning("Error sending message to BlockManagerMaster in " +
> >           attempts + " attempts", e)
> >     }
> >     Thread.sleep(AKKA_RETRY_INTERVAL_MS)
> >   }
> >
> >   throw new SparkException(
> >     "Error sending message to BlockManagerMaster [message = " + message + "]",
> >     lastException)
> > }
> >
> > Most of the time, though, the worker just hangs. I can't run anything else
> > against that master because there are no resources available. When I try to
> > stop the workers using stop-slaves.sh, they don't stop. The only way to
> > recover the cluster is to use "kill -9 <pid>" on the worker processes, which
> > does work.
> >
> > So I took a quick look at one of the hung worker processes using jstack. The
> > output is attached. As you can see, I have tried to shut down the process
> > multiple times, and the SIGTERM handlers are stuck waiting. I think that
> > this may be the reason that the workers are not responsive for block
> > handling as well.
> >
> > If I read this correctly, the handler is waiting on 0x00000007e025d468, which
> > is locked by spark.deploy.worker.ExecutorRunner$$anon$2. And that is waiting
> > for some UNIXProcess?
> >
> > Thanks,
> > Shankari
>
