flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljoscha <...@git.apache.org>
Subject [GitHub] flink issue #2277: [FLINK-4207] WindowOperator becomes very slow with allowe...
Date Mon, 25 Jul 2016 13:54:17 GMT
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2277
  
    This doesn't work for some cases. Consider, when you change `testCleanupTimerWithEmptyListStateForTumblingWindows()`
to this:
    
    ```
    	@Test
    	public void testCleanupTimerWithEmptyListStateForTumblingWindows() throws Exception {
    		final int WINDOW_SIZE = 2;
    		final long LATENESS = 100;
    
    		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
    
    		ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
    			new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig()));
    
    		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String,
Integer>>, String, TimeWindow> operator =
    			new WindowOperator<>(
    				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
    				new TimeWindow.Serializer(),
    				new TupleKeySelector(),
    				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
    				windowStateDesc,
    				new InternalIterableWindowFunction<>(new PassThroughFunction()),
    				new EventTimeTriggerAccum(LATENESS),
    				LATENESS);
    
    		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness
=
    			new OneInputStreamOperatorTestHarness<>(operator);
    
    		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
    
    		operator.setInputType(inputType, new ExecutionConfig());
    		testHarness.open();
    
    		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    
    		// normal element
    		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
    		testHarness.processWatermark(new Watermark(1599));
    		testHarness.processWatermark(new Watermark(1999));
    		testHarness.processWatermark(new Watermark(2100));
    		testHarness.processWatermark(new Watermark(5000));
    
    		expected.add(new Watermark(1599));
    		expected.add(new StreamRecord<>("GOT: (key2,1)", 1999));
    		expected.add(new Watermark(1999)); // here it fires and purges
    		expected.add(new Watermark(2100)); // here is the cleanup timer
    		expected.add(new Watermark(5000));
    
    		System.out.println("OUTPUT" + testHarness.getOutput());
    
    		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(),
new Tuple2ResultSortComparator());
    		testHarness.close();
    	}
    ```
    
    with `EventTimeTriggerAccum` like this:
    ```
    	/**
    	 * A trigger that fires at the end of the window but does not
    	 * purge the state of the fired window. This is to test the state
    	 * garbage collection mechanism.
    	 */
    	public class EventTimeTriggerAccum extends Trigger<Object, TimeWindow> {
    		private static final long serialVersionUID = 1L;
    
    		private long cleanupTime;
    
    		private EventTimeTriggerAccum() {
    			cleanupTime = 0L;
    		}
    
    		public EventTimeTriggerAccum(long cleanupTime) {
    			this.cleanupTime = cleanupTime;
    		}
    
    		@Override
    		public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext
ctx) throws Exception {
    			if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
    				// if the watermark is already past the window fire immediately
    				return TriggerResult.FIRE;
    			} else {
    				ctx.registerEventTimeTimer(window.maxTimestamp());
    				return TriggerResult.CONTINUE;
    			}
    		}
    
    		@Override
    		public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    			return time == window.maxTimestamp() || time == window.maxTimestamp() + cleanupTime
?
    				TriggerResult.FIRE_AND_PURGE :
    				TriggerResult.CONTINUE;
    		}
    
    		@Override
    		public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
throws Exception {
    			return TriggerResult.CONTINUE;
    		}
    
    		@Override
    		public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    			ctx.deleteEventTimeTimer(window.maxTimestamp());
    		}
    
    		@Override
    		public boolean canMerge() {
    			return true;
    		}
    
    		@Override
    		public TriggerResult onMerge(TimeWindow window,
    									 OnMergeContext ctx) {
    			ctx.registerEventTimeTimer(window.maxTimestamp());
    			return TriggerResult.CONTINUE;
    		}
    
    		@Override
    		public String toString() {
    			return "EventTimeTrigger()";
    		}
    	}
    ```
    
    and `PassThroughFunction` like this:
    ```
    	private class PassThroughFunction implements WindowFunction<Tuple2<String, Integer>,
String, String, TimeWindow> {
    		private static final long serialVersionUID = 1L;
    
    		@Override
    		public void apply(String k, TimeWindow window, Iterable<Tuple2<String, Integer>>
input, Collector<String> out) throws Exception {
    			out.collect("GOT: " + Joiner.on(",").join(input));
    		}
    	}
    ```
    
    Essentially, we want to trigger the window on cleanup time only when new data arrived
after the purge. No new data arrives but the window function is called anyways.
    
    I'm afraid the solution to this really is to change `FoldingState` and `ListState` to
return `null` when there is no state available. (Instead of "default value" and "empty iterable"
as it is now). 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message