spark-user mailing list archives

From "K. Shankari" <shank...@eecs.berkeley.edu>
Subject Spark worker processes hang while processing small 2MB dataset
Date Tue, 03 Dec 2013 00:24:23 GMT
So this is pretty weird, and my debugging hasn't made much progress, so I
thought I'd ask for help.

I have a medium-sized dataset, but I am developing my code against a much
smaller version since it is faster to work with. I have deployed a cluster on
EC2 using the default scripts and AMI, and loaded my simple dataset into the
ephemeral HDFS.

When I ssh into the master node and run my application with "local" as the
master, everything works. However, if I just switch to the distributed master
(spark://...:7077), everything starts up, but the shuffle operations start
failing with this error:
13/12/02 06:47:33 INFO spark.MapOutputTrackerActor: Asked to send map output
locations for shuffle 0 to ip-10-38-11-59.ec2.internal:39992
...
13/12/02 06:49:19 WARN storage.BlockManagerMasterActor: Removing
BlockManager BlockManagerId(0, ip-10-38-11-59.ec2.internal, 46556, 0) with
no recent heart beats: 51656ms exceeds 45000ms
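
For reference, here is roughly how I am switching between the two modes;
nothing else in the program changes (the app name and host below are
placeholders, not my real values):

import org.apache.spark.SparkContext

// Works: everything runs locally on the master node.
val sc = new SparkContext("local[2]", "MyApp")

// Hangs during the shuffle: same code, only the master URL changes.
// val sc = new SparkContext("spark://<master-hostname>:7077", "MyApp")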

I looked through the prior documentation, put the extra options below into
SPARK_JAVA_OPTS, and increased the timeouts to 90000 from the 30000 used in
the example, but I get the same error.
-Dspark.worker.timeout=90000
-Dspark.akka.timeout=90000
-Dspark.storage.blockManagerHeartBeatMs=90000
-Dspark.akka.retry.wait=90000
-Dspark.akka.frameSize=30000
-Dsun.rmi.dgc.server.gcInterval=3600000
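
In case it matters, I believe these could equivalently be set as system
properties before the SparkContext is created; this sketch shows what I am
effectively setting (in my actual setup they live in SPARK_JAVA_OPTS in
conf/spark-env.sh on every node):

import org.apache.spark.SparkContext

// Equivalent to the SPARK_JAVA_OPTS above; these have to be set before the
// SparkContext is constructed for the values to take effect.
System.setProperty("spark.worker.timeout", "90000")
System.setProperty("spark.akka.timeout", "90000")
System.setProperty("spark.storage.blockManagerHeartBeatMs", "90000")
System.setProperty("spark.akka.retry.wait", "90000")
System.setProperty("spark.akka.frameSize", "30000")
System.setProperty("sun.rmi.dgc.server.gcInterval", "3600000")

val sc = new SparkContext("spark://<master-hostname>:7077", "MyApp")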

I don't think this is a memory issue, because the dataset fits on the master
alone and I am able to run my program successfully with "local". I checked
stdout and stderr on the worker, and one of the few times there was a stack
trace, it came from here:

private def askDriverWithReply[T](message: Any): T = {
  // TODO: Consider removing multiple attempts
  if (driverActor == null) {
    throw new SparkException("Error sending message to BlockManager as driverActor is null " +
      "[message = " + message + "]")
  }
  var attempts = 0
  var lastException: Exception = null
  while (attempts < AKKA_RETRY_ATTEMPTS) {
    attempts += 1
    try {
      val future = driverActor.ask(message)(timeout)
      val result = Await.result(future, timeout)
      if (result == null) {
        throw new SparkException("BlockManagerMaster returned null")
      }
      return result.asInstanceOf[T]
    } catch {
      case ie: InterruptedException => throw ie
      case e: Exception =>
        lastException = e
        logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
    }
    Thread.sleep(AKKA_RETRY_INTERVAL_MS)
  }

  throw new SparkException(
    "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
}

Most of the time, though, the worker just hangs. I can't run anything else
against that master because there are no resources available. When I try to
stop the workers using stop-slaves.sh, they don't stop. The only way to
recover the cluster is to use "kill -9 <pid>" on the worker processes,
which does work.

So I took a quick look at one of the hung worker processes using jstack.
The output is attached. As you can see, I have tried to shut down the
process multiple times, and the SIGTERM handlers are stuck waiting. I think
that this may be the reason that the workers are not responsive for block
handling as well.

If I read this correctly, the handler is waiting on 0x00000007e025d468, which
is held by a thread in spark.deploy.worker.ExecutorRunner$$anon$2, and that
thread in turn is waiting on a UNIXProcess?
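
If it helps to see what I mean, here is a tiny standalone sketch (not Spark's
actual code, just the pattern I think jstack is showing): a shutdown hook that
needs a lock held by a thread blocked in Process.waitFor(), so a plain SIGTERM
can never finish and only kill -9 works.

// Standalone sketch of the suspected hang -- NOT Spark's code.
object ShutdownHangSketch {
  private val lock = new Object

  def main(args: Array[String]): Unit = {
    val child = new ProcessBuilder("sleep", "3600").start()

    // Analogue of the ExecutorRunner thread: blocks in waitFor()
    // while holding the lock.
    new Thread {
      override def run(): Unit = lock.synchronized {
        child.waitFor()
      }
    }.start()

    // Analogue of the SIGTERM handler: stuck waiting for the same lock
    // before it can kill the child, so shutdown never completes.
    Runtime.getRuntime.addShutdownHook(new Thread {
      override def run(): Unit = lock.synchronized {
        child.destroy()
      }
    })
  }
}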

Thanks,
Shankari
