flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4207) WindowOperator becomes very slow with allowed lateness
Date Mon, 25 Jul 2016 13:54:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391923#comment-15391923 ]

ASF GitHub Bot commented on FLINK-4207:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2277
  
    This doesn't work in some cases. Consider what happens when you change `testCleanupTimerWithEmptyListStateForTumblingWindows()` to this:
    
    ```
    	@Test
    	public void testCleanupTimerWithEmptyListStateForTumblingWindows() throws Exception {
    		final int WINDOW_SIZE = 2;
    		final long LATENESS = 100;
    
    		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
    
    		ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
    			new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig()));
    
    		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, String, TimeWindow> operator =
    			new WindowOperator<>(
    				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
    				new TimeWindow.Serializer(),
    				new TupleKeySelector(),
    				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
    				windowStateDesc,
    				new InternalIterableWindowFunction<>(new PassThroughFunction()),
    				new EventTimeTriggerAccum(LATENESS),
    				LATENESS);
    
    		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
    			new OneInputStreamOperatorTestHarness<>(operator);
    
    		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
    
    		operator.setInputType(inputType, new ExecutionConfig());
    		testHarness.open();
    
    		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    
    		// normal element
    		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
    		testHarness.processWatermark(new Watermark(1599));
    		testHarness.processWatermark(new Watermark(1999));
    		testHarness.processWatermark(new Watermark(2100));
    		testHarness.processWatermark(new Watermark(5000));
    
    		expected.add(new Watermark(1599));
    		expected.add(new StreamRecord<>("GOT: (key2,1)", 1999));
    		expected.add(new Watermark(1999)); // here it fires and purges
    		expected.add(new Watermark(2100)); // here is the cleanup timer
    		expected.add(new Watermark(5000));
    
    		System.out.println("OUTPUT" + testHarness.getOutput());
    
    		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
    		testHarness.close();
    	}
    ```
    
    with `EventTimeTriggerAccum` like this:
    ```
    	/**
    	 * A trigger that fires and purges when the end-of-window timer or the
    	 * cleanup timer (end of window plus the given cleanup time) goes off.
    	 * This is to test the state garbage collection mechanism.
    	 */
    	public class EventTimeTriggerAccum extends Trigger<Object, TimeWindow> {
    		private static final long serialVersionUID = 1L;
    
    		private long cleanupTime;
    
    		private EventTimeTriggerAccum() {
    			cleanupTime = 0L;
    		}
    
    		public EventTimeTriggerAccum(long cleanupTime) {
    			this.cleanupTime = cleanupTime;
    		}
    
    		@Override
    		public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    			if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
    				// if the watermark is already past the window fire immediately
    				return TriggerResult.FIRE;
    			} else {
    				ctx.registerEventTimeTimer(window.maxTimestamp());
    				return TriggerResult.CONTINUE;
    			}
    		}
    
    		@Override
    		public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    			return time == window.maxTimestamp() || time == window.maxTimestamp() + cleanupTime ?
    				TriggerResult.FIRE_AND_PURGE :
    				TriggerResult.CONTINUE;
    		}
    
    		@Override
    		public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    			return TriggerResult.CONTINUE;
    		}
    
    		@Override
    		public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    			ctx.deleteEventTimeTimer(window.maxTimestamp());
    		}
    
    		@Override
    		public boolean canMerge() {
    			return true;
    		}
    
    		@Override
    		public TriggerResult onMerge(TimeWindow window,
    									 OnMergeContext ctx) {
    			ctx.registerEventTimeTimer(window.maxTimestamp());
    			return TriggerResult.CONTINUE;
    		}
    
    		@Override
    		public String toString() {
    			return "EventTimeTriggerAccum()";
    		}
    	}
    ```
    
    and `PassThroughFunction` like this:
    ```
    	private class PassThroughFunction implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
    		private static final long serialVersionUID = 1L;
    
    		@Override
    		public void apply(String k, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
    			out.collect("GOT: " + Joiner.on(",").join(input));
    		}
    	}
    ```
    
    Essentially, we want the window to fire at cleanup time only if new data arrived after the purge. Here no new data arrives, but the window function is called anyway.
    
    I'm afraid the real solution is to change `FoldingState` and `ListState` to return `null` when no state is available (instead of the "default value" and the "empty iterable" they return now).
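
    The idea of returning `null` for missing state can be illustrated with a minimal, self-contained sketch. It is a toy model and not Flink code: `NullableListStateSketch`, `WindowListState` and `fireAndPurge` are hypothetical names invented for this illustration, and the real change would have to be made in the state backends and in `WindowOperator`. The sketch assumes that a purged window has no entry at all, so a cleanup timer can tell "no state" apart from "state present" and skip the window function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (NOT Flink API): per-window list state whose get() returns null
 * once the window has been purged.
 */
public class NullableListStateSketch {

    /** Hypothetical stand-in for per-window ListState, keyed by window end. */
    static final class WindowListState {
        private final Map<Long, List<String>> contents = new HashMap<>();

        void add(long windowEnd, String value) {
            contents.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(value);
        }

        /** Returns null (not an empty list) when nothing is stored for the window. */
        List<String> get(long windowEnd) {
            return contents.get(windowEnd);
        }

        void clear(long windowEnd) {
            contents.remove(windowEnd);
        }
    }

    /** Fires and purges a window, skipping the window function if there is no state. */
    static void fireAndPurge(WindowListState state, long windowEnd, List<String> output) {
        List<String> elements = state.get(windowEnd);
        if (elements == null) {
            // Nothing arrived since the last purge: do not call the window function.
            return;
        }
        output.add("GOT: " + String.join(",", elements));
        state.clear(windowEnd);
    }

    public static void main(String[] args) {
        WindowListState state = new WindowListState();
        List<String> output = new ArrayList<>();

        state.add(1999L, "(key2,1)");
        fireAndPurge(state, 1999L, output); // end-of-window timer: emits one result
        fireAndPurge(state, 1999L, output); // cleanup timer, no new data: emits nothing

        System.out.println(output); // prints [GOT: (key2,1)]
    }
}
```

    With an "empty iterable" instead of `null`, the second call could not distinguish a purged window from one that legitimately holds no elements, which is the situation the test above runs into.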


> WindowOperator becomes very slow with allowed lateness
> ------------------------------------------------------
>
>                 Key: FLINK-4207
>                 URL: https://issues.apache.org/jira/browse/FLINK-4207
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>
> In this simple example, the throughput (as measured by the count the window emits) becomes very low when an allowed lateness is set:
> {code}
> public class WindowWordCount {
> 	public static void main(String[] args) throws Exception {
> 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> 		env.setParallelism(1);
> 		env.addSource(new InfiniteTupleSource(100_000))
> 				.keyBy(0)
> 				.timeWindow(Time.seconds(3))
> 				.allowedLateness(Time.seconds(1))
> 				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
> 					@Override
> 					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
> 							Tuple2<String, Integer> value2) throws Exception {
> 						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
> 					}
> 				})
> 				.filter(new FilterFunction<Tuple2<String, Integer>>() {
> 					private static final long serialVersionUID = 1L;
> 					@Override
> 					public boolean filter(Tuple2<String, Integer> value) throws Exception {
> 						return value.f0.startsWith("Tuple 0");
> 					}
> 				})
> 				.print();
> 		// execute program
> 		env.execute("WindowWordCount");
> 	}
> 	public static class InfiniteTupleSource implements ParallelSourceFunction<Tuple2<String, Integer>> {
> 		private static final long serialVersionUID = 1L;
> 		private int numGroups;
> 		public InfiniteTupleSource(int numGroups) {
> 			this.numGroups = numGroups;
> 		}
> 		@Override
> 		public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
> 			long index = 0;
> 			while (true) {
> 				Tuple2<String, Integer> tuple = new Tuple2<>("Tuple " + (index % numGroups), 1);
> 				out.collect(tuple);
> 				index++;
> 			}
> 		}
> 		@Override
> 		public void cancel() {
> 		}
> 	}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
