flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Shannon Carey (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()
Date Wed, 12 Oct 2016 15:24:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15569025#comment-15569025

Shannon Carey commented on FLINK-4803:

Yes, that's right. cancel() blocks on close(), and therefore if close() misbehaves the thread
is never interrupted and cancel() blocks forever.

In the issue description, I suggested your option #2. I think you'll want #1 no matter what.
However, #2 allows for at least one message and/or exception to be logged that tells the user
what went wrong (why their job is taking a long time to cancel, or why it did not cancel gracefully).
I'm not sure what your DataSink-specific option would look like. Maybe it is similar to my
workaround, where I wrapped my HadoopOutputFormat in a subclass that calls super.close() from
a separate thread with a timeout? That workaround is ok, but I had to expend a fair amount
of effort to figure out what the problem was, and also there was nothing I could do but restart
Flink in order to get my job to terminate (not a desirable solution). You'll want Flink to
function smoothly regardless of what data sink the user chooses.

> Job Cancel can hang forever waiting for OutputFormat.close()
> ------------------------------------------------------------
>                 Key: FLINK-4803
>                 URL: https://issues.apache.org/jira/browse/FLINK-4803
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.1.1
>            Reporter: Shannon Carey
> If the Flink job uses a badly-behaved OutputFormat (in this example, a HadoopOutputFormat
containing a CqlBulkOutputFormat), where the close() method blocks forever, it is impossible
to cancel the Flink job even though the blocked thread would respond to an interrupt. The
stack traces below show the state of the important threads when a job is canceled and the
OutputFormat is blocking forever inside of close().
> I suggest that `DataSinkTask.cancel()` method be updated to add a timeout on `this.format.close()`.
When the timeout is reached, the Task thread should be interrupted.
> {code}
> "Canceler for DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2)
(2/5)" #6422 daemon prio=5 os_prio=0 tid=0x00007fb7e42f0000 nid=0x34f3 waiting for monitor
entry [0x00007fb7be079000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - waiting to lock <0x00000006bae5f788> (a java.lang.Object)
>         at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
>         at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
>         at java.lang.Thread.run(Thread.java:745)
> "DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)"
#6410 daemon prio=5 os_prio=0 tid=0x00007fb7e79a4800 nid=0x2ad8 waiting on condition [0x00007fb7bdf78000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>         at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
>         at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
>         at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
>         at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
>         at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
>         at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
>         at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - locked <0x00000006bae5f788> (a java.lang.Object)
>         at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> {code}

This message was sent by Atlassian JIRA

View raw message