spark-user mailing list archives

From "Manjunath, Kiran" <kiman...@akamai.com>
Subject GenericRowWithSchema cannot be cast to java.lang.Double : UDAF error
Date Fri, 04 Nov 2016 20:46:29 GMT
I am trying to implement a sample "sum" over a rolling window.
The code below may not make sense (and may not be efficient), but in the course of a larger
implementation I have stumbled on the error below, which is blocking me.

Error obtained: "GenericRowWithSchema cannot be cast to java.lang.Double" during evaluation.
Is this a known problem in Spark?
I am using 2.0.0.

Code
====

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.types._

class GeometricMean extends UserDefinedAggregateFunction {

    def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
    def bufferSchema: StructType = new StructType().add("info", ArrayType(DoubleType), false)

    def dataType: DataType = DoubleType
    def deterministic: Boolean = true

    def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = ArrayBuffer.empty[Double]
    }

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val arr1 = buffer.getAs[Seq[Double]](0)
        val arr = ArrayBuffer(input) ++ arr1
        buffer(0) = arr
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        val arr1 = buffer1.getAs[Seq[Double]](0)
        val arr = arr1 ++  buffer2.getAs[Seq[Double]](0)
        buffer1.update(0,arr)
    }

    def evaluate(buffer: Row): Any = {
        var s : Double = 0
        val arr = buffer.getAs[Seq[Double]](0)
        val arrd = arr.toArray
        arrd.foreach(s += _)
        s
    }
}

val GM  = new GeometricMean
val r = new scala.util.Random(88)
val schema = new StructType().add("id",IntegerType).add("Count",IntegerType)

val rnNum1 = for( i <- 1 to 10) yield { Row(i,r.nextInt(10-0+1)) }

val wSpec1 = Window.orderBy("id").rowsBetween(-1, +3)
val rdd = sc.parallelize(rnNum1)
val df = sqlContext.createDataFrame(rdd,schema)
val dfWithMovingAvg = df
    .withColumn("movingAvg", avg(df.col("Count")).over(wSpec1))
    .withColumn("customMovingAvg", GM(df.col("Count")).over(wSpec1))
dfWithMovingAvg.take(5).foreach(println)


Error
===


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed
1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost): java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:114)
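
[Editor's note] The cast failure appears to come from `update` rather than from Spark itself: `ArrayBuffer(input)` wraps the whole input `Row`, producing an `ArrayBuffer[Row]`, so `evaluate` later tries to unbox each buffered `Row` as a `Double` (exactly the `BoxesRunTime.unboxToDouble` frame above). A sketch of a corrected `update`, assuming the intent is to collect the column's `Double` value:

```scala
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Extract the Double from the input Row instead of buffering the Row itself.
    // ArrayBuffer(input) creates an ArrayBuffer[Row], whose elements cannot later
    // be unboxed as java.lang.Double in evaluate.
    if (!input.isNullAt(0)) {
        val arr1 = buffer.getAs[Seq[Double]](0)
        buffer(0) = input.getAs[Double](0) +: arr1
    }
}
```

With this change the buffer holds plain doubles, and the `getAs[Seq[Double]]` calls in `merge` and `evaluate` behave as intended.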



Regards,
Kiran