spark-user mailing list archives

From "Young, Matthew T" <matthew.t.yo...@intel.com>
Subject Very slow performance on very small record counts
Date Fri, 30 Oct 2015 22:38:13 GMT
In a job I am writing I have encountered impossibly poor performance with Spark Streaming 1.5.1.
The environment is three 16-core / 32 GB RAM VMs.

The job involves parsing 600 bytes or so of JSON (per record) from Kafka, extracting two values
from the JSON, doing some aggregation and averages, and writing a handful of summary results
back to Kafka each two-second batch.

The issue is that Spark seems to be hitting a hard minimum of 4 seconds to process each batch,
even a batch with as few as 192 records in it!

When I check the Web UI for such a batch I see a proper distribution of offsets (each worker
gets < 10 records) and four jobs for the batch. Three of the jobs are completed very quickly
(as I would expect), but one job essentially dominates the 4 seconds. This Web UI screenshot
is attached as "Spark Idle Time 2.png".

When I drill down into that job and look at the event timeline I see very odd behavior. The
duration for the longest task is only ~0.3 s, and there is nice parallelism. What seems to
be happening is that right at the start of the job there are a few milliseconds of deserialization,
followed by almost 4 s(!) of doing absolutely nothing, followed by a few hundred milliseconds
in which the actual processing takes place. This Web UI screenshot is attached as
"Spark Idle Time.png".

What can cause this delay where Spark does nothing (or reports doing nothing) for so much
time? I have included below the code for the foreachRDD that triggers this 4-second job.

Thank you for your time,

-- Matthew
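
For reference, the snippet below assumes a streaming context and input DStream along these lines (the topic name is a placeholder; sparkConf, brokerList, and the kafkaWriter helper are defined elsewhere in the job, so this is only a sketch):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Two-second batches, as described above (sparkConf is assumed to be defined earlier)
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Direct Kafka stream of (key, message) pairs; "input-topic" is a placeholder topic name
    val kafkaParams = Map("metadata.broker.list" -> brokerList)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("input-topic"))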


    // Transmit Kafka config to all workers so they can write back as necessary
    val broadcastBrokers = ssc.sparkContext.broadcast(brokerList)
    val broadcastZookeeper = ssc.sparkContext.broadcast(zookeeper)

    // Define the task for Spark Streaming to do
    messages.foreachRDD(sourceStream => {

      // Parse the JSON into a more usable structure
      val parsed = sourceStream.map(y => parse(y._2))

      // Key each record by geo; the value is (rusec wait time, 1) for later summing and counting
      val GeosRUSECs = parsed.map { x =>
        ((x \ "geoip").extract[String](DefaultFormats, manifest[String]),
         ((x \ "rusec").extract[Long](DefaultFormats, manifest[Long]), 1L))
      }.cache
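      // isEmpty, reduce, count, and the foreach over avgRespPerGeo below are each actions,
      // so each launches its own Spark job against this cached RDD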
      if (!GeosRUSECs.isEmpty) {
        val totalRUSEC = GeosRUSECs.map(x => (x._2._1)).reduce(_ + _)
        val avgRUSEC = totalRUSEC / GeosRUSECs.count.toDouble

        if (!avgRUSEC.isNaN && !avgRUSEC.isInfinite) {
          // Acquire local Kafka connection
          val producer = kafkaWriter.getProducer(broadcastBrokers.value, broadcastZookeeper.value)
          producer.send(new KeyedMessage[String, String]("SparkRUSECout", avgRUSEC.toString))
        }

        // Wait times for each geo with total wait and number of queries
        val GeosWaitsCounts = GeosRUSECs.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

        val avgRespPerGeo = GeosWaitsCounts.map { case (geo, (totWait, numQueries)) =>
          (geo, totWait.toDouble / numQueries)
        }

        avgRespPerGeo.foreach { geoInfo =>
          val producer = kafkaWriter.getProducer(broadcastBrokers.value, broadcastZookeeper.value)
          producer.send(new KeyedMessage[String, String]("SparkRUSECout",
            geoInfo._1 + " average RUSEC: " + geoInfo._2))
        }
      }
    })

