[ https://issues.apache.org/jira/browse/FLINK-9755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16538259#comment-16538259
]
ASF GitHub Bot commented on FLINK-9755:
---------------------------------------
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/6272#discussion_r201260221
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
---
@@ -360,32 +360,45 @@ public boolean notifyBufferAvailable(Buffer buffer) {
return false;
}
- boolean needMoreBuffers = false;
- synchronized (bufferQueue) {
- checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating
buffers.");
+ boolean recycleBuffer = true;
+ try {
+ boolean needMoreBuffers = false;
+ synchronized (bufferQueue) {
+ checkState(isWaitingForFloatingBuffers,
+ "This channel should be waiting for floating buffers.");
+
+ // Important: double check the isReleased state inside synchronized block, so there
is no
+ // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
+ if (isReleased.get() ||
+ bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+ isWaitingForFloatingBuffers = false;
+ buffer.recycleBuffer();
+ return false;
+ }
- // Important: double check the isReleased state inside synchronized block, so there
is no
- // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
- if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers)
{
- isWaitingForFloatingBuffers = false;
- buffer.recycleBuffer();
- return false;
- }
+ // note: this call may fail, for better cleanup, increase the counter first
+ if (unannouncedCredit.getAndAdd(1) == 0) {
+ notifyCreditAvailable();
--- End diff --
yes, this one is a tricky one:
Logically it should be: first adding the floating buffer, then enqueuing the channel.
However, `notifyCreditAvailable()` is outside our control and could potentially fail. If it
fails, we have two options:
1) revert any change done by `notifyBufferAvailable()` and freeing the buffer so that
it can be directly re-used by `LocalBufferPool`
2) keep everything done by `notifyBufferAvailable()` and rely on `setError()` to clean
up eventually - this means we should not recycle the buffer if it was already added to the
`bufferQueue`.
Giving it a second thought, it may indeed be better to go for solution 2 and keep the
logical execution as you proposed (infact, now I can't really think of a reason why I did
not do that). We should be able to steer the recycling via the `recycleBuffer` as is already
done in other cases.
> Exceptions in RemoteInputChannel#notifyBufferAvailable() are not propagated to the responsible
thread
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-9755
> URL: https://issues.apache.org/jira/browse/FLINK-9755
> Project: Flink
> Issue Type: Bug
> Components: Network
> Affects Versions: 1.5.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> The credit-based flow control implementation of RemoteInputChannel#notifyBufferAvailable()
does not forward errors (like the {{IllegalStateException}}) to the thread that is being notified.
The calling code at {{LocalBufferPool#recycle}}, however, relies on the callback forwarding
errors and completely ignores any failures.
> Therefore, we could end up with a program waiting forever for the callback and not even
a failure message in the logs.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
|