kafka-users mailing list archives

From Christopher Piggott <cpigg...@gmail.com>
Subject Re: Consumer questions
Date Sat, 17 Jan 2015 18:45:28 GMT
Thanks.  That helped clear a lot up in my mind.

I'm trying the high-level consumer now.  Occasionally I need to do a replay
of the stream.  The example is:

   KafkaStream.iterator();

which starts from wherever zookeeper recorded you left off.

With the high level interface, can you request an iterator that starts at
the very beginning?
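
A sketch of the kind of thing I have in mind (untested; the topic name,
zookeeper address, and throwaway group.id are placeholders).  My assumption
is that auto.offset.reset=smallest only kicks in when the group has no
offset saved in zookeeper, so a fresh group.id forces a replay from the
earliest available offset:

    import java.util.*;
    import kafka.consumer.*;
    import kafka.javaapi.consumer.ConsumerConnector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");              // placeholder
    props.put("group.id", "replay-" + System.currentTimeMillis()); // no saved offset
    props.put("auto.offset.reset", "smallest"); // fall back to the earliest offset

    ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
    ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();

    while (it.hasNext()) {
        byte[] message = it.next().message();   // replay starts at the beginning
    }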



On Fri, Jan 16, 2015 at 8:55 PM, Manikumar Reddy <kumar@nmsworks.co.in>
wrote:

> Hi,
>
> 1. In SimpleConsumer, you must keep track of the offsets in your
>    application.  In the example code, the "readOffset" variable can be
>    saved in redis/zookeeper.  You should plug this logic into your code
>    (see the sketch below).  The High Level Consumer stores the last read
>    offset in ZooKeeper.
>
> 2. You will get OffsetOutOfRange for any invalid offset.  On error, you
>    can decide what to do, i.e. read from the latest, earliest, or some
>    other offset.
>
> 3. https://issues.apache.org/jira/browse/KAFKA-1779
>
> 4. Yes
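>
> A rough sketch of 1 and 2 together, based on the wiki example (untested;
> loadOffset/saveOffset/process stand in for your own redis or zookeeper
> code, and getLastOffset is the helper from the wiki example):
>
>     long readOffset = loadOffset(topic, partition);      // your storage
>     FetchRequest req = new FetchRequestBuilder()
>             .clientId(clientName)
>             .addFetch(topic, partition, readOffset, FETCH_SIZE)
>             .minBytes(1)     // with minBytes 0 the broker replies at once,
>             .maxWait(1000)   // so maxWait alone will not block (see 3.)
>             .build();
>     FetchResponse resp = consumer.fetch(req);
>
>     if (resp.hasError()) {
>         short code = resp.errorCode(topic, partition);
>         if (code == ErrorMapping.OffsetOutOfRangeCode()) {
>             // Stored offset no longer exists.  EarliestTime() replays;
>             // LatestTime() would jump to the end and lose data.
>             readOffset = getLastOffset(consumer, topic, partition,
>                     kafka.api.OffsetRequest.EarliestTime(), clientName);
>         }
>     } else {
>         for (MessageAndOffset mao : resp.messageSet(topic, partition)) {
>             process(mao.message());                   // your processing
>             readOffset = mao.nextOffset();
>             saveOffset(topic, partition, readOffset); // persist as you go
>         }
>     }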
>
>
> Manikumar
>
> On Sat, Jan 17, 2015 at 2:25 AM, Christopher Piggott <cpiggott@gmail.com>
> wrote:
>
> > Hi,
> >
> > I am following this link:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> >
> > for a consumer design using kafka_2.9.2 version 0.8.1.1 (what I can find
> > in maven central).  I have a couple of questions about the consumer.  I
> > checked the archives and didn't see these exact questions asked already,
> > but I may have missed them -- I apologize if that is the case.
> >
> >
> > When I create a consumer I give it a consumer ID.  I assumed that it
> > would store my consumer's name as well as the last readOffset in
> > zookeeper, but looking in zookeeper that doesn't seem to be the case.
> > So it seems to me that when my consumers come up they need to either get
> > the entire history from the start of time (which could take a long time,
> > as I have 14-day durability); or else they need to somehow keep track of
> > the read offset themselves.
> >
> > I have redis in my system already, so I have the choice of keeping track
> > of this in either redis or zookeeper.  It seems like zookeeper would be
> > a better idea.  Am I right, though, that the SimpleConsumer and the
> > example I linked above don't keep track of this, so if I want to do that
> > I would have to do it myself?
> >
> > Second question: in the example consumer, there is an error handler that
> > checks if you received an OffsetOutOfRange response from kafka.  If so,
> > it gets a new read offset from .LatestTime().  My interpretation of this
> > is that you have asked it for an offset which doesn't make sense, so it
> > just scans you to the end of the stream.  That's guaranteed data loss.
> > A simple alternative would be to take the beginning of the stream, which
> > if you have idempotent processing would be fine - it would be a replay -
> > but it could take a long time.
> >
> > I don't know for sure what would cause you to get an OffsetOutOfRange -
> > the only thing I can really think of is that someone has changed the
> > underlying stream on you (like they deleted and recreated it and didn't
> > tell all the consumers).  I guess it's possible that if I have a 1 day
> > stream durability and I stop my consumer for 3 days, it could ask for a
> > readOffset that no longer exists; it's not clear to me whether that
> > would result in an OffsetOutOfRange error or not.
> >
> > Does that all make sense?
> >
> > Third question: I set .maxWait(1000), interpreting that to mean that
> > when I make my fetch request the consumer will time out if there are no
> > new messages in 1 second.  It doesn't seem to work - my call to
> > consumer.fetch() seems to return immediately.  Is that expected?
> >
> > Final question: just to confirm:
> >
> >     new FetchRequestBuilder().addFetch( topic, shardNum, readOffset,
> >                                         FETCH_SIZE )
> >
> > FETCH_SIZE is in bytes, not number of messages, so presumably it fetches
> > as many messages as will fit into a buffer of that many bytes?  Is that
> > right?
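> >
> > (Sketch of what I mean, continuing the wiki example - not tested:)
> >
> >     ByteBufferMessageSet msgs = fetchResponse.messageSet(topic, shardNum);
> >     // msgs carries as many whole messages as fit in FETCH_SIZE bytes;
> >     // if the next single message is larger than FETCH_SIZE, the set can
> >     // come back empty even though data exists, so the size needs to be
> >     // at least the largest expected message
> >     for (MessageAndOffset mao : msgs) {
> >         ByteBuffer payload = mao.message().payload();
> >     }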
> >
> > Thanks.
> >
> >
> > Christopher Piggott
> > Sr. Staff Engineer
> > Golisano Institute for Sustainability
> > Rochester Institute of Technology
> >
>
