spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: What is Seq[V] in updateStateByKey?
Date Fri, 02 May 2014 04:02:54 GMT
Depends on your code. Referring to the earlier example, if you do

words.map(x => (x,1)).updateStateByKey(....)

then, for a particular word, if a batch contains 6 occurrences of that word,
the Seq[V] will be [1, 1, 1, 1, 1, 1].

If you instead do

words.map(x => (x,1)).reduceByKey(_ + _).updateStateByKey(...)

then Seq[V] will be [6]; that is, all the 1s will already have been summed
by the reduceByKey.
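To make the two cases concrete, here is a minimal plain-Scala model (no Spark dependency) of the per-key Seq[V] the update function would see for a single batch. The batch contents and the names `batch`, `unreduced` and `reduced` are hypothetical, and `groupBy` only stands in for Spark's per-key grouping; this is not how Spark implements updateStateByKey internally.

```scala
// Plain-Scala model of ONE batch; illustrative only, not Spark code.
// Hypothetical batch: "spark" occurs 6 times, "scala" twice.
val batch = Seq("spark", "scala", "spark", "spark", "scala", "spark", "spark", "spark")

// Case 1: words.map(x => (x, 1)).updateStateByKey(...)
// Every (word, 1) pair for a key ends up in that key's Seq[V].
val unreduced: Map[String, Seq[Int]] =
  batch.map(x => (x, 1)).groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }

// Case 2: words.map(x => (x, 1)).reduceByKey(_ + _).updateStateByKey(...)
// The 1s are summed first, so each key's Seq[V] holds a single value.
val reduced: Map[String, Seq[Int]] =
  unreduced.map { case (k, ones) => k -> Seq(ones.sum) }

println(unreduced("spark")) // six 1s
println(reduced("spark"))   // a single 6
```

In both cases the batch total for "spark" is 6; the reduceByKey only changes how many elements the update function has to add up.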

TD



On Thu, May 1, 2014 at 7:29 AM, Adrian Mocanu <amocanu@verticalscope.com> wrote:

> So Seq[V] contains only "new" tuples. I initially thought that whenever a
> new tuple was found, it would add it to Seq and call the update function
> immediately so there wouldn't be more than 1 update to Seq per function
> call.
>
> Say I want to sum tuples with the same key in an RDD using
> updateStateByKey. Then (1) would Seq[V] contain the numbers for a
> particular key, and could my S state be the sum?
> Or would (2) Seq contain partial sums (say sum per partition?) which I
> then need to sum into the final sum?
>
> After writing this out and thinking a little more about it I think #2 is
> correct. Can you confirm?
>
> Thanks again!
> -A
>
> -----Original Message-----
> From: Sean Owen [mailto:sowen@cloudera.com]
> Sent: April-30-14 4:30 PM
> To: user@spark.apache.org
> Subject: Re: What is Seq[V] in updateStateByKey?
>
> S is the previous count, if any. Seq[V] holds potentially many new counts.
> All of them have to be added together to keep an accurate total. It's as
> if the count were 3, and I tell you I've just observed 2, 5, and 1
> additional occurrences -- the new count is 3 + (2 + 5 + 1), not
> 1 + 1.
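Sean's arithmetic can be written as an update function of the shape updateStateByKey expects, `(Seq[V], Option[S]) => Option[S]`. The sketch below is plain Scala so it runs without Spark; the names `updateCount` and `lastPlusOne` are hypothetical, and treating a missing state as 0 is an assumption about how a newly seen key should start.

```scala
// Same shape as the function passed to updateStateByKey:
// (new values for this key in this batch, previous state) => new state.
def updateCount(values: Seq[Int], state: Option[Int]): Option[Int] = {
  val previous = state.getOrElse(0) // assumption: an unseen key starts at 0
  Some(previous + values.sum)       // add ALL new counts, not just the last one
}

// Adrian's "take the last value in Seq and add 1" idea, shown only for contrast.
def lastPlusOne(values: Seq[Int]): Int = values.lastOption.getOrElse(0) + 1

val newCounts = Seq(2, 5, 1)
println(updateCount(newCounts, Some(3))) // Some(11), i.e. 3 + (2 + 5 + 1)
println(lastPlusOne(newCounts))          // 2, i.e. 1 + 1
println(updateCount(Seq(4), None))       // Some(4) for a key seen for the first time
```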
>
>
> I butted in since I'd like to ask a different question about the same line
> of code. Why:
>
>       val currentCount = values.foldLeft(0)(_ + _)
>
> instead of
>
>       val currentCount = values.sum
>
> This happens in a few places in the code. sum seems equivalent and likely
> quicker. Same with things like "filter(_ == 200).size" instead of
> "count(_ == 200)"... pretty trivial but hey.
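A quick check of the equivalence described here, on an ordinary Scala `Seq[Int]` (the sample values are made up). For an integer collection, `sum` gives the same result as `foldLeft(0)(_ + _)`, including 0 for an empty collection, and `count(p)` gives the same result as `filter(p).size` without building the intermediate collection that `filter` allocates. Whether either change is measurably faster inside a Spark job is not something this sketch shows.

```scala
// Hypothetical sample values (e.g. HTTP status codes).
val codes = Seq(200, 404, 200, 500, 200)

// Summing: explicit fold vs. the library method (uses the implicit Numeric[Int]).
val viaFold = codes.foldLeft(0)(_ + _)
val viaSum  = codes.sum

// Counting matches: filter builds a new Seq and then measures it;
// count walks the elements once without building anything.
val viaFilter = codes.filter(_ == 200).size
val viaCount  = codes.count(_ == 200)

println(s"$viaFold $viaSum")     // 1504 1504
println(s"$viaFilter $viaCount") // 3 3
```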
>
>
> On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu <amocanu@verticalscope.com>
> wrote:
> > Hi TD,
> >
> > Why does the example keep recalculating the count via fold?
> >
> > Wouldn’t it make more sense to get the last count in values Seq and
> > add 1 to it and save that as current count?
> >
> > From what Sean explained I understand that all values in Seq have the
> > same key. Then when a new value for that key is found it is added to
> > this Seq collection and the update function is called.
> >
> > Is my understanding correct?
>
