kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stig Døssing <generalbas....@gmail.com>
Subject Re: Committing an invalid offset with KafkaConsumer.commitSync
Date Sun, 03 Sep 2017 18:01:06 GMT
Thanks for the answer, I won't need try-catch around commitSync then. Also
thanks for updating the docs.

2017-09-03 19:47 GMT+02:00 Mickael Maison <mickael.maison@gmail.com>:

> I believe the Javadoc is slightly incorrect/misleading.
> When it says "offset metadata is too large", it is about the metadata
> you can commit along with the offset, not the offset. See
> OffsetAndMetadata:
> http://kafka.apache.org/0110/javadoc/index.html?org/apache/
> kafka/clients/consumer/KafkaConsumer.html
>
> Regarding the offset value, we only check if it's negative and that's
> only performed client side (presumably 3rd party clients could commit
> a negative offset). Apart from that, no checks are made if the offset
> is "in range" or not.
> We had a look a while back to check if the offset is "in range" when
> committing but it's complicated, see the comments on
> https://issues.apache.org/jira/browse/KAFKA-4081
>
> I opened a PR to update the Javadoc: https://github.com/apache/
> kafka/pull/3780
>
> HTH
>
> On Sun, Sep 3, 2017 at 4:57 PM, Stig Døssing <generalbas.srd@gmail.com>
> wrote:
> > The broker and consumer are version 0.11.0.0.
> >
> > 2017-09-03 17:38 GMT+02:00 Jeff Widman <jeff@jeffwidman.com>:
> >
> >> What broker version are you testing with?
> >>
> >> On Sep 3, 2017 4:14 AM, "Stig Døssing" <generalbas.srd@gmail.com>
> wrote:
> >>
> >> > Hi,
> >> >
> >> > The documentation for KafkaConsumer.commitSync(Map) states that a
> >> > KafkaException will be thrown if the committed offset is invalid. I
> can't
> >> > seem to provoke this behavior, so I'd like clarification on whether
> this
> >> is
> >> > something the consumer is intended to do.
> >> >
> >> > Here's the snippet I'd expect would error out (running in a loop):
> >> > public long getEarliestOffset() {
> >> >             LOG.info("Current offset " + consumer.committed(new
> >> > TopicPartition(TOPIC, 0)));
> >> >             ConsumerRecords<String, String> records =
> >> consumer.poll(2000);
> >> >             consumer.seekToBeginning(consumer.assignment());
> >> >             consumer.commitSync(Collections.singletonMap(new
> >> > TopicPartition(TOPIC, 0), new OffsetAndMetadata(4_000_000L)));
> >> >             return consumer.position(new TopicPartition(TOPIC, 0));
> >> >         }
> >> >
> >> > The committed offset appears to be updated to 4.000.000 even though
> the
> >> > highest offset in that partition is ~6000. It also seems to work the
> >> other
> >> > way round, if I set the log retention such that offsets before 2000
> are
> >> > deleted, I can still commit offset 0.
> >> >
> >> > I'm trying to clarify what the consumer is expected to do, because I'm
> >> > trying to figure out whether a consumer loop that is committing
> offsets
> >> > with commitSync(Map) for a partition where log deletion is enabled
> needs
> >> to
> >> > put a try-catch guard around the commit call in case the committed
> offset
> >> > has been deleted.
> >> >
> >> > Thanks,
> >> > Stig Rohde Døssing
> >> >
> >>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message