spark-user mailing list archives

From Christophe Préaud <christophe.pre...@kelkoo.com>
Subject Re: Partition n keys into exactly n partitions
Date Tue, 13 Sep 2016 07:48:41 GMT
Hi,

A custom partitioner is indeed the solution.

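Here is some sample code:
import org.apache.spark.Partitioner

class KeyPartitioner(val keyList: Seq[Any]) extends Partitioner {

  // One partition per known key, plus partition 0 for keys not in keyList.
  def numPartitions: Int = keyList.size + 1

  // indexOf returns -1 for an unknown key, so unknown keys map to partition 0.
  def getPartition(key: Any): Int = keyList.indexOf(key) + 1

  // Two partitioners are equal only if they map keys identically;
  // comparing numPartitions alone would treat different key lists as equal.
  override def equals(other: Any): Boolean = other match {
    case h: KeyPartitioner =>
      h.keyList == keyList
    case _ =>
      false
  }

  override def hashCode: Int = keyList.hashCode
}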


It lets you repartition an RDD[(K, V)] so that all records with the same
key (and only those records) end up in the same partition.
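
For long key lists, keyList.indexOf scans the list once per record. The following
is a minimal sketch of a variant that precomputes the lookup in a Map instead. The
class name IndexedKeyPartitioner and the field indexByKey are hypothetical, and the
sketch assumes keyList contains no duplicate keys; the partition numbering is meant
to match the class above.

```scala
import org.apache.spark.Partitioner

// Hypothetical variant: same partition numbering as KeyPartitioner,
// but the key-to-partition lookup is built once at construction time.
// Assumption: keyList contains no duplicate keys.
class IndexedKeyPartitioner(val keyList: Seq[Any]) extends Partitioner {

  // Key -> partition index, starting at 1 (partition 0 is reserved for unknown keys).
  private val indexByKey: Map[Any, Int] =
    keyList.zipWithIndex.map { case (k, i) => k -> (i + 1) }.toMap

  def numPartitions: Int = keyList.size + 1

  // Keys that are not in keyList fall back to partition 0.
  def getPartition(key: Any): Int = indexByKey.getOrElse(key, 0)

  override def equals(other: Any): Boolean = other match {
    case p: IndexedKeyPartitioner => p.keyList == keyList
    case _ => false
  }

  override def hashCode: Int = keyList.hashCode
}
```

It would be used the same way, e.g. rdd.partitionBy(new IndexedKeyPartitioner(Seq(1,2,3,4))).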

Pass the KeyPartitioner constructor a Seq[K] (keyList) containing all the possible
key values in the RDD[(K, V)], e.g.:
val rdd = sc.parallelize(
  Seq((1,'a),(2,'a),(3,'a),(1,'b),(2,'b),(1,'c),(3,'c),(4,'d))
)
rdd.partitionBy(new KeyPartitioner(Seq(1,2,3,4)))

will put:
- (1,'a) (1,'b) and (1,'c) in partition 1
- (2,'a) and (2,'b) in partition 2
- (3,'a) and (3,'c) in partition 3
- (4,'d) in partition 4
and nothing in partition 0

If a key is not defined in the keyList, it will be put in partition 0:
rdd.partitionBy(new KeyPartitioner(Seq(1,2,3)))
will put:
- (1,'a) (1,'b) and (1,'c) in partition 1
- (2,'a) and (2,'b) in partition 2
- (3,'a) and (3,'c) in partition 3
- (4,'d) in partition 0
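
The layout listed above can be reproduced without a Spark cluster by applying the
same index rule (indexOf + 1) to a plain Scala collection. This is only a sketch:
the object KeyPartitionLayout and its methods partitionOf and layout are
hypothetical helpers, and strings stand in for the symbol literals used above.

```scala
object KeyPartitionLayout {

  // Same rule as KeyPartitioner.getPartition: position in keyList plus 1,
  // which yields 0 for a key that is not in keyList.
  def partitionOf(keyList: Seq[Any], key: Any): Int = keyList.indexOf(key) + 1

  // Groups records by the partition index their key would be assigned.
  def layout[K, V](keyList: Seq[Any], records: Seq[(K, V)]): Map[Int, Seq[(K, V)]] =
    records.groupBy { case (k, _) => partitionOf(keyList, k) }

  def main(args: Array[String]): Unit = {
    val records = Seq(
      (1, "a"), (2, "a"), (3, "a"), (1, "b"), (2, "b"), (1, "c"), (3, "c"), (4, "d")
    )
    // Key 4 is deliberately left out of the key list, so it lands in partition 0.
    layout(Seq(1, 2, 3), records).toSeq.sortBy(_._1).foreach { case (p, rs) =>
      println(s"partition $p: ${rs.mkString(" ")}")
    }
  }
}
```

On a real RDD, rdd.partitionBy(...).glom().collect() gives the per-partition contents directly.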


Please let me know if it fits your needs.

Regards,
Christophe.

On 12/09/16 19:03, Denis Bolshakov wrote:
Just provide your own partitioner.

I once wrote a partitioner which keeps similar keys together in one partition.

Best regards,
Denis

On 12 September 2016 at 19:44, sujeet jog <sujeet.jog@gmail.com> wrote:
Hi,

Is there a way to partition a set of data with n keys into exactly n partitions?

For example:

tuple of 1008 rows with key x
tuple of 1008 rows with key y, and so on, for a total of 10 keys (x, y, etc.)

Total records = 10080
NumOfKeys = 10

I want to partition the 10080 elements into exactly 10 partitions, with each partition
holding the elements of a single key.

Is there a way to make this happen? Any ideas on implementing a custom partitioner?


The partitioner I'm currently using is HashPartitioner, and there are cases where
key.hashCode() % numPartitions comes out the same for keys x and y.

Hence elements with different keys sometimes fall into a single partition.
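
The collision described above can be shown in plain Scala. HashPartitioner assigns
a key to the non-negative value of key.hashCode modulo numPartitions; the sketch
below reimplements that rule without Spark. The object HashCollisionDemo and the
keys "a", "k" and "u" are hypothetical and chosen only because their hash codes
(97, 107, 117) are easy to check by hand.

```scala
object HashCollisionDemo {

  // Modulo that never returns a negative value, since hashCode may be negative.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // The hash-based rule: partition = hashCode mod numPartitions.
  def hashPartition(key: Any, numPartitions: Int): Int =
    nonNegativeMod(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    val numPartitions = 10
    // Illustrative keys: a one-character string hashes to its character code.
    val keys = Seq("a", "k", "u")
    keys.foreach { k =>
      println(s"key $k -> partition ${hashPartition(k, numPartitions)}")
    }
    // Three distinct keys, but fewer distinct partitions.
    println(s"distinct partitions: ${keys.map(hashPartition(_, numPartitions)).distinct.size}")
  }
}
```

With 10 partitions all three keys map to partition 7, so distinct keys share a partition even though other partitions stay empty.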



Thanks,
Sujeet



--
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.denis@gmail.com


