spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Adrian Mocanu <amoc...@verticalscope.com>
Subject RE: "overloaded method value updateStateByKey ... cannot be applied to ..." when Key is a Tuple2
Date Wed, 12 Nov 2014 22:13:45 GMT
You are correct; the filtering I’m talking about is done implicitly. You don’t have to
do it yourself. Spark will do it for you and remove those entries from the state collection.

From: Yana Kadiyska [mailto:yana.kadiyska@gmail.com]
Sent: November-12-14 3:50 PM
To: Adrian Mocanu
Cc: spr; user@spark.incubator.apache.org
Subject: Re: "overloaded method value updateStateByKey ... cannot be applied to ..." when
Key is a Tuple2

Adrian, do you know if this is documented somewhere? I was also under the impression that
setting a key's value to None would cause the key to be discarded (without any explicit filtering
on the user's part) but can not find any official documentation to that effect

On Wed, Nov 12, 2014 at 2:43 PM, Adrian Mocanu <amocanu@verticalscope.com<mailto:amocanu@verticalscope.com>>
wrote:
My understanding is that the reason you have an Option is so you could filter out tuples when
None is returned. This way your state data won't grow forever.

-----Original Message-----
From: spr [mailto:spr@yarcdata.com<mailto:spr@yarcdata.com>]
Sent: November-12-14 2:25 PM
To: user@spark.incubator.apache.org<mailto:user@spark.incubator.apache.org>
Subject: Re: "overloaded method value updateStateByKey ... cannot be applied to ..." when
Key is a Tuple2

After comparing with previous code, I got it work by making the return a Some instead of Tuple2.
 Perhaps some day I will understand this.


spr wrote
> ------code--------
>
>     val updateDnsCount = (values: Seq[(Int, Time)], state:
> Option[(Int,
> Time)]) => {
>       val currentCount = if (values.isEmpty) 0 else values.map( x =>
> x._1).sum
>       val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else
> values.map( x => x._2).min
>
>       val (previousCount, minTime) = state.getOrElse((0,
> Time(System.currentTimeMillis)))
>
>       //  (currentCount + previousCount, Seq(minTime, newMinTime).min)
> <== old
>       Some(currentCount + previousCount, Seq(minTime, newMinTime).min)
> // <== new
>     }





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/overloaded-method-value-updateStateByKey-cannot-be-applied-to-when-Key-is-a-Tuple2-tp18644p18750.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org<mailto:user-unsubscribe@spark.apache.org>
For additional commands, e-mail: user-help@spark.apache.org<mailto:user-help@spark.apache.org>


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org<mailto:user-unsubscribe@spark.apache.org>
For additional commands, e-mail: user-help@spark.apache.org<mailto:user-help@spark.apache.org>

Mime
View raw message