spark-user mailing list archives

From sri hari kali charan Tummala <kali.tumm...@gmail.com>
Subject Re: sql to spark scala rdd
Date Tue, 02 Aug 2016 02:25:29 GMT
Hi All,

The code below calculates a cumulative sum (running sum) and a running average
using Scala RDD-style programming. I was using the wrong function earlier:
instead of sliding, use scanLeft.
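To see the difference on a plain Scala List (illustrative sketch, no SparkContext needed): sliding yields fixed-width windows, while scanLeft threads a running accumulator through the whole sequence, which is what the SQL frame UNBOUNDED PRECEDING AND CURRENT ROW corresponds to.

```scala
object SlidingVsScanLeft {
  // Illustrative balances only, same shape as the data below.
  val balances = List(-128.0, 234.0, -457.0)

  // sliding(2, 1): fixed windows of the last two rows -> a bounded moving average
  val movingAvg: List[Double] = balances.sliding(2, 1).map(w => w.sum / w.size).toList

  // scanLeft: cumulative fold from the first row -> the SQL running sum
  // (.tail drops the 0.0 seed element)
  val runningSum: List[Double] = balances.scanLeft(0.0)(_ + _).tail

  def main(args: Array[String]): Unit = {
    println(movingAvg)   // List(53.0, -111.5)
    println(runningSum)  // List(-128.0, 106.0, -351.0)
  }
}
```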


sc.textFile("C:\\Users\\kalit_000\\Desktop\\Hadoop_IMP_DOC\\spark\\data.txt")
  .map(line => line.split("~"))
  .map(cols => (cols(0), cols(1), cols(2).toDouble))
  .groupBy(_._1)
  .mapValues { rows =>
    rows.toList.sortBy(_._2)                 // order by date within each key
      .zip(Stream.from(1))                   // attach a row number
      .scanLeft(("", "", 0.0, 0.0, 0.0, 0)) { case (acc, (row, rowNum)) =>
        val runningSum = acc._4 + row._3     // previous running sum + current balance
        (row._1, row._2, row._3, runningSum, runningSum / rowNum, rowNum)
      }
      .tail                                  // drop the scanLeft seed
  }
  .flatMapValues(identity)
  .foreach(println)

Input Data:-

Headers:-
Key,Date,balance

786~20160710~234
786~20160709~-128
786~20160711~-457
987~20160812~456
987~20160812~567

Output Data:-

Column Headers:-
key, (key, date, balance, running sum, running average, row_number within key)

(786,(786,20160709,-128.0,-128.0,-128.0,1))
(786,(786,20160710,234.0,106.0,53.0,2))
(786,(786,20160711,-457.0,-351.0,-117.0,3))

(987,(987,20160812,456.0,456.0,456.0,1))
(987,(987,20160812,567.0,1023.0,511.5,2))

Reference:-

https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/
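The same pipeline can be checked locally on plain collections before running it on a cluster; this sketch mirrors the groupBy / sortBy / scanLeft steps above (no SparkContext needed, same rows as data.txt):

```scala
object RunningSumLocal {
  // Same rows as data.txt above: (key, date, balance)
  val rows = List(
    ("786", "20160710", 234.0), ("786", "20160709", -128.0),
    ("786", "20160711", -457.0), ("987", "20160812", 456.0),
    ("987", "20160812", 567.0))

  // key -> (date, balance, running sum, running average, row number)
  val result: Map[String, List[(String, Double, Double, Double, Int)]] =
    rows.groupBy(_._1).map { case (key, rs) =>
      key -> rs.sortBy(_._2)               // order by date within each key
        .zipWithIndex                      // row numbers start at 0 here
        .scanLeft(("", 0.0, 0.0, 0.0, 0)) {
          case ((_, _, runSum, _, _), ((_, date, bal), i)) =>
            (date, bal, runSum + bal, (runSum + bal) / (i + 1), i + 1)
        }
        .tail                              // drop the scanLeft seed
    }

  def main(args: Array[String]): Unit = result.foreach(println)
}
```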


Thanks
Sri


On Mon, Aug 1, 2016 at 12:07 AM, Sri <kali.tummala@gmail.com> wrote:

> Hi ,
>
> I solved it using Spark SQL, which uses window functions similar to the one
> mentioned below. For my own knowledge I am trying to solve it with a Scala
> RDD, which I have been unable to do.
> Which Scala function supports a window like SQL's UNBOUNDED PRECEDING AND
> CURRENT ROW? Is it sliding?
>
>
> Thanks
> Sri
>
> Sent from my iPhone
>
> On 31 Jul 2016, at 23:16, Mich Talebzadeh <mich.talebzadeh@gmail.com>
> wrote:
>
> hi
>
> You mentioned:
>
> I already solved it using DF and spark sql ...
>
> Are you referring to this code which is a classic analytics:
>
> SELECT DATE, balance,
>        SUM(balance) OVER (ORDER BY DATE
>                           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) daily_balance
> FROM table
>
>
> So how did you solve it using DF in the first place?
>
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 1 August 2016 at 07:04, Sri <kali.tummala@gmail.com> wrote:
>
>> Hi ,
>>
>> Just wondering how Spark SQL works behind the scenes: does it not convert
>> the SQL to some Scala RDD code, or to Scala?
>>
>> How to write below SQL in Scala or Scala RDD
>>
>> SELECT DATE, balance,
>>        SUM(balance) OVER (ORDER BY DATE
>>                           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) daily_balance
>> FROM table
>>
>>
>> Thanks
>> Sri
>> Sent from my iPhone
>>
>> On 31 Jul 2016, at 13:21, Jacek Laskowski <jacek@japila.pl> wrote:
>>
>> Hi,
>>
>> Impossible - see
>> http://www.scala-lang.org/api/current/index.html#scala.collection.Seq@sliding(size:Int,step:Int):Iterator[Repr]
>>
>> I tried to show you why you ended up with "non-empty iterator" after
>> println. You should really start with
>> http://www.scala-lang.org/documentation/
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sun, Jul 31, 2016 at 8:49 PM, sri hari kali charan Tummala
>> <kali.tummala@gmail.com> wrote:
>>
>> Tuple
>>
>>
>> [Lscala.Tuple2;@65e4cb84
>>
>>
>> On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski <jacek@japila.pl> wrote:
>>
>>
>> Hi,
>>
>>
>> What's the result type of sliding(2,1)?
>>
>>
>> Pozdrawiam,
>>
>> Jacek Laskowski
>>
>> ----
>>
>> https://medium.com/@jaceklaskowski/
>>
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>>
>> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala
>>
>> <kali.tummala@gmail.com> wrote:
>>
>> Tried this, no luck. What is the non-empty iterator here?
>>
>>
>> Output:-
>>
>> (-987,non-empty iterator)
>> (-987,non-empty iterator)
>> (-987,non-empty iterator)
>> (-987,non-empty iterator)
>> (-987,non-empty iterator)
>>
>>
>>
>> sc.textFile(file).keyBy(x => x.split("\\~")(0))
>>   .map(x => x._2.split("\\~"))
>>   .map(x => (x(0), x(2)))
>>   .map { case (key, value) =>
>>     (key, value.toArray.toSeq.sliding(2, 1).map(x => x.sum / x.size)) }
>>   .foreach(println)
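A minimal reproduction of the effect, using only plain Scala collections: sliding returns an Iterator, and printing an Iterator shows its toString (which reads "non-empty iterator" on the Scala versions of that era) rather than its elements. Forcing it with toList materialises the values.

```scala
object IteratorPrinting {
  val values = Seq(1.0, 2.0, 3.0)

  // sliding returns an Iterator; printing it shows only its toString,
  // not the windows it would produce.
  val lazyAverages: Iterator[Double] = values.sliding(2, 1).map(w => w.sum / w.size)

  // Forcing the iterator materialises the moving averages.
  val averages: List[Double] = values.sliding(2, 1).map(w => w.sum / w.size).toList

  def main(args: Array[String]): Unit = println(averages)  // List(1.5, 2.5)
}
```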
>>
>>
>>
>> On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala
>>
>> <kali.tummala@gmail.com> wrote:
>>
>>
>> Hi All,
>>
>>
>> I managed to write it using the sliding function, but can I get the key as
>> well in my output?
>>
>>
>> sc.textFile(file).keyBy(x => x.split("\\~")(0))
>>   .map(x => x._2.split("\\~"))
>>   .map(x => x(2).toDouble)
>>   .toArray().sliding(2, 1)
>>   .map(x => (x, x.size)).foreach(println)
>>
>>
>>
>> at the moment my output:-
>>
>> 75.0
>> -25.0
>> 50.0
>> -50.0
>> -100.0
>>
>>
>> I want it with the key. How do I get the moving-average output per key?
>>
>>
>>
>> 987,75.0
>>
>> 987,-25
>>
>> 987,50.0
>>
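One way to keep the key is to group first and run sliding inside each group; a hypothetical local sketch (the keys mirror the thread, but the balance figures here are made-up illustration values):

```scala
object KeyedMovingAverage {
  // (key, balance) pairs; the balances are illustrative only
  val data = List(("987", 100.0), ("987", 50.0), ("987", -100.0),
                  ("786", 10.0), ("786", 30.0))

  // Group by key, then take a width-2 moving average inside each group.
  // The same shape works on an RDD via groupByKey + mapValues.
  val result: Map[String, List[Double]] =
    data.groupBy(_._1).map { case (key, rows) =>
      key -> rows.map(_._2).sliding(2, 1).map(w => w.sum / w.size).toList
    }

  def main(args: Array[String]): Unit = result.foreach(println)
}
```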
>>
>> Thanks
>>
>> Sri
>>
>>
>>
>>
>>
>>
>>
>> On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala
>>
>> <kali.tummala@gmail.com> wrote:
>>
>>
>> For my own knowledge, I am just wondering how to write it in Scala or as a
>> Spark RDD.
>>
>>
>> Thanks
>>
>> Sri
>>
>>
>> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski <jacek@japila.pl>
>>
>> wrote:
>>
>>
>> Why?
>>
>>
>> Pozdrawiam,
>>
>> Jacek Laskowski
>>
>> ----
>>
>> https://medium.com/@jaceklaskowski/
>>
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>>
>> On Sat, Jul 30, 2016 at 4:42 AM, Kali.tummala@gmail.com
>>
>> <Kali.tummala@gmail.com> wrote:
>>
>> Hi All,
>>
>>
>> I managed to write the business requirement in Spark SQL and Hive. I am
>> still learning Scala; how would the SQL below be written using a Spark RDD,
>> not Spark DataFrames?
>>
>>
>> SELECT DATE, balance,
>>        SUM(balance) OVER (ORDER BY DATE
>>                           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) daily_balance
>> FROM table
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>> ---------------------------------------------------------------------
>>
>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>
>>
>>
>>
>>
>>
>> --
>>
>> Thanks & Regards
>>
>> Sri Tummala
>>
>


-- 
Thanks & Regards
Sri Tummala
