spark-user mailing list archives

From Dmitry Goldenberg <dgoldenberg...@gmail.com>
Subject Re: Checkpointing doesn't appear to be working for direct streaming from Kafka
Date Sat, 15 Aug 2015 01:19:22 GMT
Thanks, Cody. It sounds like Spark Streaming keeps enough state to know how
many batches have been generated, and if not all of them have been processed,
the RDD is 'unfinished'. I wonder whether it can tell that the last
micro-batch was fully and successfully processed. Hypothetically, the driver
program could terminate while the last batch is still being processed...
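[As a toy illustration of the at-least-once behavior Cody describes below:
this is plain Java, not Spark code, and the OffsetRange class here is a
hypothetical stand-in that only mirrors the direct-stream concept. The idea is
that the checkpoint records the offset range of each generated batch, so a
batch that was in flight when the driver died is replayed whole on restart.]

```java
import java.util.List;

// Toy model, NOT Spark's actual implementation: a sketch of at-least-once
// recovery. The checkpoint records the Kafka offset range of each generated
// batch; a batch still in flight when the driver died is replayed in full.
public class RecoveryModel {

    // Hypothetical stand-in for a Kafka offset range [from, until).
    static final class OffsetRange {
        final long fromOffset;
        final long untilOffset;
        OffsetRange(long fromOffset, long untilOffset) {
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }
        @Override public String toString() {
            return "[" + fromOffset + ", " + untilOffset + ")";
        }
    }

    /** Offset range a restarted driver would process first. */
    static OffsetRange resume(List<OffsetRange> pendingBatches, long nextOffset) {
        if (!pendingBatches.isEmpty()) {
            // An unfinished batch is replayed whole -- even if the driver died
            // mid-way through it, there is no sub-batch progress to resume from.
            return pendingBatches.get(0);
        }
        // Nothing pending: the next batch starts at the next unconsumed offset.
        return new OffsetRange(nextOffset, nextOffset);
    }

    public static void main(String[] args) {
        // Driver killed while processing offsets [100, 200): all of it replays.
        System.out.println(resume(
            List.of(new OffsetRange(100, 200)), 200));  // prints [100, 200)
    }
}
```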

On Fri, Aug 14, 2015 at 6:17 PM, Cody Koeninger <cody@koeninger.org> wrote:

> You'll resume and re-process the RDD that didn't finish
>
> On Fri, Aug 14, 2015 at 1:31 PM, Dmitry Goldenberg <
> dgoldenberg123@gmail.com> wrote:
>
>> Our additional question on checkpointing is basically about the logistics of
>> it --
>>
>> At which point does the data get written to the checkpoint?  Is it
>> written as soon as the driver program retrieves an RDD from Kafka (or
>> another source)?  Or is it written after that RDD has been processed and
>> we're basically moving on to the next RDD?
>>
>> What I'm driving at is, what happens if the driver program is killed?
>> The next time it's started, will it know, from Spark Streaming's
>> checkpointing, to resume from the same RDD that was being processed at the
>> time of the program getting killed?  In other words, will we, upon
>> restarting the consumer, resume from the RDD that was unfinished, or will
>> we be looking at the next RDD?
>>
>> Will we pick up from the last known *successfully processed* topic
>> offset?
>>
>> Thanks.
>>
>> On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen <sowen@cloudera.com> wrote:
>>
>>> If you've set the checkpoint dir, it seems like indeed the intent is
>>> to use a default checkpoint interval in DStream:
>>>
>>> private[streaming] def initialize(time: Time) {
>>> ...
>>>   // Set the checkpoint interval to be slideDuration or 10 seconds, whichever is larger
>>>   if (mustCheckpoint && checkpointDuration == null) {
>>>     checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
>>>     logInfo("Checkpoint interval automatically set to " + checkpointDuration)
>>>   }
>>>
>>> Do you see that log message? What's the interval? If it's quite long,
>>> that could at least explain why checkpointing appears to do nothing.
>>>
>>> It sort of seems wrong though since
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html
>>> suggests it was intended to be a multiple of the batch interval. The
>>> slide duration wouldn't always be relevant anyway.
>>>
>>> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
>>> <dgoldenberg123@gmail.com> wrote:
>>> > I've instrumented checkpointing per the programming guide and I can tell
>>> > that Spark Streaming is creating the checkpoint directories, but I'm not
>>> > seeing any content being created in those directories, nor am I seeing
>>> > the effects I'd expect from checkpointing.  I'd expect any data that
>>> > comes into Kafka while the consumers are down to get picked up when the
>>> > consumers are restarted; I'm not seeing that.
>>> >
>>> > For now my checkpoint directory is set to the local file system, with
>>> > the directory URI in this form:   file:///mnt/dir1/dir2.  I see a
>>> > subdirectory named with a UUID being created under there but no files.
>>> >
>>> > I'm using a custom JavaStreamingContextFactory which creates a
>>> > JavaStreamingContext with the directory set into it via the
>>> > checkpoint(String) method.
>>> >
>>> > I'm currently not invoking the checkpoint(Duration) method on the
>>> > DStream, since I want to first rely on Spark's default checkpointing
>>> > interval.  My streaming batch duration is set to 1 second (1000 millis).
>>> >
>>> > Anyone have any idea what might be going wrong?
>>> >
>>> > Also, at which point does Spark delete checkpoint files?
>>> >
>>> > Thanks.
>>>
>>
>>
>
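[For concreteness, the default-interval formula in the initialize() snippet
Sean quotes can be reproduced with plain arithmetic -- no Spark needed. This
is a toy reproduction in plain Java, not Spark's code; with the 1-second batch
duration mentioned in the thread it picks a 10-second checkpoint interval, the
smallest multiple of the slide duration that is at least 10 seconds.]

```java
// Toy reproduction (plain Java, no Spark) of the default checkpoint-interval
// formula quoted from DStream.initialize(): the interval is the smallest
// multiple of the slide duration that is >= 10 seconds.
public class CheckpointIntervalDemo {

    static long defaultCheckpointIntervalMs(long slideDurationMs) {
        long tenSecondsMs = 10_000L;
        // math.ceil(Seconds(10) / slideDuration).toInt in the Scala original
        int multiple = (int) Math.ceil((double) tenSecondsMs / slideDurationMs);
        return slideDurationMs * multiple;
    }

    public static void main(String[] args) {
        // 1-second batch duration (as in the thread): 10 slides of 1000 ms
        System.out.println(defaultCheckpointIntervalMs(1_000));  // 10000
        // 7-second slide duration rounds up to 2 slides
        System.out.println(defaultCheckpointIntervalMs(7_000));  // 14000
    }
}
```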
