flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...
Date Wed, 01 Mar 2017 23:42:44 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103806213
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
---
    @@ -737,101 +632,121 @@ object AggregateUtil {
               aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMinAggregate
    +                new ByteMinAggFunction
                   case SMALLINT =>
    -                new ShortMinAggregate
    +                new ShortMinAggFunction
                   case INTEGER =>
    -                new IntMinAggregate
    +                new IntMinAggFunction
                   case BIGINT =>
    -                new LongMinAggregate
    +                new LongMinAggFunction
                   case FLOAT =>
    -                new FloatMinAggregate
    +                new FloatMinAggFunction
                   case DOUBLE =>
    -                new DoubleMinAggregate
    +                new DoubleMinAggFunction
                   case DECIMAL =>
    -                new DecimalMinAggregate
    +                new DecimalMinAggFunction
                   case BOOLEAN =>
    -                new BooleanMinAggregate
    +                new BooleanMinAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Min aggregate does no support type:" + sqlType)
                 }
               } else {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMaxAggregate
    +                new ByteMaxAggFunction
                   case SMALLINT =>
    -                new ShortMaxAggregate
    +                new ShortMaxAggFunction
                   case INTEGER =>
    -                new IntMaxAggregate
    +                new IntMaxAggFunction
                   case BIGINT =>
    -                new LongMaxAggregate
    +                new LongMaxAggFunction
                   case FLOAT =>
    -                new FloatMaxAggregate
    +                new FloatMaxAggFunction
                   case DOUBLE =>
    -                new DoubleMaxAggregate
    +                new DoubleMaxAggFunction
                   case DECIMAL =>
    -                new DecimalMaxAggregate
    +                new DecimalMaxAggFunction
                   case BOOLEAN =>
    -                new BooleanMaxAggregate
    +                new BooleanMaxAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Max aggregate does no support type:" + sqlType)
                 }
               }
             }
             case _: SqlCountAggFunction =>
    -          aggregates(index) = new CountAggregate
    +          aggregates(index) = new CountAggFunction
             case unSupported: SqlAggFunction =>
               throw new TableException("unsupported Function: " + unSupported.getName)
           }
    -      setAggregateDataOffset(index)
    -    }
    -
    -    // set the aggregate intermediate data start index in Row, and update current value.
    -    def setAggregateDataOffset(index: Int): Unit = {
    -      aggregates(index).setAggOffsetInRow(aggOffset)
    -      aggOffset += aggregates(index).intermediateDataType.length
         }
     
         (aggFieldIndexes, aggregates)
       }
     
    -  private def createAggregateBufferDataType(
    -    groupings: Array[Int],
    -    aggregates: Array[Aggregate[_]],
    -    inputType: RelDataType,
    -    windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
    +  private def createDataSetAggregateBufferDataType(
    +      groupings: Array[Int],
    +      aggregates: Array[TableAggregateFunction[_]],
    +      inputType: RelDataType,
    +      windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
     
         // get the field data types of group keys.
    -    val groupingTypes: Seq[TypeInformation[_]] = groupings
    -      .map(inputType.getFieldList.get(_).getType)
    -      .map(FlinkTypeFactory.toTypeInfo)
    +    val groupingTypes: Seq[TypeInformation[_]] =
    +      groupings
    +        .map(inputType.getFieldList.get(_).getType)
    +        .map(FlinkTypeFactory.toTypeInfo)
     
         // get all field data types of all intermediate aggregates
    -    val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
    +    val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
    +      val clazz: Class[_] = agg.getClass
    --- End diff --
    
    We need to obtain the `TypeInformation` of the `Accumulator` here, not the type of the
`AggregateFunction`.
    We might need to add a `getAccumulatorType()` method to the `AggregateFunction` if we
cannot extract the type from the object returned by `AggregateFunction.createAccumulator()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message