flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3942) Add support for INTERSECT
Date Wed, 29 Jun 2016 22:10:43 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355890#comment-15355890
] 

ASF GitHub Bot commented on FLINK-3942:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r69037698
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
---
    @@ -83,66 +82,68 @@ class DataSetIntersect(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     
    -    var leftDataSet: DataSet[Any] = null
    -    var rightDataSet: DataSet[Any] = null
    +    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    +    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     
    -    expectedType match {
    -      case None =>
    -        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -        rightDataSet =
    -          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
    -      case _ =>
    -        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    -        rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    -    }
    -
    -    val config = tableEnv.getConfig
    -
    -    val returnType = determineReturnType(
    -      getRowType,
    -      expectedType,
    -      config.getNullCheck,
    -      config.getEfficientTypeUsage)
    -
    -    val generator = new CodeGenerator(
    -      config,
    -      false,
    -      leftDataSet.getType,
    -      Some(rightDataSet.getType))
    -
    -    val conversion = generator.generateConverterResultExpression(
    -      returnType,
    -      left.getRowType.getFieldNames)
    +    val coGroupedDs = leftDataSet.coGroup(rightDataSet)
     
    +    val leftType = leftDataSet.getType
    +    val rightType = rightDataSet.getType
     
    -    val body = s"""
    -            |${conversion.code}
    -            |${generator.collectorTerm}.collect(${conversion.resultTerm});
    -            |""".stripMargin
    +    // If it is atomic type, the field expression need to be "*".
    --- End diff --
    
    I think you can use `*` for all cases. Composite types should be (recursively) expanded
to all atomic member types.


> Add support for INTERSECT
> -------------------------
>
>                 Key: FLINK-3942
>                 URL: https://issues.apache.org/jira/browse/FLINK-3942
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.1.0
>            Reporter: Fabian Hueske
>            Assignee: Jark Wu
>            Priority: Minor
>
> Currently, the Table API and SQL do not support INTERSECT.
> INTERSECT can be executed as join on all fields.
> In order to add support for INTERSECT to the Table API and SQL we need to:
> - Implement a {{DataSetIntersect}} class that translates an INTERSECT into a DataSet
API program using a join on all fields.
> - Implement a {{DataSetIntersectRule}} that translates a Calcite {{LogicalIntersect}}
into a {{DataSetIntersect}}.
> - Extend the Table API (and validation phase) to provide an intersect() method.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message