spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Checkpointing doesn't appear to be working for direct streaming from Kafka
Date Fri, 14 Aug 2015 22:17:25 GMT
You'll resume and re-process the RDD that didn't finish.
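
For reference, a minimal sketch of the recovery path being discussed, assuming the Spark 1.x Scala API with the spark-streaming-kafka artifact on the classpath. The broker address, topic name, app name and println output are hypothetical placeholders; the checkpoint directory reuses the form mentioned further down in this thread. The point it illustrates is that the whole stream definition lives inside the function handed to StreamingContext.getOrCreate, so a restarted driver rebuilds the context from the checkpoint instead of starting fresh.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object CheckpointedDirectStream {
  // Hypothetical values, for illustration only.
  val checkpointDir = "file:///mnt/dir1/dir2"
  val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
  val topics = Set("events")

  // Called only when no usable checkpoint exists in checkpointDir.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("checkpointed-direct-stream")
    val ssc = new StreamingContext(conf, Seconds(1)) // 1-second batches
    ssc.checkpoint(checkpointDir)

    // The stream and its output operation must be defined in here,
    // so that they are part of what gets checkpointed.
    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // A batch that did not finish before a crash is processed again
      // after restart, so this output should tolerate repeats.
      rdd.foreach { case (_, value) => println(value) }
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if present, otherwise build a new context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Since the unfinished batch is re-processed, the output step gets at-least-once delivery; making it idempotent is one way to cope with the repeats.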

On Fri, Aug 14, 2015 at 1:31 PM, Dmitry Goldenberg <dgoldenberg123@gmail.com> wrote:

> Our additional question on checkpointing is basically the logistics of it
> --
>
> At which point does the data get written into checkpointing?  Is it
> written as soon as the driver program retrieves an RDD from Kafka (or
> another source)?  Or, is it written after that RDD has been processed and
> we're basically moving on to the next RDD?
>
> What I'm driving at is, what happens if the driver program is killed?  The
> next time it's started, will it know, from Spark Streaming's checkpointing,
> to resume from the same RDD that was being processed at the time of the
> program getting killed?  In other words, will we, upon restarting the
> consumer, resume from the RDD that was unfinished, or will we be looking at
> the next RDD?
>
> Will we pick up from the last known *successfully processed* topic offset?
>
> Thanks.
>
>
>
>
> On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen <sowen@cloudera.com> wrote:
>
>> If you've set the checkpoint dir, it seems like indeed the intent is
>> to use a default checkpoint interval in DStream:
>>
>> private[streaming] def initialize(time: Time) {
>> ...
>>   // Set the checkpoint interval to be slideDuration or 10 seconds,
>>   // which ever is larger
>>   if (mustCheckpoint && checkpointDuration == null) {
>>     checkpointDuration =
>>       slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
>>     logInfo("Checkpoint interval automatically set to " + checkpointDuration)
>>   }
>>
>> Do you see that log message? What's the interval? That could at least
>> explain why it's not doing anything, if it's quite long.
>>
>> It sort of seems wrong though since
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html
>> suggests it was intended to be a multiple of the batch interval. The
>> slide duration wouldn't always be relevant anyway.
>>
>> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
>> <dgoldenberg123@gmail.com> wrote:
>> > I've instrumented checkpointing per the programming guide and I can tell
>> > that Spark Streaming is creating the checkpoint directories but I'm not
>> > seeing any content being created in those directories nor am I seeing the
>> > effects I'd expect from checkpointing.  I'd expect any data that comes into
>> > Kafka while the consumers are down, to get picked up when the consumers are
>> > restarted; I'm not seeing that.
>> >
>> > For now my checkpoint directory is set to the local file system with the
>> > directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
>> > subdirectory named with a UUID being created under there but no files.
>> >
>> > I'm using a custom JavaStreamingContextFactory which creates a
>> > JavaStreamingContext with the directory set into it via the
>> > checkpoint(String) method.
>> >
>> > I'm currently not invoking the checkpoint(Duration) method on the DStream
>> > since I want to first rely on Spark's default checkpointing interval.  My
>> > streaming batch duration is set to 1 second.
>> >
>> > Anyone have any idea what might be going wrong?
>> >
>> > Also, at which point does Spark delete files from checkpointing?
>> >
>> > Thanks.
>>
>
>
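
To make the default-interval rule quoted above from DStream.initialize concrete, here is a small worked sketch in plain Scala with no Spark dependency. The object and method names are hypothetical; durations are modelled as milliseconds, and the arithmetic mirrors slideDuration * math.ceil(Seconds(10) / slideDuration).toInt.

```scala
object DefaultCheckpointInterval {
  // Hypothetical helper mirroring the quoted rule, in milliseconds:
  //   slideDuration * ceil(10 seconds / slideDuration)
  def defaultCheckpointMillis(slideMillis: Long): Long = {
    require(slideMillis > 0, "slide duration must be positive")
    val tenSecondsMillis = 10000L
    slideMillis * math.ceil(tenSecondsMillis.toDouble / slideMillis).toInt
  }

  def main(args: Array[String]): Unit = {
    // Print the derived interval for a few example slide durations.
    for (slide <- Seq(1000L, 3000L, 15000L)) {
      println(s"slide=$slide ms -> checkpoint interval=${defaultCheckpointMillis(slide)} ms")
    }
  }
}
```

With the 1-second batch duration mentioned in this thread, and assuming the slide duration equals the batch duration, the rule gives 10 seconds. A 3-second slide gives 12 seconds (the smallest multiple of the slide that is at least 10 seconds), and a 15-second slide gives 15 seconds.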
