Depending on your requirements, when computing hourly metrics that need distinct cardinality, a much more scalable method would be to use a HyperLogLog data structure.
A Scala implementation people have used with Spark is Algebird's: https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala
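Roughly, with Algebird's HyperLogLogMonoid it could look like the sketch below -- the (hour, userId) pairs, the bit size, and all names here are just illustrative, not something from this thread:

import com.twitter.algebird.HyperLogLogMonoid
import org.apache.spark.{SparkConf, SparkContext}

object HllDistinctPerHour {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hll-distinct"))

    val hllMonoid = new HyperLogLogMonoid(12)  // 2^12 registers, ~1-2% standard error

    // hypothetical (hour, userId) pairs
    val pairs = sc.parallelize(Seq(("h00", "u1"), ("h00", "u1"), ("h00", "u2"), ("h01", "u1")))

    // one small, fixed-size HLL sketch per hour instead of a Set of all values
    val hllPerHour = pairs
      .mapValues(v => hllMonoid.create(v.getBytes("UTF-8")))
      .reduceByKey(hllMonoid.plus(_, _))

    // approximate distinct count per hour
    hllPerHour.mapValues(_.estimatedSize.toLong).collect().foreach(println)

    // sketches also merge across hours for an overall approximate distinct count
    val overall = hllPerHour.values.reduce(hllMonoid.plus(_, _))
    println(s"overall ~ ${overall.estimatedSize.toLong}")

    sc.stop()
  }
}

The nice property is that the sketches are mergeable, so the per-hour and overall counts come from the same small per-key state.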


On Sun, Jun 15, 2014 at 6:16 AM, Surendranauth Hiraman <suren.hiraman@velos.io> wrote:
Vivek,

If the foldByKey solution doesn't work for you, my team uses RDD.persist(DISK_ONLY) to avoid OOM errors.

It's slower, of course, and requires tuning other config parameters. It can also be a problem if you do not have enough disk space, meaning that you have to unpersist at the right points if you are running long flows.
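For reference, a bare-bones sketch of what I mean -- the path and the pipeline here are placeholders, not our actual flow:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("disk-only-persist"))

// hypothetical input; the real flow is much longer
val pairs = sc.textFile("hdfs:///input").map(line => (line.split("\t")(0), line))

// spill the intermediate RDD to local disk instead of holding it in memory
val staged = pairs.persist(StorageLevel.DISK_ONLY)

staged.count()      // materialize once; later stages reread it from disk
// ... further transformations over staged ...
staged.unpersist()  // free the disk space once this part of the flow is done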

For us, even though the disk writes are a performance hit, we prefer the Spark programming model to Hadoop M/R. But we are still working on getting this to work end to end on 100s of GB of data on our 16-node cluster.

Suren



On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS <vivek.ys@gmail.com> wrote:
Thanks for the input. I will give foldByKey a shot. 
 
The way I am doing it: the data is partitioned hourly, so I compute distinct values per hour. Then I use unionRDD to merge them and compute distinct on the overall data.

> Is there a way to know which (key, value) pair is resulting in the OOM?
> Is there a way to set parallelism in the map stage so that each worker will process one key at a time?

I didn't realise countApproxDistinctByKey uses HyperLogLogPlus. This should be interesting.

--Vivek 


On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen <sowen@cloudera.com> wrote:
Grouping by key is always problematic since a key might have a huge number of values. You can do a little better than grouping *all* values and *then* finding distinct values by using foldByKey, putting values into a Set. At least you end up with only distinct values in memory. (You don't need two maps either, right?)
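Something like this sketch, say (the sample data and types are made up):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("distinct-per-key"))
val pairs = sc.parallelize(Seq(("a", 1), ("a", 1), ("a", 2), ("b", 3)))

// wrap each value in a one-element Set, then fold the Sets together per key;
// only the distinct values for a key are ever held in memory
val distinctPerKey = pairs
  .mapValues(v => Set(v))
  .foldByKey(Set.empty[Int])(_ ++ _)

// (key, distinct value count) -- no second map over already-distinct values
val countsPerKey = distinctPerKey.mapValues(_.size)
countsPerKey.collect().foreach(println)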

If the number of distinct values is still huge for some keys, consider the experimental method countApproxDistinctByKey: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285

This should be much more performant at the cost of some accuracy.
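Continuing from the pairs RDD in the sketch above, it's a one-liner; relativeSD is the accuracy knob (smaller = more accurate, more memory per key):

// approximate distinct count per key, backed by a HyperLogLog sketch
val approxCounts = pairs.countApproxDistinctByKey(relativeSD = 0.05)
approxCounts.collect().foreach { case (k, n) => println(s"$k -> ~$n") }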


On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS <vivek.ys@gmail.com> wrote:
Hi,
   For the last couple of days I have been trying hard to get around this problem. Please share any insights on solving it.

Problem :
There is a huge list of (key, value) pairs. I want to transform this to (key, distinct values) and then eventually to (key, distinct values count) 

On a small dataset this works:

groupByKey().map(x => (x._1, x._2.distinct)) ... .map(x => (x._1, x._2.size))

On the large dataset I am getting OOM.

Is there a way to represent the Seq of values from groupByKey as an RDD and then perform distinct over it?

Thanks
Vivek





--
                                                            
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105