flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6888) Can not determine TypeInformation of ACC type of AggregateFunction when ACC is a Scala case/tuple class
Date Tue, 20 Jun 2017 12:49:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16055700#comment-16055700 ]

ASF GitHub Bot commented on FLINK-6888:

Github user wuchong commented on a diff in the pull request:

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
    @@ -358,20 +358,27 @@ abstract class TableEnvironment(val config: TableConfig) {
         * Registers an [[AggregateFunction]] under a unique name. Replaces already existing
         * user-defined functions under this name.
    -  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC](
    +  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC: TypeInformation](
           name: String, function: AggregateFunction[T, ACC]): Unit = {
         // check if class not Scala object
         // check if class could be instantiated
    -    val typeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val resultTypeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val accTypeInfo: TypeInformation[_] = implicitly[TypeInformation[ACC]]
         // register in Table API
         functionCatalog.registerFunction(name, function.getClass)
         // register in SQL API
    -    val sqlFunctions = createAggregateSqlFunction(name, function, typeInfo, typeFactory)
    +    val sqlFunctions = createAggregateSqlFunction(
    --- End diff --
    @fhueske thanks for your explanation. I tried to wrap the UDAGG in an `AggregateFunctionWrapper`,
but found that this is not easy, because I can't override the user-defined contract methods
such as `accumulate`, `retract`, and `merge`. Moreover, in code generation we derive the accumulator
class from the return type of the `createAccumulator` method, but the return type of `createAccumulator`
in `AggregateFunctionWrapper` can only be `Any`, which results in an error.
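To make the return-type problem concrete, here is a minimal sketch in plain Scala with no Flink dependency. The classes `Acc`, `UserAgg`, `AggWrapper` and the helper `declaredAccClass` are hypothetical; the helper assumes that the code generator reads the declared return type of `createAccumulator` via reflection, which is one way to read the description above, not necessarily how Flink does it.

```scala
// Hypothetical accumulator type, for illustration only.
case class Acc(sum: Long, count: Long)

// A user-defined aggregate: createAccumulator declares the concrete accumulator type.
class UserAgg {
  def createAccumulator(): Acc = Acc(0L, 0L)
}

// A generic wrapper can hold any aggregate. Because it cannot name the wrapped
// aggregate's accumulator type, it must declare Any, which erases to java.lang.Object.
class AggWrapper(inner: AnyRef) {
  def createAccumulator(): Any =
    inner.getClass.getMethod("createAccumulator").invoke(inner)
}

object ReturnTypeDemo {
  // Hypothetical stand-in for a generator that picks the accumulator class
  // from the declared (compile-time) return type of createAccumulator.
  def declaredAccClass(cls: Class[_]): Class[_] =
    cls.getMethod("createAccumulator").getReturnType

  def main(args: Array[String]): Unit = {
    println(declaredAccClass(classOf[UserAgg]).getName)
    println(declaredAccClass(classOf[AggWrapper]).getName)
  }
}
```

At runtime the wrapper still returns an `Acc` instance; only the declared type, which is all a reflection-based generator can see before any accumulator exists, collapses to `java.lang.Object`.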
    In addition, I plan to support composite result types for UDAGGs. This also requires the return
type to be known not only to Calcite, for semantic validation, but also to code generation.
> Can not determine TypeInformation of ACC type of AggregateFunction when ACC is a Scala case/tuple class
> -------------------------------------------------------------------------------------------------------
>                 Key: FLINK-6888
>                 URL: https://issues.apache.org/jira/browse/FLINK-6888
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>             Fix For: 1.4.0
> Currently the {{ACC}} TypeInformation of {{org.apache.flink.table.functions.AggregateFunction[T, ACC]}}
> is extracted using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class or tuple class,
> the TypeInformation falls back to {{GenericType}}, which results in poor performance during state
> serialization and deserialization.
> I suggest extracting the ACC TypeInformation when {{TableEnvironment.registerFunction()}} is called.
> Here is an example:
> {code}
> import org.apache.flink.table.functions.AggregateFunction
>
> case class Accumulator(sum: Long, count: Long)
> class MyAgg extends AggregateFunction[Long, Accumulator] {
>   // accumulate is not declared by AggregateFunction; it is resolved by reflection
>   def accumulate(acc: Accumulator, value: Long): Unit = {
>   }
>   override def createAccumulator(): Accumulator = Accumulator(0, 0)
>   override def getValue(accumulator: Accumulator): Long = 1
> }
> {code}
> The {{Accumulator}} will be recognized as {{GenericType<Accumulator>}}.
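A minimal sketch of the difference described in the issue, assuming `flink-core` and `flink-scala` (which provides the `createTypeInformation` macro in the `org.apache.flink.api.scala` package object) are on the classpath. `AccTypeInfoDemo`, `fromClass`, `fromContextBound`, and `accumulatorInfo` are hypothetical names; `fromContextBound` mirrors the `ACC: TypeInformation` context bound added in the diff.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo

case class Accumulator(sum: Long, count: Long)

object AccTypeInfoDemo {
  // Class-based extraction: only the erased runtime class is available,
  // so a Scala case class is not analyzed field by field.
  def fromClass[ACC](clazz: Class[ACC]): TypeInformation[ACC] =
    TypeInformation.of(clazz)

  // Context-bound extraction: ACC is abstract here, so the caller must supply
  // the TypeInformation as an implicit argument.
  def fromContextBound[ACC: TypeInformation]: TypeInformation[ACC] =
    implicitly[TypeInformation[ACC]]

  // At this call site ACC = Accumulator is concrete, so the imported macro
  // can materialize a CaseClassTypeInfo for it.
  def accumulatorInfo: TypeInformation[Accumulator] = {
    import org.apache.flink.api.scala.createTypeInformation
    fromContextBound[Accumulator]
  }

  def main(args: Array[String]): Unit = {
    println(fromClass(classOf[Accumulator]).isInstanceOf[GenericTypeInfo[_]])
    println(accumulatorInfo.isInstanceOf[CaseClassTypeInfo[_ <: Product]])
  }
}
```

The design point is where the implicit gets resolved: a context bound defers it to the caller, where the concrete accumulator type is known, which is why the diff adds `ACC: TypeInformation` to `registerAggregateFunctionInternal` instead of deriving the type from the class alone.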

This message was sent by Atlassian JIRA
