kafka-users mailing list archives

From "John Roesler" <vvcep...@apache.org>
Subject Re: Reducing streams startup bandwidth usage
Date Mon, 02 Dec 2019 17:18:25 GMT
Hi Alessandro,

I'm sorry to hear that.

The restore process only takes one factor into account: the current offset position of the
changelog topic is stored in a local file alongside the state stores. On startup, the app
checks if the recorded position lags the latest offset in the changelog. If so, then it reads
the missing changelog records before starting processing.

Thus, it would not restore any old window data.
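
To make that check concrete, here is a simplified model of the decision. It is only a sketch of the idea described above, not the actual Streams implementation; the class, method, and parameter names are all made up for illustration.

```java
import java.util.OptionalLong;

/** Simplified model of the startup restore decision; NOT Kafka Streams' real code. */
final class RestoreSketch {

    /**
     * @param checkpointedOffset   changelog position recorded in the local file,
     *                             or empty if no position was recorded
     * @param changelogStartOffset earliest offset still present in the changelog
     * @param changelogEndOffset   latest offset in the changelog
     * @return number of changelog offsets to replay before processing starts
     */
    static long offsetsToRestore(OptionalLong checkpointedOffset,
                                 long changelogStartOffset,
                                 long changelogEndOffset) {
        // With no recorded position, replay everything the changelog still holds.
        long resumeFrom = checkpointedOffset.orElse(changelogStartOffset);
        // If the recorded position is already at the end, nothing is replayed.
        return Math.max(0L, changelogEndOffset - resumeFrom);
    }
}
```

In this model a store whose recorded position equals the latest changelog offset replays nothing, while a store with no recorded position replays the whole retained changelog.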

There might be a few different things going on to explain your observation:
* if there is more than one instance in your Streams cluster, maybe the task is "flopping"
between instances, so each instance still has to recover state, since it wasn't the last one
actively processing it.
* if the application isn't stopped gracefully, it might not get a chance to record its offset
in that local file, so on restart it has to restore some or all of the state store from the changelog.
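
For the second point, one common way to get a graceful stop is a JVM shutdown hook that closes the application. The helper below is a minimal sketch with a hypothetical name; it accepts any `AutoCloseable`, which `KafkaStreams` implements, so it can be tried without Kafka on the classpath.

```java
/** Sketch: close a Streams-like application cleanly when the JVM shuts down. */
final class GracefulShutdown {

    /**
     * Registers a JVM shutdown hook that calls close() on the given application
     * and returns the hook thread so callers can inspect or remove it.
     */
    static Thread register(AutoCloseable app) {
        Thread hook = new Thread(() -> {
            try {
                // For KafkaStreams, close() is what lets it shut down cleanly.
                app.close();
            } catch (Exception e) {
                // Nothing else can be done this late; just report it.
                e.printStackTrace();
            }
        }, "streams-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

Usage would be something like `GracefulShutdown.register(streams)` right before `streams.start()`. Note that a hook only runs on a normal exit or a catchable signal such as SIGTERM; if the container is killed with SIGKILL, it never runs.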

Or it could be something else; that's just what comes to mind.

If you want to get to the bottom of it, you can take a look at the logs, paying close attention
to which tasks are assigned to which instances after each restart. You can also look into
the logs from `org.apache.kafka.streams.processor.internals.StoreChangelogReader` (might want
to set it to DEBUG or TRACE level to really see what's happening).
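
As an example, if your app logs through a log4j 1.x properties file (that's an assumption on my part; adapt it to whatever logging backend you actually use), the logger level could be raised with a line like this:

```properties
# Assumes a log4j 1.x properties configuration; other backends use different syntax.
log4j.logger.org.apache.kafka.streams.processor.internals.StoreChangelogReader=DEBUG
```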

I hope this helps!
-John

On Sun, Dec 1, 2019, at 21:25, Alessandro Tagliapietra wrote:
> Hello everyone,
> 
> we're having a problem with bandwidth usage on streams application startup.
> Our current setup does this:
> 
> ...
> .groupByKey()
> .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
> .aggregate(
>         { MetricSequenceList(ArrayList()) },
>         { key, value, aggregate ->
>             aggregate.getRecords().add(value)
>             aggregate
>         },
>         Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
>             .withKeySerde(Serdes.String())
>             .withValueSerde(Settings.getValueSpecificavroSerde())
> )
> .toStream()
> .flatTransform(TransformerSupplier {
> ...
> 
> Basically, in each window we append the new values and then do some other
> logic with the array of windowed values.
> The aggregate-store changelog topic configuration uses compact,delete as its
> cleanup policy and has 12 hours of retention.
> 
> What we've seen is that on application startup it takes a couple of minutes to
> rebuild the state store, even if the state store directory is persisted
> across restarts. That, along with an exception that caused the Docker
> container to be restarted a couple hundred times, caused a big Confluent
> Cloud bill compared to what we usually spend (1/4 of a full month in 1 day).
> 
> What I think is happening is that the topic is keeping all the previous
> windows even with the compacting policy, because each key is the original
> key + the timestamp, not just the key. We don't care about previous
> windows, since the flatTransform after the toStream() makes sure that we don't
> process old windows (basically a custom suppressor). Is there a way to only
> keep the last window, so that the store rebuilds faster and without
> rebuilding old windows too? Or should I create a custom window using the
> original key as key, so that the compaction keeps only the last window data?
> 
> Thank you
> 
> --
> Alessandro Tagliapietra
>
