kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: error recovery in multiple thread reading from Kafka with HighLevel api
Date Fri, 08 Aug 2014 20:09:56 GMT
Chen,

You can use the ConsumerOffsetChecker tool.

http://kafka.apache.org/documentation.html#basic_ops_consumer_lag

Guozhang


On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <chen.apache.solr@gmail.com>
wrote:

> sounds like a good idea! I think i will go with the high level consumer
> then.
> Another question along with this design is that is there a way to check the
> lag for a consumer group for a topic? Upon machine crashes and restarts, I
> want to only continue reading from a certain topic if the lag is NOT 0. I
> know I could depend on the time out("consumer.timeout.ms") to check
> whether
> there is still data in the topic, but wondering whether there is more
> elegant way.
> Thanks much for the help, Guozhang!
>
> Chen
>
>
> On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Using simple consumer you then need to take care of consumer failure
> > detection and partition reassignment yourself. But you would have more
> > flexibility of the offsets.
> >
> > If each time processing incur errors the corresponding consumer thread
> will
> > fail also (i.e. will not be involved in the rebalance and hence commit
> > offsets) and you could live with data duplicates, then you can just
> enable
> > auto offset commits with say, 10 secs period. We usually have even larger
> > period, like minutes.
> >
> > Guozhang
> >
> >
> > On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.solr@gmail.com>
> > wrote:
> >
> > > Maybe i could batch the messages before commit.., e.g committing every
> 10
> > > second.this is what the auto commit does anyway and  I could live with
> > > duplicate data.
> > > What do u think?
> > >
> > > I would then also seem to need a monitoring daemon to check the lag to
> > > restart the consumer during machine crashes..
> > >
> > >
> > > On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.solr@gmail.com
> >
> > > wrote:
> > >
> > > > Thanks,Guozhang,
> > > > So if I switch to SimpleConsumer, will these problems be taken care
> of
> > > > already? I would assume that I will need to manage all the offset by
> > > > myself, including the error recovery logic, right?
> > > > Chen
> > > >
> > > >
> > > > On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangguoz@gmail.com>
> > > wrote:
> > > >
> > > >> Hello Chen,
> > > >>
> > > >> 1. Manually commit offsets does have the risk of duplicates,
> consider
> > > the
> > > >> following pattern:
> > > >>
> > > >> message = consumer.next();
> > > >> process(message);
> > > >> consumer.commit();
> > > >>
> > > >> the rebalance can happen between line 2 and 3, where the message has
> > > been
> > > >> processed but offset not being committed, if another consumer picks
> up
> > > >> this
> > > >> partition after the rebalance, it may re-consume this message again.
> > > With
> > > >> auto.commit turned on, offsets will always be committed before the
> > > >> consumers release ownership of partitions during rebalances.
> > > >>
> > > >> In the 0.9 consumer design, we have fixed this issue by introducing
> > the
> > > >> onPartitionDeassigned callback, you can take a look at its current
> API
> > > >> here:
> > > >>
> > > >>
> > > >>
> > >
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > >>
> > > >> 2. Commit offsets too often does have an overhead since it is going
> to
> > > >> Zookeeper, and ZK is not write-scalable. We are also fixing that
> issue
> > > by
> > > >> moving the offset management from ZK to kafka servers. This is
> already
> > > >> checked in trunk, and will be included in 0.8.2 release.
> > > >>
> > > >> Guozhang
> > > >>
> > > >>
> > > >> On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <
> chen.apache.solr@gmail.com
> > >
> > > >> wrote:
> > > >>
> > > >> > Guozhang,
> > > >> > Just to make it clear:
> > > >> > If I have 10 threads with the same consumer group id, read the
> topic
> > > T.
> > > >> The
> > > >> > auto commit is turned off, and commitOffset is called only when
> the
> > > >> message
> > > >> > is processed successfully.
> > > >> > If thread 1 dies when processing message from partition P1, and
> the
> > > last
> > > >> > offset is Offset1.   Then kafka will ensure that one of the other
> > > >> running 9
> > > >> > threads will automatically pick up the message on partition P1
> from
> > > >> Offset1
> > > >> > ? will the thread have the risk of reading the same message more
> > than
> > > >> once?
> > > >> >
> > > >> > Also I would assume commit offset for each message is a bit heavy.
> > > What
> > > >> you
> > > >> > guys usually do for error handling during reading kafka?
> > > >> > Thanks much!
> > > >> > Chen
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangguoz@gmail.com
> >
> > > >> wrote:
> > > >> >
> > > >> > > Yes, in that case you can turn of auto commit and call
> > commitOffsets
> > > >> > > manually after processing is finished. commitOffsets() will
only
> > > write
> > > >> > the
> > > >> > > offset of the partitions that the consumer is currently
> fetching,
> > so
> > > >> > there
> > > >> > > is no need to coordinate this operation.
> > > >> > >
> > > >> > >
> > > >> > > On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <
> > > chen.apache.solr@gmail.com
> > > >> >
> > > >> > > wrote:
> > > >> > >
> > > >> > > > But with the auto commit turned on, I am risking off
losing
> the
> > > >> failed
> > > >> > > > message, right? should I turn off the auto commit,
and only
> > commit
> > > >> the
> > > >> > > > offset when the message is processed successfully..But
that
> > would
> > > >> > require
> > > >> > > > the coordination between threads in order to know what
is the
> > > right
> > > >> > > timing
> > > >> > > > to commit offset..
> > > >> > > >
> > > >> > > >
> > > >> > > >
> > > >> > > > On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <
> > wangguoz@gmail.com
> > > >
> > > >> > > wrote:
> > > >> > > >
> > > >> > > > > Hello Chen,
> > > >> > > > >
> > > >> > > > > With high-level consumer, the partition re-assignment
is
> > > automatic
> > > >> > upon
> > > >> > > > > consumer failures.
> > > >> > > > >
> > > >> > > > > Guozhang
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <
> > > >> > chen.apache.solr@gmail.com>
> > > >> > > > > wrote:
> > > >> > > > >
> > > >> > > > > > Folks,
> > > >> > > > > >  I have a process started at specific time
and read from a
> > > >> specific
> > > >> > > > > topic.
> > > >> > > > > > I am currently using the High Level API(consumer
group) to
> > > read
> > > >> > from
> > > >> > > > > > kafka(and will stop once there is nothing
in the topic by
> > > >> > specifying
> > > >> > > a
> > > >> > > > > > timeout). i am most concerned about error
recovery in
> > multiple
> > > >> > thread
> > > >> > > > > > context. If one thread dies, will other running
bolt
> threads
> > > >> picks
> > > >> > up
> > > >> > > > the
> > > >> > > > > > failed message? Or I have to start another
thread in order
> > to
> > > >> pick
> > > >> > up
> > > >> > > > the
> > > >> > > > > > failed message? What would be  a good practice
to ensure
> the
> > > >> > message
> > > >> > > > can
> > > >> > > > > be
> > > >> > > > > > processed at least once?
> > > >> > > > > >
> > > >> > > > > > Note that all threads are using the same
group id.
> > > >> > > > > >
> > > >> > > > > > Thanks,
> > > >> > > > > > Chen
> > > >> > > > > >
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > --
> > > >> > > > > -- Guozhang
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > --
> > > >> > > -- Guozhang
> > > >> > >
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> -- Guozhang
> > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message