kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Niklas Lönn <niklas.l...@gmail.com>
Subject Re: Why is segment.ms=10m for repartition topics in KafkaStreams?
Date Mon, 15 Oct 2018 10:01:12 GMT
I created the following bug report based on this conversation:
https://issues.apache.org/jira/browse/KAFKA-7506

//Niklas

On Thu, Oct 11, 2018 at 4:12 PM Niklas Lönn <niklas.lonn@gmail.com> wrote:

> Hi again,
>
> On another note, does it really make much sense to limit by time this way?
> wouldn't it be sufficient with just the 50mb? In use-cases like ours,
> restarting/rebuilding the state opened almost 30K extra files on each
> broker (It seems it is doing the repartitioning much faster than the
> remaining transformations, so it becomes a quite big "buffer").
>
> It looked like it was not possible to configure this (thereby i had to
> patch the file directly), maybe it would be desirable to make it
> configurable for use-cases like this one if, not considered a main use-case?
>
> Once my app caught up (took like 8 hours), the amount of open files
> decreased again and it looks like the cleanup is doing its job.
>
> Kind regards
> Niklas
>
> On Wed, Oct 10, 2018 at 4:38 PM Niklas Lönn <niklas.lonn@gmail.com> wrote:
>
>> Thanks Guozhang,
>>
>> Thanks for a very good answer!
>> I now understand, so the idea is that the client cleans up after itself
>> and that way there is a minimal amount of garbage in the repartition topic.
>>
>> We actually figured out we had another max open files limit we did hit
>> indeed, and adjusting that limit we now successfully managed to start our
>> application without crashing the brokers.
>>
>> However, I think I discovered a bug in the repartitioning setup, let me
>> first try to explain our setup:
>> We have a compacted topic, containing mostly short lived values, where
>> tombstones normally are created within some hours, but could be delayed as
>> much as a month.
>> I suspect the repartition segments honor the timestamps of the records,
>> and when resetting the application we process records that are quite old,
>> therefore creating many many segments and a lot of open files as a result.
>>
>> When running my application I noticed these messages:
>> Fetch offset 213792 is out of range for partition
>> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7, resetting
>> offset
>> Fetch offset 110227 is out of range for partition
>> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2, resetting
>> offset
>> Resetting offset for partition
>> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7 to offset
>> 233302.
>> Resetting offset for partition
>> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2 to offset
>> 119914.
>>
>> This effectively made my application skip messages and I verified by
>> patching RepartitionTopicConfig.java that it is due to the undefined
>> retention.ms, leaving a default retention on the records meaning that my
>> application was competing with the log cleaner.
>>
>> By adding this line i got rid of these messages:
>> tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); //
>> Infinite
>>
>> My understanding is that this should be safe as the cleanup is handled by
>> the client invoking the admin api?
>>
>> Kind regards
>> Niklas
>>
>>
>> On Tue, Oct 9, 2018 at 8:47 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>>
>>> Hi Niklas,
>>>
>>> Default value of segment.ms is set to 10min as part of this project
>>> (introduced in Kafka 1.1.0):
>>>
>>> https://jira.apache.org/jira/browse/KAFKA-6150
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API
>>>
>>> In KIP-204 (KAFKA-6150), we added admin request to periodically delete
>>> records immediately upon committing offsets, to make repartition topics
>>> really "transient", and along with it we set the default segment.ms to
>>> 10min. The rationale is that to make record purging effective, we need to
>>> have smaller segment size so that we can delete those files after the
>>> purged offset is larger that the segment's last offset in time.
>>>
>>>
>>> Which Kafka version are you using currently? Did you observe that data
>>> purging did not happen (otherwise segment files should be garbage
>>> collected
>>> quickly), or is your traffic very small or commit infrequently which
>>> resulted in ineffective purging?
>>>
>>>
>>> Guozhang
>>>
>>>
>>>
>>> On Tue, Oct 9, 2018 at 4:07 AM, Niklas Lönn <niklas.lonn@gmail.com>
>>> wrote:
>>>
>>> > Hi,
>>> >
>>> > Recently we experienced a problem when resetting a streams application,
>>> > doing quite a lot of operations based on 2 compacted source topics,
>>> with 20
>>> > partitions.
>>> >
>>> > We crashed entire broker cluster with TooManyOpenFiles exception (We
>>> have a
>>> > multi million limit already)
>>> >
>>> > When inspecting the internal topics configuration I noticed that the
>>> > repartition topics have a default config of:
>>> > *Configs:segment.bytes=52428800,segment.index.bytes=
>>> > 52428800,cleanup.policy=delete,segment.ms
>>> > <http://segment.ms>=600000*
>>> >
>>> > My source topic is a compacted topic used as a KTable, and lets assume
>>> I
>>> > have data for every segment of 10min, I would quickly get 1.440
>>> segments
>>> > per partition per day.
>>> >
>>> > Since this repartition topic is not even compacted, I cant understand
>>> the
>>> > reasoning behind having a default of 10min segment.ms and 50mb
>>> > segment.bytes?
>>> >
>>> > Is there any best process regarding this? Potentially we could crash
>>> the
>>> > cluster every-time we need to reset an application.
>>> >
>>> > And does it make sense that it would keep so many open files at the
>>> same
>>> > time in the first place? Could it be a bug in file management of the
>>> Kafka
>>> > broker?
>>> >
>>> > Kind regards
>>> > Niklas
>>> >
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message