It looks like you are trying to use the RDD in a distributed operation, which won't work. The context will be null.

On Jan 21, 2015 1:50 PM, "Luis Ángel Vicente Sánchez" <langel.groups@gmail.com> wrote:
The SparkContext is lost when I call the persist function from the sink function, just before the function call... everything works as intended so I guess is the FunctionN class serialisation what it's causing the problem. I will try to embed the functionality in the sink method to verify that.

2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez <langel.groups@gmail.com>:
The following functions, 

def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = { 
    data.foreachRDD { rdd => 
      rdd.cache() 
      val (minTime, maxTime): (Long, Long) = 
        rdd.map { 
          case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time) 
        }.fold((Long.MaxValue, Long.MinValue)) { 
          case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num)) 
        } 
      if (minTime != Long.MaxValue && maxTime != Long.MinValue) { 
        rdd.map(_._1).distinct().foreach { 
          case (game, category) => persist(game, category, minTime, maxTime, rdd) 
        } 
      } 
      rdd.unpersist(blocking = false) 
    } 
  } 

  def persist(game: GameID, category: Category, min: Long, max: Long, data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = { 
    val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}" 
    val cas: CassandraRDD[(Long, Long, String, Array[Byte])] = 
      data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family) 
    val fil: RDD[((TimeSeriesKey, Platform), HLL)] = 
      cas 
        .where(""""time" >= ?""", new Date(min)) 
        .where(""""time" <= ?""", new Date(max)) 
        .map { 
          case (date, time, platform, array) => ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array)) 
        } 
    data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map { 
      case ((key, platform), (value, maybe)) => 
        (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array))) 
    }.saveToCassandra(parameters.table.keyspace, family) 
  } 

are causing this exception at runtime: 

15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID 126) 
java.lang.NullPointerException 
        at com.datastax.spark.connector.SparkContextFunctions.cassandraTable$default$3(SparkContextFunctions.scala:47) 
        at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51) 
        at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:41) 
        at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:40) 
        at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759) 
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759) 
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143) 
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143) 
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
        at org.apache.spark.scheduler.Task.run(Task.scala:54) 
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
        at java.lang.Thread.run(Thread.java:745) 

I'm using spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of SparkContextFunctions.scala is the implicit CassandraConnector that uses the underlying spark context to retrieve the SparkConf.

After a few hours debugging the code, the source of the problem is that,

data.sparkContext

is returning null. It seems that the RDD is serialised and the SparkContext is lost. Is this the expected behaviour? Is a known bug?

I have ran out of ideas on how to make this work so I'm open to suggestions. 

Kind regards, 

Luis