kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Why is segment.ms=10m for repartition topics in KafkaStreams?
Date Tue, 16 Oct 2018 15:31:15 GMT
Hi Niklas,

By default, the retention.ms config is set to 7 days, and currently Streams
does not try to override this value. What you observed is probably because
your application was reset and hence resumed from some very old offsets,
more than 7 days old. As a result, the log cleaner kicked in and deleted
those log segments, causing the "out of range" error.

Your approach of setting retention.ms to infinite sounds good to me. In
fact, in 2.0.0 the Streams library has already overridden the default value
for repartition topics from 7 days to infinity. If you can upgrade to that
version, the issue should go away.
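For versions before 2.0.0, a similar override can be applied from the
application side by passing a topic-level default for internal topics. A
minimal sketch, assuming a Kafka Streams app (the application id and
bootstrap servers below are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class InternalTopicRetention {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // "topic."-prefixed configs are used as defaults when Streams creates
        // its internal topics; topicPrefix("retention.ms") -> "topic.retention.ms"
        props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), "-1");
        return props;
    }
}
```

Note that these "topic." defaults apply to all internal topics the
application creates, not only the repartition topics.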

Guozhang


On Mon, Oct 15, 2018 at 3:01 AM Niklas Lönn <niklas.lonn@gmail.com> wrote:

> I created the following bug report based on this conversation:
> https://issues.apache.org/jira/browse/KAFKA-7506
>
> //Niklas
>
> On Thu, Oct 11, 2018 at 4:12 PM Niklas Lönn <niklas.lonn@gmail.com> wrote:
>
> > Hi again,
> >
> > On another note, does it really make much sense to limit by time this
> > way? Wouldn't the 50 MB size limit alone be sufficient? In use cases
> > like ours, restarting/rebuilding the state opened almost 30K extra files
> > on each broker (it seems the repartitioning runs much faster than the
> > remaining transformations, so the topic becomes quite a big "buffer").
> >
> > It did not seem possible to configure this (so I had to patch the file
> > directly); maybe it would be desirable to make it configurable for use
> > cases like this one, if it is not considered a main use case?
> >
> > Once my app caught up (it took about 8 hours), the number of open files
> > decreased again, and it looks like the cleanup is doing its job.
> >
> > Kind regards
> > Niklas
> >
> > On Wed, Oct 10, 2018 at 4:38 PM Niklas Lönn <niklas.lonn@gmail.com>
> wrote:
> >
> >> Thanks Guozhang, for a very good answer!
> >>
> >> I now understand: the idea is that the client cleans up after itself,
> >> so there is a minimal amount of garbage in the repartition topic.
> >>
> >> We actually figured out that we were hitting another max-open-files
> >> limit; after adjusting it, we managed to start our application without
> >> crashing the brokers.
> >>
> >> However, I think I discovered a bug in the repartitioning setup. Let me
> >> first explain our setup: we have a compacted topic containing mostly
> >> short-lived values, where tombstones are normally created within a few
> >> hours but can be delayed by as much as a month.
> >> I suspect the repartition segments honor the timestamps of the records,
> >> and when resetting the application we process records that are quite
> >> old, therefore creating many, many segments and a lot of open files as
> >> a result.
> >>
> >> When running my application I noticed these messages:
> >> Fetch offset 213792 is out of range for partition
> >> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7, resetting
> >> offset
> >> Fetch offset 110227 is out of range for partition
> >> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2, resetting
> >> offset
> >> Resetting offset for partition
> >> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7 to offset
> >> 233302.
> >> Resetting offset for partition
> >> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2 to offset
> >> 119914.
> >>
> >> This effectively made my application skip messages. By patching
> >> RepartitionTopicConfig.java I verified that it is due to the undefined
> >> retention.ms, which leaves the default retention on the records,
> >> meaning that my application was competing with the log cleaner.
> >>
> >> By adding this line I got rid of these messages:
> >> tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); //
> >> Infinite
> >>
> >> My understanding is that this should be safe, as the cleanup is
> >> handled by the client invoking the admin API?
> >>
> >> Kind regards
> >> Niklas
> >>
> >>
> >> On Tue, Oct 9, 2018 at 8:47 PM Guozhang Wang <wangguoz@gmail.com>
> wrote:
> >>
> >>> Hi Niklas,
> >>>
> >>> The default value of segment.ms is set to 10 min as part of this
> >>> project (introduced in Kafka 1.1.0):
> >>>
> >>> https://jira.apache.org/jira/browse/KAFKA-6150
> >>>
> >>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API
> >>>
> >>> In KIP-204 (KAFKA-6150), we added an admin request that deletes
> >>> records upon committing offsets, to make repartition topics really
> >>> "transient", and along with it we set the default segment.ms to
> >>> 10 min. The rationale is that to make record purging effective, we
> >>> need a smaller segment size, so that we can delete segment files once
> >>> the purged offset is larger than the segment's last offset.
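A sketch of what that delete-records request looks like through the public
admin client (this illustrates the KIP-204 API only, not the internal code
path Streams uses; the bootstrap server is a placeholder, and the topic,
partition, and offset are borrowed from this thread):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class PurgeRepartitionTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            TopicPartition tp = new TopicPartition(
                "app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition", 7);
            // Delete everything up to (but not including) the committed offset.
            Map<TopicPartition, RecordsToDelete> request =
                Collections.singletonMap(tp, RecordsToDelete.beforeOffset(233302L));
            DeleteRecordsResult result = admin.deleteRecords(request);
            result.all().get(); // wait for brokers to advance the log start offset
        }
    }
}
```

Once the log start offset of a segment-sized range has been advanced this
way, the broker can delete the corresponding segment files, which is why
small segments make the purging effective.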
> >>>
> >>>
> >>> Which Kafka version are you using currently? Did you observe that
> >>> data purging did not happen (otherwise segment files should be
> >>> garbage collected quickly)? Or is your traffic very small, or do you
> >>> commit infrequently, resulting in ineffective purging?
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>>
> >>> On Tue, Oct 9, 2018 at 4:07 AM, Niklas Lönn <niklas.lonn@gmail.com>
> >>> wrote:
> >>>
> >>> > Hi,
> >>> >
> >>> > Recently we experienced a problem when resetting a streams
> >>> > application that does quite a lot of operations based on 2 compacted
> >>> > source topics with 20 partitions.
> >>> >
> >>> > We crashed the entire broker cluster with a TooManyOpenFiles
> >>> > exception (we already have a multi-million limit).
> >>> >
> >>> > When inspecting the internal topics' configuration I noticed that
> >>> > the repartition topics have a default config of:
> >>> > segment.bytes=52428800, segment.index.bytes=52428800,
> >>> > cleanup.policy=delete, segment.ms=600000
> >>> >
> >>> > My source topic is a compacted topic used as a KTable. Let's assume
> >>> > I have data for every 10-minute segment; I would then quickly get
> >>> > 144 segments per partition per day.
> >>> >
> >>> > Since this repartition topic is not even compacted, I can't
> >>> > understand the reasoning behind a default of 10 min segment.ms and
> >>> > 50 MB segment.bytes.
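For what it's worth, the segment arithmetic above can be checked quickly.
The 20 partitions come from this thread; three open files per segment
(log, offset index, time index) is the usual broker layout:

```java
// Back-of-the-envelope: open file handles created by time-based rolling
// on an uncompacted repartition topic with the default segment.ms.
public class SegmentEstimate {
    public static void main(String[] args) {
        long segmentMs = 600_000L;                   // 10 minutes (repartition topic default)
        long msPerDay = 24L * 60 * 60 * 1000;        // 86,400,000
        long segmentsPerPartitionPerDay = msPerDay / segmentMs;
        int partitions = 20;
        int filesPerSegment = 3;                     // .log, .index, .timeindex
        long filesPerDay = segmentsPerPartitionPerDay * partitions * filesPerSegment;
        System.out.println(segmentsPerPartitionPerDay + " segments/partition/day, "
                + filesPerDay + " files/day across " + partitions + " partitions");
    }
}
```

At that rate a few days of backlog on a broker hosting many such
partitions easily reaches tens of thousands of open files.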
> >>> >
> >>> > Is there any best practice regarding this? Potentially we could
> >>> > crash the cluster every time we need to reset an application.
> >>> >
> >>> > And does it make sense that it would keep so many open files at the
> >>> > same time in the first place? Could it be a bug in the file
> >>> > management of the Kafka broker?
> >>> >
> >>> > Kind regards
> >>> > Niklas
> >>> >
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
>


-- 
-- Guozhang
