flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join
Date Thu, 23 Nov 2017 16:02:11 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264529#comment-16264529 ]

ASF GitHub Bot commented on FLINK-6094:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4471#discussion_r152833454
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala ---
    @@ -116,14 +135,100 @@ object UpdatingPlanChecker {
               val windowStartEnd = w.getWindowProperties.map(_.name)
               // we have only a unique key if at least one window property is selected
               if (windowStartEnd.nonEmpty) {
    -            keys = Some(groupKeys ++ windowStartEnd)
    +            val smallestAttribute = windowStartEnd.sorted.head
    +            Some((groupKeys.map(e => (e, e)) ++ windowStartEnd.map((_, smallestAttribute))).toList)
    +          } else {
    +            None
    +          }
    +
    +        case j: DataStreamJoin =>
    +          val joinType = j.getJoinType
    +          joinType match {
    +            case JoinRelType.INNER => {
    +              // get key(s) for inner join
    +              val lInputKeys = visit(j.getLeft)
    +              val rInputKeys = visit(j.getRight)
    +              if (lInputKeys.isEmpty || rInputKeys.isEmpty) {
    +                None
    +              } else {
    +                // Output of inner join must have keys if left and right both contain key(s).
    +                // Key groups from both side will be merged by join equi-predicates
    +                val lFieldNames: Seq[String] = j.getLeft.getRowType.getFieldNames
    +                val rFieldNames: Seq[String] = j.getRight.getRowType.getFieldNames
    +                val lJoinKeys: Seq[String] = j.getJoinInfo.leftKeys.map(lFieldNames.get(_))
    +                val rJoinKeys: Seq[String] = j.getJoinInfo.rightKeys.map(rFieldNames.get(_))
    +
    +                getOutputKeysForInnerJoin(
    +                  lFieldNames ++ rFieldNames,
    +                  lInputKeys.get ++ rInputKeys.get,
    +                  lJoinKeys.zip(rJoinKeys).toList
    +                )
    +              }
    +            }
    +            case _ => throw new UnsupportedOperationException(
    +              s"An Unsupported JoinType [ $joinType ]")
               }
             case _: DataStreamRel =>
    -          // anything else does not forward keys or might duplicate key, so we can stop
    -          keys = None
    +          // anything else does not forward keys, so we can stop
    +          None
           }
         }
     
    -  }
     
    +    def getOutputKeysForInnerJoin(
    +        inNames: Seq[String],
    +        inKeys: List[(String, String)],
    +        joinKeys: List[(String, String)])
    +    : Option[List[(String, String)]] = {
    +
    +      val nameToGroups = mutable.HashMap.empty[String,String]
    +
    +      // merge two groups
    +      def merge(nameA: String, nameB: String): Unit = {
    +        val ga: String = findGroup(nameA);
    --- End diff --
    
    Remove semicolons.


> Implement stream-stream proctime non-window inner join
> ------------------------------------------------------
>
>                 Key: FLINK-6094
>                 URL: https://issues.apache.org/jira/browse/FLINK-6094
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Hequn Cheng
>
> This includes:
> 1. Implement stream-stream proctime non-window inner join
> 2. Implement the retraction processing logic for the join



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message