spark-user mailing list archives

From Vivek YS <vivek...@gmail.com>
Subject Re: GroupByKey results in OOM - Any other alternative
Date Mon, 16 Jun 2014 01:24:59 GMT
The more fundamental question is why groupByKey doesn't return RDD[(K,
RDD[V])] instead of RDD[(K, Iterable[V])].

I wrote something like this (yet to test, and I am not sure it is even
correct). I would appreciate any suggestions/comments:

  def groupByKeyWithRDD(partitioner: Partitioner): RDD[(K, RDD[V])] = {
    // Each combiner is itself an RDD of the values seen so far for a key.
    def createCombiner(v: V) = self.context.parallelize(Array(v))
    // Adding a value unions the buffer with a new single-element RDD.
    def mergeValue(buf: RDD[V], v: V) = buf ++ self.context.parallelize(Array(v))
    // Merging two combiners is just RDD union.
    def mergeCombiners(c1: RDD[V], c2: RDD[V]) = c1 ++ c2
    combineByKey[RDD[V]](
      createCombiner _, mergeValue _, mergeCombiners _, partitioner,
      mapSideCombine = false)
  }
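
If this does work, I imagine the result would only be usable from the
driver, something like the following (untested; pairs is a hypothetical
RDD[(K, V)]):

  // Inner RDDs cannot be referenced inside another RDD's tasks, so the
  // (key, RDD[V]) pairs would have to be pulled back to the driver first.
  val grouped: Array[(K, RDD[V])] = pairs.groupByKeyWithRDD(partitioner).collect()
  val counts = grouped.map { case (k, vs) => (k, vs.distinct().count()) }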

--Vivek


On Mon, Jun 16, 2014 at 6:37 AM, Krishna Sankar <ksankar42@gmail.com> wrote:

> Ian,
>    Yep, HLL is an appropriate mechanism. countApproxDistinctByKey is a
> wrapper around
> com.clearspring.analytics.stream.cardinality.HyperLogLogPlus.
> Cheers
> <k/>
>
>
> On Sun, Jun 15, 2014 at 4:50 PM, Ian O'Connell <ian@ianoconnell.com>
> wrote:
>
>> Depending on your requirements, when computing distinct cardinality for
>> hourly metrics a much more scalable method would be to use a HyperLogLog
>> data structure.
>> A Scala implementation people have used with Spark is
>> https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala
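>>
>> A minimal sketch of that approach, assuming Algebird is on the classpath
>> (the String key/value types here are just for illustration):
>>
>>   import com.twitter.algebird.HyperLogLogMonoid
>>   import org.apache.spark.rdd.RDD
>>
>>   def approxDistinctByKey(pairs: RDD[(String, String)]): RDD[(String, Long)] = {
>>     val hll = new HyperLogLogMonoid(12) // 2^12 registers, ~1.6% std error
>>     pairs
>>       .mapValues(v => hll.create(v.getBytes("UTF-8"))) // one sketch per value
>>       .reduceByKey(hll.plus(_, _))                     // merge sketches per key
>>       .mapValues(_.estimatedSize.toLong)               // approximate count
>>   }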
>>
>>
>> On Sun, Jun 15, 2014 at 6:16 AM, Surendranauth Hiraman <
>> suren.hiraman@velos.io> wrote:
>>
>>> Vivek,
>>>
>>> If the foldByKey solution doesn't work for you, my team uses
>>> RDD.persist(DISK_ONLY) to avoid OOM errors.
>>>
>>> It's slower, of course, and requires tuning other config parameters. It
>>> can also be a problem if you do not have enough disk space, meaning that
>>> you have to unpersist at the right points if you are running long flows.
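>>>
>>> A minimal sketch of that pattern (the RDD names are hypothetical):
>>>
>>>   import org.apache.spark.storage.StorageLevel
>>>
>>>   // Spill the grouped data to disk instead of holding it all in memory.
>>>   val grouped = pairs.groupByKey().persist(StorageLevel.DISK_ONLY)
>>>   // ... downstream stages ...
>>>   grouped.unpersist() // free the disk space once it is no longer needed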
>>>
>>> For us, even though the disk writes are a performance hit, we prefer the
>>> Spark programming model to Hadoop M/R. But we are still working on getting
>>> this to work end to end on 100s of GB of data on our 16-node cluster.
>>>
>>> Suren
>>>
>>>
>>>
>>> On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS <vivek.ys@gmail.com> wrote:
>>>
>>>> Thanks for the input. I will give foldByKey a shot.
>>>>
>>>> The way I am doing it: the data is partitioned hourly, so I compute
>>>> distinct values hourly. Then I use unionRDD to merge them and compute
>>>> distinct on the overall data.
>>>>
>>>> > Is there a way to know which (key, value) pair is resulting in the OOM?
>>>> > Is there a way to set parallelism in the map stage so that each worker
>>>> > will process one key at a time?
>>>>
>>>> I didn't realise countApproxDistinctByKey uses HyperLogLogPlus.
>>>> This should be interesting.
>>>>
>>>> --Vivek
>>>>
>>>>
>>>> On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen <sowen@cloudera.com> wrote:
>>>>
>>>>> Grouping by key is always problematic since a key might have a huge
>>>>> number of values. You can do a little better than grouping *all* values
>>>>> and *then* finding distinct values by using foldByKey, putting values
>>>>> into a Set. At least you end up with only distinct values in memory.
>>>>> (You don't need two maps either, right?)
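>>>>>
>>>>> A minimal sketch of what I mean, assuming String values (pairs being
>>>>> the original RDD[(String, String)]):
>>>>>
>>>>>   // Fold each key's values into a Set so only distinct values are
>>>>>   // kept in memory, then take the size of each Set.
>>>>>   val distinctCounts: RDD[(String, Int)] =
>>>>>     pairs
>>>>>       .mapValues(v => Set(v))
>>>>>       .foldByKey(Set.empty[String])(_ ++ _)
>>>>>       .mapValues(_.size)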
>>>>>
>>>>> If the number of distinct values is still huge for some keys, consider
>>>>> the experimental method countApproxDistinctByKey:
>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285
>>>>>
>>>>> This should be much more performant at the cost of some accuracy.
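>>>>>
>>>>> For example (relativeSD is the target relative standard deviation of
>>>>> the estimate; smaller means more accurate but more memory per key):
>>>>>
>>>>>   // Approximate distinct count per key, within ~1% relative error.
>>>>>   val approxCounts: RDD[(String, Long)] =
>>>>>     pairs.countApproxDistinctByKey(relativeSD = 0.01)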
>>>>>
>>>>>
>>>>> On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS <vivek.ys@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>    For the last couple of days I have been trying hard to get around
>>>>>> this problem. Please share any insights on solving it.
>>>>>>
>>>>>> Problem:
>>>>>> There is a huge list of (key, value) pairs. I want to transform this
>>>>>> to (key, distinct values) and then eventually to (key, distinct value
>>>>>> count).
>>>>>>
>>>>>> On a small dataset this works:
>>>>>>
>>>>>> groupByKey().map(x => (x._1, x._2.distinct)) ... map(x => (x._1,
>>>>>> x._2.distinct.count))
>>>>>>
>>>>>> On a large dataset I am getting an OOM.
>>>>>>
>>>>>> Is there a way to represent the Seq of values from groupByKey as an
>>>>>> RDD and then perform distinct over it?
>>>>>>
>>>>>> Thanks
>>>>>> Vivek
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> SUREN HIRAMAN, VP TECHNOLOGY
>>> Velos
>>> Accelerating Machine Learning
>>>
>>> 440 NINTH AVENUE, 11TH FLOOR
>>> NEW YORK, NY 10001
>>> O: (917) 525-2466 ext. 105
>>> F: 646.349.4063
>>> E: suren.hiraman@velos.io
>>> W: www.velos.io
>>>
>>>
>>
>
