spark-user mailing list archives

From shlomi <>
Subject Splitting RDD by partition
Date Thu, 19 May 2016 20:59:01 GMT
Hey Sparkers,

I have a workflow where I have to ensure certain keys are always in the same
RDD partition (it's a mandatory algorithmic invariant). I can easily achieve
this with a custom partitioner.
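
For reference, here is a minimal sketch of such a partitioner. It assumes the set of keys that must stay together in the "hot" partitions is known in advance. `HotKeyPartitioner`, `hotKeys`, `hotPartitions` and `coldPartitions` are hypothetical names for illustration; only `org.apache.spark.Partitioner` itself is Spark API. Hot keys land in partitions `0` until `hotPartitions`, all other keys land in the partitions after that, and equal keys always map to the same partition.

```scala
import org.apache.spark.Partitioner

// Hypothetical partitioner (not part of Spark): keys in `hotKeys` go to
// partitions [0, hotPartitions), every other key goes to
// [hotPartitions, hotPartitions + coldPartitions). The mapping depends only
// on the key, so equal keys always end up in the same partition.
class HotKeyPartitioner(
    val hotKeys: Set[Any],
    val hotPartitions: Int,
    val coldPartitions: Int) extends Partitioner {
  require(hotPartitions > 0 && coldPartitions > 0, "both ranges must be non-empty")

  override def numPartitions: Int = hotPartitions + coldPartitions

  // Modulo that stays non-negative even for negative hash codes.
  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  override def getPartition(key: Any): Int = {
    val h = if (key == null) 0 else key.hashCode
    if (hotKeys.contains(key)) nonNegativeMod(h, hotPartitions)
    else hotPartitions + nonNegativeMod(h, coldPartitions)
  }

  // Spark compares partitioners to decide whether data is already laid out
  // as required (e.g. before a join), so equality should reflect the layout.
  override def equals(other: Any): Boolean = other match {
    case p: HotKeyPartitioner =>
      p.hotKeys == hotKeys && p.hotPartitions == hotPartitions &&
        p.coldPartitions == coldPartitions
    case _ => false
  }

  override def hashCode(): Int = (hotKeys, hotPartitions, coldPartitions).hashCode
}
```

It would be applied with something like `pairs.partitionBy(new HotKeyPartitioner(Set[Any]("a", "b"), 2, 6))`, where `pairs` is a hypothetical pair RDD.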

This results in a single RDD that requires further computation. However,
the computation differs completely between partitions: some partitions are
finished after these steps and could already be written to disk, while the
rest still need a few more map/filter/shuffle/etc. steps to complete.

The simplest idea I have for this is to split the RDD into multiple RDDs
based on partition numbers (which I know from my custom partitioner).
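
A sketch of that idea, assuming the "hot" partitions are exactly the indices `0` until `hotPartitions` (as with a partitioner that places them first). `keepPartitions` and `splitByIndex` are hypothetical helper names. It uses `mapPartitionsWithIndex`, which passes the partition index to the function directly, so no `TaskContext` lookup is needed.

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Hypothetical helper: keep the data of partitions whose index satisfies
// `keep` and emit an empty iterator for every other partition. This is a
// narrow transformation (no shuffle); preservesPartitioning = true tells
// Spark that the parent's partitioner still describes the result.
def keepPartitions[T: ClassTag](rdd: RDD[T], keep: Int => Boolean): RDD[T] =
  rdd.mapPartitionsWithIndex[T](
    (idx, iter) => if (keep(idx)) iter else Iterator.empty,
    preservesPartitioning = true)

// Split into (partitions [0, hotPartitions), partitions [hotPartitions, n)).
def splitByIndex[T: ClassTag](rdd: RDD[T], hotPartitions: Int): (RDD[T], RDD[T]) =
  (keepPartitions(rdd, _ < hotPartitions), keepPartitions(rdd, _ >= hotPartitions))
```

Both results keep the parent's partition count; the unselected partitions are simply empty, so they still run as (cheap) tasks and, for example, produce empty part files when saved.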

I have managed to achieve this like so (splitting into only 2 RDDs):
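import scala.reflect.ClassTag
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

// Split an rdd according to its partition number
def splitByPartition[T: ClassTag](rdd: RDD[T], hotPartitions: Int): (RDD[T], RDD[T]) = {
  // Each partition emits two iterators; only the one matching its index is non-empty
  val splits = rdd.mapPartitions { iter =>
    val partId = TaskContext.get.partitionId
    val left  = if (partId <  hotPartitions) iter else Iterator.empty
    val right = if (partId >= hotPartitions) iter else Iterator.empty
    Seq(left, right).iterator
  }

  // First iterator of every partition
  val left = splits.mapPartitions { iter => iter.next() }
  // Skip the first iterator, keep the second
  val right = splits.mapPartitions { iter => iter.next(); iter.next() }
  (left, right)
}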

Is this the best way? This seems to cause some shuffling, but I am not
sure how it impacts performance.
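
On the shuffling question: `mapPartitions` and `mapPartitionsWithIndex` are narrow transformations, so a split built from them does not add a shuffle by itself; a shuffle in the lineage would come from a step such as the `partitionBy` that applies the custom partitioner. A rough way to check is to count `ShuffledRDD` entries in `toDebugString` before and after the split. A related cost is that both halves read the same parent, so the narrow steps after the last shuffle run once per half unless the parent is persisted. `shuffleCount` and `persistBeforeSplit` are hypothetical helpers, and matching on the debug string is a heuristic, not a stable API.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Heuristic: count ShuffledRDD entries in the lineage description.
// If the split output has the same count as its parent, the split itself
// introduced no shuffle.
def shuffleCount(rdd: RDD[_]): Int =
  "ShuffledRDD".r.findAllMatchIn(rdd.toDebugString).length

// Both halves of a split are computed from the same parent; persisting it
// avoids recomputing the parent's narrow steps for each half.
def persistBeforeSplit[T](rdd: RDD[T]): RDD[T] =
  rdd.persist(StorageLevel.MEMORY_AND_DISK)
```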

Is there another way, maybe even a more involved way, to achieve this? 
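
One more involved option is Spark's `PartitionPruningRDD`, which is marked `@DeveloperApi` (so it may change between releases). Its `create` method builds an RDD containing only the parent partitions whose index passes a filter, so Spark schedules no tasks at all for the other partitions, and it adds no shuffle. The sketch below assumes the hot partitions are the indices below `hotPartitions`; `splitByPruning` is a hypothetical name. The pruned RDDs are renumbered from 0, and as far as I know they do not keep the parent's partitioner, so a later key-based operation on them may shuffle again.

```scala
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}

// Hypothetical helper: each result contains only the selected parent
// partitions (no empty placeholder partitions), via a narrow dependency.
def splitByPruning[T](rdd: RDD[T], hotPartitions: Int): (RDD[T], RDD[T]) = {
  val hot  = PartitionPruningRDD.create(rdd, idx => idx < hotPartitions)
  val cold = PartitionPruningRDD.create(rdd, idx => idx >= hotPartitions)
  (hot, cold)
}
```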


Sent from the Apache Spark User List mailing list archive at
