kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Getting fixed amount of messages using Zookeeper based consumer
Date Sat, 15 Sep 2012 05:39:41 GMT
Actually, every time you consume a message, the offset moves to the
beginning of the next message.
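
So if you disable autocommit and commit only after processing, you get "at
least once" delivery. A rough sketch (0.7-era high-level consumer; "process"
and "connector" here are just placeholders):

// after iterating a message, the in-memory offset already points at the
// beginning of the NEXT message, so committing after processing means a
// clean restart resumes there instead of re-delivering this one
for (MessageAndMetadata msgAndMetadata : stream) {
  process(msgAndMetadata.message()); // e.g. persist to a datastore
  connector.commitOffsets();         // commit positions of all streams
}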

Thanks,

Jun

On Fri, Sep 14, 2012 at 6:52 PM, Felix GV <felix@mate1inc.com> wrote:

> Hello,
>
> Sorry for doing thread necromancy on this one, but I have a little
> question: can you please confirm whether my understanding, below, is
> correct?
>
>    1. Every time I extract a message from a KafkaMessageStream, it sets my
>    consumer offset to the offset of the beginning of the message I just
>    extracted.
>    2. If I shut down my consumer (and call commitOffsets), then the offset
>    kept in memory by the KafkaMessageStream will be committed to ZK.
>    3. This means that if my program persists the message I just extracted
>    into a datastore, and then shuts down the consumer before I extract the
>    next message, then, the next time I start my consumer, I will begin from
>    the offset of the beginning of that same message, extract it again, and
>    I will thus end up persisting a duplicate of that message in my
>    datastore.
>
> This is what my testing seems to demonstrate...
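>
> Concretely, my test does roughly this (a sketch; "datastore" and
> "consumer" are placeholders for my persistence call and my
> ConsumerConnector):
>
> // extract ONE message, persist it, then shut down before extracting
> // the next one
> for (MessageAndMetadata msgAndMetadata : stream) {
>   datastore.persist(msgAndMetadata.message());
>   break; // stop before touching the next message
> }
> consumer.commitOffsets();
> consumer.shutdown();
> // on the next run, I see that same message again, hence the duplicate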
>
> I have found a way to account for this behavior in my current use case, but
> I wanted to know if the behavior I describe above is normal and intended,
> or if perhaps I'm doing something weird that could be causing unexpected
> behavior.
>
> The behavior I describe makes sense for "at least once" guarantees of
> delivery. I guess the alternative implementation would have been to set the
> consumer offset in ZK to that of the next message, whether the current
> iteration succeeds or not, which would have given "at most once" guarantees
> (as far as that part of the system is concerned anyway).
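>
> I imagine that alternative would look something like this (rough sketch,
> same placeholders as above):
>
> for (MessageAndMetadata msgAndMetadata : stream) {
>   consumer.commitOffsets();                    // commit BEFORE processing
>   datastore.persist(msgAndMetadata.message()); // a crash here loses the message
> }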
>
> So, yeah, I'd like it if someone could confirm or deny my above
> interpretation.
>
> And also, I have another related question: say a consumer were to die
> without being shut down gracefully (and without calling commitOffsets).
> Would the offset stored in ZK then be the one that was put there the last
> time autocommit.interval.ms elapsed, thus potentially causing even more
> duplicate messages the next time the consumer is started? (This assumes
> the default settings, where autocommit.enable is true.)
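>
> (In config terms, I mean something like this sketch, with illustrative
> values only:
>
> Properties props = new Properties();
> props.put("zk.connect", "localhost:2181");    // placeholder ZK address
> props.put("groupid", "my-group");             // placeholder group id
> props.put("autocommit.enable", "true");       // the default
> props.put("autocommit.interval.ms", "10000"); // illustrative interval
> ConsumerConfig config = new ConsumerConfig(props);
>
> ...and then the consumer dying between two autocommit ticks.)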
>
> Thanks in advance :) ...!
>
> --
> Felix
>
>
>
> On Thu, Jul 12, 2012 at 10:17 AM, Jun Rao <junrao@gmail.com> wrote:
>
> > Yes, it knows. The consumer offset is advanced only when a message is
> > iterated over.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Jul 11, 2012 at 10:03 PM, Vaibhav Puranik <vpuranik@gmail.com>
> > wrote:
> >
> > > The inner loop keeps running. If I break it in the middle, is the Kafka
> > > broker going to know that the rest of the messages in the stream were
> > > not delivered?
> > >
> > > Regards,
> > > Vaibhav
> > > GumGum
> > > On Jul 11, 2012 5:05 PM, "Vaibhav Puranik" <vpuranik@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Is there any way to get a fixed number of messages using a
> > > > Zookeeper-based consumer (ConsumerConnector)?
> > > >
> > > > I know that with SimpleConsumer you can pass fetchSize as an argument
> > > > and limit the number of messages coming back.
> > > >
> > > > This sample code creates 4 threads that keep consuming forever.
> > > >
> > > >
> > > > // "streams" comes from ConsumerConnector.createMessageStreams(...),
> > > > // and "executor" is a java.util.concurrent.ExecutorService
> > > > // consume the messages in the threads
> > > > for (final KafkaStream<Message> stream : streams) {
> > > >   executor.submit(new Runnable() {
> > > >     public void run() {
> > > >       // the stream's iterator blocks waiting for new messages, so
> > > >       // this loop runs until the connector is shut down
> > > >       for (MessageAndMetadata msgAndMetadata : stream) {
> > > >         // process message (msgAndMetadata.message())
> > > >       }
> > > >     }
> > > >   });
> > > > }
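> > > >
> > > > What I'd like is roughly this instead (a sketch; "maxMessages" is a
> > > > limit I would supply myself):
> > > >
> > > > int consumed = 0;
> > > > for (MessageAndMetadata msgAndMetadata : stream) {
> > > >   // process message (msgAndMetadata.message())
> > > >   if (++consumed >= maxMessages) {
> > > >     break; // stop after a fixed number of messages
> > > >   }
> > > > }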
> > > >
> > > > Regards,
> > > > Vaibhav
