kafka-users mailing list archives

From Taylor P <tdp002...@gmail.com>
Subject Re: Minimizing global store restoration time
Date Wed, 06 Feb 2019 23:47:28 GMT
Hi Patrik,

I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380 will
resolve this issue, since our application depends on the global store
being fully restored before the application can be considered healthy.
KAFKA-7380 does not seem to be aimed at the fact that global stores
restore each partition sequentially - it is aimed at changing the
blocking nature of #start(). Restoring the global store partitions in
parallel would definitely speed things up, though, and admittedly my first
thought when debugging this was "why isn't this restoring each partition in
parallel?".
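To put rough numbers on it (a quick sketch using the figures from my
original mail below: a ~100,000 records/sec restore ceiling, 15 million
records per partition, 18 partitions):

```python
# Rough restoration-time estimate: one global store thread restoring
# 18 partitions sequentially, vs. an ideal fully parallel restore.
records_per_partition = 15_000_000
restore_rate = 100_000  # observed per-partition ceiling, records/sec
partitions = 18

sequential_min = records_per_partition / restore_rate * partitions / 60
parallel_min = records_per_partition / restore_rate / 60  # ideal case

print(sequential_min, parallel_min)  # 45.0 and 2.5
```

So even an ideal parallel restore of our topic would still take a couple
of minutes per instance.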

Changing our streams topology to avoid using a global store for such a
large amount of data would be doable but it does seem like a significant
amount of work. I am curious to know if anyone else is storing large
amounts of data in global stores and whether there are any inherent
limitations to the size of global stores.

Our topic is already using compaction.
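(For clarity, since compaction and compression are separate topic-level
settings - a sketch of the two configs, values illustrative:)

```properties
# Topic-level settings (illustrative values)
cleanup.policy = compact      # log compaction, already enabled on our topic
compression.type = lz4        # broker-side compression, Patrik's suggestion
```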

Taylor

On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl <pkleindl@gmail.com> wrote:

> Hi Taylor
>
> We are facing the same issue, although on a smaller scale.
> The main problem, as you found, is that the restoration runs
> sequentially. This should be addressed in
> https://issues.apache.org/jira/browse/KAFKA-7380, although there has been
> no progress lately.
>
> On the other hand, you could try re-evaluating whether your problem can
> only be solved with global state stores. In our case (both in streams as
> well as for interactive queries) we could solve it with local state
> stores too, although only with more changes and more complexity in the
> topology.
>
> Not sure if it is applicable to your case, but have you looked into
> compression for the topics?
>
> best regards
>
> Patrik
>
> On Tue, 5 Feb 2019 at 22:37, Taylor P <tdp002019@gmail.com> wrote:
>
> > Hi,
> >
> > I am having issues with the global store taking a very long time to
> restore
> > during startup of a Kafka Streams 2.0.1 application. The global store is
> > backed by a RocksDB persistent store and is added to the Streams topology
> > in the following manner: https://pastebin.com/raw/VJutDyYe The global
> > store
> > topic has approximately 15 million records per partition and 18
> partitions.
> > The following global consumer settings are specified:
> >
> >     poll.timeout.ms = 10
> >     max.poll.records = 2000
> >     max.partition.fetch.bytes = 1048576
> >     fetch.max.bytes = 52428800
> >     receive.buffer.bytes = 65536
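> > (These are routed to the global consumer via the "global.consumer."
> > config prefix available since Kafka Streams 2.0; a sketch with the
> > values above, omitting poll.timeout.ms:)

```properties
# Sketch: streams config overrides scoped to the global consumer
# via the "global.consumer." prefix (Kafka Streams 2.0+)
global.consumer.max.poll.records = 2000
global.consumer.max.partition.fetch.bytes = 1048576
global.consumer.fetch.max.bytes = 52428800
global.consumer.receive.buffer.bytes = 65536
```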
> >
> > I have tried tweaking the settings above on the consumer side, such as
> > increasing poll.timeout.ms to 2000, max.poll.records to 10000, and
> > max.partition.fetch.bytes to 52428800, but it seems that I keep hitting a
> > ceiling of restoring approximately 100,000 records per second. With 15
> > million records per partition, it takes approximately 150 seconds to
> > restore a single partition. With 18 partitions, it takes roughly 45
> minutes
> > to fully restore the global store.
> >
> > Switching from HDDs to SSDs on the brokers' log directories made
> > restoration roughly 25% faster overall, but this still feels slow. It
> seems
> > that I am hitting IOPS limits on the disks and am not even close to
> hitting
> > the throughput limits of the disks on either the broker or streams
> > application side.
> >
> > How can I minimize restoration time of a global store? Are there settings
> > that can increase throughput with the same number of IOPS? Ideally
> > restoration of each partition could be done in parallel but I recognize
> > there is only a single global store thread. Bringing up a new instance of
> > the Kafka Streams application occurs on a potentially daily basis, so the
> > restoration time is becoming more and more of a hassle.
> >
> > Thanks.
> >
> > Taylor
> >
>
