kafka-users mailing list archives

From Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com>
Subject Re: Reducing streams startup bandwidth usage
Date Fri, 13 Dec 2019 22:17:20 GMT
Hi Sophie,

thanks for explaining that.
So yeah, it seems that since I'm using the default grace period of 24 hours,
that might cause the records to be sent to the changelog after ~24 hours.

However, I'm switching from the regular windowing system to the custom one,
and while I now understand why the regular windows might send all the data
to the changelog, I don't understand why the new one would.
In the regular windows version, since new keys are added every minute
(because of the key-timewindow combination), I think that's expected
behavior; I don't see why the cache would stop working in the new scenario
where the number of keys is fixed.

Since I'm rewriting the same keys over and over and my changelog topic has
a compact cleanup policy, why should it send old values to the changelog?
What I mean is, if the cache can keep the values for e.g. a few hours, and
we saw that the amount of data in each custom window per key doesn't change
(it goes up and down but never exceeds a certain value), why shouldn't it
be able to do the same for longer periods of time?

Btw there doesn't seem to be a metric for the amount of cache available to
each state store; is it possible to find that out some other way? Just to
keep track of it?

Regards

--
Alessandro Tagliapietra


On Thu, Dec 12, 2019 at 3:14 PM Sophie Blee-Goldman <sophie@confluent.io>
wrote:

> Thanks for collecting all these metrics. It might be that as the length of
> the lists
> increases over time, the cache is able to hold fewer unique keys and
> eventually has to
> start evicting things. This would explain why the cache hit rate starts to
> decrease, and
> likely why latency starts to go up. Whenever a dirty entry is
> evicted/flushed from the cache
> it gets sent to the changelog (and underlying state store), so these
> evictions might be the
> cause of the increased load.
>
> The fluctuations you're seeing (ie it starts and stops "working") could
> just be the window
> closing. After that, the list size would go back down to zero, and the
> cache would suddenly
> have free space again.
>
> Does that seem to make sense with what you're seeing?
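The eviction dynamic described above can be sketched in a toy model (illustrative Java only, not Kafka Streams internals; all class names, sizes, and the key naming are made up): with a fixed byte budget, a value that grows per key means fewer keys fit, so evictions, which are what flush entries to the changelog, ramp up even though the key set never changes.

```java
// Toy model of the Streams record cache: a fixed byte budget with LRU
// eviction. In the real cache, evicted dirty entries are what get sent
// to the changelog. Everything here is illustrative, not Streams code.
import java.util.LinkedHashMap;
import java.util.Map;

public class ToyRecordCache {
    private final int maxBytes;
    private final Map<String, Integer> entries = new LinkedHashMap<>(); // key -> value size
    private int totalBytes = 0;
    public int evictions = 0;

    public ToyRecordCache(int maxBytes) { this.maxBytes = maxBytes; }

    public void put(String key, int sizeBytes) {
        Integer old = entries.remove(key);      // re-inserting moves the key to MRU position
        if (old != null) totalBytes -= old;
        entries.put(key, sizeBytes);
        totalBytes += sizeBytes;
        while (totalBytes > maxBytes) {         // evict LRU entries until we fit again
            String lru = entries.keySet().iterator().next();
            totalBytes -= entries.remove(lru);  // in Streams: a flush to the changelog
            evictions++;
        }
    }

    // Same fixed key set every round, but the aggregated value grows each round.
    public static int evictionsAfter(int rounds, int keys, int bytesPerRound, int capacity) {
        ToyRecordCache cache = new ToyRecordCache(capacity);
        for (int r = 1; r <= rounds; r++)
            for (int k = 0; k < keys; k++)
                cache.put("sensor-" + k, r * bytesPerRound);
        return cache.evictions;
    }
}
```

With a 1000-byte budget and 30 keys whose values grow 10 bytes per round, the first few rounds produce no evictions at all, while later rounds evict on nearly every put, which matches the "suddenly starts sending" pattern discussed in this thread.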
>
> On Tue, Dec 10, 2019 at 7:04 PM Alessandro Tagliapietra <
> tagliapietra.alessandro@gmail.com> wrote:
>
> > Just an update, since it has been happening again and I have some more
> > metrics to show. The topology is this:
> >
> > Topologies:
> >    Sub-topology: 0
> >     Source: KSTREAM-SOURCE-0000000000 (topics: [sensors])
> >       --> KSTREAM-TRANSFORMVALUES-0000000001
> >     Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores:
> > [new-data-store])
> >       --> KSTREAM-FLATMAPVALUES-0000000002
> >       <-- KSTREAM-SOURCE-0000000000
> >     Processor: KSTREAM-FLATMAPVALUES-0000000002 (stores: [])
> >       --> KSTREAM-TRANSFORMVALUES-0000000003
> >       <-- KSTREAM-TRANSFORMVALUES-0000000001
> >     Processor: KSTREAM-TRANSFORMVALUES-0000000003 (stores:
> > [LastValueStore])
> >       --> KSTREAM-FILTER-0000000004
> >       <-- KSTREAM-FLATMAPVALUES-0000000002
> >     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >       --> KSTREAM-AGGREGATE-0000000005
> >       <-- KSTREAM-TRANSFORMVALUES-0000000003
> >     Processor: KSTREAM-AGGREGATE-0000000005 (stores: [aggregate-store])
> >       --> KTABLE-TOSTREAM-0000000006
> >       <-- KSTREAM-FILTER-0000000004
> >     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
> >       --> KSTREAM-TRANSFORM-0000000007
> >       <-- KSTREAM-AGGREGATE-0000000005
> >     Processor: KSTREAM-TRANSFORM-0000000007 (stores: [suppress-store])
> >       --> KSTREAM-MAP-0000000008
> >       <-- KTABLE-TOSTREAM-0000000006
> >     Processor: KSTREAM-MAP-0000000008 (stores: [])
> >       --> KSTREAM-PRINTER-0000000009, KSTREAM-SINK-0000000010
> >       <-- KSTREAM-TRANSFORM-0000000007
> >     Processor: KSTREAM-PRINTER-0000000009 (stores: [])
> >       --> none
> >       <-- KSTREAM-MAP-0000000008
> >     Sink: KSTREAM-SINK-0000000010 (topic: sensors-output)
> >       <-- KSTREAM-MAP-0000000008
> >
> >  - https://imgur.com/R3Pqypo the input source topic keeps the same rate
> > of messages
> >  - https://imgur.com/BTwq09p the number of records processed by each
> > processor node: at first three processor nodes
> > (kstream-transformvalues-3, kstream-filter-4, kstream-aggregate-5)
> > process 4-5k records/min, then ktable-tostream-6 and kstream-transform-7
> > ramp up and the previous ones slow down due to the higher load
> >  - https://imgur.com/5eXpf8l the state store cache hit rate starts to
> > decrease
> >  - https://imgur.com/dwFOb2g put and fetch operations of the window
> > store remain almost the same (maybe dropping slightly due to the higher
> > load)
> >  - https://imgur.com/1XZmMW5 commit latency increases
> >  - https://imgur.com/UdBpOVU commit rate stays almost the same
> >  - https://imgur.com/UJ3JB4f process latency increases
> >  - https://imgur.com/55YVmy2 process rate stays the same
> >  - https://imgur.com/GMJ3eGV sent records increase because of aggregate
> > and suppress store changelog records
> >  - https://imgur.com/XDm2kX6 sent bytes for those changelog topics
> > increase
> >
> > (full album https://imgur.com/a/tXlJJEO)
> >
> > Any other metric that might be important?
> >
> > It seems that the issue is between the aggregate and KTable.toStream().
> >
> > After a restart, usage goes back to normal values as expected.
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Mon, Dec 9, 2019 at 7:22 PM Alessandro Tagliapietra <
> > tagliapietra.alessandro@gmail.com> wrote:
> >
> > > You're saying that with a 100ms commit interval, caching won't help
> > > because it would still send the compacted changes to the changelog
> > > every 100ms?
> > >
> > > Regarding the custom state store, I'll look into that because I didn't
> > > go much further than transformers and stores in my Kafka experience,
> > > so I'll need to understand better what that implies.
> > >
> > > Yeah, I only have one window per key in the store.
> > >
> > > The only thing I don't understand is why the cache works 80% of the
> > > time and then suddenly the changelog sent bytes increase 90x.
> > > I mean, if the cache wasn't working, why did enabling it in our
> > > pipeline decrease the sent bytes from 30-40MB/minute to 400KB/minute?
> > >
> > > I'll look into the custom state store, though.
> > >
> > > Thanks
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > >
> > >
> > > On Mon, Dec 9, 2019 at 7:02 PM Sophie Blee-Goldman <sophie@confluent.io>
> > > wrote:
> > >
> > >> Alright, well I see why you have so much data being sent to the
> > >> changelog if each update involves appending to a list and then
> > >> writing in the whole list. And with 340 records/minute I'm actually
> > >> not sure how the cache could really help at all when it's being
> > >> flushed every 100ms.
> > >>
> > >> Here's kind of a wild idea, if you really only need append semantics:
> > >> what if you wrote a custom StateStore that wrapped the normal
> > >> RocksDBStore (or RocksDBWindowStore) and did the append for you under
> > >> the hood? The changelogging layer sits between the layer that you
> > >> would call #put on in your transformer and the final layer that
> > >> actually writes to the underlying storage engine. If you insert an
> > >> extra layer and modify your transformer to only call put on the new
> > >> data (rather than the entire list) then only this new data will get
> > >> sent to the changelog. Your custom storage layer will know it's
> > >> actually append semantics, and add the new data to the existing list
> > >> before sending it on to RocksDB.
> > >>
> > >> Since you only ever have one window per key in the store (right?) you
> > >> just need to make sure that nothing from the current window gets
> > >> deleted prematurely. You'd want to turn off compaction on the
> > >> changelog and caching on the store of course, and maybe give the
> > >> changelog some extra retention time to be safe.
> > >>
> > >> Obviously I haven't thoroughly verified this alternative, but it
> > >> seems like this approach (or something to its effect) could help you
> > >> cut down on the changelog data. WDYT?
> > >>
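The layering idea above can be sketched with the storage layers reduced to plain maps (illustrative Java; the class names and methods are made up and are not the real Streams interfaces): the changelogging layer only ever sees the single new element passed to put(), so only that element reaches the changelog, while the layer below knows the real semantics are append.

```java
// Sketch of an append-semantics store stack. In real Streams the stack is
// transformer -> (caching) -> changelogging -> inner store; here each layer
// is a toy class so the data flow is visible. Names are illustrative.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AppendingStoreSketch {
    /** Stands in for the RocksDB layer: knows the real semantics are append. */
    static class AppendingInnerStore {
        final Map<String, List<String>> data = new HashMap<>();
        void put(String key, String newElement) {
            // append the new element to the existing list before "storing" it
            data.computeIfAbsent(key, k -> new ArrayList<>()).add(newElement);
        }
    }

    /** Stands in for the changelogging layer: records exactly what put() receives. */
    static class ChangeloggingStore {
        final AppendingInnerStore inner;
        final List<String> changelogValues = new ArrayList<>(); // values sent to the topic
        ChangeloggingStore(AppendingInnerStore inner) { this.inner = inner; }
        void put(String key, String newElement) {
            changelogValues.add(newElement); // one small record, not the whole list
            inner.put(key, newElement);      // append semantics live below this layer
        }
    }
}
```

Calling put three times for the same key leaves the inner store with a three-element list, while the changelog received three single-element records instead of three ever-growing lists.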
> > >> On Mon, Dec 9, 2019 at 4:35 PM Alessandro Tagliapietra <
> > >> tagliapietra.alessandro@gmail.com> wrote:
> > >>
> > >> > Hi Sophie,
> > >> >
> > >> > Just to give a better context: yes, we use EOS, and the problem
> > >> > happens in our aggregation store.
> > >> > Basically, when windowing data we append each record into a list
> > >> > that's stored in the aggregation store.
> > >> > We have 2 versions: in production we use the Kafka Streams
> > >> > windowing API, in staging we manually calculate the window end
> > >> > timestamp and aggregate using that timestamp.
> > >> >
> > >> > To give you an example of the staging code, it's a simple
> > >> > transformer that:
> > >> >  - if incoming data fits in the same window as the data in store,
> > >> > appends the data to the existing store list, overwriting the same
> > >> > key, and sends nothing downstream
> > >> >  - if incoming data has a timestamp smaller than the existing store
> > >> > data, discards the record
> > >> >  - if incoming data has a timestamp bigger than the existing store
> > >> > data, sends the stored list downstream and stores the new window
> > >> > data in the store
> > >> >
> > >> > This way we don't use multiple keys (Kafka Streams instead uses a
> > >> > store where each key is stream-key + window key) as we overwrite
> > >> > the store data using the same key over and over.
> > >> > So what I would expect is that, since we're overwriting the same
> > >> > keys, there isn't more and more data to be cached, as the number of
> > >> > keys is always the same and we don't really need to cache more data
> > >> > over time.
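The three transformer rules above can be sketched as plain Java (illustrative only; the record type, window size, and all names are simplified stand-ins for the real transformer and its state store):

```java
// Manual per-key windowing: one store entry per sensor key, overwritten in
// place on every record; a window is emitted only when a newer one arrives.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ManualWindower {
    static final long WINDOW_MS = 60_000L; // assumed 1-minute windows

    static class WindowState {
        final long windowEnd;
        final List<String> records = new ArrayList<>();
        WindowState(long windowEnd) { this.windowEnd = windowEnd; }
    }

    private final Map<String, WindowState> store = new HashMap<>();

    static long windowEnd(long ts) { return (ts / WINDOW_MS + 1) * WINDOW_MS; }

    /** Returns the closed window's records to emit downstream, or null. */
    public List<String> process(String key, long timestamp, String value) {
        long end = windowEnd(timestamp);
        WindowState state = store.get(key);
        if (state == null) {                      // first record ever for this key
            state = new WindowState(end);
            store.put(key, state);
        }
        if (end == state.windowEnd) {             // rule 1: same window, append, emit nothing
            state.records.add(value);
            return null;
        }
        if (end < state.windowEnd) {              // rule 2: older data, discard
            return null;
        }
        List<String> closed = state.records;      // rule 3: newer window, emit stored list
        WindowState fresh = new WindowState(end); // and store the new window's data
        fresh.records.add(value);
        store.put(key, fresh);
        return closed;
    }
}
```

Because the store holds exactly one entry per sensor key, overwritten on every record, a compacted changelog only ever needs to retain the latest list per key.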
> > >> >
> > >> > To respond to your questions:
> > >> >  - yes, when I say that the cache "stopped/started" working I mean
> > >> > that at some point the store started sending more and more data to
> > >> > the changelog topic and then suddenly stopped again even without a
> > >> > restart (a restart always fixes the problem)
> > >> >  - yes, there are no density changes in the input stream; I've
> > >> > checked the number of records sent to the stream input topic and
> > >> > there is a variation of ~10-20 records per minute on an average of
> > >> > 340 records per minute. Most of the records are also generated by
> > >> > simulators with a very predictable output rate.
> > >> >
> > >> > In the meantime I've enabled reporting of debug metrics (so
> > >> > including cache hit ratio) to hopefully get better insights the
> > >> > next time it happens.
> > >> >
> > >> > Thank you in advance
> > >> >
> > >> > --
> > >> > Alessandro Tagliapietra
> > >> >
> > >> > On Mon, Dec 9, 2019 at 3:57 PM Sophie Blee-Goldman <sophie@confluent.io>
> > >> > wrote:
> > >> >
> > >> > > It's an LRU cache, so once it gets full, new records will cause
> > >> > > older ones to be evicted (and thus sent downstream). Of course
> > >> > > this should only apply to records of a different key; otherwise
> > >> > > it will just cause an update of that key in the cache.
> > >> > >
> > >> > > I missed that you were using EOS; given the short commit interval
> > >> > > it's hard to see those effects.
> > >> > > When you say that it stopped working and then appeared to start
> > >> > > working again, are you just referring to the amount of data being
> > >> > > sent to the changelog? And you can definitely rule out
> > >> > > differences in the density of updates in the input stream?
> > >> > >
> > >> > >
> > >> > >
> > >> > > On Mon, Dec 9, 2019 at 12:26 PM Alessandro Tagliapietra <
> > >> > > tagliapietra.alessandro@gmail.com> wrote:
> > >> > >
> > >> > > > Hi Sophie,
> > >> > > >
> > >> > > > thanks for helping.
> > >> > > >
> > >> > > > By eviction of older records, do you mean they get flushed to
> > >> > > > the changelog topic?
> > >> > > > Or is the cache just full, so all new records go to the
> > >> > > > changelog topic until the old ones are evicted?
> > >> > > >
> > >> > > > Regarding the timing, what timing do you mean? Between when the
> > >> > > > cache stops and starts working again? We're using EOS so I
> > >> > > > believe the commit interval is every 100ms.
> > >> > > >
> > >> > > > Regards
> > >> > > >
> > >> > > > --
> > >> > > > Alessandro Tagliapietra
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > > On Mon, Dec 9, 2019 at 12:15 PM Sophie Blee-Goldman <sophie@confluent.io>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > It might be that the cache appears to "stop working" because
> > >> > > > > it gets full, and each new update causes an eviction (of some
> > >> > > > > older record). This would also explain the opposite behavior,
> > >> > > > > that it "starts working" again after some time without being
> > >> > > > > restarted, since the cache is completely flushed on commit.
> > >> > > > > Does the timing seem to align with your commit interval
> > >> > > > > (default is 30s)?
> > >> > > > >
> > >> > > > > On Mon, Dec 9, 2019 at 12:03 AM Alessandro Tagliapietra <
> > >> > > > > tagliapietra.alessandro@gmail.com> wrote:
> > >> > > > >
> > >> > > > > > And it seems that for some reason, after a while, caching
> > >> > > > > > works again without a restart of the streams application
> > >> > > > > >
> > >> > > > > > [image: Screen Shot 2019-12-08 at 11.59.30 PM.png]
> > >> > > > > >
> > >> > > > > > I'll try to enable debug metrics and see if I can find
> > >> > > > > > something useful there.
> > >> > > > > > Any idea is appreciated in the meantime :)
> > >> > > > > >
> > >> > > > > > --
> > >> > > > > > Alessandro Tagliapietra
> > >> > > > > >
> > >> > > > > > On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <
> > >> > > > > > tagliapietra.alessandro@gmail.com> wrote:
> > >> > > > > >
> > >> > > > > >> It seems that even with caching enabled, after a while the
> > >> > > > > >> sent bytes still go up
> > >> > > > > >>
> > >> > > > > >> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
> > >> > > > > >>
> > >> > > > > >> You can see the deploy when I enabled caching, but it
> > >> > > > > >> looks like it's still only a temporary solution.
> > >> > > > > >>
> > >> > > > > >> --
> > >> > > > > >> Alessandro Tagliapietra
> > >> > > > > >>
> > >> > > > > >>
> > >> > > > > >> On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra <
> > >> > > > > >> tagliapietra.alessandro@gmail.com> wrote:
> > >> > > > > >>
> > >> > > > > >>> Could be, but since we have a limited number of input
> > >> > > > > >>> keys (~30), and windowing generates new keys but old ones
> > >> > > > > >>> are never touched again (since the data per key is in
> > >> > > > > >>> order), I assume it shouldn't be a big deal for it to
> > >> > > > > >>> handle 30 keys.
> > >> > > > > >>> I'll have a look at the cache metrics and see if
> > >> > > > > >>> something pops out.
> > >> > > > > >>>
> > >> > > > > >>> Thanks
> > >> > > > > >>>
> > >> > > > > >>> --
> > >> > > > > >>> Alessandro Tagliapietra
> > >> > > > > >>>
> > >> > > > > >>>
> > >> > > > > >>> On Sat, Dec 7, 2019 at 10:02 AM John Roesler <vvcephei@apache.org>
> > >> > > > > >>> wrote:
> > >> > > > > >>>
> > >> > > > > >>>> Hmm, that's a good question. Now that we're talking
> > >> > > > > >>>> about caching, I wonder if the cache was just too small.
> > >> > > > > >>>> It's not very big by default.
> > >> > > > > >>>>
> > >> > > > > >>>> On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:
> > >> > > > > >>>> > Ok I'll check on that!
> > >> > > > > >>>> >
> > >> > > > > >>>> > Now I can see that with caching we went from 3-4MB/s
> > >> > > > > >>>> > to 400KB/s; that will help with the bill.
> > >> > > > > >>>> >
> > >> > > > > >>>> > Last question: any reason why, after a while, the
> > >> > > > > >>>> > regular windowed stream starts sending every update
> > >> > > > > >>>> > instead of caching?
> > >> > > > > >>>> > Could it be because it doesn't have any more memory
> > >> > > > > >>>> > available? Any other possible reason?
> > >> > > > > >>>> >
> > >> > > > > >>>> > Thank you so much for your help
> > >> > > > > >>>> >
> > >> > > > > >>>> > --
> > >> > > > > >>>> > Alessandro Tagliapietra
> > >> > > > > >>>> >
> > >> > > > > >>>> >
> > >> > > > > >>>> > On Sat, Dec 7, 2019 at 9:14 AM John Roesler <vvcephei@apache.org>
> > >> > > > > >>>> > wrote:
> > >> > > > > >>>> >
> > >> > > > > >>>> > > Ah, yes. Glad you figured it out!
> > >> > > > > >>>> > >
> > >> > > > > >>>> > > Caching does not reduce EOS guarantees at all. I
> > >> > > > > >>>> > > highly recommend using it. You might even want to
> > >> > > > > >>>> > > take a look at the caching metrics to make sure you
> > >> > > > > >>>> > > have a good hit ratio.
> > >> > > > > >>>> > >
> > >> > > > > >>>> > > -John
> > >> > > > > >>>> > >
> > >> > > > > >>>> > > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote:
> > >> > > > > >>>> > > > Never mind, I've found out I can use
> > >> > > > > >>>> > > > `.withCachingEnabled` on the store builder to
> > >> > > > > >>>> > > > achieve the same thing as the windowing example,
> > >> > > > > >>>> > > > since `Materialized.as` turns that on by default.
> > >> > > > > >>>> > > >
> > >> > > > > >>>> > > > Does caching in any way reduce the EOS guarantees?
> > >> > > > > >>>> > > >
> > >> > > > > >>>> > > > --
> > >> > > > > >>>> > > > Alessandro Tagliapietra
> > >> > > > > >>>> > > >
> > >> > > > > >>>> > > >
> > >> > > > > >>>> > > > On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra <
> > >> > > > > >>>> > > > tagliapietra.alessandro@gmail.com> wrote:
> > >> > > > > >>>> > > >
> > >> > > > > >>>> > > > > Seems my journey with this isn't done just yet.
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > > This seems very complicated to me, but I'll try
> > >> > > > > >>>> > > > > to explain it as best I can.
> > >> > > > > >>>> > > > > To better understand the streams network usage
> > >> > > > > >>>> > > > > I've used Prometheus with the JMX exporter to
> > >> > > > > >>>> > > > > export Kafka metrics.
> > >> > > > > >>>> > > > > To check the amount of data we use, I'm looking
> > >> > > > > >>>> > > > > at the increments of
> > >> > > > > >>>> > > > > kafka_producer_topic_metrics_byte_total and
> > >> > > > > >>>> > > > > kafka_producer_producer_topic_metrics_record_send_total.
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > > Our current (before the change mentioned above)
> > >> > > > > >>>> > > > > code looks like this:
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > > // This transformer just pairs a value with the previous one,
> > >> > > > > >>>> > > > > // storing the temporary one in a store
> > >> > > > > >>>> > > > > val pairsStream = metricStream
> > >> > > > > >>>> > > > >   .transformValues(ValueTransformerWithKeySupplier { PairTransformer() }, "LastValueStore")
> > >> > > > > >>>> > > > >   .filter { _, value: MetricSequence? -> value != null }
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > > // Create a store to store suppressed windows until a new one is received
> > >> > > > > >>>> > > > > val suppressStoreSupplier =
> > >> > > > > >>>> > > > >   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"),
> > >> > > > > >>>> > > > > ......
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > > // Window and aggregate data in 1 minute intervals
> > >> > > > > >>>> > > > > val aggregatedStream = pairsStream
> > >> > > > > >>>> > > > >   .groupByKey()
> > >> > > > > >>>> > > > >   .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
> > >> > > > > >>>> > > > >   .aggregate(
> > >> > > > > >>>> > > > >     { MetricSequenceList(ArrayList()) },
> > >> > > > > >>>> > > > >     { key, value, aggregate ->
> > >> > > > > >>>> > > > >       aggregate.getRecords().add(value)
> > >> > > > > >>>> > > > >       aggregate
> > >> > > > > >>>> > > > >     },
> > >> > > > > >>>> > > > >     Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
> > >> > > > > >>>> > > > >       .withKeySerde(Serdes.String())
> > >> > > > > >>>> > > > >       .withValueSerde(Settings.getValueSpecificavroSerde())
> > >> > > > > >>>> > > > >   )
> > >> > > > > >>>> > > > >   .toStream()
> > >> > > > > >>>> > > > >   .flatTransform(TransformerSupplier {
> > >> > > > > >>>> > > > >     // This transformer basically waits until a new window
> > >> > > > > >>>> > > > >     // is received to emit the previous one
> > >> > > > > >>>> > > > >   }, "suppress-store")
> > >> > > > > >>>> > > > >   .map { sensorId: String, suppressedOutput: SuppressedOutput ->
> > >> > > > > >>>> > > > >     .... etc ....
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > > Basically:
> > >> > > > > >>>> > > > >  - all data goes through the LastValueStore
> > >> > > > > >>>> > > > > store, which stores each message and emits a
> > >> > > > > >>>> > > > > pair with the previous one
> > >> > > > > >>>> > > > >  - the aggregate-store is used to store the
> > >> > > > > >>>> > > > > per-window list of messages in the aggregate
> > >> > > > > >>>> > > > > method
> > >> > > > > >>>> > > > >  - the suppress-store is used to store each
> > >> > > > > >>>> > > > > received window, which is emitted only after a
> > >> > > > > >>>> > > > > newer one is received
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > > What I'm experiencing is that:
> > >> > > > > >>>> > > > >  - during normal execution, the streams app
> > >> > > > > >>>> > > > > sends 5k messages/min to the LastValueStore
> > >> > > > > >>>> > > > > changelog topic, and only about 100 to the
> > >> > > > > >>>> > > > > aggregate and suppress store changelog topics
> > >> > > > > >>>> > > > >  - at some point (after many hours of
> > >> > > > > >>>> > > > > operation), the streams app starts sending to
> > >> > > > > >>>> > > > > the aggregate and suppress store changelog
> > >> > > > > >>>> > > > > topics the same amount of messages going through
> > >> > > > > >>>> > > > > the LastValueStore
> > >> > > > > >>>> > > > >  - if I restart the streams app, it goes back to
> > >> > > > > >>>> > > > > the initial behavior
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > > You can see the behavior in this graph:
> > >> > > > > >>>> > > > > https://imgur.com/dJcUNSf
> > >> > > > > >>>> > > > > You can also see that after a restart everything
> > >> > > > > >>>> > > > > goes back to normal levels.
> > >> > > > > >>>> > > > > Regarding other metrics: process latency
> > >> > > > > >>>> > > > > increases, poll latency decreases, poll rate
> > >> > > > > >>>> > > > > decreases, and commit rate stays the same while
> > >> > > > > >>>> > > > > commit latency increases.
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > > Now, I have these questions:
> > >> > > > > >>>> > > > >  - why isn't the aggregate/suppress store
> > >> > > > > >>>> > > > > changelog topic throughput the same as the
> > >> > > > > >>>> > > > > LastValueStore's? Shouldn't it send a record to
> > >> > > > > >>>> > > > > the changelog every time it aggregates?
> > >> > > > > >>>> > > > >  - is the windowing doing some internal caching,
> > >> > > > > >>>> > > > > like not sending every aggregation record until
> > >> > > > > >>>> > > > > the window time has passed? (if so, where can I
> > >> > > > > >>>> > > > > find that code, since I would like to use that
> > >> > > > > >>>> > > > > also for our new implementation)
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > > Thank you in advance
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > > --
> > >> > > > > >>>> > > > > Alessandro Tagliapietra
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > > On Wed, Dec 4, 2019 at 7:57 AM John Roesler <vvcephei@apache.org>
> > >> > > > > >>>> > > > > wrote:
> > >> > > > > >>>> > > > >
> > >> > > > > >>>> > > > >> Oh, good!
> > >> > > > > >>>> > > > >>
> > >> > > > > >>>> > > > >> On Tue, Dec 3, 2019, at 23:29, Alessandro Tagliapietra wrote:
> > >> > > > > >>>> > > > >> > Testing on staging shows that a restart on
> > >> > > > > >>>> > > > >> > exception is much faster and the stream
> > >> > > > > >>>> > > > >> > starts right away, which I think means we're
> > >> > > > > >>>> > > > >> > reading way less data than before!
> > >> > > > > >>>> > > > >> >
> > >> > > > > >>>> > > > >> > > What I was referring to is that, in
> > >> > > > > >>>> > > > >> > > Streams, the keys for window aggregation
> > >> > > > > >>>> > > > >> > > state is actually composed of both the
> > >> > > > > >>>> > > > >> > > window itself and the key. In the DSL, it
> > >> > > > > >>>> > > > >> > > looks like "Windowed<K>". That results in
> > >> > > > > >>>> > > > >> > > the store having a unique key per window
> > >> > > > > >>>> > > > >> > > for each K, which is why we need retention
> > >> > > > > >>>> > > > >> > > as well as compaction for our changelogs.
> > >> > > > > >>>> > > > >> > > But for you, if you just make the key "K",
> > >> > > > > >>>> > > > >> > > then compaction alone should do the trick.
> > >> > > > > >>>> > > > >> >
> > >> > > > > >>>> > > > >> > Yes, we had compact,delete as the cleanup
> > >> > > > > >>>> > > > >> > policy, but it probably still had a too-long
> > >> > > > > >>>> > > > >> > retention value; also, the rocksdb store is
> > >> > > > > >>>> > > > >> > probably much faster now, having only one key
> > >> > > > > >>>> > > > >> > per key instead of one key per window per key.
> > >> > > > > >>>> > > > >> >
> > >> > > > > >>>> > > > >> > Thanks a lot for helping! I'm now going to
> > >> > > > > >>>> > > > >> > set up prometheus-jmx monitoring so we can
> > >> > > > > >>>> > > > >> > keep better track of what's going on :)
> > >> > > > > >>>> > > > >> >
> > >> > > > > >>>> > > > >> > --
> > >> > > > > >>>> > > > >> > Alessandro Tagliapietra
> > >> > > > > >>>> > > > >> >
> > >> > > > > >>>> > > > >> >
> > >> > > > > >>>> > > > >> > On Tue, Dec 3, 2019 at 9:12 PM John Roesler <vvcephei@apache.org>
> > >> > > > > >>>> > > > >> > wrote:
> > >> > > > > >>>> > > > >> >
> > >> > > > > >>>> > > > >> > > Oh, yeah, I remember that conversation!
> > >> > > > > >>>> > > > >> > >
> > >> > > > > >>>> > > > >> > > Yes, then, I agree: if you're only storing
> > >> > > > > >>>> > > > >> > > state of the most recent window for each
> > >> > > > > >>>> > > > >> > > key, and the key you use for that state is
> > >> > > > > >>>> > > > >> > > actually the key of the records, then an
> > >> > > > > >>>> > > > >> > > aggressive compaction policy plus your
> > >> > > > > >>>> > > > >> > > custom transformer seems like a good way
> > >> > > > > >>>> > > > >> > > forward.
> > >> > > > > >>>> > > > >> > >
> > >> > > > > >>>> > > > >> > > What I was referring to is that, in
> > >> > > > > >>>> > > > >> > > Streams, the keys for window aggregation
> > >> > > > > >>>> > > > >> > > state is actually composed of both the
> > >> > > > > >>>> > > > >> > > window itself and the key. In the DSL, it
> > >> > > > > >>>> > > > >> > > looks like "Windowed<K>". That results in
> > >> > > > > >>>> > > > >> > > the store having a unique key per window
> > >> > > > > >>>> > > > >> > > for each K, which is why we need retention
> > >> > > > > >>>> > > > >> > > as well as compaction for our changelogs.
> > >> > > > > >>>> > > > >> > > But for you, if you just make the key "K",
> > >> > > > > >>>> > > > >> > > then compaction alone should do the trick.
> > >> > > > > >>>> > > > >> > >
> > >> > > > > >>>> > > > >> > > And yes, if you manage the topic yourself,
> > >> > > > > >>>> > > > >> > > then Streams won't adjust the retention
> > >> > > > > >>>> > > > >> > > time. I think it might validate that the
> > >> > > > > >>>> > > > >> > > retention isn't too short, but I don't
> > >> > > > > >>>> > > > >> > > remember offhand.
> > >> > > > > >>>> > > > >> > >
> > >> > > > > >>>> > > > >> > > Cheers, and let me know how it goes!
> > >> > > > > >>>> > > > >> > > -John
> > >> > > > > >>>> > > > >> > >
> > >> > > > > >>>> > > > >> > > On Tue, Dec 3, 2019, at 23:03, Alessandro Tagliapietra
> > >> > > > > >>>> > > > >> > > wrote:
> > >> > > > > >>>> > > > >> > > > Hi John,
> > >> > > > > >>>> > > > >> > > >
> > >> > > > > >>>> > > > >> > > > afaik the grace period uses stream time
> > >> > > > > >>>> > > > >> > > > https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html
> > >> > > > > >>>> > > > >> > > > which is per partition; unfortunately we
> > >> > > > > >>>> > > > >> > > > process data that's not in sync between
> > >> > > > > >>>> > > > >> > > > keys, so each key needs to be
> > >> > > > > >>>> > > > >> > > > independent, and a key can have much
> > >> > > > > >>>> > > > >> > > > older data than the others.
> > >> > > > > >>>> > > > >> > > >
> > >> > > > > >>>> > > > >> > > > Having a small grace period would probably
> > >> close
> > >> > > old
> > >> > > > > >>>> windows
> > >> > > > > >>>> > > sooner
> > >> > > > > >>>> > > > >> than
> > >> > > > > >>>> > > > >> > > > expected. That's also why in my use case a
> > >> custom
> > >> > > > store
> > >> > > > > >>>> that
> > >> > > > > >>>> > > just
> > >> > > > > >>>> > > > >> stores
> > >> > > > > >>>> > > > >> > > > the last window data for each key might
> work
> > >> > > better.
> > >> > > > I
> > >> > > > > >>>> had the
> > >> > > > > >>>> > > same
> > >> > > > > >>>> > > > >> issue
> > >> > > > > >>>> > > > >> > > > with suppression and it has been reported
> > here
> > >> > > > > >>>> > > > >> > > >
> > >> https://issues.apache.org/jira/browse/KAFKA-8769
> > >> > > > > >>>> > > > >> > > > Oh I just saw that you're the one that
> > helped
> > >> me
> > >> > on
> > >> > > > > >>>> slack and
> > >> > > > > >>>> > > > >> created the
> > >> > > > > >>>> > > > >> > > > issue (thanks again for that).
> > >> > > > > >>>> > > > >> > > >
> > >> > > > > >>>> > > > >> > > > The behavior that you mention about
> streams
> > >> > setting
> > >> > > > > >>>> changelog
> > >> > > > > >>>> > > > >> retention
> > >> > > > > >>>> > > > >> > > > time is something they do on creation of
> the
> > >> > topic
> > >> > > > when
> > >> > > > > >>>> the
> > >> > > > > >>>> > > broker
> > >> > > > > >>>> > > > >> has
> > >> > > > > >>>> > > > >> > > auto
> > >> > > > > >>>> > > > >> > > > creation enabled? Because we're using
> > >> confluent
> > >> > > cloud
> > >> > > > > >>>> and I had
> > >> > > > > >>>> > > to
> > >> > > > > >>>> > > > >> create
> > >> > > > > >>>> > > > >> > > > it manually.
> > >> > > > > >>>> > > > >> > > > Regarding the change in the recovery
> > behavior,
> > >> > with
> > >> > > > > >>>> compact
> > >> > > > > >>>> > > cleanup
> > >> > > > > >>>> > > > >> > > policy
> > >> > > > > >>>> > > > >> > > > shouldn't the changelog only keep the last
> > >> value?
> > >> > > > That
> > >> > > > > >>>> would
> > >> > > > > >>>> > > make
> > >> > > > > >>>> > > > >> the
> > >> > > > > >>>> > > > >> > > > recovery faster and "cheaper" as it would
> > only
> > >> > need
> > >> > > > to
> > >> > > > > >>>> read a
> > >> > > > > >>>> > > single
> > >> > > > > >>>> > > > >> > > value
> > >> > > > > >>>> > > > >> > > > per key (if the cleanup just happened)
> > right?
> > >> > > > > >>>> > > > >> > > >
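A minimal model of what compaction does (plain Java, not Kafka itself): only the most recent value per key survives a cleanup pass, which is why a freshly compacted changelog can be restored by reading roughly one record per key.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionModel {
    // After a compaction pass, only the latest value per key remains in the log.
    static Map<String, String> compact(List<String[]> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] kv : log) {
            latest.put(kv[0], kv[1]); // later writes replace earlier ones for the same key
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> log = Arrays.asList(
            new String[]{"sensor-1", "v1"},
            new String[]{"sensor-2", "v1"},
            new String[]{"sensor-1", "v2"},
            new String[]{"sensor-1", "v3"});
        Map<String, String> compacted = compact(log);
        System.out.println(compacted.size());          // 2: one surviving record per key
        System.out.println(compacted.get("sensor-1")); // v3: the latest value wins
    }
}
```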
> > --
> > Alessandro Tagliapietra
> >
> > On Tue, Dec 3, 2019 at 8:51 PM John Roesler <vvcephei@apache.org> wrote:
> >
> > > Hey Alessandro,
> > >
> > > That sounds like it would also work. I'm wondering if it would
> > > actually change what you observe w.r.t. recovery behavior, though.
> > > Streams already sets the retention time on the changelog to equal the
> > > retention time of the windows, for windowed aggregations, so you
> > > shouldn't be loading a lot of window data for old windows you no
> > > longer care about.
> > >
> > > Have you set the "grace period" on your window definition? By default,
> > > it is set to 24 hours, but you can set it as low as you like. E.g., if
> > > you want to commit to having in-order data only, then you can set the
> > > grace period to zero. This _should_ let the broker clean up the
> > > changelog records as soon as the window ends.
> > >
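In the DSL the grace period is set with `TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO)`. A simplified model of the accept/drop rule (plain Java; the boundary handling is approximate, not the exact Streams implementation): a record is kept only while observed stream time has not passed its window end plus the grace period.

```java
public class GraceModel {
    static final long WINDOW_MS = 60_000L; // 1-minute tumbling windows

    // A record is accepted while stream time has not passed windowEnd + grace;
    // afterwards the window is closed and the record is dropped as late.
    static boolean accepted(long recordTs, long streamTime, long graceMs) {
        long windowEnd = (recordTs / WINDOW_MS) * WINDOW_MS + WINDOW_MS;
        return streamTime < windowEnd + graceMs;
    }

    public static void main(String[] args) {
        long grace24h = 24L * 60 * 60 * 1000; // the default
        // Record for the 0-60s window, arriving when stream time is 10 minutes:
        System.out.println(accepted(30_000L, 600_000L, grace24h)); // true: still within grace
        System.out.println(accepted(30_000L, 600_000L, 0L));       // false: window already closed
    }
}
```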
> > > Of course, the log cleaner doesn't run all the time, so there's some
> > > extra delay in which "expired" data would still be visible in the
> > > changelog, but it would actually be just the same as if you manage the
> > > store yourself.
> > >
> > > Hope this helps!
> > > -John
> > >
> > > On Tue, Dec 3, 2019, at 22:22, Alessandro Tagliapietra wrote:
> > > > Thanks John for the explanation,
> > > >
> > > > I thought that with EOS enabled (which we have) it would, in the
> > > > worst case, find a valid checkpoint and start the restore from there
> > > > until it reached the last committed status, not completely from
> > > > scratch. What you say definitely makes sense now.
> > > > Since we don't really need old time windows and we ensure data is
> > > > ordered when processed, I think I'll just write a custom transformer
> > > > to keep only the last window, store intermediate aggregation results
> > > > in the store, and emit a new value only when we receive data
> > > > belonging to a new window. That, with a compact-only changelog
> > > > topic, should keep the rebuild data to a minimum, as the changelog
> > > > would have only the last value for each key.
> > > >
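A minimal sketch of that transformer logic (plain Java rather than the actual `Transformer` API; names are illustrative): keep one in-progress aggregate per key, and emit the finished aggregate only when a record for a newer window arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LastWindowAggregator {
    static final long WINDOW_MS = 60_000L;

    static class Agg {
        final long windowStart;
        final List<Double> values = new ArrayList<>();
        Agg(long windowStart) { this.windowStart = windowStart; }
    }

    // One in-progress aggregate per key, standing in for the state store;
    // a compacted changelog would likewise hold one value per key.
    private final Map<String, Agg> store = new HashMap<>();

    // Returns the closed aggregate when the record opens a newer window, else null.
    Agg process(String key, long timestamp, double value) {
        long windowStart = (timestamp / WINDOW_MS) * WINDOW_MS;
        Agg current = store.get(key);
        Agg emitted = null;
        if (current != null && current.windowStart < windowStart) {
            emitted = current; // the previous window is complete: emit it downstream
            current = null;
        }
        if (current == null) {
            current = new Agg(windowStart);
            store.put(key, current); // only the latest window is kept per key
        }
        current.values.add(value);
        return emitted;
    }

    public static void main(String[] args) {
        LastWindowAggregator t = new LastWindowAggregator();
        System.out.println(t.process("s1", 1_000L, 1.0));  // null: window still open
        System.out.println(t.process("s1", 2_000L, 2.0));  // null: same window
        Agg closed = t.process("s1", 61_000L, 3.0);        // record for a newer window
        System.out.println(closed.values);                 // [1.0, 2.0]: previous window emitted
    }
}
```

Because each key only ever advances to a newer window, this relies on the per-key ordering guarantee Alessandro mentions.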
> > > > Hope that makes sense
> > > >
> > > > Thanks again
> > > >
> > > > --
> > > > Alessandro Tagliapietra
> > > >
> > > > On Tue, Dec 3, 2019 at 3:04 PM John Roesler <vvcephei@apache.org> wrote:
> > > >
> > > > > Hi Alessandro,
> > > > >
> > > > > To take a stab at your question: maybe it first doesn't find it,
> > > > > but then restores some data, writes the checkpoint, and then later
> > > > > on it has to re-initialize the task for some reason, and that's
> > > > > why it does find a checkpoint then?
> > > > >
> > > > > More to the heart of the issue: if you have EOS enabled, Streams
> > > > > _only_ records the checkpoint when the store is in a
> > > > > known-consistent state. For example, if you have a graceful
> > > > > shutdown, Streams will flush all the stores, commit all the
> > > > > transactions, and then write the checkpoint file. Then, on
> > > > > re-start, it will pick up from that checkpoint.
> > > > >
> > > > > But as soon as it starts processing records, it removes the
> > > > > checkpoint file, so if it crashes while it was processing, there
> > > > > is no checkpoint file there, and it will have to restore from the
> > > > > beginning of the changelog.
> > > > >
> > > > > This design is there on purpose, because otherwise we cannot
> > > > > actually guarantee correctness... For example, say you are
> > > > > maintaining a count operation, and we process an input record i,
> > > > > increment the count, and write it to the state store and to the
> > > > > changelog topic, but we crash before we commit that transaction.
> > > > > Then the write to the changelog would be aborted, and we would
> > > > > re-process record i. However, we've already updated the local
> > > > > state store, so when we increment it again, it results in
> > > > > double-counting i. The key point here is that there's no way to do
> > > > > an atomic operation across two different systems (the local state
> > > > > store and the changelog topic). Since we can't guarantee that we
> > > > > roll back the incremented count when the changelog transaction is
> > > > > aborted, we can't keep the local store consistent with the
> > > > > changelog.
> > > > >
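The double-counting scenario can be sketched as follows (a toy model, not Streams internals): the local store update happens before the changelog transaction commits, so a crash in between leaves the store incremented while the changelog write is aborted, and re-processing the input counts the record twice.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DoubleCount {
    // Counts occurrences of "k". With crash == true, the first record's store
    // update has already happened when the changelog transaction aborts, so
    // re-processing that record from the input double-counts it.
    static long countWithCrash(List<String> records, boolean crash) {
        Map<String, Long> localStore = new HashMap<>();
        if (crash) {
            // First attempt: store updated, then crash before the changelog
            // transaction commits -- the changelog write is aborted, but the
            // local store keeps its increment.
            localStore.merge(records.get(0), 1L, Long::sum);
        }
        // Re-processing after recovery. If we trusted the (now inconsistent)
        // local store instead of rebuilding it from the changelog, record 0
        // is counted a second time:
        for (String rec : records) {
            localStore.merge(rec, 1L, Long::sum);
        }
        return localStore.getOrDefault("k", 0L);
    }

    public static void main(String[] args) {
        System.out.println(countWithCrash(List.of("k", "k", "k"), false)); // 3: correct count
        System.out.println(countWithCrash(List.of("k", "k", "k"), true));  // 4: i double-counted
    }
}
```

Discarding the local store and replaying the (committed-only) changelog avoids the inflated count, which is exactly the invariant John describes.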
> > > > > After a crash, the only way to ensure the local store is
> > > > > consistent with the changelog is to discard the entire thing and
> > > > > rebuild it. This is why we have an invariant that the checkpoint
> > > > > file only exists when we _know_ that the local store is consistent
> > > > > with the changelog, and this is why you're seeing so much
> > > > > bandwidth when re-starting from an unclean shutdown.
> > > > >
> > > > > Note that it's definitely possible to do better than this, and we
> > > > > would very much like to improve it in the future.
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Tue, Dec 3, 2019, at 16:16, Alessandro Tagliapietra wrote:
> > > > > > Hi John,
> > > > > >
> > > > > > thanks a lot for helping. Regarding your message:
> > > > > >  - no, we only have 1 instance of the stream application, and it
> > > > > >    always re-uses the same state folder
> > > > > >  - yes, we're seeing most issues when restarting non-gracefully
> > > > > >    due to an exception
> > > > > >
> > > > > > I've enabled trace logging and, filtering by a single state
> > > > > > store, the StoreChangelogReader messages are:
> > > > > >
> > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-0
> > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-1
> > > > > > Added restorer for changelog sensors-stream-aggregate-store-changelog-2
> > > > > > Did not find checkpoint from changelog
> > > > > > sensors-stream-aggregate-store-changelog-2 for store
> > > > > > aggregate-store, rewinding to beginning.
> > > > > > Did not find checkpoint from changelog
> > > > > > sensors-stream-aggregate-store-changelog-1 for store
> > > > > > aggregate-store, rewinding to beginning.
> > > > > > Did not find checkpoint from changelog
> > > > > > sensors-stream-aggregate-store-changelog-0 for store
> > > > > > aggregate-store, rewinding to beginning.
> > > > > > No checkpoint found for task 0_2 state store aggregate-store
> > > > > > changelog sensors-stream-aggregate-store-changelog-2 with EOS
> > > > > > turned on. Reinitializing the task and restore its state from
> > > > > > the beginning.
> > > > > > No checkpoint found for task 0_1 state store aggregate-store
> > > > > > changelog sensors-stream-aggregate-store-changelog-1 with EOS
> > > > > > turned on. Reinitializing the task and restore its state from
> > > > > > the beginning.
> > > > > > No checkpoint found for task 0_0 state store aggregate-store
> > > > > > changelog sensors-stream-aggregate-store-changelog-0 with EOS
> > > > > > turned on. Reinitializing the task and restore its state from
> > > > > > the beginning.
> > > > > > Found checkpoint 709937 from changelog
> > > > > > sensors-stream-aggregate-store-changelog-2 for store aggregate-store.
> > > > > > Restoring partition sensors-stream-aggregate-store-changelog-2
> > > > > > from offset 709937 to endOffset 742799
> > > > > > Found checkpoint 3024234 from changelog
> > > > > > sensors-stream-aggregate-store-changelog-1 for store aggregate-store.
> > > > > > Restoring partition sensors-stream-aggregate-store-changelog-1
> > > > > > from offset 3024234 to endOffset 3131513
> > > > > > Found checkpoint 14514072 from changelog
> > > > > > sensors-stream-aggregate-store-changelog-0 for store aggregate-store.
> > > > > > Restoring partition sensors-stream-aggregate-store-changelog-0
> > > > > > from offset 14514072 to endOffset 17116574
> > > > > > Restored from sensors-stream-aggregate-store-changelog-2 to
> > > > > > aggregate-store with 966 records, ending offset is 711432, next
> > > > > > starting position is 711434
> > > > > > Restored from sensors-stream-aggregate-store-changelog-2 to
> > > > > > aggregate-store with 914 records, ending offset is 712711, next
> > > > > > starting position is 712713
> > > > > > Restored from sensors-stream-aggregate-store-changelog-1 to
> > > > > > aggregate-store with 18 records, ending offset is 3024261, next
> > > > > > starting position is 3024262
> > > > > >
> > > > > > Why does it first say it didn't find the checkpoint and then say
> > > > > > it does find it?
> > > > > > It seems it loaded about 2.7M records (the sum of the offset
> > > > > > differences in the "Restoring partition ..." messages), right?
> > > > > > Maybe I should try to reduce the checkpoint interval?
> > > > > >
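The ~2.7M figure checks out against the offsets in the "Restoring partition" lines:

```java
public class RestoreVolume {
    // (endOffset - checkpointed offset) per partition, taken from the log lines above.
    static long totalRestored() {
        long p2 = 742_799L - 709_937L;        // changelog-2: 32862 records
        long p1 = 3_131_513L - 3_024_234L;    // changelog-1: 107279 records
        long p0 = 17_116_574L - 14_514_072L;  // changelog-0: 2602502 records
        return p0 + p1 + p2;
    }

    public static void main(String[] args) {
        System.out.println(totalRestored()); // 2742643 -> roughly 2.7M records restored
    }
}
```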
> > > > > > Regards
> > > > > >
> > > > > > --
> > > > > > Alessandro Tagliapietra
> > > > > >
> > > > > > On Mon, Dec 2, 2019 at 9:18 AM John Roesler <vvcephei@apache.org> wrote:
> > > > > >
> > > > > > > Hi Alessandro,
> > > > > > >
> > > > > > > I'm sorry to hear that.
> > > > > > >
> > > > > > > The restore process only takes one factor into account: the
> > > > > > > current offset position of the changelog topic is stored in a
> > > > > > > local file alongside the state stores. On startup, the app
> > > > > > > checks if the recorded position lags the latest offset in the
> > > > > > > changelog. If so, then it reads the missing changelog records
> > > > > > > before starting processing.
> > > > > > >
> > > > > > > Thus, it would not restore any old window data.
> > > > > > >
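A sketch of that startup check (a simplified model, not the actual Streams code; names are illustrative): compare the offset recorded in the local checkpoint file with the changelog end offset, and replay only the gap — or everything, when no checkpoint was recorded.

```java
import java.util.Map;
import java.util.OptionalLong;

public class RestoreCheck {
    // Offset recorded in the local checkpoint file for a partition, if any.
    static OptionalLong checkpointed(Map<String, Long> checkpointFile, String partition) {
        Long off = checkpointFile.get(partition);
        return off == null ? OptionalLong.empty() : OptionalLong.of(off);
    }

    // Changelog records to replay on startup: from the checkpointed offset if
    // present, otherwise from the beginning of the changelog.
    static long recordsToRestore(Map<String, Long> checkpointFile,
                                 String partition, long beginOffset, long endOffset) {
        long start = checkpointed(checkpointFile, partition).orElse(beginOffset);
        return Math.max(0, endOffset - start);
    }

    public static void main(String[] args) {
        Map<String, Long> checkpoint = Map.of("changelog-2", 709_937L);
        // With a checkpoint: replay only the gap.
        System.out.println(recordsToRestore(checkpoint, "changelog-2", 0L, 742_799L));     // 32862
        // Without one (e.g. after an unclean shutdown): replay everything.
        System.out.println(recordsToRestore(checkpoint, "changelog-0", 0L, 17_116_574L));  // 17116574
    }
}
```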
> > > > > > > There might be a few different things going on to explain
> > > > > > > your observation:
> > > > > > > * if there is more than one instance in your Streams cluster,
> > > > > > >   maybe the task is "flopping" between instances, so each
> > > > > > >   instance still has to recover state, since it wasn't the
> > > > > > >   last one actively processing it.
> > > > > > > * if the application isn't stopped gracefully, it might not
> > > > > > >   get a chance to record its offset in that local file, so on
> > > > > > >   restart it has to restore some or all of the state store
> > > > > > >   from the changelog.
> > > > > > >
> > > > > > > Or it could be something else; that's just what comes to mind.
> > > > > > >
> > > > > > > If you want to get to the bottom of it, you can take a look at
> > > > > > > the logs, paying close attention to which tasks are assigned
> > > > > > > to which instances after each restart. You can also look into
> > > > > > > the logs from
> > > > > > > `org.apache.kafka.streams.processor.internals.StoreChangelogReader`
> > > > > > > (you might want to set it to DEBUG or TRACE level to really
> > > > > > > see what's happening).
> > > > > > >
> > > > > > > I hope this helps!
> > > > > > > -John
> > > > > > >
> > > > > > > On Sun, Dec 1, 2019, at 21:25, Alessandro Tagliapietra wrote:
> > > > > > > > Hello everyone,
> > > > > > > >
> > > > > > > > we're having a problem with bandwidth usage on streams
> > > > > > > > application startup. Our current setup does this:
> > > > > > > >
> > > > > > > > ...
> > > > > > > > .groupByKey()
> > > > > > > > .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
> > > > > > > > .aggregate(
> > > > > > > >     { MetricSequenceList(ArrayList()) },
> > > > > > > >     { key, value, aggregate ->
> > > > > > > >         aggregate.getRecords().add(value)
> > > > > > > >         aggregate
> > > > > > > >     },
> > > > > > > >     Materialized.`as`<String, MetricSequenceList,
> > > > > > > >             WindowStore<Bytes, ByteArray>>("aggregate-store")
> > > > > > > >         .withKeySerde(Serdes.String())
> > > > > > > >         .withValueSerde(Settings.getValueSpecificavroSerde())
> > > > > > > > )
> > > > > > > > .toStream()
> > > > > > > > .flatTransform(TransformerSupplier {
> > > > > > > > ...
> > > > > > > >
> > > > > > > > Basically, in each window we append the new values and then
> > > > > > > > do some other logic with the array of windowed values.
> > > > > > > > The aggregate-store changelog topic configuration uses
> > > > > > > > compact,delete as its cleanup policy and has 12 hours of
> > > > > > > > retention.
> > > > > > > >
> > > > > > > > What we've seen is that on application startup it takes a
> > > > > > > > couple of minutes to rebuild the state store, even though the
> > > > > > > > state store directory is persisted across restarts. That,
> > > > > > > > along with an exception that caused the Docker container to
> > > > > > > > be restarted a couple hundred times, caused a big Confluent
> > > > > > > > Cloud bill compared to what we usually spend (1/4 of a full
> > > > > > > > month in 1 day).
> > >> > > > > >>>> > > > >> > > > > > > > > >
> > >> > > > > >>>> > > > >> > > > > > > > > > What I think is happening is
> > that
> > >> the
> > >> > > > topic
> > >> > > > > >>>> is
> > >> > > > > >>>> > > keeping
> > >> > > > > >>>> > > > >> all
> > >> > > > > >>>> > > > >> > > the
> > >> > > > > >>>> > > > >> > > > > > > previous
> > >> > > > > >>>> > > > >> > > > > > > > > > windows even with the
> compacting
> > >> > policy
> > >> > > > > >>>> because each
> > >> > > > > >>>> > > > >> key is
> > >> > > > > >>>> > > > >> > > the
> > >> > > > > >>>> > > > >> > > > > > > original
> > >> > > > > >>>> > > > >> > > > > > > > > > key + the timestamp not just
> the
> > >> key.
> > >> > > > Since
> > >> > > > > >>>> we don't
> > >> > > > > >>>> > > > >> care
> > >> > > > > >>>> > > > >> > > about
> > >> > > > > >>>> > > > >> > > > > > > previous
> > >> > > > > >>>> > > > >> > > > > > > > > > windows as the flatTransform
> > after
> > >> > the
> > >> > > > > >>>> toStream()
> > >> > > > > >>>> > > makes
> > >> > > > > >>>> > > > >> sure
> > >> > > > > >>>> > > > >> > > > > that we
> > >> > > > > >>>> > > > >> > > > > > > > > don't
> > >> > > > > >>>> > > > >> > > > > > > > > > process old windows (a custom
> > >> > > suppressor
> > >> > > > > >>>> basically)
> > >> > > > > >>>> > > is
> > >> > > > > >>>> > > > >> there
> > >> > > > > >>>> > > > >> > > a
> > >> > > > > >>>> > > > >> > > > > way to
> > >> > > > > >>>> > > > >> > > > > > > > > only
> > >> > > > > >>>> > > > >> > > > > > > > > > keep the last window so that
> the
> > >> > store
> > >> > > > > >>>> rebuilding
> > >> > > > > >>>> > > goes
> > >> > > > > >>>> > > > >> > > faster and
> > >> > > > > >>>> > > > >> > > > > > > without
> > >> > > > > >>>> > > > >> > > > > > > > > > rebuilding old windows too? Or
> > >> > should I
> > >> > > > > >>>> create a
> > >> > > > > >>>> > > custom
> > >> > > > > >>>> > > > >> > > window
> > >> > > > > >>>> > > > >> > > > > using
> > >> > > > > >>>> > > > >> > > > > > > the
> > >> > > > > >>>> > > > >> > > > > > > > > > original key as key so that
> the
> > >> > > > compaction
> > >> > > > > >>>> keeps
> > >> > > > > >>>> > > only
> > >> > > > > >>>> > > > >> the
> > >> > > > > >>>> > > > >> > > last
> > >> > > > > >>>> > > > >> > > > > window
> > >> > > > > >>>> > > > >> > > > > > > > > data?
> > >> > > > > >>>> > > > >> > > > > > > > > >
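[Editor's note: the compaction behavior described in the quoted question can be illustrated with a small standalone sketch. This is not Kafka API code; it simply models log compaction as "last value per key wins". The class name `CompactionSketch` and the `sensor-1` key are hypothetical. The point: a windowed store's changelog key combines the record key with the window start, so every window is a distinct key and compaction retains all of them, whereas a store keyed by the original key alone collapses to a single entry.]

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of topic compaction: the retained log keeps
// only the most recent value for each key, like cleanup.policy=compact.
public class CompactionSketch {
    static Map<String, String> retainedAfterCompaction(List<Map.Entry<String, String>> records) {
        Map<String, String> log = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : records) {
            log.put(r.getKey(), r.getValue()); // later values overwrite earlier ones
        }
        return log;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> windowed = new ArrayList<>();
        List<Map.Entry<String, String>> plain = new ArrayList<>();
        for (int minute = 0; minute < 60; minute++) {
            // Windowed store: changelog key is key + window start, so a
            // new key appears every minute and compaction removes nothing.
            windowed.add(Map.entry("sensor-1@" + minute, "agg-" + minute));
            // Store keyed by the original key only: each update overwrites
            // the previous one, so compaction keeps a single record.
            plain.add(Map.entry("sensor-1", "agg-" + minute));
        }
        System.out.println(retainedAfterCompaction(windowed).size()); // 60: every window survives
        System.out.println(retainedAfterCompaction(plain).size());    // 1: only the latest survives
    }
}
```

Under this model, restoring the windowed changelog replays every retained window, which matches the slow startup described above; a store keyed only by the original key would restore one record per key.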
>
> Thank you
>
> --
> Alessandro Tagliapietra
