kafka-users mailing list archives

From Alexey Borschenko <aborsche...@elance-odesk.com>
Subject SimpleConsumer.getOffsetsBefore problem
Date Thu, 16 Apr 2015 14:35:10 GMT
Hi all!

I need to read offsets close to a specified timestamp.
As far as I can see, this can be achieved with the SimpleConsumer API.
To test this, I am using the SimpleConsumer example provided on the site:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

I use Kafka 0.8.2.1

When I pass -1 or -2 (latest or earliest) as the time to getOffsetsBefore, it
works fine: it returns 73794 and 1562 respectively.
When I pass System.currentTimeMillis() as the time, it returns 1562, the same
as earliest.
When I pass System.currentTimeMillis() - 10*60*1000, it returns 1562.
When I pass System.currentTimeMillis() - 200*60*1000, it returns 0.
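
One possible reading of these numbers, offered as an assumption rather than confirmed broker behavior: a 0.8-era broker may resolve a timestamp only at log-segment granularity, comparing it against each segment file's last-modified time and returning that segment's first offset. The sketch below simulates such a lookup without any Kafka dependency. The class, the Segment type, the offsetsBefore method, and the one-hour-old segment are all hypothetical and chosen only to reproduce the values reported above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SegmentOffsetSketch {
    static final long LATEST = -1L;   // sentinel the post passes for "latest"
    static final long EARLIEST = -2L; // sentinel the post passes for "earliest"

    /** Hypothetical stand-in for a log segment: its first offset and file modification time. */
    static final class Segment {
        final long baseOffset;
        final long lastModifiedMs;
        Segment(long baseOffset, long lastModifiedMs) {
            this.baseOffset = baseOffset;
            this.lastModifiedMs = lastModifiedMs;
        }
    }

    /**
     * Assumed lookup rule: candidates are every segment's base offset plus the log end
     * offset (stamped with the broker's current time). Pick the newest candidate whose
     * time is <= timestamp, then return up to maxNumOffsets offsets in descending order.
     * Segments must be passed in ascending offset order; the active segment is assumed non-empty.
     */
    static long[] offsetsBefore(List<Segment> segments, long logEndOffset,
                                long brokerNowMs, long timestamp, int maxNumOffsets) {
        List<long[]> candidates = new ArrayList<>();
        for (Segment s : segments) {
            candidates.add(new long[] {s.baseOffset, s.lastModifiedMs});
        }
        candidates.add(new long[] {logEndOffset, brokerNowMs});

        int start;
        if (timestamp == LATEST) {
            start = candidates.size() - 1;
        } else if (timestamp == EARLIEST) {
            start = 0;
        } else {
            start = candidates.size() - 1;
            while (start >= 0 && candidates.get(start)[1] > timestamp) {
                start--; // candidate is newer than the requested time, keep walking back
            }
        }
        int n = Math.min(maxNumOffsets, start + 1); // start == -1 yields an empty result
        long[] result = new long[n];
        for (int i = 0; i < n; i++) {
            result[i] = candidates.get(start - i)[0];
        }
        return result;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Hypothetical log: one segment starting at 1562, last written an hour ago.
        List<Segment> segments =
                Collections.singletonList(new Segment(1562L, now - 60 * 60 * 1000L));
        long[] times = {LATEST, EARLIEST, now, now - 10 * 60 * 1000L, now - 200 * 60 * 1000L};
        for (long t : times) {
            // The broker's clock is assumed to be a few ms past the client's "now".
            System.out.println(t + " -> "
                    + Arrays.toString(offsetsBefore(segments, 73794L, now + 5, t, 1)));
        }
    }
}
```

Under this assumed rule, any timestamp that falls after the only segment's modification time maps to 1562, and a timestamp older than every segment yields no offsets at all, so the accuracy of the result would be bounded by the segment size.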

Here is code snippet:

*****************

public void run(long a_maxReads, String a_topic, int a_partition,
                List<String> a_seedBrokers, int a_port) throws Exception {
    System.out.println("a_maxReads = [" + a_maxReads + "], a_topic = [" + a_topic
            + "], a_partition = [" + a_partition + "], a_seedBrokers = [" + a_seedBrokers
            + "], a_port = [" + a_port + "]");
    // find the meta data about the topic and partition we are interested in
    PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
    if (metadata == null) {
        System.out.println("Can't find metadata for Topic and Partition. Exiting");
        return;
    }
    if (metadata.leader() == null) {
        System.out.println("Can't find Leader for Topic and Partition. Exiting");
        return;
    }
    String leadBroker = metadata.leader().host();
    String clientName = "Client_" + a_topic + "_" + a_partition;

    SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    long readOffset = getLastOffset(
            consumer, a_topic, a_partition,
            System.currentTimeMillis() - 10 * 60 * 1000,
            clientName);
    System.out.println("readOffset = " + readOffset);
}

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    System.out.println("consumer = [" + consumer + "], topic = [" + topic
            + "], partition = [" + partition + "], whichTime = [" + whichTime
            + "], clientName = [" + clientName + "]");
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: "
                + response.errorCode(topic, partition));
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    if (offsets.length == 0) {
        return 0;
    }
    return offsets[0];
}

************************************
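
Note that getLastOffset above returns 0 both when the response carries an error and when the offsets array is empty, so a printed 0 cannot be told apart from a real offset of 0. A minimal sketch of one way to keep those cases separate is below. The class and method names are hypothetical, and it assumes Java 8 for OptionalLong.

```java
import java.util.OptionalLong;

class OffsetResultSketch {
    /** Returns the first offset if there is one, or an empty value when none came back. */
    static OptionalLong firstOffset(long[] offsets) {
        if (offsets == null || offsets.length == 0) {
            return OptionalLong.empty(); // "no offset before that time", not offset 0
        }
        return OptionalLong.of(offsets[0]);
    }

    public static void main(String[] args) {
        System.out.println(firstOffset(new long[] {1562L}).isPresent());
        System.out.println(firstOffset(new long[0]).isPresent());
    }
}
```

With a helper like this, the caller can log "no offsets returned" explicitly instead of printing readOffset = 0.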

How can I get reasonably accurate offset values close to a specified timestamp?

Thanks!
