pulsar-users mailing list archives

From Enrico Olivelli <eolive...@gmail.com>
Subject Getting last message id
Date Thu, 27 Dec 2018 14:45:45 GMT
Hi,
I am migrating an application from Kafka to Pulsar, and I want to know
the ID of the latest message sent to a topic.
My use case is that I would like to know whether the reader has already
processed all available data.


I am using the Reader API because I need full control on the consumer
side over which portion of the data to process (I will store the last
Message ID in a DB).

In Kafka I used the "consumer#seekToEnd + consumer#position" APIs to move
to the end of a topic and then read the offset of each partition.

In Pulsar I am trying to achieve the same goal with:

try (Reader<byte[]> pulsarReader = pulsarClient.newReader()
        .startMessageId(MessageId.latest)
        .topic(topicName)
        .create()) {
    MessageId lastMessageId = null;
    Message<byte[]> readNext = pulsarReader.readNext(1, TimeUnit.SECONDS);
    if (readNext != null) {
        lastMessageId = readNext.getMessageId();
    }
}

This does not work, because a message is not guaranteed to arrive
within 1 second.

What is the best way to do this?

Thanks
Enrico
