flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] zhijiangW commented on a change in pull request #9854: [FLINK-14230][task] Change the endInput call of the downstream operator to after the upstream operator closes
Date Thu, 24 Oct 2019 10:31:06 GMT
zhijiangW commented on a change in pull request #9854: [FLINK-14230][task] Change the endInput
call of the downstream operator to after the upstream operator closes
URL: https://github.com/apache/flink/pull/9854#discussion_r338499468
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ##########
 @@ -244,46 +239,32 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
 	}
 
 	/**
-	 * Ends an input (specified by {@code inputId}) of the {@link StreamTask}. The {@code inputId}
+	 * Ends the input of an operator, which specified by {@code inputId}). The {@code inputId}
 	 * is numbered starting from 1, and `1` indicates the first input.
 	 *
+	 * @param operatorIndex The index of the operator in the operator list of the chain.
 	 * @param inputId The ID of the input.
-	 * @throws Exception if some exception happens in the endInput function of an operator.
+	 * @throws Exception if some exception happens in the endInput method of the operator.
 	 */
-	public void endInput(int inputId) throws Exception {
-		if (finishedInputs.areAllInputsSelected()) {
-			return;
-		}
-
-		if (headOperator instanceof TwoInputStreamOperator) {
-			if (finishedInputs.isInputSelected(inputId)) {
-				return;
-			}
-
-			if (headOperator instanceof BoundedMultiInput) {
-				((BoundedMultiInput) headOperator).endInput(inputId);
-			}
+	public void endOperatorInput(int operatorIndex, int inputId) throws Exception {
 
 Review comment:
   This method could actually be removed, because `StreamTask` already loops over the `allOperators` array.
   
   Instead, `OperatorChain` could provide two methods:
   `#endNonHeadOperatorInput(StreamOperator<?> streamOperator)`: used by `StreamTask#closeAllOperators`.
   `#endHeadOperatorInput(int inputIndex)`: used by `StreamOneInputProcessor`, `StreamTwoInputProcessor`, and `StreamSource`.
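
   A rough sketch of how the two proposed methods might look, to make the split concrete. This is not the PR's code: the interfaces below are simplified local stand-ins for Flink's `StreamOperator`, `BoundedOneInput`, and `BoundedMultiInput`, and `OperatorChainSketch` plus the two recording operators are hypothetical names used only for illustration. It also assumes that chained non-head operators have a single input.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Flink interfaces (assumption: reduced to the
// single method that matters for this discussion).
interface StreamOperator {}

interface BoundedOneInput {
    void endInput() throws Exception;
}

interface BoundedMultiInput {
    void endInput(int inputId) throws Exception;
}

// Hypothetical sketch of the two methods suggested in the review comment.
class OperatorChainSketch {
    private final StreamOperator headOperator;

    OperatorChainSketch(StreamOperator headOperator) {
        this.headOperator = headOperator;
    }

    // Intended for the input processors and the source: ends one input of the
    // head operator. The input index is numbered starting from 1.
    void endHeadOperatorInput(int inputIndex) throws Exception {
        if (headOperator instanceof BoundedMultiInput) {
            ((BoundedMultiInput) headOperator).endInput(inputIndex);
        } else if (headOperator instanceof BoundedOneInput) {
            ((BoundedOneInput) headOperator).endInput();
        }
    }

    // Intended for the task's close loop: the caller already iterates over all
    // operators, so no operator index is needed here.
    void endNonHeadOperatorInput(StreamOperator streamOperator) throws Exception {
        if (streamOperator instanceof BoundedOneInput) {
            ((BoundedOneInput) streamOperator).endInput();
        }
    }
}

// Illustrative operators that record which end-of-input calls they received.
class RecordingOneInputOperator implements StreamOperator, BoundedOneInput {
    private final String name;
    private final List<String> log;

    RecordingOneInputOperator(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void endInput() {
        log.add(name + ".endInput");
    }
}

class RecordingTwoInputOperator implements StreamOperator, BoundedMultiInput {
    private final String name;
    private final List<String> log;

    RecordingTwoInputOperator(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void endInput(int inputId) {
        log.add(name + ".endInput(" + inputId + ")");
    }
}
```

   With this shape, the processors would call `endHeadOperatorInput(1)` or `endHeadOperatorInput(2)` as each input finishes, and the close loop in `StreamTask` would call `endNonHeadOperatorInput(operator)` for each chained operator after its upstream operator has been closed.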
