kafka-users mailing list archives

From Niklas Lönn <niklas.l...@gmail.com>
Subject Re: Why is segment.ms=10m for repartition topics in KafkaStreams?
Date Thu, 11 Oct 2018 14:12:50 GMT
Hi again,

On another note, does it really make sense to limit segments by time this
way? Wouldn't the 50 MB size limit alone be sufficient? In use cases like
ours, restarting/rebuilding the state opened almost 30K extra files on each
broker (it seems the repartitioning runs much faster than the remaining
transformations, so the repartition topic becomes quite a big "buffer").
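
To put rough numbers on this, here is a minimal back-of-the-envelope sketch. It assumes segments are rolled purely by time (one per segment.ms of record timestamps), that the repartition topic has 20 partitions like the source topics, and that each segment keeps 3 files on disk (log, offset index, time index). The class and method names are made up for illustration, and the result is a cluster-wide count before replication, not a per-broker figure.

```java
import java.time.Duration;

class SegmentEstimate {
    // Number of time-rolled segments needed to cover `span` of record
    // timestamps when a new segment is rolled every `segmentMs` milliseconds.
    public static long segmentsFor(Duration span, long segmentMs) {
        // Ceiling division: a partially filled segment still exists on disk.
        return (span.toMillis() + segmentMs - 1) / segmentMs;
    }

    // Rough file count, assuming every segment keeps `filesPerSegment` files.
    public static long filesFor(Duration span, long segmentMs,
                                int partitions, int filesPerSegment) {
        return segmentsFor(span, segmentMs) * partitions * filesPerSegment;
    }

    public static void main(String[] args) {
        long segmentMs = 600_000L; // segment.ms=600000, i.e. 10 minutes

        // One day of data per partition.
        System.out.println(segmentsFor(Duration.ofDays(1), segmentMs)); // 144

        // Replaying 30 days of old records into 20 partitions (assumed values).
        System.out.println(filesFor(Duration.ofDays(30), segmentMs, 20, 3)); // 259200
    }
}
```

With segment.bytes alone, the segment count would instead depend only on the volume of data buffered in the topic, not on how far back the record timestamps reach.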

It looked like it was not possible to configure this (so I had to patch
the file directly). Maybe it would be desirable to make it configurable for
use cases like this one, even if it is not considered a main use case?
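
As a sketch of what "configurable" could mean here: the defaults reported in this thread, with user-supplied overrides taking precedence. This is not the actual RepartitionTopicConfig code; the class, the helper names, and the 7-day segment.ms value are hypothetical. Only the topic config keys and the default values come from the thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RepartitionDefaults {
    // Defaults as observed on the repartition topics in this thread.
    public static Map<String, String> defaults() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("cleanup.policy", "delete");
        m.put("segment.bytes", "52428800");       // 50 MB
        m.put("segment.index.bytes", "52428800");
        m.put("segment.ms", "600000");            // 10 minutes
        return m;
    }

    // Hypothetical helper: user overrides win over the built-in defaults.
    public static Map<String, String> effectiveConfig(Map<String, String> userOverrides) {
        Map<String, String> merged = defaults();
        merged.putAll(userOverrides);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> overrides = new LinkedHashMap<>();
        overrides.put("segment.ms", "604800000"); // 7 days, illustrative value only
        overrides.put("retention.ms", "-1");      // infinite, as in the patch quoted below
        System.out.println(effectiveConfig(overrides));
    }
}
```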

Once my app caught up (which took about 8 hours), the number of open files
decreased again, and it looks like the cleanup is doing its job.

Kind regards
Niklas

On Wed, Oct 10, 2018 at 4:38 PM Niklas Lönn <niklas.lonn@gmail.com> wrote:

> Thanks Guozhang,
>
> Thanks for a very good answer!
> I now understand, so the idea is that the client cleans up after itself
> and that way there is a minimal amount of garbage in the repartition topic.
>
> We actually figured out that we were indeed hitting another max-open-files
> limit; after raising that limit we managed to start our application
> without crashing the brokers.
>
> However, I think I discovered a bug in the repartitioning setup, let me
> first try to explain our setup:
> We have a compacted topic containing mostly short-lived values, where
> tombstones are normally created within a few hours but could be delayed by
> as much as a month.
> I suspect the repartition segments honor the timestamps of the records,
> and when resetting the application we process records that are quite old,
> therefore creating many many segments and a lot of open files as a result.
>
> When running my application I noticed these messages:
> Fetch offset 213792 is out of range for partition app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7, resetting offset
> Fetch offset 110227 is out of range for partition app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2, resetting offset
> Resetting offset for partition app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7 to offset 233302.
> Resetting offset for partition app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2 to offset 119914.
>
> This effectively made my application skip messages. By patching
> RepartitionTopicConfig.java I verified that the cause is retention.ms
> being left undefined: the default retention then applies to the records,
> which means my application was competing with the log cleaner.
>
> By adding this line I got rid of these messages:
> tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // Infinite
>
> My understanding is that this should be safe, as the cleanup is handled by
> the client invoking the admin API?
>
> Kind regards
> Niklas
>
>
> On Tue, Oct 9, 2018 at 8:47 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>
>> Hi Niklas,
>>
>> Default value of segment.ms is set to 10min as part of this project
>> (introduced in Kafka 1.1.0):
>>
>> https://jira.apache.org/jira/browse/KAFKA-6150
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API
>>
>> In KIP-204 (KAFKA-6150), we added an admin request to periodically delete
>> records immediately upon committing offsets, to make repartition topics
>> really "transient", and along with it we set the default segment.ms to
>> 10min. The rationale is that, to make record purging effective, we need
>> smaller segments so that we can delete those files in time, once the
>> purged offset is larger than the segment's last offset.
>>
>>
>> Which Kafka version are you using currently? Did you observe that data
>> purging did not happen (otherwise segment files should be garbage
>> collected quickly), or is your traffic very small, or do you commit
>> infrequently, which would result in ineffective purging?
>>
>>
>> Guozhang
>>
>>
>>
>> On Tue, Oct 9, 2018 at 4:07 AM, Niklas Lönn <niklas.lonn@gmail.com>
>> wrote:
>>
>> > Hi,
>> >
>> > Recently we experienced a problem when resetting a streams application
>> > doing quite a lot of operations based on 2 compacted source topics with
>> > 20 partitions.
>> >
>> > We crashed the entire broker cluster with a TooManyOpenFiles exception
>> > (we already have a multi-million limit).
>> >
>> > When inspecting the internal topics' configuration I noticed that the
>> > repartition topics have a default config of:
>> > Configs:segment.bytes=52428800,segment.index.bytes=52428800,cleanup.policy=delete,segment.ms=600000
>> >
>> > My source topic is a compacted topic used as a KTable, and let's assume I
>> > have data for every segment of 10min; I would quickly get 144 segments
>> > per partition per day (1,440 minutes / 10).
>> >
>> > Since this repartition topic is not even compacted, I can't understand
>> > the reasoning behind having a default of 10min segment.ms and 50mb
>> > segment.bytes?
>> >
>> > Is there any best practice regarding this? Potentially we could crash the
>> > cluster every time we need to reset an application.
>> >
>> > And does it make sense that it would keep so many open files at the same
>> > time in the first place? Could it be a bug in file management of the
>> > Kafka broker?
>> >
>> > Kind regards
>> > Niklas
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
