flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10367) Avoid recursion stack overflow during releasing SingleInputGate
Date Thu, 22 Nov 2018 15:25:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696018#comment-16696018 ]

ASF GitHub Bot commented on FLINK-10367:
----------------------------------------

zhijiangW commented on a change in pull request #6829: [FLINK-10367][network] Introduce NotificationResult for BufferListener to solve recursive stack overflow
URL: https://github.com/apache/flink/pull/6829#discussion_r235761668
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ##########
 @@ -258,36 +259,38 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 	@Override
 	public void recycle(MemorySegment segment) {
 		BufferListener listener;
-		synchronized (availableMemorySegments) {
-			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
-				returnMemorySegment(segment);
-				return;
-			} else {
-				listener = registeredListeners.poll();
-
-				if (listener == null) {
-					availableMemorySegments.add(segment);
-					availableMemorySegments.notify();
+		NotificationResult notificationResult = NotificationResult.NONE;
+		while (!notificationResult.bufferUsed()) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
+					returnMemorySegment(segment);
 					return;
+				} else {
+					listener = registeredListeners.poll();
+
+					if (listener == null) {
+						availableMemorySegments.add(segment);
+						availableMemorySegments.notify();
+						return;
+					}
 				}
 			}
-		}
-
-		// We do not know which locks have been acquired before the recycle() or are needed in the
-		// notification and which other threads also access them.
-		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
-		// Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer
-		// (either directly or later during error handling) and therefore eventually end up in this
-		// method again.
-		boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
 
-		if (needMoreBuffers) {
-			synchronized (availableMemorySegments) {
-				if (isDestroyed) {
-					// cleanup tasks how they would have been done if we only had one synchronized block
-					listener.notifyBufferDestroyed();
-				} else {
-					registeredListeners.add(listener);
+			// We do not know which locks have been acquired before the recycle() or are needed in the
 
 Review comment:
   Makes sense; the logic is indeed a little long for the `recycle` method.
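
The loop in the diff replaces the recursive buffer hand-over with iteration. As a rough illustration only, here is a minimal sketch, assuming a simplified, single-pool model in which memory segments are plain `Integer`s and the `isDestroyed` and pool-size checks are omitted. Only `NotificationResult`, `NONE` and `bufferUsed()` appear in the diff; the constants `BUFFER_USED` and `BUFFER_USED_NEED_MORE`, the method `needsMoreBuffers()`, and the types `IterativeRecycleSketch`, `Pool` and `Listener` are hypothetical. A listener that cannot take the buffer (for example, one that was already released) returns `NONE` instead of recycling the buffer itself, so the pool moves on to the next listener without growing the stack.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class IterativeRecycleSketch {

    /** Hypothetical result type, loosely modeled on NotificationResult in the diff. */
    public enum NotificationResult {
        NONE(false, false),                // listener did not take the buffer
        BUFFER_USED(true, false),          // buffer taken, listener needs no more
        BUFFER_USED_NEED_MORE(true, true); // buffer taken, listener wants more

        private final boolean used;
        private final boolean needMore;

        NotificationResult(boolean used, boolean needMore) {
            this.used = used;
            this.needMore = needMore;
        }

        public boolean bufferUsed() { return used; }

        public boolean needsMoreBuffers() { return needMore; }
    }

    /** Hypothetical listener; memory segments are modeled as Integers. */
    public interface Listener {
        NotificationResult notifyBufferAvailable(Integer segment);
    }

    /** Simplified pool: no destroy or size checks, only the hand-over loop. */
    public static final class Pool {
        public final Queue<Integer> availableSegments = new ArrayDeque<>();
        public final Queue<Listener> registeredListeners = new ArrayDeque<>();

        public void recycle(Integer segment) {
            NotificationResult result = NotificationResult.NONE;
            // Iterate instead of recursing: if a listener does not take the
            // buffer, try the next one. The stack depth stays constant.
            while (!result.bufferUsed()) {
                Listener listener;
                synchronized (availableSegments) {
                    listener = registeredListeners.poll();
                    if (listener == null) {
                        // Nobody is waiting: keep the segment in the pool.
                        availableSegments.add(segment);
                        availableSegments.notify();
                        return;
                    }
                }
                // Notify outside the lock, as the comment in the diff requires (FLINK-9676).
                result = listener.notifyBufferAvailable(segment);
                if (result.needsMoreBuffers()) {
                    synchronized (availableSegments) {
                        registeredListeners.add(listener);
                    }
                }
            }
        }
    }
}
```

With 100,000 registered listeners that all return `NONE`, `recycle()` visits each of them in a flat loop and finally keeps the segment in `availableSegments`; a recursive hand-over would instead add stack frames for every listener it passes through.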

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Avoid recursion stack overflow during releasing SingleInputGate
> ---------------------------------------------------------------
>
>                 Key: FLINK-10367
>                 URL: https://issues.apache.org/jira/browse/FLINK-10367
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>              Labels: pull-request-available
>
> For task failure or cancellation, {{SingleInputGate#releaseAllResources}} is invoked before the task exits.
> In {{SingleInputGate#releaseAllResources}}, we first loop to release all the input channels, then destroy the {{BufferPool}}. {{RemoteInputChannel#releaseAllResources}} returns its floating buffers to the {{BufferPool}}, which assigns each recycled buffer to another listener ({{RemoteInputChannel}}).
> A recursive call can occur in this process. If that listener has already been released, it directly recycles the buffer back to the {{BufferPool}}, which then polls yet another listener to notify it of the available buffer. This can repeat recursively.
> If many input channels are registered as listeners in the {{BufferPool}}, the recursion can cause a {{StackOverflowError}}. In our test job, a scale of 10,000 input channels once caused this error.
> I can think of two ways to solve this potential problem:
>  # When an input channel is released, it should notify the {{BufferPool}} to unregister it as a listener; otherwise the two are inconsistent.
>  # {{SingleInputGate}} should destroy the {{BufferPool}} first, then loop to release all the internal input channels. This removes all the listeners from the {{BufferPool}} during destruction, so the input channels have no further interaction with it during {{RemoteInputChannel#releaseAllResources}}.
> I prefer the second approach, because we do not want to add another interface method for removing a buffer listener, and the current internal data structure in {{BufferPool}} cannot remove a listener directly.
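
The second option in the description can be illustrated with a small, single-threaded model. This is a hypothetical sketch, not Flink code: `ReleaseOrderSketch`, `Pool`, `Channel`, `returnedToGlobal` and the depth counters are invented for illustration. A released `Channel` that is notified of a buffer recycles it straight back to the pool, which hands it to the next listener, so releasing the channels first nests one `recycle()` call per stale listener. Destroying the pool first clears its listeners, and the same release returns the buffer at a nesting depth of one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class ReleaseOrderSketch {

    /** Hypothetical, single-threaded stand-in for a BufferPool. */
    public static final class Pool {
        public final Queue<Channel> listeners = new ArrayDeque<>();
        public final List<Integer> returnedToGlobal = new ArrayList<>();
        public int maxDepth; // deepest nesting of recycle() observed
        private int depth;
        private boolean destroyed;

        public void recycle(int segment) {
            depth++;
            maxDepth = Math.max(maxDepth, depth);
            try {
                Channel next = destroyed ? null : listeners.poll();
                if (next == null) {
                    // No listener (or pool destroyed): give the segment back.
                    returnedToGlobal.add(segment);
                } else {
                    // Calls recycle() again if 'next' was already released.
                    next.notifyBufferAvailable(segment);
                }
            } finally {
                depth--;
            }
        }

        /** Marks the pool destroyed and drops every registered listener. */
        public void destroy() {
            destroyed = true;
            listeners.clear();
        }
    }

    /** Hypothetical stand-in for a RemoteInputChannel. */
    public static final class Channel {
        private final Pool pool;
        private final List<Integer> floatingBuffers = new ArrayList<>();
        private boolean released;

        public Channel(Pool pool) {
            this.pool = pool;
        }

        public void notifyBufferAvailable(int segment) {
            if (released) {
                pool.recycle(segment); // the recursive path described in the issue
            } else {
                floatingBuffers.add(segment);
            }
        }

        /** Marks the channel released and returns its floating buffers to the pool. */
        public void release() {
            released = true;
            for (int segment : floatingBuffers) {
                pool.recycle(segment);
            }
            floatingBuffers.clear();
        }
    }
}
```

With one channel holding a floating buffer and 1,000 already released channels still registered as listeners, the "release channels first" order reaches a `recycle()` depth of 1,001, while the "destroy pool first" order stays at 1. The first option (explicit unregistration) is not modeled here.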



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
