kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Difference between with and without state store cleanup streams startup
Date Sat, 01 Apr 2017 23:29:02 GMT
Yes. That's correct.

-Matthias
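
The summary confirmed above, together with the earlier answers to questions 1 and 2 quoted below, can be illustrated with a small in-memory model. This is only a sketch: `ChangelogRecord`, `compact` and `restore` are hypothetical names, not Kafka Streams APIs. It also assumes compaction has fully caught up, whereas a real broker compacts asynchronously and never touches the active segment, so a real changelog may still hold some overwritten records. What it shows is that replaying the full log and replaying its compacted form yield the same store, while the compacted log has fewer records to read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one changelog record; not a Kafka class.
final class ChangelogRecord {
    final long offset;
    final String key;
    final String value; // null models a tombstone (key deleted)

    ChangelogRecord(long offset, String key, String value) {
        this.offset = offset;
        this.key = key;
        this.value = value;
    }
}

final class CompactedReplaySketch {

    // Simplified compaction: keep only the newest record per key, in offset order.
    // Assumption: compaction has caught up with the whole log.
    static List<ChangelogRecord> compact(List<ChangelogRecord> log) {
        Map<String, ChangelogRecord> latest = new LinkedHashMap<>();
        for (ChangelogRecord r : log) {
            latest.remove(r.key); // re-insert so iteration order follows the newest offset
            latest.put(r.key, r);
        }
        return new ArrayList<>(latest.values());
    }

    // Replays a changelog into an empty store; later records overwrite earlier ones.
    static Map<String, String> restore(List<ChangelogRecord> log) {
        Map<String, String> store = new HashMap<>();
        for (ChangelogRecord r : log) {
            if (r.value == null) {
                store.remove(r.key);
            } else {
                store.put(r.key, r.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<ChangelogRecord> log = List.of(
            new ChangelogRecord(0, "a", "1"),
            new ChangelogRecord(1, "b", "1"),
            new ChangelogRecord(2, "a", "2"),
            new ChangelogRecord(3, "b", null),
            new ChangelogRecord(4, "c", "1"));
        List<ChangelogRecord> compacted = compact(log);
        // prints "3 of 5 records after compaction"
        System.out.println(compacted.size() + " of " + log.size() + " records after compaction");
        // prints "true": both replays end with {a=2, c=1}
        System.out.println(restore(log).equals(restore(compacted)));
    }
}
```

Keeping a tombstone as the last record for a key matters: it is what lets a replay from the compacted log still delete the key.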

On 3/31/17 9:32 PM, Sachin Mittal wrote:
> Hi,
> OK, so basically what I understand is that there is no global offset
> maintained for the changelog topic at the broker level.
> Every local state store maintains its offset in a local checkpoint file.
> 
> And to make sure a state store builds or rebuilds its state from the
> changelog topic faster, we need to ensure that changelog topics are
> compacted efficiently.
> 
> I hope these assumptions are correct.
> 
> Thanks
> Sachin
> 
> 
> On Sat, Apr 1, 2017 at 4:51 AM, Matthias J. Sax <matthias@confluent.io>
> wrote:
> 
>> 1. The whole log will be read.
>>
>> 2. It will read all the key-value pairs. However, the store will contain
>> only the latest record for each key after state recovery has finished.
>>
>> For both (1) and (2), note that changelog topics are compacted. Thus,
>> it will not read everything since you started your app, because log
>> compaction removes old records that were "overwritten" by newer
>> records.
>>
>> 3. The instance will hit the timeout and you will get an exception.
>>
>> 4. and 5. It depends. On a clean shutdown, Streams writes a checkpoint
>> file and thus knows the state of the store. Therefore, it does not need
>> to read the whole changelog to recover the store. On an unclean shutdown
>> (like kill -9) the checkpoint file will be missing, and thus Streams will
>> wipe out the state and recreate it from scratch (thus, you don't need to
>> call .cleanUp() manually).
>>
>> 6.
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L99
>>
>> called here:
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1294
>>
>>
>> -Matthias
>>
>> On 3/31/17 11:46 AM, Sachin Mittal wrote:
>>> Hi,
>>> There are two ways to restart a Streams application:
>>> 1. Executing streams.cleanUp() before streams.start(),
>>> which cleans up the local state store.
>>>
>>> 2. Just calling streams.start().
>>>
>>> What are the differences between the two?
>>>
>>> As I understand it, in the first case it will try to create the local
>>> state store by replaying the changelog topic.
>>>
>>> So my questions are:
>>> 1. Will it replay the whole log from the earliest offset or from the last
>>> committed offset?
>>> 2. Will it read and fetch all the values from the topic for a given key,
>>> or only the last value for a key, when creating a state store for that
>>> changelog topic?
>>> 3. What happens if the time to create the state store is greater than
>>> max.poll.interval.ms?
>>>
>>> 4. If we don't delete the state store, what happens? Does it again try
>>> to recreate the same store by reading the entire changelog topic? Or does
>>> it somehow determine from the state store what the latest offset is and
>>> update the state store with values from that offset onwards?
>>>
>>> 5. In case of an unclean shutdown, say by kill -9, is it advisable to
>>> clean up the local state store before restart, or can we just start it
>>> normally?
>>>
>>> 6. Finally, can anyone point me to the code where it creates the state
>>> store by reading from the changelog topic?
>>>
>>> Thanks
>>> Sachin
>>>
>>
>>
> 
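
The answer to questions 4 and 5 can be sketched the same way. In this simplified model, which is an assumption and not the actual `StoreChangelogReader` logic, a clean shutdown leaves a checkpoint holding the next changelog offset to read, so startup replays only the tail of the changelog. Without a checkpoint (e.g. after `kill -9`) the local store is wiped and rebuilt from the beginning of the changelog (offset 0 here), which is why calling `KafkaStreams#cleanUp()` by hand is not needed in that case. `ChangelogEntry` and `startUp` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical changelog entry; not a Kafka class.
final class ChangelogEntry {
    final long offset;
    final String key;
    final String value; // null models a tombstone

    ChangelogEntry(long offset, String key, String value) {
        this.offset = offset;
        this.key = key;
        this.value = value;
    }
}

final class CheckpointRestoreSketch {

    // Restores localStore in place and returns how many changelog records were replayed.
    // checkpoint: next offset to read, present only after a clean shutdown (assumption).
    static int startUp(Map<String, String> localStore,
                       OptionalLong checkpoint,
                       List<ChangelogEntry> changelog) {
        long from;
        if (checkpoint.isPresent()) {
            // Clean shutdown: the local store is trusted up to the checkpointed offset.
            from = checkpoint.getAsLong();
        } else {
            // Unclean shutdown: local contents are untrusted, so wipe and rebuild.
            localStore.clear();
            from = 0L;
        }
        int replayed = 0;
        for (ChangelogEntry e : changelog) {
            if (e.offset < from) {
                continue; // already reflected in the local store
            }
            if (e.value == null) {
                localStore.remove(e.key);
            } else {
                localStore.put(e.key, e.value);
            }
            replayed++;
        }
        return replayed;
    }

    public static void main(String[] args) {
        List<ChangelogEntry> changelog = List.of(
            new ChangelogEntry(0, "a", "1"),
            new ChangelogEntry(1, "b", "1"),
            new ChangelogEntry(2, "a", "2"),
            new ChangelogEntry(3, "c", "1"),
            new ChangelogEntry(4, "b", "2"));

        // Clean restart: store reflects offsets 0..2, checkpoint says "resume at 3".
        Map<String, String> clean = new HashMap<>(Map.of("a", "2", "b", "1"));
        int cleanReplayed = startUp(clean, OptionalLong.of(3), changelog);

        // Unclean restart: no checkpoint, possibly stale local contents.
        Map<String, String> unclean = new HashMap<>(Map.of("a", "2", "stale", "x"));
        int uncleanReplayed = startUp(unclean, OptionalLong.empty(), changelog);

        System.out.println(cleanReplayed + " vs " + uncleanReplayed + " records replayed"); // 2 vs 5
        System.out.println(clean.equals(unclean)); // true
    }
}
```

Both paths end in the same store contents. The checkpoint only changes how much of the changelog has to be read, which is the difference between the two restart styles asked about in the thread.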

