spark-user mailing list archives

From Yong Zhang <java8...@hotmail.com>
Subject Re: Secondary Sort using Apache Spark 1.6
Date Wed, 29 Mar 2017 16:07:37 GMT
The error message indeed is not very clear.


What went wrong is that repartitionAndSortWithinPartitions requires not only a PairRDD
but also an OrderedRDD, i.e. an implicit Ordering on the key type. Your case class used as the key is NOT Ordered.


Either extend it from Ordered, or provide a companion object that supplies the implicit Ordering.


scala> spark.version
res1: String = 2.1.0

scala> case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long)
extends Ordered[DeviceKey] {
     |   import scala.math.Ordered.orderingToOrdered
     |   def compare(that: DeviceKey): Int =
     |      (this.serialNum, this.eventDate, this.EventTs * -1) compare
     |      (that.serialNum, that.eventDate, that.EventTs * -1)
     | }
defined class DeviceKey

scala>

scala> val t = sc.parallelize(List((DeviceKey("2","100",1),1),
(DeviceKey("2","100",3),1)), 1)
t: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ParallelCollectionRDD[0] at parallelize at
<console>:26

scala>

scala> class DeviceKeyPartitioner(partitions: Int) extends org.apache.spark.Partitioner
{
     |     require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
     |
     |     override def numPartitions: Int = partitions
     |
     |     override def getPartition(key: Any): Int = {
     |       val k = key.asInstanceOf[DeviceKey]
     |       // hashCode can be negative; normalize so the result is a valid partition index
     |       (k.serialNum.hashCode() % numPartitions + numPartitions) % numPartitions
     |     }
     | }
defined class DeviceKeyPartitioner

scala>

scala> t.repartitionAndSortWithinPartitions(new DeviceKeyPartitioner(2))
res0: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ShuffledRDD[1] at repartitionAndSortWithinPartitions
at <console>:30
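The transcript above shows the first option (extending Ordered). As a minimal sketch of the second option: keep DeviceKey a plain case class and put an implicit Ordering in its companion object, where implicit search always finds it. The snippet below checks the ordering with a plain List.sorted rather than Spark so it runs standalone; repartitionAndSortWithinPartitions resolves the same implicit for the key type.

```scala
// Companion-object alternative: DeviceKey stays a plain case class,
// and the implicit Ordering lives in its companion object.
case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long)

object DeviceKey {
  // Ascending serialNum and eventDate, descending EventTs
  // (the * -1 reverses the timestamp, as in the original key).
  implicit val ordering: Ordering[DeviceKey] =
    Ordering.by(k => (k.serialNum, k.eventDate, k.EventTs * -1))
}

// Demonstrated on a plain List here; Spark would pick up the same
// implicit when sorting keys within partitions.
val sorted = List(
  DeviceKey("2", "100", 1),
  DeviceKey("2", "100", 3),
  DeviceKey("1", "100", 5)
).sorted
// sorted order: ("1","100",5), ("2","100",3), ("2","100",1)
```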


Yong


________________________________
From: Pariksheet Barapatre <pbarapatre@gmail.com>
Sent: Wednesday, March 29, 2017 9:02 AM
To: user
Subject: Secondary Sort using Apache Spark 1.6

Hi,

I am referring to http://codingjunkie.net/spark-secondary-sort/ to implement secondary
sort in my Spark job.

I have defined my key case class as

case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long) {
      implicit def orderingBySerialNum[A <: DeviceKey] : Ordering[A] = {
       Ordering.by(fk => (fk.serialNum, fk.eventDate, fk.EventTs * -1))
    }
}


but when I try to apply the function
t.repartitionAndSortWithinPartitions(partitioner)


# t is an RDD[(DeviceKey, Int)]

I get the error:
value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[(DeviceKey,
Int)]



Example code available at
http://stackoverflow.com/questions/43038682/secondary-sort-using-apache-spark-1-6


Could somebody help me understand this error?


Many Thanks

Pari

--
Cheers,
Pari
