spark-issues mailing list archives

From "koert kuipers (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-15051) Aggregator with DataFrame does not allow Alias
Date Mon, 02 May 2016 02:51:12 GMT
koert kuipers created SPARK-15051:
-------------------------------------

             Summary: Aggregator with DataFrame does not allow Alias
                 Key: SPARK-15051
                 URL: https://issues.apache.org/jira/browse/SPARK-15051
             Project: Spark
          Issue Type: Bug
          Components: SQL
         Environment: Spark 2.0.0-SNAPSHOT
            Reporter: koert kuipers


This works:
{noformat}
// Assumes a SparkSession named spark (e.g. the spark-shell on 2.0.0-SNAPSHOT).
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.Aggregator
import spark.implicits._

object SimpleSum extends Aggregator[Row, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Row) = b + a.getInt(1) // column 1 is "v"
  def merge(b1: Int, b2: Int) = b1 + b2
  def finish(b: Int) = b
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
val df = List(("a", 1), ("a", 2), ("a", 3)).toDF("k", "v")
df.groupBy("k").agg(SimpleSum.toColumn).show
{noformat}

But it breaks when I try to give the new column a name:
{noformat}
df.groupBy("k").agg(SimpleSum.toColumn as "b").show
{noformat}

The error is:
{noformat}
   org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [k#192], [k#192,(SimpleSum(unknown),mode=Complete,isDistinct=false) AS b#200];
   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
   at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:54)
   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:270)
   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:51)
   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:51)
   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:54)
   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:61)
{noformat}

The reason it breaks is that Column.as(alias: String) returns a Column, not a TypedColumn, so TypedColumn.withInputType never gets called.
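
As a workaround, here is a minimal, untested sketch against the same 2.0.0-SNAPSHOT setup: skip the alias on the TypedColumn entirely (so withInputType still runs inside agg) and rename the resulting column afterwards.
{noformat}
// Untested workaround sketch: do not call .as on the TypedColumn;
// rename the aggregate column after agg instead, e.g. via toDF.
val summed = df.groupBy("k").agg(SimpleSum.toColumn)
summed.toDF("k", "b").show
{noformat}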

P.S. The whole TypedColumn.withInputType mechanism actually seems rather fragile to me. I wish Aggregators simply also kept the input encoder, so that whole bit about dynamically trying to insert it could be removed.
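
For illustration only, a rough sketch of what that suggestion could look like; EncodedAggregator is a made-up name, not an existing Spark API:
{noformat}
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical sketch, not Spark API: an Aggregator that also carries its
// input encoder, so nothing has to be injected dynamically at analysis time.
abstract class EncodedAggregator[IN, BUF, OUT] extends Aggregator[IN, BUF, OUT] {
  def inputEncoder: Encoder[IN]
}
{noformat}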



