kafka-users mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: Potential Bug | GlobalStateManager checkpoint
Date Mon, 04 Sep 2017 13:20:20 GMT
Thanks Sameer, yes this looks like a bug. Can you file a JIRA?

On Mon, 4 Sep 2017 at 12:23 Sameer Kumar <sam.kum.work@gmail.com> wrote:

> Hi,
>
> I am using an InMemoryStore with a GlobalKTable. I found that I lose data
> when I restart my Streams application while it is consuming from the Kafka
> topic, because on restart it always resumes from the last saved checkpoint.
> That is fine for RocksDB, which is a persistent store, but an in-memory
> store should consume the topic from the beginning.
>
> Debugging further, I compared the code of GlobalStateManagerImpl (used for
> GlobalKTable) with that of ProcessorStateManager (used for KTable).
>
> ProcessorStateManager.checkpoint checks that a state store is persistent
> before writing its checkpoint, but GlobalStateManagerImpl.checkpoint has no
> such check. Do you think the same check should be added to
> GlobalStateManagerImpl?
>
>     public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
>         log.trace("{} Writing checkpoint: {}", logPrefix, ackedOffsets);
>         checkpointedOffsets.putAll(changelogReader.restoredOffsets());
>         for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
>             final String storeName = entry.getKey();
>             // only checkpoint the offset to the offsets file if
>             // it is persistent AND changelog enabled
>             if (entry.getValue().persistent() && storeToChangelogTopic.containsKey(storeName)) {
>                 final String changelogTopic = storeToChangelogTopic.get(storeName);
>                 final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
>                 if (ackedOffsets.containsKey(topicPartition)) {
>                     // store the last offset + 1 (the log position after restoration)
>                     checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
>                 } else if (restoredOffsets.containsKey(topicPartition)) {
>                     checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
>                 }
>             }
>         }
>         // write the checkpoint file before closing, to indicate clean shutdown
>         try {
>             if (checkpoint == null) {
>                 checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
>             }
>             checkpoint.write(checkpointedOffsets);
>         } catch (final IOException e) {
>             log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME), e);
>         }
>     }
>
> Regards,
> -Sameer.
>
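
The check discussed above can be illustrated in isolation. What follows is only a minimal sketch under stated assumptions, not the Kafka Streams implementation: GlobalCheckpointSketch, Store, and offsetsToCheckpoint are hypothetical names, topics are plain strings, and no file is written. It shows the idea of dropping offsets for non-persistent stores so that an in-memory store is re-read from the beginning after a restart.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model. None of these names are Kafka Streams APIs.
class GlobalCheckpointSketch {

    /** Minimal stand-in for a state store; only the persistence flag matters here. */
    static final class Store {
        final String name;
        final boolean persistent;

        Store(final String name, final boolean persistent) {
            this.name = name;
            this.persistent = persistent;
        }
    }

    /**
     * Returns the offsets that would be written to the checkpoint file.
     * Offsets whose topic backs a non-persistent (in-memory) store are
     * skipped, as are offsets for topics with no registered store.
     */
    static Map<String, Long> offsetsToCheckpoint(final Map<String, Store> storesByTopic,
                                                 final Map<String, Long> consumedOffsets) {
        final Map<String, Long> result = new HashMap<>();
        for (final Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            final Store store = storesByTopic.get(entry.getKey());
            // the persistence check: in-memory stores get no checkpoint entry
            if (store != null && store.persistent) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, Store> stores = new HashMap<>();
        stores.put("rocks-topic", new Store("rocks-store", true));
        stores.put("memory-topic", new Store("memory-store", false));

        final Map<String, Long> offsets = new HashMap<>();
        offsets.put("rocks-topic", 42L);
        offsets.put("memory-topic", 17L);

        // Only the offset for the persistent store's topic is kept.
        System.out.println(offsetsToCheckpoint(stores, offsets));
    }
}
```

With no checkpoint entry for the in-memory store's topic, a restart has no saved offset to resume from for that store, which is the behaviour requested in the quoted message.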
