flink-issues mailing list archives

From "Caizhi Weng (Jira)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-13943) Provide api to convert flink table to java List (e.g. Table#collect)
Date Mon, 18 Nov 2019 08:34:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-13943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16976354#comment-16976354 ]

Caizhi Weng edited comment on FLINK-13943 at 11/18/19 8:33 AM:
---------------------------------------------------------------

Hi dear Flink community. My ideas for this improvement are as follows:

h3. Solution 1: Using {{CollectTableSink}} and {{CollectOutputFormat}}

The Blink planner already contains the {{CollectTableSink}} and {{CollectOutputFormat}} classes,
which can serialize data streams into a list using a {{SerializedListAccumulator}}. These classes
greatly simplify this improvement.

As a starting point, we can add a method very similar to {{DataSet#collect}}: execute the
current table, fetch the results collected by the accumulator, and deserialize them into the
desired list.
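
A rough sketch of what such a method could look like, modeled on how {{DataSet#collect}} works
today. The collect sink itself is only hinted at here (its construction is an assumption, since
the exact {{CollectTableSink}} API differs between planners); {{SerializedListAccumulator#deserializeList}}
is the real helper that {{DataSet#collect}} already uses:

{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class TableCollectSketch {

    /**
     * Collects the result of the given table into a local list.
     * The passed sink is assumed to write every serialized Row into a
     * SerializedListAccumulator registered under accumulatorId.
     */
    public static List<Row> collect(
            TableEnvironment tEnv,
            Table table,
            TableSink<Row> collectSink,
            String accumulatorId,
            TypeSerializer<Row> serializer) throws Exception {

        tEnv.registerTableSink("__collect_sink", collectSink);
        table.insertInto("__collect_sink");

        // Run the job and read the serialized rows back from the accumulator.
        JobExecutionResult result = tEnv.execute("collect()");
        ArrayList<byte[]> serialized = result.getAccumulatorResult(accumulatorId);
        return serialized == null
                ? Collections.emptyList()
                : SerializedListAccumulator.deserializeList(serialized, serializer);
    }
}
{code}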

The problem is that this solution only applies to batch jobs whose results are of moderate
size. For batch jobs with huge results, or for never-ending streaming jobs, we cannot store
the results in memory, so this solution is not applicable.

h3. Solution 2: Using {{DataStreamUtils::collect}}

If we do not need the whole result as a list at once, we can also fetch the results in an
iterative style. The flink-streaming-java module already contains the {{DataStreamUtils::collect}}
method, which returns an iterator that reads results from the local or remote execution environment.
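
To make this concrete, a minimal sketch of the iterator-based style with today's API, assuming
the table is first converted to an append stream via a {{StreamTableEnvironment}}:

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Iterator;

public final class IterativeCollectSketch {

    /** Consumes the table result record by record without materializing it all in memory. */
    public static void consume(StreamTableEnvironment tEnv, Table table) throws Exception {
        DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);

        // Starts the job and streams the results back to the client.
        Iterator<Row> results = DataStreamUtils.collect(stream);
        while (results.hasNext()) {
            System.out.println(results.next());
        }
    }
}
{code}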

The good thing is that this solution is applicable to both batch and streaming jobs. Although
I haven't tested it, from my understanding of the code the problem with this solution is that
we'll have to send the results buffer by buffer instead of record by record (otherwise it
might be too slow), and that if the client consumes the results too slowly, it might block
the whole task.

To solve this, we may introduce two config options: one controls the heap memory usage of
{{CollectSink}} so that it can temporarily buffer unconsumed results, the other sets the
consuming strategy (BLOCKING or NON_BLOCKING). With the BLOCKING strategy, {{CollectSink}}
blocks the task when its memory buffer is full; with NON_BLOCKING, it throws away the oldest
unconsumed results to make space for new ones.
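
Purely for illustration, the two options could be declared with Flink's {{ConfigOptions}}
builder; the keys and defaults below are made up for this sketch and are not existing options:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class CollectSinkOptions {

    /** Upper bound on the heap memory CollectSink may use to buffer unconsumed results. */
    public static final ConfigOption<String> MAX_BUFFER_MEMORY =
            ConfigOptions.key("table.exec.collect.max-buffer-memory")
                    .defaultValue("10 mb");

    /**
     * BLOCKING: back-pressure the task when the buffer is full;
     * NON_BLOCKING: drop the oldest unconsumed results to make room for new ones.
     */
    public static final ConfigOption<String> CONSUME_STRATEGY =
            ConfigOptions.key("table.exec.collect.consume-strategy")
                    .defaultValue("BLOCKING");
}
{code}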



> Provide api to convert flink table to java List (e.g. Table#collect)
> --------------------------------------------------------------------
>
>                 Key: FLINK-13943
>                 URL: https://issues.apache.org/jira/browse/FLINK-13943
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / API
>            Reporter: Jeff Zhang
>            Assignee: Caizhi Weng
>            Priority: Major
>
> It would be nice to convert a flink table to a java List so that I can do other data manipulation
on the client side after executing the flink job. For the flink planner, I can convert a flink table
to a DataSet and use DataSet#collect, but for the blink planner, there's no such api.
> EDIT from FLINK-14807:
> Currently, it is very inconvenient for users to fetch the data of a flink job unless they
explicitly specify a sink and then fetch the data from that sink via its api (e.g. write to an
hdfs sink, then read the data back from hdfs). However, most of the time users just want to get
the data and do whatever processing they want, so it is necessary for flink to provide an api
like Table#collect for this purpose.
> Other apis such as Table#head and Table#print are also helpful.
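
For reference, the old-planner workaround mentioned in the description looks roughly like this
(a sketch assuming a {{BatchTableEnvironment}}):

{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import java.util.List;

public final class OldPlannerCollect {

    /** Works only with the old flink planner: convert to a DataSet, then collect. */
    public static List<Row> collect(BatchTableEnvironment tEnv, Table table) throws Exception {
        DataSet<Row> dataSet = tEnv.toDataSet(table, Row.class);
        return dataSet.collect();
    }
}
{code}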



