flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation
Date Sun, 06 Mar 2016 07:38:40 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15182033#comment-15182033 ]

ASF GitHub Bot commented on FLINK-3474:
---------------------------------------

Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r55134609
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
---
    @@ -17,115 +17,144 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    -import java.math.BigInteger
     import com.google.common.math.LongMath
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
     
    -// for byte, short, int we return int
    -abstract class IntegralAvgAggregate[T: Numeric] extends Aggregate[T] {
    +abstract class IntegralAvgAggregate[T] extends Aggregate[T] {
    +  private final val intermediateType = Array(SqlTypeName.BIGINT, SqlTypeName.BIGINT)
       
    -  var sum: Long = 0
    -  var count: Long = 0
    +  override def initiate(partial: Row): Unit = {
    +    partial.setField(aggOffsetInRow, 0L)
    +    partial.setField(aggOffsetInRow + 1, 0L)
    +  }
    +
    +  override def merge(partial: Row, buffer: Row): Unit = {
    +    val partialSum = partial.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val partialCount = partial.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    buffer.setField(aggOffsetInRow, LongMath.checkedAdd(partialSum, bufferSum))
    +    buffer.setField(aggOffsetInRow + 1, LongMath.checkedAdd(partialCount, bufferCount))
    +  }
     
    -  override def initiateAggregate: Unit = {
    -    sum = 0
    -    count = 0
    +  override def intermediateDataType: Array[SqlTypeName] = {
    +    intermediateType
       }
     
    +  override def supportPartial: Boolean = true
     }
     
     class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
    -
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Byte])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Byte]
    +    partial.setField(aggOffsetInRow, input.toLong)
    +    partial.setField(aggOffsetInRow + 1, 1L)
       }
     
    -  override def getAggregated(): Byte = {
    -    (sum / count).toByte
    +  override def evaluate(buffer: Row): Byte = {
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    (bufferSum / bufferCount).toByte
       }
     }
     
     class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
     
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Short])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Short]
    +    partial.setField(aggOffsetInRow, input.toLong)
    +    partial.setField(aggOffsetInRow + 1, 1L)
       }
     
    -  override def getAggregated(): Short = {
    -    (sum / count).toShort
    +  override def evaluate(buffer: Row): Short = {
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    (bufferSum / bufferCount).toShort
       }
     }
     
     class IntAvgAggregate extends IntegralAvgAggregate[Int] {
     
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Int])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Int]
    +    partial.setField(aggOffsetInRow, input.toLong)
    --- End diff --
    
    Previously, `BigInteger` was used inside the aggregate function to store the sum value. For
partial aggregation, the partial sum and count values need to be transferred between Flink operators,
so they become part of the intermediate data type. However, Calcite's `SqlTypeName` has no
corresponding type for `BigInteger`, and I have not found a suitable solution for this yet.


> Partial aggregate interface design and sort-based implementation
> ----------------------------------------------------------------
>
>                 Key: FLINK-3474
>                 URL: https://issues.apache.org/jira/browse/FLINK-3474
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Chengxiang Li
>            Assignee: Chengxiang Li
>
> The scope of this subtask includes:
> # Partial aggregate interface.
> # Simple aggregate function implementation, such as SUM/AVG/COUNT/MIN/MAX.
> # DataSetAggregateRule, which translates logical Calcite aggregate nodes into Flink user functions.
> As a hash-based combiner is not available yet (see PR #1517), we use a sort-based combiner
> by default.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message