spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Denis Bolshakov <bolshakov.de...@gmail.com>
Subject Re: Pasting into spark-shell doesn't work for Databricks example
Date Tue, 22 Nov 2016 14:35:57 GMT
Hello,

We have the same issue,

We use latest release 2.0.2.

Setup with 1.6.1 works fine.

Could somebody provide a workaround how to fix that?

Kind regards,
Denis

On 21 November 2016 at 20:23, jggg777 <jonrgregg@gmail.com> wrote:

> I'm simply pasting in the UDAF example from this page and getting errors
> (basic EMR setup with Spark 2.0):
> https://docs.cloud.databricks.com/docs/latest/databricks_
> guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/
> 03%20UDF%20and%20UDAF%20-%20scala.html
>
> The imports appear to work, but then I see errors like "not found: type
> UserDefinedAggregateFunction".
>
> If I run ":paste" and paste it in that way it does work, but I'm interested
> in knowing why Ctrl-V doesn't.  What is happening under the hood which
> makes
> it seem like the imports are working even though they aren't?  And is there
> a way to fix this in general?
>
> >>>
> scala> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
>
> scala> import org.apache.spark.sql.expressions.
> UserDefinedAggregateFunction
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
>
> scala> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Row
>
> scala> import org.apache.spark.sql.types._
> import org.apache.spark.sql.types._
>
> scala>
>
> scala> class GeometricMean extends UserDefinedAggregateFunction {
>      |   // This is the input fields for your aggregate function.
>      |   override def inputSchema: org.apache.spark.sql.types.StructType =
>      |     StructType(StructField("value", DoubleType) :: Nil)
>      |
>      |   // This is the internal fields you keep for computing your
> aggregate.
>      |   override def bufferSchema: StructType = StructType(
>      |     StructField("count", LongType) ::
>      |     StructField("product", DoubleType) :: Nil
>      |   )
>      |
>      |   // This is the output type of your aggregatation function.
>      |   override def dataType: DataType = DoubleType
>      |
>      |   override def deterministic: Boolean = true
>      |
>      |   // This is the initial value for your buffer schema.
>      |   override def initialize(buffer: MutableAggregationBuffer): Unit =
> {
>      |     buffer(0) = 0L
>      |     buffer(1) = 1.0
>      |   }
>      |
>      |   // This is how to update your buffer schema given an input.
>      |   override def update(buffer: MutableAggregationBuffer, input: Row):
> Unit = {
>      |     buffer(0) = buffer.getAs[Long](0) + 1
>      |     buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
>      |   }
>      |
>      |   // This is how to merge two objects with the bufferSchema type.
>      |   override def merge(buffer1: MutableAggregationBuffer, buffer2:
> Row): Unit = {
>      |     buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
>      |     buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
>      |   }
>      |
>      |   // This is where you output the final value, given the final value
> of your bufferSchema.
>      |   override def evaluate(buffer: Row): Any = {
>      |     math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
>      |   }
>      | }
> <console>:11: error: not found: type UserDefinedAggregateFunction
>        class GeometricMean extends UserDefinedAggregateFunction {
>                                    ^
> <console>:14: error: not found: value StructType
>            StructType(StructField("value", DoubleType) :: Nil)
>            ^
> <console>:14: error: not found: value StructField
>            StructType(StructField("value", DoubleType) :: Nil)
>                       ^
> <console>:14: error: not found: value DoubleType
>            StructType(StructField("value", DoubleType) :: Nil)
>                                            ^
> <console>:17: error: not found: type StructType
>          override def bufferSchema: StructType = StructType(
>                                     ^
> <console>:17: error: not found: value StructType
>          override def bufferSchema: StructType = StructType(
>                                                  ^
> <console>:18: error: not found: value StructField
>            StructField("count", LongType) ::
>            ^
> <console>:18: error: not found: value LongType
>            StructField("count", LongType) ::
>                                 ^
> <console>:19: error: not found: value StructField
>            StructField("product", DoubleType) :: Nil
>            ^
> <console>:19: error: not found: value DoubleType
>            StructField("product", DoubleType) :: Nil
>                                   ^
> <console>:23: error: not found: type DataType
>          override def dataType: DataType = DoubleType
>                                 ^
> <console>:23: error: not found: value DoubleType
>          override def dataType: DataType = DoubleType
>                                            ^
> <console>:28: error: not found: type MutableAggregationBuffer
>          override def initialize(buffer: MutableAggregationBuffer): Unit =
> {
>                                          ^
> <console>:34: error: not found: type MutableAggregationBuffer
>          override def update(buffer: MutableAggregationBuffer, input: Row):
> Unit = {
>                                      ^
> <console>:34: error: not found: type Row
>          override def update(buffer: MutableAggregationBuffer, input: Row):
> Unit = {
>                                                                       ^
> <console>:40: error: not found: type MutableAggregationBuffer
>          override def merge(buffer1: MutableAggregationBuffer, buffer2:
> Row): Unit = {
>                                      ^
> <console>:40: error: not found: type Row
>          override def merge(buffer1: MutableAggregationBuffer, buffer2:
> Row): Unit = {
>                                                                         ^
> <console>:46: error: not found: type Row
>          override def evaluate(buffer: Row): Any = {
>                                        ^
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Pasting-into-spark-shell-doesn-t-work-
> for-Databricks-example-tp28113.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>


-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.denis@gmail.com

Mime
View raw message