From Iain Cundy <>
Subject RE: [MARKETING] Spark Streaming stateful transformation mapWithState function getting error scala.MatchError: [Ljava.lang.Object]
Date Mon, 14 Mar 2016 14:06:19 GMT
Hi Vinti

I don’t program in scala, but I think you’ve changed the meaning of the current variable
– look again at what it state and what is new data.

Assuming it works like the Java API, to use this function to maintain State you must call
State.update, while you can return anything, not just the state.


From: Vinti Maheshwari []
Sent: 12 March 2016 22:10
To: user
Subject: [MARKETING] Spark Streaming stateful transformation mapWithState function getting
error scala.MatchError: [Ljava.lang.Object]

Hi All,

I wanted to replace my updateStateByKey function with mapWithState function (Spark 1.6) to
improve performance of my program.

I was following these two documents:

but i am getting error scala.MatchError: [Ljava.lang.Object]

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 71.0 failed
4 times, most recent failure: Lost task 0.3 in stage 71.0 (TID 88,
scala.MatchError: [Ljava.lang.Object;@eaf8bc8 (of class [Ljava.lang.Object;)

at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)

at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)

at scala.Option.flatMap(Option.scala:170)

at HbaseCovrageStream$.HbaseCovrageStream$$tracketStateFunc$1(HbaseCoverageStream_mapwithstate.scala:84)

Reference code:

def trackStateFunc(key:String, value:Option[Array[Long]], current:State[Array[Long]]) = {

        //either we can use this

        // current.update(value)

 :+ current).orElse(Some(current)).flatMap{

          case x:Array[Long] => Try( + _).toArray).toOption

          case None => ???



      val statespec:StateSpec[String, Array[Long], Array[Long], Option[Array[Long]]] = StateSpec.function(trackStateFunc

      val state: MapWithStateDStream[String, Array[Long], Array[Long], Option[Array[Long]]]
= parsedStream.mapWithState(statespec)

My previous working code which was using updateStateByKey function:

val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(

        (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
 +: current).orElse(Some(current))

          .flatMap(as => Try( + _).toArray).toOption)

Anyone has idea what can be the issue?
Thanks & Regards,

