Have you tried deleting or moving the contents of the checkpoint directory and restarting the job?
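
A minimal sketch of the "moving" option, assuming the checkpoint directory lives on a local filesystem (on HDFS the same idea would be a rename done with the Hadoop tools instead). The names `CheckpointReset` and `moveAside` are hypothetical and not part of Spark; the sketch only renames the directory so that the next start finds no checkpoint data.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Spark.
final class CheckpointReset {
    private CheckpointReset() {}

    /**
     * Renames the checkpoint directory to a sibling named "<name>.bak-<millis>".
     * Returns the new location, or null if the directory did not exist.
     */
    static Path moveAside(Path checkpointDir) {
        if (!Files.exists(checkpointDir)) {
            return null; // nothing to move, so nothing to recover from either
        }
        Path backup = checkpointDir.resolveSibling(
            checkpointDir.getFileName() + ".bak-" + System.currentTimeMillis());
        try {
            // A rename within the same parent directory keeps the old data for inspection.
            return Files.move(checkpointDir, backup);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The job should be stopped before the directory is moved and restarted afterwards; with no checkpoint to recover from, the batch duration passed to the JavaStreamingContext constructor should be the one that takes effect.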

On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <dgoldenberg123@gmail.com> wrote:
Sorry, more relevant code below:

SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
JavaStreamingContext jssc = params.isCheckpointed() ? createCheckpointedContext(sparkConf, params) : createContext(sparkConf, params);
jssc.start();
jssc.awaitTermination();
jssc.close();
……………………………..
  private JavaStreamingContext createCheckpointedContext(final SparkConf sparkConf, final Parameters params) {
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        return createContext(sparkConf, params);
      }
    };
    return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
  }

  private JavaStreamingContext createContext(final SparkConf sparkConf, final Parameters params) {
    // Create context with the specified batch interval, in milliseconds.
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(params.getBatchDurationMillis()));
    // Set the checkpoint directory, if we're checkpointing
    if (params.isCheckpointed()) {
      jssc.checkpoint(params.getCheckpointDir());
    }

    Set<String> topicsSet = new HashSet<String>(Arrays.asList(params.getTopic()));

    // Set the Kafka parameters.
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params.getBrokerList());
    if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
      kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params.getAutoOffsetReset());
    }

    // Create direct Kafka stream with the brokers and the topic.
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
      jssc,
      String.class,
      String.class,
      StringDecoder.class,
      StringDecoder.class,
      kafkaParams,
      topicsSet);

    // See if there's an override of the default checkpoint duration.
    if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
      messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
    }

    JavaDStream<String> messageBodies = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

    messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        ProcessPartitionFunction func = new ProcessPartitionFunction(params);
        rdd.foreachPartition(func);
        return null;
      }
    });

    return jssc;
  }


On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <dgoldenberg123@gmail.com> wrote:
I'd think that we wouldn't be "accidentally recovering from checkpoint" hours or even days after consumers have been restarted, plus the content is the fresh content that I'm feeding, not some content that had been fed before the last restart.

The code is basically as follows:

    SparkConf sparkConf = createSparkConf(...);
    // We'd be 'checkpointed' because we specify a checkpoint directory which makes isCheckpointed true
    JavaStreamingContext jssc = params.isCheckpointed() ? createCheckpointedContext(sparkConf, params) : createContext(sparkConf, params);
    jssc.start();

    jssc.awaitTermination();

    jssc.close();




On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das <tdas@databricks.com> wrote:
Are you sure you are not accidentally recovering from checkpoint? How are you using StreamingContext.getOrCreate() in your code?

TD
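
The reason this question matters: when getOrCreate() finds checkpoint data in the given directory, Spark rebuilds the context from that data and does not call the factory, so the batch interval stored in the checkpoint wins over whatever the factory would have configured. The following is a toy model of that behaviour only, not Spark's implementation; `GetOrCreateModel`, its in-memory map, and `clear` are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Toy model only: mimics get-or-create semantics, not Spark's actual code.
final class GetOrCreateModel {
    // Stands in for checkpoint directories: path -> batch interval (ms) saved there.
    private final Map<String, Long> checkpoints = new HashMap<>();

    /** Returns the batch interval (ms) the "context" ends up running with. */
    long getOrCreate(String checkpointDir, LongSupplier factory) {
        Long saved = checkpoints.get(checkpointDir);
        if (saved != null) {
            return saved; // recovered from checkpoint: the factory is never called
        }
        long fresh = factory.getAsLong(); // no checkpoint: the factory decides
        checkpoints.put(checkpointDir, fresh);
        return fresh;
    }

    /** Models deleting or moving the checkpoint directory. */
    void clear(String checkpointDir) {
        checkpoints.remove(checkpointDir);
    }
}
```

In this model, a first call with a factory that returns 10000 yields 10000; a later call on the same directory with a factory that returns 20000 still yields 10000; only after clear() does the 20000 take effect. That matches the "stuck at 10 seconds" symptom described in this thread.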

On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <dgoldenberg123@gmail.com> wrote:
Tathagata,

In our logs I see the batch duration millis being set first to 10 then to 20 seconds. I don't see the 20 being reflected later during ingestion.

In the Spark UI under Streaming I see the output below; notice the 10-second batch interval. Can you think of a reason why it's stuck at 10? It used to be 1 second, by the way; then, over the course of a few restarts, we somehow managed to get it to 10 seconds. Now it won't change to 20 seconds. Any ideas?

Streaming

  • Started at: Thu Sep 03 10:59:03 EDT 2015
  • Time since start: 1 day 8 hours 44 minutes
  • Network receivers: 0
  • Batch interval: 10 seconds
  • Processed batches: 11790
  • Waiting batches: 0
  • Received records: 0
  • Processed records: 0


Statistics over last 100 processed batches

Receiver Statistics
No receivers
Batch Processing Statistics
    Metric            Last batch  Minimum  25th percentile  Median  75th percentile  Maximum
    Processing Time   23 ms       7 ms     10 ms            11 ms   14 ms            172 ms
    Scheduling Delay  1 ms        0 ms     0 ms             0 ms    1 ms             2 ms
    Total Delay       24 ms       8 ms     10 ms            12 ms   14 ms            173 ms




On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das <tdas@databricks.com> wrote:
Could you see what the streaming tab in the Spark UI says? It should show the underlying batch duration of the StreamingContext, the details of when the batch starts, etc. 

BTW, it seems that the 5.6 or 6.8 second delay is present only when data is present (that is, when Documents processed > 0).

On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <dgoldenberg123@gmail.com> wrote:
Tathagata,

Checkpointing is turned on but we were not recovering. I'm looking at the logs now, feeding fresh content hours after the restart. Here's a snippet:

2015-09-04 06:11:20,013 ... Documents processed: 0.
2015-09-04 06:11:30,014 ... Documents processed: 0.
2015-09-04 06:11:40,011 ... Documents processed: 0.
2015-09-04 06:11:50,012 ... Documents processed: 0.
2015-09-04 06:12:00,010 ... Documents processed: 0.
2015-09-04 06:12:10,047 ... Documents processed: 0.
2015-09-04 06:12:20,012 ... Documents processed: 0.
2015-09-04 06:12:30,011 ... Documents processed: 0.
2015-09-04 06:12:40,012 ... Documents processed: 0.
2015-09-04 06:12:55,629 ... Documents processed: 4.
2015-09-04 06:13:00,018 ... Documents processed: 0.
2015-09-04 06:13:10,012 ... Documents processed: 0.
2015-09-04 06:13:20,019 ... Documents processed: 0.
2015-09-04 06:13:30,014 ... Documents processed: 0.
2015-09-04 06:13:40,041 ... Documents processed: 0.
2015-09-04 06:13:50,009 ... Documents processed: 0.
...
2015-09-04 06:17:30,019 ... Documents processed: 0.
2015-09-04 06:17:46,832 ... Documents processed: 40.

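Interestingly, the line for the fresh content (4 documents) is logged about 15.6 seconds after the previous line, that is, about 5.6 seconds past the expected 10-second tick rather than on it. The line for the second fresh batch (40 documents) is logged about 16.8 seconds after the previous empty one, or about 6.8 seconds past its expected tick.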

Granted, the log message is printed after iterating over the messages which may account for some time differences. But generally, looking at the log messages being printed before we iterate, it's still 10 seconds each time, not 20 which is what batchdurationmillis is currently set to.
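
The gaps between log lines can be checked mechanically. A minimal sketch, assuming the timestamp format shown in the snippet above; `LogGaps` and `gapMillis` are hypothetical names introduced only for this illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for measuring the time between two log lines.
final class LogGaps {
    // Matches timestamps such as "2015-09-04 06:12:40,012".
    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");

    private LogGaps() {}

    /** Milliseconds elapsed between two log timestamps. */
    static long gapMillis(String earlier, String later) {
        return Duration.between(
            LocalDateTime.parse(earlier, FMT),
            LocalDateTime.parse(later, FMT)).toMillis();
    }
}
```

For the lines above, gapMillis("2015-09-04 06:12:40,012", "2015-09-04 06:12:55,629") gives 15617 and gapMillis("2015-09-04 06:17:30,019", "2015-09-04 06:17:46,832") gives 16813, while two consecutive empty batches such as 06:11:20,013 and 06:11:30,014 are 10001 ms apart. The non-empty batches are therefore logged one 10-second interval plus roughly 5.6 and 6.8 seconds after the preceding line, which is consistent with a 10-second batch interval plus processing time.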

Code:

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(....);
messages.checkpoint(Durations.milliseconds(checkpointMillis));

    JavaDStream<String> messageBodies = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

    messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        ProcessPartitionFunction func = new ProcessPartitionFunction(...);
        rdd.foreachPartition(func);
        return null;
      }
    });

The log message comes from ProcessPartitionFunction:

public void call(Iterator<String> messageIterator) throws Exception {
    log.info("Starting data partition processing (appName={}, topic={})...", appName, topic);
    // ... iterate ...
    log.info("Finished data partition processing (appName={}, topic={}). Documents processed: {}.", appName, topic, docCount);
}

Any ideas? Thanks.

- Dmitry


On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <tdas@databricks.com> wrote:
Are you accidentally recovering from checkpoint files that have 10 seconds as the batch interval?


On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <dgoldenberg123@gmail.com> wrote:
I'm seeing an oddity where I initially set the batchdurationmillis to 1 second and it works fine:

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(batchDurationMillis));

Then I tried changing the value to 10 seconds. The change didn't seem to take. I've bounced the Spark workers and the consumers, and now I'm seeing RDDs coming in roughly once every 10 seconds (not always exactly 10 seconds, according to the logs).

However, now I'm trying to change the value to 20 seconds and it's just not taking. I've bounced Spark master, workers, and consumers and the value seems "stuck" at 10 seconds.

Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.

Thanks.

- Dmitry