flink-issues mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6936) Add multiple targets support for custom partitioner
Date Wed, 28 Jun 2017 13:19:03 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066473#comment-16066473

Aljoscha Krettek commented on FLINK-6936:

I see. I have a few quick comments but no completely satisfactory solution with regard to

You can try using {{OperatorStateStore.getUnionListState()}} [javadoc|https://github.com/apache/flink/blob/2ef4900aa279e75844a9f8536cfe007c2542187d/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java#L77-L77]
for the state that is broadcast. With this type of state, the checkpointed state of all parallel
operator instances is collected and sent to every operator instance on restore. In your
case, where the state is identical on all operator instances, you would checkpoint it only on
operator instance 0. Every instance then restores exactly one copy of that state, regardless
of whether the parallelism has changed.
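To make the restore argument concrete, here is a small plain-Java model of union redistribution. It is not Flink code: `UnionStateRestoreModel` and its methods are hypothetical. The model assumes only the behavior described above, namely that on restore every instance receives the concatenation of all snapshotted lists. In a real job, the "only subtask 0 writes" check would presumably sit in the snapshot method of a checkpointed function and use the subtask index from the runtime context (`getRuntimeContext().getIndexOfThisSubtask()`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model (not Flink code) of union list state redistribution.
final class UnionStateRestoreModel {

    // Snapshot: each old subtask writes one list. If onlySubtaskZero is true, only
    // subtask 0 writes the shared state and every other subtask writes an empty list.
    static List<List<String>> snapshot(List<String> sharedState, int oldParallelism,
                                       boolean onlySubtaskZero) {
        List<List<String>> perSubtask = new ArrayList<>();
        for (int subtask = 0; subtask < oldParallelism; subtask++) {
            List<String> written = new ArrayList<>();
            if (subtask == 0 || !onlySubtaskZero) {
                written.addAll(sharedState);
            }
            perSubtask.add(written);
        }
        return perSubtask;
    }

    // Restore with union semantics: every new subtask receives the concatenation of
    // all lists written at snapshot time, whatever the new parallelism is.
    static List<List<String>> restore(List<List<String>> snapshots, int newParallelism) {
        List<String> union = new ArrayList<>();
        for (List<String> part : snapshots) {
            union.addAll(part);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int subtask = 0; subtask < newParallelism; subtask++) {
            restored.add(new ArrayList<>(union));
        }
        return restored;
    }

    public static void main(String[] args) {
        List<String> shared = Arrays.asList("rule-a", "rule-b");
        // Rescale from 4 to 7 subtasks; each of the 7 restores exactly one copy.
        List<List<String>> restored = restore(snapshot(shared, 4, true), 7);
        System.out.println(restored.size() + " " + restored.get(6)); // prints "7 [rule-a, rule-b]"
    }
}
```

Under this model, if all four old subtasks had written the two-element state (`onlySubtaskZero = false`), each restored instance would see eight entries, four copies of the same state. That is why only one instance should write it.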

Regarding data partitioning, couldn't you use something like this:

      // CommonStreamJoin is your own CoProcessFunction over the two connected input streams
      .process(new CommonStreamJoin[Order, Order, Order2](
        new JoinFunction[Order, Order, Order2] {
          override def join(left: Order, right: Order): Order2 = {
            Order2(left.user, right.user, left.product, right.product, left.amount, right.amount)
          }
        }, 60000, 1000))

{{CommonStreamJoin}} would have to be a {{CoProcessFunction}}, and you could get rid of the
two other UDFs that only do packing/unpacking and routing. (This doesn't work if you want
different multicast patterns, of course, but it does work for the example you showed, where
one stream is randomly partitioned and the other is broadcast.)
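The following small plain-Java model checks why the random/broadcast case works. It is not Flink code. `BroadcastJoinRoutingModel` is hypothetical, round-robin stands in for random partitioning, and the time bounds of the real join are ignored. Each left element lands on exactly one instance, and each right element is present on every instance, so every (left, right) pair meets on exactly one instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model (not Flink code): left stream partitioned, right stream broadcast.
final class BroadcastJoinRoutingModel {

    // Returns every "left|right" pair produced, collected over all parallel instances.
    static List<String> joinAcrossInstances(List<String> left, List<String> right, int parallelism) {
        List<List<String>> leftPerInstance = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            leftPerInstance.add(new ArrayList<>());
        }
        // Left stream: each element goes to exactly one instance (round-robin stands in for random).
        for (int n = 0; n < left.size(); n++) {
            leftPerInstance.get(n % parallelism).add(left.get(n));
        }
        // Right stream: broadcast, so each instance joins its local left elements with all of it.
        List<String> pairs = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            for (String l : leftPerInstance.get(i)) {
                for (String r : right) {
                    pairs.add(l + "|" + r);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> pairs = joinAcrossInstances(Arrays.asList("a", "b", "c"), Arrays.asList("x", "y"), 2);
        System.out.println(pairs); // prints [a|x, a|y, c|x, c|y, b|x, b|y]
    }
}
```

Each of the 3 × 2 pairs appears exactly once. If both sides were broadcast, every pair would instead be produced once per instance.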

> Add multiple targets support for custom partitioner
> ---------------------------------------------------
>                 Key: FLINK-6936
>                 URL: https://issues.apache.org/jira/browse/FLINK-6936
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>            Priority: Minor
> The current user-facing Partitioner only allows returning one target.
> {code:java}
> @Public
> public interface Partitioner<K> extends java.io.Serializable, Function {
> 	/**
> 	 * Computes the partition for the given key.
> 	 *
> 	 * @param key The key.
> 	 * @param numPartitions The number of partitions to partition into.
> 	 * @return The partition index.
> 	 */
> 	int partition(K key, int numPartitions);
> }
> {code}
> Actually, this function should return multiple partitions and this may be a historical
> There are at least three possible approaches to solving this:
> # Make the `protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner)` method in DataStream public, so that users can define a StreamPartitioner directly.
> # Change the `partition` method in the Partitioner interface to return an int array instead of a single int value.
> # Add a new `multicast` method to DataStream and provide a MultiPartitioner interface which returns an int array.
> For API consistency, the third approach seems to be an acceptable choice. [~aljoscha], what do you think?
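Below is a minimal sketch of what the interface in approach 3 could look like. It is written as plain Java so it compiles on its own. `MultiPartitioner` here is hypothetical: it is the issue's proposal, not an existing Flink type. Unlike Flink's `Partitioner`, it does not extend `Function`, purely to keep the sketch free of Flink dependencies. The two implementations are illustrative. One is a broadcast pattern. The other is a made-up "hash partition plus its neighbor" pattern, which a single-int `partition` method cannot express.

```java
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical interface for approach 3; not part of Flink's API.
interface MultiPartitioner<K> extends Serializable {
    // Returns all target partition indices, each in [0, numPartitions).
    int[] partition(K key, int numPartitions);
}

// Broadcast: every record is sent to all partitions.
final class BroadcastMultiPartitioner<K> implements MultiPartitioner<K> {
    @Override
    public int[] partition(K key, int numPartitions) {
        int[] targets = new int[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            targets[i] = i;
        }
        return targets;
    }
}

// Illustrative multicast: the key's hash partition plus the next one (wrapping around).
final class NeighborMultiPartitioner<K> implements MultiPartitioner<K> {
    @Override
    public int[] partition(K key, int numPartitions) {
        int primary = Math.floorMod(key.hashCode(), numPartitions);
        if (numPartitions == 1) {
            return new int[] {primary};
        }
        return new int[] {primary, (primary + 1) % numPartitions};
    }
}

final class MultiPartitionerDemo {
    public static void main(String[] args) {
        System.out.println(Arrays.toString(new BroadcastMultiPartitioner<String>().partition("k", 4)));
        // prints [0, 1, 2, 3]
        System.out.println(Arrays.toString(new NeighborMultiPartitioner<Integer>().partition(3, 4)));
        // prints [3, 0]
    }
}
```

The existing single-target `Partitioner` corresponds to the special case of returning a one-element array, which is part of why approach 2 (changing the return type in place) would break compatibility while approach 3 would not.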

This message was sent by Atlassian JIRA