flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] rkhachatryan commented on a change in pull request #10151: [FLINK-14231] Handle the processing-time timers before closing operator to properly support endInput
Date Fri, 15 Nov 2019 11:25:41 GMT
rkhachatryan commented on a change in pull request #10151: [FLINK-14231] Handle the processing-time
timers before closing operator to properly support endInput
URL: https://github.com/apache/flink/pull/10151#discussion_r346754713
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##########
 @@ -1395,6 +1381,114 @@ private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 		}
 	}
 
+	/**
+	 * This class executes {@link StreamOperator#close()} of all operators in the chain
+	 * of this {@link StreamTask} one by one through the mailbox thread. Closing happens
+	 * from <b>head to tail</b> operator in the chain, contrary to {@link StreamOperator#open()}
+	 * which happens <b>tail to head</b> (see {@link #openAllOperators()}.
+	 *
+	 * <p>Before closing an operator, it quiesces the processing time service of the operator
+	 * and wait for all pending timers to finish (execute or cancel). And after closing, the
+	 * {@link org.apache.flink.streaming.api.operators.BoundedOneInput#endInput} of its next
+	 * operator is called to notify that the input is finished.
+	 */
+	protected final class ClosingOperatorOperation {
+
+		private final MailboxExecutor mainMailboxExecutor;
+
+		private final CompletableFuture<?> closeFuture;
+
+		public ClosingOperatorOperation() {
+			this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
+			this.closeFuture = new CompletableFuture<>();
+		}
+
+		public CompletableFuture<?> closeAllOperatorsAsync() {
+			sendQuiescingTimeServiceLetter(getHeadOperatorIndex());
+			return closeFuture;
+		}
+
+		private void sendQuiescingTimeServiceLetter(int operatorIndex) {
+			mainMailboxExecutor.execute(
+				() -> quiesceTimerService(operatorIndex), "quiesceProcessingTimerService(index: " + operatorIndex + ")");
+		}
+
+		private void sendClosingOperatorLetter(int operatorIndex) {
+			mainMailboxExecutor.execute(
+				() -> closeOperator(operatorIndex), "closeOperator(index: " + operatorIndex + ")");
+		}
+
+		private void quiesceTimerService(int operatorIndex) {
+			CompletableFuture<?> timersDoneFuture = null;
+
+			StreamOperator<?> operator = getOperator(operatorIndex);
+			if (operator != null) {
+				ProcessingTimeServiceImpl processingTimeService = processingTimeServices.get(operator);
+				if (processingTimeService != null) {
+					processingTimeService.quiesce();
+					timersDoneFuture = processingTimeService.getTimersDoneFutureAfterQuiescing();
+				}
+			}
+			if (timersDoneFuture == null) {
+				timersDoneFuture = CompletableFuture.completedFuture(null);
+			}
+
+			timersDoneFuture.thenRun(() -> sendClosingOperatorLetter(operatorIndex));
+		}
+
+		private void closeOperator(int operatorIndex) {
+			StreamOperator<?> operator = getOperator(operatorIndex);
+
+			synchronized (getCheckpointLock()) {
+				try {
+					if (operator != null) {
+						operator.close();
+					}
+				} catch (Throwable t) {
+					handleAsyncException("Caught exception while closing operator.", t);
+				}
+
+				try {
+					if (!isTailOperator(operatorIndex)) {
+						// The operators on the chain, except for the head operator, must be one-input operators.
+						// So after the upstream operator on the chain is closed, the input of its downstream operator
+						// reaches the end.
+						operatorChain.endNonHeadOperatorInput(getNextOperator(operatorIndex));
+					}
+				} catch (Throwable t) {
+					handleAsyncException("Caught exception while processing the endInput of operator.", t);
+				}
+			}
+
+			if (!isTailOperator(operatorIndex)) {
+				sendQuiescingTimeServiceLetter(getNextOperatorIndex(operatorIndex));
+				return;
+			}
+
+			closeFuture.complete(null);
+		}
+
+		private int getHeadOperatorIndex() {
 
 Review comment:
   IMO it's the operatorChain's responsibility to find an operator by index, check whether it's the tail, etc.
So these methods should reside there.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message