spark-user mailing list archives

From Jason Nerothin <jasonnerot...@gmail.com>
Subject Re: combineByKey
Date Fri, 05 Apr 2019 17:30:28 GMT
Take a look at this Stack Overflow question:

https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work
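For context on how that answer relates to the combineByKey error quoted below: aggregateByKey(zeroValue)(seqOp, combOp) behaves like combineByKey with createCombiner = v => seqOp(zeroValue, v), mergeValue = seqOp and mergeCombiners = combOp. Spark's PairRDDFunctions builds aggregateByKey on the same combine machinery. The sketch below checks that equivalence in plain Scala, without Spark, for one key whose values are split over two pretend partitions. The names partition1, partition2 and foldPartition, and the sample values, are made up for illustration.

```scala
// Plain-Scala sketch, no Spark: aggregateByKey's (zeroValue, seqOp, combOp)
// versus combineByKey with a createCombiner derived from the zero value.
// V = (String, String) as in the thread; accumulator type = Set[String].
val zero: Set[String] = Set.empty
val seqOp: (Set[String], (String, String)) => Set[String] =
  (aggr, value) => aggr ++ Set(value._2)
val combOp: (Set[String], Set[String]) => Set[String] =
  (aggr1, aggr2) => aggr1 ++ aggr2

// createCombiner built from the zero value: a function of ONE tuple argument
val createCombiner: ((String, String)) => Set[String] = v => seqOp(zero, v)

// Hypothetical sample: values of a single key spread over two "partitions"
val partition1 = Seq(("d1", "t1"), ("d1", "t2"), ("d1", "t1"))
val partition2 = Seq(("d1", "t3"), ("d1", "t2"))

// aggregateByKey style: fold each partition from the zero value, then combOp
val viaAggregate =
  combOp(partition1.foldLeft(zero)(seqOp), partition2.foldLeft(zero)(seqOp))

// combineByKey style: createCombiner on a partition's first value, seqOp
// (as mergeValue) on the rest, then combOp (as mergeCombiners)
def foldPartition(part: Seq[(String, String)]): Set[String] =
  part.tail.foldLeft(createCombiner(part.head))(seqOp)
val viaCombine = combOp(foldPartition(partition1), foldPartition(partition2))

println(viaAggregate)
println(viaCombine)
```

Because seqOp already takes a single (String, String) value, the aggregateByKey version never runs into the tuple-versus-two-arguments mismatch that the hand-written createCombiner does.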

On Fri, Apr 5, 2019 at 12:25 PM Madabhattula Rajesh Kumar <
mrajaforu@gmail.com> wrote:

> Hi,
>
> Thank you for the details. That was a typo I made while composing the mail.
> Below is the actual code.
>
> Any idea why combineByKey is not working? aggregateByKey works.
>
> //Defining createCombiner, mergeValue and mergeCombiner functions
>
> def createCombiner = (Id: String, value: String) => (value :: Nil).toSet
>
> def mergeValue = (accumulator1: Set[String], accumulator2: (String, String)) =>
>   accumulator1 ++ Set(accumulator2._2)
>
> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>   (accumulator1: Set[String], accumulator2: Set[String]) =>
>     accumulator1 ++ accumulator2
>
> sc.parallelize(messages)
>   .map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
>   .combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *Compile error:*
>  found   : (String, String) => scala.collection.immutable.Set[String]
>  required: ((String, String)) => ?
>      sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *aggregateByKey =>*
>
> val result = sc.parallelize(messages)
>   .map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
>   .aggregateByKey(Set[String]())(
>     (aggr, value) => aggr ++ Set(value._2),
>     (aggr1, aggr2) => aggr1 ++ aggr2)
>   .collect().toMap
>
> print(result)
>
> Map(0-d1 -> Set(t1, t2, t3, t4), 0-d2 -> Set(t1, t5, t6, t2), 0-d3 -> Set(t1, t2))
>
> Regards,
> Rajesh
>
> On Fri, Apr 5, 2019 at 9:58 PM Jason Nerothin <jasonnerothin@gmail.com>
> wrote:
>
>> I broke some of your code down into the following lines:
>>
>>     import org.apache.spark.rdd.RDD
>>     import org.apache.spark.sql.Dataset
>>     import spark.implicits._
>>
>>     val a: RDD[Messages] = sc.parallelize(messages)
>>     val b: Dataset[Messages] = a.toDF.as[Messages]
>>     val c: Dataset[(String, (String, String))] =
>>       b.map { x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)) }
>>
>> You didn't capitalize .Id (the case class field is Id, but the code uses x.id),
>> and your mergeValue0 and mergeCombiner don't have the types you think for combineByKey.
>>
>> I recommend breaking the code down statement by statement like this
>> when you get into a dance with the Scala type system.
>>
>> The type safety you're after (which eventually makes life *easier*)
>> is best supported by Dataset; it would have prevented the .id vs .Id error.
>> There are some performance tradeoffs compared with RDD and DataFrame, though.
>>
>> On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
>> mrajaforu@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Is there an issue in the code below?
>>>
>>> case class Messages(timeStamp: Int, Id: String, value: String)
>>>
>>> val messages = Array(
>>>       Messages(0, "d1", "t1"),
>>>       Messages(0, "d1", "t1"),
>>>       Messages(0, "d1", "t1"),
>>>       Messages(0, "d1", "t1"),
>>>       Messages(0, "d1", "t2"),
>>>       Messages(0, "d1", "t2"),
>>>       Messages(0, "d1", "t3"),
>>>       Messages(0, "d1", "t4"),
>>>       Messages(0, "d2", "t1"),
>>>       Messages(0, "d2", "t1"),
>>>       Messages(0, "d2", "t5"),
>>>       Messages(0, "d2", "t6"),
>>>       Messages(0, "d2", "t2"),
>>>       Messages(0, "d2", "t2"),
>>>       Messages(0, "d3", "t1"),
>>>       Messages(0, "d3", "t1"),
>>>       Messages(0, "d3", "t2")
>>>     )
>>>
>>> //Defining createCombiner, mergeValue and mergeCombiner functions
>>> def createCombiner = (id: String, value: String) => Set(value)
>>>
>>> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String, String)) =>
>>>   accumulator1 ++ Set(accumulator2._2)
>>>
>>> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>>>   (accumulator1: Set[String], accumulator2: Set[String]) =>
>>>     accumulator1 ++ accumulator2
>>>
>>> sc.parallelize(messages)
>>>   .map(x => (x.timeStamp + "-" + x.id, (x.id, x.value)))
>>>   .combineByKey(createCombiner, mergeValue, mergeCombiner)
>>>
>>> *Compile error:*
>>>  found   : (String, String) => scala.collection.immutable.Set[String]
>>>  required: ((String, String)) => ?
>>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>>
>>> Regards,
>>> Rajesh
>>>
>>>
>>
>> --
>> Thanks,
>> Jason
>>
>
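The found/required lines in the compile error above point at createCombiner. For an RDD[(String, (String, String))], combineByKey expects createCombiner to take one argument of type (String, String), but it was written as a function of two String parameters. Scala does not adapt a Function2 value to a function of a single tuple on its own; Function2.tupled does that adaptation. Here is a minimal plain-Scala sketch (no Spark, so it runs standalone). It assumes a hypothetical helper, combineByKeyShape, that only mirrors combineByKey's parameter types (createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) with V = (String, String):

```scala
// Plain-Scala sketch, no Spark. combineByKeyShape is a hypothetical stand-in
// that mirrors combineByKey's parameter types for V = (String, String):
//   createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C
def combineByKeyShape[C](
    values: Seq[(String, String)], // assumes at least two values for one key
    create: ((String, String)) => C,
    merge: (C, (String, String)) => C,
    mergeAcross: (C, C) => C): C = {
  // Treat the two halves of the values as two "partitions"
  val (left, right) = values.splitAt(values.size / 2)
  def fold(part: Seq[(String, String)]): C =
    part.tail.foldLeft(create(part.head))(merge)
  mergeAcross(fold(left), fold(right))
}

// The functions from the thread; createCombiner has type (String, String) => Set[String]
def createCombiner = (Id: String, value: String) => (value :: Nil).toSet

def mergeValue = (accumulator1: Set[String], accumulator2: (String, String)) =>
  accumulator1 ++ Set(accumulator2._2)

def mergeCombiner: (Set[String], Set[String]) => Set[String] =
  (accumulator1, accumulator2) => accumulator1 ++ accumulator2

val values = Seq(("d3", "t1"), ("d3", "t1"), ("d3", "t2"), ("d3", "t2"))

// Passing createCombiner as-is would fail to compile with the same
// found/required mismatch; .tupled gives ((String, String)) => Set[String].
val result = combineByKeyShape(values, createCombiner.tupled, mergeValue, mergeCombiner)
println(result)
```

In the Spark code from the thread, the equivalent change would presumably be .combineByKey(createCombiner.tupled, mergeValue, mergeCombiner). Another option is to define createCombiner as (v: (String, String)) => Set(v._2) in the first place. That Spark call is not exercised here.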

-- 
Thanks,
Jason
