flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9755) Exceptions in RemoteInputChannel#notifyBufferAvailable() are not propagated to the responsible thread
Date Tue, 10 Jul 2018 07:56:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16538213#comment-16538213 ]

ASF GitHub Bot commented on FLINK-9755:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6272#discussion_r201242662
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -360,32 +360,45 @@ public boolean notifyBufferAvailable(Buffer buffer) {
     			return false;
     		}
     
    -		boolean needMoreBuffers = false;
    -		synchronized (bufferQueue) {
    -			checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");
    +		boolean recycleBuffer = true;
    +		try {
    +			boolean needMoreBuffers = false;
    +			synchronized (bufferQueue) {
    +				checkState(isWaitingForFloatingBuffers,
    +					"This channel should be waiting for floating buffers.");
    +
    +				// Important: double check the isReleased state inside synchronized block, so there is no
    +				// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    +				if (isReleased.get() ||
    +					bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    +					isWaitingForFloatingBuffers = false;
    +					buffer.recycleBuffer();
    +					return false;
    +				}
     
    -			// Important: double check the isReleased state inside synchronized block, so there is no
    -			// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    -			if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    -				isWaitingForFloatingBuffers = false;
    -				buffer.recycleBuffer();
    -				return false;
    -			}
    +				// note: this call may fail, for better cleanup, increase the counter first
    +				if (unannouncedCredit.getAndAdd(1) == 0) {
    +					notifyCreditAvailable();
    --- End diff --
    
    From the failure-handling point of view, `notifyCreditAvailable` should be called before `bufferQueue.addFloatingBuffer`. From another point of view, though, it seems logically stricter to confirm that the buffer is ready before announcing the credit; otherwise the channel may receive new data before this floating buffer has been queued, although that can hardly happen with the current implementation.
    
    Another concern is that `notifyCreditAvailable` is itself thread-safe, so it does not strictly need to run inside the `synchronized` block. We place it there only to make failure handling easier. Currently `notifyCreditAvailable` is very lightweight, so the cost is negligible.
    
    Maybe these two concerns are unnecessary, and I can accept the current modifications.
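
    To make the ordering discussed above concrete, here is a minimal, self-contained sketch. It is not the code from the pull request: the class `CreditOrderingSketch`, its `String` buffers, the `failOnNotify` switch, the counters, and the `error` field are all hypothetical stand-ins, and only the names `unannouncedCredit`, `bufferQueue`, `notifyCreditAvailable`, and `recycleBuffer` are borrowed from the diff. It assumes that the credit is announced before the buffer is queued, and that a buffer which was never queued gets recycled if the announcement fails.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a channel; this is NOT Flink's RemoteInputChannel. */
class CreditOrderingSketch {
    private final Queue<String> bufferQueue = new ArrayDeque<>();
    private final AtomicInteger unannouncedCredit = new AtomicInteger();
    // Assumption: a failure is stored here so another thread can inspect it later.
    private final AtomicReference<Throwable> error = new AtomicReference<>();
    private final boolean failOnNotify; // test switch, purely illustrative
    private int recycled;
    private int notified;

    CreditOrderingSketch(boolean failOnNotify) {
        this.failOnNotify = failOnNotify;
    }

    /** Simulates the credit announcement, which may fail. */
    private void notifyCreditAvailable() {
        if (failOnNotify) {
            throw new IllegalStateException("simulated announcement failure");
        }
        notified++;
    }

    /** Returns true if the buffer was queued, false if it was recycled instead. */
    boolean notifyBufferAvailable(String buffer) {
        boolean recycleBuffer = true;
        try {
            synchronized (bufferQueue) {
                // Increase the counter first: the announcement below may throw.
                if (unannouncedCredit.getAndAdd(1) == 0) {
                    notifyCreditAvailable();
                }
                // Only reached when the announcement succeeded or was not needed.
                bufferQueue.add(buffer);
                recycleBuffer = false;
            }
            return true;
        } catch (Throwable t) {
            // Remember the first failure instead of silently dropping it.
            error.compareAndSet(null, t);
            return false;
        } finally {
            if (recycleBuffer) {
                recycled++; // stands in for buffer.recycleBuffer()
            }
        }
    }

    int queuedBuffers() { synchronized (bufferQueue) { return bufferQueue.size(); } }
    int recycledBuffers() { return recycled; }
    int notifications() { return notified; }
    int credit() { return unannouncedCredit.get(); }
    Throwable storedError() { return error.get(); }

    public static void main(String[] args) {
        CreditOrderingSketch failing = new CreditOrderingSketch(true);
        boolean queued = failing.notifyBufferAvailable("floating-buffer");
        System.out.println("queued=" + queued
            + ", recycled=" + failing.recycledBuffers()
            + ", error=" + failing.storedError());
    }
}
```

    Because the buffer is added to the queue only after the announcement, a failing `notifyCreditAvailable` leaves exactly one owner for the buffer (the `finally` block), which is the cleanup benefit mentioned above. The trade-off is the one raised in the first concern: the credit is briefly visible before the buffer is queued.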


> Exceptions in RemoteInputChannel#notifyBufferAvailable() are not propagated to the responsible thread
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9755
>                 URL: https://issues.apache.org/jira/browse/FLINK-9755
>             Project: Flink
>          Issue Type: Bug
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
>
> The credit-based flow control implementation of RemoteInputChannel#notifyBufferAvailable() does not forward errors (like the {{IllegalStateException}}) to the thread that is being notified. The calling code at {{LocalBufferPool#recycle}}, however, relies on the callback forwarding errors and completely ignores any failures.
> Therefore, we could end up with a program waiting forever for the callback and not even a failure message in the logs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
