spark-user mailing list archives

From Mayur Rustagi <mayur.rust...@gmail.com>
Subject Re: Can anyone offer any insight at all?
Date Fri, 07 Mar 2014 19:05:26 GMT
So the issue was with print?
Were you printing on the workers?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Fri, Mar 7, 2014 at 10:43 AM, Ognen Duzlevski <
ognen@plainvanillagames.com> wrote:

> Strike that. Figured it out. Don't you just hate it when you fire off an
> email and you figure it out as it is being sent? ;)
> Ognen
>
>
> On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:
>
>> What is wrong with this code?
>>
>> A condensed set of this code works in the spark-shell.
>>
>> It does not work when deployed via a jar.
>>
>> def calcSimpleRetention(start: String, end: String,
>>     event1: String, event2: String): List[Double] = {
>>     val spd = new PipelineDate(start)
>>     val epd = new PipelineDate(end)
>>     // filter for event1 events and return RDDs that are maps of user_ids and 0
>>     val f = sc.textFile(spd.toJsonHdfsFileName)
>>     val ev1rdd = f
>>       .filter(_.split(",")(0).split(":")(1).replace("\"","") == event1)
>>       .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1))
>>       .cache
>>     val ev1c = ev1rdd.count.toDouble
>>
>>     // do the same as above for event2 events, only substitute 0s with 1s
>>     val ev2rdds = for {
>>        dt <- PipelineDate.getPeriod(spd+1,epd)
>>        val f1 = sc.textFile(dt.toJsonHdfsFileName)
>>     } yield (f1
>>       .filter(_.split(",")(0).split(":")(1).replace("\"","") == event2)
>>       .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1))
>>       .distinct)
>>
>>     // cache all event1 and event2 RDDs
>>     ev2rdds.foreach(_.cache)
>>     val cts = for {
>>       ev2 <- ev2rdds
>>     } yield ev2.count
>>
>>     val retent = for {
>>       ev2rdd <- ev2rdds
>>       val ret = ev1rdd.union(ev2rdd).groupByKey()
>>     } yield ret.filter(e => e._2.length > 1 && e._2.filter(_==0).length>0)
>>
>>     val rcts = retent.map(_.count)
>>     println("----------------------------------------------------------------------")
>>
>>     println(s"${rcts}")
>>     println(s"${cts}")
>>
>>     for {
>>       c <- rcts
>>     } yield(ev1c/c.toDouble)
>>     //Map(result:_*)
>>   }
>>
>> This is what this code prints:
>> List(0, 0)
>> List(785912, 825254)
>> List(Infinity, Infinity)
>>
>> My question is: it does not appear that the
>> union().groupBy().filter(....) segment is working (the List(0,0) output).
>> The app is not failing, it finishes just fine.
>>
>> Any ideas?
>> Ognen
>>
>
> --
> Some people, when confronted with a problem, think "I know, I'll use
> regular expressions." Now they have two problems.
> -- Jamie Zawinski
>
>
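
The thread never states what the fix turned out to be, so the following is only one possible reading of the List(0, 0) output, not the author's confirmed answer. The comments in the quoted code say event1 users are paired with 0 and event2 users with 1, but both map calls pair the user id with 1. If that is what ran, the filter on `_==0` can never match, every retention count is 0, and dividing ev1c by 0 as a Double gives Infinity. A minimal sketch of that marker logic, using plain Scala collections in place of RDDs; the object name, the sample user ids, and the `ev1Marker` parameter are hypothetical and exist only for illustration:

```scala
object RetentionSketch {
  // Hypothetical sample data: user ids seen for each event.
  val event1Users: List[String] = List("u1", "u2", "u3")
  val event2Users: List[String] = List("u2", "u3", "u4")

  // Count users present in both lists using the marker scheme from the
  // quoted comments: event1 rows carry ev1Marker, event2 rows carry 1.
  // Plain collections stand in for union/groupByKey/filter on RDDs.
  def retained(ev1: List[String], ev2: List[String], ev1Marker: Int): Int = {
    val tagged1 = ev1.map(u => (u, ev1Marker))
    val tagged2 = ev2.distinct.map(u => (u, 1))
    (tagged1 ++ tagged2)
      .groupBy(_._1)
      .map { case (user, pairs) => (user, pairs.map(_._2)) }
      .count { case (_, markers) => markers.length > 1 && markers.contains(0) }
  }

  def main(args: Array[String]): Unit = {
    // Marker 1 for event1, as in the quoted code: no group contains a 0.
    println(retained(event1Users, event2Users, ev1Marker = 1)) // prints 0
    // Marker 0 for event1, as the quoted comments describe.
    println(retained(event1Users, event2Users, ev1Marker = 0)) // prints 2
  }
}
```

With marker 0, u2 and u3 each end up with the markers (0, 1) and pass the filter; with marker 1 nothing does, which would match the zero counts reported above.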
