kafka-users mailing list archives

From Sophie Blee-Goldman <sop...@confluent.io>
Subject Re: Reducing streams startup bandwidth usage
Date Tue, 10 Dec 2019 03:01:35 GMT
Alright, well I see why you have so much data being sent to the changelog
if each update involves appending to a list and then writing in the whole
list. And with 340 records/minute I'm actually not sure how the cache could
really help at all when it's being flushed every 100ms.
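To put rough numbers on that doubt, here is a small Kotlin sketch. It assumes the figures given elsewhere in the thread (about 340 records per minute over roughly 30 keys) and the 100 ms EOS commit interval; spreading records evenly across keys is a simplifying assumption, and the function names are illustrative only.

```kotlin
// Back-of-the-envelope check: how many records can the cache collapse between
// two commits? The cache is flushed on every commit, so only repeated updates
// to the same key within one commit interval can be deduplicated.

/** Average number of input records arriving during one commit interval. */
fun recordsPerCommit(recordsPerMinute: Double, commitIntervalMs: Long): Double =
    recordsPerMinute / 60_000.0 * commitIntervalMs

/**
 * Average number of updates per key during one commit interval, assuming the
 * records are spread evenly across keys (a simplification).
 */
fun updatesPerKeyPerCommit(recordsPerMinute: Double, keys: Int, commitIntervalMs: Long): Double =
    recordsPerCommit(recordsPerMinute, commitIntervalMs) / keys
```

With these inputs, only about 0.57 records arrive per 100 ms commit on average, roughly 0.02 per key, so there is almost never a second update to the same key to collapse before the flush. At a 30 s commit interval the same formula gives about 170 records per commit, or about 5.7 per key.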

Here's kind of a wild idea, if you really only need append semantics: what
if you wrote a custom StateStore that wrapped the normal RocksDBStore (or
RocksDBWindowStore) and did the append for you under the hood? The
changelogging layer sits between the layer that you would call #put on in
your transformer and the final layer that actually writes to the underlying
storage engine. If you insert an extra layer beneath the changelogging layer
and modify your transformer to only call put on the new data (rather than
the entire list), then only this new data will get sent to the changelog.
Your custom storage layer will know it's actually append semantics, and will
add the new data to the existing list before sending it on to RocksDB.
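A minimal, dependency-free Kotlin sketch of that layering follows. `SimpleStore`, `InMemoryStore`, `AppendingStore` and `LoggingStore` are hypothetical stand-ins, not the Kafka Streams `StateStore` API; the sketch only demonstrates the ordering, where the logging layer sees the caller's delta and the append happens underneath it.

```kotlin
// Hypothetical stand-ins for store layers; NOT the Kafka Streams StateStore API.

/** Minimal key-value store layer. */
interface SimpleStore<K, V> {
    fun put(key: K, value: V)
    fun get(key: K): V?
}

/** Bottom layer (RocksDB in the real system). */
class InMemoryStore<K, V> : SimpleStore<K, V> {
    private val data = HashMap<K, V>()
    override fun put(key: K, value: V) { data[key] = value }
    override fun get(key: K): V? = data[key]
}

/** Append layer: merges each put into the list already stored under the key. */
class AppendingStore<K, T>(private val inner: SimpleStore<K, List<T>>) : SimpleStore<K, List<T>> {
    override fun put(key: K, value: List<T>) {
        val existing = inner.get(key) ?: emptyList()
        inner.put(key, existing + value)
    }
    override fun get(key: K): List<T>? = inner.get(key)
}

/** Changelogging layer: logs exactly what it receives (the delta), then forwards it. */
class LoggingStore<K, V>(
    private val inner: SimpleStore<K, V>,
    val changelog: MutableList<Pair<K, V>> = mutableListOf()
) : SimpleStore<K, V> {
    override fun put(key: K, value: V) {
        changelog.add(key to value)
        inner.put(key, value)
    }
    override fun get(key: K): V? = inner.get(key)
}
```

Calling `put("sensor-1", listOf(1))` and then `put("sensor-1", listOf(2))` on `LoggingStore(AppendingStore(...))` leaves `[1, 2]` in the bottom store while the changelog holds two one-element deltas. That is also why compaction must be off for such a changelog: compacting it would keep only the last delta per key.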

Since you only ever have one window per key in the store (right?) you just
need to make sure that nothing from the current window gets deleted
prematurely. You'd want to turn off compaction on the changelog and caching
on the store of course, and maybe give the changelog some extra retention
time to be safe.
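As a hedged illustration of those topic settings, the helper below builds an override map of the kind one might pass to a store builder's `withLoggingEnabled(Map<String, String>)`, used together with `withCachingDisabled()`. The helper name and the 9-minute safety margin are assumptions; `cleanup.policy` and `retention.ms` are standard Kafka topic configs.

```kotlin
import java.time.Duration

// Hypothetical helper: changelog topic overrides for the append-only store idea.
// Each changelog record would hold only a delta, so compaction (which keeps just
// the newest record per key) would lose data; time-based deletion is used instead.
fun appendChangelogConfig(windowSize: Duration, safetyMargin: Duration): Map<String, String> =
    mapOf(
        "cleanup.policy" to "delete",
        // Keep at least one full window of deltas, plus some slack to be safe.
        "retention.ms" to windowSize.plus(safetyMargin).toMillis().toString()
    )
```

For a 1-minute window and a 9-minute margin this yields `cleanup.policy=delete` and `retention.ms=600000`.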

Obviously I haven't thoroughly verified this alternative, but it seems like
this approach (or something to that effect) could help you cut down on the
changelog data. WDYT?

On Mon, Dec 9, 2019 at 4:35 PM Alessandro Tagliapietra <
tagliapietra.alessandro@gmail.com> wrote:

> Hi Sophie,
>
> Just to give better context: yes, we use EOS and the problem happens in
> our aggregation store.
> Basically, when windowing data we append each record into a list that's
> stored in the aggregation store.
> We have 2 versions: in production we use the Kafka Streams windowing API,
> while in staging we manually calculate the window end timestamp and
> aggregate using that timestamp.
>
> To give you an example of the staging code, it's a simple transformer that:
>  - if incoming data fits in the same window as the data in store, append
> the data to the existing store list overwriting the same key and nothing is
> sent downstream
>  - if incoming data has a timestamp smaller than the existing store data,
> discard the record
>  - if incoming data has a timestamp bigger than the existing store data,
> send the stored list downstream and store the new window data into the
> store
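The three rules above can be sketched as a small per-key state machine. This is a hedged, dependency-free Kotlin model rather than the poster's transformer: `WindowState`, `LastWindowAggregator` and the `MutableMap` standing in for the transformer's key-value store are illustrative assumptions, and windows are compared by their start time.

```kotlin
// Per-key "keep only the current window" aggregation, as a plain-Kotlin model.
// The MutableMap stands in for the transformer's key-value store (hypothetical).

data class WindowState<T>(val windowStart: Long, val records: List<T>)

class LastWindowAggregator<T>(
    private val windowSizeMs: Long,
    private val store: MutableMap<String, WindowState<T>> = HashMap()
) {
    private fun windowStartOf(timestamp: Long): Long =
        timestamp - Math.floorMod(timestamp, windowSizeMs)

    /** Returns the finished window to send downstream, or null if nothing is emitted. */
    fun process(key: String, timestamp: Long, value: T): WindowState<T>? {
        val start = windowStartOf(timestamp)
        val current = store[key]
        if (current == null) {
            store[key] = WindowState(start, listOf(value)) // first record for this key
            return null
        }
        return when {
            start == current.windowStart -> {
                // Same window: append and overwrite the same key, emit nothing.
                store[key] = current.copy(records = current.records + value)
                null
            }
            start < current.windowStart -> null // older than the stored window: discard
            else -> {
                // Newer window: emit the stored list and start the new window.
                store[key] = WindowState(start, listOf(value))
                current
            }
        }
    }
}
```

Feeding timestamps 1,000, 30,000 and 61,000 ms for one key with a 60,000 ms window emits the first window's two records on the third call; a later record at 10,000 ms is then discarded as belonging to an older window.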
>
> This way we don't use multiple keys (kafka streams instead uses a store
> where each key is stream-key + window key) as we overwrite the store data
> using the same key over and over.
> So what I would expect is that, since we're overwriting the same keys,
> there isn't more and more data to be cached: the number of keys is always
> the same and we don't really need to cache more data over time.
>
> To respond to your questions:
>  - yes, when I say that the cache "stopped/started" working I mean that at
> some point the store started sending more and more data to the changelog
> topic and then suddenly stopped again, even without a restart (a restart
> always fixes the problem).
>  - Yes, there are no density changes in the input stream. I've checked the
> number of records sent to the stream input topic and there is a variation
> of ~10-20 records per minute on an average of 340 records per minute. Most
> of the records are also generated by simulators with a very predictable
> output rate.
>
> In the meantime I've enabled reporting of debug metrics (so including cache
> hit ratio) to hopefully get better insights the next time it happens.
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra
>
> On Mon, Dec 9, 2019 at 3:57 PM Sophie Blee-Goldman <sophie@confluent.io>
> wrote:
>
> > It's an LRU cache, so once it gets full new records will cause older ones
> > to be evicted (and thus sent downstream). Of course this should only
> > apply to records of a different key; otherwise it will just cause an
> > update of that key in the cache.
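A simplified Kotlin model of that behavior, plus the flush-on-commit discussed further down the thread. The class name is hypothetical, and capacity is counted in entries for clarity, whereas the real Streams cache is bounded in bytes (`cache.max.bytes.buffering`).

```kotlin
// Simplified LRU record cache in front of a changelog. Capacity is counted in
// entries here; the real Streams cache is bounded in bytes. Hypothetical class.
class LruRecordCache<K, V>(private val maxEntries: Int) {
    /** Records forwarded downstream (e.g. to the changelog), in order. */
    val sentDownstream = mutableListOf<Pair<K, V>>()

    // accessOrder = true makes iteration order least-recently-used first.
    private val cache = object : LinkedHashMap<K, V>(16, 0.75f, true) {
        override fun removeEldestEntry(eldest: MutableMap.MutableEntry<K, V>?): Boolean {
            val evict = size > maxEntries
            // An evicted entry is forwarded with its latest value before it is dropped.
            if (evict && eldest != null) sentDownstream.add(eldest.key to eldest.value)
            return evict
        }
    }

    /** Updating a cached key just replaces its value; a new key may evict the LRU entry. */
    fun put(key: K, value: V) {
        cache[key] = value
    }

    /** On commit the whole cache is forwarded downstream and emptied. */
    fun flush() {
        for ((k, v) in cache) sentDownstream.add(k to v)
        cache.clear()
    }
}
```

With capacity 2, two puts to key `a` send nothing; adding `b` and then `c` evicts `a` (with its latest value only), and a flush forwards whatever remains.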
> >
> > I missed that you were using EOS; given the short commit interval it's
> > hard to see those effects.
> > When you say that it stopped working and then appeared to start working
> > again, are you just referring to the amount of data being sent to the
> > changelog? And you can definitely rule out differences in the density of
> > updates in the input stream?
> >
> >
> >
> > On Mon, Dec 9, 2019 at 12:26 PM Alessandro Tagliapietra <
> > tagliapietra.alessandro@gmail.com> wrote:
> >
> > > Hi Sophie,
> > >
> > > thanks for helping.
> > >
> > > By eviction of older records, do you mean they get flushed to the
> > > changelog topic?
> > > Or is the cache just full, so that all new records go to the changelog
> > > topic until the old ones are evicted?
> > >
> > > Regarding the timing, what timing do you mean? Between when the cache
> > > stops and starts working again? We're using EOS, so I believe the commit
> > > interval is every 100ms.
> > >
> > > Regards
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > >
> > >
> > > On Mon, Dec 9, 2019 at 12:15 PM Sophie Blee-Goldman <sophie@confluent.io>
> > > wrote:
> > >
> > > > It might be that the cache appears to "stop working" because it gets
> > > > full, and each new update causes an eviction (of some older record).
> > > > This would also explain the opposite behavior, that it "starts
> > > > working" again after some time without being restarted, since the
> > > > cache is completely flushed on commit. Does the timing seem to align
> > > > with your commit interval (default is 30s)?
> > > >
> > > > On Mon, Dec 9, 2019 at 12:03 AM Alessandro Tagliapietra <
> > > > tagliapietra.alessandro@gmail.com> wrote:
> > > >
> > > > > And it seems that for some reason after a while caching works again
> > > > > without a restart of the streams application
> > > > >
> > > > > [image: Screen Shot 2019-12-08 at 11.59.30 PM.png]
> > > > >
> > > > > I'll try to enable debug metrics and see if I can find something
> > > > > useful there.
> > > > > Any idea is appreciated in the meantime :)
> > > > >
> > > > > --
> > > > > Alessandro Tagliapietra
> > > > >
> > > > > On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <
> > > > > tagliapietra.alessandro@gmail.com> wrote:
> > > > >
> > > > >> It seems that even with caching enabled, after a while the sent bytes
> > > > >> still go up
> > > > >>
> > > > >> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
> > > > >>
> > > > >> You can see the deploy where I enabled caching, but it looks like
> > > > >> it's still only a temporary solution.
> > > > >>
> > > > >> --
> > > > >> Alessandro Tagliapietra
> > > > >>
> > > > >>
> > > > >> On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra <
> > > > >> tagliapietra.alessandro@gmail.com> wrote:
> > > > >>
> > > > >>> Could be, but since we have a limited number of input keys (~30),
> > > > >>> windowing generates new keys but old ones are never touched again
> > > > >>> since the data per key is in order, I assume it shouldn't be a big
> > > > >>> deal for it to handle 30 keys.
> > > > >>> I'll have a look at cache metrics and see if something pops out
> > > > >>>
> > > > >>> Thanks
> > > > >>>
> > > > >>> --
> > > > >>> Alessandro Tagliapietra
> > > > >>>
> > > > >>>
> > > > >>> On Sat, Dec 7, 2019 at 10:02 AM John Roesler <vvcephei@apache.org>
> > > > >>> wrote:
> > > > >>>
> > > > >>>> Hmm, that’s a good question. Now that we’re talking about caching,
> > > > >>>> I wonder if the cache was just too small. It’s not very big by
> > > > >>>> default.
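If the cache size is the suspect, it can be raised through the `cache.max.bytes.buffering` Streams setting, whose default is 10 MB and which is shared across all stream threads of an instance. A minimal Kotlin sketch using plain `Properties`; the helper name and the 50 MB value are arbitrary examples.

```kotlin
import java.util.Properties

// Default value of "cache.max.bytes.buffering": 10 MB, shared by all stream threads.
const val DEFAULT_CACHE_BYTES = 10L * 1024 * 1024

// Hypothetical helper that raises the Streams record cache size.
// "cache.max.bytes.buffering" is the key behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
fun streamsCacheProps(cacheBytes: Long): Properties = Properties().apply {
    setProperty("cache.max.bytes.buffering", cacheBytes.toString())
}
```

The returned properties would be merged into the rest of the Streams configuration before the application is started; the cache hit-ratio metrics mentioned later in the thread show whether the larger size actually helps.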
> > > > >>>>
> > > > >>>> On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:
> > > > >>>> > Ok I'll check on that!
> > > > >>>> >
> > > > >>>> > Now I can see that with caching we went from 3-4MB/s to 400KB/s,
> > > > >>>> > which will help with the bill.
> > > > >>>> >
> > > > >>>> > Last question: any reason why after a while the regular windowed
> > > > >>>> > stream starts sending every update instead of caching?
> > > > >>>> > Could it be because it doesn't have any more memory available?
> > > > >>>> > Any other possible reason?
> > > > >>>> >
> > > > >>>> > Thank you so much for your help
> > > > >>>> >
> > > > >>>> > --
> > > > >>>> > Alessandro Tagliapietra
> > > > >>>> >
> > > > >>>> >
> > > > >>>> > On Sat, Dec 7, 2019 at 9:14 AM John Roesler <vvcephei@apache.org>
> > > > >>>> > wrote:
> > > > >>>> >
> > > > >>>> > > Ah, yes. Glad you figured it out!
> > > > >>>> > >
> > > > >>>> > > Caching does not reduce EOS guarantees at all. I highly
> > > > >>>> > > recommend using it. You might even want to take a look at the
> > > > >>>> > > caching metrics to make sure you have a good hit ratio.
> > > > >>>> > >
> > > > >>>> > > -John
> > > > >>>> > >
> > > > >>>> > > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote:
> > > > >>>> > > > Never mind, I've found out I can use `.withCachingEnabled` on
> > > > >>>> > > > the store builder to achieve the same thing as the windowing
> > > > >>>> > > > example, as `Materialized.as` turns that on by default.
> > > > >>>> > > >
> > > > >>>> > > > Does caching in any way reduce the EOS guarantees?
> > > > >>>> > > >
> > > > >>>> > > > --
> > > > >>>> > > > Alessandro Tagliapietra
> > > > >>>> > > >
> > > > >>>> > > >
> > > > >>>> > > > On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra <
> > > > >>>> > > > tagliapietra.alessandro@gmail.com> wrote:
> > > > >>>> > > >
> > > > >>>> > > > > Seems my journey with this isn't done just yet,
> > > > >>>> > > > >
> > > > >>>> > > > > This seems very complicated to me but I'll try to explain it
> > > > >>>> > > > > as best I can.
> > > > >>>> > > > > To better understand the streams network usage I've used
> > > > >>>> > > > > Prometheus with the JMX exporter to export Kafka metrics.
> > > > >>>> > > > > To check the amount of data we use I'm looking at the
> > > > >>>> > > > > increments of kafka_producer_topic_metrics_byte_total and
> > > > >>>> > > > > kafka_producer_producer_topic_metrics_record_send_total.
> > > > >>>> > > > >
> > > > >>>> > > > > Our current (before the change mentioned above) code looks
> > > > >>>> > > > > like this:
> > > > >>>> > > > >
> > > > >>>> > > > > // This transformer just pairs a value with the previous one,
> > > > >>>> > > > > // storing the temporary one in a store
> > > > >>>> > > > > val pairsStream = metricStream
> > > > >>>> > > > >   .transformValues(ValueTransformerWithKeySupplier { PairTransformer() },
> > > > >>>> > > > >       "LastValueStore")
> > > > >>>> > > > >   .filter { _, value: MetricSequence? -> value != null }
> > > > >>>> > > > >
> > > > >>>> > > > > // Create a store to hold suppressed windows until a new one
> > > > >>>> > > > > // is received
> > > > >>>> > > > > val suppressStoreSupplier =
> > > > >>>> > > > >     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"),
> > > > >>>> > > > >     ......
> > > > >>>> > > > >
> > > > >>>> > > > > // Window and aggregate data in 1 minute intervals
> > > > >>>> > > > > val aggregatedStream = pairsStream
> > > > >>>> > > > >   .groupByKey()
> > > > >>>> > > > >   .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
> > > > >>>> > > > >   .aggregate(
> > > > >>>> > > > >           { MetricSequenceList(ArrayList()) },
> > > > >>>> > > > >           { key, value, aggregate ->
> > > > >>>> > > > >               aggregate.getRecords().add(value)
> > > > >>>> > > > >               aggregate
> > > > >>>> > > > >           },
> > > > >>>> > > > >           Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
> > > > >>>> > > > >               .withKeySerde(Serdes.String())
> > > > >>>> > > > >               .withValueSerde(Settings.getValueSpecificavroSerde())
> > > > >>>> > > > >   )
> > > > >>>> > > > >   .toStream()
> > > > >>>> > > > >   .flatTransform(TransformerSupplier {
> > > > >>>> > > > >       // This transformer basically waits until a new window is
> > > > >>>> > > > >       // received to emit the previous one
> > > > >>>> > > > >   }, "suppress-store")
> > > > >>>> > > > >   .map { sensorId: String, suppressedOutput: SuppressedOutput ->
> > > > >>>> > > > >       .... etc ....
> > > > >>>> > > > >
> > > > >>>> > > > >
> > > > >>>> > > > > Basically:
> > > > >>>> > > > >  - all data goes through the LastValueStore store, which stores
> > > > >>>> > > > > each message and emits a pair with the previous one
> > > > >>>> > > > >  - the aggregate-store is used to store the per-window list of
> > > > >>>> > > > > messages in the aggregate method
> > > > >>>> > > > >  - the suppress store is used to store each received window,
> > > > >>>> > > > > which is emitted only after a newer one is received
> > > > >>>> > > > >
> > > > >>>> > > > > What I'm experiencing is that:
> > > > >>>> > > > >  - during normal execution, the streams app sends 5k
> > > > >>>> > > > > messages/min to the lastvalue store changelog topic, and only
> > > > >>>> > > > > about 100 to the aggregate and suppress store changelog topics
> > > > >>>> > > > >  - at some point (after many hours of operation), the streams
> > > > >>>> > > > > app starts sending to the aggregate and suppress store
> > > > >>>> > > > > changelog topics the same number of messages going through the
> > > > >>>> > > > > lastvaluestore
> > > > >>>> > > > >  - if I restart the streams app it goes back to the initial
> > > > >>>> > > > > behavior
> > > > >>>> > > > >
> > > > >>>> > > > > You can see the behavior in this graph https://imgur.com/dJcUNSf
> > > > >>>> > > > > You can also see that after a restart everything goes back to
> > > > >>>> > > > > normal levels.
> > > > >>>> > > > > Regarding other metrics, process latency increases, poll
> > > > >>>> > > > > latency decreases, poll rate decreases, and commit rate stays
> > > > >>>> > > > > the same while commit latency increases.
> > > > >>>> > > > >
> > > > >>>> > > > > Now, I have these questions:
> > > > >>>> > > > >  - why isn't the aggregate/suppress store changelog topic
> > > > >>>> > > > > throughput the same as the LastValueStore? Shouldn't it send a
> > > > >>>> > > > > record to the changelog every time it aggregates?
> > > > >>>> > > > >  - is the windowing doing some internal caching, like not
> > > > >>>> > > > > sending every aggregation record until the window time has
> > > > >>>> > > > > passed? (if so, where can I find that code, since I would like
> > > > >>>> > > > > to use it for our new implementation too)
> > > > >>>> > > > >
> > > > >>>> > > > > Thank you in advance
> > > > >>>> > > > >
> > > > >>>> > > > > --
> > > > >>>> > > > > Alessandro Tagliapietra
> > > > >>>> > > > >
> > > > >>>> > > > >
> > > > >>>> > > > > On Wed, Dec 4, 2019 at 7:57 AM John Roesler <vvcephei@apache.org>
> > > > >>>> > > > > wrote:
> > > > >>>> > > > >
> > > > >>>> > > > >> Oh, good!
> > > > >>>> > > > >>
> > > > >>>> > > > >> On Tue, Dec 3, 2019, at 23:29, Alessandro Tagliapietra wrote:
> > > > >>>> > > > >> > Testing on staging shows that a restart on exception is much
> > > > >>>> > > > >> > faster and the stream starts right away, which I think means
> > > > >>>> > > > >> > we're reading way less data than before!
> > > > >>>> > > > >> >
> > > > >>>> > > > >> > > What I was referring to is that, in Streams, the keys for
> > > > >>>> > > > >> > > window aggregation state are actually composed of both the
> > > > >>>> > > > >> > > window itself and the key. In the DSL, it looks like
> > > > >>>> > > > >> > > "Windowed<K>". That results in the store having a unique key
> > > > >>>> > > > >> > > per window for each K, which is why we need retention as well
> > > > >>>> > > > >> > > as compaction for our changelogs. But for you, if you just
> > > > >>>> > > > >> > > make the key "K", then compaction alone should do the trick.
> > > > >>>> > > > >> >
> > > > >>>> > > > >> > Yes, we had compact,delete as the cleanup policy, but it
> > > > >>>> > > > >> > probably still had too long a retention value. Also, the
> > > > >>>> > > > >> > rocksdb store is probably much faster now, having only one key
> > > > >>>> > > > >> > per key instead of one key per window per key.
> > > > >>>> > > > >> >
> > > > >>>> > > > >> > Thanks a lot for helping! I'm now going to set up
> > > > >>>> > > > >> > prometheus-jmx monitoring so we can keep better track of
> > > > >>>> > > > >> > what's going on :)
> > > > >>>> > > > >> >
> > > > >>>> > > > >> > --
> > > > >>>> > > > >> > Alessandro Tagliapietra
> > > > >>>> > > > >> >
> > > > >>>> > > > >> >
> > > > >>>> > > > >> > On Tue, Dec 3, 2019 at 9:12 PM John Roesler <vvcephei@apache.org>
> > > > >>>> > > > >> > wrote:
> > > > >>>> > > > >> >
> > > > >>>> > > > >> > > Oh, yeah, I remember that conversation!
> > > > >>>> > > > >> > >
> > > > >>>> > > > >> > > Yes, then, I agree: if you're only storing state of the most
> > > > >>>> > > > >> > > recent window for each key, and the key you use for that
> > > > >>>> > > > >> > > state is actually the key of the records, then an aggressive
> > > > >>>> > > > >> > > compaction policy plus your custom transformer seems like a
> > > > >>>> > > > >> > > good way forward.
> > > > >>>> > > > >> > >
> > > > >>>> > > > >> > > What I was referring to is that, in Streams, the keys for
> > > > >>>> > > > >> > > window aggregation state are actually composed of both the
> > > > >>>> > > > >> > > window itself and the key. In the DSL, it looks like
> > > > >>>> > > > >> > > "Windowed<K>". That results in the store having a unique key
> > > > >>>> > > > >> > > per window for each K, which is why we need retention as well
> > > > >>>> > > > >> > > as compaction for our changelogs. But for you, if you just
> > > > >>>> > > > >> > > make the key "K", then compaction alone should do the trick.
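A toy model makes the difference concrete. The Kotlin sketch below treats compaction as "keep the last record per key" and compares a changelog keyed by (key, window start), roughly what `Windowed<K>` amounts to, against one keyed by the plain record key. `WindowedKey` and the one-update-per-minute workload are illustrative assumptions.

```kotlin
// Toy model of log compaction: only the latest record per key survives.
// WindowedKey mimics the (key, window) composite used for windowed stores.

data class WindowedKey(val key: String, val windowStart: Long)

/** Compaction keeps the last value written for each distinct key. */
fun <K, V> compact(changelog: List<Pair<K, V>>): Map<K, V> = changelog.toMap()

/** One update per minute for [minutes] minutes, keyed by (sensor, window start). */
fun windowedChangelog(sensor: String, minutes: Int): List<Pair<WindowedKey, Int>> =
    (0 until minutes).map { m -> WindowedKey(sensor, m * 60_000L) to m }

/** The same updates, keyed by the plain record key. */
fun plainChangelog(sensor: String, minutes: Int): List<Pair<String, Int>> =
    (0 until minutes).map { m -> sensor to m }
```

For one sensor updated once a minute for an hour, compaction leaves 60 records in the windowed changelog but a single record in the plain-key one, which is why the windowed variant also needs time-based retention.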
> > > > >>>> > > > >> > >
> > > > >>>> > > > >> > > And yes, if you manage the topic yourself, then Streams won't
> > > > >>>> > > > >> > > adjust the retention time. I think it might validate that the
> > > > >>>> > > > >> > > retention isn't too short, but I don't remember offhand.
> > > > >>>> > > > >> > >
> > > > >>>> > > > >> > > Cheers, and let me know how it goes!
> > > > >>>> > > > >> > > -John
> > > > >>>> > > > >> > >
> > > > >>>> > > > >> > > On Tue, Dec 3, 2019, at 23:03, Alessandro Tagliapietra wrote:
> > > > >>>> > > > >> > > > Hi John,
> > > > >>>> > > > >> > > >
> > > > >>>> > > > >> > > > AFAIK the grace period uses stream time
> > > > >>>> > > > >> > > > https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html
> > > > >>>> > > > >> > > > which is per partition. Unfortunately we process data that's
> > > > >>>> > > > >> > > > not in sync between keys, so each key needs to be
> > > > >>>> > > > >> > > > independent, and one key can have much older data than
> > > > >>>> > > > >> > > > another.
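To illustrate the concern, here is a simplified Kotlin model of stream-time-based window closing on a single partition. It assumes stream time is the maximum record timestamp seen so far on the partition and that a window `[start, start + size)` accepts records while `streamTime < start + size + grace`; the class name is hypothetical and this is not the Streams implementation.

```kotlin
// Simplified model of stream-time-based window closing on one partition.
// Assumptions: stream time is the max timestamp seen on the partition, and a
// window [start, start + size) accepts records while
// streamTime < start + size + grace.
class PartitionStreamTime(private val windowSizeMs: Long, private val graceMs: Long) {
    var streamTime: Long = Long.MIN_VALUE
        private set

    /** Returns true if the record is accepted, false if its window has closed. */
    fun accept(timestamp: Long): Boolean {
        streamTime = maxOf(streamTime, timestamp) // shared by every key on the partition
        val windowStart = timestamp - Math.floorMod(timestamp, windowSizeMs)
        return streamTime < windowStart + windowSizeMs + graceMs
    }
}
```

With zero grace, a record for key A at minute 10 advances stream time, after which a record for key B at minute 2 on the same partition is rejected; with a 24-hour grace both are accepted.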
> > > > >>>> > > > >> > > >
> > > > >>>> > > > >> > > > Having a small grace period would probably close old
> > > > >>>> > > > >> > > > windows sooner than expected. That's also why, in my use
> > > > >>>> > > > >> > > > case, a custom store that just stores the last window data
> > > > >>>> > > > >> > > > for each key might work better. I had the same issue with
> > > > >>>> > > > >> > > > suppression, and it has been reported here:
> > > > >>>> > > > >> > > > https://issues.apache.org/jira/browse/KAFKA-8769
> > > > >>>> > > > >> > > > Oh, I just saw that you're the one that helped me on Slack
> > > > >>>> > > > >> > > > and created the issue (thanks again for that).
> > > > >>>> > > > >> > > >
> > > > >>>> > > > >> > > > Regarding the behavior you mention, about Streams setting
> > > > >>>> > > > >> > > > the changelog retention time: is that something it does on
> > > > >>>> > > > >> > > > creation of the topic when the broker has auto creation
> > > > >>>> > > > >> > > > enabled? Because we're using Confluent Cloud and I had to
> > > > >>>> > > > >> > > > create it manually.
> > > > >>>> > > > >> > > > Regarding the change in the recovery behavior: with a
> > > > >>>> > > > >> > > > compact cleanup policy, shouldn't the changelog only keep
> > > > >>>> > > > >> > > > the last value? That would make the recovery faster and
> > > > >>>> > > > >> > > > "cheaper", as it would only need to read a single value per
> > > > >>>> > > > >> > > > key (if the cleanup just happened), right?
> > > > >>>> > > > >> > > >
> > > > >>>> > > > >> > > > --
> > > > >>>> > > > >> > > > Alessandro Tagliapietra
> > > > >>>> > > > >> > > >
> > > > >>>> > > > >> > > >
> > > > >>>> > > > >> > > > On Tue, Dec 3, 2019 at 8:51 PM John Roesler <vvcephei@apache.org>
> > > > >>>> > > > >> > > > wrote:
> > > > >>>> > > > >> > > >
> > > > >>>> > > > >> > > > > Hey Alessandro,
> > > > >>>> > > > >> > > > >
> > > > >>>> > > > >> > > > > That sounds also like it would work. I'm wondering if it
> > > > >>>> > > > >> > > > > would actually change what you observe w.r.t. recovery
> > > > >>>> > > > >> > > > > behavior, though. Streams already sets the retention time
> > > > >>>> > > > >> > > > > on the changelog to equal the retention time of the
> > > > >>>> > > > >> > > > > windows, for windowed aggregations, so you shouldn't be
> > > > >>>> > > > >> > > > > loading a lot of window data for old windows you no longer
> > > > >>>> > > > >> > > > > care about.
> > > > >>>> > > > >> > > > >
> > > > >>>> > > > >> > > > > Have you set the "grace period" on your window definition?
> > > > >>>> > > > >> > > > > By default, it is set to 24 hours, but you can set it as
> > > > >>>> > > > >> > > > > low as you like. E.g., if you want to commit to having
> > > > >>>> > > > >> > > > > in-order data only, then you can set the grace period to
> > > > >>>> > > > >> > > > > zero. This _should_ let the broker clean up the changelog
> > > > >>>> > > > >> > > > > records as soon as the window ends.
> > > > >>>> > > > >> > > > >
> > > > >>>> > > > >> > > > > Of course, the log cleaner doesn't run all the time, so
> > > > >>>> > > > >> > > > > there's some extra delay in which "expired" data would
> > > > >>>> > > > >> > > > > still be visible in the changelog, but it would actually be
> > > > >>>> > > > >> > > > > just the same as if you manage the store yourself.
> > > > >>>> > > > >> > > > >
> > > > >>>> > > > >> > > > > Hope this helps!
> > > > >>>> > > > >> > > > > -John
> > > > >>>> > > > >> > > > >
> > > > >>>> > > > >> > > > > On Tue, Dec 3, 2019, at 22:22, Alessandro Tagliapietra wrote:
> > > > >>>> > > > >> > > > > > Thanks John for the explanation,
> > > > >>>> > > > >> > > > > >
> > > > >>>> > > > >> > > > > > I thought that with EOS enabled (which we have) it would,
> > > > >>>> > > > >> > > > > > in the worst case, find a valid checkpoint and start the
> > > > >>>> > > > >> > > > > > restore from there until it reached the last committed
> > > > >>>> > > > >> > > > > > status, not completely from scratch. What you say
> > > > >>>> > > > >> > > > > > definitely makes sense now.
> > > > >>>> > > > >> > > > > > Since we don't really need old time windows and we ensure
> > > > >>>> > > > >> > > > > > data is ordered when processed, I think I'll just write a
> > > > >>>> > > > >> > > > > > custom transformer to keep only the last window, store
> > > > >>>> > > > >> > > > > > intermediate aggregation results in the store and emit a
> > > > >>>> > > > >> > > > > > new value only when we receive data belonging to a new
> > > > >>>> > > > >> > > > > > window.
> > > > >>>> > > > >> > > > > > That, with a compact-only changelog topic, should keep the
> > > > >>>> > > > >> > > > > > rebuild data to the minimum, as it would have only the
> > > > >>>> > > > >> > > > > > last value for each key.
> > > > >>>> > > > >> > > > > >
> > > > >>>> > > > >> > > > > > Hope that makes sense
> > > > >>>> > > > >> > > > > >
> > > > >>>> > > > >> > > > > > Thanks again
> > > > >>>> > > > >> > > > > >
> > > > >>>> > > > >> > > > > > --
> > > > >>>> > > > >> > > > > > Alessandro Tagliapietra
> > > > >>>> > > > >> > > > > >
> > > > >>>> > > > >> > > > > >
> > > > >>>> > > > >> > > > > > On Tue, Dec 3, 2019 at 3:04 PM John Roesler <vvcephei@apache.org>
> > > > >>>> > > > >> > > > > > wrote:
> > > > >>>> > > > >> > > > > >
> > > > >>>> > > > >> > > > > > > Hi Alessandro,
> > > > >>>> > > > >> > > > > > >
> > > > >>>> > > > >> > > > > > > To take a stab at your question, maybe it first doesn't
> > > > >>>> > > > >> > > > > > > find it, but then restores some data, writes the
> > > > >>>> > > > >> > > > > > > checkpoint, and then later on, it has to re-initialize
> > > > >>>> > > > >> > > > > > > the task for some reason, and that's why it does find a
> > > > >>>> > > > >> > > > > > > checkpoint then?
> > > > >>>> > > > >> > > > > > >
> > > > >>>> > > > >> > > > > > > More to the heart of the issue, if you have EOS enabled,
> > > > >>>> > > > >> > > > > > > Streams _only_ records the checkpoint when the store is
> > > > >>>> > > > >> > > > > > > in a known-consistent state. For example, if you have a
> > > > >>>> > > > >> > > > > > > graceful shutdown, Streams will flush all the stores,
> > > > >>>> > > > >> > > > > > > commit all the transactions, and then write the
> > > > >>>> > > > >> > > > > > > checkpoint file. Then, on re-start, it will pick up from
> > > > >>>> > > > >> > > > > > > that checkpoint.
> > > > >>>> > > > >> > > > > > >
> > > > >>>> > > > >> > > > > > > But as soon as it starts processing records, it removes
> > > > >>>> > > > >> > > > > > > the checkpoint file, so if it crashes while it was
> > > > >>>> > > > >> > > > > > > processing, there is no checkpoint file there, and it
> > > > >>>> > > > >> > > > > > > will have to restore from the beginning of the
> > > > >>>> > > > >> > > > > > > changelog.
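The checkpoint rule just described can be modeled in a few lines. This is a hedged Kotlin sketch, not Streams code: `EosCheckpointModel` is hypothetical, and restoring "from the beginning of the changelog" is represented as starting at offset 0.

```kotlin
// Hypothetical model of the EOS checkpoint invariant: a checkpoint exists only
// while the local store is known to match the changelog.
class EosCheckpointModel {
    private var checkpointOffset: Long? = null

    /** Graceful shutdown: stores flushed, transactions committed, checkpoint written. */
    fun cleanShutdown(changelogEndOffset: Long) {
        checkpointOffset = changelogEndOffset
    }

    /**
     * Startup: returns the changelog offset restoration begins at (0 = full
     * restore), then deletes the checkpoint because processing is about to start.
     */
    fun startAndBeginProcessing(): Long {
        val restoreFrom = checkpointOffset ?: 0L
        checkpointOffset = null
        return restoreFrom
    }
}
```

After a clean shutdown at changelog offset 500, the next start resumes from 500; if the process then crashes before another clean shutdown, the following start finds no checkpoint and restores from 0.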
> > > > >>>> > > > >> > > > > > >
> > > > >>>> > > > >> > > > > > > This design is there on purpose, because otherwise we
> > > > >>>> > > > >> > > > > > > cannot actually guarantee correctness. For example, say
> > > > >>>> > > > >> > > > > > > you are maintaining a count operation: we process an
> > > > >>>> > > > >> > > > > > > input record i, increment the count, and write it to the
> > > > >>>> > > > >> > > > > > > state store and to the changelog topic, but we crash
> > > > >>>> > > > >> > > > > > > before we commit that transaction. Then the write to the
> > > > >>>> > > > >> > > > > > > changelog would be aborted, and we would re-process
> > > > >>>> > > > >> > > > > > > record i. However, we've already updated the local state
> > > > >>>> > > > >> > > > > > > store, so when we increment it again, it results in
> > > > >>>> > > > >> > > > > > > double-counting i. The key point here is that there's no
> > > > >>>> > > > >> > > > > > > way to do an atomic operation across two different
> > > > >>>> > > > >> > > > > > > systems (local state store and the changelog topic).
> > > > >>>> > > > >> > > > > > > Since we can't guarantee that we roll back the
> > > > >>>> > > > >> > > > > > > incremented count when the changelog transaction is
> > > > >>>> > > > >> > > > > > > aborted, we can't keep the local store consistent with
> > > > >>>> > > > >> > > > > > > the changelog.
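The double-counting scenario can be reproduced with a toy model. In this hypothetical Kotlin sketch the local store is a plain counter and the changelog transaction is a pending list that is either committed or discarded; nothing rolls back the counter on abort, which mirrors the point being made.

```kotlin
// Toy model of why a crash breaks consistency: the local store and the changelog
// transaction are separate systems, and aborting the transaction does not roll
// back the local store. Hypothetical class, not Streams internals.
class CountTask {
    var localCount = 0                              // local state store
        private set
    val committedChangelog = mutableListOf<Int>()   // committed changelog records
    private val pending = mutableListOf<Int>()      // writes in the open transaction

    fun process() {
        localCount += 1              // 1. update the local store
        pending.add(localCount)      // 2. write to the changelog in the transaction
    }

    fun commit() {
        committedChangelog.addAll(pending)
        pending.clear()
    }

    /** Crash before commit: the transaction is aborted, the local store is untouched. */
    fun crashBeforeCommit() = pending.clear()

    /** Discard local state and rebuild it from the committed changelog. */
    fun restoreFromChangelog() {
        localCount = committedChangelog.lastOrNull() ?: 0
    }
}
```

Processing record i, crashing before commit and reprocessing it leaves the local count at 2 for a single record. Calling `restoreFromChangelog()` before reprocessing, which corresponds to discarding the local store and rebuilding it, gives the correct count of 1.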
> > > > >>>> > > > >> > > > > > >
> > > > >>>> > > > >> > > > > > > After a crash, the only way to ensure the local store is
> > > > >>>> > > > >> > > > > > > consistent with the changelog is to discard the entire thing and
> > > > >>>> > > > >> > > > > > > rebuild it. This is why we have an invariant that the checkpoint
> > > > >>>> > > > >> > > > > > > file only exists when we _know_ that the local store is
> > > > >>>> > > > >> > > > > > > consistent with the changelog, and this is why you're seeing so
> > > > >>>> > > > >> > > > > > > much bandwidth when restarting from an unclean shutdown.
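The checkpoint invariant can be sketched the same way. In this hypothetical model (`ToyCheckpointFile` and its methods are illustrative, not a Kafka Streams API), a checkpoint offset exists only after a clean shutdown and is dropped as soon as the task starts processing again, since from that point on the local store may run ahead of the committed changelog. A crash therefore leaves no checkpoint, and the next startup has to replay the changelog from its start. It deliberately ignores how Kafka Streams actually writes and deletes the real checkpoint file.

```kotlin
// Hypothetical model of the EOS checkpoint invariant; not a Kafka Streams API.
class ToyCheckpointFile {
    // Changelog offset up to which the local store is known to be consistent.
    var offset: Long? = null
        private set

    // Once processing starts, the local store may run ahead of the committed
    // changelog, so the checkpoint must no longer exist.
    fun onTaskStart() {
        offset = null
    }

    // Only a clean shutdown knows the store matches the changelog.
    fun onCleanShutdown(changelogEndOffset: Long) {
        offset = changelogEndOffset
    }

    // A crash writes nothing, so restoration falls back to the changelog start.
    fun offsetsToReplay(changelogStartOffset: Long, changelogEndOffset: Long): Long =
        changelogEndOffset - (offset ?: changelogStartOffset)
}
```

After a clean shutdown at offset 5,000 nothing needs replaying; if the task restarts from that checkpoint and then crashes, the next startup replays everything from the changelog's start offset.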
> > > > >>>> > > > >> > > > > > >
> > > > >>>> > > > >> > > > > > > Note that it's definitely possible to do better than this, and
> > > > >>>> > > > >> > > > > > > we would very much like to improve it in the future.
> > > > >>>> > > > >> > > > > > >
> > > > >>>> > > > >> > > > > > > Thanks,
> > > > >>>> > > > >> > > > > > > -John
> > > > >>>> > > > >> > > > > > >
> > > > >>>> > > > >> > > > > > > On Tue, Dec 3, 2019, at 16:16, Alessandro Tagliapietra wrote:
> > > > >>>> > > > >> > > > > > > > Hi John,
> > > > >>>> > > > >> > > > > > > >
> > > > >>>> > > > >> > > > > > > > thanks a lot for helping. Regarding your message:
> > > > >>>> > > > >> > > > > > > >  - No, we only have 1 instance of the stream application, and it
> > > > >>>> > > > >> > > > > > > >    always reuses the same state folder.
> > > > >>>> > > > >> > > > > > > >  - Yes, we're seeing most issues when restarting ungracefully due
> > > > >>>> > > > >> > > > > > > >    to an exception.
> > > > >>>> > > > >> > > > > > > >
> > > > >>>> > > > >> > > > > > > > I've enabled trace logging and, filtering by a single state
> > > > >>>> > > > >> > > > > > > > store, the StoreChangelogReader messages are:
> > > > >>>> > > > >> > > > > > > >
> > > > >>>> > > > >> > > > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-0
> > > > >>>> > > > >> > > > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-1
> > > > >>>> > > > >> > > > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-2
> > > > >>>> > > > >> > > > > > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store, rewinding to beginning.
> > > > >>>> > > > >> > > > > > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store, rewinding to beginning.
> > > > >>>> > > > >> > > > > > > > Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store, rewinding to beginning.
> > > > >>>> > > > >> > > > > > > > No checkpoint found for task 0_2 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-2 with EOS turned on. Reinitializing the task and restore its state from the beginning.
> > > > >>>> > > > >> > > > > > > > No checkpoint found for task 0_1 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-1 with EOS turned on. Reinitializing the task and restore its state from the beginning.
> > > > >>>> > > > >> > > > > > > > No checkpoint found for task 0_0 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-0 with EOS turned on. Reinitializing the task and restore its state from the beginning.
> > > > >>>> > > > >> > > > > > > > Found checkpoint 709937 from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store.
> > > > >>>> > > > >> > > > > > > > Restoring partition sensors-stream-aggregate-store-changelog-2 from offset 709937 to endOffset 742799
> > > > >>>> > > > >> > > > > > > > Found checkpoint 3024234 from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store.
> > > > >>>> > > > >> > > > > > > > Restoring partition sensors-stream-aggregate-store-changelog-1 from offset 3024234 to endOffset 3131513
> > > > >>>> > > > >> > > > > > > > Found checkpoint 14514072 from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store.
> > > > >>>> > > > >> > > > > > > > Restoring partition sensors-stream-aggregate-store-changelog-0 from offset 14514072 to endOffset 17116574
> > > > >>>> > > > >> > > > > > > > Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 966 records, ending offset is 711432, next starting position is 711434
> > > > >>>> > > > >> > > > > > > > Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 914 records, ending offset is 712711, next starting position is 712713
> > > > >>>> > > > >> > > > > > > > Restored from sensors-stream-aggregate-store-changelog-1 to aggregate-store with 18 records, ending offset is 3024261, next starting position is 3024262
> > > > >>>> > > > >> > > > > > > >
> > > > >>>> > > > >> > > > > > > >
> > > > >>>> > > > >> > > > > > > > Why does it first say it didn't find the checkpoint, and then it
> > > > >>>> > > > >> > > > > > > > does find it?
> > > > >>>> > > > >> > > > > > > > It seems it loaded about 2.7M records (the sum of the offset
> > > > >>>> > > > >> > > > > > > > differences in the "Restoring partition ..." messages), right?
> > > > >>>> > > > >> > > > > > > > Should I maybe try to reduce the checkpoint interval?
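The ~2.7M estimate can be checked against the three "Restoring partition ..." lines. This sketch only does the arithmetic on the logged offsets (`RestoreRange` is a hypothetical helper). The offset span is an upper bound on the records actually read, since EOS transaction markers and gaps left by compaction also consume offsets; in the log, for example, the first batch on changelog-2 restored 966 records while the position advanced from 709937 to 711434.

```kotlin
// Offsets copied from the three "Restoring partition ..." log lines.
data class RestoreRange(val partition: String, val fromOffset: Long, val endOffset: Long) {
    // Offsets between the restore start position and the end of the changelog.
    val span: Long get() = endOffset - fromOffset
}

val restoreRanges = listOf(
    RestoreRange("sensors-stream-aggregate-store-changelog-2", 709_937, 742_799),
    RestoreRange("sensors-stream-aggregate-store-changelog-1", 3_024_234, 3_131_513),
    RestoreRange("sensors-stream-aggregate-store-changelog-0", 14_514_072, 17_116_574)
)

// Upper bound on restored records: transaction markers and compaction gaps
// also occupy offsets.
val totalOffsetSpan: Long = restoreRanges.sumOf { it.span }
```

`totalOffsetSpan` comes out to 2,742,643, almost all of it (2,602,502) from partition 0.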
> > > > >>>> > > > >> > > > > > > >
> > > > >>>> > > > >> > > > > > > > Regards
> > > > >>>> > > > >> > > > > > > >
> > > > >>>> > > > >> > > > > > > > --
> > > > >>>> > > > >> > > > > > > > Alessandro Tagliapietra
> > > > >>>> > > > >> > > > > > > >
> > > > >>>> > > > >> > > > > > > >
> > > > >>>> > > > >> > > > > > > > On Mon, Dec 2, 2019 at 9:18 AM John Roesler <vvcephei@apache.org> wrote:
> > > > >>>> > > > >> > > > > > > >
> > > > >>>> > > > >> > > > > > > > > Hi Alessandro,
> > > > >>>> > > > >> > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > I'm sorry to hear that.
> > > > >>>> > > > >> > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > The restore process only takes one factor into account: the
> > > > >>>> > > > >> > > > > > > > > current offset position of the changelog topic, which is stored
> > > > >>>> > > > >> > > > > > > > > in a local file alongside the state stores. On startup, the app
> > > > >>>> > > > >> > > > > > > > > checks whether the recorded position lags the latest offset in
> > > > >>>> > > > >> > > > > > > > > the changelog. If so, it reads the missing changelog records
> > > > >>>> > > > >> > > > > > > > > before it starts processing.
> > > > >>>> > > > >> > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > Thus, it would not restore any old window data.
> > > > >>>> > > > >> > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > There might be a few different things going on to explain your
> > > > >>>> > > > >> > > > > > > > > observation:
> > > > >>>> > > > >> > > > > > > > > * if there is more than one instance in your Streams cluster,
> > > > >>>> > > > >> > > > > > > > >   maybe the task is "flopping" between instances, so each
> > > > >>>> > > > >> > > > > > > > >   instance still has to recover state, since it wasn't the last
> > > > >>>> > > > >> > > > > > > > >   one actively processing it.
> > > > >>>> > > > >> > > > > > > > > * if the application isn't stopped gracefully, it might not get
> > > > >>>> > > > >> > > > > > > > >   a chance to record its offset in that local file, so on
> > > > >>>> > > > >> > > > > > > > >   restart it has to restore some or all of the state store from
> > > > >>>> > > > >> > > > > > > > >   the changelog.
> > > > >>>> > > > >> > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > Or it could be something else; that's just what comes to mind.
> > > > >>>> > > > >> > > > > > > > > If you want to get to the bottom of it, you can take a look at
> > > > >>>> > > > >> > > > > > > > > the logs, paying close attention to which tasks are assigned to
> > > > >>>> > > > >> > > > > > > > > which instances after each restart. You can also look into the
> > > > >>>> > > > >> > > > > > > > > logs from
> > > > >>>> > > > >> > > > > > > > > `org.apache.kafka.streams.processor.internals.StoreChangelogReader`
> > > > >>>> > > > >> > > > > > > > > (you might want to set it to DEBUG or TRACE level to really see
> > > > >>>> > > > >> > > > > > > > > what's happening).
> > > > >>>> > > > >> > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > I hope this helps!
> > > > >>>> > > > >> > > > > > > > > -John
> > > > >>>> > > > >> > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > On Sun, Dec 1, 2019, at 21:25, Alessandro Tagliapietra wrote:
> > > > >>>> > > > >> > > > > > > > > > Hello everyone,
> > > > >>>> > > > >> > > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > > we're having a problem with bandwidth usage on Streams
> > > > >>>> > > > >> > > > > > > > > > application startup. Our current setup does this:
> > > > >>>> > > > >> > > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > > ...
> > > > >>>> > > > >> > > > > > > > > > .groupByKey()
> > > > >>>> > > > >> > > > > > > > > > .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
> > > > >>>> > > > >> > > > > > > > > > .aggregate(
> > > > >>>> > > > >> > > > > > > > > >         { MetricSequenceList(ArrayList()) },
> > > > >>>> > > > >> > > > > > > > > >         { key, value, aggregate ->
> > > > >>>> > > > >> > > > > > > > > >             aggregate.getRecords().add(value)
> > > > >>>> > > > >> > > > > > > > > >             aggregate
> > > > >>>> > > > >> > > > > > > > > >         },
> > > > >>>> > > > >> > > > > > > > > >         Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
> > > > >>>> > > > >> > > > > > > > > >             .withKeySerde(Serdes.String())
> > > > >>>> > > > >> > > > > > > > > >             .withValueSerde(Settings.getValueSpecificavroSerde())
> > > > >>>> > > > >> > > > > > > > > > )
> > > > >>>> > > > >> > > > > > > > > > .toStream()
> > > > >>>> > > > >> > > > > > > > > > .flatTransform(TransformerSupplier {
> > > > >>>> > > > >> > > > > > > > > > ...
> > > > >>>> > > > >> > > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > > Basically, in each window we append the new values and then do
> > > > >>>> > > > >> > > > > > > > > > some other logic with the array of windowed values.
> > > > >>>> > > > >> > > > > > > > > > The aggregate-store changelog topic configuration uses
> > > > >>>> > > > >> > > > > > > > > > compact,delete as cleanup policy and has 12 hours of retention.
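Because the aggregator appends to the list and returns the whole aggregate, every update writes the entire list to the store, and from there to the changelog. A rough model, assuming every update reaches the changelog (that is, record caching collapses nothing, which is close to the truth under EOS, where the cache is flushed on each commit and the default commit interval is 100 ms), shows that the number of list elements written for one window grows as n(n+1)/2 rather than n. The function names are hypothetical.

```kotlin
// Simplified model: every aggregator call writes the entire list as one
// changelog value, and no caching collapses consecutive updates (an assumption).
fun changelogElementsWritten(valuesInWindow: Int): Long {
    var written = 0L
    val list = ArrayList<Int>()
    repeat(valuesInWindow) { v ->
        list.add(v)           // like aggregate.getRecords().add(value)
        written += list.size  // the whole list is serialized again
    }
    return written
}

// Closed form of 1 + 2 + ... + n.
fun triangular(n: Int): Long = n.toLong() * (n + 1) / 2
```

For a window that receives 60 values, that is 1,830 list elements on the changelog instead of 60.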
> > > > >>>> > > > >> > > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > > What we've seen is that on application startup it takes a
> > > > >>>> > > > >> > > > > > > > > > couple of minutes to rebuild the state store, even though the
> > > > >>>> > > > >> > > > > > > > > > state store directory is persisted across restarts. That, along
> > > > >>>> > > > >> > > > > > > > > > with an exception that caused the Docker container to be
> > > > >>>> > > > >> > > > > > > > > > restarted a couple hundred times, caused a big Confluent Cloud
> > > > >>>> > > > >> > > > > > > > > > bill compared to what we usually spend (1/4 of a full month in
> > > > >>>> > > > >> > > > > > > > > > 1 day).
> > > > >>>> > > > >> > > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > > What I think is happening is that the topic is keeping all the
> > > > >>>> > > > >> > > > > > > > > > previous windows even with the compaction policy, because each
> > > > >>>> > > > >> > > > > > > > > > key is the original key + the window timestamp, not just the
> > > > >>>> > > > >> > > > > > > > > > key. Since we don't care about previous windows (the
> > > > >>>> > > > >> > > > > > > > > > flatTransform after the toStream() makes sure that we don't
> > > > >>>> > > > >> > > > > > > > > > process old windows; it's basically a custom suppressor), is
> > > > >>>> > > > >> > > > > > > > > > there a way to keep only the last window, so that rebuilding
> > > > >>>> > > > >> > > > > > > > > > the store goes faster and doesn't rebuild old windows too? Or
> > > > >>>> > > > >> > > > > > > > > > should I create a custom window that uses the original key as
> > > > >>>> > > > >> > > > > > > > > > the key, so that compaction keeps only the last window's data?
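Alessandro's diagnosis can be illustrated with a toy model of log compaction, which keeps only the latest value for each distinct record key. `WindowedKey`, `compact`, and the one-hour input are all hypothetical. With one update per one-minute window, keying by (key, window start) leaves 60 records after compaction, while keying by the plain key leaves one. The delete half of `compact,delete` is what eventually bounds the windowed case: with 12 hours of retention, on the order of 12 h / 1 min = 720 windows per key can remain, plus whatever sits in log segments that are not yet eligible for deletion.

```kotlin
import java.time.Duration

// Toy compaction: only the latest value per distinct record key survives.
fun <K, V> compact(log: List<Pair<K, V>>): Map<K, V> = log.toMap()

// Hypothetical stand-in for a windowed store key: record key + window start.
data class WindowedKey(val key: String, val windowStartMs: Long)

// Hypothetical input: one sensor, one update per one-minute window, for an hour.
val updates: List<Pair<String, Long>> =
    (0 until 60).map { minute -> "sensor-1" to minute * 60_000L }

val windowedLog = updates.map { (key, start) -> WindowedKey(key, start) to "list for $start" }
val plainKeyLog = updates.map { (key, start) -> key to "list for $start" }

// Rough count from the delete policy alone: one-minute windows within 12 hours.
val windowsWithinRetention: Long =
    Duration.ofHours(12).toMillis() / Duration.ofMinutes(1).toMillis()
```

Compacting `windowedLog` keeps all 60 windows, while compacting `plainKeyLog` keeps only the value for the last window.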
> > > > >>>> > > > >> > > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > > Thank you
> > > > >>>> > > > >> > > > > > > > > >
> > > > >>>> > > > >> > > > > > > > > > --
> > > > >>>> > > > >> > > > > > > > > > Alessandro Tagliapietra
