flink-issues mailing list archives

From "Stephan Ewen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()
Date Wed, 12 Oct 2016 08:47:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568076#comment-15568076 ]

Stephan Ewen commented on FLINK-4803:

The task thread is actually interrupted: immediately after the call to "cancel()", and then
periodically if it does not exit.

The problem here seems to be that "cancel()" cannot proceed (the interruption would only come
after it returns) because, in HadoopOutputFormatBase, it is blocked on a lock held by the
regular "close()" call.
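To make the blocking concrete, here is a minimal, self-contained Java sketch of the situation in the thread dump. It is not Flink code: the class and method names are hypothetical, a plain Object monitor stands in for the lock in HadoopOutputFormatBase, and Thread.sleep() stands in for the close() call that never returns. It only illustrates that the canceler waits on the monitor and therefore never reaches its interrupt() call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the Task / TaskCanceler pair; not Flink code.
class BlockedCancelSketch {

    /** Reproduces the hang and returns the canceler thread's observed state. */
    static Thread.State observeCancelerState() {
        final Object lock = new Object();
        final CountDownLatch lockHeld = new CountDownLatch(1);

        // "Task" thread: takes the lock and then blocks, like the regular close().
        final Thread task = new Thread(() -> {
            synchronized (lock) {
                lockHeld.countDown();
                try {
                    Thread.sleep(60_000); // stands in for the blocking close()
                } catch (InterruptedException e) {
                    // interrupted: fall through and release the lock
                }
            }
        }, "task");

        // "Canceler" thread: must pass through the same lock before interrupting.
        final Thread canceler = new Thread(() -> {
            synchronized (lock) {
                // cancel() would close the format here
            }
            task.interrupt(); // only reached once the lock is free
        }, "canceler");

        task.setDaemon(true);
        canceler.setDaemon(true);
        task.start();
        try {
            lockHeld.await();
            canceler.start();
            // Poll until the canceler is parked on the monitor (or give up after 5 s).
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (canceler.getState() != Thread.State.BLOCKED
                    && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return canceler.getState();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            task.interrupt(); // release both threads so the sketch terminates
        }
    }

    public static void main(String[] args) {
        System.out.println("canceler state: " + observeCancelerState());
    }
}
```

The task thread in this sketch would react to an interrupt right away, which matches the report; the interrupt is simply never sent because the thread that would send it is stuck first.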

I think there are three ways to address that:
  - Generic (1) - [FLINK-4715] - the TaskManagers should exit themselves (and rely on Yarn
/ Mesos / container service to be restarted). This is the hard fallback that should catch
all problems with respect to cancellation.
  - Generic (2) - let the cancel() call and the interrupt() calls come from two different threads
(or add a watchdog that calls "interrupt()" if "cancel()" blocks for too long).
  - DataSink specific - attempt to call close() on cancellation, but do not block on locks;
throw an exception instead.
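
As a rough illustration of the watchdog variant of Generic (2), the sketch below runs the cancel action on its own thread, waits a bounded time for it, and then interrupts the task thread whether or not the cancel action has returned. All names (CancelWatchdogSketch, cancelWithWatchdog, the 200 ms timeout) are made up for this example and are not existing Flink API; the demo uses a plain monitor and Thread.sleep() as stand-ins for the HadoopOutputFormatBase lock and the blocking close().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a cancellation watchdog; not Flink code.
class CancelWatchdogSketch {

    /**
     * Runs cancelAction on a separate thread and waits at most timeoutMillis
     * for it. The task thread is interrupted afterwards in either case, so a
     * blocked cancelAction can no longer hold back the interrupt.
     *
     * @return true if cancelAction finished within the timeout
     */
    static boolean cancelWithWatchdog(Runnable cancelAction, Thread taskThread,
                                      long timeoutMillis) throws InterruptedException {
        Thread canceler = new Thread(cancelAction, "canceler");
        canceler.setDaemon(true);
        canceler.start();
        canceler.join(timeoutMillis);
        boolean finished = !canceler.isAlive();
        taskThread.interrupt();
        return finished;
    }

    /** Returns true if cancel blocked, yet the task thread was still interrupted. */
    static boolean demo() {
        final Object lock = new Object();
        final CountDownLatch lockHeld = new CountDownLatch(1);
        final CountDownLatch taskInterrupted = new CountDownLatch(1);

        // Task thread: holds the lock while "close()" blocks.
        Thread task = new Thread(() -> {
            synchronized (lock) {
                lockHeld.countDown();
                try {
                    Thread.sleep(60_000); // stands in for the blocking close()
                } catch (InterruptedException e) {
                    taskInterrupted.countDown();
                }
            }
        }, "task");
        task.setDaemon(true);
        task.start();

        try {
            lockHeld.await();
            // The cancel action needs the same lock, so it blocks.
            Runnable cancel = () -> {
                synchronized (lock) {
                    // cancel() would close the format here
                }
            };
            boolean cancelFinished = cancelWithWatchdog(cancel, task, 200);
            boolean interrupted = taskInterrupted.await(5, TimeUnit.SECONDS);
            return !cancelFinished && interrupted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("task released despite blocked cancel: " + demo());
    }
}
```

Where such a watchdog would sit in the Task canceler logic, and what timeout is reasonable, is left open here. The sketch only shows that separating the wait from the interrupt removes the dependency on cancel() returning.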

What is your take on each of these approaches?

> Job Cancel can hang forever waiting for OutputFormat.close()
> ------------------------------------------------------------
>                 Key: FLINK-4803
>                 URL: https://issues.apache.org/jira/browse/FLINK-4803
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.1.1
>            Reporter: Shannon Carey
> If the Flink job uses a badly-behaved OutputFormat (in this example, a HadoopOutputFormat
> containing a CqlBulkOutputFormat), where the close() method blocks forever, it is impossible
> to cancel the Flink job even though the blocked thread would respond to an interrupt. The
> stack traces below show the state of the important threads when a job is canceled and the
> OutputFormat is blocking forever inside of close().
> I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout on `this.format.close()`.
> When the timeout is reached, the Task thread should be interrupted.
> {code}
> "Canceler for DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6422 daemon prio=5 os_prio=0 tid=0x00007fb7e42f0000 nid=0x34f3 waiting for monitor entry [0x00007fb7be079000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - waiting to lock <0x00000006bae5f788> (a java.lang.Object)
>         at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
>         at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
>         at java.lang.Thread.run(Thread.java:745)
> "DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410 daemon prio=5 os_prio=0 tid=0x00007fb7e79a4800 nid=0x2ad8 waiting on condition [0x00007fb7bdf78000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>         at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
>         at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
>         at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
>         at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
>         at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
>         at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
>         at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - locked <0x00000006bae5f788> (a java.lang.Object)
>         at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> {code}

This message was sent by Atlassian JIRA
