flink-issues mailing list archives

From "Kurt Young (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-11974) Introduce StreamOperatorSubstitutor to help table perform the whole Operator CodeGen
Date Fri, 12 Apr 2019 01:19:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815885#comment-16815885 ]

Kurt Young commented on FLINK-11974:

[~pnowojski] Regarding #1, my gut feeling is that it might be okay if we only have one or two OneInputOperatorWrappers
chained in a single StreamTask. If we chain more operators, especially after we make
two-input operator chaining possible, even a lightweight wrapper will certainly make it
less likely that the JIT inlines these calls.
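A minimal sketch of the wrapper pattern discussed here, assuming a simplified operator interface. `SimpleOperator`, `OneInputOperatorWrapper` and `Chain` below are hypothetical stand-ins, not Flink's actual classes. One plausible mechanism behind the inlining concern: the single `delegate.processElement` call site inside the shared wrapper class sees a different generated class for every chained operator, and HotSpot generally inlines only call sites that have seen one or two receiver types (monomorphic or bimorphic).

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified operator interface (not Flink's real StreamOperator API).
interface SimpleOperator {
    void open();
    String processElement(String value);
}

// Hypothetical wrapper: creates the code-generated operator in open() and
// forwards every call to it.
final class OneInputOperatorWrapper implements SimpleOperator {
    private final Supplier<SimpleOperator> generator; // stands in for compiling generated code
    private SimpleOperator delegate;

    OneInputOperatorWrapper(Supplier<SimpleOperator> generator) {
        this.generator = generator;
    }

    @Override
    public void open() {
        delegate = generator.get();
        delegate.open();
    }

    @Override
    public String processElement(String value) {
        // This one call site is shared by every wrapper instance. With many
        // different generated classes behind it, it can become megamorphic.
        return delegate.processElement(value);
    }
}

// Hypothetical helper that pushes a record through operators in order,
// roughly like a chain inside a single task.
final class Chain {
    private Chain() {}

    static String run(List<SimpleOperator> chain, String record) {
        String current = record;
        for (SimpleOperator op : chain) {
            current = op.processElement(current);
        }
        return current;
    }
}
```

With one or two generated classes behind the wrapper, the profile at that call site stays narrow; each additional chained operator of a new generated type widens it.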

I agree that StreamOperatorSubstitutor is something like an OperatorFactory; maybe we can
see it as the simplest version of an OperatorFactory. If we want to introduce this concept, how many
changes do we want to make in the first step? Are we going to change the existing code, e.g.
how the DataStream API passes stream operators to the runtime? If not, we must find a way to
let OperatorFactory coexist with the current path, which will require more thought and design.
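One way a factory could coexist with the current path, sketched under simplified assumptions: wrap an already-constructed operator in a trivial factory, so the runtime only needs to understand factories while existing translation code keeps building operator instances. `Operator`, `OperatorFactory`, `SimpleOperatorFactory` and `DeferredOperatorFactory` are illustrative names chosen here, not a confirmed Flink API.

```java
import java.io.Serializable;

// Hypothetical, minimal operator type; not Flink's StreamOperator.
interface Operator extends Serializable {
    String process(String input);
}

// Hypothetical factory: the runtime asks it for the operator on the task side.
interface OperatorFactory extends Serializable {
    Operator createOperator(ClassLoader userCodeClassLoader);
}

// Adapter for the existing path: an operator instance built on the client
// is carried through the factory interface unchanged.
final class SimpleOperatorFactory implements OperatorFactory {
    private final Operator operator;

    SimpleOperatorFactory(Operator operator) {
        this.operator = operator;
    }

    @Override
    public Operator createOperator(ClassLoader userCodeClassLoader) {
        // Already instantiated, so the class loader is not needed here.
        return operator;
    }
}

// Hypothetical stand-in for a factory that would build (e.g. compile) the
// operator only on the task side; here it just returns a fixed operator.
final class DeferredOperatorFactory implements OperatorFactory {
    @Override
    public Operator createOperator(ClassLoader userCodeClassLoader) {
        return input -> input.toUpperCase();
    }
}
```

The design choice is that old and new paths share one runtime entry point (`createOperator`), so existing code does not have to change in the first step.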

> Introduce StreamOperatorSubstitutor to help table perform the whole Operator CodeGen
> ------------------------------------------------------------------------------------
>                 Key: FLINK-11974
>                 URL: https://issues.apache.org/jira/browse/FLINK-11974
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Operators
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
> If we need to code-generate an entire operator, one possible solution is to introduce an OperatorWrapper,
> instantiate the code-generated sub-operator in the OperatorWrapper's open() method, and then proxy all methods
> to the sub-operator.
> Doing so adds an extra virtual call to every proxied method invocation, so we introduce a StreamOperatorSubstitutor instead:
> {code:java}
> import org.apache.flink.streaming.api.operators.StreamOperator;
>
> /**
>  * Basic interface for stream operator substitutes. It is shipped to the StreamTask via
>  * serialization and produces the actual stream operator, which the StreamTask then
>  * runs.
>  *
>  * @param <OUT> output type of the actual stream operator
>  */
> public interface StreamOperatorSubstitutor<OUT> {
>    /**
>     * Produces the actual stream operator.
>     *
>     * @param userCodeClassLoader the user code class loader to use.
>     * @return the actual stream operator created on {@code StreamTask}.
>     */
>    StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> // instanceof already rejects null, so no separate null check is needed
> if (operator instanceof StreamOperatorSubstitutor) {
>    return (T) ((StreamOperatorSubstitutor<?>) operator).getActualStreamOperator(cl);
> } else {
>    return (T) operator;
> }
> {code}
> to get the real operator.
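A self-contained sketch of the substitutor idea end to end, under simplified assumptions: the substitutor ships only the name of a generated class, and a resolve step mirroring the proposed `StreamConfig.getStreamOperator` check unwraps it with the user-code class loader. `StreamOp`, `OperatorSubstitutor`, `GeneratedOperatorSubstitutor`, `GeneratedUpper` and `OperatorResolver` are hypothetical names; in practice the class would be compiled from generated source at runtime rather than precompiled as here.

```java
import java.io.Serializable;

// Hypothetical stand-in for Flink's StreamOperator, reduced to one method.
interface StreamOp extends Serializable {
    String apply(String input);
}

// Same shape as the proposed StreamOperatorSubstitutor (simplified, non-generic).
interface OperatorSubstitutor extends Serializable {
    StreamOp getActualStreamOperator(ClassLoader userCodeClassLoader);
}

// Hypothetical substitutor: ships only the generated class name; the
// operator is instantiated on the task side.
final class GeneratedOperatorSubstitutor implements OperatorSubstitutor {
    private final String generatedClassName;

    GeneratedOperatorSubstitutor(String generatedClassName) {
        this.generatedClassName = generatedClassName;
    }

    @Override
    public StreamOp getActualStreamOperator(ClassLoader userCodeClassLoader) {
        try {
            // Resolve the class through the user-code class loader.
            Class<?> clazz = Class.forName(generatedClassName, true, userCodeClassLoader);
            return (StreamOp) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + generatedClassName, e);
        }
    }
}

// Hypothetical stand-in for a class produced by the code generator.
final class GeneratedUpper implements StreamOp {
    public GeneratedUpper() {}

    @Override
    public String apply(String input) {
        return input.toUpperCase();
    }
}

final class OperatorResolver {
    private OperatorResolver() {}

    // Mirrors the proposed check: a substitutor is replaced by the operator
    // it produces; anything else is returned unchanged.
    static StreamOp resolve(Object deserialized, ClassLoader cl) {
        if (deserialized instanceof OperatorSubstitutor) {
            return ((OperatorSubstitutor) deserialized).getActualStreamOperator(cl);
        }
        return (StreamOp) deserialized;
    }
}
```

After resolution the task holds the generated operator itself, so per-record calls go straight to it with no wrapper in between.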

This message was sent by Atlassian JIRA
