spark-user mailing list archives

From Shushant Arora <>
Subject Re: spark streaming 1.3 with kafka
Date Tue, 01 Sep 2015 16:19:30 GMT
In my app, after processing the events I post them to an external
server. If that server is down, I want to back off consuming from Kafka.
But I can't stop and restart the consumer, since that needs manual
effort.

Backing off for a few batches is also not possible: the backoff decision is
based on the status of the last processed batch, but the driver has already
computed offsets for the next batches. So if I skip the next few batches
until the external server is back to normal, that is data loss unless I can
reset the offsets.

So the only option seems to be to delay the last batch by calling sleep() in
the foreachRDD function, after the foreachPartition call returns.
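
A minimal sketch of that sleep-based backoff in plain Java, with no Spark
dependency. HealthCheck, delayMillis and awaitServer are hypothetical names
introduced for illustration, not Spark or Kafka APIs. The assumption is that
awaitServer would be called on the driver inside the foreachRDD function,
once foreachPartition has returned and the external server was found to be
down.

```java
class BackoffSketch {
    /** Hypothetical probe for the external server; not a Spark or Kafka API. */
    interface HealthCheck {
        boolean isUp();
    }

    /** Delay before retry number `attempt` (0-based): doubles each time, capped at maxMillis. */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    /** Blocks the calling thread until the check passes; returns how many times it slept. */
    static int awaitServer(HealthCheck check, long baseMillis, long maxMillis) {
        int attempt = 0;
        while (!check.isUp()) {
            try {
                Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while backing off", e);
            }
            attempt++;
        }
        return attempt;
    }

    public static void main(String[] args) {
        // Simulated server that starts answering on the fourth probe.
        final int[] probes = {0};
        int sleeps = awaitServer(new HealthCheck() {
            @Override
            public boolean isUp() {
                return ++probes[0] > 3;
            }
        }, 1L, 4L);
        System.out.println("slept " + sleeps + " times before the server came back");
    }
}
```

Capping the delay keeps the retry interval bounded during a long outage.
While the thread sleeps, later batches still queue up behind the blocked one.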

The concern is that further batches will keep being enqueued until the
sleeping batch completes. So what is the maximum size of the scheduling
queue? Say the server does not come up for hours and my batch interval is
5 sec: that is 720 batches enqueued per hour.
Will that be an issue?
And is there any setting in the direct Kafka stream to stop calling
compute() once the scheduling queue exceeds a threshold (say 50 batches),
and to resume calling compute() and enqueuing the next job once the queue
size drops back below the threshold?
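
The 720 figure follows from dividing the outage duration by the batch
interval. A small sketch of that arithmetic, where queuedBatches and
secondsToReach are hypothetical helpers for illustration, not Spark settings:

```java
class QueueGrowthSketch {
    /** Batches that pile up while one batch blocks for outageSeconds. */
    static long queuedBatches(long outageSeconds, long batchIntervalSeconds) {
        return outageSeconds / batchIntervalSeconds;
    }

    /** Seconds of outage after which the queue reaches thresholdBatches. */
    static long secondsToReach(long thresholdBatches, long batchIntervalSeconds) {
        return thresholdBatches * batchIntervalSeconds;
    }

    public static void main(String[] args) {
        System.out.println(queuedBatches(3600, 5)); // 720: one hour at a 5 sec interval
        System.out.println(secondsToReach(50, 5));  // 250: a 50-batch threshold is hit in about 4 minutes
    }
}
```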

On Tue, Sep 1, 2015 at 8:57 PM, Cody Koeninger <> wrote:

> Honestly I'd concentrate more on getting your batches to finish in a
> timely fashion, so you won't even have the issue to begin with...
> On Tue, Sep 1, 2015 at 10:16 AM, Shushant Arora <> wrote:
>> What if I use custom checkpointing, so that I can take care of offsets
>> being checkpointed at the end of each batch?
>> Will it then be possible to reset the offset?
>> On Tue, Sep 1, 2015 at 8:42 PM, Cody Koeninger <>
>> wrote:
>>> No, if you start arbitrarily messing around with offset ranges after
>>> compute is called, things are going to get out of whack.
>>> e.g. checkpoints are no longer going to correspond to what you're
>>> actually processing
>>> On Tue, Sep 1, 2015 at 10:04 AM, Shushant Arora <> wrote:
>>>> Can I reset the range based on some condition before calling
>>>> transformations on the stream?
>>>> Say, before calling:
>>>> directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
>>>>   @Override
>>>>   public Void call(JavaRDD<byte[][]> v1) throws Exception {
>>>>     v1.foreachPartition(new VoidFunction<Iterator<byte[][]>>() {
>>>>       @Override
>>>>       public void call(Iterator<byte[][]> t) throws Exception {
>>>>         // per-partition processing goes here
>>>>       }
>>>>     });
>>>>     return null;
>>>>   }
>>>> });
>>>> change the offset range (fromOffset) of directKafkaStream's RDD.
>>>> I can't do this in the compute method, since compute would have been
>>>> called at the current batch's queue time, but the condition is set at
>>>> the previous batch's run time.
>>>> On Tue, Sep 1, 2015 at 7:09 PM, Cody Koeninger <>
>>>> wrote:
>>>>> It's at the time compute() gets called, which should be near the time
>>>>> the batch should have been queued.
>>>>> On Tue, Sep 1, 2015 at 8:02 AM, Shushant Arora <> wrote:
>>>>>> Hi
>>>>>> In Spark Streaming 1.3 with Kafka, when does the driver fetch the
>>>>>> latest offsets for a run: at the start of each batch, or at the time
>>>>>> the batch is queued?
>>>>>> Say a few of my batches take longer to complete than their batch
>>>>>> interval, so some batches go into the queue. Will the driver wait for
>>>>>> queued batches to start, or just fetch the latest offsets before
>>>>>> they have actually started? In that case, when they start running they
>>>>>> will work on the old offsets fetched at the time they were queued.
