flink-issues mailing list archives

From "Jark Wu (Jira)" <j...@apache.org>
Subject [jira] [Assigned] (FLINK-12215) Introduce SqlProcessFunction for blink streaming runtime
Date Mon, 01 Jun 2020 03:41:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-12215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu reassigned FLINK-12215:

    Assignee:     (was: Jark Wu)

> Introduce SqlProcessFunction for blink streaming runtime
> --------------------------------------------------------
>                 Key: FLINK-12215
>                 URL: https://issues.apache.org/jira/browse/FLINK-12215
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Runtime
>            Reporter: Jark Wu
>            Priority: Minor
> Currently, we rely heavily on DataStream's {{KeyedProcessFunction}} when implementing
the Blink SQL runtime. However, it has some disadvantages that lead us to introduce a ProcessFunction specific to SQL.
> 1. {{KeyedProcessFunction}} does not support {{endInput(Collector)}}.
> This is needed to achieve the same semantics for batch and streaming. For example, {{SELECT
COUNT(\*) FROM T}} should return {{0}} when the input is empty, but currently no result is emitted.
That's why we need {{endInput(Collector)}} to emit a final result. I know this is not
a real-world streaming use case, but it is worth doing.
> 2. {{KeyedProcessFunction}} is an abstract class. 
> As discussed in FLINK-11409, if it were an interface, it would be easy to extract common
logic into a base class and share it between ProcessFunction, CoProcessFunction, and other
functions. That does not work with an abstract class. We also ran into this problem
when we wanted to reuse some code. However, it is hard to turn {{KeyedProcessFunction}} into an
interface because of compatibility.
> 3. {{KeyedProcessFunction}} doesn't expose {{setCurrentKey}}.
> We have an optimization for lazy state writing, i.e. buffering changes on the heap and
flushing them to state when taking a snapshot. That requires changing the current key of the operator/function.
> That's why we want to introduce a {{ProcessFunction}} interface specific to SQL. Maybe we can
call it {{SqlProcessFunction}}. The name can be discussed in the JIRA.
> The initial idea of {{SqlProcessFunction}}:
> {code:java}
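> import org.apache.flink.api.common.functions.Function;
> import org.apache.flink.streaming.api.TimeDomain;
> import org.apache.flink.streaming.api.TimerService;
> import org.apache.flink.util.Collector;
>
> public interface SqlProcessFunction<K, I, O> extends Function {
>     // Called for each input element.
>     void processElement(I value, Context<K> ctx, Collector<O> out) throws Exception;
>     // Called when a registered timer fires.
>     void onTimer(long timestamp, OnTimerContext<K> ctx, Collector<O> out) throws Exception;
>     // Called once the input is exhausted, to emit a final result.
>     void endInput(Context<K> ctx, Collector<O> out) throws Exception;
>
>     interface Context<K> {
>         TimerService timerService();
>         K getCurrentKey();
>         void setCurrentKey(K key);
>     }
>
>     interface OnTimerContext<K> extends Context<K> {
>         TimeDomain timeDomain();
>     }
> }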
> {code}
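
To illustrate point 1 of the description, here is a minimal, self-contained sketch of how an end-of-input hook lets a global COUNT(*) emit 0 for an empty input. It does not use Flink: MiniSqlProcessFunction, CountAll, and EndInputSketch are hypothetical stand-ins, the context and timer arguments are left out, and a plain Consumer replaces Collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class EndInputSketch {
    // Hypothetical driver: feeds a bounded input, then signals end of input.
    static <I> List<Long> run(List<I> input) {
        List<Long> results = new ArrayList<>();
        CountAll<I> fn = new CountAll<>();
        for (I record : input) {
            fn.processElement(record, results::add);
        }
        fn.endInput(results::add);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of()));              // [0]
        System.out.println(run(List.of("a", "b", "c"))); // [3]
    }
}

// Hypothetical, reduced stand-in for the proposed interface (not Flink's API).
interface MiniSqlProcessFunction<I, O> {
    void processElement(I value, Consumer<O> out) throws Exception;
    void endInput(Consumer<O> out) throws Exception;
}

// Global COUNT(*): keeps a running count and emits it once at end of input.
class CountAll<I> implements MiniSqlProcessFunction<I, Long> {
    private long count = 0L;

    @Override
    public void processElement(I value, Consumer<Long> out) {
        count++; // this simplified version emits nothing per record
    }

    @Override
    public void endInput(Consumer<Long> out) {
        out.accept(count); // emitted even if no record was ever seen
    }
}
```

Without the endInput call, the empty input would produce an empty result list, which is the gap the description points out.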
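
Point 3 of the description (lazy state writing) can be sketched in the same spirit. This is only an assumption about the shape of the optimization, not the Blink implementation: FakeKeyedState is a hypothetical stand-in for keyed state that reads and writes under a "current key", and BufferedCounter buffers per-key changes on the heap until flush() is called, as might happen when a snapshot is taken.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class LazyStateSketch {
    public static void main(String[] args) {
        FakeKeyedState backend = new FakeKeyedState();
        BufferedCounter counter = new BufferedCounter(backend);
        counter.add("a");
        counter.add("b");
        counter.add("a");
        System.out.println(backend.store); // {}  (nothing written yet)
        counter.flush();                   // e.g. triggered by a snapshot
        System.out.println(backend.store); // {a=2, b=1}
    }
}

// Hypothetical stand-in for keyed state: access is scoped to the current key.
class FakeKeyedState {
    final Map<String, Long> store = new TreeMap<>();
    private String currentKey;

    void setCurrentKey(String key) { currentKey = key; }
    Long value() { return store.get(currentKey); }
    void update(long v) { store.put(currentKey, v); }
}

// Buffers per-key increments on the heap and writes them out in one pass.
class BufferedCounter {
    private final FakeKeyedState state;
    private final Map<String, Long> buffer = new HashMap<>();

    BufferedCounter(FakeKeyedState state) { this.state = state; }

    void add(String key) {
        buffer.merge(key, 1L, Long::sum); // no state access on the hot path
    }

    void flush() {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            // The flush touches keys other than the one currently being
            // processed, so it must be able to switch the current key.
            state.setCurrentKey(e.getKey());
            Long old = state.value();
            state.update((old == null ? 0L : old) + e.getValue());
        }
        buffer.clear();
    }
}
```

The flush loop is the part that needs setCurrentKey: each buffered entry belongs to a different key, so the writer has to switch keys itself.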

