spark-user mailing list archives

From Sean Owen <so...@cloudera.com>
Subject Re: NullPointer when access rdd.sparkContext (Spark 1.1.1)
Date Wed, 21 Jan 2015 14:18:56 GMT
It looks like you are trying to use the RDD in a distributed operation,
which won't work. The context will be null.
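[Editor's note: a minimal, untested sketch of that fix, written against the sink function quoted below. Collecting the distinct keys back to the driver means persist, which dereferences rdd.sparkContext, runs in driver code rather than inside a task:

    if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
      // collect() brings the (presumably small) set of distinct keys to the
      // driver; the foreach below is then a plain Scala loop on the driver,
      // where rdd.sparkContext is still valid.
      val keys: Array[(GameID, Category)] = rdd.map(_._1).distinct().collect()
      keys.foreach {
        case (game, category) => persist(game, category, minTime, maxTime, rdd)
      }
    }
]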
On Jan 21, 2015 1:50 PM, "Luis Ángel Vicente Sánchez" <langel.groups@gmail.com> wrote:

> The SparkContext is lost when I call the persist function from the sink
> function; just before that call, everything works as intended, so I guess
> it is the FunctionN class serialisation that is causing the problem. I
> will try to embed the functionality in the sink method to verify that.
>
> 2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez <langel.groups@gmail.com>:
>
>> The following functions,
>>
>> def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
>>   data.foreachRDD { rdd =>
>>     rdd.cache()
>>     val (minTime, maxTime): (Long, Long) =
>>       rdd.map {
>>         case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
>>       }.fold((Long.MaxValue, Long.MinValue)) {
>>         case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
>>       }
>>     if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
>>       // persist is invoked from a distributed foreach, i.e. inside tasks
>>       // running on the executors
>>       rdd.map(_._1).distinct().foreach {
>>         case (game, category) => persist(game, category, minTime, maxTime, rdd)
>>       }
>>     }
>>     rdd.unpersist(blocking = false)
>>   }
>> }
>>
>> def persist(game: GameID, category: Category, min: Long, max: Long,
>>             data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
>>   val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}"
>>   // data.sparkContext returns null when this runs on an executor
>>   val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
>>     data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family)
>>   val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
>>     cas
>>       .where(""""time" >= ?""", new Date(min))
>>       .where(""""time" <= ?""", new Date(max))
>>       .map {
>>         case (date, time, platform, array) =>
>>           ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array))
>>       }
>>   data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map {
>>     case ((key, platform), (value, maybe)) =>
>>       (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array)))
>>   }.saveToCassandra(parameters.table.keyspace, family)
>> }
>>
>> are causing this exception at runtime:
>>
>> 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID 126)
>> java.lang.NullPointerException
>>         at com.datastax.spark.connector.SparkContextFunctions.cassandraTable$default$3(SparkContextFunctions.scala:47)
>>         at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
>>         at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:41)
>>         at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:40)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
>>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> I'm using Spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of
>> SparkContextFunctions.scala is the implicit CassandraConnector default,
>> which uses the underlying SparkContext to retrieve the SparkConf.
>>
>> After a few hours of debugging, I found that the source of the problem is
>> that
>>
>> data.sparkContext
>>
>> is returning null. It seems that the RDD is serialised and the
>> SparkContext is lost. Is this the expected behaviour? Is it a known bug?
>>
>> I have run out of ideas on how to make this work, so I'm open to
>> suggestions.
>>
>> Kind regards,
>>
>> Luis
>>
>
>
