Was the issue with print?
Printing on the worker?

Strike that. Figured it out. Don't you just hate it when you fire off an email and you figure it out as it is being sent? ;)

On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:
What is wrong with this code?

A condensed version of this code works in the spark-shell.

It does not work when deployed via a jar.

def calcSimpleRetention(start:String,end:String,event1:String,event2:String):List[Double] = {
    val spd = new PipelineDate(start)
    val epd = new PipelineDate(end)
    // filter for event1 events and return RDDs that are maps of user_ids and 0
    val f = sc.textFile(spd.toJsonHdfsFileName)
    val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1)
      .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1))
      .cache
    val ev1c = ev1rdd.count.toDouble

    // do the same as above for event2 events, only substitute 0s with 1s
    val ev2rdds = for {
      dt <- PipelineDate.getPeriod(spd+1,epd)
      f1 = sc.textFile(dt.toJsonHdfsFileName)
    } yield (f1.filter(_.split(",")(0).split(":")(1).replace("\"","") == event2)
      .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1))
      .distinct)

    // cache all event1 and event2 RDDs
    val cts = for {
      ev2 <- ev2rdds
    } yield ev2.count

    val retent = for {
      ev2rdd <- ev2rdds
      ret = ev1rdd.union(ev2rdd).groupByKey()
    } yield ret.filter(e => e._2.length > 1 && e._2.filter(_==0).length>0)

    val rcts =

    for {
      c <- rcts
    } yield(ev1c/c.toDouble)
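
The repeated split/replace chain pulls one field out of each JSON line by position. Below is a minimal sketch of the same extraction as a named helper. The helper name `field` and the sample line are hypothetical: it assumes (since the real format is not shown) that each line looks like {"event":"login","ts":1394210460,"user_id":"u1","platform":"ios"}, with the event name in the first comma-separated chunk and the user id in the third, and it only works while no value contains a comma or colon.

```scala
object LineFields {
  // Hypothetical helper mirroring the inline parsing in calcSimpleRetention:
  // take the idx-th comma-separated chunk, keep the part after the first ':'
  // and strip the double quotes. Breaks if any value contains ',' or ':'.
  def field(line: String, idx: Int): String =
    line.split(",")(idx).split(":")(1).replace("\"", "")

  def main(args: Array[String]): Unit = {
    // Hypothetical event line; the actual format is not shown in the thread.
    val line = """{"event":"login","ts":1394210460,"user_id":"u1","platform":"ios"}"""
    println(field(line, 0)) // event name: login
    println(field(line, 2)) // user id: u1
  }
}
```

Note that if user_id were the last key, field(line, 2) would keep the closing brace, so this positional parsing depends on the key order.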

This is what this code prints:
List(0, 0)
List(785912, 825254)
List(Infinity, Infinity)
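
The Infinity values follow from the last step: ev1c is a Double, and with IEEE 754 doubles (Scala, like Java) dividing a positive Double by 0.0 gives positive infinity instead of throwing, so every zero retention count turns into Infinity. A small sketch with a hypothetical count; safeRatio is an illustrative name, not part of the original code:

```scala
object RatioDemo {
  // Same shape as the final step of calcSimpleRetention: ev1c / c.toDouble.
  def rawRatio(ev1c: Double, c: Long): Double = ev1c / c.toDouble

  // Hypothetical guarded variant: report a missing ratio instead of Infinity.
  def safeRatio(ev1c: Double, c: Long): Option[Double] =
    if (c == 0L) None else Some(ev1c / c.toDouble)

  def main(args: Array[String]): Unit = {
    val ev1c = 1000.0 // hypothetical event1 count
    println(rawRatio(ev1c, 0L))    // Infinity
    println(safeRatio(ev1c, 0L))   // None
    println(safeRatio(ev1c, 250L)) // Some(4.0)
  }
}
```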

My question is: the union().groupByKey().filter(...) segment does not appear to be working (hence the List(0, 0) output). The app does not fail; it finishes just fine.

Any ideas?
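
One thing worth checking against the List(0, 0) output: the comments say event1 users are tagged with 0 and event2 users with 1, but both map calls in the code emit 1, while the retention filter only keeps groups that contain a 0. The plain-Scala sketch below uses no Spark: groupBy on a local Seq stands in for union().groupByKey(), and the user ids are made up. It shows how the tag changes the result:

```scala
object RetentionTagCheck {
  // Local stand-in for ev1rdd.union(ev2rdd).groupByKey() followed by the
  // retention filter from calcSimpleRetention.
  def retained(ev1: Seq[(String, Int)], ev2: Seq[(String, Int)]): List[String] = {
    val grouped: Map[String, Seq[Int]] =
      (ev1 ++ ev2).groupBy(_._1).map { case (user, pairs) => user -> pairs.map(_._2) }
    grouped
      .filter { case (_, tags) => tags.length > 1 && tags.filter(_ == 0).length > 0 }
      .keys.toList.sorted
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical users: u1 and u2 did event1; u2 and u3 did event2.
    val ev1Users = Seq("u1", "u2")
    val ev2Users = Seq("u2", "u3")

    // As written in the post: both sides tagged with 1, so no group has a 0.
    println(retained(ev1Users.map(u => (u, 1)), ev2Users.map(u => (u, 1)))) // List()

    // Tagging event1 users with 0, as the comment describes, keeps u2.
    println(retained(ev1Users.map(u => (u, 0)), ev2Users.map(u => (u, 1)))) // List(u2)
  }
}
```

Also note that ev1rdd is not passed through distinct, so once event1 is tagged with 0, a user with two event1 lines and no event2 line would also pass the length > 1 check.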

