kafka-users mailing list archives

From "John Roesler" <vvcep...@apache.org>
Subject Re: Reducing streams startup bandwidth usage
Date Tue, 03 Dec 2019 23:04:02 GMT
Hi Alessandro,

To take a stab at your question: maybe it first doesn't find a checkpoint, then restores some data
and writes one, and then later it has to re-initialize the task for some reason, and that's why it
does find a checkpoint the second time?

More to the heart of the issue, if you have EOS enabled, Streams _only_ records the checkpoint
when the store is in a known-consistent state. For example, if you have a graceful shutdown,
Streams will flush all the stores, commit all the transactions, and then write the checkpoint
file. Then, on re-start, it will pick up from that checkpoint.
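
(As an aside, a graceful shutdown is usually just a matter of closing the KafkaStreams instance
from a shutdown hook. Here's a minimal sketch, with the topology and config passed in as
parameters since I don't know your setup:)

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.Topology
import java.util.Properties
import java.util.concurrent.CountDownLatch

// Sketch only: pass in whatever topology and properties your app already builds.
fun runWithCleanShutdown(topology: Topology, props: Properties) {
    val streams = KafkaStreams(topology, props)
    val latch = CountDownLatch(1)
    // close() flushes the stores and commits, which is what lets Streams
    // write the checkpoint files that the next start can resume from.
    Runtime.getRuntime().addShutdownHook(Thread {
        streams.close()
        latch.countDown()
    })
    streams.start()
    latch.await()
}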

But as soon as it starts processing records, it removes the checkpoint file, so if it crashes
while it was processing, there is no checkpoint file there, and it will have to restore from
the beginning of the changelog.

This design is there on purpose, because otherwise we cannot actually guarantee correctness.
For example, suppose you are maintaining a count: we process an input record i, increment the
count, and write it to both the state store and the changelog topic, but we crash before we
commit that transaction. Then the write to the changelog is aborted, and we re-process record i.
However, we've already updated the local state store, so when we increment it again, we
double-count i. The key point is that there's no way to do an atomic operation across two
different systems (the local state store and the changelog topic). Since we can't guarantee that
we roll back the incremented count when the changelog transaction is aborted, we can't keep the
local store consistent with the changelog.
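
(To make the ordering concrete, here is a toy sketch of that failure mode, with a HashMap
standing in for the local RocksDB store and a list standing in for the changelog writes sitting
in the open transaction; it's an illustration, not the real internals:)

// Hypothetical stand-ins for the local store and the in-flight changelog writes.
val localStore = HashMap<String, Long>()
val pendingChangelogWrites = mutableListOf<Pair<String, Long>>()

fun process(key: String) {
    val newCount = (localStore[key] ?: 0L) + 1
    localStore[key] = newCount                 // applied immediately, not transactional
    pendingChangelogWrites += key to newCount  // only becomes visible if the transaction commits
    // If we crash right here, the changelog write is aborted and the input
    // record is redelivered, but localStore already holds the incremented
    // value, so reprocessing it double-counts the record.
}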

After a crash, the only way to ensure the local store is consistent with the changelog is
to discard the entire thing and rebuild it. This is why we have an invariant that the checkpoint
file only exists when we _know_ that the local store is consistent with the changelog, and
this is why you're seeing so much bandwidth when re-starting from an unclean shutdown.

Note that it's definitely possible to do better than this, and we would very much like to
improve it in the future.

Thanks,
-John

On Tue, Dec 3, 2019, at 16:16, Alessandro Tagliapietra wrote:
> Hi John,
> 
> thanks a lot for helping. Regarding your message:
>  - no, we only have 1 instance of the stream application, and it always
> re-uses the same state folder
>  - yes, we're seeing most of the issues when restarting non-gracefully due
> to an exception
> 
> I've enabled trace logging; filtering by a single state store, the
> StoreChangelogReader messages are:
> 
> Added restorer for changelog sensors-stream-aggregate-store-changelog-0
> Added restorer for changelog sensors-stream-aggregate-store-changelog-1
> Added restorer for changelog sensors-stream-aggregate-store-changelog-2
> Did not find checkpoint from changelog
> sensors-stream-aggregate-store-changelog-2 for store aggregate-store,
> rewinding to beginning.
> Did not find checkpoint from changelog
> sensors-stream-aggregate-store-changelog-1 for store aggregate-store,
> rewinding to beginning.
> Did not find checkpoint from changelog
> sensors-stream-aggregate-store-changelog-0 for store aggregate-store,
> rewinding to beginning.
> No checkpoint found for task 0_2 state store aggregate-store changelog
> sensors-stream-aggregate-store-changelog-2 with EOS turned on.
> Reinitializing the task and restore its state from the beginning.
> No checkpoint found for task 0_1 state store aggregate-store changelog
> sensors-stream-aggregate-store-changelog-1 with EOS turned on.
> Reinitializing the task and restore its state from the beginning.
> No checkpoint found for task 0_0 state store aggregate-store changelog
> sensors-stream-aggregate-store-changelog-0 with EOS turned on.
> Reinitializing the task and restore its state from the beginning.
> Found checkpoint 709937 from changelog
> sensors-stream-aggregate-store-changelog-2 for store aggregate-store.
> Restoring partition sensors-stream-aggregate-store-changelog-2 from offset
> 709937 to endOffset 742799
> Found checkpoint 3024234 from changelog
> sensors-stream-aggregate-store-changelog-1 for store aggregate-store.
> Restoring partition sensors-stream-aggregate-store-changelog-1 from offset
> 3024234 to endOffset 3131513
> Found checkpoint 14514072 from changelog
> sensors-stream-aggregate-store-changelog-0 for store aggregate-store.
> Restoring partition sensors-stream-aggregate-store-changelog-0 from offset
> 14514072 to endOffset 17116574
> Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store
> with 966 records, ending offset is 711432, next starting position is 711434
> Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store
> with 914 records, ending offset is 712711, next starting position is 712713
> Restored from sensors-stream-aggregate-store-changelog-1 to aggregate-store
> with 18 records, ending offset is 3024261, next starting position is 3024262
> 
> 
> Why does it first say it didn't find the checkpoint and then find it?
> It seems it loaded about 2.7M records (the sum of the offset differences in
> the "Restoring partition ..." messages), right?
> Should I maybe try to reduce the checkpoint interval?
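> 
> (For context, the only knob I could find that sounds related is the commit
> interval; a minimal sketch of what I'd try, assuming that's what gates the
> checkpoint writes, and with the application id just guessed from the
> changelog topic names:)
> 
> import org.apache.kafka.streams.StreamsConfig
> import java.util.Properties
> 
> val props = Properties()
> // "sensors-stream" is assumed from the changelog names, not our real config
> props[StreamsConfig.APPLICATION_ID_CONFIG] = "sensors-stream"
> props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.EXACTLY_ONCE
> // commit (and, I assume, checkpoint) more often than the default
> props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 10_000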
> 
> Regards
> 
> --
> Alessandro Tagliapietra
> 
> 
> On Mon, Dec 2, 2019 at 9:18 AM John Roesler <vvcephei@apache.org> wrote:
> 
> > Hi Alessandro,
> >
> > I'm sorry to hear that.
> >
> > The restore process takes only one factor into account: the current offset
> > position in the changelog topic, which is stored in a local file alongside the
> > state stores. On startup, the app checks whether the recorded position lags the
> > latest offset in the changelog. If so, it reads the missing changelog
> > records before starting processing.
> >
> > Thus, it would not restore any old window data.
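> >
> > (In pseudo-code, that startup decision per changelog partition is roughly:
> > read the checkpointed offset from the local file, compare it with the end
> > offset of the changelog, and replay only the gap. A toy sketch of just that
> > decision, not the actual StoreChangelogReader code:)
> >
> > // Returns the range of changelog offsets to replay, or null if nothing to do.
> > // A missing checkpoint (null) means the whole changelog is replayed.
> > fun offsetsToRestore(checkpointed: Long?, endOffset: Long): LongRange? {
> >     val startFrom = checkpointed ?: 0L
> >     return if (startFrom < endOffset) startFrom until endOffset else null
> > }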
> >
> > There might be a few different things going on to explain your observation:
> > * if there is more than one instance in your Streams cluster, maybe the
> > task is "flopping" between instances, so each instance still has to recover
> > state, since it wasn't the last one actively processing it.
> > * if the application isn't stopped gracefully, it might not get a chance
> > to record its offset in that local file, so on restart it has to restore
> > some or all of the state store from changelog.
> >
> > Or it could be something else; that's just what comes to mind.
> >
> > If you want to get to the bottom of it, you can take a look at the logs,
> > paying close attention to which tasks are assigned to which instances after
> > each restart. You can also look into the logs from
> > `org.apache.kafka.streams.processor.internals.StoreChangelogReader` (might
> > want to set it to DEBUG or TRACE level to really see what's happening).
> >
> > I hope this helps!
> > -John
> >
> > On Sun, Dec 1, 2019, at 21:25, Alessandro Tagliapietra wrote:
> > > Hello everyone,
> > >
> > > we're having a problem with bandwidth usage on streams application
> > > startup, our current setup does this:
> > >
> > > ...
> > > .groupByKey()
> > > .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
> > > .aggregate(
> > >         { MetricSequenceList(ArrayList()) },
> > >         { key, value, aggregate ->
> > >             aggregate.getRecords().add(value)
> > >             aggregate
> > >         },
> > >         Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
> > >             .withKeySerde(Serdes.String())
> > >             .withValueSerde(Settings.getValueSpecificavroSerde())
> > > )
> > > .toStream()
> > > .flatTransform(TransformerSupplier {
> > > ...
> > >
> > > basically in each window we append the new values and then do some other
> > > logic with the array of windowed values.
> > > The aggregate-store changelog topic configuration uses compact,delete as
> > > the cleanup policy and has 12 hours of retention.
> > >
> > > What we've seen is that on application startup it takes a couple of minutes
> > > to rebuild the state store, even though the state store directory is
> > > persisted across restarts. That, along with an exception that caused the
> > > docker container to be restarted a couple hundred times, caused a big
> > > confluent cloud bill compared to what we usually spend (1/4 of a full
> > > month in 1 day).
> > >
> > > What I think is happening is that the topic is keeping all the previous
> > > windows even with the compaction policy, because each changelog key is the
> > > original key + the window timestamp, not just the key. Since we don't care
> > > about previous windows (the flatTransform after the toStream(), basically a
> > > custom suppressor, makes sure we don't process old windows), is there a way
> > > to keep only the last window, so that the store rebuild goes faster and
> > > doesn't replay old windows? Or should I create a custom window using the
> > > original key as the key, so that compaction keeps only the last window's
> > > data?
> > >
> > > Thank you
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> >
>
