spark-dev mailing list archives

From Arijit <Arij...@live.com>
Subject Issue with Spark Streaming with checkpointing in Spark 2.0
Date Sat, 08 Oct 2016 00:06:13 GMT
In a Spark Streaming sample I am trying to implicitly convert an RDD to a Dataset and save it to
permanent storage. Below is a snippet of the code I am running. The job runs fine the first time,
when it is started with an empty checkpoint directory. However, if I kill the job and restart it
with the same checkpoint directory, I get the following error and the job fails:


16/10/07 23:42:50 ERROR JobScheduler: Error running job streaming job 1475883550000 ms.0
java.lang.NullPointerException
 at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:163)
 at com.microsoft.spark.streaming.examples.workloads.EventhubsToAzureBlobAsJSON$$anonfun$createStreamingContext$2.apply(EventhubsToAzureBlobAsJSON.scala:72)
 at com.microsoft.spark.streaming.examples.workloads.EventhubsToAzureBlobAsJSON$$anonfun$createStreamingContext$2.apply(EventhubsToAzureBlobAsJSON.scala:72)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
 at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
 at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
 at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
 at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
 at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
 at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
 at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
 at scala.util.Try$.apply(Try.scala:192)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
 at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
 at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
 at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
 at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
16/10/07 23:42:50 INFO SparkContext: Starting job: print at EventhubsToAzureBlobAsJSON.scala:93


Does anyone have sample recoverable Spark Streaming code that uses the SparkSession constructs
of Spark 2.0? My full code is below; after it I have included an untested sketch of a workaround
pattern I have come across.


object EventhubsToAzureBlobAsJSON {

  def createStreamingContext(inputOptions: ArgumentMap): StreamingContext = {

    .....

    val sparkSession : SparkSession = SparkSession.builder.config(sparkConfiguration).getOrCreate

    import sparkSession.implicits._

    val streamingContext = new StreamingContext(sparkSession.sparkContext,
      Seconds(inputOptions(Symbol(EventhubsArgumentKeys.BatchIntervalInSeconds)).asInstanceOf[Int]))
    streamingContext.checkpoint(inputOptions(Symbol(EventhubsArgumentKeys.CheckpointDirectory)).asInstanceOf[String])

    val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters)

    val eventHubsWindowedStream = eventHubsStream
      .window(Seconds(inputOptions(Symbol(EventhubsArgumentKeys.BatchIntervalInSeconds)).asInstanceOf[Int]))

    /**
      * This fails on restart
      */

    eventHubsWindowedStream.map(x => EventContent(new String(x)))
      .foreachRDD(rdd => rdd.toDS.toJSON.write.mode(SaveMode.Overwrite)
        .save(inputOptions(Symbol(EventhubsArgumentKeys.EventStoreFolder))
          .asInstanceOf[String]))

    /**
      * This runs fine on restart
      */

    /*
    eventHubsWindowedStream.map(x => EventContent(new String(x)))
      .foreachRDD(rdd => rdd.saveAsTextFile(inputOptions(Symbol(EventhubsArgumentKeys.EventStoreFolder))
          .asInstanceOf[String], classOf[GzipCodec]))
    */

    .....

  }

  def main(inputArguments: Array[String]): Unit = {

    val inputOptions = EventhubsArgumentParser.parseArguments(Map(), inputArguments.toList)

    EventhubsArgumentParser.verifyEventhubsToAzureBlobAsJSONArguments(inputOptions)

    //Create or recreate streaming context

    val streamingContext = StreamingContext
      .getOrCreate(inputOptions(Symbol(EventhubsArgumentKeys.CheckpointDirectory)).asInstanceOf[String],
        () => createStreamingContext(inputOptions))

    streamingContext.start()

    if(inputOptions.contains(Symbol(EventhubsArgumentKeys.TimeoutInMinutes))) {

      streamingContext.awaitTerminationOrTimeout(inputOptions(Symbol(EventhubsArgumentKeys.TimeoutInMinutes))
        .asInstanceOf[Long] * 60 * 1000)
    }
    else {

      streamingContext.awaitTermination()
    }
  }
}
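
For reference, this is the direction I am thinking of trying, based on the "DataFrame and SQL
Operations" section of the Spark Streaming programming guide: obtain the SparkSession lazily
inside foreachRDD from the RDD's SparkContext, instead of importing implicits from the session
captured in the enclosing scope (which I suspect is the reference that comes back null after
recovery from the checkpoint). This is only an untested sketch against the failing snippet above:

    eventHubsWindowedStream.map(x => EventContent(new String(x)))
      .foreachRDD { rdd =>
        // Get (or lazily create) the singleton SparkSession from this RDD's SparkContext,
        // so the reference is valid even when the job is replayed from the checkpoint.
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._

        rdd.toDS.toJSON.write.mode(SaveMode.Overwrite)
          .save(inputOptions(Symbol(EventhubsArgumentKeys.EventStoreFolder)).asInstanceOf[String])
      }

Is this the recommended way to mix Datasets with a checkpointed StreamingContext in 2.0, or is
there a better pattern?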


Thanks, Arijit
