kafka-users mailing list archives

From Neha Narkhede <neha.narkh...@gmail.com>
Subject Re: Starting from beginning of topic using Java API (kafka 0.7)
Date Sat, 08 Mar 2014 01:40:31 GMT
> From reading
> around it looked like setting "autooffset.reset" = "smallest" would do
> this, however I'm not actually seeing that behavior.

The reason is that a consumer consults this config only if it doesn't
find a previous offset stored for its group in ZooKeeper. So it will
respect this config only the first time a group starts up and not on a
subsequent run, unless you delete the group information from ZooKeeper
before starting the consumer. That is the reason you see the right
behavior with a new groupid.
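
To make the lookup order concrete, here is a minimal sketch in plain Java.
It is not Kafka code and does not talk to ZooKeeper: the class
OffsetResetSketch, its in-memory map standing in for the stored group
offsets, and the offsets 0 and 500 are all made up for illustration. It
only models the rule described above, that a stored offset wins and the
reset setting is a fallback.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetResetSketch {
    // Hypothetical in-memory stand-in for the per-group offsets kept in ZooKeeper.
    private final Map<String, Long> storedOffsets = new HashMap<String, Long>();
    private final long earliest;
    private final long latest;

    OffsetResetSketch(long earliest, long latest) {
        this.earliest = earliest;
        this.latest = latest;
    }

    // A stored offset for the group always wins; the reset setting is
    // consulted only when nothing is stored for that group.
    long startOffset(String group, String autoOffsetReset) {
        Long stored = storedOffsets.get(group);
        if (stored != null) {
            return stored.longValue();
        }
        if ("smallest".equals(autoOffsetReset)) {
            return earliest;
        }
        if ("largest".equals(autoOffsetReset)) {
            return latest;
        }
        // Assumption of this model: any other value is rejected.
        throw new IllegalArgumentException("unknown reset value: " + autoOffsetReset);
    }

    // Models the consumer committing its position for the group.
    void commit(String group, long offset) {
        storedOffsets.put(group, Long.valueOf(offset));
    }

    // Models deleting the group information before starting the consumer.
    void deleteGroup(String group) {
        storedOffsets.remove(group);
    }

    public static void main(String[] args) {
        OffsetResetSketch sketch = new OffsetResetSketch(0L, 500L);
        // First run: nothing stored, so "smallest" applies.
        System.out.println(sketch.startOffset("test_group", "smallest"));
        sketch.commit("test_group", 120L);
        // Second run: the stored offset is used and the setting is ignored.
        System.out.println(sketch.startOffset("test_group", "smallest"));
        sketch.deleteGroup("test_group");
        // After deleting the group information, "smallest" applies again.
        System.out.println(sketch.startOffset("test_group", "smallest"));
    }
}
```

In this model a fresh group name behaves exactly like a deleted group,
which matches what you saw when changing the groupid on each run.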

Thanks,
Neha


On Fri, Mar 7, 2014 at 3:54 PM, Adam Phelps <amp@opendns.com> wrote:

> I've been trying to write a test consumer in Java for a new use of our
> Kafka cluster (currently used solely with Storm), however this use needs
> to always start from the earliest offset in the topic.  From reading
> around it looked like setting "autooffset.reset" = "smallest" would do
> this, however I'm not actually seeing that behavior.  So far the only
> way I've managed to force it to start from the beginning is using a new
> groupid with each run, which does not seem like an optimal method.
>
> Am I doing this incorrectly, or is there some other means of doing this
> from the Java API?
>
>     public KafkaUpdateSource(String zkQuorum, String topic, String group) {
>         Properties props = new Properties();
>         props.put("zk.connect", zkQuorum);
>         props.put("zk.connectiontimeout.ms", "100000");
>
>         if (group != null) {
>             props.put("groupid", group);
>         } else {
>             props.put("groupid", "test_group");
>         }
>         props.put("zk.synctime.ms", "200");
>         props.put("autocommit.interval.ms", "1000");
>         props.put("consumer.timeout.ms", "1000");
>
>         // XXX: For some reason the "start from smallest offset" option
>         // doesn't seem to work
>         props.put("autooffset.reset", "smallest");
>
>         ConsumerConnector consumer =
>             Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>         topicCountMap.put(topic, new Integer(1));
>         iterator =
>             consumer.createMessageStreams(topicCountMap).get(topic).get(0).iterator();
>     }
>
> - Adam
>
