Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/6272#discussion_r201969455
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
---
@@ -277,37 +277,17 @@ public void recycle(MemorySegment segment) {
// We do not know which locks have been acquired before the recycle() or are needed
in the
// notification and which other threads also access them.
// -> call notifyBufferAvailable() outside of the synchronized block to avoid a
deadlock (FLINK-9676)
- boolean success = false;
- boolean needMoreBuffers = false;
- try {
- needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
- success = true;
- } catch (Throwable ignored) {
- // handled below, under the lock
- }
+ // Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer
and
+ // therefore end up in this method again.
+ needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
- if (!success || needMoreBuffers) {
+ if (needMoreBuffers) {
synchronized (availableMemorySegments) {
if (isDestroyed) {
// cleanup tasks how they would have been done if we only had one synchronized block
- if (needMoreBuffers) {
- listener.notifyBufferDestroyed();
- }
- if (!success) {
- returnMemorySegment(segment);
- }
+ listener.notifyBufferDestroyed();
} else {
- if (needMoreBuffers) {
- registeredListeners.add(listener);
- }
- if (!success) {
- if (numberOfRequestedMemorySegments > currentPoolSize) {
- returnMemorySegment(segment);
- } else {
- availableMemorySegments.add(segment);
- availableMemorySegments.notify();
- }
- }
+ registeredListeners.add(listener);
--- End diff --
that's how it always was:
a) if the exception was caught in the `notifyBufferAvailable()` handler, `needMoreBuffers`
is `false` and we expect the error handling in that method to fail the logic that the thread
waiting on the listener is relying on - otherwise this thread's error handling needs to re-register.
b) if `notifyBufferAvailable()` throws (this is not allowed anymore!), previous logic
also did not re-add the listener
---
|