Hmm, that sounds good, but it's not working. Any idea?

2014-12-05 22:51:33 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [-5]; retrying with default start offset time from configuration. configured start offset time: [1] offset: [-5]
2014-12-05 22:51:33 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [-5]; retrying with default start offset time from configuration. configured start offset time: [1] offset: [-5]
2014-12-05 22:51:33 s.k.KafkaUtils [ERROR] Error fetching data from [Partition{host=xxx3.com:9092, partition=0}] for topic [DXSPreAgg]: [OFFSET_OUT_OF_RANGE]
2014-12-05 22:51:33 s.k.KafkaSpout [WARN] Fetch failed
storm.kafka.FailedFetchException: Error fetching data from [Partition{host=xxx3.com:9092, partition=0}] for topic [DXSPreAgg]: [OFFSET_OUT_OF_RANGE]
	at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:192) ~[stormjar.jar:na]
	at storm.kafka.PartitionManager.fill(PartitionManager.java:159) ~[stormjar.jar:na]
	at storm.kafka.PartitionManager.next(PartitionManager.java:123) ~[stormjar.jar:na]
	at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) ~[stormjar.jar:na]
	at backtype.storm.daemon.executor$fn__4836$fn__4851$fn__4880.invoke(executor.clj:584) [storm-core-0.9.2-incubating-security.jar:0.9.2-incubating-security]
	at backtype.storm.util$async_loop$fn__1033.invoke(util.clj:439) [storm-core-0.9.2-incubating-security.jar:0.9.2-incubating-security]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:722) [na:1.7.0_17]

On Thu, Dec 4, 2014 at 6:46 PM, Andrew Neilson <arsneilson@gmail.com> wrote:
-1 and -2 come from kafka.api.OffsetRequest: https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetRequest.scala

-1 is the latest time, -2 is the earliest time

To be sure you always start from the most recent offset in Kafka, set up your KafkaConfig (https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java) so that forceFromStart is true and startOffsetTime is -1:

config.forceFromStart = true; // this might be what you're missing
config.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // i.e. -1
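
To make the interaction between the two settings concrete, here is a minimal, self-contained sketch of the decision as described above. It is a simplified model and not the storm-kafka source: the class and method names are hypothetical, the broker lookup is replaced by plain parameters, and it leaves out the topology id comparison and the "more than 100000 behind" reset that show up in the logs. The only things taken from the thread are the two sentinel values and the claim that startOffsetTime takes effect when forceFromStart is true.

```java
public class StartOffsetSketch {
    // Sentinel "times" from kafka.api.OffsetRequest, as quoted in this thread.
    static final long LATEST_TIME = -1L;
    static final long EARLIEST_TIME = -2L;

    /**
     * Hypothetical stand-in for asking the broker which offset a sentinel maps to.
     * The earliest and latest offsets are passed in instead of being fetched.
     */
    static long offsetForTime(long startOffsetTime, long earliestOffset, long latestOffset) {
        if (startOffsetTime == LATEST_TIME) {
            return latestOffset;
        }
        if (startOffsetTime == EARLIEST_TIME) {
            return earliestOffset;
        }
        throw new IllegalArgumentException("only -1 and -2 are modeled here, got " + startOffsetTime);
    }

    /**
     * Assumed rule: with forceFromStart the spout ignores the offset committed in
     * ZooKeeper and starts from startOffsetTime; without it, the committed offset wins.
     */
    static long chooseStartOffset(long committedOffset, boolean forceFromStart,
                                  long startOffsetTime, long earliestOffset, long latestOffset) {
        if (forceFromStart) {
            return offsetForTime(startOffsetTime, earliestOffset, latestOffset);
        }
        return committedOffset;
    }

    public static void main(String[] args) {
        long committed = 3217004355L; // committed offset for partition 1, taken from the logs below
        long earliest = 3000000000L;  // made-up value for illustration
        long latest = 3217013339L;    // made-up value for illustration

        System.out.println("forceFromStart=false, -1: " + chooseStartOffset(committed, false, LATEST_TIME, earliest, latest));
        System.out.println("forceFromStart=true,  -1: " + chooseStartOffset(committed, true, LATEST_TIME, earliest, latest));
        System.out.println("forceFromStart=true,  -2: " + chooseStartOffset(committed, true, EARLIEST_TIME, earliest, latest));
    }
}
```

In this model, changing startOffsetTime alone leaves the result unchanged while forceFromStart is false, which would be consistent with partition 1 still starting from its committed offset in both of the log excerpts below.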

On Thu, Dec 4, 2014 at 5:34 PM, Filipa Moura <filipa.mendesmoura@gmail.com> wrote:
Hi,
I'm trying to get my KafkaSpout to read the latest offset from Kafka.

With my standard configuration, I see startOffsetTime set to -2 in the logs:
2014-12-04 23:19:34 s.k.PartitionManager [INFO] Read last commit offset from zookeeper: 2924325359; old topology_id: de65d4f8-a8e6-4f72-99bd-2d66e95fd293 - new topology_id: 89cf7268-9db1-423a-978b-3fe214d64e8e
2014-12-04 23:19:34 s.k.PartitionManager [INFO] Read last commit offset from zookeeper: 3217013339; old topology_id: de65d4f8-a8e6-4f72-99bd-2d66e95fd293 - new topology_id: 89cf7268-9db1-423a-978b-3fe214d64e8e
2014-12-04 23:19:34 s.k.PartitionManager [INFO] Last commit offset from zookeeper: 2924325359
2014-12-04 23:19:34 s.k.PartitionManager [INFO] Starting Kafka xx3.com:1 from offset 3217013339
2014-12-04 23:19:34 s.k.PartitionManager [INFO] Commit offset 2924497976 is more than 100000 behind, resetting to startOffsetTime=-2
2014-12-04 23:19:34 s.k.PartitionManager [INFO] Starting Kafka xxx3.com:0 from offset 2924497976

After adding "spoutConfig.startOffsetTime = -1;" to the code, I see:
2014-12-04 23:14:21 s.k.PartitionManager [INFO] Read last commit offset from zookeeper: 3217004355; old topology_id: a7010f51-9fea-43e4-ba16-c9ad1f0ec245 - new topology_id: de65d4f8-a8e6-4f72-99bd-2d66e95fd293
2014-12-04 23:14:21 s.k.PartitionManager [INFO] Read last commit offset from zookeeper: 2923885086; old topology_id: a7010f51-9fea-43e4-ba16-c9ad1f0ec245 - new topology_id: de65d4f8-a8e6-4f72-99bd-2d66e95fd293
2014-12-04 23:14:21 s.k.PartitionManager [INFO] Starting Kafka xxx3.com:1 from offset 3217004355
2014-12-04 23:14:21 s.k.PartitionManager [INFO] Last commit offset from zookeeper: 2923885086
2014-12-04 23:14:21 s.k.PartitionManager [INFO] Commit offset 2924022915 is more than 100000 behind, resetting to startOffsetTime=-1
2014-12-04 23:14:21 s.k.PartitionManager [INFO] Starting Kafka xxx3.com:0 from offset 2924022915

What is the difference? And how can I be sure it's using the most recent offset from Kafka?

Thank you,
Filipa