kafka-users mailing list archives

From Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com>
Subject Re: Reducing streams startup bandwidth usage
Date Tue, 03 Dec 2019 22:16:26 GMT
Hi John,

thanks a lot for helping. Regarding your message:
 - no, we only have 1 instance of the streams application, and it always
re-uses the same state folder
 - yes, we're seeing most issues when the restart is not graceful, due to an
exception

I've enabled trace logging. Filtering by a single state store, the
StoreChangelogReader messages are:

Added restorer for changelog sensors-stream-aggregate-store-changelog-0
Added restorer for changelog sensors-stream-aggregate-store-changelog-1
Added restorer for changelog sensors-stream-aggregate-store-changelog-2
Did not find checkpoint from changelog
sensors-stream-aggregate-store-changelog-2 for store aggregate-store,
rewinding to beginning.
Did not find checkpoint from changelog
sensors-stream-aggregate-store-changelog-1 for store aggregate-store,
rewinding to beginning.
Did not find checkpoint from changelog
sensors-stream-aggregate-store-changelog-0 for store aggregate-store,
rewinding to beginning.
No checkpoint found for task 0_2 state store aggregate-store changelog
sensors-stream-aggregate-store-changelog-2 with EOS turned on.
Reinitializing the task and restore its state from the beginning.
No checkpoint found for task 0_1 state store aggregate-store changelog
sensors-stream-aggregate-store-changelog-1 with EOS turned on.
Reinitializing the task and restore its state from the beginning.
No checkpoint found for task 0_0 state store aggregate-store changelog
sensors-stream-aggregate-store-changelog-0 with EOS turned on.
Reinitializing the task and restore its state from the beginning.
Found checkpoint 709937 from changelog
sensors-stream-aggregate-store-changelog-2 for store aggregate-store.
Restoring partition sensors-stream-aggregate-store-changelog-2 from offset
709937 to endOffset 742799
Found checkpoint 3024234 from changelog
sensors-stream-aggregate-store-changelog-1 for store aggregate-store.
Restoring partition sensors-stream-aggregate-store-changelog-1 from offset
3024234 to endOffset 3131513
Found checkpoint 14514072 from changelog
sensors-stream-aggregate-store-changelog-0 for store aggregate-store.
Restoring partition sensors-stream-aggregate-store-changelog-0 from offset
14514072 to endOffset 17116574
Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store
with 966 records, ending offset is 711432, next starting position is 711434
Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store
with 914 records, ending offset is 712711, next starting position is 712713
Restored from sensors-stream-aggregate-store-changelog-1 to aggregate-store
with 18 records, ending offset is 3024261, next starting position is 3024262


Why does it first say it didn't find the checkpoint, and then find it?
It seems it loaded about 2.7M records (the sum of the offset differences in
the "Restoring partition ..." messages), right?
Should I maybe try to reduce the checkpoint interval?
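
To double-check the 2.7M figure, here is a minimal Kotlin sketch that sums the
offset ranges from the three "Restoring partition" lines above. The
`RestoreRange` class and `totalOffsetSpan` function are just illustrative names,
not part of Kafka Streams. Note that an offset difference is only an upper
bound on the number of records: the first restored batch above covers offsets
709937 to 711432 but reports only 966 records, so the changelog evidently has
gaps.

```kotlin
// Illustrative helper (not a Kafka Streams API): one changelog restore range,
// as reported in a "Restoring partition ... from offset X to endOffset Y" line.
data class RestoreRange(val partition: Int, val fromOffset: Long, val endOffset: Long) {
    // Number of offsets to scan. This is an upper bound on records restored,
    // because the changelog can contain offset gaps.
    val offsetSpan: Long get() = endOffset - fromOffset
}

// Sums the offset spans across all partitions.
fun totalOffsetSpan(ranges: List<RestoreRange>): Long =
    ranges.map { it.offsetSpan }.sum()

// Offsets copied from the log excerpt in this message.
val ranges = listOf(
    RestoreRange(partition = 0, fromOffset = 14514072L, endOffset = 17116574L),
    RestoreRange(partition = 1, fromOffset = 3024234L, endOffset = 3131513L),
    RestoreRange(partition = 2, fromOffset = 709937L, endOffset = 742799L)
)

fun main() {
    for (r in ranges) {
        println("partition ${r.partition}: ${r.offsetSpan} offsets")
    }
    println("total: ${totalOffsetSpan(ranges)} offsets")
}
```

Almost all of the total comes from partition 0, so that is the partition whose
checkpoint lags the most.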

Regards

--
Alessandro Tagliapietra


On Mon, Dec 2, 2019 at 9:18 AM John Roesler <vvcephei@apache.org> wrote:

> Hi Alessandro,
>
> I'm sorry to hear that.
>
> The restore process only takes one factor into account: the current offset
> position of the changelog topic is stored in a local file alongside the
> state stores. On startup, the app checks if the recorded position lags the
> latest offset in the changelog. If so, then it reads the missing changelog
> records before starting processing.
>
> Thus, it would not restore any old window data.
>
> There might be a few different things going on to explain your observation:
> * if there is more than one instance in your Streams cluster, maybe the
> task is "flopping" between instances, so each instance still has to recover
> state, since it wasn't the last one actively processing it.
> * if the application isn't stopped gracefully, it might not get a chance
> to record its offset in that local file, so on restart it has to restore
> some or all of the state store from changelog.
>
> Or it could be something else; that's just what comes to mind.
>
> If you want to get to the bottom of it, you can take a look at the logs,
> paying close attention to which tasks are assigned to which instances after
> each restart. You can also look into the logs from
> `org.apache.kafka.streams.processor.internals.StoreChangelogReader` (might
> want to set it to DEBUG or TRACE level to really see what's happening).
>
> I hope this helps!
> -John
>
> On Sun, Dec 1, 2019, at 21:25, Alessandro Tagliapietra wrote:
> > Hello everyone,
> >
> > we're having a problem with bandwidth usage on streams application startup,
> > our current setup does this:
> >
> > ...
> > .groupByKey()
> > .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
> > .aggregate(
> >         { MetricSequenceList(ArrayList()) },
> >         { key, value, aggregate ->
> >             aggregate.getRecords().add(value)
> >             aggregate
> >         },
> >         Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
> >             .withKeySerde(Serdes.String())
> >             .withValueSerde(Settings.getValueSpecificavroSerde())
> > )
> > .toStream()
> > .flatTransform(TransformerSupplier {
> > ...
> >
> > basically in each window we append the new values and then do some other
> > logic with the array of windowed values.
> > The aggregate-store changelog topic configuration  uses compact,delete as
> > cleanup policy and has 12 hours of retention.
> >
> > What we've seen is that on application startup it takes a couple of minutes
> > to rebuild the state store, even if the state store directory is persisted
> > across restarts. That, along with an exception that caused the Docker
> > container to be restarted a couple hundred times, caused a big Confluent
> > Cloud bill compared to what we usually spend (1/4 of a full month in 1 day).
> >
> > What I think is happening is that the topic is keeping all the previous
> > windows even with the compacting policy because each key is the original
> > key + the timestamp, not just the key. Since we don't care about previous
> > windows, as the flatTransform after the toStream() makes sure that we don't
> > process old windows (a custom suppressor basically), is there a way to only
> > keep the last window so that the store rebuilding goes faster and without
> > rebuilding old windows too? Or should I create a custom window using the
> > original key as key so that the compaction keeps only the last window data?
> >
> > Thank you
> >
> > --
> > Alessandro Tagliapietra
> >
>
