spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "John Fang" <xiaojian....@alibaba-inc.com>
Subject Question about the DirectKafkaInputDStream
Date Thu, 08 Dec 2016 17:50:55 GMT
The source is DirectKafkaInputDStream which can ensure the exectly-once of the consumer side. But I have a question based the following code。As we known, the 
"graph.generateJobs(time)" will create rdds and generate jobs。And the source RDD is KafkaRDD which contain the offsetRange。 The jobs are submitted successfully by " jobScheduler.submitJobSet", and the cluster start running the jobs. After that, the driver crash suddenly and will lost the offsetRange. Because the driver has not run the "eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))" yet. 

```
  private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }
  ```
Mime
View raw message