spark-user mailing list archives

From Iain Cundy <Iain.Cu...@amdocs.com>
Subject RE: [MARKETING] Spark Streaming stateful transformation mapWithState function getting error scala.MatchError: [Ljava.lang.Object]
Date Mon, 14 Mar 2016 14:06:19 GMT
Hi Vinti

I don’t program in Scala, but I think you’ve changed the meaning of the current variable.
Look again at which variable is the state and which is the new data.

Assuming it works like the Java API, to maintain state with this function you must call
State.update; the function itself can return anything, not just the state.
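A minimal sketch of that contract in plain Scala, so it runs without Spark. `SimpleState` is a hypothetical stand-in that mirrors only the two methods of Spark's `org.apache.spark.streaming.State` used here (`getOption()` and `update()`), and `addArrays` is a hypothetical element-wise sum over equal-length arrays that replaces the Breeze `DenseVector` addition. The function keeps the running total through `update` and returns a different type (the key and the sum of the total) to show that the return value is independent of the state.

```scala
// Hypothetical stand-in for org.apache.spark.streaming.State[S]: it mirrors
// only getOption() and update(), so this sketch runs without Spark.
final class SimpleState[S] {
  private var stored: Option[S] = None
  def getOption(): Option[S] = stored
  def update(newState: S): Unit = { stored = Some(newState) }
}

object StateContract {
  // Hypothetical helper: element-wise sum of two equal-length arrays
  // (stands in for the Breeze DenseVector addition in the original code).
  def addArrays(a: Array[Long], b: Array[Long]): Array[Long] =
    a.zip(b).map { case (x, y) => x + y }

  // Same parameter order as a mapWithState mapping function:
  // (key, new value for this record, state for this key).
  def trackStateFunc(key: String,
                     value: Option[Array[Long]],
                     state: SimpleState[Array[Long]]): (String, Long) = {
    // Merge the previous total (if any) with the new value (if any).
    val total: Array[Long] =
      (state.getOption().toList ++ value.toList)
        .reduceOption((a, b) => addArrays(a, b))
        .getOrElse(Array.empty[Long])
    // The state only changes through update(); returning it is not enough.
    // (Real Spark rejects update() while a key is timing out; not modeled here.)
    state.update(total)
    // The return value is independent of the state type: here, key and sum.
    (key, total.sum)
  }
}
```

With Spark on the classpath, `state` would be a `State[Array[Long]]`, and because this version returns `(String, Long)`, the last type parameter of the `StateSpec` (the mapped type) would have to be `(String, Long)` as well.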

Cheers
Iain

From: Vinti Maheshwari [mailto:vinti.uiet@gmail.com]
Sent: 12 March 2016 22:10
To: user
Subject: [MARKETING] Spark Streaming stateful transformation mapWithState function getting error scala.MatchError: [Ljava.lang.Object]


Hi All,

I want to replace my updateStateByKey function with the mapWithState function (Spark 1.6) to
improve the performance of my program.

I was following these two documents: https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-spark-streaming.html

https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Streaming%20mapWithState.html

but I am getting the error scala.MatchError: [Ljava.lang.Object;

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 71.0 failed 4 times, most recent failure: Lost task 0.3 in stage 71.0 (TID 88, ttsv-lab-vmdb-01.englab.juniper.net):
scala.MatchError: [Ljava.lang.Object;@eaf8bc8 (of class [Ljava.lang.Object;)
    at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
    at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
    at scala.Option.flatMap(Option.scala:170)
    at HbaseCovrageStream$.HbaseCovrageStream$$tracketStateFunc$1(HbaseCoverageStream_mapwithstate.scala:84)

Reference code:

def trackStateFunc(key: String, value: Option[Array[Long]], current: State[Array[Long]]) = {
  // either we can use this
  // current.update(value)
  value.map(_ :+ current).orElse(Some(current)).flatMap {
    case x: Array[Long] => Try(x.map(BDV(_)).reduce(_ + _).toArray).toOption
    case None => ???
  }
}

val statespec: StateSpec[String, Array[Long], Array[Long], Option[Array[Long]]] =
  StateSpec.function(trackStateFunc _)

val state: MapWithStateDStream[String, Array[Long], Array[Long], Option[Array[Long]]] =
  parsedStream.mapWithState(statespec)
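The `MatchError` can be reproduced without Spark. In `value.map(_ :+ current)`, `current` is the state object itself rather than the array it holds, so `:+` widens the element type and builds an `Array[Any]`, which is an `Object[]` on the JVM; that is the `[Ljava.lang.Object;` in the trace. Neither `case x: Array[Long]` nor `case None` matches it. In this sketch `FakeState` is a hypothetical stand-in for `State[Array[Long]]`, and the Breeze reduction is left out because the failure happens before it would run.

```scala
// Hypothetical stand-in for Spark's State[Array[Long]]; any object that is not
// a Long shows the same widening when appended to an Array[Long].
final class FakeState(val stored: Array[Long])

object MatchErrorRepro {
  // Same shape as the body of the original trackStateFunc, minus the Breeze reduction.
  def original(value: Option[Array[Long]], current: FakeState): Option[Array[Long]] = {
    // `_ :+ current` appends the state object itself, not the array it holds,
    // so the element type widens and the result is an Array[Any] (Object[] on
    // the JVM). Option[AnyRef] is written out so the sketch compiles the same
    // way on Scala 2 and Scala 3.
    val widened: Option[AnyRef] = value.map(_ :+ current).orElse(Some(current))
    widened.flatMap {
      case x: Array[Long] => Some(x) // runtime check for long[]: fails on Object[]
      case None           => None    // the element is never None: also fails
    }
  }
}
```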

My previous working code which was using updateStateByKey function:

val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
  (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
    prev.map(_ +: current).orElse(Some(current))
      .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
  })
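One difference to keep in mind when porting: `updateStateByKey` calls its function once per key per batch with all of that batch's new values as a `Seq`, while `mapWithState` calls its mapping function once per record with a single `Option` value and the key's `State`. The plain-Scala sketch below (no Spark; `PortingSketch` and its methods are hypothetical, and the Breeze addition is replaced by an element-wise sum over equal-length arrays) computes one key's batch both ways so the results can be compared.

```scala
object PortingSketch {
  // Element-wise sum of two equal-length arrays (stands in for the Breeze
  // DenseVector addition in the original code).
  def add(a: Array[Long], b: Array[Long]): Array[Long] =
    a.zip(b).map { case (x, y) => x + y }

  // updateStateByKey style: called once per key per batch with all new values.
  def updateStyle(current: Seq[Array[Long]], prev: Option[Array[Long]]): Option[Array[Long]] =
    (prev.toList ++ current).reduceOption((a, b) => add(a, b))

  // mapWithState style: called once per record with a single optional value.
  def mapWithStateStyle(value: Option[Array[Long]], state: Option[Array[Long]]): Option[Array[Long]] =
    (state.toList ++ value.toList).reduceOption((a, b) => add(a, b))

  // Feed one key's batch record by record, threading the state through,
  // roughly as mapWithState does with the stored state between calls.
  def foldRecords(batch: Seq[Array[Long]], initial: Option[Array[Long]]): Option[Array[Long]] =
    batch.foldLeft(initial)((state, v) => mapWithStateStyle(Some(v), state))
}
```

Because the element-wise sum is associative and commutative, feeding the records one at a time gives the same total as reducing the whole batch at once.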
Does anyone have an idea what the issue could be?
Thanks & Regards,
Vinti
