flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask
Date Fri, 22 Apr 2016 09:57:12 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253666#comment-15253666
] 

Aljoscha Krettek commented on FLINK-3669:
-----------------------------------------

Hi,
it's almost done but I think we need both the {{processingTimeTimers}} set and the {{processingTimeTimerTimestamps}}
MultiSet. The former is used to not add repeatedly to the queue while the latter is used for
not registering a lot of timers at {{StreamTask}}.

As it is now in {{registerProcessingTimeTimer}}:
{code}
public void registerProcessingTimeTimer(long time) {
    Timer<K, W> timer = new Timer<>(time, key, window);
    //If this is the first timer added for this timestamp (per key and window) register a
TriggerTask and add Timer to Queue
    if (processingTimeTimerTimestamps.add(time,1) == 0) {
        processingTimeTimersQueue.add(timer);
        ScheduledFuture<?> scheduledFuture= getRuntimeContext().registerTimer(time,
WindowOperator.this);
        processingTimeTimerFutures.put(time, scheduledFuture);
    }
}
{code}
we correctly schedule only one timer at the {{StreamTask}} per timestamp but we also only
schedule one timer in the queue. If we register a timer for the same timestamp from multiple
keys we ignore the timers for all but the first key to register. I think it should be:
 {code}
public void registerProcessingTimeTimer(long time) {
    Timer<K, W> timer = new Timer<>(time, key, window);

    // make sure we only put one timer per key into the queue
    if (processingTimeTimers.add(timer)) {
        processingTimeTimersQueue.add(timer);

        //If this is the first timer added for this timestamp register a TriggerTask
        if (processingTimeTimerTimestamps.add(time, 1) == 0) {
            ScheduledFuture<?> scheduledFuture= getRuntimeContext().registerTimer(time,
WindowOperator.this);
            processingTimeTimerFutures.put(time, scheduledFuture);
        }
    }
}
{code}

but man, this stuff is tricky to figure out... :-)

> WindowOperator registers a lot of timers at StreamTask
> ------------------------------------------------------
>
>                 Key: FLINK-3669
>                 URL: https://issues.apache.org/jira/browse/FLINK-3669
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.0.1
>            Reporter: Aljoscha Krettek
>            Assignee: Konstantin Knauf
>            Priority: Blocker
>
> Right now, the WindowOperator registers a timer at the StreamTask for every processing-time
timer that a Trigger registers. We should combine several registered trigger timers to only
register one low-level timer (timer coalescing).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message