spark-user mailing list archives

From "K. Shankari" <shank...@eecs.berkeley.edu>
Subject Re: Spark worker processes hang while processing small 2MB dataset
Date Tue, 03 Dec 2013 05:12:39 GMT
So if I run my code directly from the spark-shell, it works as well.
Luckily, I have a fairly small main function.
I wonder if there is something funky going on with my spark context - that
seems to be the main difference in launching the program.
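
To make the comparison concrete, my driver builds its own context along these
lines (this is only a sketch with placeholder names and paths, not my actual
code), whereas the shell hands me a ready-made sc:

import org.apache.spark.SparkContext

// Sketch only: the master URL, app name, Spark home and jar path are placeholders.
// The 0.8.0 constructor takes a jar list so the executors can load my classes;
// in local mode the first argument would just be "local".
object DriverSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(
      "spark://<master-host-name>:7077",
      "DriverSketch",
      "/root/spark",
      Seq("target/scala-2.9.3/myapp.jar"))
    // ... actual job ...
    sc.stop()
  }
}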
Anyway, I am unblocked now, so I will go off and experiment with various
options and see whether it is an error with launching my script, or whether
I can report something more reproducible.

Thanks,
Shankari


On Mon, Dec 2, 2013 at 8:32 PM, K. Shankari <shankari@eecs.berkeley.edu> wrote:

> I compiled against 0.8.0-incubating. And I am running against
> 0.8.0-incubating.
>
> Thanks,
> Shankari
>
>
> On Mon, Dec 2, 2013 at 8:18 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>
>> The traces just show that your cluster is trying to shut down and
>> getting hung somewhere while shutting down, so unfortunately that
>> doesn't tell us much. The key question is why everything wants to shut
>> down in the first place.
>>
>> One thing I was wondering is whether you'd compiled against an older
>> version of Spark, so there are communication issues on all of the
>> slaves and they are getting into a weird state.
>>
>> What version did you compile your program against?
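>>
>> If it helps, for an sbt build I'd expect the dependency to be pinned to the
>> cluster's version with something like this (just a sketch, assuming sbt and
>> the Scala 2.9.3 build that the assembly on the cluster was built with):
>>
>>   scalaVersion := "2.9.3"
>>
>>   libraryDependencies += "org.apache.spark" %% "spark-core" % "0.8.0-incubating"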
>>
>> On Mon, Dec 2, 2013 at 8:03 PM, K. Shankari <shankari@eecs.berkeley.edu> wrote:
>> > After I kill -9 the worker processes and start them up again, I can
>> > connect using the spark shell and access my HDFS data using it. I haven't
>> > yet tried to shuffle from the spark-shell, though, which is the part that
>> > is giving me problems in my program.
>> >
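>> > (By trying a shuffle from the shell I mean something like the following,
>> > where the path is a placeholder for my dataset:)
>> >
>> >   val data = sc.textFile("hdfs:///path/to/small-dataset")
>> >   data.count()                                            // plain read works fine
>> >   data.map(line => (line.length, 1)).reduceByKey(_ + _).count()  // forces a shuffle
>> >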
>> > I also did some more debugging:
>> > The stacktrace where the worker is waiting for the unix process is this one:
>> >
>> > "Thread-9" daemon prio=10 tid=0x00007f596c063000 nid=0x3cb0 in Object.wait() [0x00007f599b35e000]
>> >    java.lang.Thread.State: WAITING (on object monitor)
>> >   at java.lang.Object.wait(Native Method)
>> >   - waiting on <0x00000007e025e150> (a java.lang.UNIXProcess)
>> >   at java.lang.Object.wait(Object.java:503)
>> >   at java.lang.UNIXProcess.waitFor(UNIXProcess.java:210)
>> >   - locked <0x00000007e025e150> (a java.lang.UNIXProcess)
>> >   at org.apache.spark.deploy.worker.ExecutorRunner$$anon$2.run(ExecutorRunner.scala:69)
>> >
>> > which corresponds to this code:
>> >
>> >  66       override def run() {
>> >  67         if (process != null) {
>> >  68           logInfo("Shutdown hook killing child process.")
>> >  69           process.destroy()
>> >  70           process.waitFor()
>> >  71         }
>> >  72       }
>> >
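>> > I am guessing the shutdown hook could avoid hanging forever by bounding
>> > that wait, i.e. by polling for the exit status instead of calling waitFor()
>> > with no timeout - just a sketch to illustrate the idea, not a tested patch
>> > (the helper name is made up):
>> >
>> >   // Destroy the child, then poll exitValue() for up to timeoutMs instead
>> >   // of blocking indefinitely in waitFor().
>> >   def destroyWithTimeout(process: Process, timeoutMs: Long): Boolean = {
>> >     process.destroy()
>> >     val deadline = System.currentTimeMillis() + timeoutMs
>> >     while (System.currentTimeMillis() < deadline) {
>> >       try {
>> >         process.exitValue()   // throws if the child has not exited yet
>> >         return true
>> >       } catch {
>> >         case _: IllegalThreadStateException => Thread.sleep(100)
>> >       }
>> >     }
>> >     false                     // child is still alive after the timeout
>> >   }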
>> >
>> > Looking at the associated log file, I see that the launched process was:
>> >
>> > 13/12/02 06:47:20 INFO worker.ExecutorRunner: Launch command: "java" "-cp"
>> > ":/root/ephemeral-hdfs/conf:/root/ephemeral-hdfs/conf:/root/spark/conf:/root/spark/assembly/target/scala-2.9.3/spark-assembly_2.9.3-0.8.0-incubating-hadoop1.0.4.jar"
>> > "-Djava.library.path=/root/ephemeral-hdfs/lib/native/"
>> > "-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
>> > "-Dspark.akka.timeout=90000" "-Dspark.storage.blockManagerHeartBeatMs=90000"
>> > "-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
>> > "-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
>> > "-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
>> > "-Dspark.akka.timeout=90000" "-Dspark.storage.blockManagerHeartBeatMs=90000"
>> > "-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
>> > "-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
>> > "-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
>> > "-Dspark.akka.timeout=90000" "-Dspark.storage.blockManagerHeartBeatMs=90000"
>> > "-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
>> > "-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
>> > "-Xms6154M" "-Xmx6154M" "org.apache.spark.executor.StandaloneExecutorBackend"
>> > "akka://spark@<master-host-name>:38832/user/StandaloneScheduler" "0"
>> > "<slave-host-name>" "2"
>> >
>> > And sure enough, the process is still running.
>> >
>> > logs]$ jps
>> > 1510 DataNode
>> > 16795 Jps
>> > 12446 StandaloneExecutorBackend
>> > 12382 Worker
>> >
>> > But I am not able to call jstack on it because it is not responding:
>> >
>> > logs]$ jstack 12446
>> > 12446: Unable to open socket file: target process not responding or HotSpot VM not loaded
>> > The -F option can be used when the target process is not responding
>> >
>> > Its logs don't have any errors, but seem to indicate issues while fetching
>> > blocks using the block manager.
>> >
>> > On the master, I see the following:
>> >
>> > 13/12/02 06:47:33 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 0 to <slave-ip>:39992
>> > 13/12/02 06:49:19 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, <slave-ip>, 46556, 0) with no recent heart beats: 51656ms exceeds 45000ms
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager <slave-ip>:46556 with 3.8 GB RAM
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_9_0 in memory on <slave-ip>:46556 (size: 166.4 KB, free: 3.8 GB)
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_9_1 in memory on <slave-ip>:46556 (size: 166.4 KB, free: 3.8 GB)
>> > 13/12/02 06:51:19 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, <slave-ip>, 46556, 0) with no recent heart beats: 81974ms exceeds 45000ms
>> >
>> > And on the slave, I see:
>> >
>> > 13/12/02 06:47:33 INFO spark.MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>> > 13/12/02 06:47:33 INFO spark.MapOutputTracker: Doing the fetch; tracker actor = Actor[akka://spark@master-ip.ec2.internal:38832/user/MapOutputTracker]
>> > 13/12/02 06:47:33 INFO spark.MapOutputTracker: Got the output locations
>> > 13/12/02 06:47:33 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-zero-bytes blocks out of 2 blocks
>> > 13/12/02 06:47:33 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in 10 ms
>> > 13/12/02 06:47:37 INFO network.ConnectionManager: Accepted connection from [other-slave.ec2.internal/10.116.46.231]
>> > 13/12/02 06:47:44 INFO network.SendingConnection: Initiating connection to [other-slave.internal/10.116.46.231:32807]
>> > 13/12/02 06:47:44 INFO network.SendingConnection: Connected to [other-slave.internal/10.116.46.231:32807], 1 messages pending
>> > *** this is where we get the warning on the master, but it doesn't look like we have started any block fetches by that time ***
>> > 13/12/02 06:49:28 INFO storage.BlockManager: BlockManager reregistering with master
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Trying to register BlockManager
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Registered BlockManager
>> > 13/12/02 06:49:28 INFO storage.BlockManager: Reporting 7 blocks to the master.
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Updated info of block rdd_9_0
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Updated info of block rdd_9_1
>> > *** this is where we get the second warning on the master ***
>> >
>> > Thanks,
>> > Shankari
>> >
>> > On Mon, Dec 2, 2013 at 7:41 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>> >>
>> >> This is indeed a bit strange. One question - when you launch the EC2
>> >> cluster, can you run the spark shell? Can you access your HDFS data
>> >> using the Spark shell?
>> >>
>> >> Also, what version of Spark is your application compiled against? Is
>> >> it exactly the same version that is on the EC2 cluster?
>> >>
>> >> On Mon, Dec 2, 2013 at 4:24 PM, K. Shankari <shankari@eecs.berkeley.edu> wrote:
>> >> > So this is pretty weird, and my debugging hasn't made much progress,
>> >> > so I thought I'd ask for help.
>> >> >
>> >> > I have a medium size dataset, but I am developing my code against a
>> >> > much smaller version since it is faster to work with. I have deployed
>> >> > a cluster on EC2 using the default scripts and the AMI, and loaded my
>> >> > simple dataset into the ephemeral HDFS.
>> >> >
>> >> > When I ssh into the master node and run my application using "local",
>> >> > everything works. However, if I just change to using distributed spark
>> >> > (spark://...:7077) then everything starts up, but the shuffle operations
>> >> > start failing with the error:
>> >> >
>> >> > 13/12/02 06:47:33 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 0 to ip-10-38-11-59.ec2.internal:39992
>> >> > ...
>> >> > 13/12/02 06:49:19 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-10-38-11-59.ec2.internal, 46556, 0) with no recent heart beats: 51656ms exceeds 45000ms
>> >> >
>> >> > I looked around the prior documentation and put the extra options into
>> >> > SPARK_JAVA_OPTS, and increased them to 90000 from the 30000 in the
>> >> > example, but I get the same error.
>> >> >
>> >> > -Dspark.worker.timeout=90000 -Dspark.akka.timeout=90000
>> >> > -Dspark.storage.blockManagerHeartBeatMs=90000 -Dspark.akka.retry.wait=90000
>> >> > -Dspark.akka.frameSize=30000 -Dsun.rmi.dgc.server.gcInterval=3600000
>> >> >
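>> >> > (I believe these spark.* settings could equivalently be set as Java system
>> >> > properties from the driver before the SparkContext is created - sketch below,
>> >> > just to show what I mean - although properties read by the daemons themselves,
>> >> > such as spark.worker.timeout, presumably still have to go through
>> >> > SPARK_JAVA_OPTS on the cluster.)
>> >> >
>> >> >   // Driver-side sketch: 0.8.x reads spark.* system properties when the
>> >> >   // SparkContext is created, so these mirror the SPARK_JAVA_OPTS above.
>> >> >   System.setProperty("spark.akka.timeout", "90000")
>> >> >   System.setProperty("spark.storage.blockManagerHeartBeatMs", "90000")
>> >> >   System.setProperty("spark.akka.retry.wait", "90000")
>> >> >   System.setProperty("spark.akka.frameSize", "30000")
>> >> >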
>> >> > I don't think that this is a memory issue because the dataset fits on
>> >> > the master alone and I am successfully able to run my program with
>> >> > "local". I checked the stdout and stderr on the worker, and one of the
>> >> > few times that there was a stacktrace, it was from here:
>> >> >
>> >> >   private def askDriverWithReply[T](message: Any): T = {
>> >> >     // TODO: Consider removing multiple attempts
>> >> >     if (driverActor == null) {
>> >> >       throw new SparkException("Error sending message to BlockManager as driverActor is null " +
>> >> >         "[message = " + message + "]")
>> >> >     }
>> >> >     var attempts = 0
>> >> >     var lastException: Exception = null
>> >> >     while (attempts < AKKA_RETRY_ATTEMPTS) {
>> >> >       attempts += 1
>> >> >       try {
>> >> >         val future = driverActor.ask(message)(timeout)
>> >> >         val result = Await.result(future, timeout)
>> >> >         if (result == null) {
>> >> >           throw new SparkException("BlockManagerMaster returned null")
>> >> >         }
>> >> >         return result.asInstanceOf[T]
>> >> >       } catch {
>> >> >         case ie: InterruptedException => throw ie
>> >> >         case e: Exception =>
>> >> >           lastException = e
>> >> >           logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
>> >> >       }
>> >> >       Thread.sleep(AKKA_RETRY_INTERVAL_MS)
>> >> >     }
>> >> >
>> >> >     throw new SparkException(
>> >> >       "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
>> >> >   }
>> >> >
>> >> > Most of the time, though, the worker just hangs. I can't run anything
>> >> > else against that master because there are no resources available. When
>> >> > I try to stop the workers using stop-slaves.sh, they don't stop. The
>> >> > only way to recover the cluster is to use "kill -9 <pid>" on the worker
>> >> > processes, which does work.
>> >> >
>> >> > So I took a quick look at one of the hung worker processes using
>> >> > jstack. The output is attached. As you can see, I have tried to shut
>> >> > down the process multiple times, and the SIGTERM handlers are stuck
>> >> > waiting. I think that this may be the reason that the workers are not
>> >> > responsive for block handling as well.
>> >> >
>> >> > If I read this correctly, the handler is waiting on 0x00000007e025d468,
>> >> > which is locked by spark.deploy.worker.ExecutorRunner$$anon$2. And that
>> >> > is waiting for some UNIXProcess?
>> >> >
>> >> > Thanks,
>> >> > Shankari
>> >
>> >
>>
>
>
