flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3596) DataSet RelNode refactoring
Date Wed, 09 Mar 2016 16:28:40 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15187351#comment-15187351 ]

ASF GitHub Bot commented on FLINK-3596:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1777#discussion_r55545155
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---
    @@ -69,37 +72,55 @@ class DataSetGroupReduce(
     
         expectedType match {
           case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
    -        throw new PlanGenException("GroupReduce operations currently only support returning Rows.")
    +        throw new PlanGenException("Aggregate operations currently only support returning Rows.")
           case _ => // ok
         }
     
    +    val groupingKeys = (0 until grouping.length).toArray
    +    // add grouping fields, position keys in the input, and input type
    +    val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
    +      inputType, rowType, grouping)
    +
         val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
           config,
           // tell the input operator that this operator currently only supports Rows as input
           Some(TypeConverter.DEFAULT_ROW_TYPE))
     
    +    val intermediateType = determineReturnType(
    +      aggregateResult.intermediateDataType,
    +      expectedType,
    +      config.getNullCheck,
    +      config.getEfficientTypeUsage)
    +
    +
         // get the output types
    -    val fieldsNames = rowType.getFieldNames
         val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
         .map(f => f.getType.getSqlTypeName)
         .map(n => TypeConverter.sqlTypeToTypeInfo(n))
         .toArray
     
         val rowTypeInfo = new RowTypeInfo(fieldTypes)
    +
    +    val mappedInput = inputDS.map(aggregateResult.mapFunc.apply(
    +      config, inputDS.getType, intermediateType))
    +
         val groupReduceFunction =
    -      func.apply(config, inputDS.getType.asInstanceOf[RowTypeInfo], rowTypeInfo)
    +      aggregateResult.reduceGroupFunc.apply(
    --- End diff --
    
    `AggregateUtil` can directly return the `GroupReduceFunction`
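
A minimal sketch of the reviewer's suggestion, using hypothetical stand-ins rather than Flink's real classes: `Row`, `GroupReduceFunction`, `Config`, `SumAggregate`, `createFactory`, and `createGroupReduceFunction` below are simplified illustrations, not the actual signatures in `AggregateUtil`. The first method mirrors the shape in the diff, where the caller still has to apply the returned factory to the configuration; the second takes that input up front and returns the `GroupReduceFunction` itself.

```scala
object AggregateSketch {
  // Hypothetical stand-ins for Flink's Row type and GroupReduceFunction.
  type Row = Vector[Any]
  trait GroupReduceFunction[IN, OUT] {
    def reduce(values: Iterable[IN]): OUT
  }

  // Hypothetical configuration and aggregate description.
  final case class Config(nullCheck: Boolean)
  final case class SumAggregate(index: Int) // sums the Long field at `index`

  // Shape in the diff (simplified): a result object holding a factory that
  // the caller must still apply to the configuration.
  final case class AggregateResult(
      reduceGroupFunc: Config => GroupReduceFunction[Row, Row])

  def createFactory(aggs: Seq[SumAggregate]): AggregateResult =
    AggregateResult(config => makeReduce(aggs, config))

  // Suggested shape: take the configuration up front and return the
  // GroupReduceFunction directly.
  def createGroupReduceFunction(
      aggs: Seq[SumAggregate],
      config: Config): GroupReduceFunction[Row, Row] =
    makeReduce(aggs, config)

  // Builds one output row per group, with one field per aggregate.
  // With null checking enabled, null input fields are skipped.
  private def makeReduce(
      aggs: Seq[SumAggregate],
      config: Config): GroupReduceFunction[Row, Row] =
    new GroupReduceFunction[Row, Row] {
      def reduce(values: Iterable[Row]): Row =
        aggs.map { agg =>
          values.iterator
            .map(row => row(agg.index))
            .filter(v => !config.nullCheck || v != null)
            .map(v => v.asInstanceOf[java.lang.Long].longValue)
            .sum
        }.toVector
    }
}
```

Both paths build the same function, so the direct form only removes one level of indirection from `DataSetAggregate`. In the diff, the map function is applied with `intermediateType`, which is derived from `aggregateResult.intermediateDataType`, so that type may still have to be determined before the functions can be built inside `AggregateUtil`.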


> DataSet RelNode refactoring
> ---------------------------
>
>                 Key: FLINK-3596
>                 URL: https://issues.apache.org/jira/browse/FLINK-3596
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API
>    Affects Versions: 1.1.0
>            Reporter: Vasia Kalavri
>            Assignee: Vasia Kalavri
>
> After discussion with [~fhueske], [~chengxiang li], and [~twalthr], we have decided to make the following refactoring:
> - Make the DataSet RelNodes correspond to logical relational operators.
> - Move the code generation from the rules into the DataSet RelNodes.
> - Remove the Flink RelNode layer and use a 1-pass translation instead of the current 2-pass translation.
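
The target design in the description can be illustrated with a toy model. The sketch is a hypothetical simplification and not Flink's actual API. It assumes that each logical operator maps to exactly one DataSet node, that each DataSet node produces its own operation in `translateToPlan` (closures stand in for code generation), and that a single `convert` pass goes straight from logical nodes to DataSet nodes with no intermediate Flink RelNode layer. `DataSet` here is just an in-memory `Seq` of rows, and all node names are illustrative.

```scala
object TranslationSketch {
  // Hypothetical stand-ins: a row is a vector of values, and a "DataSet"
  // is an in-memory sequence of rows rather than a distributed data set.
  type Row = Vector[Any]
  type DataSet = Seq[Row]

  // Logical relational operators (what the planner optimizes).
  sealed trait LogicalRel
  final case class LogicalScan(rows: Seq[Row]) extends LogicalRel
  final case class LogicalFilter(input: LogicalRel, pred: Row => Boolean) extends LogicalRel
  final case class LogicalProject(input: LogicalRel, f: Row => Row) extends LogicalRel

  // DataSet nodes correspond one-to-one to logical operators, and each node
  // builds its own operation when translated (closures stand in for
  // generated code).
  sealed trait DataSetRel { def translateToPlan(): DataSet }

  final case class DataSetScan(rows: Seq[Row]) extends DataSetRel {
    def translateToPlan(): DataSet = rows
  }
  final case class DataSetFilter(input: DataSetRel, pred: Row => Boolean) extends DataSetRel {
    def translateToPlan(): DataSet = input.translateToPlan().filter(pred)
  }
  final case class DataSetProject(input: DataSetRel, f: Row => Row) extends DataSetRel {
    def translateToPlan(): DataSet = input.translateToPlan().map(f)
  }

  // Single conversion pass: logical nodes go straight to DataSet nodes,
  // with no intermediate Flink RelNode layer.
  def convert(rel: LogicalRel): DataSetRel = rel match {
    case LogicalScan(rows)     => DataSetScan(rows)
    case LogicalFilter(in, p)  => DataSetFilter(convert(in), p)
    case LogicalProject(in, f) => DataSetProject(convert(in), f)
  }
}
```

For example, converting `LogicalProject(LogicalFilter(LogicalScan(rows), p), f)` yields `DataSetProject(DataSetFilter(DataSetScan(rows), p), f)`, and one call to `translateToPlan` on the root evaluates the whole chain. A real translation would emit Flink `DataSet` operators instead of evaluating collections eagerly. The sketch only shows how the responsibilities are divided.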



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
