spark-issues mailing list archives

From "koert kuipers (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-15051) Aggregator with DataFrame does not allow Alias
Date Mon, 02 May 2016 02:52:12 GMT

     [ https://issues.apache.org/jira/browse/SPARK-15051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

koert kuipers updated SPARK-15051:
----------------------------------
    Description: 
This works:
{noformat}
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.Aggregator
import spark.implicits._  // for toDF on a local collection (spark-shell session)

object SimpleSum extends Aggregator[Row, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Row): Int = b + a.getInt(1)
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(b: Int): Int = b
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
val df = List(("a", 1), ("a", 2), ("a", 3)).toDF("k", "v")
df.groupBy("k").agg(SimpleSum.toColumn).show
{noformat}
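For reference, the zero/reduce/finish contract this Aggregator implements can be mimicked in plain Scala (no Spark needed) to see the expected per-group result:

```scala
// Plain-Scala sketch of the Aggregator contract applied to the sample data:
// start from zero, reduce each value into the buffer, then finish.
val rows = List(("a", 1), ("a", 2), ("a", 3))
val perKey: Map[String, Int] = rows.groupBy(_._1).map { case (k, vs) =>
  val buf = vs.map(_._2).foldLeft(0)(_ + _) // zero = 0, reduce = b + v
  k -> buf                                  // finish is the identity here
}
```

So the aggregation above should produce a single row, ("a", 6).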

But it breaks when I try to give the new column a name:
{noformat}
df.groupBy("k").agg(SimpleSum.toColumn as "b").show
{noformat}

The error is:
{noformat}
   org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [k#192], [k#192,(SimpleSum(unknown),mode=Complete,isDistinct=false) AS b#200];
   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
   at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:54)
   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:270)
   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:51)
   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:51)
   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:54)
   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:61)
{noformat}

It breaks because Column.as(alias: String) returns a Column, not a TypedColumn, so
TypedColumn.withInputType never gets called on the aliased column.
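The type loss can be illustrated with a minimal sketch. Col and TypedCol below are hypothetical stand-ins for Spark's Column and TypedColumn, not the real classes: an `as` inherited from the base class returns the base type, so anything defined only on the subtype becomes unreachable after aliasing.

```scala
// Hypothetical stand-ins for Column / TypedColumn, showing how aliasing
// loses the subtype (and with it, the withInputType hook).
class Col(val expr: String) {
  def as(alias: String): Col = new Col(s"$expr AS $alias") // returns the base type
}
class TypedCol[T](expr: String) extends Col(expr) {
  def withInputType(): TypedCol[T] = this // only defined on the subtype
}

val typed = new TypedCol[Int]("SimpleSum(v)")
val aliased = typed.as("b")
// aliased.withInputType() // does not compile: aliased is statically
//                         // (and here even dynamically) a plain Col
```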

P.S. The whole TypedColumn.withInputType mechanism seems rather fragile to me. I wish
Aggregators simply kept the input encoder as well, so that the whole bit about dynamically
trying to insert the Encoder could be removed.



> Aggregator with DataFrame does not allow Alias
> ----------------------------------------------
>
>                 Key: SPARK-15051
>                 URL: https://issues.apache.org/jira/browse/SPARK-15051
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>         Environment: Spark 2.0.0-SNAPSHOT
>            Reporter: koert kuipers
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
