Hi, 

I've had some trouble developing a Specs2 matcher that checks that a predicate holds for all the elements of an RDD, and then using it to test a simple Spark Streaming program. I finally got code that works — you can see it at https://gist.github.com/juanrh/dffd060e3a371676b83c — but I wanted to check with the list that I'm using the right approach. First I defined the matcher as follows:

    def foreachRecord[T](predicate: T => Boolean): Matcher[RDD[T]] = { (rdd: RDD[T]) =>
      val failingRecords = rdd.filter(! predicate(_))
      (
        failingRecords.isEmpty,
        "each record fulfils the predicate",
        s"predicate failed for records ${failingRecords.take(4).mkString(", ")} ..."
      )
    }


which works ok for examples like

    def simpleBatchTest = {
      val rdd = sc.parallelize(1 to 100, 3)
      rdd should foreachRecord(_ > 0)
    }
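
As an aside, the triple the matcher builds can be sketched without Spark or Specs2 at all — the following is just an illustration (the object and method names here are mine, not from the gist), showing the (success, okMessage, koMessage) shape that Specs2 turns into a Matcher, computed over a plain Seq:

```scala
object ForeachRecordSketch {
  // Spark/Specs2-free analogue of the matcher body: compute the
  // (success, okMessage, koMessage) triple over a plain Seq.
  def check[T](records: Seq[T])(predicate: T => Boolean): (Boolean, String, String) = {
    val failing = records.filterNot(predicate)
    (
      failing.isEmpty,
      "each record fulfils the predicate",
      s"predicate failed for records ${failing.take(4).mkString(", ")} ..."
    )
  }

  def main(args: Array[String]): Unit = {
    println(check(1 to 100)(_ > 0)._1)   // true: every record satisfies the predicate
    println(check(1 to 100)(_ > 50)._1)  // false: records 1 to 50 fail it
  }
}
```

The RDD version is the same logic, with rdd.filter and take(4) standing in for the Seq operations.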


The problems started when I tried to use it to check that a predicate holds for all the batches / RDDs of a DStream. The idea was to use foreachRDD to update a driver-local org.specs2.execute.Result object, combining the results for all the batches with 'and':

    def simpleStreamingTest: Result = {
      val ssc = new StreamingContext(sc, Duration(300))
      val record = "hola"
      val batches = Seq.fill(5)(Seq.fill(10)(record))
      val queue = new Queue[RDD[String]]
      queue ++= batches.map(batch => sc.parallelize(batch, numSlices = 2))
      val inputDStream = ssc.queueStream(queue, oneAtATime = true)
      var result: Result = ok
      inputDStream.foreachRDD { rdd =>
        val r = AsResult { rdd should foreachRecord(_ == record) }
        result = result and r
      }

      ssc.start()
      StreamingContextUtils.awaitForNBatchesCompleted(batches.length)(ssc)
      ssc.stop(stopSparkContext = false, stopGracefully = false)
      println(s"result : ${result}")
      result
    }


This leads to an exception when running the test: AsResult { rdd should foreachRecord(_ == record) } fails because the closure argument of foreachRecord needs to access the local variable 'record', and in order to do that it tries to serialize the whole enclosing context, including 'result', which fails with:

    Driver stacktrace:,org.apache.spark.SparkException: Job aborted due to stage failure:
    Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost):
    java.io.InvalidClassException: org.specs2.execute.Success; no valid constructor
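
As a debugging aid, this kind of problem can be caught on the driver before Spark ever ships the closure, by attempting plain Java serialization — which is roughly the check that bites when a task is serialized. This is only a sketch with names of my own:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializableCheck {
  // Attempt plain Java serialization of a value: if anything transitively
  // captured is not Serializable, writeObject throws and we report false.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    class NotOk  // stand-in for a non-serializable value like a Specs2 Result
    val bad = new NotOk
    println(isSerializable("a plain string"))   // true: String is Serializable
    println(isSerializable(() => bad.toString)) // false: the closure drags in `bad`
  }
}
```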


To solve this I followed http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/ to specify more carefully which values are accessed by the closure, defining an overload of the matcher:

    def foreachRecord[T, C](predicateContext: C)(toPredicate: C => (T => Boolean)): Matcher[RDD[T]] = {
      val predicate = toPredicate(predicateContext)
      foreachRecord(predicate)
    }


and then replacing the call above with 

      val r = AsResult { rdd should foreachRecord(record)(r => { _ == r} ) } 
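
The pattern generalizes to a small helper (a sketch — 'bind' is my own name for it, not from the blog post): by routing the context through an explicit parameter, the returned closure captures only that parameter and not the rest of the enclosing scope:

```scala
object HygienicClosure {
  // Route the context through a parameter: the closure returned by f(context)
  // captures only `context`, never the surrounding local variables.
  def bind[C, T, R](context: C)(f: C => T => R): T => R = f(context)

  def main(args: Array[String]): Unit = {
    val record = "hola"
    val predicate = bind(record)(r => (s: String) => s == r)
    println(predicate("hola"))  // true
    println(predicate("adios")) // false
  }
}
```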


I'm writing to the list because I wanted to confirm that this is a proper solution, and to ask whether somebody can imagine a better one: this is not too bad, but the call foreachRecord(record)(r => { _ == r }) is still a bit ugly. It is curious that Spark's closure cleaner is smart enough to avoid sending 'result' to the workers in this very similar example:

      val r2 = AsResult { rdd.filter(_ != record).count === 0 }

Also, I wanted to confirm that updating a var in the driver from foreachRDD is a good idea. I understand that the code passed to foreachRDD runs in the driver, so that should be ok as long as the local variable is updated using action results only.
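
Spark aside, the accumulation reduces to a driver-side fold of per-batch results. A minimal sketch (names are mine; Boolean && stands in for Specs2's 'and'):

```scala
object DriverSideFold {
  // Combine per-batch results on the driver, as foreachRDD does with
  // `result = result and r` in simpleStreamingTest.
  def fold(perBatchResults: Seq[Boolean]): Boolean = {
    var result = true  // like `var result: Result = ok`
    perBatchResults.foreach { r => result = result && r }
    result
  }

  def main(args: Array[String]): Unit = {
    println(fold(Seq(true, true, true)))  // true: all batches pass
    println(fold(Seq(true, false, true))) // false: one failing batch fails the test
  }
}
```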

Thanks a lot in advance.

Greetings, 

Juan