spark-user mailing list archives

From Patrick Wendell <pwend...@gmail.com>
Subject Re: Spark worker processes hang while processing small 2MB dataset
Date Tue, 03 Dec 2013 04:18:55 GMT
The traces just show that your cluster is trying to shut down and
getting stuck somewhere along the way, so unfortunately they don't
tell us much. The key question is why everything wants to shut down
in the first place.

One thing I was wondering is whether you compiled against an older
version of Spark, so that there are communication issues on all of the
slaves and they are getting into a weird state.

What version did you compile your program against?
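
For reference, the launch command below shows the cluster's assembly is
spark-assembly_2.9.3-0.8.0-incubating-hadoop1.0.4.jar, so your build should
line up with that. A minimal sketch of the dependency, assuming you build
with sbt against the published Apache artifacts (adjust if you use Maven or
a locally built Spark):

  // build.sbt -- sketch only; coordinates assume the stock
  // 0.8.0-incubating release that the ec2 scripts deploy
  scalaVersion := "2.9.3"

  libraryDependencies += "org.apache.spark" % "spark-core_2.9.3" % "0.8.0-incubating"

  // the Hadoop client should match the cluster's Hadoop (1.0.4 here)
  libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.0.4"

If the application jar was built against 0.7.x, or against a different
Hadoop version, the executors can fail to talk to the driver in confusing
ways like this.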

On Mon, Dec 2, 2013 at 8:03 PM, K. Shankari <shankari@eecs.berkeley.edu> wrote:
> After I kill -9 the worker processes and start them up again, I can connect
> with the spark-shell and access my HDFS data from it. I haven't yet tried a
> shuffle from the spark-shell, though, which is the part that is giving me
> problems in my program.
>
> I also did some more debugging:
> The stacktrace where the worker is waiting for the unix process is this one:
> "Thread-9" daemon prio=10 tid=0x00007f596c063000 nid=0x3cb0 in Object.wait()
> [0x00007f599b35e000]
>    java.lang.Thread.State: WAITING (on object monitor)
>   at java.lang.Object.wait(Native Method)
>   - waiting on <0x00000007e025e150> (a java.lang.UNIXProcess)
>   at java.lang.Object.wait(Object.java:503)
>   at java.lang.UNIXProcess.waitFor(UNIXProcess.java:210)
>   - locked <0x00000007e025e150> (a java.lang.UNIXProcess)
>   at
> org.apache.spark.deploy.worker.ExecutorRunner$$anon$2.run(ExecutorRunner.scala:69)
>
> which corresponds to this code:
>
>  66       override def run() {
>  67         if (process != null) {
>  68           logInfo("Shutdown hook killing child process.")
>  69           process.destroy()
>  70           process.waitFor()
>  71         }
>  72       }
>
>
> Looking at the associated log file, I see that the launched process was:
>
> 13/12/02 06:47:20 INFO worker.ExecutorRunner: Launch command: "java" "-cp"
> ":/root/ephemeral-hdfs/conf:/root/ephemeral-hdfs/conf:/root/spark/conf:/root/spark/assembly/target/scala-2.9.3/spark-assembly_2.9.3-0.8.0-incubating-hadoop1.0.4.jar"
> "-Djava.library.path=/root/ephemeral-hdfs/lib/native/"
> "-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
> "-Dspark.akka.timeout=90000" "-Dspark.storage.blockManagerHeartBeatMs=90000"
> "-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
> "-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
> "-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
> "-Dspark.akka.timeout=90000" "-Dspark.storage.blockManagerHeartBeatMs=90000"
> "-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
> "-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
> "-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
> "-Dspark.akka.timeout=90000" "-Dspark.storage.blockManagerHeartBeatMs=90000"
> "-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
> "-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
> "-Xms6154M" "-Xmx6154M"
> "org.apache.spark.executor.StandaloneExecutorBackend"
> "akka://spark@<master-host-name>:38832/user/StandaloneScheduler" "0"
> "<slave-host-name>" "2"
>
> And sure enough, the process is still running.
>
> logs]$ jps
> 1510 DataNode
> 16795 Jps
> 12446 StandaloneExecutorBackend
> 12382 Worker
>
> But I am not able to call jstack on it because it is not responding:
> logs]$ jstack 12446
> 12446: Unable to open socket file: target process not responding or HotSpot
> VM not loaded
> The -F option can be used when the target process is not responding
>
> Its logs don't have any errors, but seem to indicate issues while fetching
> blocks using the block manager.
>
> On the master, I see the following:
>
> 13/12/02 06:47:33 INFO spark.MapOutputTrackerActor: Asked to send map output
> locations for shuffle 0 to <slave-ip>:39992
> 13/12/02 06:49:19 WARN storage.BlockManagerMasterActor: Removing
> BlockManager BlockManagerId(0, <slave-ip>, 46556, 0) with no recent heart
> beats: 51656ms exceeds 45000ms
> 13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
> Registering block manager <slave-ip>:46556 with 3.8 GB RAM
> 13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
> Added rdd_9_0 in memory on <slave-ip>:46556 (size: 166.4 KB, free: 3.8 GB)
> 13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
> Added rdd_9_1 in memory on <slave-ip>:46556 (size: 166.4 KB, free: 3.8 GB)
> 13/12/02 06:51:19 WARN storage.BlockManagerMasterActor: Removing
> BlockManager BlockManagerId(0, <slave-ip>, 46556, 0) with no recent heart
> beats: 81974ms exceeds 45000ms
>
> And on the slave, I see:
> 13/12/02 06:47:33 INFO spark.MapOutputTracker: Don't have map outputs for
> shuffle 0, fetching them
> 13/12/02 06:47:33 INFO spark.MapOutputTracker: Doing the fetch; tracker
> actor =
> Actor[akka://spark@master-ip.ec2.internal:38832/user/MapOutputTracker]
> 13/12/02 06:47:33 INFO spark.MapOutputTracker: Got the output locations
> 13/12/02 06:47:33 INFO
> storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2
> non-zero-bytes blocks out of 2 blocks
> 13/12/02 06:47:33 INFO
> storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote
> gets in  10 ms
> 13/12/02 06:47:37 INFO network.ConnectionManager: Accepted connection from
> [other-slave.ec2.internal/10.116.46.231]
> 13/12/02 06:47:44 INFO network.SendingConnection: Initiating connection to
> [other-slave.internal/10.116.46.231:32807]
> 13/12/02 06:47:44 INFO network.SendingConnection: Connected to
> [other-slave.internal/10.116.46.231:32807], 1 messages pending
> *** this is where we get the warning on the master, but it doesn't look like
> we have started any block fetches by that time ***
> 13/12/02 06:49:28 INFO storage.BlockManager: BlockManager reregistering with
> master
> 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Trying to register
> BlockManager
> 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Registered BlockManager
> 13/12/02 06:49:28 INFO storage.BlockManager: Reporting 7 blocks to the
> master.
> 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Updated info of block
> rdd_9_0
> 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Updated info of block
> rdd_9_1
> *** this is where we get the second warning on the master ***
>
> Thanks,
> Shankari
>
>
> On Mon, Dec 2, 2013 at 7:41 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>>
>> This is indeed a bit strange. One question - when you launch the ec2
>> cluster can you run the spark shell? Can you access your HDFS data
>> using the Spark shell?
>>
>> Also, what version of Spark is your application compiled against? Is
>> it exactly the same version that is on the ec2 cluster?
>>
>> On Mon, Dec 2, 2013 at 4:24 PM, K. Shankari <shankari@eecs.berkeley.edu>
>> wrote:
>> > So this is pretty weird, and my debugging hasn't made much progress, so
>> > I
>> > thought I'd ask for help.
>> >
>> > I have a medium-sized dataset, but I am developing my code against a much
>> > smaller version since it is faster to work with. I have deployed a cluster
>> > on EC2 using the default scripts and the AMI, and loaded my simple dataset
>> > into the ephemeral HDFS.
>> >
>> > When I ssh into the master node and run my application using "local",
>> > everything works. However, if I just change to the distributed spark
>> > master (spark://...:7077), then everything starts up but the shuffle
>> > operations start failing with this error:
>> > 13/12/02 06:47:33 INFO spark.MapOutputTrackerActor: Asked to send map
>> > output locations for shuffle 0 to ip-10-38-11-59.ec2.internal:39992
>> > ...
>> > 13/12/02 06:49:19 WARN storage.BlockManagerMasterActor: Removing
>> > BlockManager BlockManagerId(0, ip-10-38-11-59.ec2.internal, 46556, 0)
>> > with
>> > no recent heart beats: 51656ms exceeds 45000ms
>> >
>> > I looked through the existing documentation, put the extra options into
>> > SPARK_JAVA_OPTS, and increased the timeouts to 90000 from the 30000 in
>> > the example, but I get the same error:
>> > -Dspark.worker.timeout=90000 -Dspark.akka.timeout=90000
>> > -Dspark.storage.blockManagerHeartBeatMs=90000
>> > -Dspark.akka.retry.wait=90000
>> > -Dspark.akka.frameSize=30000 -Dsun.rmi.dgc.server.gcInterval=3600000
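>> >
>> > For completeness, I believe the same properties can also be set from the
>> > driver program before the SparkContext is created (I set them through
>> > SPARK_JAVA_OPTS instead). A rough sketch, assuming 0.8's system-property
>> > based configuration ("MyApp" and the master URL are just placeholders):
>> >
>> >   import org.apache.spark.SparkContext
>> >
>> >   System.setProperty("spark.worker.timeout", "90000")
>> >   System.setProperty("spark.akka.timeout", "90000")
>> >   System.setProperty("spark.storage.blockManagerHeartBeatMs", "90000")
>> >   System.setProperty("spark.akka.retry.wait", "90000")
>> >   System.setProperty("spark.akka.frameSize", "30000")
>> >   // properties must be set before the SparkContext is constructed
>> >   val sc = new SparkContext("spark://<master-host-name>:7077", "MyApp")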
>> >
>> > I don't think that this is a memory issue because the dataset fits on
>> > the
>> > master alone and I am successfully able to run my program with "local".
>> > I
>> > checked the stdout and stderr on the worker, and one of the few times
>> > that
>> > there was a stacktrace, it was from here:
>> >
>> > private def askDriverWithReply[T](message: Any): T = {
>> >   // TODO: Consider removing multiple attempts
>> >   if (driverActor == null) {
>> >     throw new SparkException("Error sending message to BlockManager as driverActor is null " +
>> >       "[message = " + message + "]")
>> >   }
>> >   var attempts = 0
>> >   var lastException: Exception = null
>> >   while (attempts < AKKA_RETRY_ATTEMPTS) {
>> >     attempts += 1
>> >     try {
>> >       val future = driverActor.ask(message)(timeout)
>> >       val result = Await.result(future, timeout)
>> >       if (result == null) {
>> >         throw new SparkException("BlockManagerMaster returned null")
>> >       }
>> >       return result.asInstanceOf[T]
>> >     } catch {
>> >       case ie: InterruptedException => throw ie
>> >       case e: Exception =>
>> >         lastException = e
>> >         logWarning("Error sending message to BlockManagerMaster in " +
>> >           attempts + " attempts", e)
>> >     }
>> >     Thread.sleep(AKKA_RETRY_INTERVAL_MS)
>> >   }
>> >
>> >   throw new SparkException(
>> >     "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
>> > }
>> >
>> > Most of the time, though, the worker just hangs. I can't run anything
>> > else
>> > against that master because there are no resources available. When I try
>> > to
>> > stop the workers using stop-slaves.sh, they don't stop. The only way to
>> > recover the cluster is to use "kill -9 <pid>" on the worker processes,
>> > which
>> > does work.
>> >
>> > So I took a quick look at one of the hung worker processes using jstack.
>> > The
>> > output is attached. As you can see, I have tried to shut down the
>> > process
>> > multiple times, and the SIGTERM handlers are stuck waiting. I think this
>> > may be why the workers are not responding to block requests either.
>> >
>> > If I read this correctly, the handler is waiting on 0x00000007e025d468,
>> > which is locked by spark.deploy.worker.ExecutorRunner$$anon$2. And that,
>> > in turn, is waiting on a java.lang.UNIXProcess?
>> >
>> > Thanks,
>> > Shankari
>
>
