flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9755) Exceptions in RemoteInputChannel#notifyBufferAvailable() are not propagated to the responsible thread
Date Thu, 12 Jul 2018 09:36:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541383#comment-16541383 ]

ASF GitHub Bot commented on FLINK-9755:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6272#discussion_r201970654
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -360,32 +360,44 @@ public boolean notifyBufferAvailable(Buffer buffer) {
     			return false;
     		}
     
    -		boolean needMoreBuffers = false;
    -		synchronized (bufferQueue) {
    -			checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");
    +		boolean recycleBuffer = true;
    +		try {
    +			boolean needMoreBuffers = false;
    +			synchronized (bufferQueue) {
    +				checkState(isWaitingForFloatingBuffers,
    +					"This channel should be waiting for floating buffers.");
    +
    +				// Important: double check the isReleased state inside synchronized block, so there is no
    +				// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    +				if (isReleased.get() ||
    +					bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    +					isWaitingForFloatingBuffers = false;
    +					buffer.recycleBuffer();
    +					return false;
    +				}
     
    -			// Important: double check the isReleased state inside synchronized block, so there is no
    -			// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    -			if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    -				isWaitingForFloatingBuffers = false;
    -				buffer.recycleBuffer();
    -				return false;
    -			}
    +				recycleBuffer = false;
    --- End diff --
    
    I don't think we should ignore the `checkState`. A failure there is of course an implementation error in our task, but it should not leak buffers that other tasks still running on this buffer pool depend on.
    Regarding `buffer.recycleBuffer()`, you are right: we more or less always assume this cannot throw, but here we should probably guard against recycling twice.
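
    To illustrate the kind of guard meant here, a minimal, self-contained sketch follows. It is not the code in this pull request: `RecycleGuardSketch`, `CountingBuffer`, and `SketchChannel` are hypothetical stand-ins, the queue is a plain `ArrayDeque`, and the return value is simplified to "does the channel still need more buffers". The idea is that a `recycleBuffer` flag stays `true` until ownership of the buffer has clearly moved on, so a failing state check still returns the buffer in `finally`, while the early-return path clears the flag before recycling so the buffer is never recycled twice.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-ins for illustration only; these are not Flink classes. */
final class RecycleGuardSketch {

    /** Minimal buffer that only counts how often it was recycled. */
    static final class CountingBuffer {
        int recycleCount = 0;

        void recycle() {
            recycleCount++;
        }
    }

    /** Simplified channel holding just enough state for the callback. */
    static final class SketchChannel {
        final Queue<CountingBuffer> bufferQueue = new ArrayDeque<>();
        final int numRequiredBuffers;
        boolean isWaitingForFloatingBuffers = true;
        boolean released = false;

        SketchChannel(int numRequiredBuffers) {
            this.numRequiredBuffers = numRequiredBuffers;
        }

        /** Returns true if the channel still needs more buffers (simplified contract). */
        boolean notifyBufferAvailable(CountingBuffer buffer) {
            // Until ownership is handed over, the buffer must go back to the pool on any exit.
            boolean recycleBuffer = true;
            try {
                synchronized (bufferQueue) {
                    if (!isWaitingForFloatingBuffers) {
                        throw new IllegalStateException(
                            "This channel should be waiting for floating buffers.");
                    }

                    if (released || bufferQueue.size() >= numRequiredBuffers) {
                        isWaitingForFloatingBuffers = false;
                        // Clear the flag first: if recycle() itself throws,
                        // the finally block must not recycle a second time.
                        recycleBuffer = false;
                        buffer.recycle();
                        return false;
                    }

                    // From here on the queue owns the buffer.
                    recycleBuffer = false;
                    bufferQueue.add(buffer);

                    boolean needMoreBuffers = bufferQueue.size() < numRequiredBuffers;
                    if (!needMoreBuffers) {
                        isWaitingForFloatingBuffers = false;
                    }
                    return needMoreBuffers;
                }
            } finally {
                if (recycleBuffer) {
                    // Reached only when an exception interrupted the hand-over.
                    buffer.recycle();
                }
            }
        }
    }
}
```

    With this shape, a violated state check still surfaces as an `IllegalStateException`, but the buffer passed to the callback is returned exactly once rather than being lost to the other users of the pool.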


> Exceptions in RemoteInputChannel#notifyBufferAvailable() are not propagated to the responsible thread
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9755
>                 URL: https://issues.apache.org/jira/browse/FLINK-9755
>             Project: Flink
>          Issue Type: Bug
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
>
> The credit-based flow control implementation of RemoteInputChannel#notifyBufferAvailable() does not forward errors (like the {{IllegalStateException}}) to the thread that is being notified. The calling code at {{LocalBufferPool#recycle}}, however, relies on the callback forwarding errors and completely ignores any failures.
> Therefore, we could end up with a program waiting forever for the callback, without even a failure message in the logs.
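
The forwarding described in the report can be sketched as follows. This is only an illustration under stated assumptions, not the fix in the pull request: `ErrorForwardingSketch`, its `cause` field, and both method signatures are hypothetical, and the callback's work is modelled as a plain `Runnable`. The callback records any failure instead of letting it disappear in the recycling thread, and the thread consuming from the channel rethrows it the next time it checks.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch; the names here are illustrative and not Flink's API. */
final class ErrorForwardingSketch {

    /** First failure seen by the callback, or null if none occurred. */
    private final AtomicReference<Throwable> cause = new AtomicReference<>();

    /**
     * Runs in the thread that recycles a buffer. The caller is assumed to ignore
     * exceptions, so failures are stored rather than thrown.
     */
    boolean notifyBufferAvailable(Runnable work) {
        try {
            work.run();
            return true;
        } catch (Throwable t) {
            // Keep only the first failure; later ones are usually follow-up errors.
            cause.compareAndSet(null, t);
            return false;
        }
    }

    /** Runs in the thread that reads from the channel; surfaces a stored failure. */
    void checkError() throws IOException {
        Throwable t = cause.get();
        if (t != null) {
            throw new IOException("Failure in buffer-available callback", t);
        }
    }
}
```

The consuming thread would call `checkError()` before each read, so a failed callback turns into a visible task failure instead of an indefinite wait.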



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
