kafka-users mailing list archives

From Adam Dubiel <dubiel.a...@gmail.com>
Subject Re: Manual Offset Commits with High Level Consumer skipping messages
Date Fri, 19 Jun 2015 14:34:41 GMT
Hi,

We have been solving this very problem in Hermes. You can see what we came
up with by examining the classes located here:

https://github.com/allegro/hermes/tree/master/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset

We are quite sure this gives us at-least-once guarantees. It is basically
a queue of offsets per partition, each with a ready/not-ready marker. The
offsets are sorted, and every N seconds we try to commit the highest offset
whose predecessors are all ready (per partition). Since there is a
possibility of never committing (i.e. one message gets stuck in processing
and never reports back), we have some time-based monitoring that alerts us
if there has been no commit for a partition for a long time. We never do a
force-commit, since we value our messages a lot :)
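The scheme above can be sketched roughly like this. This is a simplified
illustration with made-up names, not the actual Hermes code:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Simplified per-partition offset tracking sketch: offsets are registered
// when a message is handed to a processor, marked ready when processing
// finishes, and the committer asks for the highest offset whose
// predecessors are all ready.
class PartitionOffsetTracker {
    // offset -> processed? (TreeMap keeps offsets sorted)
    private final SortedMap<Long, Boolean> inflight = new TreeMap<>();

    synchronized void register(long offset) {
        inflight.put(offset, false);
    }

    synchronized void markReady(long offset) {
        inflight.put(offset, true);
    }

    // Highest offset in the longest all-ready prefix, or -1 if the oldest
    // in-flight offset is still being processed. Committed offsets are
    // dropped from the queue.
    synchronized long committableOffset() {
        long commit = -1;
        Iterator<Map.Entry<Long, Boolean>> it = inflight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Boolean> e = it.next();
            if (!e.getValue()) {
                break; // a predecessor is not ready yet; stop here
            }
            commit = e.getKey();
            it.remove();
        }
        return commit;
    }
}
```

If message 6 is stuck while 5 and 7 are done, only 5 gets committed; 7 has
to wait until 6 reports back. That waiting is what gives the at-least-once
property.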

This is internal code, so it's not pure (it references our configs and
metrics), but I hope you get the idea :)
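The "commit every N seconds, alert when a partition goes quiet" part could
look roughly like this; the callbacks and class names here are invented for
illustration and are not the Hermes or Kafka API:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Sketch of a periodic commit loop with a time-based staleness alert.
// The offset source, commit action and alert are passed in as plain
// callbacks so the class stays self-contained.
class PeriodicCommitter {
    private final LongSupplier committableOffset; // highest safe offset, or -1
    private final LongConsumer commitAction;      // e.g. issue the actual commit
    private final Runnable staleAlert;            // fired when commits stall
    private final long maxSilenceMillis;
    private volatile long lastCommitMillis = System.currentTimeMillis();

    PeriodicCommitter(LongSupplier committableOffset, LongConsumer commitAction,
                      Runnable staleAlert, long maxSilenceMillis) {
        this.committableOffset = committableOffset;
        this.commitAction = commitAction;
        this.staleAlert = staleAlert;
        this.maxSilenceMillis = maxSilenceMillis;
    }

    // One pass: commit if anything is ready, otherwise check for staleness.
    void tick() {
        long offset = committableOffset.getAsLong();
        if (offset >= 0) {
            commitAction.accept(offset);
            lastCommitMillis = System.currentTimeMillis();
        } else if (System.currentTimeMillis() - lastCommitMillis > maxSilenceMillis) {
            staleAlert.run(); // "no commit for this partition for a long time"
        }
    }

    // Run tick() every periodSeconds, as in "every N seconds we try to commit".
    ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::tick, periodSeconds, periodSeconds,
                TimeUnit.SECONDS);
        return scheduler;
    }
}
```

One committer per partition keeps the staleness check per-partition, which
matches the alerting described above.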


2015-06-19 16:07 GMT+02:00 Carl Heymann <ch.heymann@gmail.com>:

> From my understanding of the code (admittedly very limited), the offset in
> OffsetAndMetadata corresponds to the start of the message just obtained
> from iterator.next(). So if you commit that, a restarted consumer should
> get that message again; it actually resumes one message earlier than you
> might expect. You should also be able to commit that offset even before
> processing the message and still get at-least-once delivery. So you should
> not miss messages, only see some duplicate deliveries.
>
> You mention some issues with produced messages being stored on the
> partitions. Is this an independent problem from the consumer issue?
>
> Do you have the full test code somewhere? I'm also trying to get
> at-least-once delivery with the high level consumer.
>
> On Fri, Jun 19, 2015 at 2:07 PM, noah <iamnoah@gmail.com> wrote:
>
> > It is the value we get from calling MessageAndMetadata#offset() for the
> > last message processed. The MessageAndMetadata instance comes from the
> > ConsumerIterator.
> >
> > On Fri, Jun 19, 2015 at 2:31 AM Carl Heymann <ch.heymann@gmail.com>
> wrote:
> >
> > > How are you tracking the offsets that you manually commit? I.e. where
> > > do you get the metadata for the consumed messages?
> > >
> > > On Thu, Jun 18, 2015 at 11:21 PM, noah <iamnoah@gmail.com> wrote:
> > >
> > > > We are in a situation where we need at-least-once delivery. We have a
> > > > thread that pulls messages off the consumer, puts them in a queue where
> > > > they go through a few async steps, and then after the final step, we
> > > > want to commit the offsets of the messages we have completed. There may
> > > > be items we have not completed still being processed, so
> > > > consumerConnector.commitOffsets() isn't an option for us.
> > > >
> > > > We are manually committing offsets to Kafka (0.8.2.1) (auto commit is
> > > > off).
> > > >
> > > > We have a simple test case that is supposed to verify that we don't
> > > > lose any messages if the Kafka server is shut down:
> > > >
> > > > // there are 25 messages; we send some now and the rest after the
> > > > // server comes back up
> > > > for (TestMessageClass mess : messages.subList(0, mid)) {
> > > >     producer.send(mess);
> > > > }
> > > >
> > > > stopKafka(); // in-memory KafkaServer
> > > > startKafka();
> > > >
> > > > for (TestMessageClass mess : messages.subList(mid, total)) {
> > > >     producer.send(mess);
> > > > }
> > > >
> > > > int tries = 0;
> > > > while (testConsumer.received.size() < total && tries++ < 10) {
> > > >     Thread.sleep(200);
> > > > }
> > > > assertEquals(keys(testConsumer.received),
> > > >     keys(ImmutableSet.copyOf(messages)));
> > > >
> > > > The test consumer is very simple:
> > > >
> > > > ConsumerIterator iterator;
> > > > while (iterator.hasNext()) {
> > > >     process(iterator.next());
> > > > }
> > > >
> > > > // at the end of process:
> > > > commit(messageAndMetadata.offset());
> > > >
> > > > commit is basically the commit code from this page:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> > > > but it runs the commit in a separate thread so it won't interfere with
> > > > the consumer.
> > > >
> > > > Here is the strange thing: if we do not commit, the test passes every
> > > > time. Kafka comes back up and the high-level consumer picks up right
> > > > where it left off. But if we do commit, it does not recover, or we
> > > > lose messages. With 1 partition, we only get some prefix of the
> > > > messages produced before stopKafka(). With 2, one of the partitions
> > > > never gets any of the messages sent in the second half, while the
> > > > other gets a prefix, but not all of the messages for that partition.
> > > >
> > > > It seems most likely that we are committing the wrong offsets, but I
> > > > cannot figure out how that is happening. Does the offset in
> > > > MessageAndMetadata not correspond to the offset in OffsetAndMetadata?
> > > >
> > > > Or do we have to abandon the high-level consumer entirely if we want
> > > > to manually commit in this way?
> > > >
> > >
> >
>
