kafka-users mailing list archives

From noah <iamn...@gmail.com>
Subject Re: Manual Offset Commits with High Level Consumer skipping messages
Date Fri, 19 Jun 2015 12:07:01 GMT
It is the value we get from calling MessageAndMetadata#offset() for the
last message processed. The MessageAndMetadata instance comes from the
ConsumerIterator.

On Fri, Jun 19, 2015 at 2:31 AM Carl Heymann <ch.heymann@gmail.com> wrote:

> How are you tracking the offsets that you manually commit? I.e. where do
> you get the metadata for the consumed messages?
>
> On Thu, Jun 18, 2015 at 11:21 PM, noah <iamnoah@gmail.com> wrote:
>
> > We are in a situation where we need at-least-once delivery. We have a
> > thread that pulls messages off the consumer and puts them in a queue, where
> > they go through a few async steps. After the final step, we want to commit
> > the offsets of the messages we have completed. Items we have not completed
> > may still be in processing, so consumerConnector.commitOffsets() isn't an
> > option for us.
> >
> > We are manually committing offsets to Kafka (0.8.2.1); auto commit is off.
> >
> > We have a simple test case that is supposed to verify that we don't lose
> > any messages if the Kafka server is shut down:
> >
> > // There are 25 messages; we send a few now and a few after the server
> > // comes back up.
> > for (TestMessageClass mess : messages.subList(0, mid)) {
> >     producer.send(mess);
> > }
> >
> > stopKafka(); // in-memory KafkaServer
> > startKafka();
> >
> > for (TestMessageClass mess : messages.subList(mid, total)) {
> >     producer.send(mess);
> > }
> >
> > int tries = 0;
> > while (testConsumer.received.size() < total && tries++ < 10) {
> >     Thread.sleep(200);
> > }
> > assertEquals(keys(testConsumer.received),
> >     keys(ImmutableSet.copyOf(messages)));
> >
> > The test consumer is very simple:
> >
> > ConsumerIterator iterator;
> > while (iterator.hasNext()) {
> >     process(iterator.next());
> > }
> >
> > // end of process:
> > commit(messageAndMetadata.offset());
> >
> > commit is basically the commit code from this page:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> >
> > but it runs the commit in a separate thread so it won't interfere with the
> > consumer.
> >
> > Here is the strange thing: if we do not commit, the test passes every time.
> > Kafka comes back up and the high level consumer picks up right where it
> > left off. But if we do commit, it does not recover, or we lose messages.
> > With 1 partition, we only get some prefix of the messages produced before
> > stopKafka(). With 2, one of the partitions never gets any of the messages
> > sent in the second half, while the other gets a prefix, but not all, of
> > the messages for that partition.
> >
> > It seems like the most likely thing is that we are committing the wrong
> > offsets, but I cannot figure out how that is happening. Does the offset in
> > MessageAndMetadata not correspond to the offset in OffsetAndMetadata?
> >
> > Or do we have to abandon the high level consumer entirely if we want to
> > manually commit in this way?
> >
>
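
The original message describes async steps that may finish out of order, with
only completed messages eligible for commit. One way to keep that safe for
at-least-once delivery is to commit only up to the lowest offset that has not
yet completed. The sketch below is an illustration, not the poster's code and
not part of Kafka: ContiguousOffsetTracker is a hypothetical helper, it assumes
one tracker per partition, and it assumes offsets within a partition are
consecutive (no gaps, e.g. from log compaction). It also follows Kafka's usual
convention that a committed offset names the next message to read, so the value
to commit is one past the last contiguously completed message.

```java
import java.util.TreeSet;

/**
 * Hypothetical helper (not a Kafka class): tracks which offsets of a single
 * partition have finished processing and reports the offset that is safe to
 * commit. Assumes offsets in the partition are consecutive.
 */
final class ContiguousOffsetTracker {
    // Offsets that completed while an earlier offset was still in flight.
    private final TreeSet<Long> completedAhead = new TreeSet<>();
    // Lowest offset that has NOT completed yet.
    private long nextExpected;

    ContiguousOffsetTracker(long firstOffset) {
        this.nextExpected = firstOffset;
    }

    /** Call after the final async step, with the value of MessageAndMetadata#offset(). */
    synchronized void complete(long offset) {
        if (offset < nextExpected) {
            return; // already accounted for (e.g. a redelivered duplicate)
        }
        completedAhead.add(offset);
        // Advance past every offset that is now contiguously complete.
        while (!completedAhead.isEmpty() && completedAhead.first() == nextExpected) {
            completedAhead.pollFirst();
            nextExpected++;
        }
    }

    /** Every offset below the returned value has completed; this is the value to commit. */
    synchronized long committableOffset() {
        return nextExpected;
    }
}
```

With a tracker like this, the commit thread would send committableOffset()
rather than the offset of whichever message happened to finish most recently.
A message that finishes early never moves the committed position past one that
is still in flight, so a restart redelivers unfinished messages instead of
skipping them.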
