Sounds like you'd be better off just failing if the external server is down, and scripting monitoring / restarting of your job.
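
Roughly what I mean by failing, as a minimal sketch. `EventSink` and `FailFastPoster` are hypothetical names for illustration, not part of Spark or Kafka; the only point is that a failed post throws instead of being swallowed, so the task fails and whatever is monitoring the job can restart it.

```java
import java.util.List;

// Hypothetical stand-in for the client that posts to the external server.
interface EventSink {
    // Returns false if the server rejected the event or could not be reached.
    boolean post(byte[] event);
}

final class FailFastPoster {
    // Posts every event in order and throws on the first failure,
    // so the caller fails loudly rather than silently dropping data.
    static int postAll(List<byte[]> events, EventSink sink) {
        int sent = 0;
        for (byte[] event : events) {
            if (!sink.post(event)) {
                throw new IllegalStateException(
                        "external server unavailable after " + sent + " events");
            }
            sent++;
        }
        return sent;
    }
}
```

Something like this would be called from inside foreachPartition; the restart itself is left to an external script or supervisor.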

On Tue, Sep 1, 2015 at 11:19 AM, Shushant Arora <> wrote:
In my app, after processing the events I post them to an external server. If the external server is down, I want to back off from consuming from Kafka. But I can't stop and restart the consumer, since that needs manual effort.

Backing off for a few batches is also not possible: the decision to back off is based on the status of the last batch, but the driver has already computed offsets for the next batches. So if I skip the next few batches until the external server is back to normal, that is data loss unless I can reset the offset.

So the only option seems to be to delay the last batch by calling sleep() in the foreachRDD method, after foreachPartition returns.
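
A minimal sketch of the delay I have in mind, assuming a health probe for the external server is available. `BlockUntilUp`, `await` and `sleepMs` are hypothetical names, and the doubling delay with a cap is my own choice, not anything Spark provides.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

final class BlockUntilUp {
    // Blocks until isUp reports true. Between probes it sleeps for a delay
    // that doubles each time, capped at maxDelayMs.
    // Returns the number of failed probes.
    static int await(BooleanSupplier isUp, LongConsumer sleeper,
                     long initialDelayMs, long maxDelayMs) {
        long delay = initialDelayMs;
        int failures = 0;
        while (!isUp.getAsBoolean()) {
            failures++;
            sleeper.accept(delay);
            delay = Math.min(delay * 2, maxDelayMs);
        }
        return failures;
    }

    // Real sleeper; passed in as a parameter so the loop can be tested without waiting.
    static void sleepMs(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for server", e);
        }
    }
}
```

It would be called as `BlockUntilUp.await(probe, BlockUntilUp::sleepMs, 1000, 60000)` at the end of the foreachRDD function, where `probe` is whatever check tells me the server is back.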

So the concern here is that further batches will keep enqueuing until the current sleeping batch completes. What is the max size of the scheduling queue? Say the server does not come up for hours and my batch interval is 5 sec: it will enqueue 720 batches per hour of downtime.
Will that be an issue?
And is there any setting in the direct Kafka stream to stop calling compute() once the scheduling queue grows past a threshold (say 50 batches)? Once the queue size drops back below the threshold, it would call compute and enqueue the next job again.
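
To put numbers on the backlog: the count is just downtime divided by batch interval. A small sketch (`BatchBacklog` is a hypothetical helper, and it assumes one batch is generated per interval for as long as the current batch is blocked):

```java
import java.time.Duration;

final class BatchBacklog {
    // Number of batches generated while one batch blocks for `downtime`,
    // assuming one new batch per `batchInterval`.
    static long queuedBatches(Duration downtime, Duration batchInterval) {
        return downtime.toMillis() / batchInterval.toMillis();
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofSeconds(5);
        System.out.println(queuedBatches(Duration.ofHours(1), interval)); // 720
        System.out.println(queuedBatches(Duration.ofHours(6), interval)); // 4320
    }
}
```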

On Tue, Sep 1, 2015 at 8:57 PM, Cody Koeninger <> wrote:
Honestly I'd concentrate more on getting your batches to finish in a timely fashion, so you won't even have the issue to begin with...

On Tue, Sep 1, 2015 at 10:16 AM, Shushant Arora <> wrote:
What if I use custom checkpointing, so that I can take care of offsets being checkpointed at the end of each batch?

Will it then be possible to reset the offset?

On Tue, Sep 1, 2015 at 8:42 PM, Cody Koeninger <> wrote:
No, if you start arbitrarily messing around with offset ranges after compute is called, things are going to get out of whack.  

e.g. checkpoints are no longer going to correspond to what you're actually processing

On Tue, Sep 1, 2015 at 10:04 AM, Shushant Arora <> wrote:
Can I reset the range based on some condition, before calling transformations on the stream?

Say, before calling the following, I change the offset range (fromOffset) of directKafkaStream's RDD:

    directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
        public Void call(JavaRDD<byte[][]> v1) throws Exception {
            v1.foreachPartition(new VoidFunction<Iterator<byte[][]>>() {
                public void call(Iterator<byte[][]> t) throws Exception {
                    // ...
                }
            });
            return null;
        }
    });

I can't do this in the compute method, since compute would have been called when the current batch was queued, but the condition is only set when the previous batch runs.

On Tue, Sep 1, 2015 at 7:09 PM, Cody Koeninger <> wrote:
It's at the time compute() gets called, which should be near the time the batch should have been queued.
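
As a toy model of what that implies (this is an illustration, not the actual Spark code; `OffsetRangeModel` is a made-up class): each compute() call fixes a range from where the previous range ended up to whatever is latest at that moment, whether or not earlier batches have finished running.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: not Spark's actual classes.
final class OffsetRangeModel {
    private long nextFromOffset;
    // Ranges fixed so far, in the order the batches were generated.
    final List<long[]> queued = new ArrayList<>();

    OffsetRangeModel(long startOffset) {
        this.nextFromOffset = startOffset;
    }

    // Called once per batch interval. The range {from, until} is decided here,
    // at generation time, and does not change while the batch waits in the queue.
    long[] compute(long latestOffsetInKafka) {
        long[] range = {nextFromOffset, latestOffsetInKafka};
        nextFromOffset = latestOffsetInKafka;
        queued.add(range);
        return range;
    }
}
```

So a batch that sits in the queue still processes the range it was given when it was generated, and the next batch picks up exactly where that range ended.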

On Tue, Sep 1, 2015 at 8:02 AM, Shushant Arora <> wrote:

In Spark Streaming 1.3 with Kafka, when does the driver fetch the latest offsets for a run: at the start of each batch, or at the time the batch gets queued?

Say a few of my batches take longer to complete than the batch interval, so some batches go into the queue. Will the driver wait for the queued batches to start, or does it fetch the latest offsets before they have actually started? And when they start running, will they work on the old offsets fetched at the time they were queued?