flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10845) Support DISTINCT aggregates for batch
Date Tue, 04 Dec 2018 15:00:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708820#comment-16708820 ]

ASF GitHub Bot commented on FLINK-10845:
----------------------------------------

twalthr commented on a change in pull request #7079: [FLINK-10845][table] Support multiple different DISTINCT aggregates for batch
URL: https://github.com/apache/flink/pull/7079#discussion_r238696433
 
 

 ##########
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ##########
 @@ -191,7 +191,17 @@ object ScalarOperators {
           )
         )
     }
-  }  
+  }
+
+  def generateDistinctFrom(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val newleft = left.copy(nullTerm = GeneratedExpression.NEVER_NULL)
 
 Review comment:
   This does not look correct to me. By declaring the expressions as never null, you basically
compare the default values of the expressions; for a Long, that default is `0L`. As a result,
`0 IS NOT DISTINCT FROM NULL` might return `true`.
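The reviewer's point can be illustrated with a minimal sketch in plain Scala, not Flink's code generator. It assumes a hypothetical, simplified model (`Expr`, `DistinctFromSketch`) in which a generated expression carries a null flag and a result value, and the value of a NULL expression holds the type's default. Ignoring the null flags makes a NULL look equal to `0L`; a null-safe comparison must inspect both flags first.

```scala
// Hypothetical, simplified model of a generated expression: `isNull` stands in
// for the null term and `value` for the result term. When the expression is
// NULL, the result term still holds the type's default value (0L for a Long).
final case class Expr(isNull: Boolean, value: Long)

object DistinctFromSketch {
  // A NULL BIGINT: the null flag is set and the result term keeps its default.
  val NullLong: Expr = Expr(isNull = true, value = 0L)

  // A non-NULL BIGINT literal.
  def literal(v: Long): Expr = Expr(isNull = false, value = v)

  // Approximates the effect described in the review: both operands are treated
  // as never null, so only the result terms are compared.
  def notDistinctIgnoringNulls(left: Expr, right: Expr): Boolean =
    left.value == right.value

  // Null-safe comparison: two NULLs are not distinct, a NULL and a non-NULL
  // value are distinct, and two non-NULL values are compared by value.
  def isNotDistinctFrom(left: Expr, right: Expr): Boolean =
    if (left.isNull || right.isNull) left.isNull && right.isNull
    else left.value == right.value

  // IS DISTINCT FROM is the negation; neither variant ever yields NULL.
  def isDistinctFrom(left: Expr, right: Expr): Boolean =
    !isNotDistinctFrom(left, right)
}
```

With these definitions, `DistinctFromSketch.notDistinctIgnoringNulls(DistinctFromSketch.literal(0L), DistinctFromSketch.NullLong)` evaluates to `true`, reproducing the reviewer's `0 IS NOT DISTINCT FROM NULL` example, while `isNotDistinctFrom` returns `false` for the same inputs.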

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Support DISTINCT aggregates for batch
> -------------------------------------
>
>                 Key: FLINK-10845
>                 URL: https://issues.apache.org/jira/browse/FLINK-10845
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API &amp; SQL
>            Reporter: Timo Walther
>            Assignee: xueyu
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, we support distinct aggregates for streaming. However, the same query fails when executed on batch, as in the following test:
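> {code}
>     import org.apache.flink.api.scala._
>     import org.apache.flink.table.api.TableEnvironment
>     import org.apache.flink.table.api.scala._
>     import org.apache.flink.types.Row
>     import scala.collection.mutable
>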
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val sqlQuery =
>       "SELECT b, " +
>       "  SUM(DISTINCT (a / 3)), " +
>       "  COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
>       "  COUNT(DISTINCT c) " +
>       "FROM MyTable " +
>       "GROUP BY b"
>     val data = new mutable.MutableList[(Int, Long, String)]
>     data.+=((1, 1L, "Hi"))
>     data.+=((2, 2L, "Hello"))
>     data.+=((3, 2L, "Hello world"))
>     data.+=((4, 3L, "Hello world, how are you?"))
>     data.+=((5, 3L, "I am fine."))
>     data.+=((6, 3L, "Luke Skywalker"))
>     data.+=((7, 4L, "Comment#1"))
>     data.+=((8, 4L, "Comment#2"))
>     data.+=((9, 4L, "Comment#3"))
>     data.+=((10, 4L, "Comment#4"))
>     data.+=((11, 5L, "Comment#5"))
>     data.+=((12, 5L, "Comment#6"))
>     data.+=((13, 5L, "Comment#7"))
>     data.+=((14, 5L, "Comment#8"))
>     data.+=((15, 5L, "Comment#9"))
>     data.+=((16, 6L, "Comment#10"))
>     data.+=((17, 6L, "Comment#11"))
>     data.+=((18, 6L, "Comment#12"))
>     data.+=((19, 6L, "Comment#13"))
>     data.+=((20, 6L, "Comment#14"))
>     data.+=((21, 6L, "Comment#15"))
>     val t = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
>     tEnv.registerTable("MyTable", t)
>     tEnv.sqlQuery(sqlQuery).toDataSet[Row].print()
> {code}
> The test fails with:
> {code}
> org.apache.flink.table.codegen.CodeGenException: Unsupported call: IS NOT DISTINCT FROM
> If you think this function should be supported, you can create an issue and start a discussion for it.
> 	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
> 	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
> 	at scala.Option.getOrElse(Option.scala:121)
> 	at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
> 	at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
> 	at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
> 	at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addInnerJoin(DataSetJoin.scala:221)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:170)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:165)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
> 	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
> 	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
> 	at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:141)
> 	at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:50)
> 	at org.apache.flink.table.runtime.stream.sql.SqlITCase.testDistinctGroupBy(SqlITCase.scala:2
> {code}
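The trace shows the unsupported call being generated for a `DataSetJoin` condition, which suggests that the batch plan computes the DISTINCT aggregates in separate branches and joins them back on the grouping key `b`. The following is a minimal sketch of that idea in plain Scala collections, not Flink, which also serves as a reference for the result the query should produce. `DistinctAggSketch` and its members are hypothetical names, and `a / 3` is assumed to be truncating integer division, as for Scala `Int`. Joining `Option` keys with `==` mirrors IS NOT DISTINCT FROM: `None == None` is `true`, so a NULL grouping key would still find its partner row, whereas SQL `=` would drop that group.

```scala
// Plain Scala collections, not Flink. All names here are hypothetical.
object DistinctAggSketch {
  // Grouping key; None would represent a NULL value of b.
  type Key = Option[Long]

  // The rows from the test above, as (a, b, c).
  val rows: Seq[(Int, Key, String)] = Seq(
    (1, Some(1L), "Hi"),
    (2, Some(2L), "Hello"),
    (3, Some(2L), "Hello world"),
    (4, Some(3L), "Hello world, how are you?"),
    (5, Some(3L), "I am fine."),
    (6, Some(3L), "Luke Skywalker")
  ) ++ (7 to 21).map { a =>
    // a = 7..10 -> b = 4, a = 11..15 -> b = 5, a = 16..21 -> b = 6
    val b = if (a <= 10) 4L else if (a <= 15) 5L else 6L
    (a, Option(b), s"Comment#${a - 6}")
  }

  // Groups the rows by b and applies one aggregate to each group.
  private def perKey[A](agg: Seq[(Int, Key, String)] => A): Map[Key, A] =
    rows.groupBy(_._2).map { case (k, group) => k -> agg(group) }

  // One branch per DISTINCT aggregate in the SELECT list.
  val sumDistinctA: Map[Key, Int] = perKey(_.map(_._1 / 3).distinct.sum)
  val countDistinctPrefix: Map[Key, Int] = perKey(_.map(_._3.take(2)).distinct.size)
  val countDistinctC: Map[Key, Int] = perKey(_.map(_._3).distinct.size)

  // Joins the branches on the grouping key. Option's == treats None == None as
  // true, so it behaves like IS NOT DISTINCT FROM rather than SQL `=`.
  val result: Seq[(Key, Int, Int, Int)] =
    for {
      (k1, s) <- sumDistinctA.toSeq.sortBy(_._1)
      (k2, p) <- countDistinctPrefix.toSeq if k2 == k1
      (k3, c) <- countDistinctC.toSeq if k3 == k1
    } yield (k1, s, p, c)
}
```

Under these assumptions, `DistinctAggSketch.result` contains one tuple per key, from `(Some(1), 0, 1, 1)` through `(Some(6), 18, 1, 6)`, which is what the batch query would be expected to return once IS NOT DISTINCT FROM is supported in the generated join condition.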



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
