flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-3474] support partial aggregate
Date Fri, 04 Mar 2016 10:33:03 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r55014256
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
---
    @@ -17,115 +17,144 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    -import java.math.BigInteger
     import com.google.common.math.LongMath
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
     
    -// for byte, short, int we return int
    -abstract class IntegralAvgAggregate[T: Numeric] extends Aggregate[T] {
    +abstract class IntegralAvgAggregate[T] extends Aggregate[T] {
    +  private final val intermediateType = Array(SqlTypeName.BIGINT, SqlTypeName.BIGINT)
       
    -  var sum: Long = 0
    -  var count: Long = 0
    +  override def initiate(partial: Row): Unit = {
    +    partial.setField(aggOffsetInRow, 0L)
    +    partial.setField(aggOffsetInRow + 1, 0L)
    +  }
    +
    +  override def merge(partial: Row, buffer: Row): Unit = {
    +    val partialSum = partial.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val partialCount = partial.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    buffer.setField(aggOffsetInRow, LongMath.checkedAdd(partialSum, bufferSum))
    +    buffer.setField(aggOffsetInRow + 1, LongMath.checkedAdd(partialCount, bufferCount))
    +  }
     
    -  override def initiateAggregate: Unit = {
    -    sum = 0
    -    count = 0
    +  override def intermediateDataType: Array[SqlTypeName] = {
    +    intermediateType
       }
     
    +  override def supportPartial: Boolean = true
     }
     
     class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
    -
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Byte])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Byte]
    +    partial.setField(aggOffsetInRow, input.toLong)
    +    partial.setField(aggOffsetInRow + 1, 1L)
       }
     
    -  override def getAggregated(): Byte = {
    -    (sum / count).toByte
    +  override def evaluate(buffer: Row): Byte = {
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    (bufferSum / bufferCount).toByte
       }
     }
     
     class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
     
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Short])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Short]
    +    partial.setField(aggOffsetInRow, input.toLong)
    +    partial.setField(aggOffsetInRow + 1, 1L)
       }
     
    -  override def getAggregated(): Short = {
    -    (sum / count).toShort
    +  override def evaluate(buffer: Row): Short = {
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    (bufferSum / bufferCount).toShort
       }
     }
     
     class IntAvgAggregate extends IntegralAvgAggregate[Int] {
     
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Int])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Int]
    +    partial.setField(aggOffsetInRow, input.toLong)
    --- End diff --
    
    The previous version used a `BigInteger` to compute the sum for the `Long` average. Can you
change it back, or explain why you would like to change it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact the Infrastructure team at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message