Thank you, extending Serializable solved the issue. I am left with more questions than answers though :-).





From: Yana Kadiyska []
Sent: Monday, July 13, 2015 2:49 PM
To: Ellafi, Saif A.
Cc:; Liu, Weicheng
Subject: Re:


I would certainly try to mark the Validator class as Serializable. If that doesn't do it, you can also try and see if this flag sheds more light:


By “programming guide” I mean this: I could almost swear I had seen an extended section on tricky serialization issues (i.e., scenarios where you end up serializing more than you think because of what your closure captures), but I can't locate that section now...
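To make the closure-capture point concrete, here is a minimal Spark-free sketch (the Holder class and both method names are invented for illustration): a lambda that reads a field actually captures the whole enclosing instance, so plain Java serialization fails unless the value is first copied into a local.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class Holder { // deliberately NOT Serializable
  val threshold = 140.0
  // Reading the field means the lambda captures `this`, dragging Holder along:
  def badFn: Double => Boolean = x => x > threshold
  // Copying the field into a local first means only a Double is captured:
  def goodFn: Double => Boolean = { val t = threshold; x => x > t }
}

def serializes(obj: AnyRef): Boolean =
  try { new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj); true }
  catch { case _: NotSerializableException => false }

val h = new Holder
println(serializes(h.badFn))  // false: the non-serializable Holder was captured
println(serializes(h.goodFn)) // true: only the local Double was captured
```

This is the same mechanism that bites in Spark: the closure passed to map is serialized together with everything it captures.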


On Mon, Jul 13, 2015 at 1:30 PM, <> wrote:

Thank you very much for your time. Here is how I designed the case classes; as far as I know, they are set up properly.


PS: By the way, what do you mean by “the programming guide”?


import org.apache.spark.sql.Row

abstract class Validator {

    // positions to access with Row.getInt(x)
    val shortsale_in_pos = 10
    val month_pos = 11
    val foreclosure_start_dt_pos = 14
    val filemonth_dt_pos = 12
    val reo_start_dt_pos = 14
    // ..

    // redesign into Iterable of Rows -->
    def validate(input: Row): Validator
}

case object Nomatch extends Validator {
    def validate(input: Row): Validator = this
}

case object Shortsale extends Validator {
    def validate(input: Row): Validator = {
        val check1: Boolean = input.getDouble(shortsale_in_pos) > 140.0
        if (check1) this else Nomatch
    }
}



From: Yana Kadiyska []
Sent: Monday, July 13, 2015 2:16 PM
To: Ellafi, Saif A.
Subject: Re:


It's a bit hard to tell from the snippets of code, but it's likely related to the fact that when you serialize instances, the enclosing class, if any, also gets serialized, as well as any other place the fields used in the closure come from... e.g. check this discussion:


The programming guide also has good advice on serialization issues. I would particularly check how Shortsale/Nomatch/Foreclosure are declared (I'd advise making them top-level case classes)...
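For what it's worth, here is roughly what that advice looks like, sketched without Spark (the names mirror the thread's, but this is an assumed reconstruction, with a Double standing in for Row): the hierarchy lives at the top level and the abstract class itself extends Serializable, so the case objects survive a Java-serialization round trip.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Top-level hierarchy; the abstract class is Serializable,
// so every case object below inherits it.
sealed abstract class Validator extends Serializable {
  def validate(x: Double): Validator
}
case object Nomatch extends Validator {
  def validate(x: Double): Validator = this
}
case object Shortsale extends Validator {
  def validate(x: Double): Validator = if (x > 140.0) this else Nomatch
}

// Serialize and deserialize, roughly as Spark's JavaSerializer would.
def roundTrip[T <: AnyRef](t: T): T = {
  val bos = new ByteArrayOutputStream
  new ObjectOutputStream(bos).writeObject(t)
  new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    .readObject().asInstanceOf[T]
}

// Case objects come back as the same singleton (readResolve is generated):
println(roundTrip(Shortsale) eq Shortsale)
```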


On Mon, Jul 13, 2015 at 12:32 PM, <> wrote:



For some experiment I am doing, I am trying to do the following.


1. Created an abstract class Validator, and case objects extending Validator, each with a validate(row: Row): Validator method.


2. Added all the case objects to a list.


3. Each validate takes a Row and returns “itself” if the check passes, so I do this to return an arbitrary number for each match:

def evaluate_paths(row: Row, validators: List[ClientPath]): Int = {

    var result: Int = -1

    for (validator <- validators) {
        validator.validate(row) match {
            case Shortsale   => result = 0
            case Foreclosure => result = 1
            case Nomatch     => result = 99
        }
    }

    result
}

val validators = List[ClientPath](Shortsale, Foreclosure) // ..




4. Then I run the map[Int](row => evaluate_paths(row, validators))
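As an aside, the evaluate_paths loop could also be written without the mutable result. A self-contained sketch (no Spark; a Double stands in for Row, validators are modeled as plain functions, and first-match-wins precedence is my assumption, whereas the loop above lets the last match win):

```scala
sealed trait Outcome
case object Shortsale   extends Outcome
case object Foreclosure extends Outcome
case object Nomatch     extends Outcome

// Run every validator and take the first interesting result, defaulting to 99.
def evaluatePaths(x: Double, validators: List[Double => Outcome]): Int =
  validators.map(v => v(x)).collectFirst {
    case Shortsale   => 0
    case Foreclosure => 1
  }.getOrElse(99)

val vs: List[Double => Outcome] = List(
  x => if (x > 140.0) Shortsale else Nomatch,
  _ => Nomatch
)
println(evaluatePaths(150.0, vs)) // 0
println(evaluatePaths(100.0, vs)) // 99
```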


But this blows up: it does not like the list of validators when an action such as take(1) is executed on the RDD.

I have also tried an Iterator and an Array instead of a List, but no luck. I also replaced the for loop with a while loop.

Curiously, when I call evaluate_paths(some_row, validators) directly with custom-made Rows, the execution works properly.


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 125.0 failed 1 times, most recent failure: Lost task 0.0 in stage 125.0 (TID 830, localhost): $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Nomatch$; no valid constructor
    at $ExceptionInfo.newInvalidClassException(




    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
    at
    at org.apache.spark.executor.Executor$
    at java.util.concurrent.ThreadPoolExecutor.runWorker(
    at java.util.concurrent.ThreadPoolExecutor$
    at


Driver stacktrace:
    at$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$




Grateful for any advice.