kafka-users mailing list archives

From Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com>
Subject Re: Reducing streams startup bandwidth usage
Date Wed, 11 Dec 2019 03:04:02 GMT
Just an update, since it has been happening again and I now have some more
metrics to show. The topology is this:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [sensors])
      --> KSTREAM-TRANSFORMVALUES-0000000001
    Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [new-data-store])
      --> KSTREAM-FLATMAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FLATMAPVALUES-0000000002 (stores: [])
      --> KSTREAM-TRANSFORMVALUES-0000000003
      <-- KSTREAM-TRANSFORMVALUES-0000000001
    Processor: KSTREAM-TRANSFORMVALUES-0000000003 (stores: [LastValueStore])
      --> KSTREAM-FILTER-0000000004
      <-- KSTREAM-FLATMAPVALUES-0000000002
    Processor: KSTREAM-FILTER-0000000004 (stores: [])
      --> KSTREAM-AGGREGATE-0000000005
      <-- KSTREAM-TRANSFORMVALUES-0000000003
    Processor: KSTREAM-AGGREGATE-0000000005 (stores: [aggregate-store])
      --> KTABLE-TOSTREAM-0000000006
      <-- KSTREAM-FILTER-0000000004
    Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
      --> KSTREAM-TRANSFORM-0000000007
      <-- KSTREAM-AGGREGATE-0000000005
    Processor: KSTREAM-TRANSFORM-0000000007 (stores: [suppress-store])
      --> KSTREAM-MAP-0000000008
      <-- KTABLE-TOSTREAM-0000000006
    Processor: KSTREAM-MAP-0000000008 (stores: [])
      --> KSTREAM-PRINTER-0000000009, KSTREAM-SINK-0000000010
      <-- KSTREAM-TRANSFORM-0000000007
    Processor: KSTREAM-PRINTER-0000000009 (stores: [])
      --> none
      <-- KSTREAM-MAP-0000000008
    Sink: KSTREAM-SINK-0000000010 (topic: sensors-output)
      <-- KSTREAM-MAP-0000000008

 - https://imgur.com/R3Pqypo shows that the input source topic has the same rate of messages
 - https://imgur.com/BTwq09p is the number of records processed by each processor node: at first, 3 processor nodes (kstream-transformvalues-3, kstream-filter-4, kstream-aggregate-5) process 4-5k records/min, then ktable-tostream-6 and kstream-transform-7 ramp up and the previous ones slow down due to the higher load
 - https://imgur.com/5eXpf8l the state store cache rate starts to decrease
 - https://imgur.com/dwFOb2g put and fetch operations of the window store remain almost the same (maybe lower due to the higher load)
 - https://imgur.com/1XZmMW5 commit latency increases
 - https://imgur.com/UdBpOVU commit rate stays almost the same
 - https://imgur.com/UJ3JB4f process latency increases
 - https://imgur.com/55YVmy2 process rate stays the same
 - https://imgur.com/GMJ3eGV sent records increase because of the aggregate and suppress store changelog records
 - https://imgur.com/XDm2kX6 sent bytes for those changelog topics increase

(full album https://imgur.com/a/tXlJJEO)

Any other metric that might be important?

It seems that the issue is between the aggregate and KTable.toStream().

After a restart, as expected, usage goes back to normal values.

--
Alessandro Tagliapietra


On Mon, Dec 9, 2019 at 7:22 PM Alessandro Tagliapietra <tagliapietra.alessandro@gmail.com> wrote:

> You're saying that with a 100ms commit interval, caching won't help
> because it would still send the compacted changes to the changelog every
> 100ms?
>
> Regarding the custom state store, I'll look into that, because I haven't gone
> much further than transformers and stores in my Kafka experience, so I'll
> need to better understand what that implies.
>
> Yeah, I only have one window per key in the store.
>
> The only thing I don't understand is why the cache works 80% of the time and
> then suddenly the changelog sent bytes increase 90x.
> I mean, if the cache wasn't working, why did enabling it in our pipeline
> decrease the sent bytes from 30-40MB/minute to 400KB/minute?
>
> I'll look into the custom state store, though.
>
> Thanks
>
> --
> Alessandro Tagliapietra
>
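A rough check of the 100 ms question, taking the 340 records/minute and ~30 keys mentioned in this thread and assuming the 100 ms commit interval that EOS uses by default, with records spread evenly over keys:

```latex
\[
\frac{60\,000\ \text{ms/min}}{100\ \text{ms/commit}} = 600\ \text{commits/min},
\qquad
\frac{340\ \text{records/min}}{600\ \text{commits/min}} \approx 0.57\ \text{records per commit},
\qquad
\frac{0.57}{30\ \text{keys}} \approx 0.019\ \text{records per key per commit}.
\]
```

Under these assumptions a key almost never receives two updates between flushes, so the cache has little to deduplicate. The processor-node metrics in the latest message show 4-5k records/min reaching the aggregate; at 5,000/min the same calculation gives about 8.3 records per commit, still only about 0.28 per key.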
> On Mon, Dec 9, 2019 at 7:02 PM Sophie Blee-Goldman <sophie@confluent.io> wrote:
>
>> Alright, well, I see why you have so much data being sent to the changelog
>> if each update involves appending to a list and then writing in the whole list.
>> And with 340 records/minute, I'm actually not sure how the cache could really
>> help at all when it's being flushed every 100ms.
>>
>> Here's kind of a wild idea, if you really only need append semantics: what if
>> you wrote a custom StateStore that wrapped the normal RocksDBStore (or
>> RocksDBWindowStore) and did the append for you under the hood? The
>> changelogging layer sits between the layer that you would call #put on in your
>> transformer and the final layer that actually writes to the underlying storage
>> engine. If you insert an extra layer and modify your transformer to only call
>> put on the new data (rather than the entire list), then only this new data will
>> get sent to the changelog. Your custom storage layer will know it's actually
>> append semantics, and add the new data to the existing list before sending it
>> on to RocksDB.
>>
>> Since you only ever have one window per key in the store (right?), you just
>> need to make sure that nothing from the current window gets deleted
>> prematurely. You'd want to turn off compaction on the changelog and caching on
>> the store, of course, and maybe give the changelog some extra retention time
>> to be safe.
>>
>> Obviously I haven't thoroughly verified this alternative, but it seems like
>> this approach (or something to its effect) could help you cut down on the
>> changelog data. WDYT?
>>
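Sophie's layering idea can be sketched without Kafka at all. The classes below are hypothetical stand-ins, not the Kafka Streams `StateStore`/`KeyValueStore` API: `ChangeloggingLayer` plays the changelogging layer and records every write it would send, while `AppendingLayer` sits underneath it (where RocksDB would be) and turns each single-record put into an append. A `delete` is logged as a tombstone (`null`) so a new window can start from an empty list. Replaying the uncompacted log through a fresh appending layer rebuilds the list, which is why compaction has to be off: a compacted log would keep only the last appended item per key.

```kotlin
// Hypothetical layers illustrating "append semantics below the changelog";
// these are NOT the Kafka Streams StateStore/KeyValueStore interfaces.

// Bottom layer: stands in for RocksDB and performs the append.
class AppendingLayer<K, T : Any> {
    private val storage = HashMap<K, List<T>>()

    // Append one item to whatever list is already stored for the key.
    fun append(key: K, item: T) {
        storage[key] = storage[key].orEmpty() + item
    }

    fun delete(key: K) {
        storage.remove(key)
    }

    fun get(key: K): List<T> = storage[key].orEmpty()
}

// Middle layer: logs each write as received (only the new item, or null as a
// tombstone), then forwards it to the appending layer.
class ChangeloggingLayer<K, T : Any>(
    private val inner: AppendingLayer<K, T> = AppendingLayer()
) {
    val changelog = mutableListOf<Pair<K, T?>>()

    fun put(key: K, item: T) {
        changelog += key to item
        inner.append(key, item)
    }

    fun delete(key: K) {
        changelog += key to null
        inner.delete(key)
    }

    fun get(key: K): List<T> = inner.get(key)
}

// Rebuild the bottom layer by replaying an uncompacted changelog in order.
fun <K, T : Any> restore(changelog: List<Pair<K, T?>>): AppendingLayer<K, T> {
    val store = AppendingLayer<K, T>()
    for ((key, item) in changelog) {
        if (item == null) store.delete(key) else store.append(key, item)
    }
    return store
}
```

Writing the whole list on every update, as in the current aggregator, four puts to one key would send 1 + 2 + 3 + 4 = 10 list elements to the changelog; with the appending layer they send 4.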
>> On Mon, Dec 9, 2019 at 4:35 PM Alessandro Tagliapietra <tagliapietra.alessandro@gmail.com> wrote:
>>
>> > Hi Sophie,
>> >
>> > Just to give better context: yes, we use EOS, and the problem happens in
>> > our aggregation store.
>> > Basically, when windowing data we append each record into a list that's
>> > stored in the aggregation store.
>> > We have 2 versions: in production we use the Kafka Streams windowing API;
>> > in staging we manually calculate the window end timestamp and aggregate
>> > using that timestamp.
>> >
>> > To give you an example of the staging code, it's a simple transformer that:
>> >  - if incoming data fits in the same window as the data in the store, appends
>> > the data to the existing store list, overwriting the same key, and nothing
>> > is sent downstream
>> >  - if incoming data has a timestamp smaller than the existing store data,
>> > discards the record
>> >  - if incoming data has a timestamp bigger than the existing store data,
>> > sends the stored list downstream and stores the new window data into the
>> > store
>> >
>> > This way we don't use multiple keys (Kafka Streams instead uses a store
>> > where each key is stream key + window key), as we overwrite the store data
>> > using the same key over and over.
>> > So what I would expect is that, since we're overwriting the same keys,
>> > there isn't more and more data to be cached: the number of keys is always
>> > the same and we don't really need to cache more data over time.
>> >
>> > To respond to your questions:
>> >  - yes, when I say that the cache "stopped/started" working I mean that at
>> > some point the store started sending more and more data to the changelog
>> > topic and then suddenly stopped again, even without a restart (a restart
>> > always fixes the problem).
>> >  - yes, there are no density changes in the input stream. I've checked the
>> > number of records sent to the stream input topic and there is a variation
>> > of ~10-20 records per minute on an average of 340 records per minute. Most
>> > of the records are also generated by simulators with a very predictable
>> > output rate.
>> >
>> > In the meantime I've enabled reporting of debug metrics (including the cache
>> > hit ratio) to hopefully get better insights the next time it happens.
>> >
>> > Thank you in advance
>> >
>> > --
>> > Alessandro Tagliapietra
>> >
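The three cases of the staging transformer can be written as a pure function, which makes them easy to test outside Kafka. This is a minimal sketch: `Reading`, `WindowState`, `StepResult` and `step` are hypothetical names, windows are assumed to be 1-minute tumbling windows identified by their start (the staging code uses the end timestamp, which for tumbling windows carries the same information), and records are assumed to arrive in order per key, as described above.

```kotlin
// Hypothetical record and state types for a single sensor key.
data class Reading(val timestamp: Long, val value: Double)

// The one window kept per key: its start and the records collected so far.
data class WindowState(val windowStart: Long, val records: List<Reading>)

// New state to write back under the same key, plus a list to send downstream (if any).
data class StepResult(val newState: WindowState?, val emitted: List<Reading>?)

// Start of the tumbling window containing the timestamp.
fun windowStartOf(timestamp: Long, windowSizeMs: Long): Long =
    timestamp - Math.floorMod(timestamp, windowSizeMs)

fun step(current: WindowState?, incoming: Reading, windowSizeMs: Long = 60_000L): StepResult {
    val start = windowStartOf(incoming.timestamp, windowSizeMs)
    return when {
        // First record for this key: open a window, emit nothing.
        current == null -> StepResult(WindowState(start, listOf(incoming)), null)
        // Same window: append and overwrite the stored value, emit nothing.
        start == current.windowStart ->
            StepResult(current.copy(records = current.records + incoming), null)
        // Older than the stored window: discard the record.
        start < current.windowStart -> StepResult(current, null)
        // Newer window: emit the stored list and replace it with the new window.
        else -> StepResult(WindowState(start, listOf(incoming)), current.records)
    }
}
```

Only one entry per sensor key is ever stored, and it is overwritten in place; a list leaves the transformer only when the first record of the next window arrives.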
>> > On Mon, Dec 9, 2019 at 3:57 PM Sophie Blee-Goldman <sophie@confluent.io> wrote:
>> >
>> > > It's an LRU cache, so once it gets full, new records will cause older ones
>> > > to be evicted (and thus sent downstream). Of course, this should only apply
>> > > to records of a different key; otherwise it will just cause an update of
>> > > that key in the cache.
>> > >
>> > > I missed that you were using EOS; given the short commit interval, it's hard
>> > > to see those effects.
>> > > When you say that it stopped working and then appeared to start working
>> > > again, are you just referring to the amount of data being sent to the
>> > > changelog? And you can definitely rule out differences in the density of
>> > > updates in the input stream?
>> > >
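A toy model of the behavior described above: updates to a key already in the cache overwrite it in place, a new key arriving when the cache is full evicts the least recently used entry downstream, and a commit flushes everything. `LruWriteBackCache` is a hypothetical class built on `java.util.LinkedHashMap` in access order; unlike the real Streams record cache, which is bounded in bytes by `cache.max.bytes.buffering`, it is bounded by a number of entries.

```kotlin
// Toy write-back cache: entry-bounded LRU, unlike the byte-bounded Streams cache.
class LruWriteBackCache<K, V>(private val maxEntries: Int) {
    // Everything that left the cache, in order (what would go downstream/to the changelog).
    val forwarded = mutableListOf<Pair<K, V>>()

    // accessOrder = true keeps the least recently used entry first.
    private val cache = object : LinkedHashMap<K, V>(16, 0.75f, true) {
        // Called by LinkedHashMap after inserting a NEW key (not on updates).
        override fun removeEldestEntry(eldest: MutableMap.MutableEntry<K, V>?): Boolean {
            val evict = size > maxEntries
            if (evict && eldest != null) forwarded += eldest.key to eldest.value
            return evict
        }
    }

    // Re-putting an existing key only updates it; a new key may trigger an eviction.
    fun put(key: K, value: V) {
        cache[key] = value
    }

    // Commit: forward every remaining entry and empty the cache.
    fun flush() {
        for ((k, v) in cache) forwarded += k to v
        cache.clear()
    }
}
```

For example, with room for two entries, two updates to `"a"` forward nothing, a third distinct key evicts `"a"` with its latest value, and the next flush forwards the rest.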
>> > > On Mon, Dec 9, 2019 at 12:26 PM Alessandro Tagliapietra <tagliapietra.alessandro@gmail.com> wrote:
>> > >
>> > > > Hi Sophie,
>> > > >
>> > > > thanks for helping.
>> > > >
>> > > > By eviction of older records, do you mean they get flushed to the
>> > > > changelog topic?
>> > > > Or is the cache just full, so all new records go to the changelog topic
>> > > > until the old ones are evicted?
>> > > >
>> > > > Regarding the timing, what timing do you mean? Between when the cache
>> > > > stops and starts working again? We're using EOS, so I believe the commit
>> > > > interval is 100ms.
>> > > >
>> > > > Regards
>> > > >
>> > > > --
>> > > > Alessandro Tagliapietra
>> > > >
>> > > > On Mon, Dec 9, 2019 at 12:15 PM Sophie Blee-Goldman <sophie@confluent.io> wrote:
>> > > >
>> > > > > It might be that the cache appears to "stop working" because it gets
>> > > > > full, and each new update causes an eviction (of some older record).
>> > > > > This would also explain the opposite behavior, that it "starts working"
>> > > > > again after some time without being restarted, since the cache is
>> > > > > completely flushed on commit. Does the timing seem to align with your
>> > > > > commit interval (default is 30s)?
>> > > > >
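The settings touched on in this part of the thread (EOS, commit interval, cache size, debug metrics) can be collected in one place. A sketch using the plain string keys of the Kafka Streams configuration (the `StreamsConfig` constants hold the same strings); the numeric values are illustrative only, and required settings such as `application.id` and `bootstrap.servers` are left out. As far as we know, in Kafka 2.x enabling `exactly_once` lowers the default `commit.interval.ms` from 30000 to 100, and record-cache metrics such as the hit ratio are only recorded at the `DEBUG` level.

```kotlin
import java.util.Properties

// Streams settings discussed in the thread, using the raw config key strings.
// Values are illustrative, not recommendations; application.id and
// bootstrap.servers would also be required in a real application.
fun streamsProps(): Properties = Properties().apply {
    // Exactly-once processing (Kafka 2.x value; 3.x prefers "exactly_once_v2").
    setProperty("processing.guarantee", "exactly_once")
    // Caches are flushed on every commit; a longer interval lets the cache absorb
    // more updates per key, at the cost of higher end-to-end latency.
    setProperty("commit.interval.ms", "1000")
    // Total bytes for record caches across all stream threads (default 10 MB).
    setProperty("cache.max.bytes.buffering", (50L * 1024 * 1024).toString())
    // Record-cache metrics such as the hit ratio are recorded at DEBUG level.
    setProperty("metrics.recording.level", "DEBUG")
}
```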
>> > > > > On Mon, Dec 9, 2019 at 12:03 AM Alessandro Tagliapietra <tagliapietra.alessandro@gmail.com> wrote:
>> > > > >
>> > > > > > And it seems that, for some reason, after a while caching works again
>> > > > > > without a restart of the streams application
>> > > > > >
>> > > > > > [image: Screen Shot 2019-12-08 at 11.59.30 PM.png]
>> > > > > >
>> > > > > > I'll try to enable debug metrics and see if I can find something
>> > > > > > useful there.
>> > > > > > Any idea is appreciated in the meantime :)
>> > > > > >
>> > > > > > --
>> > > > > > Alessandro Tagliapietra
>> > > > > >
>> > > > > > On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <tagliapietra.alessandro@gmail.com> wrote:
>> > > > > >
>> > > > > >> It seems that even with caching enabled, after a while the sent bytes
>> > > > > >> still go up
>> > > > > >>
>> > > > > >> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
>> > > > > >>
>> > > > > >> You can see the deploy where I enabled caching, but it looks like
>> > > > > >> it's still only a temporary solution.
>> > > > > >>
>> > > > > >> --
>> > > > > >> Alessandro Tagliapietra
>> > > > > >>
>> > > > > >> On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra <tagliapietra.alessandro@gmail.com> wrote:
>> > > > > >>
>> > > > > >>> Could be, but since we have a limited amount of input keys (~30),
>> > > > > >>> and windowing generates new keys but old ones are never touched
>> > > > > >>> again since the data per key is in order, I assume it shouldn't be a
>> > > > > >>> big deal for it to handle 30 keys.
>> > > > > >>> I'll have a look at the cache metrics and see if something pops out.
>> > > > > >>>
>> > > > > >>> Thanks
>> > > > > >>>
>> > > > > >>> --
>> > > > > >>> Alessandro Tagliapietra
>> > > > > >>>
>> > > > > >>> On Sat, Dec 7, 2019 at 10:02 AM John Roesler <vvcephei@apache.org> wrote:
>> > > > > >>>
>> > > > > >>>> Hmm, that’s a good question. Now that we’re talking about caching,
>> > > > > >>>> I wonder if the cache was just too small. It’s not very big by default.
>> > > > > >>>>
>> > > > > >>>> On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:
>> > > > > >>>> > Ok, I'll check on that!
>> > > > > >>>> >
>> > > > > >>>> > Now I can see that with caching we went from 3-4MB/s to 400KB/s;
>> > > > > >>>> > that will help with the bill.
>> > > > > >>>> >
>> > > > > >>>> > Last question: any reason why, after a while, the regular windowed
>> > > > > >>>> > stream starts sending every update instead of caching?
>> > > > > >>>> > Could it be because it doesn't have any more memory available? Any
>> > > > > >>>> > other possible reason?
>> > > > > >>>> >
>> > > > > >>>> > Thank you so much for your help
>> > > > > >>>> >
>> > > > > >>>> > --
>> > > > > >>>> > Alessandro Tagliapietra
>> > > > > >>>> >
>> > > > > >>>> > On Sat, Dec 7, 2019 at 9:14 AM John Roesler <vvcephei@apache.org> wrote:
>> > > > > >>>> >
>> > > > > >>>> > > Ah, yes. Glad you figured it out!
>> > > > > >>>> > >
>> > > > > >>>> > > Caching does not reduce EOS guarantees at all. I highly recommend
>> > > > > >>>> > > using it. You might even want to take a look at the caching
>> > > > > >>>> > > metrics to make sure you have a good hit ratio.
>> > > > > >>>> > >
>> > > > > >>>> > > -John
>> > > > > >>>> > >
>> > > > > >>>> > > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote:
>> > > > > >>>> > > > Never mind, I've found out I can use `.withCachingEnabled` on the
>> > > > > >>>> > > > store builder to achieve the same thing as the windowing example,
>> > > > > >>>> > > > since `Materialized.as` turns that on by default.
>> > > > > >>>> > > >
>> > > > > >>>> > > > Does caching in any way reduce the EOS guarantees?
>> > > > > >>>> > > >
>> > > > > >>>> > > > --
>> > > > > >>>> > > > Alessandro Tagliapietra
>> > > > > >>>> > > >
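Why turning caching on (via `withCachingEnabled` or the `Materialized` default) can cut changelog traffic, and why a short commit interval limits the gain, can be seen in a deliberately simplified model. `changelogWrites` is a hypothetical helper: without caching every update is written; with caching only the latest value of each key is written when the cache is flushed at the end of each commit interval, and size-based eviction is ignored.

```kotlin
// Simplified model of changelog writes for a stream of (timestampMs, key) updates.
// Assumptions: the cache is flushed at the end of every commit interval, it never
// evicts because of size, and timestamps are non-negative milliseconds.
fun changelogWrites(
    updates: List<Pair<Long, String>>,
    commitIntervalMs: Long,
    cachingEnabled: Boolean
): Int {
    // Without caching, every single update becomes a changelog record.
    if (!cachingEnabled) return updates.size
    // With caching, updates to the same key within one commit interval collapse
    // into the single value written when that interval's flush happens.
    return updates
        .groupBy { (timestamp, _) -> timestamp / commitIntervalMs }
        .values
        .sumOf { batch -> batch.map { it.second }.distinct().size }
}
```

With ten updates to one key spread over ten seconds, a 30 s commit interval collapses them into a single write, while a 100 ms interval still writes all ten.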
>> > > > > >>>> > > > On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra <tagliapietra.alessandro@gmail.com> wrote:
>> > > > > >>>> > > >
>> > > > > >>>> > > > > Seems my journey with this isn't done just yet.
>> > > > > >>>> > > > >
>> > > > > >>>> > > > > This seems very complicated to me, but I'll try to explain it as best I can.
>> > > > > >>>> > > > > To better understand the streams network usage, I've used Prometheus with
>> > > > > >>>> > > > > the JMX exporter to export Kafka metrics.
>> > > > > >>>> > > > > To check the amount of data we use, I'm looking at the increments of
>> > > > > >>>> > > > > kafka_producer_topic_metrics_byte_total and
>> > > > > >>>> > > > > kafka_producer_producer_topic_metrics_record_send_total.
>> > > > > >>>> > > > >
>> > > > > >>>> > > > > Our current code (before the change mentioned above) looks like this:
>> > > > > >>>> > > > >
>> > > > > >>>> > > > > // This transformer just pairs a value with the previous one, storing
>> > > > > >>>> > > > > // the temporary one in a store
>> > > > > >>>> > > > > val pairsStream = metricStream
>> > > > > >>>> > > > >   .transformValues(ValueTransformerWithKeySupplier { PairTransformer() }, "LastValueStore")
>> > > > > >>>> > > > >   .filter { _, value: MetricSequence? -> value != null }
>> > > > > >>>> > > > >
>> > > > > >>>> > > > > // Create a store to store suppressed windows until a new one is received
>> > > > > >>>> > > > > val suppressStoreSupplier =
>> > > > > >>>> > > > >   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"),
>> > > > > >>>> > > > >   ......
>> > > > > >>>> > > > >
>> > > > > >>>> > > > > // Window and aggregate data in 1 minute intervals
>> > > > > >>>> > > > > val aggregatedStream = pairsStream
>> > > > > >>>> > > > >   .groupByKey()
>> > > > > >>>> > > > >   .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
>> > > > > >>>> > > > >   .aggregate(
>> > > > > >>>> > > > >           { MetricSequenceList(ArrayList()) },
>> > > > > >>>> > > > >           { key, value, aggregate ->
>> > > > > >>>> > > > >               aggregate.getRecords().add(value)
>> > > > > >>>> > > > >               aggregate
>> > > > > >>>> > > > >           },
>> > > > > >>>> > > > >           Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
>> > > > > >>>> > > > >               .withKeySerde(Serdes.String())
>> > > > > >>>> > > > >               .withValueSerde(Settings.getValueSpecificavroSerde())
>> > > > > >>>> > > > >   )
>> > > > > >>>> > > > >   .toStream()
>> > > > > >>>> > > > >   .flatTransform(TransformerSupplier {
>> > > > > >>>> > > > >       // This transformer basically waits until a new window is received
>> > > > > >>>> > > > >       // to emit the previous one
>> > > > > >>>> > > > >   }, "suppress-store")
>> > > > > >>>> > > > >   .map { sensorId: String, suppressedOutput: SuppressedOutput ->
>> > > > > >>>> > > > >       .... etc ....
>> > > > > >>>> > > > >
>> > > > > >>>> > > > >
>> > > > > >>>> > > > > Basically:
>> > > > > >>>> > > > >  - all data goes through the LastValueStore store, which stores each message
>> > > > > >>>> > > > > and emits a pair with the previous one
>> > > > > >>>> > > > >  - the aggregate-store is used to store the per-window list of messages in
>> > > > > >>>> > > > > the aggregate method
>> > > > > >>>> > > > >  - the suppress store is used to store each received window, which is
>> > > > > >>>> > > > > emitted only after a newer one is received
>> > > > > >>>> > > > >
>> > > > > >>>> > > > > What I'm experiencing is that:
>> > > > > >>>> > > > >  - during normal execution, the streams app sends 5k messages/min to the
>> > > > > >>>> > > > > LastValueStore changelog topic, but only about 100 to the aggregate and
>> > > > > >>>> > > > > suppress store changelog topics
>> > > > > >>>> > > > >  - at some point (after many hours of operation), the streams app starts
>> > > > > >>>> > > > > sending to the aggregate and suppress store changelog topics the same
>> > > > > >>>> > > > > amount of messages that goes through the LastValueStore
>> > > > > >>>> > > > >  - if I restart the streams app, it goes back to the initial behavior
>> > > > > >>>> > > > >
>> > > > > >>>> > > > > You can see the behavior in this graph https://imgur.com/dJcUNSf
>> > > > > >>>> > > > > You can also see that after a restart everything goes back to normal levels.
>> > > > > >>>> > > > > Regarding other metrics, process latency increases, poll latency decreases,
>> > > > > >>>> > > > > poll rate decreases, and commit rate stays the same while commit latency
>> > > > > >>>> > > > > increases.
>> > > > > >>>> > > > >
>> > > > > >>>> > > > > Now, I have these questions:
>> > > > > >>>> > > > >  - why isn't the aggregate/suppress store changelog topic throughput the
>> > > > > >>>> > > > > same as the LastValueStore's? Shouldn't it send a record to the changelog
>> > > > > >>>> > > > > every time it aggregates?
>> > > > > >>>> > > > >  - is the windowing doing some internal caching, like not sending every
>> > > > > >>>> > > > > aggregation record until the window time has passed? (If so, where can I
>> > > > > >>>> > > > > find that code? I would like to use it for our new implementation too.)
>> > > > > >>>> > > > >
>> > > > > >>>> > > > > Thank you in advance
>> > > > > >>>> > > > >
>> > > > > >>>> > > > > --
>> > > > > >>>> > > > > Alessandro Tagliapietra
>> > > > > >>>> > > > >
>> > > > > >>>> > > > > On Wed, Dec 4, 2019 at 7:57 AM John Roesler <vvcephei@apache.org> wrote:
>> > > > > >>>> > > > >
>> > > > > >>>> > > > >> Oh, good!
>> > > > > >>>> > > > >>
>> > > > > >>>> > > > >> On Tue, Dec 3, 2019, at 23:29, Alessandro Tagliapietra wrote:
>> > > > > >>>> > > > >> > Testing on staging shows that a restart on exception is much faster
>> > > > > >>>> > > > >> > and the stream starts right away, which I think means we're reading
>> > > > > >>>> > > > >> > way less data than before!
>> > > > > >>>> > > > >> >
>> > > > > >>>> > > > >> > > What I was referring to is that, in Streams, the keys for window
>> > > > > >>>> > > > >> > > aggregation state are actually composed of both the window itself
>> > > > > >>>> > > > >> > > and the key. In the DSL, it looks like "Windowed<K>". That results
>> > > > > >>>> > > > >> > > in the store having a unique key per window for each K, which is why
>> > > > > >>>> > > > >> > > we need retention as well as compaction for our changelogs. But for
>> > > > > >>>> > > > >> > > you, if you just make the key "K", then compaction alone should do
>> > > > > >>>> > > > >> > > the trick.
>> > > > > >>>> > > > >> >
>> > > > > >>>> > > > >> > Yes, we had compact,delete as the cleanup policy, but it probably still
>> > > > > >>>> > > > >> > had too long a retention value. Also, the RocksDB store is probably much
>> > > > > >>>> > > > >> > faster now, having only one entry per key instead of one entry per
>> > > > > >>>> > > > >> > window per key.
>> > > > > >>>> > > > >> >
>> > > > > >>>> > > > >> > Thanks a lot for helping! I'm now going to set up Prometheus-JMX
>> > > > > >>>> > > > >> > monitoring so we can keep better track of what's going on :)
>> > > > > >>>> > > > >> >
>> > > > > >>>> > > > >> > --
>> > > > > >>>> > > > >> > Alessandro Tagliapietra
>> > > > > >>>> > > > >> >
>> > > > > >>>> > > > >> > On Tue, Dec 3, 2019 at 9:12 PM John Roesler <vvcephei@apache.org> wrote:
>> > > > > >>>> > > > >> >
>> > > > > >>>> > > > >> > > Oh, yeah, I remember that conversation!
>> > > > > >>>> > > > >> > >
>> > > > > >>>> > > > >> > > Yes, then, I agree: if you're only storing state of the most recent
>> > > > > >>>> > > > >> > > window for each key, and the key you use for that state is actually
>> > > > > >>>> > > > >> > > the key of the records, then an aggressive compaction policy plus your
>> > > > > >>>> > > > >> > > custom transformer seems like a good way forward.
>> > > > > >>>> > > > >> > >
>> > > > > >>>> > > > >> > > What I was referring to is that, in Streams, the keys for window
>> > > > > >>>> > > > >> > > aggregation state are actually composed of both the window itself and
>> > > > > >>>> > > > >> > > the key. In the DSL, it looks like "Windowed<K>". That results in the
>> > > > > >>>> > > > >> > > store having a unique key per window for each K, which is why we need
>> > > > > >>>> > > > >> > > retention as well as compaction for our changelogs. But for you, if
>> > > > > >>>> > > > >> > > you just make the key "K", then compaction alone should do the trick.
>> > > > > >>>> > > > >> > >
>> > > > > >>>> > > > >> > > And yes, if you manage the topic yourself, then Streams won't adjust
>> > > > > >>>> > > > >> > > the retention time. I think it might validate that the retention isn't
>> > > > > >>>> > > > >> > > too short, but I don't remember offhand.
>> > > > > >>>> > > > >> > >
>> > > > > >>>> > > > >> > > Cheers, and let me know how it goes!
>> > > > > >>>> > > > >> > > -John
>> > > > > >>>> > > > >> > >
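John's point about `Windowed<K>` keys can be illustrated by counting distinct store keys. `WindowedKey` is a hypothetical stand-in for the DSL's `Windowed<K>` (record key plus window start), and the sketch assumes tumbling windows and that nothing has expired yet. Every new window adds a fresh key that compaction alone can never remove, which is why windowed changelogs also need retention, while a store keyed by the plain record key stays at one entry per key.

```kotlin
// Hypothetical stand-in for the DSL's Windowed<K>: record key plus window start.
data class WindowedKey<K>(val key: K, val windowStart: Long)

// Distinct keys in a windowed store after the given (key, timestampMs) records,
// assuming tumbling windows of windowSizeMs and that nothing has expired yet.
fun windowedStoreKeys(records: List<Pair<String, Long>>, windowSizeMs: Long): Int =
    records.map { (key, ts) -> WindowedKey(key, ts - Math.floorMod(ts, windowSizeMs)) }
        .toSet()
        .size

// Distinct keys in a store keyed by the plain record key (one window per key).
fun plainStoreKeys(records: List<Pair<String, Long>>): Int =
    records.map { it.first }.toSet().size
```

For 30 sensors reporting once a minute for an hour, the windowed store holds 1,800 keys and the plain one holds 30.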
>> > > > > >>>> > > > >> > > On Tue, Dec 3, 2019, at 23:03, Alessandro Tagliapietra wrote:
>> > > > > >>>> > > > >> > > > Hi John,
>> > > > > >>>> > > > >> > > >
>> > > > > >>>> > > > >> > > > AFAIK the grace period uses stream time
>> > > > > >>>> > > > >> > > > https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html
>> > > > > >>>> > > > >> > > > which is per partition. Unfortunately, we process data that's not in
>> > > > > >>>> > > > >> > > > sync between keys, so each key needs to be independent, and one key
>> > > > > >>>> > > > >> > > > can have much older data than another.
>> > > > > >>>> > > > >> > > >
>> > > > > >>>> > > > >> > > > Having a small grace period would probably close old windows sooner
>> > > > > >>>> > > > >> > > > than expected. That's also why, in my use case, a custom store that
>> > > > > >>>> > > > >> > > > just stores the last window data for each key might work better. I
>> > > > > >>>> > > > >> > > > had the same issue with suppression, and it has been reported here
>> > > > > >>>> > > > >> > > > https://issues.apache.org/jira/browse/KAFKA-8769
>> > > > > >>>> > > > >> > > > Oh, I just saw that you're the one who helped me on Slack and created
>> > > > > >>>> > > > >> > > > the issue (thanks again for that).
>> > > > > >>>> > > > >> > > >
>> > > > > >>>> > > > >> > > > Is the behavior you mention, Streams setting the changelog retention
>> > > > > >>>> > > > >> > > > time, something it does on creation of the topic when the broker has
>> > > > > >>>> > > > >> > > > auto-creation enabled? Because we're using Confluent Cloud and I had
>> > > > > >>>> > > > >> > > > to create it manually.
>> > > > > >>>> > > > >> > > > Regarding the change in the recovery behavior: with a compact cleanup
>> > > > > >>>> > > > >> > > > policy, shouldn't the changelog only keep the last value? That would
>> > > > > >>>> > > > >> > > > make recovery faster and "cheaper", as it would only need to read a
>> > > > > >>>> > > > >> > > > single value per key (if the cleanup just happened), right?
>> > > > > >>>> > > > >> > > >
>> > > > > >>>> > > > >> > > > --
>> > > > > >>>> > > > >> > > > Alessandro Tagliapietra
>> > > > > >>>> > > > >> > > >
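What a fully compacted changelog would hand to a restoring instance can be modeled as "last record per key, in offset order". `compact` below is a hypothetical helper that assumes the cleaner has already processed the whole log and that tombstones (null values) have been removed, as they eventually are after `delete.retention.ms`. Real compaction never touches the active segment and runs only periodically, so an actual restore can read more records than this.

```kotlin
// Simplified model of a compacted topic after the cleaner has run over the whole log:
// only the latest record per key survives (in offset order), and tombstones are dropped.
fun <K, V : Any> compact(log: List<Pair<K, V?>>): List<Pair<K, V>> {
    // Offset of the last record seen for each key.
    val latestOffset = HashMap<K, Int>()
    log.forEachIndexed { offset, (key, _) -> latestOffset[key] = offset }
    return log
        .filterIndexed { offset, (key, _) -> latestOffset[key] == offset }
        // A surviving null is a tombstone: the key is deleted entirely.
        .mapNotNull { (key, value) -> value?.let { key to it } }
}
```

For a changelog that wrote three windows for one sensor, a restore from the compacted log reads only the last one.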
>> > > > > >>>> > > > >> > > > On Tue, Dec 3, 2019 at 8:51 PM John Roesler <vvcephei@apache.org> wrote:
>> > > > > >>>> > > > >> > > >
>> > > > > >>>> > > > >> > > > > Hey Alessandro,
>> > > > > >>>> > > > >> > > > >
>> > > > > >>>> > > > >> > > > > That also sounds like it would work. I'm wondering if it would
>> > > > > >>>> > > > >> > > > > actually change what you observe w.r.t. recovery behavior, though.
>> > > > > >>>> > > > >> > > > > Streams already sets the retention time on the changelog to equal
>> > > > > >>>> > > > >> > > > > the retention time of the windows, for windowed aggregations, so you
>> > > > > >>>> > > > >> > > > > shouldn't be loading a lot of window data for old windows you no
>> > > > > >>>> > > > >> > > > > longer care about.
>> > > > > >>>> > > > >> > > > >
>> > > > > >>>> > > > >> > > > > Have you set the "grace period" on your window definition? By
>> > > > > >>>> > > > >> > > > > default, it is set to 24 hours, but you can set it as low as you
>> > > > > >>>> > > > >> > > > > like. E.g., if you want to commit to having in-order data only, then
>> > > > > >>>> > > > >> > > > > you can set the grace period to zero. This _should_ let the broker
>> > > > > >>>> > > > >> > > > > clean up the changelog records as soon as the window ends.
>> > > > > >>>> > > > >> > > > >
>> > > > > >>>> > > > >> > > > > Of course, the log cleaner doesn't run all the time, so there's some
>> > > > > >>>> > > > >> > > > > extra delay in which "expired" data would still be visible in the
>> > > > > >>>> > > > >> > > > > changelog, but it would actually be just the same as if you manage
>> > > > > >>>> > > > >> > > > > the store yourself.
>> > > > > >>>> > > > >> > > > >
>> > > > > >>>> > > > >> > > > > Hope this helps!
>> > > > > >>>> > > > >> > > > > -John
>> > > > > >>>> > > > >> > > > >
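Alessandro's concern about grace periods can be checked with a small model of how, as we understand it, the DSL decides whether a record is too late: stream time is the largest timestamp seen so far on the partition, shared by all keys on it, and a record is dropped once its window end plus the grace period is no longer after stream time. `PartitionWindowing` is a hypothetical class, not Kafka code, and assumes tumbling windows.

```kotlin
// Simplified model (not Kafka code) of late-record dropping for tumbling windows
// on ONE partition. Stream time is shared by every key on the partition.
class PartitionWindowing(private val windowSizeMs: Long, private val graceMs: Long) {
    private var streamTime = Long.MIN_VALUE

    // Returns true if the record would still be aggregated, false if dropped as late.
    fun accept(timestampMs: Long): Boolean {
        // Stream time only moves forward: it is the largest timestamp seen so far.
        streamTime = maxOf(streamTime, timestampMs)
        val windowEnd = timestampMs - Math.floorMod(timestampMs, windowSizeMs) + windowSizeMs
        // The window counts as closed once stream time reaches windowEnd + grace.
        return windowEnd + graceMs > streamTime
    }
}
```

A sensor that lags eight minutes behind another sensor on the same partition loses its records with a zero grace period but keeps them with a one-hour grace period.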
>> > > > > >>>> > > > >> > > > > On Tue, Dec 3, 2019, at 22:22, Alessandro Tagliapietra wrote:
>> Thanks John for the explanation,
>>
>> I thought that with EOS enabled (which we have) it would, in the worst
>> case, find a valid checkpoint and restore from there until it reached
>> the last committed state, not completely from scratch. What you say
>> definitely makes sense now.
>> Since we don't really need old time windows, and we ensure data is
>> ordered when processed, I think I'll just write a custom transformer to
>> keep only the last window, store intermediate aggregation results in
>> the store, and emit a new value only when we receive data belonging to
>> a new window.
>> That, with a compact-only changelog topic, should keep the rebuild data
>> to a minimum, as it would have only the last value for each key.
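Alessandro's plan (one aggregate per key, replaced when a record for a newer window arrives) can be sketched without Kafka at all. In this sketch a `MutableMap` stands in for a key-value state store, `WindowAggregate` and `LastWindowAggregator` are hypothetical names, and records are assumed to arrive in timestamp order per key, as he says they do. In a real transformer the map would be a `KeyValueStore` backed by a compacted changelog, so each `store[key] = ...` corresponds to one changelog record.

```kotlin
// Hypothetical types; a MutableMap stands in for a Kafka Streams KeyValueStore.
data class WindowAggregate(val windowStartMs: Long, val values: List<Double>)

class LastWindowAggregator(private val windowSizeMs: Long) {
    // At most one entry per key: the aggregate of that key's latest window.
    val store = mutableMapOf<String, WindowAggregate>()

    // Adds the value to the key's current window. Returns the finished previous
    // window when a record for a newer window arrives, otherwise null.
    // Assumes records arrive in timestamp order per key.
    fun process(key: String, timestampMs: Long, value: Double): WindowAggregate? {
        val windowStartMs = timestampMs - Math.floorMod(timestampMs, windowSizeMs)
        val current = store[key]
        return when {
            current == null || windowStartMs > current.windowStartMs -> {
                // New window: overwrite the single entry and emit the old one (if any).
                store[key] = WindowAggregate(windowStartMs, listOf(value))
                current
            }
            windowStartMs == current.windowStartMs -> {
                // Same window: write the updated aggregate back (one changelog record).
                store[key] = current.copy(values = current.values + value)
                null
            }
            else -> null // Out-of-order record for an older window: ignored here.
        }
    }
}
```

Because each key holds exactly one entry, compaction can in principle reduce the changelog to one record per sensor, which is what would keep a full restore small.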
>>
>> Hope that makes sense
>>
>> Thanks again
>>
>> --
>> Alessandro Tagliapietra
>>
>> On Tue, Dec 3, 2019 at 3:04 PM John Roesler <vvcephei@apache.org> wrote:
>>
>>> Hi Alessandro,
>>>
>>> To take a stab at your question, maybe it first doesn't find it, but
>>> then restores some data, writes the checkpoint, and then later on, it
>>> has to re-initialize the task for some reason, and that's why it does
>>> find a checkpoint then?
>>>
>>> More to the heart of the issue, if you have EOS enabled, Streams _only_
>>> records the checkpoint when the store is in a known-consistent state.
>>> For example, if you have a graceful shutdown, Streams will flush all
>>> the stores, commit all the transactions, and then write the checkpoint
>>> file. Then, on re-start, it will pick up from that checkpoint.
>>>
>>> But as soon as it starts processing records, it removes the checkpoint
>>> file, so if it crashes while it was processing, there is no checkpoint
>>> file there, and it will have to restore from the beginning of the
>>> changelog.
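The invariant John describes can be written down as a tiny state model. It is a simplification based only on his description (the class name and the single-offset checkpoint are assumptions, not Streams internals): the checkpoint exists after a clean shutdown and is removed as soon as processing starts, so a crash in between leaves nothing to resume from.

```kotlin
// Hypothetical model of the EOS checkpoint-file lifecycle described above.
class EosCheckpointFile {
    // Changelog offset recorded in the file, or null when no file exists.
    var offset: Long? = null
        private set

    // Graceful shutdown: stores flushed, transactions committed, file written.
    fun writeOnGracefulShutdown(committedOffset: Long) {
        offset = committedOffset
    }

    // Processing makes the local store diverge from the committed changelog,
    // so the file is deleted before the first record is processed.
    fun deleteOnStartProcessing() {
        offset = null
    }

    // What a restarting instance sees; null means "restore from the beginning".
    fun readOnRestart(): Long? = offset
}
```

A crash is modelled simply by never calling `writeOnGracefulShutdown`, which is why an unclean restart finds no checkpoint.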
>>>
>>> This design is there on purpose, because otherwise we cannot actually
>>> guarantee correctness. For example, say you are maintaining a count,
>>> and we process an input record i, increment the count, and write it to
>>> the state store and to the changelog topic, but we crash before we
>>> commit that transaction. Then the write to the changelog would be
>>> aborted, and we would re-process record i. However, we've already
>>> updated the local state store, so when we increment it again, it
>>> results in double-counting i. The key point here is that there's no
>>> way to do an atomic operation across two different systems (the local
>>> state store and the changelog topic). Since we can't guarantee that we
>>> roll back the incremented count when the changelog transaction is
>>> aborted, we can't keep the local store consistent with the changelog.
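John's double-counting example can be reproduced with a toy task. It is a sketch under assumptions: `CountTask` is hypothetical, a map plays the local store, and the changelog transaction is modelled as a pending map that is either committed or discarded.

```kotlin
// Toy model of John's example (hypothetical names). The local store is written
// immediately; changelog writes only become visible when the transaction commits.
class CountTask {
    val localStore = mutableMapOf<String, Long>()
    val committedChangelog = mutableMapOf<String, Long>()
    private val openTransaction = mutableMapOf<String, Long>()

    fun process(key: String) {
        val newCount = (localStore[key] ?: 0L) + 1
        localStore[key] = newCount      // local write: not part of the transaction
        openTransaction[key] = newCount // changelog write: inside the transaction
    }

    fun commit() {
        committedChangelog.putAll(openTransaction)
        openTransaction.clear()
    }

    // Crash before commit: the transaction is aborted, the local store is not rolled back.
    fun crashBeforeCommit() {
        openTransaction.clear()
    }

    // Per John's description: without a checkpoint, discard local state and replay the changelog.
    fun restoreFromChangelog() {
        localStore.clear()
        localStore.putAll(committedChangelog)
    }
}
```

Re-processing record i on the dirty store after the abort commits a count of 2 for a single record; wiping the store and replaying the committed changelog first yields 1.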
>>>
>>> After a crash, the only way to ensure the local store is consistent
>>> with the changelog is to discard the entire thing and rebuild it. This
>>> is why we have an invariant that the checkpoint file only exists when
>>> we _know_ that the local store is consistent with the changelog, and
>>> this is why you're seeing so much bandwidth when re-starting from an
>>> unclean shutdown.
>>>
>>> Note that it's definitely possible to do better than this, and we would
>>> very much like to improve it in the future.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Tue, Dec 3, 2019, at 16:16, Alessandro Tagliapietra wrote:
>>>> Hi John,
>>>>
>>>> thanks a lot for helping. Regarding your message:
>>>>  - no, we only have 1 instance of the stream application, and it always
>>>>    re-uses the same state folder
>>>>  - yes, we're seeing most issues when restarting non-gracefully due to
>>>>    an exception
>>>>
>>>> I've enabled trace logging, and filtering by a single state store, the
>>>> StoreChangelogReader messages are:
>>>>
>>>> Added restorer for changelog sensors-stream-aggregate-store-changelog-0
>>>> Added restorer for changelog sensors-stream-aggregate-store-changelog-1
>>>> Added restorer for changelog sensors-stream-aggregate-store-changelog-2
>>>> Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store, rewinding to beginning.
>>>> Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store, rewinding to beginning.
>>>> Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store, rewinding to beginning.
>>>> No checkpoint found for task 0_2 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-2 with EOS turned on. Reinitializing the task and restore its state from the beginning.
>>>> No checkpoint found for task 0_1 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-1 with EOS turned on. Reinitializing the task and restore its state from the beginning.
>>>> No checkpoint found for task 0_0 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-0 with EOS turned on. Reinitializing the task and restore its state from the beginning.
>>>> Found checkpoint 709937 from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store.
>>>> Restoring partition sensors-stream-aggregate-store-changelog-2 from offset 709937 to endOffset 742799
>>>> Found checkpoint 3024234 from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store.
>>>> Restoring partition sensors-stream-aggregate-store-changelog-1 from offset 3024234 to endOffset 3131513
>>>> Found checkpoint 14514072 from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store.
>>>> Restoring partition sensors-stream-aggregate-store-changelog-0 from offset 14514072 to endOffset 17116574
>>>> Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 966 records, ending offset is 711432, next starting position is 711434
>>>> Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 914 records, ending offset is 712711, next starting position is 712713
>>>> Restored from sensors-stream-aggregate-store-changelog-1 to aggregate-store with 18 records, ending offset is 3024261, next starting position is 3024262
>>>>
>>>> Why does it first say it didn't find the checkpoint, and then it does
>>>> find it?
>>>> It seems it loaded about 2.7M records (the sum of the offset differences
>>>> in the "Restoring partition ..." messages), right?
>>>> Should I maybe try to reduce the checkpoint interval?
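The estimate can be checked directly from the log lines above. The sketch only copies the `from offset ... to endOffset ...` pairs; the variable names are illustrative.

```kotlin
// (from offset, endOffset) pairs copied from the "Restoring partition" log lines above.
val restoreRanges: Map<String, Pair<Long, Long>> = mapOf(
    "sensors-stream-aggregate-store-changelog-0" to (14_514_072L to 17_116_574L),
    "sensors-stream-aggregate-store-changelog-1" to (3_024_234L to 3_131_513L),
    "sensors-stream-aggregate-store-changelog-2" to (709_937L to 742_799L)
)

// Offset distance per partition. This is an upper bound on records replayed:
// EOS transaction markers also occupy offsets (presumably the gap in "ending
// offset is 711432, next starting position is 711434"), and compaction leaves gaps.
val offsetSpans: Map<String, Long> =
    restoreRanges.mapValues { (_, range) -> range.second - range.first }

val totalOffsets: Long = offsetSpans.values.sum()
```

The spans are 2,602,502, 107,279 and 32,862 offsets, for a total of 2,742,643, which matches the "about 2.7M" estimate if read as an upper bound on the records actually restored.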
>>>>
>>>> Regards
>>>>
>>>> --
>>>> Alessandro Tagliapietra
>>>>
>>>> On Mon, Dec 2, 2019 at 9:18 AM John Roesler <vvcephei@apache.org> wrote:
>>>>
>>>>> Hi Alessandro,
>>>>>
>>>>> I'm sorry to hear that.
>>>>>
>>>>> The restore process only takes one factor into account: the current
>>>>> offset position of the changelog topic is stored in a local file
>>>>> alongside the state stores. On startup, the app checks if the recorded
>>>>> position lags the latest offset in the changelog. If so, then it reads
>>>>> the missing changelog records before starting processing.
>>>>>
>>>>> Thus, it would not restore any old window data.
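The restore range John describes (from the locally recorded offset up to the changelog's latest offset) can be expressed as a small helper. This is an illustrative sketch: `offsetsToRestore` is hypothetical, `endOffset` is taken to be exclusive (the next offset to be written, as Kafka reports it), and clamping to the log start offset is a simplifying assumption for a checkpoint that points at already-deleted data.

```kotlin
// Hypothetical helper: the changelog offsets a restoring instance would read.
// checkpoint == null models "no checkpoint file", i.e. replay the whole log.
fun offsetsToRestore(checkpoint: Long?, logStartOffset: Long, endOffset: Long): LongRange {
    // Simplification: a checkpoint older than the log start is clamped to it.
    val from = maxOf(checkpoint ?: logStartOffset, logStartOffset)
    return from until endOffset // empty when the checkpoint is already up to date
}
```

Only the tail after the checkpoint is replayed; without a checkpoint, the range covers everything the changelog still retains.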
>>>>>
>>>>> There might be a few different things going on to explain your
>>>>> observation:
>>>>> * if there is more than one instance in your Streams cluster, maybe the
>>>>>   task is "flopping" between instances, so each instance still has to
>>>>>   recover state, since it wasn't the last one actively processing it.
>>>>> * if the application isn't stopped gracefully, it might not get a
>>>>>   chance to record its offset in that local file, so on restart it has
>>>>>   to restore some or all of the state store from the changelog.
>>>>>
>>>>> Or it could be something else; that's just what comes to mind.
>>>>>
>>>>> If you want to get to the bottom of it, you can take a look at the
>>>>> logs, paying close attention to which tasks are assigned to which
>>>>> instances after each restart. You can also look into the logs from
>>>>> `org.apache.kafka.streams.processor.internals.StoreChangelogReader`
>>>>> (might want to set it to DEBUG or TRACE level to really see what's
>>>>> happening).
>>>>>
>>>>> I hope this helps!
>>>>> -John
>>>>>
>>>>> On Sun, Dec 1, 2019, at 21:25, Alessandro Tagliapietra wrote:
>>>>>> Hello everyone,
>>>>>>
>>>>>> we're having a problem with bandwidth usage on streams application
>>>>>> startup. Our current setup does this:
>>>>>>
>>>>>> ...
>>>>>> .groupByKey()
>>>>>> .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
>>>>>> .aggregate(
>>>>>>         { MetricSequenceList(ArrayList()) },
>>>>>>         { key, value, aggregate ->
>>>>>>             aggregate.getRecords().add(value)
>>>>>>             aggregate
>>>>>>         },
>>>>>>         Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
>>>>>>             .withKeySerde(Serdes.String())
>>>>>>             .withValueSerde(Settings.getValueSpecificavroSerde())
>>>>>> )
>>>>>> .toStream()
>>>>>> .flatTransform(TransformerSupplier {
>>>>>> ...
>>>>>>
>>>>>> Basically, in each window we append the new values and then do some
>>>>>> other logic with the array of windowed values.
>>>>>> The aggregate-store changelog topic configuration uses compact,delete
>>>>>> as the cleanup policy and has 12 hours of retention.
>>>>>>
>>>>>> What we've seen is that on application startup it takes a couple of
>>>>>> minutes to rebuild the state store, even though the state store
>>>>>> directory is persisted across restarts. That, along with an exception
>>>>>> that caused the Docker container to be restarted a couple hundred
>>>>>> times, caused a big Confluent Cloud bill compared to what we usually
>>>>>> spend (1/4 of a full month in 1 day).
>>>>>>
>>>>>> What I think is happening is that the topic is keeping all the
>>>>>> previous windows even with the compaction policy, because each key is
>>>>>> the original key + the timestamp, not just the key. Since we don't
>>>>>> care about previous windows (the flatTransform after the toStream()
>>>>>> makes sure that we don't process old windows; it's basically a custom
>>>>>> suppressor), is there a way to only keep the last window so that the
>>>>>> store rebuilding goes faster, without rebuilding old windows too? Or
>>>>>> should I create a custom window using the original key as key so that
>>>>>> the compaction keeps only the last window's data?
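Alessandro's hypothesis can be checked with a toy compaction model. It is a sketch with hypothetical names that ignores the delete half of `compact,delete`, tombstones, and the cleaner's timing, and it assumes, as he describes, that the windowed store's changelog key combines the record key with the window timestamp.

```kotlin
// Hypothetical changelog record; K is the changelog key type.
data class ChangelogRecord<K>(val key: K, val value: String)

// Toy compaction: only the last record for each key survives.
fun <K> compact(log: List<ChangelogRecord<K>>): List<ChangelogRecord<K>> =
    log.associateBy { it.key }.values.toList()

// Builds a changelog for one-minute windows. With windowedKeys = true the key
// embeds the window start (as assumed above); otherwise it is the plain sensor key.
fun buildLog(sensors: Int, windows: Int, updatesPerWindow: Int, windowedKeys: Boolean): List<ChangelogRecord<String>> {
    val log = mutableListOf<ChangelogRecord<String>>()
    for (w in 0 until windows) {
        val windowStartMs = w * 60_000L
        for (s in 0 until sensors) {
            for (u in 0 until updatesPerWindow) {
                val key = if (windowedKeys) "sensor-$s@$windowStartMs" else "sensor-$s"
                log.add(ChangelogRecord(key, "aggregate after update $u"))
            }
        }
    }
    return log
}
```

For example, with 10 hypothetical sensors, 5 updates per window and 720 one-minute windows (the 12-hour retention mentioned above), the windowed log still holds 7,200 records after compaction, against 10 for plain keys.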
>>>>>>
>>>>>> Thank you
>>>>>>
>>>>>> --
>>>>>> Alessandro Tagliapietra