spark-user mailing list archives

From Madabhattula Rajesh Kumar <mrajaf...@gmail.com>
Subject combineByKey
Date Fri, 05 Apr 2019 07:11:18 GMT
Hi,

Is there an issue with the code below? It fails to compile.

case class Messages(timeStamp: Int, id: String, value: String)

val messages = Array(
      Messages(0, "d1", "t1"),
      Messages(0, "d1", "t1"),
      Messages(0, "d1", "t1"),
      Messages(0, "d1", "t1"),
      Messages(0, "d1", "t2"),
      Messages(0, "d1", "t2"),
      Messages(0, "d1", "t3"),
      Messages(0, "d1", "t4"),
      Messages(0, "d2", "t1"),
      Messages(0, "d2", "t1"),
      Messages(0, "d2", "t5"),
      Messages(0, "d2", "t6"),
      Messages(0, "d2", "t2"),
      Messages(0, "d2", "t2"),
      Messages(0, "d3", "t1"),
      Messages(0, "d3", "t1"),
      Messages(0, "d3", "t2")
    )

//Defining createCombiner, mergeValue and mergeCombiner functions
def createCombiner = (id: String, value: String) => Set(value)

def mergeValue = (accumulator1: Set[String], accumulator2: (String, String)) =>
  accumulator1 ++ Set(accumulator2._2)

def mergeCombiner: (Set[String], Set[String]) => Set[String] =
  (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++ accumulator2

sc.parallelize(messages)
  .map(x => (x.timeStamp + "-" + x.id, (x.id, x.value)))
  .combineByKey(createCombiner, mergeValue, mergeCombiner)

*Compile Error:-*
 found   : (String, String) => scala.collection.immutable.Set[String]
 required: ((String, String)) => ?
sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
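
A possible reading of the error: combineByKey expects createCombiner to be a function of one argument, the RDD value type V, which here is the tuple (String, String). The createCombiner above is a function of two String arguments, so the types do not line up. The sketch below is one way the three functions could be typed. It is plain Scala with no Spark dependency; the object name CombineSketch, the type aliases V and C, and the helper combineLocally are hypothetical names introduced only for illustration, and combineLocally is a local single-partition stand-in, not Spark's implementation.

```scala
// Sketch only: shows the function shapes combineByKey expects, without Spark.
case class Messages(timeStamp: Int, id: String, value: String)

object CombineSketch {
  type V = (String, String) // RDD value type: (id, value)
  type C = Set[String]      // combiner type: distinct values seen for a key

  // createCombiner takes ONE argument of type V (a tuple), not two Strings.
  val createCombiner: V => C = { case (_, value) => Set(value) }

  // mergeValue folds one more value into an existing combiner.
  val mergeValue: (C, V) => C = (acc, v) => acc + v._2

  // mergeCombiners joins two partial combiners (used across partitions in Spark;
  // not exercised by the single-partition helper below).
  val mergeCombiners: (C, C) => C = (a, b) => a ++ b

  // Hypothetical local stand-in for combineByKey on a single partition.
  def combineLocally[K](pairs: Seq[(K, V)]): Map[K, C] =
    pairs.foldLeft(Map.empty[K, C]) { case (m, (k, v)) =>
      val combined = m.get(k) match {
        case Some(acc) => mergeValue(acc, v)   // key seen before
        case None      => createCombiner(v)    // first value for this key
      }
      m.updated(k, combined)
    }
}
```

With functions typed this way, the Spark call would presumably be written as .combineByKey(CombineSketch.createCombiner, CombineSketch.mergeValue, CombineSketch.mergeCombiners) on the keyed RDD.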

Regards,
Rajesh
