flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
Date Mon, 05 Mar 2018 09:58:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16385875#comment-16385875

ASF GitHub Bot commented on FLINK-8560:

Github user aljoscha commented on a diff in the pull request:

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
    @@ -70,21 +69,15 @@ public void open() throws Exception {
     	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception
    -		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
    -		onTimerContext.timer = timer;
    -		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
    -		onTimerContext.timeDomain = null;
    -		onTimerContext.timer = null;
    +		reinitialize(userFunction, TimeDomain.EVENT_TIME, timer);
    --- End diff --
    Hate to be picky, but I think the name is a bit misleading and we could probably put all
of this in a method `invokeUserTime()` that does what `reinitialise()` and `reset()` do.
    @kl0u I think you can quickly fix that when merging.

> add KeyedProcessFunction to expose the key in onTimer() and other methods
> -------------------------------------------------------------------------
>                 Key: FLINK-8560
>                 URL: https://issues.apache.org/jira/browse/FLINK-8560
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: J├╝rgen Thomann
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: 1.5.0
> Currently it is required to store the key of a keyBy() in the processElement method to
have access to it in the OnTimerContext.
> This is not so good as you have to check in the processElement method for every element
if the key is already stored and set it if it's not already set.
> A possible solution would adding OnTimerContext#getCurrentKey() or a similar method.
Maybe having it in the open() method could maybe work as well.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html

This message was sent by Atlassian JIRA

View raw message