flink-issues mailing list archives

From "Anton Mushin (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4604) Add support for standard deviation/variance
Date Wed, 12 Oct 2016 13:09:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568664#comment-15568664
] 

Anton Mushin commented on FLINK-4604:
-------------------------------------

I tried checking the function in {{org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule#matches}},
but something went wrong :)
I changed it as follows:
{code:title=org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule}
override def matches(call: RelOptRuleCall): Boolean = {
    val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]

    // check if we have distinct aggregates
    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
    if (distinctAggs) {
      throw new TableException("DISTINCT aggregates are currently not supported.")
    }

    // check if we have grouping sets
    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
    if (groupSets || agg.indicator) {
      throw new TableException("GROUPING SETS are currently not supported.")
    }

    (!distinctAggs && !groupSets && !agg.indicator) && !AggregateReduceFunctionsRule.INSTANCE.matches(call)
  }
{code}
I then got the following plan and exception:
{noformat}
DataSetCalc(select=[CAST(/(-(CASE(=($f1, 0), null, $f0), /(*(CASE(=($f3, 0), null, $f2), CASE(=($f3,
0), null, $f2)), $f3)), CASE(=($f3, 1), null, -($f3, 1)))) AS $f0, CAST(/(-(CASE(=($f5, 0),
null, $f4), /(*(CASE(=($f7, 0), null, $f6), CASE(=($f7, 0), null, $f6)), $f7)), CASE(=($f7,
1), null, -($f7, 1)))) AS $f1, CAST(/(-(CASE(=($f9, 0), null, $f8), /(*(CASE(=($f11, 0), null,
$f10), CASE(=($f11, 0), null, $f10)), $f11)), CASE(=($f11, 1), null, -($f11, 1)))) AS $f2,
CAST(/(-(CASE(=($f13, 0), null, $f12), /(*(CASE(=($f15, 0), null, $f14), CASE(=($f15, 0),
null, $f14)), $f15)), CASE(=($f15, 1), null, -($f15, 1)))) AS $f3, CAST(/(-(CASE(=($f17, 0),
null, $f16), /(*(CASE(=($f19, 0), null, $f18), CASE(=($f19, 0), null, $f18)), $f19)), CASE(=($f19,
1), null, -($f19, 1)))) AS $f4, CAST(/(-(CASE(=($f21, 0), null, $f20), /(*(CASE(=($f23, 0),
null, $f22), CASE(=($f23, 0), null, $f22)), $f23)), CASE(=($f23, 1), null, -($f23, 1)))) AS
$f5])
  DataSetAggregate(select=[$SUM0($f6) AS $f0, COUNT($f6) AS $f1, $SUM0(_1) AS $f2, COUNT(_1)
AS $f3, $SUM0($f7) AS $f4, COUNT($f7) AS $f5, $SUM0(_2) AS $f6, COUNT(_2) AS $f7, $SUM0($f8)
AS $f8, COUNT($f8) AS $f9, $SUM0(_3) AS $f10, COUNT(_3) AS $f11, $SUM0($f9) AS $f12, COUNT($f9)
AS $f13, $SUM0(_4) AS $f14, COUNT(_4) AS $f15, $SUM0($f10) AS $f16, COUNT($f10) AS $f17, $SUM0(_5)
AS $f18, COUNT(_5) AS $f19, $SUM0($f11) AS $f20, COUNT($f11) AS $f21, $SUM0(_6) AS $f22, COUNT(_6)
AS $f23])
    DataSetCalc(select=[_1, _2, _3, _4, _5, _6])
      DataSetScan(table=[[_DataSetTable_0]])
{noformat}
{noformat}
org.apache.flink.api.table.TableException: Type NULL is not supported. Null values must have
a supported type.

	at org.apache.flink.api.table.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:128)
	at org.apache.flink.api.table.codegen.CodeGenerator.visitLiteral(CodeGenerator.scala:553)
	at org.apache.flink.api.table.codegen.CodeGenerator.visitLiteral(CodeGenerator.scala:56)
	at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:658)
	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675)
	at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675)
	at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675)
	at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675)
	at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
	at org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:181)
	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:300)
	at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:300)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:300)
	at org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
	at org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
	at org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
	at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274)
	at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
	at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41)
	at org.apache.flink.api.scala.batch.sql.AggregationsITCase.testVarSampAggregate(AggregationsITCase.scala:369)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
	at org.junit.runners.Suite.runChild(Suite.java:127)
	at org.junit.runners.Suite.runChild(Suite.java:26)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
{noformat}
Then I removed {{!AggregateReduceFunctionsRule.INSTANCE.matches(call)}} and restored [this code|https://issues.apache.org/jira/browse/FLINK-4604?focusedCommentId=15554768&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15554768],
and the tests passed.
I am still searching for a resolution to this problem. Do you have any ideas about it?
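For reference, the expanded plan above has the shape of the sum/count reduction that {{AggregateReduceFunctionsRule}} produces for {{VAR_SAMP}}, i.e. {{(sum(x*x) - sum(x)*sum(x)/count) / (count - 1)}}. A minimal Scala sketch of that formula (an illustration of the arithmetic only, not Flink or Calcite code):

```scala
// Sample variance via the sum/count reduction, matching the plan's
// /(-(sumSq, /(*(sum, sum), count)), count - 1) expression.
// Illustration only; this is not Flink/Calcite code.
def varSamp(xs: Seq[Double]): Option[Double] = {
  val count = xs.size
  if (count <= 1) None // mirrors the plan's CASE(=(count, 1), null, ...)
  else {
    val sum = xs.sum
    val sumSq = xs.map(x => x * x).sum
    Some((sumSq - sum * sum / count) / (count - 1))
  }
}
```

The {{CASE(... null ...)}} branches in the plan correspond to the empty and single-row guards, which is where the untyped NULL literals that trip the code generator come from.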

> Add support for standard deviation/variance
> -------------------------------------------
>
>                 Key: FLINK-4604
>                 URL: https://issues.apache.org/jira/browse/FLINK-4604
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Anton Mushin
>         Attachments: 1.jpg
>
>
> Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP, STDDEV_SAMP,
VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test and document this rule. 
> Whether we also want to add these aggregates to the Table API is up for discussion.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
