flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3554) Bounded sources should emit a Max Watermark when they are done
Date Fri, 04 Mar 2016 11:25:40 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15179764#comment-15179764 ]

ASF GitHub Bot commented on FLINK-3554:

Github user aljoscha commented on the pull request:

    Yes, that's a problem. Could you maybe open an issue for that here: https://issues.apache.org/jira/browse/FLINK
    In the meantime, I have a workaround for you. You can use a modified version of the CountTrigger:
    public class ClosingCountTrigger<W extends Window> extends Trigger<Object, W> {
    	private static final long serialVersionUID = 1L;

    	private final long maxCount;

    	private final ValueStateDescriptor<Long> stateDesc =
    			new ValueStateDescriptor<>("count", LongSerializer.INSTANCE, 0L);

    	private ClosingCountTrigger(long maxCount) {
    		this.maxCount = maxCount;
    	}

    	@Override
    	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
    		ValueState<Long> count = ctx.getPartitionedState(stateDesc);
    		long currentCount = count.value() + 1;
    		count.update(currentCount); // persist the new count
    		if (currentCount >= maxCount) {
    			count.update(0L); // reset for the next pane
    			return TriggerResult.FIRE_AND_PURGE;
    		}
    		// register timer for final watermark
    		ctx.registerEventTimeTimer(Long.MAX_VALUE);
    		return TriggerResult.CONTINUE;
    	}

    	@Override
    	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
    		// final triggering
    		return TriggerResult.FIRE_AND_PURGE;
    	}

    	@Override
    	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    		return TriggerResult.CONTINUE;
    	}

    	@Override
    	public void clear(W window, TriggerContext ctx) throws Exception {
    		ctx.getPartitionedState(stateDesc).clear(); // drop the per-window count
    	}

    	@Override
    	public String toString() {
    		return "ClosingCountTrigger(" + maxCount + ")";
    	}

    	/**
    	 * Creates a trigger that fires once the number of elements in a pane reaches the given count.
    	 * @param maxCount The count of elements at which to fire.
    	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
    	 */
    	public static <W extends Window> ClosingCountTrigger<W> of(long maxCount) {
    		return new ClosingCountTrigger<>(maxCount);
    	}
    }
    You can use it like this:
        .trigger(ClosingCountTrigger.of(<my count>))
        .apply() // or .reduce()
    This will behave like the normal count trigger but also register a callback for the final
    `Long.MAX_VALUE` watermark. Then, when the final watermark arrives, it will trigger one last
    window emission.
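
The behaviour described above can be modelled without Flink. The following is a minimal plain-Java sketch, not the Flink API: `ClosingCountSketch`, `onWatermark`, and `emitted` are hypothetical names, and a boolean flag stands in for the event-time timer registered at `Long.MAX_VALUE`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the trigger logic; it does not use any Flink classes.
class ClosingCountSketch {
    private final long maxCount;
    private final List<Integer> pane = new ArrayList<>();          // buffered window contents
    private final List<List<Integer>> emitted = new ArrayList<>(); // panes emitted so far
    private boolean finalTimerRegistered = false;                  // stands in for the Long.MAX_VALUE timer

    ClosingCountSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    // Mirrors onElement: fire and purge at maxCount, otherwise wait for the final watermark.
    void onElement(int element) {
        pane.add(element);
        if (pane.size() >= maxCount) {
            fireAndPurge();
        } else {
            finalTimerRegistered = true;
        }
    }

    // Mirrors onEventTime: the final watermark flushes whatever is left in the pane.
    void onWatermark(long watermark) {
        if (watermark == Long.MAX_VALUE && finalTimerRegistered && !pane.isEmpty()) {
            fireAndPurge();
        }
    }

    private void fireAndPurge() {
        emitted.add(new ArrayList<>(pane));
        pane.clear();
        finalTimerRegistered = false;
    }

    List<List<Integer>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        ClosingCountSketch sketch = new ClosingCountSketch(3);
        for (int i = 1; i <= 7; i++) {
            sketch.onElement(i);
        }
        sketch.onWatermark(Long.MAX_VALUE);   // the bounded source is done
        System.out.println(sketch.emitted()); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Without the final watermark, the last element (7) would stay buffered and never be emitted, which is the problem this issue is about.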

> Bounded sources should emit a Max Watermark when they are done
> --------------------------------------------------------------
>                 Key: FLINK-3554
>                 URL: https://issues.apache.org/jira/browse/FLINK-3554
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.0.0
> For proper event time support in bounded sources, these sources should emit a final watermark before shutting down.

This message was sent by Atlassian JIRA
