flink-issues mailing list archives

From "Jark Wu (Jira)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
Date Fri, 01 Nov 2019 11:39:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964784#comment-16964784 ]

Jark Wu commented on FLINK-14591:
---------------------------------

I think we can put the merge logic in {{StreamExecutor#execute()}}. This is the last step
before a job is submitted.
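A minimal sketch of this proposal, using a simplified model rather than Flink's actual {{StreamExecutor}}. The class {{ExecuteTimeMergeSketch}} and its plain maps, which stand in for the TableConfig configuration and the global job parameters, are hypothetical. In this model {{translate}} no longer merges anything; the merge happens once in {{execute}}, so options set between {{insertInto}} and {{execute}} are still picked up.

```scala
import scala.collection.mutable

// Hypothetical model of "merge at execute time". These classes only
// illustrate the ordering; they are not Flink's StreamExecutor or TableConfig.
class ExecuteTimeMergeSketch {
  // Stands in for TableConfig#getConfiguration.
  val tableConfig: mutable.Map[String, String] = mutable.Map.empty
  // Stands in for the global job parameters of the execution environment.
  val globalJobParameters: mutable.Map[String, String] = mutable.Map.empty
  private val transformations = mutable.ArrayBuffer.empty[String]

  // translate no longer touches the job parameters, so returning early
  // (or translating an empty list) cannot skip the merge.
  def translate(operations: Seq[String]): Seq[String] =
    operations.map(op => s"transformation($op)")

  def insertInto(sink: String): Unit =
    transformations ++= translate(Seq(s"insert into $sink"))

  // Merge right before "submission": every option set up to this point
  // is copied into the global job parameters.
  def execute(): Map[String, String] = {
    globalJobParameters ++= tableConfig
    // A real executor would build and submit the job here.
    globalJobParameters.toMap
  }
}
```

Because the merge is tied to submission rather than to translation, it no longer matters whether any individual translate call returns early.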

>  Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-14591
>                 URL: https://issues.apache.org/jira/browse/FLINK-14591
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Wei Zhong
>            Priority: Minor
>
> In the current implementation of the Blink planner, the method "PlannerBase#mergeParameters" is
> called by "PlannerBase#translate" to merge the configuration inside TableConfig into the global
> job parameters:
> {code:scala}
>   override def translate(
>       modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
>     if (modifyOperations.isEmpty) {
>       return List.empty[Transformation[_]]
>     }
>     mergeParameters()
>     val relNodes = modifyOperations.map(translateToRel)
>     val optimizedRelNodes = optimize(relNodes)
>     val execNodes = translateToExecNodePlan(optimizedRelNodes)
>     translateToPlan(execNodes)
>   }
> {code}
> This translate method is called at every important point, e.g. execute, toDataStream,
> insertInto, etc.
> But as shown above, the method returns early when "modifyOperations" is empty, without
> calling "mergeParameters".
> For example, if we set some configurations between the "Table#insertInto" and
> "TableEnvironment#execute" calls, these configurations will not be merged into the global
> job parameters, because "mergeParameters" is not called:
> {code:scala}
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance.useBlinkPlanner.build)
>     ...
>     ...
>     val result = ...
>     val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
>     tEnv.registerTableSink("MySink", sink)
>     tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
>     result.insertInto("MySink")
>     
>     // the "jobparam2" configuration will be lost
>     tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
>     tEnv.execute("test")
>     val jobConfig = env.getConfig.getGlobalJobParameters.toMap
>     
>     assertTrue(jobConfig.get("jobparam1")=="value1")
>     // this assertion will fail:
>     assertTrue(jobConfig.get("jobparam2")=="value2")
> {code}
> This may confuse users. It would be great if we could fix this problem.
>  
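The ordering problem in the quoted report can be reproduced without Flink. The sketch below is a hypothetical, dependency-free Scala model: {{EagerMergePlanner}}, {{EagerMergeDemo}}, and the plain maps standing in for TableConfig and the global job parameters are illustrative names, not Flink APIs. It assumes, as the report's example implies, that {{insertInto}} translates its operation immediately, so the later {{execute}} call only translates an empty list and hits the early return.

```scala
import scala.collection.mutable

// Hypothetical model of the behaviour described in FLINK-14591.
// None of these classes are Flink APIs.
class EagerMergePlanner {
  // Stands in for TableConfig#getConfiguration.
  val tableConfig: mutable.Map[String, String] = mutable.Map.empty
  // Stands in for the global job parameters of the execution environment.
  val globalJobParameters: mutable.Map[String, String] = mutable.Map.empty

  private def mergeParameters(): Unit = globalJobParameters ++= tableConfig

  // Mirrors the structure of PlannerBase#translate: an empty input
  // returns before mergeParameters() is reached.
  def translate(operations: Seq[String]): Seq[String] = {
    if (operations.isEmpty) {
      return Seq.empty
    }
    mergeParameters()
    operations.map(op => s"transformation($op)")
  }

  // Assumption: insertInto translates (and therefore merges) immediately...
  def insertInto(sink: String): Seq[String] = translate(Seq(s"insert into $sink"))

  // ...so execute has nothing left to translate and never merges.
  def execute(): Seq[String] = translate(Seq.empty)
}

object EagerMergeDemo {
  def run(): Map[String, String] = {
    val planner = new EagerMergePlanner
    planner.tableConfig("jobparam1") = "value1"
    planner.insertInto("MySink")
    planner.tableConfig("jobparam2") = "value2" // set after insertInto
    planner.execute()
    planner.globalJobParameters.toMap
  }
}
```

In this model {{EagerMergeDemo.run()}} returns a map containing "jobparam1" but not "jobparam2", matching the failing assertion in the report.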



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
