spark-user mailing list archives

From Luis Ángel Vicente Sánchez <langel.gro...@gmail.com>
Subject Re: NullPointer when access rdd.sparkContext (Spark 1.1.1)
Date Wed, 21 Jan 2015 14:31:56 GMT
Yes, I have just found that. By replacing,

rdd.map(_._1).distinct().foreach {
          case (game, category) => persist(game, category, minTime, maxTime, rdd)
        }

with,

rdd.map(_._1).distinct().collect().foreach {
          case (game, category) => persist(game, category, minTime, maxTime, rdd)
        }

everything works as expected.
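
For what it's worth, here is a minimal, self-contained sketch of the driver-versus-executor distinction behind that change (the object name, local master and sample data are made up for illustration; they are not taken from the job quoted below):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object DriverVsExecutorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    val rdd: RDD[((String, String), Int)] =
      sc.parallelize(Seq((("game1", "cat1"), 1), (("game2", "cat2"), 2)))

    // foreach ships its closure to the executors; referencing rdd inside it
    // serialises the RDD as well, and the deserialised copy has no
    // SparkContext, which is what produced the NullPointerException below:
    // rdd.map(_._1).distinct().foreach { key => rdd.sparkContext /* null on the executor */ }

    // collect() brings the distinct keys back to the driver, so this foreach
    // is plain Scala running in the driver JVM and rdd.sparkContext is valid:
    rdd.map(_._1).distinct().collect().foreach { case (game, category) =>
      println(s"$game/$category -> ${rdd.sparkContext.appName}")
    }

    sc.stop()
  }
}

The collect() is only reasonable here because distinct() leaves a small set of (game, category) keys; each persist call then launches its own Spark jobs from the driver.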

2015-01-21 14:18 GMT+00:00 Sean Owen <sowen@cloudera.com>:

> It looks like you are trying to use the RDD in a distributed operation,
> which won't work. The context will be null.
> On Jan 21, 2015 1:50 PM, "Luis Ángel Vicente Sánchez" <
> langel.groups@gmail.com> wrote:
>
>> The SparkContext is lost when I call the persist function from the sink
>> function; just before the function call everything works as intended, so I
>> guess it is the FunctionN class serialisation that is causing the problem.
>> I will try to embed the functionality in the sink method to verify that.
>>
>> 2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez <
>> langel.groups@gmail.com>:
>>
>>> The following functions,
>>>
>>> def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
>>>     data.foreachRDD { rdd =>
>>>       rdd.cache()
>>>       val (minTime, maxTime): (Long, Long) =
>>>         rdd.map {
>>>           case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
>>>         }.fold((Long.MaxValue, Long.MinValue)) {
>>>           case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
>>>         }
>>>       if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
>>>         rdd.map(_._1).distinct().foreach {
>>>           case (game, category) => persist(game, category, minTime, maxTime, rdd)
>>>         }
>>>       }
>>>       rdd.unpersist(blocking = false)
>>>     }
>>>   }
>>>
>>>   def persist(game: GameID, category: Category, min: Long, max: Long, data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
>>>     val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}"
>>>     val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
>>>       data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family)
>>>     val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
>>>       cas
>>>         .where(""""time" >= ?""", new Date(min))
>>>         .where(""""time" <= ?""", new Date(max))
>>>         .map {
>>>           case (date, time, platform, array) => ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array))
>>>         }
>>>     data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map {
>>>       case ((key, platform), (value, maybe)) =>
>>>         (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array)))
>>>     }.saveToCassandra(parameters.table.keyspace, family)
>>>   }
>>>
>>> are causing this exception at runtime:
>>>
>>> 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID 126)
>>> java.lang.NullPointerException
>>>         at com.datastax.spark.connector.SparkContextFunctions.cassandraTable$default$3(SparkContextFunctions.scala:47)
>>>         at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
>>>         at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:41)
>>>         at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:40)
>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
>>>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> I'm using Spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of
>>> SparkContextFunctions.scala is the implicit CassandraConnector that uses
>>> the underlying SparkContext to retrieve the SparkConf.
>>>
>>> After a few hours of debugging the code, I found that the source of the problem is that
>>>
>>> data.sparkContext
>>>
>>> is returning null. It seems that the RDD is serialised and the
>>> SparkContext is lost. Is this the expected behaviour? Is it a known bug?
>>>
>>> I have run out of ideas on how to make this work, so I'm open to
>>> suggestions.
>>>
>>> Kind regards,
>>>
>>> Luis
>>>
>>
>>
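
For anyone who finds this thread later: as far as I can tell, the behaviour matches the RDD's reference to its SparkContext being marked @transient, so it does not survive being serialised into a task. Below is a standalone sketch of that mechanism using plain JVM serialisation, no Spark involved; the class and field names are made up for illustration.

import java.io._

// Mirrors the shape of an RDD: a @transient reference that only makes
// sense on the driver, plus ordinary serialisable state.
class Carrier(@transient val context: String, val payload: Int) extends Serializable

object TransientFieldSketch {
  def roundTrip[A <: Serializable](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(a)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject().asInstanceOf[A]
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new Carrier("driver-side context", 42))
    println(copy.payload) // 42: regular fields survive the round trip
    println(copy.context) // null: the @transient field does not
  }
}

Anything reachable only through such a transient reference, like sparkContext inside a task, therefore has to be used from driver-side code, such as the collect()-based loop at the top of this message.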
