flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3554) Bounded sources should emit a Max Watermark when they are done
Date Fri, 04 Mar 2016 11:25:40 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15179764#comment-15179764 ]

ASF GitHub Bot commented on FLINK-3554:
---------------------------------------

Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1750#issuecomment-192241682
  
    Yes, that's a problem. Could you maybe open an issue for it here: https://issues.apache.org/jira/browse/FLINK
    
    In the meantime, I have a workaround for you. You can use a modified version of the CountTrigger:
    
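    ```
    import java.io.IOException;

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.Window;

    /**
     * Like CountTrigger: fires and purges the pane every maxCount elements. In addition, it
     * registers an event-time timer for Long.MAX_VALUE so that any remaining elements are
     * emitted when the final watermark arrives.
     */
    public class ClosingCountTrigger<W extends Window> extends Trigger<Object, W> {
        private static final long serialVersionUID = 1L;

        private final long maxCount;

        // Number of elements seen in the current pane (per key and window), starting at 0.
        private final ValueStateDescriptor<Long> stateDesc =
                new ValueStateDescriptor<>("count", LongSerializer.INSTANCE, 0L);

        private ClosingCountTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
            ValueState<Long> count = ctx.getPartitionedState(stateDesc);
            long currentCount = count.value() + 1;
            count.update(currentCount);
            if (currentCount >= maxCount) {
                // Count reached: reset the counter, emit the pane and clear it.
                count.update(0L);
                return TriggerResult.FIRE_AND_PURGE;
            }

            // Register a timer for the final watermark (Long.MAX_VALUE) so that a partially
            // filled pane is still emitted when that watermark arrives.
            ctx.registerEventTimeTimer(Long.MAX_VALUE);
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
            // Only the Long.MAX_VALUE timer is registered by this trigger, so this is the final triggering.
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
            // Processing time is not used by this trigger.
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(W window, TriggerContext ctx) throws Exception {
            // Clean up the count state and the pending final-watermark timer.
            ctx.getPartitionedState(stateDesc).clear();
            ctx.deleteEventTimeTimer(Long.MAX_VALUE);
        }

        @Override
        public String toString() {
            return "ClosingCountTrigger(" + maxCount + ")";
        }

        /**
         * Creates a trigger that fires once the number of elements in a pane reaches the given
         * count, and once more when the final watermark arrives.
         *
         * @param maxCount The count of elements at which to fire.
         * @param <W> The type of {@link Window Windows} on which this trigger can operate.
         */
        public static <W extends Window> ClosingCountTrigger<W> of(long maxCount) {
            return new ClosingCountTrigger<>(maxCount);
        }
    }
    ```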
    
    You can use it like this:
    
    ```
    stream
        .keyBy(...)
        .window(GlobalWindows.create())
        .trigger(ClosingCountTrigger.of(<my count>))
        .apply(...) // or .reduce(...)
    ```
    
    This behaves like the normal `CountTrigger` but also registers an event-time timer for the final `Long.MAX_VALUE` watermark. When that final watermark arrives, the trigger fires one last time and emits whatever is still buffered in the window.
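To make the firing sequence concrete, here is a plain-Java model of it for a single key in a global window. It is a sketch only: `CountThenFlushModel` and its methods are hypothetical names, it has no Flink dependency, and a boolean flag stands in for the registered `Long.MAX_VALUE` timer. As a simplification, it skips the final emission when the pane is already empty.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (not Flink code) of ClosingCountTrigger for one key.
class CountThenFlushModel {
    private final long maxCount;
    private final List<String> pane = new ArrayList<>();        // contents of the global window
    private final List<List<String>> fired = new ArrayList<>(); // panes emitted so far
    private boolean finalTimerPending = false;                  // stands in for the Long.MAX_VALUE timer

    CountThenFlushModel(long maxCount) {
        this.maxCount = maxCount;
    }

    // Mirrors onElement: fire and purge at maxCount, otherwise keep the final timer registered.
    void element(String value) {
        pane.add(value);
        if (pane.size() >= maxCount) {
            fireAndPurge();
            return;
        }
        finalTimerPending = true;
    }

    // An event-time timer fires once the watermark reaches its timestamp;
    // for Long.MAX_VALUE only the final watermark qualifies.
    void watermark(long timestamp) {
        if (finalTimerPending && timestamp >= Long.MAX_VALUE) {
            finalTimerPending = false;
            // Simplification: skip the final emission if the pane is already empty.
            if (!pane.isEmpty()) {
                fireAndPurge();
            }
        }
    }

    private void fireAndPurge() {
        fired.add(new ArrayList<>(pane));
        pane.clear();
    }

    List<List<String>> firedPanes() {
        return fired;
    }

    public static void main(String[] args) {
        CountThenFlushModel model = new CountThenFlushModel(3);
        for (String s : new String[] {"a", "b", "c", "d", "e"}) {
            model.element(s);
        }
        System.out.println(model.firedPanes()); // [[a, b, c]]
        model.watermark(Long.MAX_VALUE);        // the final watermark of a bounded input
        System.out.println(model.firedPanes()); // [[a, b, c], [d, e]]
    }
}
```

With a count of 3 and five elements, only `[a, b, c]` is emitted by the count; `[d, e]` stays buffered until the final watermark flushes it.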


> Bounded sources should emit a Max Watermark when they are done
> --------------------------------------------------------------
>
>                 Key: FLINK-3554
>                 URL: https://issues.apache.org/jira/browse/FLINK-3554
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> For proper event time support in bounded sources, these sources should emit a final watermark before shutting down.
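The problem behind this issue can be reproduced with a small plain-Java model: an event-time operator with tumbling windows only fires a window once the watermark reaches the window's last timestamp, so the last window of a bounded input stays pending unless the source emits a `Long.MAX_VALUE` watermark before it finishes. This is a sketch under stated assumptions, not Flink code: `FinalWatermarkDemo`, the window size of 10, and the watermark strategy that trails the highest timestamp by 1 are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model (not Flink code) of why a bounded source needs a final watermark.
class FinalWatermarkDemo {

    // Downstream event-time operator with tumbling windows of length `size`.
    static class TumblingWindows {
        private final long size;
        private final TreeMap<Long, Integer> counts = new TreeMap<>(); // window start -> element count
        final List<String> fired = new ArrayList<>();                   // results, in firing order

        TumblingWindows(long size) {
            this.size = size;
        }

        void element(long timestamp) {
            long start = timestamp - (timestamp % size); // assumes non-negative timestamps
            counts.merge(start, 1, Integer::sum);
        }

        void watermark(long watermark) {
            // Window [start, start + size) is complete once the watermark reaches
            // its last timestamp, start + size - 1.
            while (!counts.isEmpty() && counts.firstKey() + size - 1 <= watermark) {
                Map.Entry<Long, Integer> w = counts.pollFirstEntry();
                fired.add("[" + w.getKey() + "," + (w.getKey() + size) + "): " + w.getValue());
            }
        }
    }

    // A bounded "source": emits every timestamp, then optionally Long.MAX_VALUE before finishing.
    static List<String> run(long[] timestamps, boolean emitFinalWatermark) {
        TumblingWindows downstream = new TumblingWindows(10);
        long maxTimestamp = Long.MIN_VALUE;
        for (long ts : timestamps) {
            downstream.element(ts);
            maxTimestamp = Math.max(maxTimestamp, ts);
            downstream.watermark(maxTimestamp - 1); // assumed strategy: trail the max timestamp by 1
        }
        if (emitFinalWatermark) {
            downstream.watermark(Long.MAX_VALUE); // "no more data": every pending window may fire
        }
        return downstream.fired;
    }

    public static void main(String[] args) {
        long[] timestamps = {1, 5, 12, 18, 25};
        System.out.println(run(timestamps, false)); // [[0,10): 2, [10,20): 2]
        System.out.println(run(timestamps, true));  // [[0,10): 2, [10,20): 2, [20,30): 1]
    }
}
```

Without the final watermark, window `[20,30)` never fires, because the highest watermark the source produces is 24 while the window's last timestamp is 29.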



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
