flink-issues mailing list archives

From "Piotr Nowojski (Jira)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-22377) Ignore if writer is stopped when aborting channel state writing
Date Tue, 04 May 2021 08:34:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-22377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-22377:
-----------------------------------
    Description: 
When a channel state write is being aborted, the writer may already be stopped.
In that case, an error is thrown:
{code} 
2021-04-19 09:05:52,716 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask        
 Could not perform checkpoint 401 for operator Source: Custom Source (5/6)#0 while the invokable
was not in state running.
java.lang.RuntimeException: unable to send request to worker
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:229)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.abort(ChannelStateWriterImpl.java:190)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.cleanup(SubtaskCheckpointCoordinatorImpl.java:472)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1062)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1046)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:964)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$7(StreamTask.java:936)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) [flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) [flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.lang.IllegalStateException: not running
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:152)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:144)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitPriority(ChannelStateWriteRequestExecutorImpl.java:133)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:224)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        ... 18 more
{code}

Need to:
- check whether abort calls are valid (why is the "IllegalStateException: not running" being
thrown? Who is closing/aborting `channelStateWriter`? Do we need this kind of race condition
and async closing of the `channelStateWriter`?)
- not throw an exception for abort calls if the writer is not running (either ignore the exception,
or make closing single-threaded from the task thread and check that `channelStateWriter` is closed
before accessing it in `notifyCheckpointAborted`)
- add the exception as suppressed (if any) in SubtaskCheckpointCoordinatorImpl.cleanup
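
To make the second and third items concrete, here is a minimal sketch of the "ignore the exception" option together with suppressed-exception handling in cleanup. It is not Flink's code: `SketchExecutor`, `SketchWriter` and `SketchCleanup` are hypothetical stand-ins for the request executor, `ChannelStateWriterImpl` and `SubtaskCheckpointCoordinatorImpl.cleanup`, and the real fix may well look different (for example, the single-threaded closing alternative mentioned above).

```java
// Hypothetical stand-in for a request executor that another thread may stop at any time.
final class SketchExecutor {
    private volatile boolean running = true;

    void stop() {
        running = false;
    }

    // Mirrors the reported behaviour: submitting after stop fails with "not running".
    void submit(Runnable request) {
        if (!running) {
            throw new IllegalStateException("not running");
        }
        request.run();
    }
}

// Hypothetical stand-in for the channel state writer.
final class SketchWriter {
    private final SketchExecutor executor;

    SketchWriter(SketchExecutor executor) {
        this.executor = executor;
    }

    // Regular writes still fail loudly when the executor is stopped.
    void write(Runnable request) {
        executor.submit(request);
    }

    // Abort treats a stopped executor as "nothing left to abort" instead of failing.
    // Returns true if the abort request was submitted, false if it was ignored.
    // Assumption: in this sketch, IllegalStateException only ever means "not running".
    boolean abort(long checkpointId) {
        try {
            executor.submit(() -> { /* would discard pending state for checkpointId */ });
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }
}

// Hypothetical stand-in for the cleanup path of a failed checkpoint.
final class SketchCleanup {
    // Runs the cleanup action; a cleanup failure is attached to the original
    // exception as suppressed, so it cannot replace the original failure.
    static Exception cleanup(Exception original, Runnable cleanupAction) {
        try {
            cleanupAction.run();
        } catch (RuntimeException cleanupFailure) {
            original.addSuppressed(cleanupFailure);
        }
        return original;
    }
}
```

With this shape, a late `abort` after `stop()` returns `false` rather than throwing, while any failure during cleanup stays visible through `getSuppressed()` on the exception that caused the cleanup. Swallowing the exception this way does not answer the first item (who stops the writer, and whether the race is acceptable); it only keeps the race from surfacing as an error.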


  was:
When channel state write is being aborted it can happen that writer is already stopped.
In that case, an error will be thrown:
{code} 
2021-04-19 09:05:52,716 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask        
 Could not perform checkpoint 401 for operator Source: Custom Source (5/6)#0 while the invokable
was not in state running.
java.lang.RuntimeException: unable to send request to worker
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:229)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.abort(ChannelStateWriterImpl.java:190)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.cleanup(SubtaskCheckpointCoordinatorImpl.java:472)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1062)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1046)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:964)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$7(StreamTask.java:936)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) [flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) [flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.lang.IllegalStateException: not running
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:152)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:144)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitPriority(ChannelStateWriteRequestExecutorImpl.java:133)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:224)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        ... 18 more
{code}

Need to 
- check whether abort calls are valid
- don't throw an exception if not running for abort calls
- add exception as suppressed (if any) in SubtaskCheckpointCoordinatorImpl.cleanup



> Ignore if writer is stopped when aborting channel state writing
> ---------------------------------------------------------------
>
>                 Key: FLINK-22377
>                 URL: https://issues.apache.org/jira/browse/FLINK-22377
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network, Runtime / Task
>    Affects Versions: 1.13.0
>            Reporter: Roman Khachatryan
>            Priority: Minor
>
> When a channel state write is being aborted, the writer may already be stopped.
> In that case, an error is thrown:
> {code} 
> 2021-04-19 09:05:52,716 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask   
      Could not perform checkpoint 401 for operator Source: Custom Source (5/6)#0 while the
invokable was not in state running.
> java.lang.RuntimeException: unable to send request to worker
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:229)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.abort(ChannelStateWriterImpl.java:190)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.cleanup(SubtaskCheckpointCoordinatorImpl.java:472)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1062)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1046)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:964)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$7(StreamTask.java:936)
~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) [flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) [flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at java.lang.Thread.run(Thread.java:834) [?:?]
> Caused by: java.lang.IllegalStateException: not running
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:152)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:144)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitPriority(ChannelStateWriteRequestExecutorImpl.java:133)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:224)
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         ... 18 more
> {code}
> Need to:
> - check whether abort calls are valid (why is the "IllegalStateException: not running" being thrown? Who is closing/aborting `channelStateWriter`? Do we need this kind of race condition and async closing of the `channelStateWriter`?)
> - not throw an exception for abort calls if the writer is not running (either ignore the exception, or make closing single-threaded from the task thread and check that `channelStateWriter` is closed before accessing it in `notifyCheckpointAborted`)
> - add the exception as suppressed (if any) in SubtaskCheckpointCoordinatorImpl.cleanup



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
