flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] rkhachatryan commented on a change in pull request #10151: [FLINK-14231] Handle the processing-time timers before closing operator to properly support endInput
Date Fri, 15 Nov 2019 11:25:41 GMT
rkhachatryan commented on a change in pull request #10151: [FLINK-14231] Handle the processing-time timers before closing operator to properly support endInput
URL: https://github.com/apache/flink/pull/10151#discussion_r346763174
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##########
 @@ -1395,6 +1381,114 @@ private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 		}
 	}
 
+	/**
+	 * This class executes {@link StreamOperator#close()} of all operators in the chain
+	 * of this {@link StreamTask} one by one through the mailbox thread. Closing happens
+	 * from <b>head to tail</b> operator in the chain, contrary to {@link StreamOperator#open()}
+	 * which happens <b>tail to head</b> (see {@link #openAllOperators()}).
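The close order this Javadoc describes (open tail to head, close head to tail, one operator at a time through the mailbox thread) can be illustrated with a minimal sketch. All names here (`SketchOperator`, `NamedOperator`, `ChainCloser`) are hypothetical stand-ins, not Flink's `StreamTask`/`StreamOperator` API, and the "mailbox" is assumed to be nothing more than a single-threaded FIFO queue of actions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the task's open/close loop; not Flink code.
final class ChainCloser {
    // Operators ordered from head (index 0) to tail (last index).
    private final List<SketchOperator> chain;
    // Single-threaded "mailbox": queued actions run one at a time, in FIFO order.
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    ChainCloser(List<SketchOperator> chain) {
        this.chain = chain;
    }

    // Open from tail to head, the order described for openAllOperators().
    void openAll() {
        for (int i = chain.size() - 1; i >= 0; i--) {
            chain.get(i).open(log);
        }
    }

    // Close from head to tail: each close runs as its own mailbox action and,
    // once finished, enqueues the close of the next operator in the chain.
    void closeAll() {
        if (!chain.isEmpty()) {
            mailbox.add(() -> closeAt(0));
        }
        while (!mailbox.isEmpty()) {
            mailbox.poll().run();
        }
    }

    private void closeAt(int index) {
        chain.get(index).close(log);
        if (index + 1 < chain.size()) {
            mailbox.add(() -> closeAt(index + 1));
        }
    }

    public static void main(String[] args) {
        ChainCloser closer = new ChainCloser(List.of(
                new NamedOperator("head"), new NamedOperator("middle"), new NamedOperator("tail")));
        closer.openAll();
        closer.closeAll();
        System.out.println(closer.log);
    }
}

// Hypothetical minimal operator interface; not Flink's StreamOperator.
interface SketchOperator {
    void open(List<String> log);
    void close(List<String> log);
}

// Records its lifecycle calls so the ordering can be inspected.
final class NamedOperator implements SketchOperator {
    private final String name;

    NamedOperator(String name) {
        this.name = name;
    }

    @Override
    public void open(List<String> log) {
        log.add("open " + name);
    }

    @Override
    public void close(List<String> log) {
        log.add("close " + name);
    }
}
```

Running `main` prints `[open tail, open middle, open head, close head, close middle, close tail]`. Enqueuing the next close only after the previous one has run is what keeps the steps "one by one" while still yielding to the mailbox between operators.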
 
 Review comment:
   Here we are traversing the chain "from outside". An alternative approach would be to let operators pass a "close" message to each other (when they're ready): when the first operator receives it, it waits for all its timers to fire and then emits the "close" message to all its outputs.
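   A minimal sketch of this message-passing alternative, under stated assumptions: `SignalNode` is a hypothetical class, not Flink API; "waits for all its timers to fire" is modeled by firing pending timers eagerly in timestamp order; and the rule that a node with several inputs closes only after every input has sent "close" is an assumption about how non-linear topologies would be handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class CloseSignalDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SignalNode source = new SignalNode("source", log);
        SignalNode left = new SignalNode("left", log);
        SignalNode right = new SignalNode("right", log);
        SignalNode sink = new SignalNode("sink", log);
        // A diamond, i.e. not a linear chain: source -> {left, right} -> sink.
        source.connectTo(left).connectTo(right);
        left.connectTo(sink);
        right.connectTo(sink);
        left.registerTimer(20);
        left.registerTimer(10);
        source.onCloseSignal(); // the task only has to signal the source
        System.out.println(log);
    }
}

// Hypothetical operator node for a message-passing close protocol; not Flink API.
final class SignalNode {
    private final String name;
    private final List<String> log;
    private final List<SignalNode> outputs = new ArrayList<>();
    // Pending processing-time timers, ordered by timestamp.
    private final PriorityQueue<Long> pendingTimers = new PriorityQueue<>();
    // Upstream inputs that have not sent "close" yet.
    private int openInputs;

    SignalNode(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    // Adds an output edge; returns this node so several outputs can be chained.
    SignalNode connectTo(SignalNode downstream) {
        outputs.add(downstream);
        downstream.openInputs++;
        return this;
    }

    void registerTimer(long timestamp) {
        pendingTimers.add(timestamp);
    }

    // Receives "close" from one input (or from the task, for a node without inputs).
    void onCloseSignal() {
        if (openInputs > 0) {
            openInputs--;
        }
        if (openInputs > 0) {
            return; // other inputs are still open
        }
        // Stand-in for "wait for all timers": fire them eagerly in timestamp order.
        while (!pendingTimers.isEmpty()) {
            log.add(name + " fires timer " + pendingTimers.poll());
        }
        log.add("close " + name);
        // Only now pass "close" on to every output.
        for (SignalNode out : outputs) {
            out.onCloseSignal();
        }
    }
}
```

   Running `main` prints `[close source, left fires timer 10, left fires timer 20, close left, close right, close sink]`: the sink closes only after both of its inputs have, and no traversal of the chain "from outside" is needed.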
   
   Pros:
   1. will work for any topology, not only for linear chains
   2. less coupling
   3. (subjective) simpler
   
   Cons:
   1. requires changes to the PublicEvolving API (though we probably have to change it anyway to ask the operator for its maximum wait time)
   2. depending on the implementation, the responsibility for passing the signal moves to user code
   
   The implementation could look like this:
   1. add a constant StreamStatus.FINISHED
   2. in StreamProcessor: call output.emitStreamStatus(StreamStatus.FINISHED) on END_OF_INPUT
   3. add a method endOfInput() to org.apache.flink.streaming.api.operators.Output
   4. add a method endOfInput() to StreamOperator // contract: call output.endOfInput() when ready (e.g. using ProcessingTimeService)
   5. in operatorChain.toggleStreamStatus: call operator.endOfInput() if the status is FINISHED (end of input)
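   The five steps above can be sketched in plain Java. Every type here (`SketchStreamStatus`, `SketchOutput`, `TimerAwareOperator`, `SketchChain`) is a hypothetical simplification, not Flink's `StreamStatus`, `Output`, `StreamOperator` or `OperatorChain`; a FIFO queue of callbacks stands in for `ProcessingTimeService`, and "ready" is assumed to mean "no timers pending".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Steps 2 and 5: hypothetical OperatorChain stand-in, also used as the processor's output.
final class SketchChain implements SketchOutput {
    private final List<TimerAwareOperator> operators = new ArrayList<>();
    private final List<String> log;

    SketchChain(List<String> log, String... names) {
        this.log = log;
        for (String name : names) {
            operators.add(new TimerAwareOperator(name, log));
        }
        // Wire each operator's output to the endOfInput() of the next operator.
        for (int i = 0; i < operators.size(); i++) {
            final int next = i + 1;
            operators.get(i).setOutput(() -> {
                if (next < operators.size()) {
                    operators.get(next).endOfInput();
                } else {
                    log.add("tail finished, chain can be closed");
                }
            });
        }
    }

    TimerAwareOperator operator(int index) { return operators.get(index); }

    @Override
    public void emitStreamStatus(SketchStreamStatus status) { toggleStreamStatus(status); }

    @Override
    public void endOfInput() { /* the processor signals via FINISHED instead */ }

    // Step 5: on FINISHED, call endOfInput() on the head operator.
    void toggleStreamStatus(SketchStreamStatus status) {
        log.add("status " + status);
        if (status == SketchStreamStatus.FINISHED && !operators.isEmpty()) {
            operators.get(0).endOfInput();
        }
    }

    // Step 2: what the input processor would do when it reaches END_OF_INPUT.
    static void reachEndOfInput(SketchOutput output) {
        output.emitStreamStatus(SketchStreamStatus.FINISHED);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SketchChain chain = new SketchChain(log, "head", "tail");
        chain.operator(0).registerTimer(() -> log.add("head timer fired"));
        reachEndOfInput(chain);            // head still has a pending timer, so it waits
        chain.operator(0).fireNextTimer(); // now head is ready and signals downstream
        System.out.println(log);
    }
}

// Step 1: a FINISHED status (hypothetical enum, not Flink's StreamStatus).
enum SketchStreamStatus { ACTIVE, IDLE, FINISHED }

// Step 3: an output that can also signal end of input (hypothetical, not Flink's Output).
interface SketchOutput {
    default void emitStreamStatus(SketchStreamStatus status) { }
    void endOfInput();
}

// Step 4: contract is "call output.endOfInput() once ready"; here ready = no pending timers.
final class TimerAwareOperator {
    private final String name;
    private final List<String> log;
    // FIFO of callbacks standing in for timers registered with a ProcessingTimeService.
    private final Queue<Runnable> pendingTimers = new ArrayDeque<>();
    private SketchOutput output;
    private boolean endOfInputReceived;
    private boolean signalled;

    TimerAwareOperator(String name, List<String> log) { this.name = name; this.log = log; }
    void setOutput(SketchOutput output) { this.output = output; }
    void registerTimer(Runnable callback) { pendingTimers.add(callback); }

    void fireNextTimer() {
        Runnable timer = pendingTimers.poll();
        if (timer != null) timer.run();
        maybeSignalEndOfInput();
    }

    void endOfInput() {
        endOfInputReceived = true;
        maybeSignalEndOfInput();
    }

    private void maybeSignalEndOfInput() {
        if (endOfInputReceived && pendingTimers.isEmpty() && !signalled) {
            signalled = true;
            log.add(name + " ready, calling output.endOfInput()");
            output.endOfInput();
        }
    }
}
```

   In this sketch the task itself never decides when an operator is done; it only learns that the chain has finished once the tail's output receives `endOfInput()`, which is the point at which closing would become safe.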
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
