spark-user mailing list archives

From <>
Subject RE:
Date Mon, 13 Jul 2015 17:30:56 GMT
Thank you very much for your time. Here is how I designed the case classes; as far as I know,
they are set up properly.

P.S. By the way, what do you mean by “the programming guide”?

import org.apache.spark.sql.Row

abstract class Validator {

    // column positions to read with Row.getInt(x) / Row.getDouble(x)
    val shortsale_in_pos = 10
    val month_pos = 11
    val foreclosure_start_dt_pos = 14
    val filemonth_dt_pos = 12
    val reo_start_dt_pos = 14
    // ..

    // redesign into Iterable of Rows -->
    def validate(input: Row): Validator
}

case object Nomatch extends Validator {
    def validate(input: Row): Validator = this
}

case object Shortsale extends Validator {
    def validate(input: Row): Validator = {
        val check1: Boolean = input.getDouble(shortsale_in_pos) > 140.0
        if (check1) this else Nomatch
    }
}


From: Yana Kadiyska []
Sent: Monday, July 13, 2015 2:16 PM
To: Ellafi, Saif A.
Subject: Re:

It's a bit hard to tell from the snippets of code, but it's likely related to the fact that
when you serialize instances, the enclosing class (if any) also gets serialized, as well as
any other object that fields used in the closure come from. E.g., check this discussion:

The programming guide also has good advice on serialization issues. I would particularly check
how Shortsale/Nomatch/Foreclosure are declared (I'd advise making them top-level case classes)...
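A minimal sketch of the capture problem described above, assuming Scala 2.12 or later (where function literals are Java-serializable). `Enclosing`, `threshold`, `ClosureCaptureDemo`, and `isJavaSerializable` are hypothetical names, not from the thread: a closure that reads a field of a non-serializable class drags the whole instance into serialization, while copying the field into a local `val` first leaves only the value to serialize.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical driver-side class that is NOT Serializable (think of an
// application class or REPL wrapper that happens to hold configuration).
class Enclosing {
  val threshold: Double = 140.0

  // Reads the field `threshold`, so the closure captures `this`,
  // i.e. the whole non-serializable Enclosing instance.
  def capturingFilter: Double => Boolean = x => x > threshold

  // Copies the field into a local val first; the closure then
  // captures only the Double value.
  def localCopyFilter: Double => Boolean = {
    val t = threshold
    x => x > t
  }
}

object ClosureCaptureDemo {
  // True if Java serialization of `obj` succeeds, false if some object
  // reachable from it is not Serializable.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

The same rule applies to any function passed to an RDD `map`: Spark ships it with Java serialization, so everything the closure references has to survive that step.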

On Mon, Jul 13, 2015 at 12:32 PM, <<>>

For an experiment I am doing, I am trying to do the following.

1. Created an abstract class Validator, and case objects extending Validator, each with a
validate(row: Row): Boolean method.

2. Added all the case objects to a list.

3. Each validate inspects a Row and returns “itself” if the check passes, so I then do
the following to return an arbitrary number for each match:
def evaluate_paths(row: Row, validators: List[ClientPath]): Int = {

    var result: Int = -1

    for (validator <- validators) {
        // each iteration overwrites result, so the last validator's outcome wins
        validator.validate(row) match {
            case Shortsale   => result = 0
            case Foreclosure => result = 1
            case Nomatch     => result = 99
        }
    }

    result
}

val validators = List[ClientPath](

4. Then I run map[Int](row => evaluate_paths(row, validators)) on the RDD.
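A Spark-free sketch of the evaluation logic, assuming a hypothetical `Row` stand-in (an `IndexedSeq[Double]`) and a hypothetical column position so that it runs on its own; `EvaluatePathsSketch` and `evaluatePaths` are likewise illustrative names. Two differences from the loop above are deliberate design choices: validators are evaluated lazily and the first non-`Nomatch` result wins (in the original loop, every iteration overwrites `result`, so the last validator decides), and the base class is `sealed`, `Serializable`, and declared inside a top-level object, in line with the advice in the reply. `Foreclosure` is omitted because its check is not shown in the thread.

```scala
object EvaluatePathsSketch {
  // Hypothetical stand-in for org.apache.spark.sql.Row so the sketch
  // runs without Spark: a row is just an indexed sequence of Doubles.
  type Row = IndexedSeq[Double]

  sealed abstract class Validator extends Serializable {
    def validate(row: Row): Validator
  }

  case object Nomatch extends Validator {
    def validate(row: Row): Validator = this
  }

  case object Shortsale extends Validator {
    val shortsaleInPos = 0 // hypothetical column position
    def validate(row: Row): Validator =
      if (row(shortsaleInPos) > 140.0) this else Nomatch
  }

  // Returns the code of the first validator whose check passes,
  // or 99 if none does (including when `validators` is empty).
  def evaluatePaths(row: Row, validators: Seq[Validator]): Int =
    validators.iterator
      .map(_.validate(row))
      .collectFirst { case Shortsale => 0 }
      .getOrElse(99)
}
```

Mapping a local collection, e.g. `rows.map(row => EvaluatePathsSketch.evaluatePaths(row, validators))`, exercises the same lambda shape as the RDD `map` without any serialization, which makes it a convenient way to test the logic separately from the shipping problem.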

But this blows up: it does not like the creation of the list of validators when an action
such as take(1) is executed on the RDD.
Instead of a List, I have also tried an Iterator and an Array, with no luck. I also replaced
the for loop with a while loop.
Curiously, when I tried it with custom-made Rows, the execution worked properly when calling evaluate_paths(some_row,

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 125.0 failed
1 times, most recent failure: Lost task 0.0 in stage 125.0 (TID 830, localhost):
no valid constructor
    at $ExceptionInfo.newInvalidClassException(
    ...
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
    ...
    at org.apache.spark.executor.Executor$
    at java.util.concurrent.ThreadPoolExecutor.runWorker(
    at java.util.concurrent.ThreadPoolExecutor$
    ...

Driver stacktrace:
    at <>$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$
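One plausible reading of `no valid constructor`, not confirmed in the thread: Java serialization requires the first non-`Serializable` superclass of a `Serializable` class to have an accessible no-argument constructor, and it only enforces this when the object is read back, which here happens on the executor (`JavaDeserializationStream.readObject` in the trace). Case objects are `Serializable`, but the abstract `Validator` base is not; assuming the code was typed into spark-shell, its classes are likely nested inside REPL-generated wrappers, so the base constructor takes a hidden outer reference. The sketch below simulates that with an explicit constructor parameter. `NoValidConstructorDemo`, `BrokenValidator`, `BrokenShortsale`, and `roundTrip` are hypothetical names, and `roundTrip` only approximates what Spark's `JavaSerializer` does.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object NoValidConstructorDemo {
  // Write `obj` with Java serialization and read it back, roughly what
  // happens when a task is shipped from the driver to an executor.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Failing shape: the base class is not Serializable and has no
  // no-arg constructor. The `outer` parameter stands in for the hidden
  // outer reference of a class defined inside a REPL wrapper.
  abstract class BrokenValidator(val outer: AnyRef)
  final class BrokenShortsale extends BrokenValidator("wrapper") with Serializable

  // Working shape: the base class itself is Serializable, so the only
  // constructor Java serialization needs is java.lang.Object's.
  abstract class Validator extends Serializable {
    def threshold: Double
  }
  final class Shortsale extends Validator {
    val threshold: Double = 140.0
  }
}
```

Writing `BrokenShortsale` succeeds and only reading it back throws `java.io.InvalidClassException` with "no valid constructor", which would also be consistent with evaluate_paths working on custom-made Rows: calling it directly on the driver never deserializes anything. Declaring the base class as `Serializable`, or compiling the classes at top level as suggested in the reply, should avoid this particular check failing.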


Any advice would be gratefully received.
