kafka-users mailing list archives

From Chia-Chun Shih <chiachun.s...@gmail.com>
Subject Re: consumer ack for high-level consumer?
Date Mon, 10 Nov 2014 07:27:38 GMT
Got it! Thanks for your response.



2014-11-07 13:14 GMT+08:00 Guozhang Wang <wangguoz@gmail.com>:

> 0. Yes, if the consumer crashes before committing its offset, it can cause
> duplicates.
>
> 1. Yes: from the consumer client's point of view, once a message is
> returned from the iterator it is considered "consumed". If you want the
> consumer to only consider a message consumed once the application on top
> has processed it, you need to turn off auto offset commit and call commit
> manually.
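The pattern Guozhang describes can be sketched against the 0.8-era high-level consumer API. This is a minimal illustration, not code from the thread: the ZooKeeper address, group id, topic name, and `process()` are placeholder assumptions, and `auto.commit.enable` is the config key for disabling automatic commits in that client.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // assumed ZK address
        props.put("group.id", "my-group");                // hypothetical group id
        props.put("auto.commit.enable", "false");         // disable automatic offset commits

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream for a single hypothetical topic.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
        ConsumerIterator<byte[], byte[]> it =
            streams.get("my-topic").get(0).iterator();

        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            process(msg.message());     // application-level processing first
            connector.commitOffsets();  // commit only after processing succeeds
        }
    }

    private static void process(byte[] payload) {
        // placeholder for real application work
    }
}
```

If the process dies between `process()` and `commitOffsets()`, the message is redelivered after restart, which is the at-least-once behavior the thread is after.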
>
> On Thu, Nov 6, 2014 at 6:25 PM, Chia-Chun Shih <chiachun.shih@gmail.com>
> wrote:
>
> > Hi,
> >
> > Thanks for your response. So offsets in ZK may be out of date, and
> > duplicate messages may be delivered when clients restart.
> >
> > I also wonder about the possibility of losing messages. Is it possible
> > that things occur in this order?
> >
> >    1. Client calls ConsumerIterator$next() to get a message, updating
> >    local offsets
> >    2. ZookeeperConsumerConnector$commitOffset() is called, syncing local
> >    offsets to ZK
> >    3. Client fails while processing this message
> >    4. Client restarts, but this message is marked as consumed in ZK
> >
> > Thanks,
> > Chia-Chun
> >
> > 2014-11-07 1:45 GMT+08:00 Guozhang Wang <wangguoz@gmail.com>:
> >
> > > That is correct.
> > >
> > > Guozhang
> > >
> > > On Wed, Nov 5, 2014 at 9:18 PM, Chia-Chun Shih <chiachun.shih@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Thanks for your response. I just read source code and found that:
> > > >
> > > >   1) ConsumerIterator$next() uses PartitionTopicInfo$resetConsumeOffset
> > > > to update offsets in PartitionTopicInfo objects.
> > > >   2) ZookeeperConsumerConnector$commitOffset() gets the latest offsets
> > > > from PartitionTopicInfo objects and writes them to ZK.
> > > >
> > > > So, when clients iterate through messages, offsets are updated locally
> > > > in PartitionTopicInfo objects; when ZookeeperConsumerConnector$commitOffset
> > > > is called, the local offsets are synced to ZK. Is that correct?
> > > >
> > > > regards,
> > > > Chia-Chun
> > > >
> > > > 2014-11-06 0:24 GMT+08:00 Guozhang Wang <wangguoz@gmail.com>:
> > > >
> > > > > Hello,
> > > > >
> > > > > You can turn off auto.commit.offset and call connector.commitOffset()
> > > > > manually after you have processed the data. One thing to remember is
> > > > > that the commit frequency translates into ZK (in the future, Kafka)
> > > > > writes, and hence you may not want to commit after every single
> > > > > message but only after a batch of messages.
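The batching advice above amounts to a small piece of commit-decision logic that is independent of the Kafka client. A sketch, with the class name and batch size being arbitrary illustrations rather than anything from the thread:

```java
// Tracks how many messages have been processed since the last offset commit,
// and signals when a batch boundary is reached, so the application commits
// once per batch instead of once per message (fewer ZK writes).
public class CommitBatcher {
    private final int batchSize;
    private int sinceLastCommit = 0;

    public CommitBatcher(int batchSize) {
        this.batchSize = batchSize;
    }

    // Call once per processed message; returns true when the caller should
    // invoke connector.commitOffsets().
    public boolean messageProcessed() {
        sinceLastCommit++;
        if (sinceLastCommit >= batchSize) {
            sinceLastCommit = 0;
            return true;
        }
        return false;
    }
}
```

In the consumer loop this becomes `if (batcher.messageProcessed()) connector.commitOffsets();`. The trade-off: a crash can now redeliver up to `batchSize` already-processed messages instead of at most one.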
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Nov 4, 2014 at 10:42 PM, Chia-Chun Shih <chiachun.shih@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I am new to Kafka. In my understanding, the high-level consumer
> > > > > > (ZookeeperConsumerConnector) advances the offset when a message is
> > > > > > drawn by ConsumerIterator. But I would like to advance the offset
> > > > > > when a message has been processed, not when it is drawn from the
> > > > > > broker, so that if a consumer dies before completely processing a
> > > > > > message, the message will be processed again. Is that possible?
> > > > > >
> > > > > > Thanks.
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
