kafka-users mailing list archives

From Marina <ppi...@yahoo.com.INVALID>
Subject Re: how to modify offsets stored in Kafka in version?
Date Mon, 22 Jun 2015 13:16:48 GMT
Actually, there is one more case:

Case 3: skipping messages / cleaning up topics

It might be less applicable to production, but it comes up quite often in load testing: sometimes you
find issues with either the events that are already pushed into Kafka or the app that processes
them, and you know that you won't be able to clear out the topic in a normal way, so you would
like to just skip the events and restart your consumer from the end of the log (LATEST offset).
With the low-level consumer you can easily do that. With the HL consumer, the only way I was able
to do that was by setting the offset directly in Zookeeper (or by deleting the whole path in
Zookeeper and the whole log dir in Kafka).

Not sure if there is a better way to do this kind of cleanup.
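For the Zookeeper route in Case 3, skipping to the end amounts to writing each partition's log-end offset into the group's offset node. The Java sketch below is a hypothetical helper, not part of Kafka: it only prints ZooKeeper CLI "set" commands, it assumes the /consumers/<group>/offsets/<topic>/<partition> node layout used by the Zookeeper-based high-level consumer, and it assumes the log-end offsets were obtained separately (for example with kafka.tools.GetOffsetShell and --time -1). It only applies while the group's offsets are stored in Zookeeper (offsets.storage=zookeeper), and the group's consumers should be stopped first, since a running consumer's next auto-commit may overwrite the new values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of Kafka): builds ZooKeeper CLI "set" commands
// that move a Zookeeper-based high-level consumer group to the end of the log.
final class SkipToLatest {

    // Assumed node layout: /consumers/<group>/offsets/<topic>/<partition>,
    // where the node data is the committed offset written as decimal text.
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    // logEndOffsets: partition -> log-end (LATEST) offset, obtained elsewhere.
    static List<String> zkSetCommands(String group, String topic,
                                      Map<Integer, Long> logEndOffsets) {
        List<String> commands = new ArrayList<>();
        // TreeMap iterates in partition order, so the output is stable.
        for (Map.Entry<Integer, Long> e : new TreeMap<>(logEndOffsets).entrySet()) {
            commands.add("set " + offsetPath(group, topic, e.getKey()) + " " + e.getValue());
        }
        return commands;
    }

    public static void main(String[] args) {
        Map<Integer, Long> ends = Map.of(0, 1500000L, 1, 1499870L);
        zkSetCommands("load-test", "events", ends).forEach(System.out::println);
    }
}
```

The printed lines could then be pasted into zkCli.sh while the consumers are down; the group, topic, and offsets in main are made-up sample values.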


----- Original Message -----
From: Marina <ppine7@yahoo.com.INVALID>
To: "users@kafka.apache.org" <users@kafka.apache.org>
Sent: Monday, June 22, 2015 8:45 AM
Subject: Re: how to modify offsets stored in Kafka in version?

Thanks, Raja, Guozhang, for your response!
Raja - the slides are great, very helpful information - it would be good to have them included
in the Kafka wiki pages too.

Here are two use cases where I would find a cmd tool very useful:

Case 1: failed events re-processing

While processing events in my HighLevel Consumer, some may fail parsing or other business
processing. When that happens, I store the offset of the failed event in a separate log,
failedEvents.log (I have configured a specific Logback Logger for that). I do not fail processing
of the rest of the events, and I do not commit offsets from the application, letting auto-commit
do it. This allows me to keep churning through the event stream as fast as possible
without slowing down the consumer, which is very important for my use case, as I need to
process 2-5K events per second.

As a separate activity, I check failedEvents.log periodically, and if anything
is found there and it is possible to reprocess those events (that's a manual investigation
that has to happen), I determine the earliest offset of the failed events and would like
to restart the HL consumer from that offset. Of course, other non-failed messages
will be reprocessed as well, but that is a different issue, and in my case I am trying to make
event re-processing as idempotent as possible. It would be awesome to be able to re-process just
the events with the offsets from the list - but I am not going that far :)
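The step of determining the earliest failed offset can be sketched in Java as below. Because Kafka offsets are only ordered within a partition, the minimum has to be taken per partition rather than across the whole log. The actual line format of failedEvents.log is not described here, so the sketch assumes a hypothetical format containing "partition=<p> offset=<o>"; the class and method names are illustrative only.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: the failedEvents.log line format is an assumption,
// e.g. "2015-06-22 08:01:02 FAILED partition=3 offset=123456 reason=parse".
final class FailedEventOffsets {

    private static final Pattern ENTRY =
            Pattern.compile("partition=(\\d+)\\s+offset=(\\d+)");

    // Returns partition -> smallest failed offset seen for that partition.
    static Map<Integer, Long> earliestPerPartition(List<String> logLines) {
        Map<Integer, Long> earliest = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = ENTRY.matcher(line);
            if (!m.find()) {
                continue; // skip lines that do not record a failed event
            }
            int partition = Integer.parseInt(m.group(1));
            long offset = Long.parseLong(m.group(2));
            // Keep the smaller of the stored and the newly parsed offset.
            earliest.merge(partition, offset, Math::min);
        }
        return earliest;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "FAILED partition=0 offset=1200 reason=parse",
                "FAILED partition=1 offset=700 reason=lookup",
                "FAILED partition=0 offset=950 reason=parse");
        System.out.println(earliestPerPartition(lines)); // {0=950, 1=700}
    }
}
```

Each resulting offset would be the restart point for its own partition; a single global minimum would not be meaningful across partitions.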

Without the ability to set the current offset manually in the HL consumer, I have two options:
1) in addition to (or instead of) storing offsets of the failed events in failedEvents.log,
also store the event itself, and then have a separate app that reads through the log
and re-sends each failed event to Kafka. It is OK, but it duplicates the event storage, since
Kafka already has them preserved...

2) use the Low-Level consumer and provide the current offset value as an input parameter (to reset it
on restart) - that's what I do for another app that requires this.

Case 2: performance and load testing

When doing load testing of my apps, I often generate millions of events and dump them into Kafka
topics. As is expected with testing, sometimes something in the apps downstream of Kafka
fails, and I need to stop the test and rewind to some previous messages. In some cases
it is OK to start from the EARLIEST offset, but in others I would like to start from a specific
offset that I know has the events I need to process. Also, since all events are preserved
in Kafka (love this feature!), I can simulate different loads just by starting from a different
offset: for example, given a log size of 1.5M events, I can start from offset 500K to get
an exact 1M load, or from offset 1M if I want a 500K load. Very convenient and
easy for QA to use.
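The load-size arithmetic above is the log-end offset minus the number of events to replay. A minimal Java sketch, assuming a single partition whose offsets are contiguous (no log compaction) and no new messages arriving during the test; the helper name is hypothetical, and with several partitions the calculation would be done per partition.

```java
// Hypothetical helper: the offset to start from in order to replay exactly
// desiredEvents messages from one partition (assumes contiguous offsets).
final class LoadStartOffset {

    static long startOffsetFor(long logStartOffset, long logEndOffset, long desiredEvents) {
        if (desiredEvents < 0) {
            throw new IllegalArgumentException("desiredEvents must be >= 0");
        }
        // Never go below the earliest offset still retained in the log.
        return Math.max(logStartOffset, logEndOffset - desiredEvents);
    }

    public static void main(String[] args) {
        long logEnd = 1_500_000L; // log of 1.5M events, offsets 0 .. 1,499,999
        System.out.println(startOffsetFor(0, logEnd, 1_000_000L)); // 500000
        System.out.println(startOffsetFor(0, logEnd, 500_000L));   // 1000000
    }
}
```

The clamp to the log-start offset matters once retention has deleted old segments: asking for more events than the log still holds simply starts from the oldest retained message.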

Thanks again for your help!

----- Original Message -----
From: Guozhang Wang <wangguoz@gmail.com>
To: "users@kafka.apache.org" <users@kafka.apache.org>
Sent: Saturday, June 20, 2015 7:51 PM
Subject: Re: how to modify offsets stored in Kafka in version?


We do not have a command line tool to manually set offsets stored in Kafka
yet, but we are thinking about adding this feature soon. Could you
elaborate a little on your use case for manual offset modification from the
command line, so I can understand your scenario better while working on the cmd design?


On Fri, Jun 19, 2015 at 7:11 AM, Rajasekar Elango <relango@salesforce.com> wrote:

> Hi Marina,
> Check slide 32 in this presentation
> <http://www.slideshare.net/jjkoshy/offset-management-in-kafka>.
> Hope this helps.
> Thanks,
> Raja.
> On Fri, Jun 19, 2015 at 9:43 AM, Marina <ppine7@yahoo.com.invalid> wrote:
> > Thanks, Stevo, for the quick reply,
> > Yes, I understand how to do this programmatically - but I would like to be
> > able to do this manually from a command line, just as before I was able to
> > do this in the Zookeeper shell. I don't want to write and run a Java app
> > just to set an offset :)
> >
> > [unless, of course, this is the only way to do this.....]
> >
> > thanks!
> > Marina
> >
> >
> >
> >
> > ----- Original Message -----
> > From: Stevo Slavić <sslavic@gmail.com>
> > To: users@kafka.apache.org; Marina <ppine7@yahoo.com>
> > Cc:
> > Sent: Friday, June 19, 2015 9:33 AM
> > Subject: Re: how to modify offsets stored in Kafka in version?
> >
> > Hello Marina,
> >
> > There's a Kafka API to fetch and commit offsets:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> >
> > Maybe it will work for you.
> >
> > Kind regards,
> > Stevo Slavic.
> >
> >
> > On Fri, Jun 19, 2015 at 3:23 PM, Marina <ppine7@yahoo.com.invalid> wrote:
> >
> > > Hi,
> > >
> > > In older Kafka versions, where offsets were stored in Zookeeper, I could
> > > manually update the value (<offset_value>) of the Zookeeper node
> > > /consumers/<consumer_group_name>/offsets/<topic_name>/<partition_number>.
> > >
> > > In - there are no values in offsets anymore, but there is a new topic,
> > > __consumer_offsets, where, as I understand it, offsets are tracked now.
> > >
> > > The ConsumerOffsetChecker tool seems to be able to get the offset values
> > > from this topic, since I see the correct values when running it.
> > > So, how do I access this info myself?
> > >
> > >
> > > I tried:
> > >
> > > ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic __consumer_offsets --from-beginning
> > >
> > > but it does not show anything.
> > > Also, how would I change the offset? I need to do this sometimes if I want
> > > to skip/ignore some messages and just advance the offset manually.
> > >
> > > thanks,
> > > Marina
> > >
> >
> --
> Thanks,
> Raja.

-- Guozhang
