flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4207) WindowOperator becomes very slow with allowed lateness
Date Mon, 25 Jul 2016 13:56:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391927#comment-15391927 ]

ASF GitHub Bot commented on FLINK-4207:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2277#discussion_r72067210
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
---
    @@ -408,11 +410,22 @@ public void processWatermark(Watermark mark) throws Exception {
     				if (windowAssigner instanceof MergingWindowAssigner) {
     					mergingWindows = getMergingWindowSet();
     					W stateWindow = mergingWindows.getStateWindow(context.window);
    +					if (stateWindow == null) {
    +						// then the window is already purged and this is a cleanup
    +						// timer set due to allowed lateness that has nothing to clean,
    +						// so it is safe to just ignore
    +						continue;
    +					}
     					windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
     				} else {
     					windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
     				}
     
    +				if (windowState.get() == null) {
    --- End diff --
    
    This doubles the number of state accesses. With RocksDB, for example, that is two DB lookups instead of one. I think we can read the state contents once and pass them to `fireOrContinue()` instead of the state object.
    
    The same applies to the other instances of this snippet.
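    The reviewer's suggestion can be sketched outside Flink as follows. This is a minimal illustration under stated assumptions. `CountingState` is a hypothetical stand-in for a keyed state handle, and each of its `get()` calls counts as one backend lookup (one RocksDB read in the reviewer's example). `fireOrContinue(Integer)` is a simplified stand-in, not the actual `WindowOperator` signature. `onTimerTwoReads` and `onTimerOneRead` are also hypothetical names.

```java
// Hypothetical stand-in for a keyed state handle; NOT Flink's API.
// Every get() models one state-backend lookup (e.g. one RocksDB read) and is counted.
final class CountingState<T> {
    private final T value;
    private int reads = 0;

    CountingState(T value) {
        this.value = value;
    }

    T get() {
        reads++;
        return value;
    }

    int reads() {
        return reads;
    }
}

public class ReadOnceSketch {

    // Pattern from the diff: null check via get(), then the callee reads the state again.
    static String onTimerTwoReads(CountingState<Integer> state) {
        if (state.get() == null) {
            return "skip"; // nothing to fire or clean up
        }
        return fireOrContinueFromState(state);
    }

    static String fireOrContinueFromState(CountingState<Integer> state) {
        Integer contents = state.get(); // second lookup for the same timer
        return "fire:" + contents;
    }

    // Reviewer's suggestion: read the contents once and pass them along.
    static String onTimerOneRead(CountingState<Integer> state) {
        Integer contents = state.get();
        if (contents == null) {
            return "skip";
        }
        return fireOrContinue(contents);
    }

    // Hypothetical simplified signature that takes the contents, not the state object.
    static String fireOrContinue(Integer contents) {
        return "fire:" + contents;
    }

    public static void main(String[] args) {
        CountingState<Integer> a = new CountingState<>(42);
        CountingState<Integer> b = new CountingState<>(42);
        System.out.println(onTimerTwoReads(a) + " reads=" + a.reads()); // fire:42 reads=2
        System.out.println(onTimerOneRead(b) + " reads=" + b.reads());  // fire:42 reads=1
    }
}
```

    Reading the contents once keeps both the null check (the "nothing to clean" case) and the firing decision on a single lookup per timer.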


> WindowOperator becomes very slow with allowed lateness
> ------------------------------------------------------
>
>                 Key: FLINK-4207
>                 URL: https://issues.apache.org/jira/browse/FLINK-4207
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>
> In this simple example, the throughput (as measured by the count the window emits) becomes very low when an allowed lateness is set:
> {code}
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> public class WindowWordCount {
> 	public static void main(String[] args) throws Exception {
> 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> 		env.setParallelism(1);
> 		env.addSource(new InfiniteTupleSource(100_000))
> 				.keyBy(0)
> 				.timeWindow(Time.seconds(3))
> 				.allowedLateness(Time.seconds(1))
> 				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
> 					@Override
> 					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
> 							Tuple2<String, Integer> value2) throws Exception {
> 						// sum the counts per key within each window
> 						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
> 					}
> 				})
> 				.filter(new FilterFunction<Tuple2<String, Integer>>() {
> 					private static final long serialVersionUID = 1L;
> 					@Override
> 					public boolean filter(Tuple2<String, Integer> value) throws Exception {
> 						return value.f0.startsWith("Tuple 0");
> 					}
> 				})
> 				.print();
> 		// execute program
> 		env.execute("WindowWordCount");
> 	}
> 	public static class InfiniteTupleSource implements ParallelSourceFunction<Tuple2<String, Integer>> {
> 		private static final long serialVersionUID = 1L;
> 		private final int numGroups;
> 		// set to false by cancel() so that run() terminates
> 		private volatile boolean running = true;
> 		public InfiniteTupleSource(int numGroups) {
> 			this.numGroups = numGroups;
> 		}
> 		@Override
> 		public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
> 			long index = 0;
> 			// emit ("Tuple <i>", 1) round-robin over numGroups keys until cancelled
> 			while (running) {
> 				Tuple2<String, Integer> tuple = new Tuple2<>("Tuple " + (index % numGroups), 1);
> 				out.collect(tuple);
> 				index++;
> 			}
> 		}
> 		@Override
> 		public void cancel() {
> 			running = false;
> 		}
> 	}
> }
> {code}
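The example uses 3-second tumbling windows with 1 second of allowed lateness. Each window's state therefore outlives the window's firing by the lateness interval, and the diff above handles the cleanup timers this creates. Below is a minimal arithmetic sketch for the example's parameters. It assumes Flink's convention that a window [start, end) has a maxTimestamp of end - 1. It further assumes that the cleanup timer is registered at maxTimestamp plus the allowed lateness. The class and method names are hypothetical, and the code does not call Flink.

```java
// Hypothetical helpers that mirror (but do not call) Flink's tumbling event-time
// window conventions: windows are [start, end) and maxTimestamp = end - 1.
// ASSUMPTION: the cleanup timer fires at maxTimestamp + allowedLateness.
public class CleanupTimeSketch {

    // Start of the tumbling window of the given size containing the timestamp
    // (non-negative timestamps, no window offset).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Last timestamp that still belongs to the window [start, start + size).
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    // Watermark at which the window's state can be purged, given allowed lateness.
    static long cleanupTime(long start, long size, long allowedLateness) {
        return maxTimestamp(start, size) + allowedLateness;
    }

    public static void main(String[] args) {
        long size = 3_000;            // Time.seconds(3) in milliseconds
        long allowedLateness = 1_000; // Time.seconds(1) in milliseconds
        long ts = 4_500;              // an arbitrary element timestamp
        long start = windowStart(ts, size);
        System.out.println("window [" + start + ", " + (start + size) + ")");        // window [3000, 6000)
        System.out.println("fire at watermark >= " + maxTimestamp(start, size));     // 5999
        System.out.println("cleanup at watermark >= "
                + cleanupTime(start, size, allowedLateness));                         // 6999
    }
}
```

Under these assumptions, each key's window state is kept for one extra second after the window fires. A cleanup timer for that key and window remains registered until the watermark passes that point.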



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
