spark-user mailing list archives

From Eric Loots <eric.lo...@gmail.com>
Subject Unit testing and Spark Streaming
Date Fri, 12 Dec 2014 13:17:37 GMT
Hi,

I’ve started my first experiments with Spark Streaming, beginning with setting up an environment
that uses ScalaTest for unit testing. I poked around on this mailing list and googled the topic.

One of the things I wanted to be able to do is use Scala sequences as a data source in the
tests (instead of, for example, files). For this, queueStream on a StreamingContext came
in handy.
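A variation on this, sketched under the assumption that each inner Scala sequence should become its own batch: put one RDD per sequence on the queue and let queueStream release them one per batch interval (`oneAtATime = true`, which is also that parameter's default). The object and method names (`QueueStreamFeeder`, `batchesAsStream`) are hypothetical.

```scala
import scala.collection.mutable.Queue
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical helper: one inner Seq per batch, instead of all data in a single batch.
object QueueStreamFeeder {
  def batchesAsStream[T: ClassTag](ssc: StreamingContext, batches: Seq[Seq[T]]): InputDStream[T] = {
    val queue = Queue[RDD[T]]()
    // Each inner sequence becomes one RDD on the queue, in order.
    batches.foreach(batch => queue += ssc.sparkContext.parallelize(batch))
    // With oneAtATime = true, queueStream dequeues a single RDD per batch interval.
    ssc.queueStream(queue, oneAtATime = true)
  }
}
```

With a single queued RDD, as in injectData below, the setting makes no difference; it only matters once several RDDs are queued, for example to test logic that spans batches.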

I now have a setup that allows me to run WordSpec-style tests such as:

import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.scalatest.Matchers

class StreamTests extends StreamingContextBaseSpec("Some-tests") with Matchers with WordsCountsTestData
{

  "Running word count" should {
    "produce the correct word counts for a non-empty list of words" in {

      val streamingData = injectData(data1)
      val wordCountsStream = WordCounter.wordCounter(streamingData)
      val wordCounts = startStreamAndExtractResult(wordCountsStream, ssc)

      wordCounts.toSet shouldBe wordCounts1
    }

    "return count = 1 for the empty string" in {

      val streamingData: InputDStream[String] = injectData(data2)
      val wordCountsStream: DStream[(String, Int)] = WordCounter.wordCounter(streamingData)
      val wordCounts: Seq[(String, Int)] = startStreamAndExtractResult(wordCountsStream, ssc)

      wordCounts.toSet shouldBe wordCounts2
    }
    "return an empty result for an empty list of words" in {

      val streamingData = injectData(data3)
      val wordCountsStream = WordCounter.wordCounter(streamingData)
      val wordCounts = startStreamAndExtractResult(wordCountsStream, ssc)

      wordCounts.toSet shouldBe wordCounts3
    }

  }

  "Running word count with filtering out words with single occurrence" should {
    "produce the correct word counts for a non-empty list of words" in {

      val streamingData = injectData(data1)
      val wordCountsStream = WordCounter.wordCountOverOne(streamingData)
      val wordCounts = startStreamAndExtractResult(wordCountsStream, ssc)

      wordCounts.toSet shouldBe wordCounts1.filter(_._2 > 1)
    }
  }
}

where WordsCountsTestData (included at the end of this message) is a trait that contains the
test data and the expected results.
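Since expected sets such as `wordCounts1` are written out by hand, one option is to derive them from the same input with plain Scala collections and use that as a reference (oracle) in the assertions. A minimal sketch, assuming the streaming results are compared as sets as in the tests above; the object name `ExpectedWordCounts` is hypothetical.

```scala
// Hypothetical reference implementation on plain Scala collections (no Spark involved).
// groupBy + size has the same meaning as map(word => (word, 1)).reduceByKey(_ + _).
object ExpectedWordCounts {
  def countWords(words: Seq[String]): Set[(String, Int)] =
    words.groupBy(identity).map { case (word, occurrences) => (word, occurrences.size) }.toSet

  // Mirrors WordCounter.wordCountOverOne: keep only words seen more than once.
  def countWordsOverOne(words: Seq[String]): Set[(String, Int)] =
    countWords(words).filter(_._2 > 1)
}
```

A test could then assert `wordCounts.toSet shouldBe ExpectedWordCounts.countWords(data1)`. Note that splitting the empty string yields one empty token, so `countWords` gives `Set(("", 1))` for `data2`, matching `wordCounts2`.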

The two methods under test (WordCounter.wordCounter and WordCounter.wordCountOverOne) are:

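import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits for reduceByKey (needed before Spark 1.3)
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

object WordCounter {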
  def wordCounter(input: InputDStream[String]): DStream[(String, Int)] = {
    val pairs = input.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts
  }

  def wordCountOverOne(input: InputDStream[String]): DStream[(String, Int)] = {
    val pairs = input.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts filter (_._2 > 1)
  }
}
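A variation that can be easier to unit test, sketched under the assumption of Spark 1.3 or later (where `reduceByKey` on an RDD of pairs needs no extra import): keep the counting logic as a plain `RDD => RDD` function and lift it into the stream with `DStream.transform`. The batch logic can then be checked with an ordinary SparkContext, without starting a StreamingContext. The object name `WordCountLogic` is hypothetical.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical refactoring: batch logic on RDDs, streaming wrappers via transform.
object WordCountLogic {
  // Pure batch logic, testable with sc.parallelize(...) alone.
  def countWords(words: RDD[String]): RDD[(String, Int)] =
    words.map(word => (word, 1)).reduceByKey(_ + _)

  def countWordsOverOne(words: RDD[String]): RDD[(String, Int)] =
    countWords(words).filter(_._2 > 1)

  // Streaming wrappers: apply the same logic to every batch RDD of the stream.
  def wordCounter(input: DStream[String]): DStream[(String, Int)] =
    input.transform(countWords _)

  def wordCountOverOne(input: DStream[String]): DStream[(String, Int)] =
    input.transform(countWordsOverOne _)
}
```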

StreamingContextBaseSpec contains the actual test helper methods (injectData and startStreamAndExtractResult)
as well as the per-test setup and teardown of the StreamingContext.

package spark.testing

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
import org.scalatest.{BeforeAndAfter, WordSpec}

import scala.collection.mutable.Queue
import scala.reflect.ClassTag

class StreamingContextBaseSpec(name: String, silenceSpark: Boolean = true) extends WordSpec with BeforeAndAfter {

  val BatchDuration = 10                  // milliseconds
  val DeltaTBefore  = 20 * BatchDuration  // 200 ms before the stream is started
  val DeltaTAfter   = 10 * BatchDuration  // 100 ms after the stream is started

  def injectData[T: ClassTag](data: Seq[T]): InputDStream[T] = {
    val dataAsRDD = ssc.sparkContext.parallelize(data)
    val dataAsRDDOnQueue = Queue(dataAsRDD)
    ssc.queueStream(dataAsRDDOnQueue, oneAtATime = false)
  }

  def startStreamAndExtractResult[T: ClassTag](stream: DStream[T], ssc: StreamingContext): Seq[T] = {
    // print() registers an output operation; start() fails if none is registered
    stream.print()
    println(s"~~~> starting execution context $ssc")
    val sTime = System.currentTimeMillis()
    ssc.start()
    // Collect the RDDs of all batches between startWindow and endWindow (both inclusive)
    val startWindow = new Time(sTime - DeltaTBefore)
    val endWindow = new Time(sTime + DeltaTAfter)
    val sliceRDDs = stream.slice(startWindow, endWindow)
    sliceRDDs.flatMap(rdd => rdd.collect())
  }

  var ssc: StreamingContext = _

  before {
    System.clearProperty("spark.driver.port")
    System.clearProperty("spark.driver.host")
    if ( silenceSpark ) SparkUtil.silenceSpark()
    val conf = new SparkConf().setMaster("local").setAppName(name)
    ssc = new StreamingContext(conf, Milliseconds(BatchDuration))
  }

  after {
    println(s"~~~> stopping execution context $ssc")
    System.clearProperty("spark.driver.port")
    System.clearProperty("spark.driver.host")
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    ssc.awaitTermination()
    ssc = null
  }
}
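The SparkUtil.silenceSpark() helper used in `before` is not included in this message. A hypothetical stand-in, assuming log4j 1.2 (the logging backend of Spark 1.x), could lower or raise the level of the noisiest loggers programmatically. A `log4j.properties` file on the test classpath (e.g. under `src/test/resources`) is the configuration-file alternative, and later Spark releases also added `SparkContext.setLogLevel`. The logger names below are assumptions about what is noisy in a typical test run.

```scala
import org.apache.log4j.{Level, Logger}

// Hypothetical stand-in for the SparkUtil helper referenced above.
object SparkUtil {
  // Assumed noisy logger hierarchies; adjust to what actually shows up in the test output.
  private val noisyLoggers = Seq("org.apache.spark", "org.eclipse.jetty", "akka")

  // Sets the same level on every listed logger (and, via log4j inheritance, on their children).
  def setLogLevel(level: Level): Unit =
    noisyLoggers.foreach(name => Logger.getLogger(name).setLevel(level))

  def silenceSpark(): Unit = setLogLevel(Level.WARN)
}
```

Calling `SparkUtil.setLogLevel(Level.DEBUG)` in a single test would then turn the detail back up when needed.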

So much for the prelude; now my questions:

1. Is this a good way to perform this kind of testing?
2. Are there more efficient ways to run this kind of testing?
3. To reduce the test run time, I’m running the stream with a batch interval of only 10 ms and
   a slice window that extends from 200 ms before to 100 ms after the start of the stream. This
   seems to work fine as far as I can see; when the batch interval is reduced further, the result
   data is not picked up unless the window is extended further. Is this approach OK?
4. How can the log level in test mode be reduced (or increased when needed)?

Cheers, Eric
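One alternative to print() plus slice(), sketched under the assumption of a Spark version that provides `StreamingContext.awaitTerminationOrTimeout` (1.3 or later): register a `foreachRDD` output operation that copies each batch into a driver-side buffer, and let the context run for a fixed time before reading the buffer. Because `foreachRDD` is itself an output operation, the print() call is then not needed to satisfy start(). The names `ForeachRDDCollector` and `collectFor` are hypothetical, and the timeout still has to cover the batches you expect, so this trades the window arithmetic for a wall-clock wait rather than removing it.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical alternative to startStreamAndExtractResult.
object ForeachRDDCollector {
  def collectFor[T](stream: DStream[T], ssc: StreamingContext, timeoutMs: Long): Seq[T] = {
    val results = new ArrayBuffer[T]()
    // The foreachRDD function runs on the driver once per batch, so it can append to a local buffer.
    stream.foreachRDD { rdd =>
      val batch = rdd.collect()
      results.synchronized { results ++= batch }
    }
    ssc.start()
    // Let the batches run for timeoutMs; the caller is still responsible for stopping ssc.
    ssc.awaitTerminationOrTimeout(timeoutMs)
    results.synchronized { results.toList }
  }
}
```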

For the sake of completeness, the test data in trait WordsCountsTestData:

trait WordsCountsTestData {
  val data1: List[String] =
    """Roch writes, "After years of relative silence, I'd like to put back on my blogging
      | hat and update my patient readership about the significant ZFS technological
      | improvements that have integrated since Sun and ZFS became Oracle brands.
      | Since there is so much to cover, I tee up this series of article with a short
      | description of 9 major performance topics that have evolved significantly in
      | the last years. Later, I will describe each topic in more details in individual
      | blog entries. Of course, these selected advancements represents nowhere near an
      | exhaustive list. There has been over 650 changes to the ZFS code in the last 4 years
    """.stripMargin.split("[\\s.,:]+").toList

  val wordCounts1 = Set(
    ("the", 4), ("in", 4),
    ("ZFS", 3), ("years", 3), ("of", 3), ("to", 3),
    ("my", 2), ("last", 2), ("that", 2), ("I", 2), ("and", 2), ("have", 2),
    ("much", 1), ("silence", 1), ("near", 1), ("so", 1), ("update", 1), ("cover", 1),
    ("Later", 1), ("course", 1), ("9", 1), ("significantly", 1), ("over", 1), ("nowhere", 1),
    ("description", 1), ("article", 1), ("650", 1), ("4", 1), ("topics", 1), ("is", 1),
    ("since", 1), ("been", 1), ("with", 1), ("Sun", 1), ("each", 1), ("hat", 1), ("brands", 1),
    ("exhaustive", 1), ("individual", 1), ("like", 1), ("significant", 1), ("blog", 1),
    ("there", 1), ("more", 1), ("readership", 1), ("these", 1), ("back", 1), ("changes", 1),
    ("this", 1), ("code", 1), ("blogging", 1), ("Roch", 1), ("selected", 1), ("became", 1),
    ("up", 1), ("There", 1), ("major", 1), ("represents", 1), ("patient", 1), ("has", 1),
    ("advancements", 1), ("integrated", 1), ("performance", 1), ("Oracle", 1), ("an", 1),
    ("improvements", 1), ("Of", 1), ("details", 1), ("series", 1), ("\"After", 1), ("list", 1),
    ("put", 1), ("writes", 1), ("topic", 1), ("short", 1), ("about", 1), ("evolved", 1),
    ("tee", 1), ("I'd", 1), ("will", 1), ("a", 1), ("on", 1), ("relative", 1), ("Since", 1),
    ("technological", 1), ("describe", 1), ("entries", 1))

  val data2: List[String] =
    "".split("[\\s.,:]+").toList  // splitting "" yields List(""), i.e. one empty token

  val wordCounts2: Set[(String, Int)] = Set(("",1))

  val data3: List[String] = List.empty[String]

  val wordCounts3: Set[(String, Int)] = Set.empty[(String,Int)]


}

Eric Loots
SBI Consulting
M: +32-475-478 190
E: eric.loots@sbiconsulting.be
A: Business Park King Square - Veldkant 33a - 2550 Kontich

