flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From NicoK <...@git.apache.org>
Subject [GitHub] flink pull request #6272: [FLINK-9755][network] forward exceptions in Remote...
Date Thu, 12 Jul 2018 09:39:16 GMT
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6272#discussion_r201971896
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
---
    @@ -360,32 +360,44 @@ public boolean notifyBufferAvailable(Buffer buffer) {
     			return false;
     		}
     
    -		boolean needMoreBuffers = false;
    -		synchronized (bufferQueue) {
    -			checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating
buffers.");
    +		boolean recycleBuffer = true;
    +		try {
    +			boolean needMoreBuffers = false;
    +			synchronized (bufferQueue) {
    +				checkState(isWaitingForFloatingBuffers,
    +					"This channel should be waiting for floating buffers.");
    +
    +				// Important: double check the isReleased state inside synchronized block, so there
is no
    +				// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    +				if (isReleased.get() ||
    +					bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    +					isWaitingForFloatingBuffers = false;
    +					buffer.recycleBuffer();
    +					return false;
    +				}
     
    -			// Important: double check the isReleased state inside synchronized block, so there
is no
    -			// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    -			if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers)
{
    -				isWaitingForFloatingBuffers = false;
    -				buffer.recycleBuffer();
    -				return false;
    -			}
    +				recycleBuffer = false;
    +				bufferQueue.addFloatingBuffer(buffer);
     
    -			bufferQueue.addFloatingBuffer(buffer);
    +				if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
    +					isWaitingForFloatingBuffers = false;
    +				} else {
    +					needMoreBuffers = true;
    +				}
     
    -			if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
    -				isWaitingForFloatingBuffers = false;
    -			} else {
    -				needMoreBuffers =  true;
    +				if (unannouncedCredit.getAndAdd(1) == 0) {
    +					notifyCreditAvailable();
    +				}
     			}
    -		}
     
    -		if (unannouncedCredit.getAndAdd(1) == 0) {
    -			notifyCreditAvailable();
    +			return needMoreBuffers;
    +		} catch (Throwable t) {
    +			if (recycleBuffer) {
    +				buffer.recycleBuffer();
    +			}
    +			setError(t);
    +			return false;
    --- End diff --
    
    The protection (currently) is against the `checkState` here and in `notifyCreditAvailable()`
(the latter also has some external calls).
    We could return `needMoreBuffers` here but I actually see this exception as if it happened
in the responsible thread and therefore not continue to use this listener.


---

Mime
View raw message