flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
Date Wed, 02 Oct 2019 13:26:46 GMT
AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization
in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r330547315
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##########
 @@ -293,140 +252,138 @@ public void close() throws Exception {
 			waitInFlightInputsFinished();
 		}
 		finally {
-			Exception exception = null;
-
-			try {
-				super.close();
-			} catch (InterruptedException interrupted) {
-				exception = interrupted;
-
-				Thread.currentThread().interrupt();
-			} catch (Exception e) {
-				exception = e;
-			}
-
-			try {
-				// terminate the emitter, the emitter thread and the executor
-				stopResources(true);
-			} catch (InterruptedException interrupted) {
-				exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);
-
-				Thread.currentThread().interrupt();
-			} catch (Exception e) {
-				exception = ExceptionUtils.firstOrSuppressed(e, exception);
-			}
-
-			if (exception != null) {
-				LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception);
-			}
+			super.close();
 		}
 	}
 
-	@Override
-	public void dispose() throws Exception {
-		Exception exception = null;
+	/**
+	 * Add the given stream element to the operator's stream element queue. This operation blocks
until the element
+	 * has been added.
+	 *
+	 * <p>Between two insertion attempts, this method yields the execution to the mailbox,
such that events as well
+	 * as asynchronous results can be processed.
+	 *
+	 * @param streamElement to add to the operator's queue
+	 * @throws InterruptedException if the current thread has been interrupted while yielding
to mailbox
+	 * @return a handle that allows to set the result of the async computation for the given
element.
+	 */
+	private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement) throws InterruptedException
{
+		assert(Thread.holdsLock(checkpointingLock));
 
-		try {
-			super.dispose();
-		} catch (InterruptedException interrupted) {
-			exception = interrupted;
+		pendingStreamElement = streamElement;
 
 Review comment:
   Fixed in hotfix.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message