kafka-users mailing list archives

From "John Roesler" <vvcep...@apache.org>
Subject Re: Reducing streams startup bandwidth usage
Date Wed, 04 Dec 2019 15:56:52 GMT
Oh, good!

On Tue, Dec 3, 2019, at 23:29, Alessandro Tagliapietra wrote:
> Testing on staging shows that a restart on exception is much faster and the
> stream starts right away, which I think means we're reading way less data
> than before!
> 
> > What I was referring to is that, in Streams, the keys for window
> > aggregation state are actually composed of both the window itself and the
> > key. In the DSL, it looks like "Windowed<K>". That results in the store
> > having a unique key per window for each K, which is why we need retention
> > as well as compaction for our changelogs. But for you, if you just make the
> > key "K", then compaction alone should do the trick.
> 
> Yes, we had compact,delete as the cleanup policy, but it probably still had
> too long a retention value. Also, the RocksDB store is probably much faster
> now that it holds only one entry per key instead of one per window per key.
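>
> For reference, here's a rough sketch of the kind of store setup this ends up
> being (the store name is just a placeholder, `builder` is our StreamsBuilder,
> and the value serde is the one we already use in the topology):
>
> import org.apache.kafka.common.serialization.Serdes
> import org.apache.kafka.streams.state.Stores
>
> // Keyed only by K, so the compacted changelog keeps a single record per key.
> val lastWindowStore = Stores.keyValueStoreBuilder(
>     Stores.persistentKeyValueStore("last-window-store"),
>     Serdes.String(),
>     Settings.getValueSpecificavroSerde()
> ).withLoggingEnabled(mapOf("cleanup.policy" to "compact"))
>
> builder.addStateStore(lastWindowStore)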
> 
> Thanks a lot for helping! I'm now going to set up prometheus-jmx
> monitoring so we can keep better track of what's going on :)
> 
> --
> Alessandro Tagliapietra
> 
> 
> On Tue, Dec 3, 2019 at 9:12 PM John Roesler <vvcephei@apache.org> wrote:
> 
> > Oh, yeah, I remember that conversation!
> >
> > Yes, then, I agree, if you're only storing state of the most recent window
> > for each key, and the key you use for that state is actually the key of the
> > records, then an aggressive compaction policy plus your custom transformer
> > seems like a good way forward.
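> >
> > (Just to illustrate what "aggressive" could look like at the topic level,
> > something along these lines; these are standard topic configs, and the right
> > values depend entirely on your data rate:)
> >
> > cleanup.policy=compact
> > min.cleanable.dirty.ratio=0.1
> > segment.ms=600000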
> >
> > What I was referring to is that, in Streams, the keys for window
> > aggregation state are actually composed of both the window itself and the
> > key. In the DSL, it looks like "Windowed<K>". That results in the store
> > having a unique key per window for each K, which is why we need retention
> > as well as compaction for our changelogs. But for you, if you just make the
> > key "K", then compaction alone should do the trick.
> >
> > And yes, if you manage the topic yourself, then Streams won't adjust the
> > retention time. I think it might validate that the retention isn't too
> > short, but I don't remember offhand.
> >
> > Cheers, and let me know how it goes!
> > -John
> >
> > On Tue, Dec 3, 2019, at 23:03, Alessandro Tagliapietra wrote:
> > > Hi John,
> > >
> > > AFAIK the grace period uses stream time
> > > (https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html),
> > > which is per partition. Unfortunately we process data that's not in sync
> > > between keys, so each key needs to be independent and one key can have much
> > > older data than another.
> > >
> > > Having a small grace period would probably close old windows sooner than
> > > expected. That's also why, in my use case, a custom store that just keeps
> > > the last window's data for each key might work better. I had the same issue
> > > with suppression, which has been reported here:
> > > https://issues.apache.org/jira/browse/KAFKA-8769
> > > Oh, I just saw that you're the one who helped me on Slack and created the
> > > issue (thanks again for that).
> > >
> > > The behavior you mention about Streams setting the changelog retention
> > > time, is that something it does on creation of the topic when the broker
> > > has auto-creation enabled? I ask because we're using Confluent Cloud and I
> > > had to create the topic manually.
> > > Regarding the change in recovery behavior, with a compact cleanup policy
> > > shouldn't the changelog only keep the last value? That would make the
> > > recovery faster and "cheaper", as it would only need to read a single value
> > > per key (if the cleanup just happened), right?
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > >
> > > On Tue, Dec 3, 2019 at 8:51 PM John Roesler <vvcephei@apache.org> wrote:
> > >
> > > > Hey Alessandro,
> > > >
> > > > That sounds like it would also work. I'm wondering if it would actually
> > > > change what you observe w.r.t. recovery behavior, though. Streams already
> > > > sets the retention time on the changelog to equal the retention time of
> > > > the windows, for windowed aggregations, so you shouldn't be loading a lot
> > > > of window data for old windows you no longer care about.
> > > >
> > > > Have you set the "grace period" on your window definition? By default, it
> > > > is set to 24 hours, but you can set it as low as you like. E.g., if you
> > > > want to commit to having in-order data only, then you can set the grace
> > > > period to zero. This _should_ let the broker clean up the changelog
> > > > records as soon as the window ends.
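> > > >
> > > > For example, something along these lines (just a sketch; the 1-minute
> > > > window size is taken from your snippet further down the thread):
> > > >
> > > > import java.time.Duration
> > > > import org.apache.kafka.streams.kstream.TimeWindows
> > > >
> > > > // Windows close as soon as they end, so the broker can clean up
> > > > // changelog records for expired windows sooner.
> > > > val windows = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO)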
> > > >
> > > > Of course, the log cleaner doesn't run all the time, so there's some extra
> > > > delay in which "expired" data would still be visible in the changelog, but
> > > > it would actually be just the same as if you manage the store yourself.
> > > >
> > > > Hope this helps!
> > > > -John
> > > >
> > > > On Tue, Dec 3, 2019, at 22:22, Alessandro Tagliapietra wrote:
> > > > > Thanks John for the explanation,
> > > > >
> > > > > I thought that with EOS enabled (which we have) it would, in the worst
> > > > > case, find a valid checkpoint and start the restore from there until it
> > > > > reached the last committed state, not completely from scratch. What you
> > > > > say definitely makes sense now.
> > > > > Since we don't really need old time windows and we ensure data is ordered
> > > > > when processed, I think I'll just write a custom transformer that keeps
> > > > > only the last window, stores intermediate aggregation results in the
> > > > > store, and emits a new value only when we receive data belonging to a new
> > > > > window.
> > > > > That, with a compact-only changelog topic, should keep the rebuild data to
> > > > > the minimum as it would hold only the last value for each key.
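> > > > >
> > > > > Roughly what I have in mind, just as a sketch (the store name, the Metric
> > > > > value type, the windowStart field on MetricSequenceList and the
> > > > > windowStartFor() helper are all made up here, not actual code):
> > > > >
> > > > > import org.apache.kafka.streams.KeyValue
> > > > > import org.apache.kafka.streams.kstream.Transformer
> > > > > import org.apache.kafka.streams.processor.ProcessorContext
> > > > > import org.apache.kafka.streams.state.KeyValueStore
> > > > >
> > > > > class LastWindowTransformer :
> > > > >     Transformer<String, Metric, KeyValue<String, MetricSequenceList>?> {
> > > > >
> > > > >     private lateinit var store: KeyValueStore<String, MetricSequenceList>
> > > > >
> > > > >     override fun init(context: ProcessorContext) {
> > > > >         @Suppress("UNCHECKED_CAST")
> > > > >         store = context.getStateStore("last-window-store")
> > > > >             as KeyValueStore<String, MetricSequenceList>
> > > > >     }
> > > > >
> > > > >     override fun transform(key: String, value: Metric): KeyValue<String, MetricSequenceList>? {
> > > > >         val windowStart = windowStartFor(value)  // e.g. timestamp truncated to the minute
> > > > >         val current = store.get(key)
> > > > >         return if (current == null || current.windowStart == windowStart) {
> > > > >             // Still in the same window: keep aggregating, emit nothing yet.
> > > > >             val updated = current ?: MetricSequenceList(windowStart, ArrayList())
> > > > >             updated.getRecords().add(value)
> > > > >             store.put(key, updated)
> > > > >             null
> > > > >         } else {
> > > > >             // Data for a newer window arrived: emit the finished window and start over.
> > > > >             store.put(key, MetricSequenceList(windowStart, arrayListOf(value)))
> > > > >             KeyValue(key, current)
> > > > >         }
> > > > >     }
> > > > >
> > > > >     override fun close() {}
> > > > > }
> > > > >
> > > > > The store itself would be registered with builder.addStateStore(...) and
> > > > > the transformer attached with
> > > > > .transform(TransformerSupplier { LastWindowTransformer() }, "last-window-store").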
> > > > >
> > > > > Hope that makes sense
> > > > >
> > > > > Thanks again
> > > > >
> > > > > --
> > > > > Alessandro Tagliapietra
> > > > >
> > > > >
> > > > > On Tue, Dec 3, 2019 at 3:04 PM John Roesler <vvcephei@apache.org> wrote:
> > > > >
> > > > > > Hi Alessandro,
> > > > > >
> > > > > > To take a stab at your question, maybe it first doesn't find it, but then
> > > > > > restores some data, writes the checkpoint, and then later on, it has to
> > > > > > re-initialize the task for some reason, and that's why it does find a
> > > > > > checkpoint then?
> > > > > >
> > > > > > More to the heart of the issue, if you have EOS enabled, Streams _only_
> > > > > > records the checkpoint when the store is in a known-consistent state. For
> > > > > > example, if you have a graceful shutdown, Streams will flush all the
> > > > > > stores, commit all the transactions, and then write the checkpoint file.
> > > > > > Then, on re-start, it will pick up from that checkpoint.
> > > > > >
> > > > > > But as soon as it starts processing records, it removes the checkpoint
> > > > > > file, so if it crashes while it was processing, there is no checkpoint
> > > > > > file there, and it will have to restore from the beginning of the
> > > > > > changelog.
> > > > > >
> > > > > > This design is there on purpose, because otherwise we cannot actually
> > > > > > guarantee correctness... For example, suppose you are maintaining a count
> > > > > > operation, and we process an input record i, increment the count, and
> > > > > > write it to the state store and to the changelog topic, but we crash
> > > > > > before we commit that transaction. Then the write to the changelog would
> > > > > > be aborted, and we would re-process record i. However, we've already
> > > > > > updated the local state store, so when we increment it again, it results
> > > > > > in double-counting i. The key point here is that there's no way to do an
> > > > > > atomic operation across two different systems (the local state store and
> > > > > > the changelog topic). Since we can't guarantee that we roll back the
> > > > > > incremented count when the changelog transaction is aborted, we can't
> > > > > > keep the local store consistent with the changelog.
> > > > > >
> > > > > > After a crash, the only way to ensure the local store is consistent with
> > > > > > the changelog is to discard the entire thing and rebuild it. This is why
> > > > > > we have an invariant that the checkpoint file only exists when we _know_
> > > > > > that the local store is consistent with the changelog, and this is why
> > > > > > you're seeing so much bandwidth when re-starting from an unclean shutdown.
> > > > > >
> > > > > > Note that it's definitely possible to do better than this, and we would
> > > > > > very much like to improve it in the future.
> > > > > >
> > > > > > Thanks,
> > > > > > -John
> > > > > >
> > > > > > On Tue, Dec 3, 2019, at 16:16, Alessandro Tagliapietra wrote:
> > > > > > > Hi John,
> > > > > > >
> > > > > > > thanks a lot for helping. Regarding your message:
> > > > > > >  - no, we only have 1 instance of the stream application, and it always
> > > > > > > re-uses the same state folder
> > > > > > >  - yes, we're seeing most issues when restarting non-gracefully due to an
> > > > > > > exception
> > > > > > >
> > > > > > > I've enabled trace logging and, filtering by a single state store, the
> > > > > > > StoreChangelogReader messages are:
> > > > > > >
> > > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-0
> > > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-1
> > > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-2
> > > > > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store, rewinding to beginning.
> > > > > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store, rewinding to beginning.
> > > > > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store, rewinding to beginning.
> > > > > > > No checkpoint found for task 0_2 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-2 with EOS turned on. Reinitializing the task and restore its state from the beginning.
> > > > > > > No checkpoint found for task 0_1 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-1 with EOS turned on. Reinitializing the task and restore its state from the beginning.
> > > > > > > No checkpoint found for task 0_0 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-0 with EOS turned on. Reinitializing the task and restore its state from the beginning.
> > > > > > > Found checkpoint 709937 from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store.
> > > > > > > Restoring partition sensors-stream-aggregate-store-changelog-2 from offset 709937 to endOffset 742799
> > > > > > > Found checkpoint 3024234 from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store.
> > > > > > > Restoring partition sensors-stream-aggregate-store-changelog-1 from offset 3024234 to endOffset 3131513
> > > > > > > Found checkpoint 14514072 from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store.
> > > > > > > Restoring partition sensors-stream-aggregate-store-changelog-0 from offset 14514072 to endOffset 17116574
> > > > > > > Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 966 records, ending offset is 711432, next starting position is 711434
> > > > > > > Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 914 records, ending offset is 712711, next starting position is 712713
> > > > > > > Restored from sensors-stream-aggregate-store-changelog-1 to aggregate-store with 18 records, ending offset is 3024261, next starting position is 3024262
> > > > > > >
> > > > > > >
> > > > > > > Why does it first say it didn't find the checkpoint and then say it does
> > > > > > > find it? It seems it loaded about 2.7M records (the sum of the offset
> > > > > > > differences in the "Restoring partition ..." messages), right?
> > > > > > > Maybe I should try to reduce the checkpoint interval?
> > > > > > >
> > > > > > > Regards
> > > > > > >
> > > > > > > --
> > > > > > > Alessandro Tagliapietra
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Dec 2, 2019 at 9:18 AM John Roesler <vvcephei@apache.org> wrote:
> > > > > > >
> > > > > > > > Hi Alessandro,
> > > > > > > >
> > > > > > > > I'm sorry to hear that.
> > > > > > > >
> > > > > > > > The restore process only takes one factor into account: the current
> > > > > > > > offset position of the changelog topic is stored in a local file
> > > > > > > > alongside the state stores. On startup, the app checks if the recorded
> > > > > > > > position lags the latest offset in the changelog. If so, then it reads
> > > > > > > > the missing changelog records before starting processing.
> > > > > > > >
> > > > > > > > Thus, it would not restore any old window data.
> > > > > > > >
> > > > > > > > There might be a few different things going on to explain your
> > > > > > > > observation:
> > > > > > > > * if there is more than one instance in your Streams cluster, maybe the
> > > > > > > > task is "flopping" between instances, so each instance still has to
> > > > > > > > recover state, since it wasn't the last one actively processing it.
> > > > > > > > * if the application isn't stopped gracefully, it might not get a chance
> > > > > > > > to record its offset in that local file, so on restart it has to restore
> > > > > > > > some or all of the state store from the changelog.
> > > > > > > >
> > > > > > > > Or it could be something else; that's just what comes to mind.
> > > > > > > >
> > > > > > > > If you want to get to the bottom of it, you can take a look at the
> > > > > > > > logs, paying close attention to which tasks are assigned to which
> > > > > > > > instances after each restart. You can also look into the logs from
> > > > > > > > `org.apache.kafka.streams.processor.internals.StoreChangelogReader`
> > > > > > > > (might want to set it to DEBUG or TRACE level to really see what's
> > > > > > > > happening).
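> > > > > > > >
> > > > > > > > (For example, with the default log4j setup, a line like this in
> > > > > > > > log4j.properties should do it:)
> > > > > > > >
> > > > > > > > log4j.logger.org.apache.kafka.streams.processor.internals.StoreChangelogReader=TRACE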
> > > > > > > >
> > > > > > > > I hope this helps!
> > > > > > > > -John
> > > > > > > >
> > > > > > > > On Sun, Dec 1, 2019, at 21:25, Alessandro Tagliapietra wrote:
> > > > > > > > > Hello everyone,
> > > > > > > > >
> > > > > > > > > we're having a problem with bandwidth usage on Streams application
> > > > > > > > > startup; our current setup does this:
> > > > > > > > >
> > > > > > > > > ...
> > > > > > > > > .groupByKey()
> > > > > > > > > .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
> > > > > > > > > .aggregate(
> > > > > > > > >         { MetricSequenceList(ArrayList()) },
> > > > > > > > >         { key, value, aggregate ->
> > > > > > > > >             aggregate.getRecords().add(value)
> > > > > > > > >             aggregate
> > > > > > > > >         },
> > > > > > > > >         Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
> > > > > > > > >             .withKeySerde(Serdes.String())
> > > > > > > > >             .withValueSerde(Settings.getValueSpecificavroSerde())
> > > > > > > > > )
> > > > > > > > > .toStream()
> > > > > > > > > .flatTransform(TransformerSupplier {
> > > > > > > > > ...
> > > > > > > > >
> > > > > > > > > basically in each window we append the new values and then do some other
> > > > > > > > > logic with the array of windowed values.
> > > > > > > > > The aggregate-store changelog topic configuration uses compact,delete as
> > > > > > > > > its cleanup policy and has 12 hours of retention.
> > > > > > > > >
> > > > > > > > > What we've seen is that on application startup it takes a couple of
> > > > > > > > > minutes to rebuild the state store, even though the state store
> > > > > > > > > directory is persisted across restarts. That, along with an exception
> > > > > > > > > that caused the Docker container to be restarted a couple hundred times,
> > > > > > > > > caused a big Confluent Cloud bill compared to what we usually spend
> > > > > > > > > (1/4 of a full month in 1 day).
> > > > > > > > >
> > > > > > > > > What I think is happening is that the topic is keeping all the previous
> > > > > > > > > windows even with the compacting policy, because each key is the
> > > > > > > > > original key + the timestamp, not just the key. We don't care about
> > > > > > > > > previous windows, since the flatTransform after the toStream() makes
> > > > > > > > > sure that we don't process old windows (a custom suppressor, basically).
> > > > > > > > > Is there a way to keep only the last window, so that the store
> > > > > > > > > rebuilding goes faster and doesn't rebuild old windows too? Or should I
> > > > > > > > > create a custom window using the original key as the key, so that the
> > > > > > > > > compaction keeps only the last window's data?
> > > > > > > > >
> > > > > > > > > Thank you
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Alessandro Tagliapietra
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
