Was the issue with print?
Printing on the worker?

Mayur Rustagi

On Fri, Mar 7, 2014 at 10:43 AM, Ognen Duzlevski <ognen@plainvanillagames.com> wrote:
Strike that. Figured it out. Don't you just hate it when you fire off an email and you figure it out as it is being sent? ;)
Ognen


On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:
What is wrong with this code?

A condensed version of this code works in the spark-shell, but it does not work when deployed via a jar.

def calcSimpleRetention(start: String, end: String, event1: String, event2: String): List[Double] = {
    val spd = new PipelineDate(start)
    val epd = new PipelineDate(end)
    // filter for event1 events and return an RDD of (user_id, 0) pairs
    val f = sc.textFile(spd.toJsonHdfsFileName)
    val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event1)
      .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 1))
      .cache
    val ev1c = ev1rdd.count.toDouble

    // do the same as above for event2 events, only substitute 0s with 1s
    val ev2rdds = for {
      dt <- PipelineDate.getPeriod(spd + 1, epd)
      f1 = sc.textFile(dt.toJsonHdfsFileName)
    } yield (f1.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event2)
      .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 1))
      .distinct)

    // cache all event2 RDDs (ev1rdd is already cached above)
    ev2rdds.foreach(_.cache)
    val cts = for {
      ev2 <- ev2rdds
    } yield ev2.count

    val retent = for {
      ev2rdd <- ev2rdds
      ret = ev1rdd.union(ev2rdd).groupByKey()
    } yield ret.filter(e => e._2.length > 1 && e._2.filter(_ == 0).length > 0)

    val rcts = retent.map(_.count)
    println("----------------------------------------------------------------------")
    println(s"${rcts}")
    println(s"${cts}")

    for {
      c <- rcts
    } yield (ev1c / c.toDouble)
    //Map(result:_*)
}

This is what this code prints:
List(0, 0)
List(785912, 825254)
List(Infinity, Infinity)
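The last line follows directly from the first: every count in rcts is 0, and in Scala (as on the JVM generally) dividing a positive Double such as ev1c by 0.0 evaluates to Infinity instead of throwing, so the app finishes without any error. A minimal sketch of a guarded version of the final ratio; RatioSketch and safeRatio are hypothetical names, not part of the original code:

```scala
object RatioSketch {
  // Same arithmetic as `ev1c / c.toDouble` in calcSimpleRetention,
  // but returns None for a zero count instead of producing Infinity.
  def safeRatio(ev1c: Double, c: Long): Option[Double] =
    if (c == 0L) None else Some(ev1c / c.toDouble)
}
```

With this helper the final loop could become rcts.map(c => RatioSketch.safeRatio(ev1c, c)), which yields a List[Option[Double]], so the method's declared return type would have to change to match.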

My question is: the union().groupByKey().filter(...) segment does not appear to be working (hence the List(0, 0) output). The app does not fail; it finishes just fine.
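One likely cause, judging only from the code as posted: both ev1rdd and every RDD in ev2rdds map each user_id to 1, even though the comment says event1 users should map to 0. No group produced by groupByKey can then contain a 0, so the filter(_ == 0) test never passes. The sketch below reproduces the union/groupByKey/filter logic on plain Scala collections (a Seq standing in for an RDD, groupBy standing in for groupByKey) so it runs without Spark; RetentionSketch and its methods are hypothetical names.

```scala
object RetentionSketch {
  // Local stand-in for groupByKey: user_id -> every tag seen for that user.
  def groupByKey(pairs: Seq[(String, Int)]): Map[String, Seq[Int]] =
    pairs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }

  // Same predicate as the posted filter: more than one tag and at least one 0.
  def retained(ev1: Seq[(String, Int)], ev2: Seq[(String, Int)]): Int =
    groupByKey(ev1 ++ ev2).count { case (_, tags) => tags.length > 1 && tags.exists(_ == 0) }

  // Stricter variant: the user must carry both an event1 tag (0) and an event2 tag (1).
  def retainedBoth(ev1: Seq[(String, Int)], ev2: Seq[(String, Int)]): Int =
    groupByKey(ev1 ++ ev2).count { case (_, tags) => tags.contains(0) && tags.contains(1) }
}
```

For example, with event1 users a, b, c and event2 users b, c, d, retained returns 0 when both sides are tagged 1 (as posted) and 2 (b and c) when event1 users are tagged 0 (as the comment intends). retainedBoth also covers a second case: ev1rdd is not passed through distinct, so even with 0 tags a user with two event1 records and no event2 record would still satisfy length > 1 together with a 0.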

Any ideas?
Ognen
