kafka-users mailing list archives

From Adam Phelps <...@opendns.com>
Subject Starting from beginning of topic using Java API (kafka 0.7)
Date Fri, 07 Mar 2014 23:54:02 GMT
I've been trying to write a test consumer in Java for a new use of our
Kafka cluster (currently used solely with Storm); however, this use needs
to always start from the earliest offset in the topic.  From reading
around, it looked like setting "autooffset.reset" = "smallest" would do
this, but I'm not actually seeing that behavior.  So far the only way
I've managed to force it to start from the beginning is to use a new
groupid on each run, which does not seem like an optimal method.

Am I doing this incorrectly, or is there some other way to do this
from the Java API?

    public KafkaUpdateSource(String zkQuorum, String topic, String group) {
        Properties props = new Properties();
        props.put("zk.connect", zkQuorum);
        props.put("zk.connectiontimeout.ms", "100000");

        if (group != null) {
            props.put("groupid", group);
        } else {
            props.put("groupid", "test_group");
        }
        props.put("zk.synctime.ms", "200");
        props.put("autocommit.interval.ms", "1000");
        props.put("consumer.timeout.ms", "1000");

        // XXX: For some reason the "start from smallest offset" option
        // doesn't seem to work
        props.put("autooffset.reset", "smallest");

        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        iterator =
            consumer.createMessageStreams(topicCountMap).get(topic).get(0).iterator();
    }
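A toy Java model can make the observed behavior concrete. It is not Kafka code, and every name in it (OffsetResolution, startOffset, committedOffsets) is hypothetical. It assumes the 0.7 high-level consumer works as generally described: a committed offset for the group always takes priority, and autooffset.reset is consulted only when the group has no committed offset (or that offset is out of range). Under that assumption, "smallest" only takes effect on a group's first run, which would explain why a reused groupid resumes where it left off while a fresh groupid starts from the beginning.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (hypothetical, not Kafka's implementation) of how a 0.7-style
 * high-level consumer is assumed to choose its starting offset for one partition.
 */
public class OffsetResolution {

    /** Hypothetical stand-in for the offsets each group has committed to ZooKeeper. */
    static final Map<String, Long> committedOffsets = new HashMap<>();

    /**
     * Returns the offset a consumer in {@code group} would start from.
     * A committed offset inside [earliest, latest] always wins; the reset
     * policy ("smallest" or "largest") is used only when there is none.
     */
    static long startOffset(String group, String resetPolicy, long earliest, long latest) {
        Long committed = committedOffsets.get(group);
        if (committed != null && committed >= earliest && committed <= latest) {
            return committed;
        }
        return "smallest".equals(resetPolicy) ? earliest : latest;
    }

    public static void main(String[] args) {
        long earliest = 0L;
        long latest = 500L;

        // First run of "test_group": nothing committed yet, so "smallest" applies.
        System.out.println(startOffset("test_group", "smallest", earliest, latest)); // 0

        // Auto-commit (autocommit.interval.ms) records progress, say up to offset 120.
        committedOffsets.put("test_group", 120L);

        // Second run with the same group: the committed offset overrides "smallest".
        System.out.println(startOffset("test_group", "smallest", earliest, latest)); // 120

        // A brand-new group id has no committed offset, so it starts at the beginning.
        System.out.println(startOffset("test_group_2", "smallest", earliest, latest)); // 0
    }
}
```

Under this model, restarting from the beginning with a fixed groupid would require removing that group's committed offsets before the connector starts; the 0.7 design notes place them in ZooKeeper under /consumers/[group_id]/offsets/[topic]. Treat that as something to verify against the broker version in use rather than a guaranteed recipe.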

- Adam
