spark-user mailing list archives

From Ognen Duzlevski <og...@plainvanillagames.com>
Subject Re: Can anyone offer any insight at all?
Date Fri, 07 Mar 2014 19:08:51 GMT
No.

It was a logical error. This line:

val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1)
  .map(line => (line.split(",")(2).split(":")(1).replace("\"",""), 1)).cache

should have mapped to 0, not 1.
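For reference, a minimal sketch of the corrected mapping, using plain Scala collections in place of an RDD (the input lines, field layout, and the "login" event name are made-up stand-ins; the string-splitting logic mirrors the snippet above):

```scala
// Sketch of the corrected event1 mapping. Plain Scala collections stand in
// for the RDD; the parsing logic is the same as in the original snippet.
object RetentionFix {
  def main(args: Array[String]): Unit = {
    val event1 = "login"
    // Hypothetical JSON-ish CSV lines: "event":"...","ts":"...","user_id":"..."
    val lines = List(
      "\"event\":\"login\",\"ts\":\"1\",\"user_id\":\"u1\"",
      "\"event\":\"purchase\",\"ts\":\"2\",\"user_id\":\"u2\""
    )
    // Corrected: event1 users are tagged with 0 (not 1), so after the
    // union/groupByKey step they can be told apart from event2 users,
    // whose values are 1.
    val ev1 = lines
      .filter(_.split(",")(0).split(":")(1).replace("\"", "") == event1)
      .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 0))
    println(ev1)  // List((u1,0))
  }
}
```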

I have had the most awful time figuring out these "looped" things. It seems 
next to impossible to run a .filter() operation in the body of a for loop; 
it does work if you yield the .filter() instead.

I still don't understand why that is...
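One likely explanation (my reading, not from the thread): a for loop without yield desugars to foreach, so each .filter() result is computed and then discarded, while for/yield desugars to map and collects the results. A small sketch with plain lists:

```scala
// A for loop without yield desugars to foreach: each .filter result is
// built and then thrown away, so nothing survives the loop.
// With yield it desugars to map, so the filtered collections are kept.
object ForYieldDemo {
  def main(args: Array[String]): Unit = {
    val groups = List(List(1, 2, 3), List(4, 5, 6))

    // Discarded: the loop body returns Unit, the filter results are lost.
    for (g <- groups) g.filter(_ % 2 == 0)

    // Kept: yield collects each filter result into a new List.
    val evens = for (g <- groups) yield g.filter(_ % 2 == 0)
    println(evens)  // List(List(2), List(4, 6))
  }
}
```

With RDDs the same desugaring applies, which would explain why only the yield form hands back usable references to the filtered RDDs.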

Ognen

On 3/7/14, 1:05 PM, Mayur Rustagi wrote:
> the issue was with print?
> printing on worker?
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
>
> On Fri, Mar 7, 2014 at 10:43 AM, Ognen Duzlevski 
> <ognen@plainvanillagames.com <mailto:ognen@plainvanillagames.com>> wrote:
>
>     Strike that. Figured it out. Don't you just hate it when you fire
>     off an email and you figure it out as it is being sent? ;)
>     Ognen
>
>
>     On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:
>
>         What is wrong with this code?
>
>         A condensed set of this code works in the spark-shell.
>
>         It does not work when deployed via a jar.
>
>         def calcSimpleRetention(start: String, end: String,
>             event1: String, event2: String): List[Double] = {
>           val spd = new PipelineDate(start)
>           val epd = new PipelineDate(end)
>           // filter for event1 events and return RDDs that are maps
>           // of user_ids and 0
>           val f = sc.textFile(spd.toJsonHdfsFileName)
>           val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1)
>             .map(line => (line.split(",")(2).split(":")(1).replace("\"",""), 1)).cache
>           val ev1c = ev1rdd.count.toDouble
>
>           // do the same as above for event2 events, only substitute 0s with 1s
>           val ev2rdds = for {
>             dt <- PipelineDate.getPeriod(spd + 1, epd)
>             val f1 = sc.textFile(dt.toJsonHdfsFileName)
>           } yield f1.filter(_.split(",")(0).split(":")(1).replace("\"","") == event2)
>             .map(line => (line.split(",")(2).split(":")(1).replace("\"",""), 1)).distinct
>
>           // cache all event1 and event2 RDDs
>           ev2rdds.foreach(_.cache)
>           val cts = for {
>             ev2 <- ev2rdds
>           } yield ev2.count
>
>           val retent = for {
>             ev2rdd <- ev2rdds
>             val ret = ev1rdd.union(ev2rdd).groupByKey()
>           } yield ret.filter(e => e._2.length > 1 && e._2.filter(_ == 0).length > 0)
>
>           val rcts = retent.map(_.count)
>           println("----------------------------------------------------------------------")
>           println(s"${rcts}")
>           println(s"${cts}")
>
>           for {
>             c <- rcts
>           } yield (ev1c / c.toDouble)
>           //Map(result:_*)
>         }
>
>         This is what this code prints:
>         List(0, 0)
>         List(785912, 825254)
>         List(Infinity, Infinity)
>
>         My question is: it does not appear that the
>         union().groupBy().filter(....) segment is working (the
>         List(0,0) output). The app is not failing, it finishes just fine.
>
>         Any ideas?
>         Ognen
>
>
>     -- 
>     Some people, when confronted with a problem, think "I know, I'll
>     use regular expressions." Now they have two problems.
>     -- Jamie Zawinski
>
>


