Sure thing!

The main method looks like:

--------------------------------------------------------------------------------------------------


// Broker list comes from application config; `conf`, `varPrefix`,
// `zookeeper`, and `options` are defined elsewhere in the app.
val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")

val kafkaConf = Map(
  "zookeeper.connect" -> zookeeper,
  "group.id" -> options.group,
  "zookeeper.connection.timeout.ms" -> "10000",
  "auto.commit.interval.ms" -> "1000",
  "rebalance.max.retries" -> "25",
  "bootstrap.servers" -> kafkaBrokers
)

// Recover from the checkpoint if one exists; otherwise build a fresh context.
// createOnError = true falls back to createContext if recovery fails.
val ssc = StreamingContext.getOrCreate(
  checkpointDirectory,
  () => createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd),
  createOnError = true)

ssc.start()
ssc.awaitTermination()


--------------------------------------------------------------------------------------------------
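As a side note, here is a minimal sketch I could use to confirm whether getOrCreate is actually recovering from the checkpoint or rebuilding from scratch (the `created` flag is only for illustration, not code I currently have):


--------------------------------------------------------------------------------------------------


// Hypothetical debugging variant: log whether checkpoint recovery happened.
var created = false
val ssc = StreamingContext.getOrCreate(
  checkpointDirectory,
  () => {
    created = true // only runs when no usable checkpoint was found
    createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd)
  },
  createOnError = true)
println(s"Recovered from checkpoint: ${!created}")


--------------------------------------------------------------------------------------------------
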


And createContext is defined as:


--------------------------------------------------------------------------------------------------


import java.util.Properties

import kafka.consumer.ConsumerConfig
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val batchDuration = Seconds(5)
val checkpointDuration = Seconds(20)

private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

def createContext(kafkaConf: Map[String, String],
                  checkpointDirectory: String,
                  topic: String,
                  numThreads: Int,
                  isProd: Boolean): StreamingContext = {

  val sparkConf = new SparkConf().setAppName("***")
  val ssc = new StreamingContext(sparkConf, batchDuration)
  ssc.checkpoint(checkpointDirectory)

  val topicSet = topic.split(",").toSet
  val groupId = kafkaConf.getOrElse("group.id", "")

  // Direct (receiver-less) Kafka stream; offsets are tracked by Spark itself,
  // not by the ZK-based high-level consumer.
  val directKStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaConf, topicSet)
  directKStream.checkpoint(checkpointDuration)

  val table = ***

  directKStream.foreachRDD { rdd =>
    // Capture the Kafka offset ranges backing this batch before transforming.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.flatMap(rec => someFunc(rec))
      .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
      .foreachPartition { partitionRec =>
        val dbWrite = DynamoDBWriter()
        partitionRec.foreach { rec =>
          () // update DynamoDB here via dbWrite (elided in the original)
        }
      }

    /** Set up ZK connection **/
    val props = new Properties()
    kafkaConf.foreach { case (k, v) => props.setProperty(k, v) }
    props.setProperty(AUTO_OFFSET_COMMIT, "false")

    val consumerConfig = new ConsumerConfig(props)
    assert(!consumerConfig.autoCommitEnable)

    val zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

    // Commit each batch's ending offsets to ZK (useful for monitoring tools;
    // the direct stream itself recovers offsets from the checkpoint).
    offsetRanges.foreach { osr =>
      val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
      val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
      ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString)
    }
    zkClient.close() // avoid leaking a ZK connection per batch
  }
  ssc
}


--------------------------------------------------------------------------------------------------
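
An aside on shutdown: since the job is stopped with ^C, batches may be cut off mid-processing. If graceful shutdown turns out to matter here, a sketch (assuming Spark 1.4+, where this property exists; its placement is illustrative):


--------------------------------------------------------------------------------------------------


// Sketch: have Spark stop the streaming context gracefully on JVM shutdown,
// draining already-received data before exit.
val sparkConf = new SparkConf()
  .setAppName("***")
  .set("spark.streaming.stopGracefullyOnShutdown", "true")


--------------------------------------------------------------------------------------------------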



On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <cody@koeninger.org> wrote:
Sounds like something's not set up right... can you post a minimal code example that reproduces the issue?

On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <suchenzang@gmail.com> wrote:
Yeah. All messages received while the streaming job was down are lost.

On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <cody@koeninger.org> wrote:
Are you actually losing messages then?

On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <suchenzang@gmail.com> wrote:
No; the first batch only contains messages received after the second job starts (messages arrive at a steady rate of about 400/second).

On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <cody@koeninger.org> wrote:
Does the first batch after restart contain all the messages received while the job was down?

On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <suchenzang@gmail.com> wrote:
Hello,

I'm using direct Spark Streaming (from Kafka) with checkpointing, and
everything works well until a restart. When I shut down (^C) the first
streaming job, wait 1 minute, then re-submit, a series of 0-event batches
somehow gets queued (corresponding to the 1 minute when the job was down).
Eventually, the batches resume processing, and each batch has roughly 2000
events.

I see that at the beginning of the second launch, the checkpoint dirs are
found and "loaded", according to console output.

Is this expected behavior? It seems like I might have configured something
incorrectly, since with checkpointing I would expect the streaming job to
resume from the checkpoint and continue processing from there (without
seeing 0-event batches corresponding to when the job was down).

Also, if I wait more than 10 minutes or so before re-launching, there are
so many 0-event batches that the job hangs. Is this merely something to be
"waited out", or should I set up some restart behavior or make a config
change to discard the checkpoint when too much time has elapsed?
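
One option I've been considering, if stale checkpoints should simply be discarded: skip getOrCreate and seed the direct stream from the offsets the job already commits to ZooKeeper, using the fromOffsets overload of createDirectStream. A sketch only (readOffsetsFromZk and numPartitions are placeholders, not code I have yet):


--------------------------------------------------------------------------------------------------


import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder helper: read back the per-partition offsets the job already
// writes to ZK (one znode per partition, matching what it commits per batch).
def readOffsetsFromZk(zkClient: ZkClient, groupId: String, topic: String,
                      numPartitions: Int): Map[TopicAndPartition, Long] = {
  val topicDirs = new ZKGroupTopicDirs(groupId, topic)
  (0 until numPartitions).flatMap { p =>
    val (dataOpt, _) =
      ZkUtils.readDataMaybeNull(zkClient, s"${topicDirs.consumerOffsetDir}/$p")
    dataOpt.map(offset => TopicAndPartition(topic, p) -> offset.toLong)
  }.toMap
}

// Seed the stream from those offsets instead of relying on checkpoint recovery:
val fromOffsets = readOffsetsFromZk(zkClient, groupId, topic, numPartitions)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder, (String, String)](
  ssc, kafkaConf, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))


--------------------------------------------------------------------------------------------------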

Thanks!

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png>


