storm-user mailing list archives

From Harsha <st...@harsha.io>
Subject Re: Is there any way for a kafka spout to read from the committed offset in zookeeper?
Date Mon, 29 Dec 2014 04:04:23 GMT

It does read from the stored offsets. The first time you deploy the
topology, if you intend to read from the beginning of the topic, then
set forceFromStart=true. If you kill and redeploy the topology and want
to resume from the last saved position, make sure you set
forceFromStart=false. While the topology is running, it saves the
offsets under the ZooKeeper root you provide and replays failed tuples
based on the saved offset. Here is the code where it reads from
ZooKeeper:
https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java#L71
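
To make the rule above concrete, here is a minimal sketch of the decision
as described in this message. It is not the actual PartitionManager code:
the class name, method name, and parameters are hypothetical, and the real
implementation may check additional conditions. offsetFromStartTime stands
in for whatever the broker returns for the configured startOffsetTime.

```java
import java.util.OptionalLong;

public class StartOffsetSketch {

    /**
     * Hypothetical helper: picks the offset a partition should start from.
     *
     * @param storedOffset        offset previously saved in ZooKeeper, if any
     * @param forceFromStart      mirrors the spout config flag of the same name
     * @param offsetFromStartTime offset the broker reports for startOffsetTime
     *                            (assumed to be looked up by the caller)
     */
    static long resolveStartOffset(OptionalLong storedOffset,
                                   boolean forceFromStart,
                                   long offsetFromStartTime) {
        // Ignore the saved position when asked to, or when nothing was saved.
        if (forceFromStart || !storedOffset.isPresent()) {
            return offsetFromStartTime;
        }
        // Otherwise resume from the position saved in ZooKeeper.
        return storedOffset.getAsLong();
    }

    public static void main(String[] args) {
        // Redeploy with forceFromStart=false: resume from the saved offset.
        System.out.println(resolveStartOffset(OptionalLong.of(42L), false, 0L));
        // Redeploy with forceFromStart=true: the saved offset is ignored.
        System.out.println(resolveStartOffset(OptionalLong.of(42L), true, 0L));
        // First deploy, nothing saved yet: fall back to the start-time offset.
        System.out.println(resolveStartOffset(OptionalLong.empty(), false, 7L));
    }
}
```

Note that the tests quoted below call KafkaUtils.getOffset directly, which
only talks to the broker, so they do not exercise the ZooKeeper path at all.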



On Sun, Dec 28, 2014, at 05:14 PM, 이승진 wrote:
> Hi all,
>
> Is there any way for a kafka spout to read from the committed offset
> in zookeeper?
>
> Based on two test cases in storm-kafka, it seems there is no way to do
> this, only ways to read from the end or from the beginning.
>
> thanks in advance.
>
> @Test
> public void getOffsetFromConfigAndDontForceFromStart() {
>     config.forceFromStart = false;
>     config.startOffsetTime = OffsetRequest.EarliestTime();
>     createTopicAndSendMessage();
>     long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime());
>     long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
>     assertThat(latestOffset, is(equalTo(offsetFromConfig)));
> }
>
> @Test
> public void getOffsetFromConfigAndFroceFromStart() {
>     config.forceFromStart = true;
>     config.startOffsetTime = OffsetRequest.EarliestTime();
>     createTopicAndSendMessage();
>     long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
>     long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
>     assertThat(earliestOffset, is(equalTo(offsetFromConfig)));
> }

