kafka-users mailing list archives

From Carl Heymann <ch.heym...@gmail.com>
Subject Re: Manual Offset Commits with High Level Consumer skipping messages
Date Fri, 19 Jun 2015 07:31:25 GMT
How are you tracking the offsets that you manually commit? I.e. where do
you get the metadata for the consumed messages?

On Thu, Jun 18, 2015 at 11:21 PM, noah <iamnoah@gmail.com> wrote:

> We are in a situation where we need at least once delivery. We have a
> thread that pulls messages off the consumer, puts them in a queue where
> they go through a few async steps, and then after the final step, we want
> to commit the offsets of the messages we have completed. There may be items
> we have not completed still being processed, so
> consumerConnector.commitOffsets() isn't an option for us.
>
> We are manually committing offsets to Kafka (0.8.2.1); auto commit is off.
>
> We have a simple test case that is supposed to verify that we don't lose
> any messages if the Kafka server is shut down:
>
> // there are 25 messages; we send a few now and a few after the server comes back up
> for (TestMessageClass mess : messages.subList(0, mid)) {
>     producer.send(mess);
> }
>
> stopKafka(); // in-memory KafkaServer
> startKafka();
>
> for (TestMessageClass mess : messages.subList(mid, total)) {
>     producer.send(mess);
> }
>
> int tries = 0;
> while (testConsumer.received.size() < total && tries++ < 10) {
>     Thread.sleep(200);
> }
> assertEquals(keys(testConsumer.received),
>     keys(ImmutableSet.copyOf(messages)));
>
> The test consumer is very simple:
>
> ConsumerIterator iterator;
> while (iterator.hasNext()) {
>     process(iterator.next());
> }
>
> // at the end of process:
> commit(messageAndMetadata.offset());
>
> commit is basically the commit code from this page:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
>
> except that it runs the commit in a separate thread so it won't interfere
> with the consumer.
>
> Here is the strange thing: If we do not commit, the test passes every time.
> Kafka comes back up and the high level consumer picks up right where it
> left off. But if we do commit, it does not recover, or we lose messages.
> With 1 partition, we only get some prefix of the messages produced before
> stopKafka(). With 2, one of the partitions never gets any of the messages
> sent in the second half, while the other gets a prefix, but not all of the
> messages for that partition.
>
> It seems like the most likely thing is that we are committing the wrong
> offsets, but I cannot figure out how that is happening. Does the offset in
> MessageAndMetadata not correspond to the offset in OffsetAndMetadata?
>
> Or do we have to abandon the high level consumer entirely if we want to
> manually commit in this way?
>
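
Regarding how offsets are tracked when messages finish out of order: the quoted message says some items may still be in flight when a commit happens, so committing the offset of whichever message finished most recently could skip unfinished ones. Below is a minimal sketch of one way to track this for a single partition. OffsetTracker and its method names are hypothetical helpers, not part of Kafka, and the sketch assumes that a committed offset means "the next message to read", which is how Kafka's convention is usually described; under that assumption the value to commit is the lowest offset still in flight, or the highest offset seen plus one when nothing is in flight.

```java
import java.util.TreeSet;

/**
 * Hypothetical helper (not a Kafka class): tracks in-flight offsets for ONE
 * partition and reports the offset that is safe to commit.
 * Assumption: a committed offset is the NEXT offset the consumer should read.
 */
final class OffsetTracker {
    // Offsets handed to the async pipeline that have not finished yet.
    private final TreeSet<Long> inFlight = new TreeSet<>();
    // Highest offset ever handed to the pipeline; -1 means none yet.
    private long highestSeen = -1L;

    /** Call when a message is pulled off the consumer and queued. */
    synchronized void started(long offset) {
        inFlight.add(offset);
        highestSeen = Math.max(highestSeen, offset);
    }

    /** Call after the final async step for this message has completed. */
    synchronized void completed(long offset) {
        inFlight.remove(offset);
    }

    /**
     * Returns the offset to commit, or -1 if no message has been seen.
     * While anything is in flight, this is the lowest unfinished offset,
     * so a restart re-reads that message (at-least-once).
     */
    synchronized long safeToCommit() {
        if (highestSeen < 0) {
            return -1L;
        }
        return inFlight.isEmpty() ? highestSeen + 1 : inFlight.first();
    }
}
```

With offsets 5, 6 and 7 in flight, completing 6 first leaves safeToCommit() at 5, so a restart would redeliver 5 and 6 rather than lose 5. One tracker per topic-partition would be needed, since offsets are only ordered within a partition.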
