flink-issues mailing list archives

From "Julien Nioche (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-11783) Deadlock during Join operation
Date Sat, 02 Mar 2019 07:23:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-11783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782311#comment-16782311 ]

Julien Nioche commented on FLINK-11783:
---------------------------------------

Same issue with a different job on the same cluster: the problem occurs on the same node, this time during a reduce step.

"GroupReduce (GroupReduce at <init>(*GlobalStatsJob*.java:67)) (11/150)" #20948 prio=5 os_prio=0 tid=0x00007faa7c027800 nid=0x57d0 in Object.wait() [0x00007fa981df6000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at <init>(*GlobalStatsJob*.java:67)) (66/150)" #20947 prio=5 os_prio=0 tid=0x00007faa7c026000 nid=0x57cf in Object.wait() [0x00007fa9815ee000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at <init>(*GlobalStatsJob*.java:67)) (10/150)" #20943 prio=5 os_prio=0 tid=0x00007faa7c024800 nid=0x57cb in Object.wait() [0x00007fa980ae9000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at <init>(*GlobalStatsJob*.java:67)) (65/150)" #20942 prio=5 os_prio=0 tid=0x00007faa7c023800 nid=0x57ca in Object.wait() [0x00007fa9804e3000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at <init>(*GlobalStatsJob*.java:67)) (69/150)" #20941 prio=5 os_prio=0 tid=0x00007faa7c022800 nid=0x57c9 in Object.wait() [0x00007fa9807e6000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at <init>(*GlobalStatsJob*.java:67)) (67/150)" #20940 prio=5 os_prio=0 tid=0x00007faa7c021800 nid=0x57c8 in Object.wait() [0x00007fa9d2bf0000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at <init>(*GlobalStatsJob*.java:67)) (19/150)" #20939 prio=5 os_prio=0 tid=0x00007faa7c020800 nid=0x57c7 in Object.wait() [0x00007fa980dec000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at <init>(*GlobalStatsJob*.java:67)) (20/150)" #20938 prio=5 os_prio=0 tid=0x00007faa7c01f800 nid=0x57c6 in Object.wait() [0x00007fa98c89b000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at <init>(*GlobalStatsJob*.java:66)) (72/150)" #12276 prio=5 os_prio=0 tid=0x00007faa0c02f000 nid=0x33a4 waiting on condition [0x00007fa981ff8000]
   java.lang.Thread.State: TIMED_WAITING (parking)
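The excerpt above was cut out of jstack output by hand. As a hedged illustration of doing the same tally mechanically, the sketch below counts `java.lang.Thread.State` values for every thread whose name contains a given substring, such as a job class name. `ThreadStateTally` and `tally` are hypothetical names, and the parser assumes raw jstack output (each thread header line starts with the quoted thread name), not JIRA-formatted text.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper: tallies thread states in raw jstack output for
// threads whose name contains a given substring (e.g. "GlobalStatsJob").
public class ThreadStateTally {

    // A jstack thread header starts with the thread name in double quotes.
    private static final Pattern HEADER = Pattern.compile("^\"([^\"]*)\"");
    // State line, e.g. "java.lang.Thread.State: WAITING (on object monitor)".
    private static final Pattern STATE =
            Pattern.compile("java\\.lang\\.Thread\\.State: (\\S+)(?: \\((.*)\\))?");

    public static Map<String, Integer> tally(String dump, String nameFilter) {
        Map<String, Integer> counts = new TreeMap<>();
        boolean inMatchingThread = false;
        for (String line : dump.split("\\R")) {
            Matcher header = HEADER.matcher(line);
            if (header.find()) {
                // A new thread starts; remember whether its name matches.
                inMatchingThread = header.group(1).contains(nameFilter);
                continue;
            }
            Matcher state = STATE.matcher(line.trim());
            if (inMatchingThread && state.find()) {
                String key = state.group(2) == null
                        ? state.group(1)
                        : state.group(1) + " (" + state.group(2) + ")";
                counts.merge(key, 1, Integer::sum);
                inMatchingThread = false; // count each thread only once
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: jstack <pid> | java ThreadStateTally <name-substring>");
            return;
        }
        String dump = new BufferedReader(new InputStreamReader(System.in))
                .lines()
                .collect(Collectors.joining("\n"));
        tally(dump, args[0]).forEach((state, n) -> System.out.println(n + "\t" + state));
    }
}
```

For example, `jstack <pid> | java ThreadStateTally GlobalStatsJob` prints one line per state. Fed the nine threads excerpted above, it would count eight `WAITING (on object monitor)` threads and one `TIMED_WAITING (parking)` thread.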

> Deadlock during Join operation
> ------------------------------
>
>                 Key: FLINK-11783
>                 URL: https://issues.apache.org/jira/browse/FLINK-11783
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 1.7.2
>            Reporter: Julien Nioche
>            Priority: Major
>         Attachments: flink_is_stuck.png
>
>
> I am running a filtering job on a large dataset with Flink running in distributed mode. Most tasks in the Join operation completed some time ago, and only the tasks from one particular TaskManager are still running. These tasks are making progress, but extremely slowly.
> When logging onto the machine running this TaskManager, I can see that all threads are TIMED_WAITING.
> Could there be a synchronization problem?
> See the attachment for a screenshot of the Flink UI; the stack traces are below.
>  
> *{{$ jstack 9183 | grep -A 15 "DataSetFilterJob"}}*
> {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at <init>(DataSetFilterJob.java:67)) (66/150)" #155 prio=5 os_prio=0 tid=0x00007faa5c01c000 nid=0x248c waiting on condition [0x00007fa9d15d5000]}}
> {{ java.lang.Thread.State: TIMED_WAITING (parking)}}
> {{ at sun.misc.Unsafe.park(Native Method)}}
> {{ - parking to wait for <0x00000007bfa89578> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}}
> {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}}
> {{ at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}}
> {{ at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:228)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}}
> {{ at org.apache.flink.types.StringValue.readString(StringValue.java:746)}}
> {{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}}
> {{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}}
> {{--}}
> {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at <init>(DataSetFilterJob.java:67)) (65/150)" #154 prio=5 os_prio=0 tid=0x00007faa5c01b000 nid=0x248b waiting on condition [0x00007fa9d14d4000]}}
> {{ java.lang.Thread.State: TIMED_WAITING (parking)}}
> {{ at sun.misc.Unsafe.park(Native Method)}}
> {{ - parking to wait for <0x00000007b8e0eb50> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}}
> {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}}
> {{ at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}}
> {{ at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:228)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}}
> {{ at org.apache.flink.types.StringValue.readString(StringValue.java:746)}}
> {{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}}
> {{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}}
> {{--}}
> {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at <init>(DataSetFilterJob.java:67)) (68/150)" #153 prio=5 os_prio=0 tid=0x00007faa5c019800 nid=0x248a waiting on condition [0x00007fa981df6000]}}
> {{ java.lang.Thread.State: TIMED_WAITING (parking)}}
> {{ at sun.misc.Unsafe.park(Native Method)}}
> {{ - parking to wait for <0x0000000774903a00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}}
> {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}}
> {{ at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}}
> {{ at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView.nextSegment(HeaderlessChannelReaderInputView.java:90)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}}
> {{ at org.apache.flink.types.StringValue.readString(StringValue.java:769)}}
> {{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}}
> {{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}}
> {{--}}
> {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at <init>(DataSetFilterJob.java:67)) (67/150)" #152 prio=5 os_prio=0 tid=0x00007faa5c018800 nid=0x2489 waiting on condition [0x00007fa98c194000]}}
> {{ java.lang.Thread.State: TIMED_WAITING (parking)}}
> {{ at sun.misc.Unsafe.park(Native Method)}}
> {{ - parking to wait for <0x00000007b08a0508> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}}
> {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}}
> {{ at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}}
> {{ at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:228)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}}
> {{ at org.apache.flink.types.StringValue.readString(StringValue.java:746)}}
> {{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}}
> {{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}}
> {{--}}
> {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at <init>(DataSetFilterJob.java:67)) (62/150)" #151 prio=5 os_prio=0 tid=0x00007faa5c017800 nid=0x2488 in Object.wait() [0x00007fa98c295000]}}
> {{ java.lang.Thread.State: TIMED_WAITING (on object monitor)}}
> {{ at java.lang.Object.wait(Native Method)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:127)}}
> {{ - locked <0x00000007ab60d660> (a java.lang.Object)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.closeAndDelete(AsynchronousFileIOChannel.java:159)}}
> {{ at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:877)}}
> {{ at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)}}
> {{ at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)}}
> {{ at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:116)}}
> {{ at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)}}
> {{ at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)}}
> {{ at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)}}
> {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}
> {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at <init>(DataSetFilterJob.java:67)) (64/150)" #150 prio=5 os_prio=0 tid=0x00007faa5c016800 nid=0x2487 in Object.wait() [0x00007fa981ef7000]}}
> {{ java.lang.Thread.State: TIMED_WAITING (on object monitor)}}
> {{ at java.lang.Object.wait(Native Method)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:127)}}
> {{ - locked <0x00000007ab60d650> (a java.lang.Object)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.closeAndDelete(AsynchronousFileIOChannel.java:159)}}
> {{ at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:877)}}
> {{ at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)}}
> {{ at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)}}
> {{ at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:116)}}
> {{ at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)}}
> {{ at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)}}
> {{ at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)}}
> {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}
> {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at <init>(DataSetFilterJob.java:67)) (63/150)" #149 prio=5 os_prio=0 tid=0x00007faa5c015800 nid=0x2486 waiting on condition [0x00007fa9d0fcf000]}}
> {{ java.lang.Thread.State: TIMED_WAITING (parking)}}
> {{ at sun.misc.Unsafe.park(Native Method)}}
> {{ - parking to wait for <0x000000077439ea00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}}
> {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}}
> {{ at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}}
> {{ at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView.nextSegment(HeaderlessChannelReaderInputView.java:90)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}}
> {{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}}
> {{ at org.apache.flink.types.StringValue.readString(StringValue.java:769)}}
> {{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}}
> {{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}}
> {{--}}
> {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at <init>(DataSetFilterJob.java:67)) (61/150)" #148 prio=5 os_prio=0 tid=0x00007faa5c014800 nid=0x2485 in Object.wait() [0x00007fa9d2cf1000]}}
> {{ java.lang.Thread.State: TIMED_WAITING (on object monitor)}}
> {{ at java.lang.Object.wait(Native Method)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:127)}}
> {{ - locked <0x00000007ab60d640> (a java.lang.Object)}}
> {{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.closeAndDelete(AsynchronousFileIOChannel.java:159)}}
> {{ at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:877)}}
> {{ at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)}}
> {{ at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)}}
> {{ at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:116)}}
> {{ at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)}}
> {{ at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)}}
> {{ at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)}}
> {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}
>  
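Both waiting patterns in the quoted dump are bounded waits inside a loop: `LinkedBlockingQueue.poll(timeout, unit)` parks the thread, and `Object.wait(timeout)` releases the monitor and sleeps, and in either case the JVM reports `TIMED_WAITING`. That state alone therefore does not distinguish a deadlock from a thread that is still waiting for slow disk I/O to complete. The sketch below is a simplified, hypothetical model of that structure, not Flink's implementation: `AsyncChannelModel`, its methods, and the 1000 ms timeouts are assumptions chosen for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical model of an asynchronous block channel.
// It is NOT Flink's code; it only mimics the two wait patterns in the dump.
public class AsyncChannelModel {

    private final LinkedBlockingQueue<byte[]> returnedBlocks = new LinkedBlockingQueue<>();
    private final AtomicInteger outstandingRequests = new AtomicInteger();
    private final Object closeLock = new Object();
    private volatile boolean closed;

    // Caller registers a read request before handing it to the I/O thread.
    public void issueRequest() {
        outstandingRequests.incrementAndGet();
    }

    // Called by the I/O thread when a read request has completed.
    public void completeRequest(byte[] block) {
        returnedBlocks.add(block);
        if (outstandingRequests.decrementAndGet() == 0) {
            synchronized (closeLock) {
                closeLock.notifyAll(); // wake a thread blocked in close()
            }
        }
    }

    // Reader: bounded poll in a loop. While the queue is empty the thread is
    // parked in LinkedBlockingQueue.poll and reports TIMED_WAITING (parking).
    public byte[] nextReturnedBlock() throws InterruptedException {
        while (true) {
            byte[] next = returnedBlocks.poll(1000, TimeUnit.MILLISECONDS);
            if (next != null) {
                return next;
            }
            if (closed) {
                throw new IllegalStateException("channel was closed");
            }
        }
    }

    // Close: wait on a monitor with a timeout until all requests have returned.
    // Meanwhile the thread is in Object.wait and reports TIMED_WAITING (on object monitor).
    public void close() throws InterruptedException {
        synchronized (closeLock) {
            closed = true;
            while (outstandingRequests.get() > 0) {
                closeLock.wait(1000);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncChannelModel channel = new AsyncChannelModel();
        channel.issueRequest();
        // Simulated I/O thread that completes the request after about 1.5 s.
        new Thread(() -> {
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            channel.completeRequest(new byte[] {42});
        }).start();
        byte[] block = channel.nextReturnedBlock(); // likely times out once first
        channel.close();
        System.out.println("read " + block.length + " byte(s), channel closed");
    }
}
```

If `completeRequest` were never called, the reader would keep cycling through poll timeouts and `close` would never return, yet a single thread dump would still report both as `TIMED_WAITING`. Comparing several dumps taken some time apart is usually a more reliable way to tell slow progress from a hang.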



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
