flink-issues mailing list archives

From aljoscha <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-3554] [streaming] Emit a MAX Watermark ...
Date Fri, 04 Mar 2016 11:25:22 GMT
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1750#issuecomment-192241682
  
    Yes, that's a problem. Could you maybe open an issue for that here: https://issues.apache.org/jira/browse/FLINK
    
    In the meantime, I have a workaround for you. You can use a modified version of the CountTrigger:
    
    ```
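    import java.io.IOException;

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.Window;

    public class ClosingCountTrigger<W extends Window> extends Trigger<Object, W> {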
    	private static final long serialVersionUID = 1L;
    
    	private final long maxCount;
    
    	private final ValueStateDescriptor<Long> stateDesc =
    			new ValueStateDescriptor<>("count", LongSerializer.INSTANCE, 0L);
    
    
    	private ClosingCountTrigger(long maxCount) {
    		this.maxCount = maxCount;
    	}
    
    	@Override
    	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
    		ValueState<Long> count = ctx.getPartitionedState(stateDesc);
    		long currentCount = count.value() + 1;
    		count.update(currentCount);
    		if (currentCount >= maxCount) {
    			count.update(0L);
    			return TriggerResult.FIRE_AND_PURGE;
    		}
    		
    		// register timer for final watermark
    		ctx.registerEventTimeTimer(Long.MAX_VALUE);
    		return TriggerResult.CONTINUE;
    	}
    
    	@Override
    	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
    		// final triggering
    		return TriggerResult.FIRE_AND_PURGE;
    	}
    
    	@Override
    	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    		return TriggerResult.CONTINUE;
    	}
    
    	@Override
    	public void clear(W window, TriggerContext ctx) throws Exception {
    		ctx.getPartitionedState(stateDesc).clear();
    		ctx.deleteEventTimeTimer(Long.MAX_VALUE);
    	}
    
    	@Override
    	public String toString() {
    		return "ClosingCountTrigger(" +  maxCount + ")";
    	}
    
    	/**
    	 * Creates a trigger that fires once the number of elements in a pane reaches the given count,
    	 * and once more when the final {@code Long.MAX_VALUE} watermark arrives.
    	 *
    	 * @param maxCount The count of elements at which to fire.
    	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
    	 */
    	public static <W extends Window> ClosingCountTrigger<W> of(long maxCount) {
    		return new ClosingCountTrigger<>(maxCount);
    	}
    }
    ```
    
    You can use it like this:
    
    ```
    stream
        .keyBy(...)
        .window(GlobalWindows.create())
        .trigger(ClosingCountTrigger.of(<my count>))
        .apply() // or .reduce()
    ```
    
    This will behave like the normal count trigger but also register a callback for the final `Long.MAX_VALUE` watermark. Then, when the final watermark arrives, it will trigger one last window emission.
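
    To make the expected behavior concrete, here is a small plain-Java model of it. This is only a sketch: it does not use Flink at all, and `ClosingCountModel`, `onFinalWatermark` and `emitted` are made-up names for illustration. It also assumes that nothing is emitted when the pane is empty at the final watermark, which is a simplification of the model and not a statement about the window operator.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical stand-in for one keyed pane; models the trigger logic only. */
    final class ClosingCountModel {
        private final long maxCount;
        private final List<String> pane = new ArrayList<>();
        private final List<List<String>> emitted = new ArrayList<>();

        ClosingCountModel(long maxCount) {
            this.maxCount = maxCount;
        }

        /** Mirrors onElement: buffer the element, fire and purge once maxCount is reached. */
        void onElement(String element) {
            pane.add(element);
            if (pane.size() >= maxCount) {
                fireAndPurge();
            }
        }

        /** Mirrors onEventTime for the Long.MAX_VALUE timer: flush the leftover elements. */
        void onFinalWatermark() {
            if (!pane.isEmpty()) { // model assumption: skip empty panes
                fireAndPurge();
            }
        }

        /** Returns every pane emitted so far, in emission order. */
        List<List<String>> emitted() {
            return emitted;
        }

        private void fireAndPurge() {
            emitted.add(new ArrayList<>(pane)); // copy, because the pane is cleared next
            pane.clear();
        }

        public static void main(String[] args) {
            ClosingCountModel model = new ClosingCountModel(3);
            for (String s : new String[] {"a", "b", "c", "d", "e"}) {
                model.onElement(s);
            }
            System.out.println(model.emitted()); // prints [[a, b, c]]
            model.onFinalWatermark();
            System.out.println(model.emitted()); // prints [[a, b, c], [d, e]]
        }
    }
    ```

    With a plain count trigger the trailing `d` and `e` would stay in the pane forever on a finite stream; the extra timer is what gets them emitted.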

