kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hanchi Wang <Hanchi.W...@microsoft.com.INVALID>
Subject RE: How to always consume from latest offset in kafka-streams
Date Wed, 24 Jan 2018 18:13:34 GMT
Not sure about the deprecation plan for SipmpleConsumer, but you can use SimpleConsmer to get
the latest offset of a partition first and then consume from that offset.



From: TSANG, Brilly<mailto:brilly.tsang@hk.daiwacm.com>
Sent: Monday, January 22, 2018 6:57 PM
To: users@kafka.apache.org<mailto:users@kafka.apache.org>
Subject: RE: How to always consume from latest offset in kafka-streams

If you are doing dynamic assignment ( consumer.subscription), you can try this in your code:

 KafkaConsumer<String, TickData> consumer = new KafkaConsumer<>(props);
 consumer.subscribe(Collections.singletonList("your_topic"), this);
consumer.poll(0)  //Just so you are connected and will have TopicPartition dynamically assigned
to your consumer
Set<TopicPartition> assignment = consumer.assignment();

//Seek to end should reset all the index to latest and you can poll from there to read.

// do your regular consumer loop here


-----Original Message-----
From: Xin Li [mailto:Xin.Li@trivago.com]
Sent: Tuesday, January 23, 2018 3:01 AM
To: users@kafka.apache.org
Subject: Re: How to always consume from latest offset in kafka-streams

consumer.auto.offset.reset = latest


On 19.01.18, 19:34, "Saloni Vithalani" <saloniv@thoughtworks.com> wrote:

    Our requirement is such that if a kafka-stream app is consuming a
    partition, it should start it's consumption from latest offset of that

    This seems like do-able using

    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

    Now, let's say using above configuration, the kafka-stream app started
    consuming data from latest offset for a partition. And after some time, the
    app crashes. When the app comes back live, we want it to consume data from
    the latest offset of that partition, instead of the where it left last

    But I can't find anything that can help achieve it using kafka-streams api.

    P.S. We are using kafka-1.0.0.
    Saloni Vithalani
    Email saloniv@thoughtworks.com
    Telephone +91 8552889571 <8552889571>
    [image: ThoughtWorks]


This email and any attachment(s) are intended solely for the person(s) named above, and are
or may contain information of a proprietary or confidential nature. If you are not the intended
recipient(s), you should delete this message immediately. Any use, disclosure or distribution
of this message without our prior consent is strictly prohibited.
This message may be subject to errors, incomplete or late delivery, interruption, interception,
modification, or may contain viruses. Neither Daiwa Capital Markets Hong Kong Limited, its
subsidiaries, affiliates nor their officers or employees represent or warrant the accuracy
or completeness, nor accept any responsibility or liability whatsoever for any use of or reliance
upon, this email or any of the contents hereof. The contents of this message are for information
purposes only, and subject to change without notice.
This message is not and is not intended to be an offer or solicitation to buy or sell any
securities or financial products, nor does any recommendation, opinion or advice necessarily
reflect those of Daiwa Capital Markets Hong Kong Limited, its subsidiaries or affiliates.

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message