spark-user mailing list archives

From: Matei Zaharia <matei.zaha...@gmail.com>
Subject: Re: Why must the dstream.foreachRDD(...) parameter be serializable?
Date: Wed, 28 Jan 2015 02:33:29 GMT
I believe this is needed for driver recovery in Spark Streaming. If your Spark driver program
crashes, Spark Streaming can recover the application by reading the set of DStreams and output
operations from a checkpoint file (see https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing).
But to do that, it needs to remember all the operations you're running periodically, including
those in foreachRDD, which means the functions you pass must be serializable so they can be
written into the checkpoint.
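
For example, when checkpointing is enabled, the whole DStream graph, including any
foreachRDD functions, is serialized into the checkpoint directory so that
StreamingContext.getOrCreate can rebuild it after a crash. Roughly like this (the
checkpoint path, app name, host, and port are just placeholders):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val checkpointDir = "hdfs:///tmp/streaming-checkpoint"  // placeholder

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedApp")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)  // the DStream graph gets saved here

    val lines = ssc.socketTextStream("localhost", 9999)
    // This function becomes part of the checkpointed graph, so it must be
    // serializable even though it only ever runs on the driver.
    lines.foreachRDD { rdd =>
      println("batch size: " + rdd.count())
    }
    ssc
  }

  // On restart, getOrCreate deserializes the saved graph (closures included)
  // instead of calling createContext again.
  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()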

Matei

> On Jan 27, 2015, at 6:15 PM, Tobias Pfeiffer <tgp@preferred.jp> wrote:
> 
> Hi,
> 
> I want to do something like
> 
> dstream.foreachRDD(rdd => if (someCondition) ssc.stop())
> 
> so in particular the function does not touch any element in the RDD and runs completely
> within the driver. However, this fails with a NotSerializableException because $outer is not
> serializable, etc. The DStream code says:
> 
>   def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
>     // because the DStream is reachable from the outer object here, and because 
>     // DStreams can't be serialized with closures, we can't proactively check 
>     // it for serializability and so we pass the optional false to SparkContext.clean
>     new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
>   }
> 
> To be honest, I don't understand the comment. Why must that function be serializable
> even when there is no RDD action involved?
> 
> Thanks
> Tobias
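
For the specific case above, one way to sidestep the capture of $outer is to have the
foreachRDD function set a serializable flag and stop the context from a separate driver
thread, so the function itself never references ssc. A rough, untested sketch (StopSignal
is just an illustrative name):

  // A standalone object, so the closure doesn't capture the enclosing class;
  // references to it compile to static accesses, and it is trivially
  // serializable if it ever does get written out.
  object StopSignal extends Serializable {
    @volatile var requested = false
  }

  dstream.foreachRDD { rdd =>
    if (someCondition) StopSignal.requested = true  // no reference to ssc here
  }

  // Poll the flag on the driver, outside the checkpointed graph:
  new Thread(new Runnable {
    def run(): Unit = {
      while (!StopSignal.requested) Thread.sleep(1000)
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }).start()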

