flink-issues mailing list archives

From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-3474] support partial aggregate
Date Wed, 02 Mar 2016 17:52:13 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54761986
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---
    @@ -17,26 +17,77 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
    +
     /**
    - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it
    - * with grouped aggregate field values, and finally get the aggregated value.
    - * @tparam T the output type
    + * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
    + * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
    + * -- In Map phase, use prepare() to transform aggregate field value into intermediate
    + * aggregate value.
    + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
    + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
    + * For associative decomposable aggregate functions, they support partial aggregate. To optimize
    + * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
    + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
    + * into aggregate buffer.
    + *
    + * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
    + * field index in Row, so different aggregate functions could share the same Row as intermediate
    + * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
    + * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
    + * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
    + *
    + * @tparam T Aggregated value type.
      */
     trait Aggregate[T] extends Serializable {
    +
    +  protected var aggOffsetInRow: Int = _
    +
       /**
    -   * Initialize the aggregate state.
    +   * Initiate the intermediate aggregate value in Row.
    +   * @param intermediate
        */
    -  def initiateAggregate
    +  def initiate(intermediate: Row): Unit
     
       /**
    -   * Feed the aggregate field value.
    +   * Transform the aggregate field value into intermediate aggregate data.
        * @param value
    +   * @param intermediate
        */
    -  def aggregate(value: Any)
    +  def prepare(value: Any, intermediate: Row): Unit
    --- End diff --
    
    Yes, this would be aggregation-specific.
    For example, for a `SUM` aggregation, `prepare` could insert a `0`, which is basically the same as what `initiate` would do. However, it is also OK to do it in `prepare` directly.
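
    A minimal sketch of how a `SUM` aggregate might follow the `initiate`/`prepare` contract from the diff. `SimpleRow` and `IntSumSketch` are hypothetical names, `SimpleRow` is only a stand-in for Flink's `Row`, and the `merge`/`evaluate` signatures are assumed because the diff excerpt does not show them. Treating a `null` input as `0` is likewise an assumption made for illustration, not something this thread confirms.

```scala
// Hypothetical stand-in for org.apache.flink.api.table.Row,
// used only to keep this sketch self-contained.
class SimpleRow(arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(i: Int, value: Any): Unit = fields(i) = value
  def getField(i: Int): Any = fields(i)
}

// Illustrative SUM over Int values. The offset is a constructor argument here;
// the diff keeps it in a protected var named aggOffsetInRow instead.
class IntSumSketch(offset: Int) extends Serializable {

  // Write the neutral element of addition into this aggregate's slot.
  def initiate(intermediate: SimpleRow): Unit =
    intermediate.setField(offset, 0)

  // Map phase: turn one input value into an intermediate value.
  // Assumption: a null input contributes nothing, so prepare writes 0 directly,
  // which has the same effect as calling initiate.
  def prepare(value: Any, intermediate: SimpleRow): Unit =
    if (value == null) {
      intermediate.setField(offset, 0)
    } else {
      intermediate.setField(offset, value.asInstanceOf[Int])
    }

  // Combine / GroupReduce phase: fold a partial value into the buffer.
  // (Assumed signature.)
  def merge(partial: SimpleRow, buffer: SimpleRow): Unit = {
    val sum = buffer.getField(offset).asInstanceOf[Int] +
      partial.getField(offset).asInstanceOf[Int]
    buffer.setField(offset, sum)
  }

  // Read the final aggregated value from the buffer. (Assumed signature.)
  def evaluate(buffer: SimpleRow): Int =
    buffer.getField(offset).asInstanceOf[Int]
}
```

    In this sketch, whether the `0` is written by delegating to `initiate` or inline in `prepare` makes no difference to the merged result, which is the point made above.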

