flink-issues mailing list archives

From "Aljoscha Krettek (Jira)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-15775) SourceFunctions are instantiated twice when pulled on from 2 Sinks
Date Tue, 28 Jan 2020 13:28:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-15775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17025106#comment-17025106 ]

Aljoscha Krettek commented on FLINK-15775:
------------------------------------------

I'm afraid this is both a misunderstanding of how the API works and a shortcoming in the API. The API makes this misunderstanding easy to fall into, and it also makes certain requirements hard (or impossible) to express.

Registering the table source, i.e. putting it in the catalog, only registers the source metadata; it doesn't instantiate any source. Each SQL query is considered independent of the other queries, so you get two queries that each instantiate a source from the same source metadata.
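The difference between registering metadata and instantiating a source can be illustrated with a toy model in plain Java. This is a sketch under stated assumptions, not Flink's implementation. `ToyCatalog`, `planQuery` and `SourceInstantiationDemo` are hypothetical names. The `Supplier` stands in for the registered TableSource, which creates a new SourceFunction each time a query is planned.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Toy model, NOT Flink's API: the "catalog" stores only a factory (the
// source metadata), never a live source instance.
final class ToyCatalog {
    private final Map<String, Supplier<Object>> sources = new HashMap<>();

    void registerTableSource(String name, Supplier<Object> factory) {
        sources.put(name, factory); // registration instantiates nothing
    }

    // Each query is planned independently of the others, so each one asks
    // the factory for its own fresh source instance.
    Object planQuery(String tableName) {
        return sources.get(tableName).get();
    }
}

final class SourceInstantiationDemo {
    // Returns {instances after registration, instances after N queries}.
    static int[] run(int queryCount) {
        AtomicInteger created = new AtomicInteger();
        ToyCatalog catalog = new ToyCatalog();
        catalog.registerTableSource("foo_table", () -> {
            created.incrementAndGet(); // stands in for creating a new SourceFunction
            return new Object();
        });
        int afterRegistration = created.get();
        for (int i = 0; i < queryCount; i++) {
            catalog.planQuery("foo_table");
        }
        return new int[] {afterRegistration, created.get()};
    }

    public static void main(String[] args) {
        int[] counts = run(2);
        System.out.println("after registration: " + counts[0]); // prints 0
        System.out.println("after two queries: " + counts[1]);  // prints 2
    }
}
```

With two queries, `run(2)` reports zero instances right after registration and two after planning. That matches the two SourceFunction instances observed in the issue.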

To express your use case, you would need something like:
{code}
tEnv.registerTableSource("foo_table", new FooTableSource());
Table source = tEnv.from("foo_table");
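// Hypothetical API: Table has no sqlQuery() method, and <I DON'T KNOW>
// marks the missing way to refer to `source` from within the query.
Table out0 = source.sqlQuery("SELECT * FROM <I DON'T KNOW> WHERE field_1 = 0");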
Table out1 = source.sqlQuery("SELECT * FROM <I DON'T KNOW> WHERE field_1 = 1");
tEnv.registerTableSink("syso_sink_0", new SysoSink());
tEnv.registerTableSink("syso_sink_1", new SysoSink());
out0.insertInto("syso_sink_0");
out1.insertInto("syso_sink_1"); 
{code}
but that doesn't currently exist.
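The snippet above aims for a specific topology: one source instance whose records fan out to two filtered branches. The plain-Java toy below contrasts that topology with two independent instances competing for the same input. It is an illustration only, built on three assumptions. First, `FanOutDemo` and its methods are hypothetical. Second, the non-replayable input is modeled as a queue that consumes records on read. Third, the two competing instances alternate strictly, which keeps the output deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Toy model, NOT Flink code. The input is non-replayable: polling the
// queue consumes a record, so whoever reads it first is the only reader.
final class FanOutDemo {

    // One source instance reads every record once and routes it to the
    // branch whose filter matches (field_1 = 0 -> sink 0, field_1 = 1 -> sink 1).
    static List<List<Long>> sharedSource(List<Long> input) {
        Queue<Long> log = new ArrayDeque<>(input);
        List<Long> sink0 = new ArrayList<>();
        List<Long> sink1 = new ArrayList<>();
        Long rec;
        while ((rec = log.poll()) != null) {
            if (rec == 0L) sink0.add(rec);
            else if (rec == 1L) sink1.add(rec);
        }
        return Arrays.asList(sink0, sink1);
    }

    // Two independent source instances compete for the same input.
    // ASSUMPTION: they take strict turns; real interleaving is nondeterministic.
    // Each instance only feeds its own query, so a record pulled by the
    // "wrong" instance is filtered out and never reaches the other sink.
    static List<List<Long>> duplicatedSources(List<Long> input) {
        Queue<Long> log = new ArrayDeque<>(input);
        List<Long> sink0 = new ArrayList<>();
        List<Long> sink1 = new ArrayList<>();
        int turn = 0;
        Long rec;
        while ((rec = log.poll()) != null) {
            if (turn == 0 && rec == 0L) sink0.add(rec); // instance 0 feeds query 0
            if (turn == 1 && rec == 1L) sink1.add(rec); // instance 1 feeds query 1
            turn = 1 - turn;
        }
        return Arrays.asList(sink0, sink1);
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(0L, 0L, 1L, 1L);
        System.out.println("shared:     " + sharedSource(input));      // [[0, 0], [1, 1]]
        System.out.println("duplicated: " + duplicatedSources(input)); // [[0], [1]]
    }
}
```

With the input `0, 0, 1, 1`, the shared source delivers both matching records to each sink. The competing instances deliver only one each, which is the "missing half the inputs" symptom described in the issue below.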

[~twalthr] Do you maybe have some input on this?


> SourceFunctions are instantiated twice when pulled on from 2 Sinks
> ------------------------------------------------------------------
>
>                 Key: FLINK-15775
>                 URL: https://issues.apache.org/jira/browse/FLINK-15775
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.1, 1.10.0
>            Reporter: Benoît Paris
>            Priority: Major
>         Attachments: flink-test-duplicated-sources.zip
>
>
> When pulled on by two sinks, the SourceFunctions of a TableSource get instantiated twice (and each is subsequently opened once per parallel subtask, which is expected behavior).
> The following instantiates the FooTableSource's SourceFunction once (OK behavior, but not the processing we want):
>  
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> out0.insertInto("syso_sink_0");
> {code}
>  
> This instantiates the FooTableSource's SourceFunction twice (not OK, as each SysoSink is missing half of the inputs):
>  
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> Table out1 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 1");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> tEnv.registerTableSink("syso_sink_1", new SysoSink());
> out0.insertInto("syso_sink_0");
> out1.insertInto("syso_sink_1"); 
> {code}
>  
> This might not be a problem for Kafka's SourceFunctions, as we can always reread from a log, but it is a data-loss problem when the source data can't be reproduced.
> Actually, this might be me not understanding the API. Is there a way to make the runtime read from the same opened SourceFunctions?
> Attached are Java code that logs the faulty opening of the SourceFunctions, a pom.xml, and the logical execution plans for both the duplicated case and the workaround.
>  
> ----
> Workaround: convert the table to an append stream and back. Somehow this makes the planner think it has to put a materialization barrier after the source and read from that:
>  
> {code:java}
> tEnv.registerTableSource("foo_table_source", new FooTableSource());
> Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
> Table appendingSourceTable = tEnv.fromDataStream(
>  tEnv.toAppendStream(sourceTable, Types.ROW(new String[]{"field_1"}, new TypeInformation[]{Types.LONG()}))
> );
> tEnv.registerTable("foo_table", appendingSourceTable);
> {code}
>  
>  
> Best Regards,
> Ben



