spark-user mailing list archives

From Petros Nyfantis <pnyfan...@gmail.com>
Subject Re: DAG Scheduler deadlock when two RDDs reference each other, force Stages manually?
Date Mon, 14 Sep 2015 10:13:41 GMT
Hi Sean,

Thanks a lot for your reply. Yes, I understand that since Scala is a
functional language, maps correspond to transformations of immutable
objects. But the behavior of the program looks like a deadlock: it
simply does not proceed beyond the B = B.map(A.aggregate) stage.
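
For reference, a stripped-down sketch of the pattern (illustrative names
and seqOp/combOp, not my actual DistributedCholesky code):

    import org.apache.spark.{SparkConf, SparkContext}

    object NestedAggregateSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("sketch"))
        var a = sc.parallelize(Seq((0, 6.0), (1, 7.0), (2, 8.0)))
        var b = sc.parallelize(Seq((0, 4.0), (1, 6.0), (2, 1.0)))

        // b.aggregate runs inside a.map's closure, i.e. an RDD action is
        // invoked from within another RDD's task (see SPARK-5063). With
        // local executors the task threads block in JobWaiter.awaitResult,
        // which is exactly what the dump below shows.
        a = a.map { case (keyA, _) =>
          (keyA, b.aggregate(0.0)(_ + _._2, _ + _))
        }
        b = b.map { case (keyB, _) =>
          (keyB, a.aggregate(0.0)(_ + _._2, _ + _))
        }
        b.count() // forces evaluation; this is where it stalls for me
      }
    }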

The Spark web UI shows a bar of pure scheduler delay when I click one
of the still-active jobs and expand the event timeline.

A snippet of the thread dump follows my message; the threads that
correspond to my code are all in WAITING. As I said, when I remove
the second map and the nested aggregate, the problem vanishes.

Thanks again,
Petros

2015-09-14 20:00:53
Full thread dump OpenJDK 64-Bit Server VM (24.79-b02 mixed mode):

"Attach Listener" daemon prio=10 tid=0x00007f9aa4001000 nid=0x674c 
waiting on condition [0x0000000000000000]
    java.lang.Thread.State: RUNNABLE

    Locked ownable synchronizers:
         - None

"qtp1523343281-98" daemon prio=10 tid=0x00007f9a2c001000 nid=0x6675 
waiting on condition [0x00007f9ab48a0000]
    java.lang.Thread.State: TIMED_WAITING (parking)
         at sun.misc.Unsafe.park(Native Method)
         - parking to wait for  <0x00000000e0389d80> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
         at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
         at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
         at 
org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:342)
         at 
org.eclipse.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:526)
         at 
org.eclipse.jetty.util.thread.QueuedThreadPool.access$600(QueuedThreadPool.java:44)
         at 
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
         at java.lang.Thread.run(Thread.java:745)

    Locked ownable synchronizers:
         - None

"qtp1523343281-97" daemon prio=10 tid=0x00007f9a28002000 nid=0x6649 
waiting on condition [0x00007f99f7baa000]
    java.lang.Thread.State: TIMED_WAITING (parking)
         at sun.misc.Unsafe.park(Native Method)
         - parking to wait for  <0x00000000e0389d80> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
         at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
         at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
         at 
org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:342)
         at 
org.eclipse.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:526)
         at 
org.eclipse.jetty.util.thread.QueuedThreadPool.access$600(QueuedThreadPool.java:44)
         at 
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
         at java.lang.Thread.run(Thread.java:745)

    Locked ownable synchronizers:
         - None

"Executor task launch worker-7" daemon prio=10 tid=0x00007f9a8400a800 
nid=0x661a in Object.wait() [0x00007f99f6e9c000]
    java.lang.Thread.State: WAITING (on object monitor)
         at java.lang.Object.wait(Native Method)
         - waiting on <0x00000000fc9ba760> (a 
org.apache.spark.scheduler.JobWaiter)
         at java.lang.Object.wait(Object.java:503)
         at 
org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
         - locked <0x00000000fc9ba760> (a 
org.apache.spark.scheduler.JobWaiter)
         at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:530)
         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1734)
         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1804)
         at 
org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1058)
         at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
         at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
         at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
         at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1051)
         at 
DistributedCholesky$$anonfun$compute_sigma_A$1.apply(DistributedCholesky.scala:323)
         at 
DistributedCholesky$$anonfun$compute_sigma_A$1.apply(DistributedCholesky.scala:321)
         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
         at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
         at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
         at 
scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
         at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
         at 
org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1056)
         at 
org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1056)
         at 
org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1803)
         at 
org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1803)
         at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
         at org.apache.spark.scheduler.Task.run(Task.scala:70)
         at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
         at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
         at java.lang.Thread.run(Thread.java:745)

    Locked ownable synchronizers:
         - <0x00000000fc9bac50> (a 
java.util.concurrent.ThreadPoolExecutor$Worker)

"qtp1523343281-40 Acceptor0 SelectChannelConnector@0.0.0.0:4040" daemon 
prio=10 tid=0x00007f9adc95c800 nid=0x65f4 runnable [0x00007f9ab4ba3000]
    java.lang.Thread.State: RUNNABLE
         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
         at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
         - locked <0x00000000e075b488> (a java.lang.Object)
         at 
org.eclipse.jetty.server.nio.SelectChannelConnector.accept(SelectChannelConnector.java:109)
         at 
org.eclipse.jetty.server.AbstractConnector$Acceptor.run(AbstractConnector.java:938)
         at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
         at 
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
         at java.lang.Thread.run(Thread.java:745)

    Locked ownable synchronizers:
         - None

"qtp1523343281-39 Selector1" daemon prio=10 tid=0x00007f9adc95b000 
nid=0x65f3 runnable [0x00007f9ab4ca4000]
    java.lang.Thread.State: RUNNABLE
         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
         - locked <0x00000000e07a9a50> (a sun.nio.ch.Util$2)
         - locked <0x00000000e07a9a60> (a 
java.util.Collections$UnmodifiableSet)
         - locked <0x00000000e038a2b8> (a sun.nio.ch.EPollSelectorImpl)
         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
         at 
org.eclipse.jetty.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:569)
         at 
org.eclipse.jetty.io.nio.SelectorManager$1.run(SelectorManager.java:290)
         at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
         at 
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
         at java.lang.Thread.run(Thread.java:745)

    Locked ownable synchronizers:
         - None

"VM Thread" prio=10 tid=0x00007f9adc07e800 nid=0x65d2 runnable

"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f9adc020000 
nid=0x65ca runnable

"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f9adc022000 
nid=0x65cb runnable

"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f9adc023800 
nid=0x65cc runnable

"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f9adc025800 
nid=0x65cd runnable

"GC task thread#4 (ParallelGC)" prio=10 tid=0x00007f9adc027800 
nid=0x65ce runnable

"GC task thread#5 (ParallelGC)" prio=10 tid=0x00007f9adc029800 
nid=0x65cf runnable

"GC task thread#6 (ParallelGC)" prio=10 tid=0x00007f9adc02b000 
nid=0x65d0 runnable

"GC task thread#7 (ParallelGC)" prio=10 tid=0x00007f9adc02d000 
nid=0x65d1 runnable

"VM Periodic Task Thread" prio=10 tid=0x00007f9adc0c1800 nid=0x65d9 
waiting on condition

JNI global references: 208


On 14/09/15 19:45, Sean Owen wrote:
> There isn't a cycle in your graph: although you reuse the reference
> variables called A and B in your code, you are in fact creating new
> RDDs at each operation. You have some other problem, and you'd have to
> provide detail on why you think something is deadlocked, like a thread
> dump.
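>
> For example:
>
>     var a = sc.parallelize(Seq(1, 2, 3))   // RDD #1
>     a = a.map(_ + 1)                        // RDD #2, parent = RDD #1
>
> Rebinding the var just points the name at the new RDD; the lineage is
> still a DAG, never a cycle.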
>
> On Mon, Sep 14, 2015 at 10:42 AM, petranidis <pnyfantis@gmail.com> wrote:
>> Hi all,
>>
>> I am new to Spark and I have written a few Spark programs, mostly
>> around machine learning applications.
>>
>> I am trying to solve a problem where two RDDs must be updated using
>> elements of each other. More specifically, if the two pair RDDs are
>> called A and B, M is a matrix that specifies which elements of each
>> RDD should be taken into account when computing the other, with rows
>> of M corresponding to elements of A and columns to elements of B, e.g.
>>
>> A = (0, 6), (1, 7), (2, 8)
>> B = (0, 4), (1, 6), (2, 1)
>> and
>> M =
>>   0 1 1
>>   1 1 0
>>   0 1 0
>>
>> Then
>>
>> for (it = 0; it < 10; it++) {
>>   A(0) = B(1) + B(2)
>>   A(1) = B(0) + B(1)
>>   A(2) = B(1)
>>   B(0) = A(1)
>>   B(1) = A(0) + A(1) + A(2)
>>   B(2) = A(0)
>> }
>>
>> To do such a computation in Spark, I used
>> A = A.map( (key,val) => { B.aggregate(...) })
>> B = B.map( (key,val) => { A.aggregate(...) })
>>
>> where the key keyA of each mapped element is passed to the aggregate
>> function as an initialization parameter; then, for each B element with
>> key keyB, if M(keyA, keyB) == 1, that B element is taken into account
>> in the summation.
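>>
>> As a sketch (illustrative types; m is a driver-side matrix of 0/1 Ints):
>>
>>     val newA = A.map { case (keyA, _) =>
>>       val sum = B.aggregate(0.0)(
>>         (acc, kv) => if (m(keyA)(kv._1) == 1) acc + kv._2 else acc,
>>         _ + _)
>>       (keyA, sum)
>>     }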
>>
>> The calculation of A is done successfully and correctly, but the DAG
>> scheduler seems to deadlock when the calculation of B happens. This
>> behaviour goes away when I remove the A.aggregate part of my code.
>> According to the logs, the scheduler is expecting some results before
>> it can go on, but those results should already have been calculated.
>>
>> I assume this has to do with the DAG scheduler not handling cyclical
>> dependencies. Is there a way I can force each iteration, or each update
>> of A and B, to be seen as a separate stage? Otherwise, how can I
>> implement this type of aggregation in another way? (It could be the
>> equivalent of mapping each A element to a list of all the B elements
>> for which the M matrix entry is 1, and then mapping again to their sum,
>> as sketched below, but this needs a lot of space, which is unfeasible
>> when the problem at hand is very large, so I need to avoid it.)
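>>
>> A sketch of that alternative (assuming m is a driver-side
>> Array[Array[Int]] small enough to ship in closures):
>>
>>     val selected = A.keys.cartesian(B)            // O(|A| * |B|) records
>>       .filter { case (keyA, (keyB, _)) => m(keyA)(keyB) == 1 }
>>     val newA = selected
>>       .map { case (keyA, (_, bVal)) => (keyA, bVal) }
>>       .reduceByKey(_ + _)                         // sum the selected B values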
>>
>> Thanks in advance for your help!
>>


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org

