kafka-users mailing list archives

From Carl Heymann <ch.heym...@gmail.com>
Subject Re: Manual Offset Commits with High Level Consumer skipping messages
Date Fri, 19 Jun 2015 14:07:48 GMT
From my understanding of the code (admittedly very limited), the offset in
OffsetAndMetadata corresponds to the start of the message just obtained
from iterator.next(). So if you commit that offset, a restarted consumer
should get that same message again; that is, it resumes one message earlier
than you might expect. You should also be able to commit that offset even
before processing the message and still get at-least-once delivery. So you
should not miss messages, just get some duplicate deliveries.
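
To make that concrete, here is a toy in-memory model in plain Java. It is
not the Kafka client API; the class and method names are made up for
illustration, and it simply assumes what I described above: a restarted
consumer resumes reading at the committed offset itself.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of one partition; NOT the Kafka client API.
 * Assumption: after a restart, the consumer resumes at the committed offset.
 */
final class CommitSemanticsSketch {

    /**
     * Returns the offsets a restarted consumer would read from a log holding
     * offsets 0..logSize-1, given the offset that was committed.
     */
    static List<Long> readAfterRestart(long logSize, long committedOffset) {
        List<Long> seen = new ArrayList<>();
        // Resume at the committed offset (clamped to the start of the log).
        for (long offset = Math.max(0, committedOffset); offset < logSize; offset++) {
            seen.add(offset);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Hypothetical: offset of the last message we finished processing.
        long lastProcessed = 2;

        // Committing the message's own offset: message 2 is delivered again.
        System.out.println(readAfterRestart(5, lastProcessed));     // [2, 3, 4]

        // Committing offset + 1: the consumer resumes after message 2.
        System.out.println(readAfterRestart(5, lastProcessed + 1)); // [3, 4]
    }
}
```

Under this assumption, committing the message's own offset can only cause
a duplicate, never a gap.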

You mention some issues with produced messages being stored on the
partitions. Is this an independent problem from the consumer issue?

Do you have the full test code somewhere? I'm also trying to get
at-least-once delivery with the high level consumer.
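
For the async pipeline described in the quoted message below, where
messages can finish out of order, one possible approach is to commit only
up to the lowest offset that is not yet complete. A minimal sketch follows;
ContiguousOffsetTracker is a hypothetical helper for a single partition,
not something Kafka provides, and it assumes the committed value is the
position to resume from.

```java
import java.util.TreeSet;

/**
 * Hypothetical helper (not part of Kafka): tracks out-of-order completions
 * for one partition and reports the offset that is safe to commit.
 */
final class ContiguousOffsetTracker {
    // Offsets that finished processing but sit above a gap.
    private final TreeSet<Long> completed = new TreeSet<>();
    // Lowest offset not yet known to be complete; everything below it is done.
    private long nextToCommit;

    ContiguousOffsetTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /**
     * Records that processing of the given offset finished and returns the
     * resume position that is currently safe to commit.
     */
    synchronized long markCompleted(long offset) {
        if (offset >= nextToCommit) {
            completed.add(offset);
        }
        // Advance past every contiguous completed offset.
        while (completed.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit;
    }

    synchronized long safeCommitOffset() {
        return nextToCommit;
    }

    public static void main(String[] args) {
        ContiguousOffsetTracker tracker = new ContiguousOffsetTracker(0);
        System.out.println(tracker.markCompleted(1)); // 0: offset 0 still pending
        System.out.println(tracker.markCompleted(2)); // 0: offset 0 still pending
        System.out.println(tracker.markCompleted(0)); // 3: offsets 0..2 are done
    }
}
```

After a crash, anything at or above the returned offset would be
redelivered, which gives duplicates but no skipped messages.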

On Fri, Jun 19, 2015 at 2:07 PM, noah <iamnoah@gmail.com> wrote:

> It is the value we get from calling MessageAndMetadata#offset() for the
> last message processed. The MessageAndMetadata instance comes from the
> ConsumerIterator.
>
> On Fri, Jun 19, 2015 at 2:31 AM Carl Heymann <ch.heymann@gmail.com> wrote:
>
> > How are you tracking the offsets that you manually commit? I.e. where do
> > you get the metadata for the consumed messages?
> >
> > On Thu, Jun 18, 2015 at 11:21 PM, noah <iamnoah@gmail.com> wrote:
> >
> > > We are in a situation where we need at-least-once delivery. We have a
> > > thread that pulls messages off the consumer, puts them in a queue where
> > > they go through a few async steps, and then after the final step, we
> > > want to commit the offset of the messages we have completed. There may
> > > be items we have not completed still being processed, so
> > > consumerConnector.commitOffsets() isn't an option for us.
> > >
> > > We are manually committing offsets to Kafka (0.8.2.1) (auto commit is
> > > off.)
> > >
> > > We have a simple test case that is supposed to verify that we don't
> > > lose any messages if the Kafka server is shut down:
> > >
> > > // there are 25 messages, we send a few now and a few after the server
> > > // comes back up
> > > for (TestMessageClass mess : messages.subList(0, mid)) {
> > >     producer.send(mess);
> > > }
> > >
> > > stopKafka(); // in memory KafkaServer
> > > startKafka();
> > >
> > > for (TestMessageClass mess : messages.subList(mid, total)) {
> > >     producer.send(mess);
> > > }
> > >
> > > int tries = 0;
> > > while (testConsumer.received.size() < total && tries++ < 10) {
> > >     Thread.sleep(200);
> > > }
> > > assertEquals(keys(testConsumer.received),
> > >         keys(ImmutableSet.copyOf(messages)));
> > >
> > > The test consumer is very simple:
> > >
> > > ConsumerIterator iterator;
> > > while (iterator.hasNext()) {
> > >     process(iterator.next());
> > > }
> > >
> > > // end of process:
> > > commit(messageAndMetadata.offset());
> > >
> > > commit is basically the commit code from this page:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> > >
> > > but it runs the commit in a separate thread so it won't interfere with
> > > the consumer.
> > >
> > > Here is the strange thing: If we do not commit, the test passes every
> > > time. Kafka comes back up and the high level consumer picks up right
> > > where it left off. But if we do commit, it does not recover, or we lose
> > > messages. With 1 partition, we only get some prefix of the messages
> > > produced before stopKafka(). With 2, one of the partitions never gets
> > > any of the messages sent in the second half, while the other gets a
> > > prefix, but not all of the messages for that partition.
> > >
> > > It seems like the most likely thing is that we are committing the wrong
> > > offsets, but I cannot figure out how that is happening. Does the offset
> > > in MessageAndMetadata not correspond to the offset in OffsetAndMetadata?
> > >
> > > Or do we have to abandon the high level consumer entirely if we want to
> > > manually commit in this way?
> > >
> >
>
