kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Cherry <zhanglt2...@gmail.com>
Subject lost message when poll multiple partitions
Date Wed, 14 Aug 2019 01:32:06 GMT
I poll records from multiple partitions and just commit one record, then
the rest of records seems to be committed too, for I can't poll them again.
And I already set auto.commit to false, can't figure out why. But if the
topic has only one partition, it works fine, the records that I didn't
commit on the first poll will be re-polled again. Why the result is
different when there is more than one partition?

public static void main(String[] args) throws IOException,
   InterruptedException {
           initKafkaConsumer();
           do {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (TopicPartition partition : records.partitions()) {

                List<ConsumerRecord<String, String>> partitionRecords
= records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    LOG.info("partition = {}, offset = {}, value =
{}", record.partition(), record.offset(), record.value());
                    consumer.commitSync(
                        Collections.singletonMap(partition, new
OffsetAndMetadata(record.offset() + 1)));
                    break;
                }
            }


        } while (true);
    }

According to @mazaneicha's comment, I re-run the program, and the result
seems like I have committed all the records for not breaking out of outter
loop, but my log says(I have printed out the record info in code) weird
things, like these:

09:37:38.786 [main] INFO org.test.kafka.Consumer - partition = 2, offset =
0, value = this message 2

09:37:38.786 [main] INFO org.test.kafka.Consumer - partition = 2, offset =
10, value = this message 21

09:37:38.786 [main] INFO org.test.kafka.Consumer - partition = 2, offset =
11, value = this message 24

09:37:38.786 [main] INFO org.test.kafka.Consumer - partition = 2, offset =
13, value = this message 30

09:37:38.786 [main] INFO org.test.kafka.Consumer - partition = 2, offset =
14, value = this message 33

You can see from log that some of messages are lost, for the offset is not
consecutive. Anyone who have clues of why?

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message