spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Sliding Average over Window in Spark Streaming
Date Mon, 09 May 2016 07:06:49 GMT
In general, working out the minimum or maximum of, say, prices (I do not know
your use case) is pretty straightforward.

For example

val maxValue = price.reduceByWindow((x: Double, y: Double) => if (x > y) x else y,
  Seconds(windowLength), Seconds(slidingInterval))
maxValue.print()
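To make the window semantics concrete, here is a plain-Scala sketch (no Spark involved) that mimics what reduceByWindow does with the max function above. It assumes, for illustration only, that each micro-batch is an in-memory Seq[Double] and that the window length and sliding interval are given as whole numbers of batches (Spark requires both to be multiples of the batch interval). The names SlidingMax and slidingMax are hypothetical.

```scala
// Plain-Scala model of reduceByWindow(max) over micro-batches (no Spark).
// Assumption: window and slide are expressed as whole numbers of batches.
object SlidingMax {
  def slidingMax(batches: Seq[Seq[Double]], windowBatches: Int, slideBatches: Int): List[Double] =
    batches
      .sliding(windowBatches, slideBatches) // groups of consecutive batches
      .map(_.flatten)                       // all values that fall in one window
      .filter(_.nonEmpty)                   // skip windows that received no data
      .map(_.reduce((x, y) => if (x > y) x else y)) // same reduce function as above
      .toList

  def main(args: Array[String]): Unit = {
    val prices = Seq(Seq(1.0, 5.0), Seq(3.0), Seq(7.0, 2.0), Seq(4.0))
    // Window of 2 batches, sliding by 1 batch.
    println(slidingMax(prices, 2, 1))
  }
}
```

Max works this neatly because it is associative: the max of a window can be built from the values of the batches inside it in any grouping, which is exactly what reduceByWindow needs.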

Averages are different, because they depend on running totals (a sum and a
count). You could possibly try something like:

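// Average of the third comma-separated field (index 2) within each batch.
// This is per batch; applying lines.window(...) first would average over a
// sliding window instead.
lines.foreachRDD { rdd =>
  val test = rdd.map(_.split(',')(2)).map(_.toDouble)
  val c = test.count
  if (c > 0) { // avoid dividing by zero on an empty batch
    val s = test.sum
    val r = s / c
    println(s"Average for this batch: $r")
  }
}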
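One way to get a sliding average without keeping every value in the window is to reduce (sum, count) pairs and divide only at the end. The sketch below models this in plain Scala (no Spark), assuming each micro-batch is an in-memory Seq of (key, value) records and that the window and slide are whole numbers of batches. SlidingAverage, perBatch, combine and slidingAverage are hypothetical names for illustration.

```scala
// Plain-Scala model (no Spark) of a keyed sliding average built from
// (sum, count) pairs instead of lists of raw values.
object SlidingAverage {
  type SumCount = (Double, Long)

  // Merge two partial aggregates; associative and commutative, like max.
  def combine(a: SumCount, b: SumCount): SumCount = (a._1 + b._1, a._2 + b._2)

  // Turn one micro-batch of (key, value) records into per-key (sum, count).
  def perBatch(batch: Seq[(String, Double)]): Map[String, SumCount] =
    batch.groupBy(_._1).map { case (k, kvs) => k -> ((kvs.map(_._2).sum, kvs.size.toLong)) }

  // For each window of batches, merge the per-batch pairs, then divide once.
  def slidingAverage(batches: Seq[Seq[(String, Double)]],
                     windowBatches: Int, slideBatches: Int): List[Map[String, Double]] =
    batches.map(perBatch)
      .sliding(windowBatches, slideBatches)
      .map { window =>
        window.flatten                 // all (key, (sum, count)) pairs in the window
          .groupBy(_._1)
          .map { case (k, parts) =>
            val (s, c) = parts.map(_._2).reduce((a, b) => combine(a, b))
            k -> (s / c)
          }
      }
      .toList

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Seq("a" -> 1.0, "a" -> 3.0, "b" -> 10.0), // batch 0
      Seq("a" -> 5.0),                          // batch 1
      Seq("b" -> 20.0)                          // batch 2
    )
    // Window of 2 batches, sliding by 1 batch.
    slidingAverage(batches, 2, 1).foreach(println)
  }
}
```

Note that averaging the per-batch averages would be wrong here: for key "a" in the first window the batch averages are 2.0 and 5.0, whose mean is 3.5, while the true average of 1.0, 3.0 and 5.0 is 3.0. In Spark 1.6 the same idea should carry over by mapping each value to (value, 1L), passing a combine function like the one above to reduceByKeyAndWindow, and dividing with mapValues afterwards.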

HTH

Dr Mich Talebzadeh



LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 6 May 2016 at 16:28, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:

> Hi Matthias,
>
> Say you have the following:
>
> "Batch interval" is the basic interval at which the system will receive
> the data in batches.
> val ssc = new StreamingContext(sparkConf, Seconds(n))
> // window length - the duration of the window; it must be a multiple of
> // the batch interval n in StreamingContext(sparkConf, Seconds(n))
> val windowLength = x
> // sliding interval - the interval at which the window operation is
> // performed; in other words, each time it runs, data is collected over
> // the "previous interval x". It must also be a multiple of n.
> val slidingInterval = y
>
> OK, so you want to use something like the code below to get the average
> value within the y interval for a given parameter? The logic below may be incorrect.
>
> val countByValueAndWindow = RS.filter(_
> AVG(VALUE)).countByValueAndWindow(Seconds(windowLength),
> Seconds(slidingInterval))
>
>
> 2016-05-06 15:54 GMT+01:00 Matthias Niehoff <matthias.niehoff@codecentric.de>:
>
>> Hi,
>>
>> If I want a sliding average over 10 minutes for some keys, I can do
>> something like
>> groupBy(window(…), col("my-key")).avg("some-values") in Spark 2.0.
>>
>> I am trying to implement this sliding average using Spark 1.6.x.
>> I tried reduceByKeyAndWindow but did not find a solution. In my opinion, I
>> have to keep all the values in the window to compute the average. One way
>> would be to add every new value to a list in the reduce method and then do
>> the average computation in a separate map, but this seems kind of ugly.
>>
>> Do you have an idea how to solve this?
>>
>> Thanks!
>>
>> --
>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobile: +49 (0) 172.1702676
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>> www.more4fi.de
>>
>> Registered office: Solingen | HRB 25917 | District Court of Wuppertal
>> Executive Board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> Supervisory Board: Patric Fedlmeier (Chairman) . Klaus Jäger . Jürgen
>> Schütz
>>
>> This e-mail, including any attached files, contains confidential and/or
>> legally protected information. If you are not the intended recipient or
>> have received this e-mail in error, please notify the sender immediately
>> and delete this e-mail and any attached files without delay. Unauthorized
>> copying, use, or opening of any attached files, as well as unauthorized
>> forwarding of this e-mail, is not permitted.
>>
>
>
