kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mickael Maison <mickael.mai...@gmail.com>
Subject Re: Committing an invalid offset with KafkaConsumer.commitSync
Date Sun, 03 Sep 2017 17:47:24 GMT
I believe the Javadoc is slightly incorrect/misleading.
When it says "offset metadata is too large", it is about the metadata
you can commit along with the offset, not the offset. See
OffsetAndMetadata:
http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Regarding the offset value, we only check if it's negative and that's
only performed client side (presumably 3rd party clients could commit
a negative offset). Apart from that, no checks are made if the offset
is "in range" or not.
We had a look a while back to check if the offset is "in range" when
committing but it's complicated, see the comments on
https://issues.apache.org/jira/browse/KAFKA-4081

I opened a PR to update the Javadoc: https://github.com/apache/kafka/pull/3780

HTH

On Sun, Sep 3, 2017 at 4:57 PM, Stig Døssing <generalbas.srd@gmail.com> wrote:
> The broker and consumer are version 0.11.0.0.
>
> 2017-09-03 17:38 GMT+02:00 Jeff Widman <jeff@jeffwidman.com>:
>
>> What broker version are you testing with?
>>
>> On Sep 3, 2017 4:14 AM, "Stig Døssing" <generalbas.srd@gmail.com> wrote:
>>
>> > Hi,
>> >
>> > The documentation for KafkaConsumer.commitSync(Map) states that a
>> > KafkaException will be thrown if the committed offset is invalid. I can't
>> > seem to provoke this behavior, so I'd like clarification on whether this
>> is
>> > something the consumer is intended to do.
>> >
>> > Here's the snippet I'd expect would error out (running in a loop):
>> > public long getEarliestOffset() {
>> >             LOG.info("Current offset " + consumer.committed(new
>> > TopicPartition(TOPIC, 0)));
>> >             ConsumerRecords<String, String> records =
>> consumer.poll(2000);
>> >             consumer.seekToBeginning(consumer.assignment());
>> >             consumer.commitSync(Collections.singletonMap(new
>> > TopicPartition(TOPIC, 0), new OffsetAndMetadata(4_000_000L)));
>> >             return consumer.position(new TopicPartition(TOPIC, 0));
>> >         }
>> >
>> > The committed offset appears to be updated to 4.000.000 even though the
>> > highest offset in that partition is ~6000. It also seems to work the
>> other
>> > way round, if I set the log retention such that offsets before 2000 are
>> > deleted, I can still commit offset 0.
>> >
>> > I'm trying to clarify what the consumer is expected to do, because I'm
>> > trying to figure out whether a consumer loop that is committing offsets
>> > with commitSync(Map) for a partition where log deletion is enabled needs
>> to
>> > put a try-catch guard around the commit call in case the committed offset
>> > has been deleted.
>> >
>> > Thanks,
>> > Stig Rohde Døssing
>> >
>>

Mime
View raw message