spark-user mailing list archives

From Koert Kuipers <>
Subject Re: spark challenge: zip with next???
Date Fri, 30 Jan 2015 17:03:25 GMT
yeah i meant foldLeft by key, sorted by date.
it is non-commutative because i care about the order of processing the
values (chronological). i dont see how i can do it with a reduce
efficiently, but i would be curious to hear otherwise. i might be biased
since this is such a typical operation in map-reduce.

so basically assuming its logs of servers being RDD[(String, Long)] where
String is the server name and Long is the timestamp, you keep a state that
contains the last observed timestamp (if any) and the list of found gaps.
so state type would be (Option[Long], List[Long]). as you process items in
the timeseries chronologically you always update the last observed
timestamp and possibly add to the list of found gaps.

foldLeftByKey on RDD[(K, V)] looks something like this:
def foldLeftByKey[X](state: X)(update: (X, V) => X)(implicit ord: Ordering[V]): RDD[(K, X)]

and the logic would be (just made this up, didnt test or compile):

rdd.foldLeftByKey((None: Option[Long], List.empty[Long])) {
  // state is (last observed timestamp, gaps found so far); thres is the gap threshold
  case ((Some(prev), gaps), curr) if curr - prev > thres => (Some(curr), curr :: gaps) // gap found
  case ((_, gaps), curr) => (Some(curr), gaps) // no gap found
}

the sort required within timeseries would be done efficiently by spark in
the shuffle (assuming sort-based shuffle is enabled). the foldLeftByKey
would never require the entire timeseries per key to be in memory. however
every timeseries would be processed by a single task, so it might take a
while if the timeseries is very large.
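
to make the update logic above concrete, here is a minimal local sketch that uses plain scala collections instead of spark. the names GapFold, foldLeftByKeyLocal and the threshold value are assumptions made up for illustration; foldLeftByKey itself is only the proposed API described above, not an existing RDD method. unlike the streaming version described above, this local stand-in groups and sorts each timeseries in memory.

```scala
// Local, Spark-free sketch: a Seq[(String, Long)] stands in for RDD[(String, Long)].
object GapFold {
  // State: (last observed timestamp, timestamps at which a gap ended, most recent first).
  type State = (Option[Long], List[Long])

  // Hypothetical threshold, in the same unit as the timestamps.
  val thres = 10L

  // One step of the fold: always remember curr, and record it if a gap precedes it.
  def update(state: State, curr: Long): State = state match {
    case (Some(prev), gaps) if curr - prev > thres => (Some(curr), curr :: gaps) // gap found
    case (_, gaps)                                 => (Some(curr), gaps)         // no gap found
  }

  // Local stand-in for the proposed foldLeftByKey: group by key, sort the values, fold left.
  def foldLeftByKeyLocal(logs: Seq[(String, Long)]): Map[String, State] =
    logs.groupBy(_._1).map { case (server, entries) =>
      val zero: State = (None, Nil)
      server -> entries.map(_._2).sorted.foldLeft(zero)(update)
    }
}
```

calling GapFold.foldLeftByKeyLocal on a handful of (server, timestamp) pairs gives one final state per server, with the gap list holding the timestamps that followed a gap larger than thres.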

On Fri, Jan 30, 2015 at 11:05 AM, Michael Malak <> wrote:

> But isn't foldLeft() overkill for the originally stated use case of max
> diff of adjacent pairs? Isn't foldLeft() for recursive non-commutative
> non-associative accumulation as opposed to an embarrassingly parallel
> operation such as this one?
> This use case reminds me of FIR filtering in DSP. It seems that RDDs could
> use something that serves the same purpose as
> scala.collection.Iterator.sliding.
>   ------------------------------
>  *From:* Koert Kuipers <>
> *To:* Mohit Jaggi <>
> *Cc:* Tobias Pfeiffer <>; "Ganelin, Ilya" <>; derrickburns <>; "" <>
> *Sent:* Friday, January 30, 2015 7:11 AM
> *Subject:* Re: spark challenge: zip with next???
> assuming the data can be partitioned then you have many timeseries for
> which you want to detect potential gaps. also assuming the resulting gaps
> info per timeseries is much smaller data than the timeseries data itself,
> then this is a classical example to me of a sorted (streaming) foldLeft,
> requiring an efficient secondary sort in the spark shuffle. i am trying to
> get that into spark here:
> On Fri, Jan 30, 2015 at 12:27 AM, Mohit Jaggi <>
> wrote:
> you can use the MLLib function or do the following (which is what I had
> done):
> - in first pass over the data, using mapPartitionsWithIndex, gather the
> first item in each partition. you can use collect (or aggregator) for this.
> “key” them by the partition index. at the end, you will have a map
>    (partition index) --> first item
> - in the second pass over the data, using mapPartitionsWithIndex again,
> look at two items at a time (or in the general case N items at a time; you
> can use scala’s sliding iterator) and check the time difference (or any
> sliding window computation). To this mapPartitionsWithIndex call, pass the
> map created in the previous step. You will need it to check the last item
> in this partition against the first item of the next one.
> If you can tolerate a few inaccuracies then you can just do the second
> step. You will miss the “boundaries” of the partitions but it might be
> acceptable for your use case.
> On Jan 29, 2015, at 4:36 PM, Tobias Pfeiffer <> wrote:
> Hi,
> On Fri, Jan 30, 2015 at 6:32 AM, Ganelin, Ilya <> wrote:
>  Make a copy of your RDD with an extra entry in the beginning to offset.
> Then you can zip the two RDDs and run a map to generate an RDD of
> differences.
> Does that work? I recently tried something to compute differences between
> each entry and the next, so I did
>   val rdd1 = ... // null element + rdd
>   val rdd2 = ... // rdd + null element
> but got an error message about zip requiring data sizes in each partition
> to match.
> Tobias
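
The two-pass approach quoted above (collect the first item of each partition, then slide over each partition plus the first item of the next one) can be sketched locally as follows. This is only an illustration under stated assumptions: a Seq[Seq[Long]] stands in for the partitions of a time-sorted RDD[Long], the names AdjacentDiffs and maxAdjacentDiff are made up, and a real Spark version would do both passes with mapPartitionsWithIndex.

```scala
// Local, Spark-free sketch: each inner Seq stands in for one partition of a sorted RDD[Long].
object AdjacentDiffs {
  def maxAdjacentDiff(partitions: Seq[Seq[Long]]): Option[Long] = {
    // Pass 1: build the map (partition index) -> first item, skipping empty partitions.
    val firstItems: Map[Int, Long] = partitions.zipWithIndex.collect {
      case (part, idx) if part.nonEmpty => idx -> part.head
    }.toMap

    // Pass 2: append the first item of the next non-empty partition, then slide in pairs.
    val diffs: Seq[Long] = partitions.zipWithIndex.flatMap { case (part, idx) =>
      val next: Option[Long] =
        firstItems.keys.filter(_ > idx).toSeq.sorted.headOption.map(firstItems)
      (part.iterator ++ next.iterator)
        .sliding(2)
        .withPartial(false) // drop a trailing window that has fewer than 2 items
        .map { case Seq(a, b) => b - a }
        .toList
    }

    if (diffs.isEmpty) None else Some(diffs.max)
  }
}
```

Skipping the first pass and sliding over each partition alone would miss the differences that straddle partition boundaries, which is the inaccuracy mentioned in the quoted message.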
