flink-issues mailing list archives

From "Xingcan Cui (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6936) Add multiple targets support for custom partitioner
Date Wed, 28 Jun 2017 02:11:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065817#comment-16065817 ]

Xingcan Cui commented on FLINK-6936:

[~aljoscha], thanks for your attention. You are right that state management seems to be
the bottleneck, since {{KeyedState}} cannot be used here and {{OperatorState}} is still
incomplete (it only supports {{ListState}}). In my view, the main problem is that the
current state mechanism (whether keyed or unkeyed) only supports "symmetrical" states,
i.e., it is impossible to assign a designated portion of the global state to a dedicated instance.
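
To illustrate the point about operator {{ListState}}, here is a minimal sketch in plain Java (no Flink dependency). It assumes the even-split redistribution that the Flink documentation describes for operator list state: on rescaling, all per-instance lists are concatenated and cut into roughly equal sublists. The class name {{EvenSplitSketch}} and the exact split order are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EvenSplitSketch {
    // Simulates even-split redistribution: concatenate every instance's list,
    // then cut the result into newParallelism contiguous, near-equal sublists.
    // The operator has no say in which instance receives which elements.
    public static <T> List<List<T>> redistribute(List<List<T>> oldState, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> part : oldState) {
            all.addAll(part);
        }
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int from = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(from, from + size)));
            from += size;
        }
        return result;
    }

    public static void main(String[] args) {
        // Two instances, each holding a full copy of the same two cached records.
        List<List<String>> before = Arrays.asList(
            Arrays.asList("b1", "b2"),
            Arrays.asList("b1", "b2"));
        // After scaling to three instances, only the first still holds a full copy.
        System.out.println(redistribute(before, 3)); // prints [[b1, b2], [b1], [b2]]
    }
}
```

Under this assumption, an operator that relies on every instance holding the complete list cannot express that requirement through the state API alone.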

For the moment, I have implemented a simple record-to-window join according to the design
document ([Inner Join in Flink|https://goo.gl/4AdR7h]). The body looks like this:
orderA.connect(orderB).process(new JoinMerge[Order, Order]())
      .multicast(new JoinPartitioner[Order, Order])
      .process(new CommonStreamJoin[Order, Order, Order2](
        new JoinFunction[Order, Order, Order2] {
          override def join(left: Order, right: Order): Order2 = {
            Order2(left.user, right.user, left.product, right.product, left.amount, right.amount)
          }
        }, 60000, 1000))
The key idea is to randomly split the left stream and duplicate the right stream to all downstream
instances. Correspondingly, I use {{OperatorState}} ({{ListState}}) to store a portion of the cached
left stream and a full copy of the cached right stream in each {{CommonStreamJoin}} instance.
This seems quite costly and will not work correctly under some circumstances (e.g., state
inconsistency occurs when the parallelism is increased), but I cannot think of another implementation
unless a better state management mechanism is provided. What do you think?
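
As a rough illustration of the routing described above (left records go to one random downstream instance, right records go to all of them), here is a minimal sketch in plain Java without Flink dependencies. The {{MultiPartitioner}} interface is a hypothetical Java rendering of the interface proposed in this issue; {{Tagged}} and {{SplitLeftBroadcastRight}} are names invented for the sketch, with {{Tagged}} standing in for the Scala {{Either}}.

```java
import java.util.Random;

public class MulticastSketch {
    /** Hypothetical interface: returns every target channel for a record. */
    interface MultiPartitioner<T> {
        int[] partition(T record, int numPartitions);
    }

    /** A record tagged with the input it came from (stand-in for Either[L, R]). */
    static final class Tagged<T> {
        final boolean fromLeft;
        final T value;

        Tagged(boolean fromLeft, T value) {
            this.fromLeft = fromLeft;
            this.value = value;
        }
    }

    /** Left records go to one random channel, right records to every channel. */
    static final class SplitLeftBroadcastRight<T> implements MultiPartitioner<Tagged<T>> {
        private final Random random;
        private int[] allTargets = new int[0]; // cached broadcast target list

        SplitLeftBroadcastRight(long seed) {
            this.random = new Random(seed);
        }

        @Override
        public int[] partition(Tagged<T> record, int numPartitions) {
            if (record.fromLeft) {
                return new int[] { random.nextInt(numPartitions) };
            }
            // Rebuild the broadcast list only when the channel count changes.
            if (allTargets.length != numPartitions) {
                allTargets = new int[numPartitions];
                for (int i = 0; i < numPartitions; i++) {
                    allTargets[i] = i;
                }
            }
            return allTargets;
        }
    }

    public static void main(String[] args) {
        SplitLeftBroadcastRight<String> p = new SplitLeftBroadcastRight<>(42L);
        System.out.println(p.partition(new Tagged<>(true, "left order"), 4).length);   // prints 1
        System.out.println(p.partition(new Tagged<>(false, "right order"), 4).length); // prints 4
    }
}
```

Every right record is sent once per downstream instance, which is where the cost mentioned above comes from.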


The code for the three main classes {{JoinMerge}}, {{JoinPartitioner}} and {{CommonStreamJoin}}
(which does not remove expired data yet) is as follows.
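  class JoinMerge[L, R] extends CoProcessFunction[L, R, Either[L, R]] {
    // Wrap records of the first input as Left so both inputs share one stream type.
    override def processElement1(value: L, ctx: CoProcessFunction[L, R, Either[L, R]]#Context,
                                 out: Collector[Either[L, R]]): Unit = {
      out.collect(Left(value))
    }

    // Wrap records of the second input as Right.
    override def processElement2(value: R, ctx: CoProcessFunction[L, R, Either[L, R]]#Context,
                                 out: Collector[Either[L, R]]): Unit = {
      out.collect(Right(value))
    }
  }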
  class JoinPartitioner[L, R] extends MultiPartitioner[Either[L, R]] {
    var targets: Array[Int] = null
    override def partition(record: Either[L, R], numPartitions: Int): Array[Int] = {
      if (record.isLeft) {
        if (!(null != targets && targets.length == numPartitions)) {
          targets = Array.range(0, numPartitions)
        }
        return targets
      } else {
  class CommonStreamJoin[L, R, O](val joinFunction: JoinFunction[L, R, O],
                                  val cacheTime: Long, val rOffset: Long)
    extends ProcessFunction[Either[L, R], O] with CheckpointedFunction {

    val leftCache = new TreeMap[Long, JList[L]]
    val rightCache = new TreeMap[Long, JList[R]]

    var leftState: ListState[TMap[Long, JList[L]]] = null
    var rightState: ListState[MEntry[Long, JList[R]]] = null

    val leftDescriptor = new ListStateDescriptor[TMap[Long, JList[L]]]("leftCache",
      TypeInformation.of(new TypeHint[TMap[Long, JList[L]]]() {}))
    val rightDescriptor = new ListStateDescriptor[MEntry[Long, JList[R]]]("rightCache",
      TypeInformation.of(new TypeHint[MEntry[Long, JList[R]]]() {}))

    var result: O = _

    override def processElement(value: Either[L, R],
                                ctx: ProcessFunction[Either[L, R], O]#Context,
                                out: Collector[O]): Unit = {
      //TODO this should be replaced with timestamps contained in records
      val time = ctx.timerService().currentProcessingTime()
      if (value.isRight) {
        val right = value.right.get
        if (!rightCache.containsKey(time)) {
          rightCache.put(time, new util.LinkedList[R]())
        }
        rightCache.get(time).add(right)
        // Join the new right record against every cached left record.
        for (leftList <- leftCache.values()) {
          for (left <- leftList) {
            result = joinFunction.join(left, right)
            if (null != result) {
              out.collect(result)
            }
          }
        }
      } else {
        val left = value.left.get
        if (!leftCache.containsKey(time)) {
          leftCache.put(time, new util.LinkedList[L]())
        }
        leftCache.get(time).add(left)
        // Join the new left record against every cached right record.
        for (rightList <- rightCache.values()) {
          for (right <- rightList) {
            result = joinFunction.join(left, right)
            if (null != result) {
              out.collect(result)
            }
          }
        }
      }
    }

    override def snapshotState(context: FunctionSnapshotContext): Unit = {
      for (entry <- rightCache.entrySet()) {

    override def initializeState(context: FunctionInitializationContext): Unit = {
      leftState = context.getOperatorStateStore.getListState(leftDescriptor)
      rightState = context.getOperatorStateStore.getListState(rightDescriptor)
      if (context.isRestored) {
        for (e <- rightState.get()) {
          if (!rightCache.containsKey(e.getKey)) {
            rightCache.put(e.getKey, e.getValue)
          } else {

> Add multiple targets support for custom partitioner
> ---------------------------------------------------
>                 Key: FLINK-6936
>                 URL: https://issues.apache.org/jira/browse/FLINK-6936
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>            Priority: Minor
> The current user-facing Partitioner only allows returning one target.
> {code:java}
> @Public
> public interface Partitioner<K> extends java.io.Serializable, Function {
> 	/**
> 	 * Computes the partition for the given key.
> 	 *
> 	 * @param key The key.
> 	 * @param numPartitions The number of partitions to partition into.
> 	 * @return The partition index.
> 	 */
> 	int partition(K key, int numPartitions);
> }
> {code}
> Actually, this function should be able to return multiple partitions; the single-target signature may be a historical legacy.
> There could be at least three approaches to solve this.
> # Make the `protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner)` method in DataStream public, which allows users to define a StreamPartitioner directly.
> # Change the `partition` method in the Partitioner interface to return an int array instead of a single int value.
> # Add a new `multicast` method to DataStream and provide a MultiPartitioner interface which returns an int array.
> Considering the consistency of the API, the 3rd approach seems to be an acceptable choice. [~aljoscha], what do you think?

This message was sent by Atlassian JIRA
