flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-3554] [streaming] Emit a MAX Watermark ...
Date Tue, 01 Mar 2016 13:37:52 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1750#discussion_r54566728
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
---
    @@ -66,19 +71,60 @@ public void run(final Object lockingObject, final Output<StreamRecord<OUT>>
coll
     				throw new Exception(String.valueOf(timeCharacteristic));
     		}
     
    -		userFunction.run(ctx);
    -
    -		ctx.close();
    +		// copy to a field to give the 'cancel()' method access
    +		this.ctx = ctx;
    +		
    +		try {
    +			userFunction.run(ctx);
    +
    +			// if we get here, then the user function either exited after being done (finite source)
    +			// or the function was canceled or stopped. For the finite source case, we should
emit
    +			// a final watermark that indicates that we reached the end of event-time
    +			if (!isCanceledOrStopped()) {
    +				ctx.emitWatermark(Watermark.MAX_WATERMARK);
    +			}
    +		} finally {
    +			// make sure that the context is closed in any case
    +			ctx.close();
    +		}
     	}
     
     	public void cancel() {
    +		// important: marking the source as stopped has to happen before the function is stopped.
    +		// the flag that tracks this status is volatile, to the memory model also guarantees
    --- End diff --
    
    typo in comment: to => so


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message