kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Niklas Lönn <niklas.l...@gmail.com>
Subject Re: Why is segment.ms=10m for repartition topics in KafkaStreams?
Date Wed, 10 Oct 2018 14:38:23 GMT
Thanks Guozhang,

Thanks for a very good answer!
I now understand, so the idea is that the client cleans up after itself and
that way there is a minimal amount of garbage in the repartition topic.

We actually figured out we had another max open files limit we did hit
indeed, and adjusting that limit we now successfully managed to start our
application without crashing the brokers.

However, I think I discovered a bug in the repartitioning setup, let me
first try to explain our setup:
We have a compacted topic, containing mostly short lived values, where
tombstones normally are created within some hours, but could be delayed as
much as a month.
I suspect the repartition segments honor the timestamps of the records, and
when resetting the application we process records that are quite old,
therefore creating many many segments and a lot of open files as a result.

When running my application I noticed these messages:
Fetch offset 213792 is out of range for partition
app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7, resetting
offset
Fetch offset 110227 is out of range for partition
app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2, resetting
offset
Resetting offset for partition
app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7 to offset
233302.
Resetting offset for partition
app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2 to offset
119914.

This effectively made my application skip messages and I verified by
patching RepartitionTopicConfig.java that it is due to the undefined
retention.ms, leaving a default retention on the records meaning that my
application was competing with the log cleaner.

By adding this line i got rid of these messages:
tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); //
Infinite

My understanding is that this should be safe as the cleanup is handled by
the client invoking the admin api?

Kind regards
Niklas


On Tue, Oct 9, 2018 at 8:47 PM Guozhang Wang <wangguoz@gmail.com> wrote:

> Hi Niklas,
>
> Default value of segment.ms is set to 10min as part of this project
> (introduced in Kafka 1.1.0):
>
> https://jira.apache.org/jira/browse/KAFKA-6150
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API
>
> In KIP-204 (KAFKA-6150), we added admin request to periodically delete
> records immediately upon committing offsets, to make repartition topics
> really "transient", and along with it we set the default segment.ms to
> 10min. The rationale is that to make record purging effective, we need to
> have smaller segment size so that we can delete those files after the
> purged offset is larger that the segment's last offset in time.
>
>
> Which Kafka version are you using currently? Did you observe that data
> purging did not happen (otherwise segment files should be garbage collected
> quickly), or is your traffic very small or commit infrequently which
> resulted in ineffective purging?
>
>
> Guozhang
>
>
>
> On Tue, Oct 9, 2018 at 4:07 AM, Niklas Lönn <niklas.lonn@gmail.com> wrote:
>
> > Hi,
> >
> > Recently we experienced a problem when resetting a streams application,
> > doing quite a lot of operations based on 2 compacted source topics, with
> 20
> > partitions.
> >
> > We crashed entire broker cluster with TooManyOpenFiles exception (We
> have a
> > multi million limit already)
> >
> > When inspecting the internal topics configuration I noticed that the
> > repartition topics have a default config of:
> > *Configs:segment.bytes=52428800,segment.index.bytes=
> > 52428800,cleanup.policy=delete,segment.ms
> > <http://segment.ms>=600000*
> >
> > My source topic is a compacted topic used as a KTable, and lets assume I
> > have data for every segment of 10min, I would quickly get 1.440 segments
> > per partition per day.
> >
> > Since this repartition topic is not even compacted, I cant understand the
> > reasoning behind having a default of 10min segment.ms and 50mb
> > segment.bytes?
> >
> > Is there any best process regarding this? Potentially we could crash the
> > cluster every-time we need to reset an application.
> >
> > And does it make sense that it would keep so many open files at the same
> > time in the first place? Could it be a bug in file management of the
> Kafka
> > broker?
> >
> > Kind regards
> > Niklas
> >
>
>
>
> --
> -- Guozhang
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message