flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ChengXiangLi <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-3474] support partial aggregate
Date Sun, 06 Mar 2016 07:38:22 GMT
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r55134609
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
---
    @@ -17,115 +17,144 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    -import java.math.BigInteger
     import com.google.common.math.LongMath
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
     
    -// for byte, short, int we return int
    -abstract class IntegralAvgAggregate[T: Numeric] extends Aggregate[T] {
    +abstract class IntegralAvgAggregate[T] extends Aggregate[T] {
    +  private final val intermediateType = Array(SqlTypeName.BIGINT, SqlTypeName.BIGINT)
       
    -  var sum: Long = 0
    -  var count: Long = 0
    +  override def initiate(partial: Row): Unit = {
    +    partial.setField(aggOffsetInRow, 0L)
    +    partial.setField(aggOffsetInRow + 1, 0L)
    +  }
    +
    +  override def merge(partial: Row, buffer: Row): Unit = {
    +    val partialSum = partial.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val partialCount = partial.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    buffer.setField(aggOffsetInRow, LongMath.checkedAdd(partialSum, bufferSum))
    +    buffer.setField(aggOffsetInRow + 1, LongMath.checkedAdd(partialCount, bufferCount))
    +  }
     
    -  override def initiateAggregate: Unit = {
    -    sum = 0
    -    count = 0
    +  override def intermediateDataType: Array[SqlTypeName] = {
    +    intermediateType
       }
     
    +  override def supportPartial: Boolean = true
     }
     
     class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
    -
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Byte])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Byte]
    +    partial.setField(aggOffsetInRow, input.toLong)
    +    partial.setField(aggOffsetInRow + 1, 1L)
       }
     
    -  override def getAggregated(): Byte = {
    -    (sum / count).toByte
    +  override def evaluate(buffer: Row): Byte = {
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    (bufferSum / bufferCount).toByte
       }
     }
     
     class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
     
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Short])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Short]
    +    partial.setField(aggOffsetInRow, input.toLong)
    +    partial.setField(aggOffsetInRow + 1, 1L)
       }
     
    -  override def getAggregated(): Short = {
    -    (sum / count).toShort
    +  override def evaluate(buffer: Row): Short = {
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    (bufferSum / bufferCount).toShort
       }
     }
     
     class IntAvgAggregate extends IntegralAvgAggregate[Int] {
     
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Int])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Int]
    +    partial.setField(aggOffsetInRow, input.toLong)
    --- End diff --
    
    Previously, `BigInteger` was used inside the aggregate function to store the sum value. For
partial aggregation, the partial sum and count values need to be transferred between Flink operators,
so they become part of the data type. There is no corresponding type for `BigInteger` in Calcite's
`SqlTypeName`, and I have not found a suitable solution for this yet.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message