kafka-users mailing list archives

From "John Roesler" <vvcep...@apache.org>
Subject Re: Reducing streams startup bandwidth usage
Date Sat, 07 Dec 2019 17:54:45 GMT
Hmm, that’s a good question. Now that we’re talking about caching, I wonder if the cache
was just too small. It’s not very big by default. 
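
As a rough illustration of what "too small" means here: the record cache is
sized by the `cache.max.bytes.buffering` setting, which defaults to 10 MB
shared across all stream threads of an instance. Cached updates are written to
the changelog and forwarded downstream when the cache fills up or on commit.
The sketch below (Kotlin, like the code elsewhere in this thread) uses plain
string keys so it needs no Kafka dependency; the helper name
`streamsCacheProps` and the 64 MB value are illustrative assumptions, not a
recommendation.

```kotlin
import java.util.Properties

// Hypothetical helper: builds only the cache-related part of a Streams config.
// The key is the Kafka Streams 2.x config name, written as a plain string.
fun streamsCacheProps(cacheBytes: Long): Properties {
    val props = Properties()
    // Total record-cache size in bytes, shared by all stream threads.
    props["cache.max.bytes.buffering"] = cacheBytes
    return props
}

fun main() {
    // 64 MB is an arbitrary example value; size it from the cache hit-ratio metrics.
    val props = streamsCacheProps(cacheBytes = 64L * 1024 * 1024)
    println(props)
}
```

These properties would be merged into the ones passed to `KafkaStreams`. Note
that `commit.interval.ms` also bounds how long an update can sit in the cache,
and with exactly-once enabled its default is 100 ms.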

On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:
> Ok I'll check on that!
> 
> Now I can see that with caching we went from 3-4MB/s to 400KB/s, that will
> help with the bill.
> 
> Last question, any reason why after a while the regular windowed stream
> starts sending every update instead of caching?
> Could it be because it doesn't have any more memory available? Any other
> possible reason?
> 
> Thank you so much for your help
> 
> --
> Alessandro Tagliapietra
> 
> 
> On Sat, Dec 7, 2019 at 9:14 AM John Roesler <vvcephei@apache.org> wrote:
> 
> > Ah, yes. Glad you figured it out!
> >
> > Caching does not reduce EOS guarantees at all. I highly recommend using
> > it. You might even want to take a look at the caching metrics to make sure
> > you have a good hit ratio.
> >
> > -John
> >
> > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote:
> > > Never mind, I've found out I can use `.withCachingEnabled` on the store
> > > builder to achieve the same thing as the windowing example, as
> > > `Materialized.as` turns that on by default.
> > >
> > > Does caching in any way reduce the EOS guarantees?
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > >
> > > On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra <
> > > tagliapietra.alessandro@gmail.com> wrote:
> > >
> > > > Seems my journey with this isn't done just yet,
> > > >
> > > > This seems very complicated to me but I'll try to explain it as best I can.
> > > > To better understand the streams network usage I've used prometheus with
> > > > the JMX exporter to export kafka metrics.
> > > > To check the amount of data we use I'm looking at the increments
> > > > of kafka_producer_topic_metrics_byte_total and
> > > > kafka_producer_producer_topic_metrics_record_send_total,
> > > >
> > > > Our current (before the change mentioned above) code looks like this:
> > > >
> > > > // This transformer just pairs a value with the previous one, storing the
> > > > // temporary one in a store
> > > > val pairsStream = metricStream
> > > >   .transformValues(ValueTransformerWithKeySupplier { PairTransformer() }, "LastValueStore")
> > > >   .filter { _, value: MetricSequence? -> value != null }
> > > >
> > > > // Create a store to store suppressed windows until a new one is received
> > > > val suppressStoreSupplier =
> > > >   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"),
> > > > ......
> > > >
> > > > // Window and aggregate data in 1 minute intervals
> > > > val aggregatedStream = pairsStream
> > > >   .groupByKey()
> > > >   .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
> > > >   .aggregate(
> > > >           { MetricSequenceList(ArrayList()) },
> > > >           { key, value, aggregate ->
> > > >               aggregate.getRecords().add(value)
> > > >               aggregate
> > > >           },
> > > >           Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
> > > >               .withKeySerde(Serdes.String())
> > > >               .withValueSerde(Settings.getValueSpecificavroSerde())
> > > >   )
> > > >   .toStream()
> > > >   .flatTransform(TransformerSupplier {
> > > >       // This transformer basically waits until a new window is received
> > > >       // to emit the previous one
> > > >   }, "suppress-store")
> > > >   .map { sensorId: String, suppressedOutput: SuppressedOutput ->
> > > >       .... etc ....
> > > >
> > > >
> > > > Basically:
> > > >  - all data goes through LastValueStore store that stores each message and
> > > > emits a pair with the previous one
> > > >  - the aggregate-store is used to store the per-window list of messages in
> > > > the aggregate method
> > > >  - the suppress store is used to store each received window which is
> > > > emitted only after a newer one is received
> > > >
> > > > What I'm experiencing is that:
> > > >  - during normal execution, the streams app sends 5k messages/min to the
> > > > lastvalue store changelog topic, and only about 100 to the aggregate and
> > > > suppress store changelog topics
> > > >  - at some point (after many hours of operation), the streams app starts
> > > > sending to the aggregate and suppress store changelog topics the same
> > > > amount of messages going through the lastvaluestore
> > > >  - if I restart the streams app it goes back to the initial behavior
> > > >
> > > > You can see the behavior in this graph https://imgur.com/dJcUNSf
> > > > You can also see that after a restart everything goes back to normal
> > > > levels.
> > > > Regarding other metrics, process latency increases, poll latency
> > > > decreases, poll rate decreases, commit rate stays the same while commit
> > > > latency increases.
> > > >
> > > > Now, I have these questions:
> > > >  - why isn't the aggregate/suppress store changelog topic throughput the
> > > > same as the LastValueStore? Shouldn't it send a record to the changelog
> > > > every time it aggregates?
> > > >  - is the windowing doing some internal caching like not sending every
> > > > aggregation record until the window time is passed? (if so, where can I
> > > > find that code since I would like to use that also for our new
> > > > implementation)
> > > >
> > > > Thank you in advance
> > > >
> > > > --
> > > > Alessandro Tagliapietra
> > > >
> > > >
> > > > On Wed, Dec 4, 2019 at 7:57 AM John Roesler <vvcephei@apache.org> wrote:
> > > >
> > > >> Oh, good!
> > > >>
> > > >> On Tue, Dec 3, 2019, at 23:29, Alessandro Tagliapietra wrote:
> > > >> > Testing on staging shows that a restart on exception is much faster and
> > > >> > the stream starts right away which I think means we're reading way less
> > > >> > data than before!
> > > >> >
> > > >> > > What I was referring to is that, in Streams, the keys for window
> > > >> > > aggregation state is actually composed of both the window itself and
> > > >> > > the key. In the DSL, it looks like "Windowed<K>". That results in the
> > > >> > > store having a unique key per window for each K, which is why we need
> > > >> > > retention as well as compaction for our changelogs. But for you, if you
> > > >> > > just make the key "K", then compaction alone should do the trick.
> > > >> >
> > > >> > Yes we had compact,delete as cleanup policy but probably it still had a
> > > >> > too long retention value, also the rocksdb store is probably much faster
> > > >> > now having only one key per key instead of one key per window per key.
> > > >> >
> > > >> > Thanks a lot for helping! I'm now going to setup a prometheus-jmx
> > > >> > monitoring so we can keep better track of what's going on :)
> > > >> >
> > > >> > --
> > > >> > Alessandro Tagliapietra
> > > >> >
> > > >> >
> > > >> > On Tue, Dec 3, 2019 at 9:12 PM John Roesler <vvcephei@apache.org> wrote:
> > > >> >
> > > >> > > Oh, yeah, I remember that conversation!
> > > >> > >
> > > >> > > Yes, then, I agree, if you're only storing state of the most recent
> > > >> > > window for each key, and the key you use for that state is actually the
> > > >> > > key of the records, then an aggressive compaction policy plus your
> > > >> > > custom transformer seems like a good way forward.
> > > >> > >
> > > >> > > What I was referring to is that, in Streams, the keys for window
> > > >> > > aggregation state is actually composed of both the window itself and
> > > >> > > the key. In the DSL, it looks like "Windowed<K>". That results in the
> > > >> > > store having a unique key per window for each K, which is why we need
> > > >> > > retention as well as compaction for our changelogs. But for you, if you
> > > >> > > just make the key "K", then compaction alone should do the trick.
> > > >> > >
> > > >> > > And yes, if you manage the topic yourself, then Streams won't adjust
> > > >> > > the retention time. I think it might validate that the retention isn't
> > > >> > > too short, but I don't remember offhand.
> > > >> > >
> > > >> > > Cheers, and let me know how it goes!
> > > >> > > -John
> > > >> > >
> > > >> > > On Tue, Dec 3, 2019, at 23:03, Alessandro Tagliapietra wrote:
> > > >> > > > Hi John,
> > > >> > > >
> > > >> > > > afaik grace period uses stream time
> > > >> > > > https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html
> > > >> > > > which is per partition, unfortunately we process data that's not in sync
> > > >> > > > between keys so each key needs to be independent and a key can have much
> > > >> > > > older data than the other.
> > > >> > > >
> > > >> > > > Having a small grace period would probably close old windows sooner than
> > > >> > > > expected. That's also why in my use case a custom store that just stores
> > > >> > > > the last window data for each key might work better. I had the same issue
> > > >> > > > with suppression and it has been reported here
> > > >> > > > https://issues.apache.org/jira/browse/KAFKA-8769
> > > >> > > > Oh I just saw that you're the one that helped me on slack and created the
> > > >> > > > issue (thanks again for that).
> > > >> > > >
> > > >> > > > The behavior that you mention about streams setting changelog retention
> > > >> > > > time is something they do on creation of the topic when the broker has
> > > >> > > > auto creation enabled? Because we're using confluent cloud and I had to
> > > >> > > > create it manually.
> > > >> > > > Regarding the change in the recovery behavior, with compact cleanup
> > > >> > > > policy shouldn't the changelog only keep the last value? That would make
> > > >> > > > the recovery faster and "cheaper" as it would only need to read a single
> > > >> > > > value per key (if the cleanup just happened) right?
> > > >> > > >
> > > >> > > > --
> > > >> > > > Alessandro Tagliapietra
> > > >> > > >
> > > >> > > >
> > > >> > > > On Tue, Dec 3, 2019 at 8:51 PM John Roesler <vvcephei@apache.org> wrote:
> > > >> > > >
> > > >> > > > > Hey Alessandro,
> > > >> > > > >
> > > >> > > > > That sounds also like it would work. I'm wondering if it would actually
> > > >> > > > > change what you observe w.r.t. recovery behavior, though. Streams already
> > > >> > > > > sets the retention time on the changelog to equal the retention time of
> > > >> > > > > the windows, for windowed aggregations, so you shouldn't be loading a lot
> > > >> > > > > of window data for old windows you no longer care about.
> > > >> > > > >
> > > >> > > > > Have you set the "grace period" on your window definition? By default, it
> > > >> > > > > is set to 24 hours, but you can set it as low as you like. E.g., if you
> > > >> > > > > want to commit to having in-order data only, then you can set the grace
> > > >> > > > > period to zero. This _should_ let the broker clean up the changelog
> > > >> > > > > records as soon as the window ends.
> > > >> > > > >
> > > >> > > > > Of course, the log cleaner doesn't run all the time, so there's some extra
> > > >> > > > > delay in which "expired" data would still be visible in the changelog, but
> > > >> > > > > it would actually be just the same as if you manage the store yourself.
> > > >> > > > >
> > > >> > > > > Hope this helps!
> > > >> > > > > -John
> > > >> > > > >
> > > >> > > > > On Tue, Dec 3, 2019, at 22:22, Alessandro Tagliapietra wrote:
> > > >> > > > > > Thanks John for the explanation,
> > > >> > > > > >
> > > >> > > > > > I thought that with EOS enabled (which we have) it would in the worst case
> > > >> > > > > > find a valid checkpoint and start the restore from there until it reached
> > > >> > > > > > the last committed status, not completely from scratch. What you say
> > > >> > > > > > definitely makes sense now.
> > > >> > > > > > Since we don't really need old time windows and we ensure data is ordered
> > > >> > > > > > when processed I think I'll just write a custom transformer to keep only
> > > >> > > > > > the last window, store intermediate aggregation results in the store and
> > > >> > > > > > emit a new value only when we receive data belonging to a new window.
> > > >> > > > > > That with a compact only changelog topic should keep the rebuild data to
> > > >> > > > > > the minimum as it would have only the last value for each key.
> > > >> > > > > >
> > > >> > > > > > Hope that makes sense
> > > >> > > > > >
> > > >> > > > > > Thanks again
> > > >> > > > > >
> > > >> > > > > > --
> > > >> > > > > > Alessandro Tagliapietra
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > On Tue, Dec 3, 2019 at 3:04 PM John Roesler <vvcephei@apache.org> wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi Alessandro,
> > > >> > > > > > >
> > > >> > > > > > > To take a stab at your question, maybe it first doesn't find it, but then
> > > >> > > > > > > restores some data, writes the checkpoint, and then later on, it has to
> > > >> > > > > > > re-initialize the task for some reason, and that's why it does find a
> > > >> > > > > > > checkpoint then?
> > > >> > > > > > >
> > > >> > > > > > > More to the heart of the issue, if you have EOS enabled, Streams _only_
> > > >> > > > > > > records the checkpoint when the store is in a known-consistent state. For
> > > >> > > > > > > example, if you have a graceful shutdown, Streams will flush all the
> > > >> > > > > > > stores, commit all the transactions, and then write the checkpoint file.
> > > >> > > > > > > Then, on re-start, it will pick up from that checkpoint.
> > > >> > > > > > >
> > > >> > > > > > > But as soon as it starts processing records, it removes the checkpoint
> > > >> > > > > > > file, so if it crashes while it was processing, there is no checkpoint file
> > > >> > > > > > > there, and it will have to restore from the beginning of the changelog.
> > > >> > > > > > >
> > > >> > > > > > > This design is there on purpose, because otherwise we cannot actually
> > > >> > > > > > > guarantee correctness... For example, if you are maintaining a count
> > > >> > > > > > > operation, and we process an input record i, increment the count and write
> > > >> > > > > > > it to the state store, and to the changelog topic. But we crash before we
> > > >> > > > > > > commit that transaction. Then, the write to the changelog would be aborted,
> > > >> > > > > > > and we would re-process record i. However, we've already updated the local
> > > >> > > > > > > state store, so when we increment it again, it results in double-counting
> > > >> > > > > > > i. The key point here is that there's no way to do an atomic operation
> > > >> > > > > > > across two different systems (local state store and the changelog topic).
> > > >> > > > > > > Since we can't guarantee that we roll back the incremented count when the
> > > >> > > > > > > changelog transaction is aborted, we can't keep the local store consistent
> > > >> > > > > > > with the changelog.
> > > >> > > > > > >
> > > >> > > > > > > After a crash, the only way to ensure the local store is consistent with
> > > >> > > > > > > the changelog is to discard the entire thing and rebuild it. This is why we
> > > >> > > > > > > have an invariant that the checkpoint file only exists when we _know_ that
> > > >> > > > > > > the local store is consistent with the changelog, and this is why you're
> > > >> > > > > > > seeing so much bandwidth when re-starting from an unclean shutdown.
> > > >> > > > > > >
> > > >> > > > > > > Note that it's definitely possible to do better than this, and we would
> > > >> > > > > > > very much like to improve it in the future.
> > > >> > > > > > >
> > > >> > > > > > > Thanks,
> > > >> > > > > > > -John
> > > >> > > > > > >
> > > >> > > > > > > On Tue, Dec 3, 2019, at 16:16, Alessandro Tagliapietra wrote:
> > > >> > > > > > > > Hi John,
> > > >> > > > > > > >
> > > >> > > > > > > > thanks a lot for helping, regarding your message:
> > > >> > > > > > > >  - no we only have 1 instance of the stream application, and it always
> > > >> > > > > > > > re-uses the same state folder
> > > >> > > > > > > >  - yes we're seeing most issues when restarting ungracefully due to an
> > > >> > > > > > > > exception
> > > >> > > > > > > >
> > > >> > > > > > > > I've enabled trace logging and, filtering by a single state store, the
> > > >> > > > > > > > StoreChangelogReader messages are:
> > > >> > > > > > > >
> > > >> > > > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-0
> > > >> > > > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-1
> > > >> > > > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-2
> > > >> > > > > > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store, rewinding to beginning.
> > > >> > > > > > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store, rewinding to beginning.
> > > >> > > > > > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store, rewinding to beginning.
> > > >> > > > > > > > No checkpoint found for task 0_2 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-2 with EOS turned on. Reinitializing the task and restore its state from the beginning.
> > > >> > > > > > > > No checkpoint found for task 0_1 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-1 with EOS turned on. Reinitializing the task and restore its state from the beginning.
> > > >> > > > > > > > No checkpoint found for task 0_0 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-0 with EOS turned on. Reinitializing the task and restore its state from the beginning.
> > > >> > > > > > > > Found checkpoint 709937 from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store.
> > > >> > > > > > > > Restoring partition sensors-stream-aggregate-store-changelog-2 from offset 709937 to endOffset 742799
> > > >> > > > > > > > Found checkpoint 3024234 from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store.
> > > >> > > > > > > > Restoring partition sensors-stream-aggregate-store-changelog-1 from offset 3024234 to endOffset 3131513
> > > >> > > > > > > > Found checkpoint 14514072 from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store.
> > > >> > > > > > > > Restoring partition sensors-stream-aggregate-store-changelog-0 from offset 14514072 to endOffset 17116574
> > > >> > > > > > > > Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 966 records, ending offset is 711432, next starting position is 711434
> > > >> > > > > > > > Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 914 records, ending offset is 712711, next starting position is 712713
> > > >> > > > > > > > Restored from sensors-stream-aggregate-store-changelog-1 to aggregate-store with 18 records, ending offset is 3024261, next starting position is 3024262
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > > Why does it first say it didn't find the checkpoint and then it does find it?
> > > >> > > > > > > > It seems it loaded about 2.7M records (sum of the offset differences in the
> > > >> > > > > > > > "Restoring partition ...." messages), right?
> > > >> > > > > > > > Maybe I should try to reduce the checkpoint interval?
> > > >> > > > > > > >
> > > >> > > > > > > > Regards
> > > >> > > > > > > >
> > > >> > > > > > > > --
> > > >> > > > > > > > Alessandro Tagliapietra
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > > On Mon, Dec 2, 2019 at 9:18 AM John Roesler <vvcephei@apache.org> wrote:
> > > >> > > > > > > >
> > > >> > > > > > > > > Hi Alessandro,
> > > >> > > > > > > > >
> > > >> > > > > > > > > I'm sorry to hear that.
> > > >> > > > > > > > >
> > > >> > > > > > > > > The restore process only takes one factor into account: the current offset
> > > >> > > > > > > > > position of the changelog topic is stored in a local file alongside the
> > > >> > > > > > > > > state stores. On startup, the app checks if the recorded position lags the
> > > >> > > > > > > > > latest offset in the changelog. If so, then it reads the missing changelog
> > > >> > > > > > > > > records before starting processing.
> > > >> > > > > > > > >
> > > >> > > > > > > > > Thus, it would not restore any old window data.
> > > >> > > > > > > > >
> > > >> > > > > > > > > There might be a few different things going on to explain your observation:
> > > >> > > > > > > > > * if there is more than one instance in your Streams cluster, maybe the
> > > >> > > > > > > > > task is "flopping" between instances, so each instance still has to recover
> > > >> > > > > > > > > state, since it wasn't the last one actively processing it.
> > > >> > > > > > > > > * if the application isn't stopped gracefully, it might not get a chance
> > > >> > > > > > > > > to record its offset in that local file, so on restart it has to restore
> > > >> > > > > > > > > some or all of the state store from changelog.
> > > >> > > > > > > > >
> > > >> > > > > > > > > Or it could be something else; that's just what comes to mind.
> > > >> > > > > > > > >
> > > >> > > > > > > > > If you want to get to the bottom of it, you can take a look at the logs,
> > > >> > > > > > > > > paying close attention to which tasks are assigned to which instances after
> > > >> > > > > > > > > each restart. You can also look into the logs from
> > > >> > > > > > > > > `org.apache.kafka.streams.processor.internals.StoreChangelogReader` (might
> > > >> > > > > > > > > want to set it to DEBUG or TRACE level to really see what's happening).
> > > >> > > > > > > > >
> > > >> > > > > > > > > I hope this helps!
> > > >> > > > > > > > > -John
> > > >> > > > > > > > >
> > > >> > > > > > > > > On Sun, Dec 1, 2019, at 21:25, Alessandro Tagliapietra wrote:
> > > >> > > > > > > > > > Hello everyone,
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > we're having a problem with bandwidth usage on streams application startup,
> > > >> > > > > > > > > > our current setup does this:
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > ...
> > > >> > > > > > > > > > .groupByKey()
> > > >> > > > > > > > > > .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
> > > >> > > > > > > > > > .aggregate(
> > > >> > > > > > > > > >         { MetricSequenceList(ArrayList()) },
> > > >> > > > > > > > > >         { key, value, aggregate ->
> > > >> > > > > > > > > >             aggregate.getRecords().add(value)
> > > >> > > > > > > > > >             aggregate
> > > >> > > > > > > > > >         },
> > > >> > > > > > > > > >         Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
> > > >> > > > > > > > > >             .withKeySerde(Serdes.String())
> > > >> > > > > > > > > >             .withValueSerde(Settings.getValueSpecificavroSerde())
> > > >> > > > > > > > > > )
> > > >> > > > > > > > > > .toStream()
> > > >> > > > > > > > > > .flatTransform(TransformerSupplier {
> > > >> > > > > > > > > > ...
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > basically in each window we append the new values and then do some other
> > > >> > > > > > > > > > logic with the array of windowed values.
> > > >> > > > > > > > > > The aggregate-store changelog topic configuration uses compact,delete as
> > > >> > > > > > > > > > cleanup policy and has 12 hours of retention.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > What we've seen is that on application startup it takes a couple of minutes
> > > >> > > > > > > > > > to rebuild the state store, even if the state store directory is persisted
> > > >> > > > > > > > > > across restarts. That, along with an exception that caused the docker
> > > >> > > > > > > > > > container to be restarted a couple hundred times, caused a big confluent
> > > >> > > > > > > > > > cloud bill compared to what we usually spend (1/4 of a full month in 1
> > > >> > > > > > > > > > day).
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > What I think is happening is that the topic is keeping all the previous
> > > >> > > > > > > > > > windows even with the compacting policy because each key is the original
> > > >> > > > > > > > > > key + the timestamp, not just the key. Since we don't care about previous
> > > >> > > > > > > > > > windows, as the flatTransform after the toStream() makes sure that we don't
> > > >> > > > > > > > > > process old windows (a custom suppressor basically), is there a way to only
> > > >> > > > > > > > > > keep the last window so that the store rebuilding goes faster and without
> > > >> > > > > > > > > > rebuilding old windows too? Or should I create a custom window using the
> > > >> > > > > > > > > > original key as key so that the compaction keeps only the last window
> > > >> > > > > > > > > > data?
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Thank you
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > --
> > > >> > > > > > > > > > Alessandro Tagliapietra
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>
