spark-user mailing list archives

From mtimper <>
Subject RDD Blocks skewing to just few executors
Date Tue, 18 Nov 2014 00:40:31 GMT
Hi, I'm running a standalone cluster with 8 worker servers.
I'm developing a streaming app that adds new lines of text to several
different RDDs each batch interval. Each line has a well-randomized unique
identifier that I'm trying to use for partitioning, since the data stream
does contain duplicate lines. I'm doing the partitioning with this:

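import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._ // pair-RDD functions such as partitionBy (Spark 1.x)
// `events` = this batch's lines (assumed name)
val eventsByKey = events.map { event => (getUID(event), event) }
val partionedEventsRdd = sparkContext.parallelize(eventsByKey.toSeq)
       .partitionBy(new HashPartitioner(numPartions)).map(e => e._2) // drop keys; map discards the partitioner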
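For reference, Spark's `HashPartitioner` puts a key into partition `key.hashCode` modulo `numPartitions`, shifted to be non-negative (null keys go to partition 0). The standalone sketch below reproduces that rule without a Spark dependency, to check how evenly a set of keys spreads. The object name `PartitionSpread` and the `uid-$i` keys are hypothetical, made up for illustration rather than taken from this stream.

```scala
object PartitionSpread {
  // Mirrors HashPartitioner's rule: non-negative (hashCode mod numPartitions);
  // null keys go to partition 0.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case k =>
      val rawMod = k.hashCode % numPartitions
      rawMod + (if (rawMod < 0) numPartitions else 0)
  }

  // Counts how many keys land in each partition.
  def spread(keys: Seq[Any], numPartitions: Int): Array[Int] = {
    val counts = Array.fill(numPartitions)(0)
    keys.foreach(k => counts(partitionFor(k, numPartitions)) += 1)
    counts
  }

  def main(args: Array[String]): Unit = {
    val keys = (1 to 10000).map(i => s"uid-$i") // hypothetical UIDs
    println(spread(keys, 8).mkString(", "))
  }
}
```

Roughly equal counts would suggest the keys themselves spread evenly over partitions, so any uneven block placement would point somewhere other than the partitioning.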

I'm adding to the existing RDD with this:

import scala.collection.mutable

val mergedRDD = currentRDD.zipPartitions(partionedEventsRdd, true) {
    (currentIter, batchIter) =>
    val uniqEvents = mutable.ListBuffer[String]()
    val uids = mutable.Set[String]() // UIDs already kept in this partition
    Array(currentIter, batchIter).foreach { iter =>
      iter.foreach { event =>
        val uid = getUID(event)
        if (!uids.contains(uid)) {
            uids += uid
            uniqEvents += event
        }
      }
    }
    uniqEvents.iterator // zipPartitions must return an Iterator
}

val count = mergedRDD.count
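The per-partition merge inside `zipPartitions` can be pulled out into a plain function and checked locally, without a cluster. In this sketch `MergeSketch` and `mergeUnique` are hypothetical names, and `getUID` is assumed to return the text before the first comma; the real `getUID` is not shown in the post. It is a lazy variant of the same logic: it filters as it goes and keeps only the set of UIDs seen so far instead of buffering every event.

```scala
import scala.collection.mutable

object MergeSketch {
  // Hypothetical stand-in for getUID: the UID is the text before the first comma.
  def getUID(event: String): String = event.takeWhile(_ != ',')

  // Emits existing events first, then new ones, dropping any event whose UID
  // was already seen in this partition. HashSet.add returns false for repeats.
  def mergeUnique(current: Iterator[String], batch: Iterator[String]): Iterator[String] = {
    val seen = mutable.HashSet[String]()
    (current ++ batch).filter(event => seen.add(getUID(event)))
  }

  def main(args: Array[String]): Unit = {
    val merged = mergeUnique(Iterator("a,1", "b,2"), Iterator("a,3", "c,4"))
    println(merged.toList)
  }
}
```

Under the same assumptions it could be passed as `currentRDD.zipPartitions(partionedEventsRdd, true)(MergeSketch.mergeUnique)`. Filtering lazily avoids a second in-memory copy of every event in a `ListBuffer`, although the set of seen UIDs still grows with the partition.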

I'm doing it this way because, when I was using:

val mergedRDD = currentRDD.union(batchRDD).coalesce(numPartions).distinct
val count = mergedRDD.count

It started taking a long time and doing a lot of shuffles.
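That growth fits how `distinct` works: it is a shuffle keyed on the whole record, so each batch routes every line of `currentRDD` to a target partition, not just the new lines. The sketch below emulates that routing step in plain Scala to make the cost visible. `DistinctSketch`, `shuffleDistinct` and the generated lines are illustrative only; this is not Spark's code.

```scala
object DistinctSketch {
  private def partitionFor(key: Any, n: Int): Int = {
    val raw = key.hashCode % n
    if (raw < 0) raw + n else raw
  }

  // Emulates union + distinct: every record from both inputs is routed to a
  // partition by its hash, then duplicates are dropped within each partition.
  // Returns the resulting partitions and how many records were routed.
  def shuffleDistinct(current: Seq[String], batch: Seq[String], n: Int): (Vector[Vector[String]], Int) = {
    val all = current ++ batch
    val parts = Vector.tabulate(n)(p => all.filter(r => partitionFor(r, n) == p).distinct.toVector)
    (parts, all.size)
  }

  def main(args: Array[String]): Unit = {
    var current = Seq.empty[String]
    for (b <- 1 to 3) {
      val batch = (1 to 100).map(i => s"batch$b-line$i")
      val (parts, routed) = shuffleDistinct(current, batch, 8)
      current = parts.flatten
      println(s"batch $b: routed $routed records") // 100, then 200, then 300
    }
  }
}
```

The routed count tracks the total size of the RDD rather than the batch size, which is the cost the `zipPartitions` approach avoids by touching only the new batch.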

The zipPartitions approach does perform better, but after running for an hour
or so I start seeing this in the web UI:


As you can see, most of the data is skewed onto just 2 executors, with one
holding more than half the blocks. These become hotspots, and eventually I
start seeing OOM errors. I've tried this half a dozen times; which executors
are "hot" changes, but the skew does not.

Any idea what is going on here?



Sent from the Apache Spark User List mailing list archive at
