flink-issues mailing list archives

From "Brice Bingman (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6990) Poor performance with Sliding Time Windows
Date Fri, 23 Jun 2017 13:15:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060881#comment-16060881 ]

Brice Bingman commented on FLINK-6990:
--------------------------------------

[~Cody] I chose the smallest scenario that shows the slowdown on my machine.  If you are on
a machine with better resources, you may need to increase the event input rate or the
window size to see the slowdown.  I noticed that if I reduce the window size to 1 minute,
the problem goes away.

[~fhueske] 
1) So on each slide of a time window, the data in that window is replicated and put into the
new window?  That does seem like a lot of overhead.  Why is everything replicated?  Is there
any way to disable that?
2) While this example could be implemented with a ReduceFunction, I also need to support more
complicated calculations, such as a linear regression or an exponential moving average, that
need access to all the data in the window.  That is why I've been stress testing
with the WindowFunction.
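To make point 2 concrete, here is a minimal plain-Java sketch (no Flink dependency) of the kind of whole-window calculation that would sit inside `WindowFunction.apply()`. `WindowStats`, `ema`, and `slope` are hypothetical names; the EMA is seeded with the first element, the regression uses each element's position in the window as x, and both assume the window's `Iterable` yields elements in arrival order. These are assumptions for illustration, not how Flink orders window contents.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helpers: whole-window computations of the kind described in point 2.
// Both walk the full Iterable, which is why a single running value (ReduceFunction)
// is not a natural fit for them.
final class WindowStats {
    private WindowStats() {}

    /** Exponential moving average seeded with the first value; alpha must be in (0, 1]. */
    static double ema(Iterable<Double> values, double alpha) {
        if (alpha <= 0.0 || alpha > 1.0) {
            throw new IllegalArgumentException("alpha must be in (0, 1]");
        }
        Double avg = null;
        for (double v : values) {
            avg = (avg == null) ? v : alpha * v + (1.0 - alpha) * avg;
        }
        if (avg == null) {
            throw new IllegalArgumentException("empty window");
        }
        return avg;
    }

    /** Least-squares slope of y against its position x = 0, 1, 2, ... in the window. */
    static double slope(Iterable<Double> values) {
        double n = 0, sumX = 0, sumY = 0, sumXY = 0, sumXX = 0;
        for (double y : values) {
            double x = n;
            sumX += x;
            sumY += y;
            sumXY += x * y;
            sumXX += x * x;
            n++;
        }
        double denom = n * sumXX - sumX * sumX;
        if (denom == 0.0) {
            throw new IllegalArgumentException("need at least two values");
        }
        return (n * sumXY - sumX * sumY) / denom;
    }

    public static void main(String[] args) {
        List<Double> window = Arrays.asList(1.0, 3.0, 5.0, 7.0);
        System.out.println(ema(window, 0.5)); // 5.25
        System.out.println(slope(window));    // 2.0
    }
}
```

Both functions need every element of the window on each firing, so the per-element cost of how the window is stored matters directly for this use case.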

Perhaps there should be a more performant time window that doesn't replicate data on each
slide, similar to how the count window is implemented.
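A rough single-process sketch of such a time window, assuming processing-time timestamps that arrive in non-decreasing order per key: each key keeps one deque, every element is stored once, and each slide evicts expired elements from the head. `SingleCopySlidingWindow`, `add`, and `slide` are hypothetical names, not part of Flink, and the sketch ignores state backends, checkpointing, and parallelism.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not a Flink API): one deque per key holds every element exactly
// once. A slide evicts expired elements from the head and exposes what remains, instead
// of copying each element into size/slide overlapping windows.
// Assumes timestamps arrive in non-decreasing order per key.
final class SingleCopySlidingWindow<K, V> {
    private static final class Entry<T> {
        final long timestamp;
        final T value;

        Entry(long timestamp, T value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final long sizeMillis;
    private final Map<K, Deque<Entry<V>>> buffers = new HashMap<>();

    SingleCopySlidingWindow(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    /** Appends the element to its key's buffer; it is stored once, never replicated. */
    void add(K key, long timestamp, V value) {
        buffers.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(new Entry<>(timestamp, value));
    }

    /**
     * Called once per slide (e.g. every second): drops elements with timestamp <= now - size
     * and returns a view of the remaining window contents for this key.
     */
    Iterable<V> slide(K key, long now) {
        Deque<Entry<V>> buf = buffers.get(key);
        if (buf == null) {
            return Collections.emptyList();
        }
        while (!buf.isEmpty() && buf.peekFirst().timestamp <= now - sizeMillis) {
            buf.pollFirst();
        }
        return () -> buf.stream().map(e -> e.value).iterator();
    }

    public static void main(String[] args) {
        // 5-second window; one element per second for key "a" at t = 0s .. 9s.
        SingleCopySlidingWindow<String, Integer> window = new SingleCopySlidingWindow<>(5_000L);
        for (int i = 0; i < 10; i++) {
            window.add("a", i * 1_000L, i);
        }
        int count = 0;
        for (Integer ignored : window.slide("a", 10_000L)) {
            count++;
        }
        System.out.println("a: " + count); // a: 4 (elements from t = 6s .. 9s)
    }
}
```

The trade-off mirrors the count window: memory grows with the number of elements in the window rather than being multiplied by size/slide, but each slide still iterates the whole window when the function needs all of the data.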

> Poor performance with Sliding Time Windows
> ------------------------------------------
>
>                 Key: FLINK-6990
>                 URL: https://issues.apache.org/jira/browse/FLINK-6990
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API, Streaming
>    Affects Versions: 1.3.0
>         Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
>            Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows.  Here is a simple example that performs poorly for me:
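> {code:java}
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class FlinkPerfTest {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Stream 10,000 events per second: 1,000 events for each of 10 keys
>         see.addSource(new SourceFunction<TestObject>() {
>             transient ScheduledExecutorService executor;
>             @Override
>             public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
>                 executor = Executors.newSingleThreadScheduledExecutor();
>                 executor.scheduleAtFixedRate(new Runnable() {
>                     @Override
>                     public void run() {
>                         for (int k = 0; k < 10; k++) {
>                             for (int i = 0; i < 1000; i++) {
>                                 TestObject obj = new TestObject();
>                                 obj.setKey(k);
>                                 ctx.collect(obj);
>                             }
>                         }
>                     }
>                 }, 0, 1, TimeUnit.SECONDS);
>                 // Block the source thread until cancel() is called
>                 this.wait();
>             }
>             @Override
>             public synchronized void cancel() {
>                 if (executor != null) {
>                     executor.shutdown();
>                 }
>                 this.notify();
>             }
>         }).keyBy("key")
>         .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1)))
>         .apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
>             @Override
>             public void apply(Tuple key, TimeWindow window, Iterable<TestObject> input,
>                     Collector<String> out) throws Exception {
>                 // Count the elements currently in this key's window
>                 int count = 0;
>                 for (TestObject obj : input) {
>                     count++;
>                 }
>                 out.collect(key.getField(0) + ": " + count);
>             }
>         })
>         .print();
>         see.execute();
>     }
>     public static class TestObject {
>         private Integer key;
>         public Integer getKey() {
>             return key;
>         }
>         public void setKey(Integer key) {
>             this.key = key;
>         }
>     }
> }
> {code}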
> When running this, Flink periodically pauses for long periods of time.  I would expect
> a steady stream of output at 1-second intervals.  For comparison, you can switch to a count
> window of similar size, which performs just fine:
> {code:java}
>    .countWindow(600000, 1000).apply(new WindowFunction<FlinkPerfTest.TestObject, String, Tuple, GlobalWindow>() {
>                     @Override
>                     public void apply(Tuple key, GlobalWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
>                         int count = 0;
>                         for (Object obj : input) {
>                             count++;
>                         }
>                         out.collect(key.getField(0) + ": " + count);
>                     }
>                 })
> {code}
> I would expect the sliding time window to perform similarly to a count window.  The sliding
> time window also uses significantly more CPU and memory than the count window; I would
> expect resource consumption to be similar as well.
> A possible cause could be that SystemProcessingTimeService.TriggerTask acquires
> the checkpointLock, which acts as a global lock.  There should be a lock per key or,
> preferably, a lock-free solution.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
