flink-issues mailing list archives

From zhijiangW <...@git.apache.org>
Subject [GitHub] flink pull request #6272: [FLINK-9755][network] forward exceptions in Remote...
Date Tue, 10 Jul 2018 07:55:49 GMT
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6272#discussion_r201242662
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -360,32 +360,45 @@ public boolean notifyBufferAvailable(Buffer buffer) {
     			return false;
     		}
     
    -		boolean needMoreBuffers = false;
    -		synchronized (bufferQueue) {
    -			checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");
    +		boolean recycleBuffer = true;
    +		try {
    +			boolean needMoreBuffers = false;
    +			synchronized (bufferQueue) {
    +				checkState(isWaitingForFloatingBuffers,
    +					"This channel should be waiting for floating buffers.");
    +
    +				// Important: double check the isReleased state inside synchronized block, so there is no
    +				// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    +				if (isReleased.get() ||
    +					bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    +					isWaitingForFloatingBuffers = false;
    +					buffer.recycleBuffer();
    +					return false;
    +				}
     
    -			// Important: double check the isReleased state inside synchronized block, so there is no
    -			// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    -			if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    -				isWaitingForFloatingBuffers = false;
    -				buffer.recycleBuffer();
    -				return false;
    -			}
    +				// note: this call may fail, for better cleanup, increase the counter first
    +				if (unannouncedCredit.getAndAdd(1) == 0) {
    +					notifyCreditAvailable();
    --- End diff --
    
    From a failure-handling point of view, `notifyCreditAvailable` should be called before `bufferQueue.addFloatingBuffer`. From another point of view, though, it seems logically stricter to confirm that the buffer is ready before announcing the credit; otherwise the channel may receive new data before this floating buffer has been queued, although that can hardly happen with the current implementation.
    
    Another concern is that `notifyCreditAvailable` is itself thread-safe, so it would not strictly need to run inside the `synchronized` block; we place it there only to make failure handling easier. The work done in `notifyCreditAvailable` is currently very lightweight, so the extra cost can be ignored.
    
    Perhaps these two concerns are unnecessary, and I can accept the current modifications.
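    
    To make the ordering trade-off concrete, here is a minimal sketch, not the actual `RemoteInputChannel` code. The class `CreditOrderingSketch`, the `SketchBuffer` type, the `failOnNotify` switch, and the `error` and `notifications` fields are all hypothetical and exist only for illustration. It assumes the counter is increased and the credit announced before the buffer is queued, and that a failed announcement leaves the buffer to be recycled.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for the channel; NOT Flink's RemoteInputChannel. */
final class CreditOrderingSketch {

    /** Hypothetical buffer that only records whether it was recycled. */
    static final class SketchBuffer {
        boolean recycled;

        void recycleBuffer() {
            recycled = true;
        }
    }

    final ArrayDeque<SketchBuffer> bufferQueue = new ArrayDeque<>();
    final AtomicInteger unannouncedCredit = new AtomicInteger();
    int notifications;      // how many times credit was announced (illustration only)
    boolean failOnNotify;   // hypothetical switch to simulate a failing announcement
    Throwable error;        // hypothetical stand-in for recording a channel error

    void notifyCreditAvailable() {
        if (failOnNotify) {
            throw new IllegalStateException("simulated failure while announcing credit");
        }
        notifications++;
    }

    /** Returns true if the buffer was queued, false if it was recycled after a failure. */
    boolean notifyBufferAvailable(SketchBuffer buffer) {
        boolean recycleBuffer = true;
        try {
            synchronized (bufferQueue) {
                // Increase the counter first and announce only on the 0 -> 1 transition.
                // This call may fail; the buffer has not been queued yet at this point.
                if (unannouncedCredit.getAndAdd(1) == 0) {
                    notifyCreditAvailable();
                }
                // Only after a successful announcement does the queue own the buffer.
                bufferQueue.add(buffer);
                recycleBuffer = false;
            }
            return true;
        } catch (Throwable t) {
            error = t;
            return false;
        } finally {
            if (recycleBuffer) {
                buffer.recycleBuffer();
            }
        }
    }
}
```

    In this sketch a failed announcement leaves the queue untouched and the buffer recycled, which is the cleanup benefit of announcing first. The price is the one raised above: the credit is visible before the buffer is queued, and the lock is held during the announcement.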

