kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: error recovery in multiple thread reading from Kafka with HighLevel api
Date Fri, 08 Aug 2014 18:23:30 GMT
With the SimpleConsumer you then need to take care of consumer failure
detection and partition reassignment yourself, but you have more
flexibility over the offsets.

If, whenever processing hits an error, the corresponding consumer thread
also fails (i.e. it will no longer be involved in the rebalance and hence
will not commit offsets), and you can live with duplicate data, then you
can just enable auto offset commits with, say, a 10-second period. We
usually use an even larger period, like minutes.
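
A minimal sketch of that setup against the 0.8 high-level consumer (the
ZooKeeper address, group id, topic name "T", and timeout below are
placeholder assumptions, not values from this thread):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class AutoCommitSetup {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // placeholder ZK address
            props.put("group.id", "my-group");                // placeholder group id
            props.put("auto.commit.enable", "true");          // consumer commits offsets itself
            props.put("auto.commit.interval.ms", "10000");    // every 10 seconds
            props.put("consumer.timeout.ms", "30000");        // hasNext() throws ConsumerTimeoutException after 30s idle

            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("T", 1));
            // ... hand each stream to a worker thread and iterate over its messages ...
        }
    }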

Guozhang


On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.solr@gmail.com>
wrote:

> Maybe I could batch the messages before committing, e.g. committing every
> 10 seconds. This is what the auto commit does anyway, and I could live with
> duplicate data.
> What do you think?
>
> I would then also seem to need a monitoring daemon that checks the lag and
> restarts the consumer after machine crashes.
>
>
> On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.solr@gmail.com>
> wrote:
>
> > Thanks, Guozhang.
> > So if I switch to SimpleConsumer, will these problems be taken care of
> > already? I would assume that I will need to manage all the offsets
> > myself, including the error recovery logic, right?
> > Chen
> >
> >
> > On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> >
> >> Hello Chen,
> >>
> >> 1. Manually committing offsets does have the risk of duplicates; consider
> >> the following pattern:
> >>
> >> message = consumer.next();
> >> process(message);
> >> consumer.commit();
> >>
> >> the rebalance can happen between lines 2 and 3, where the message has been
> >> processed but the offset not yet committed; if another consumer picks up
> >> this partition after the rebalance, it may re-consume this message again.
> >> With auto.commit turned on, offsets will always be committed before the
> >> consumers release ownership of partitions during rebalances.
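
A minimal sketch of that pattern against the 0.8 high-level consumer, assuming
a ConsumerConnector named `connector` created with auto.commit.enable=false and
a hypothetical process() step; the gap between process() and commitOffsets() is
exactly where a rebalance leads to re-consumption:

    KafkaStream<byte[], byte[]> stream =
        connector.createMessageStreams(Collections.singletonMap("T", 1)).get("T").get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> msg = it.next();
        process(msg.message());      // message handled here...
        connector.commitOffsets();   // ...but a rebalance before this line re-delivers it
    }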
> >>
> >> In the 0.9 consumer design, we have fixed this issue by introducing the
> >> onPartitionDeassigned callback; you can take a look at its current API
> >> here:
> >>
> >> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
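
The linked javadoc is a design draft; as a sketch of the same idea using the
rebalance-listener API of the new Java consumer as it was eventually released
(ConsumerRebalanceListener with onPartitionsRevoked, which may differ from the
draft above), with placeholder broker address, group id, and topic:

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    public class CommitOnRevoke {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
            props.put("group.id", "my-group");                 // placeholder group id
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("T"), new ConsumerRebalanceListener() {
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    consumer.commitSync();  // flush offsets before ownership moves elsewhere
                }
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
            });

            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // process(record) goes here; commit only after processing succeeds
                }
                consumer.commitSync();
            }
        }
    }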
> >>
> >> 2. Committing offsets too often does have an overhead, since it goes to
> >> ZooKeeper and ZK is not write-scalable. We are also fixing that issue by
> >> moving offset management from ZK to the Kafka servers. This is already
> >> checked in to trunk and will be included in the 0.8.2 release.
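
When that lands, switching over should just be a couple of extra consumer
config entries; as a sketch (assuming the 0.8.2 setting names offsets.storage
and dual.commit.enabled), added to the Properties used above:

    props.put("offsets.storage", "kafka");     // keep offsets in Kafka instead of ZooKeeper
    props.put("dual.commit.enabled", "true");  // during migration, commit to both stores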
> >>
> >> Guozhang
> >>
> >>
> >> On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.solr@gmail.com>
> >> wrote:
> >>
> >> > Guozhang,
> >> > Just to make it clear:
> >> > If I have 10 threads with the same consumer group id reading topic T,
> >> > auto commit is turned off, and commitOffset is called only when a
> >> > message is processed successfully:
> >> > If thread 1 dies while processing a message from partition P1, and the
> >> > last offset is Offset1, will Kafka then ensure that one of the other 9
> >> > running threads automatically picks up the messages on partition P1
> >> > from Offset1? Will that thread risk reading the same message more than
> >> > once?
> >> >
> >> > Also, I would assume committing the offset for each message is a bit
> >> > heavy. What do you guys usually do for error handling when reading
> >> > from Kafka?
> >> > Thanks much!
> >> > Chen
> >> >
> >> >
> >> >
> >> > On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangguoz@gmail.com>
> >> > wrote:
> >> >
> >> > > Yes, in that case you can turn off auto commit and call commitOffsets
> >> > > manually after processing is finished. commitOffsets() will only write
> >> > > the offsets of the partitions that the consumer is currently fetching,
> >> > > so there is no need to coordinate this operation.
> >> > >
> >> > >
> >> > > On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.solr@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > But with auto commit turned on, I am risking losing the failed
> >> > > > message, right? Should I turn off auto commit and only commit the
> >> > > > offset when the message is processed successfully? But that would
> >> > > > require coordination between threads in order to know the right
> >> > > > time to commit the offset.
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangguoz@gmail.com>
> >> > > > wrote:
> >> > > >
> >> > > > > Hello Chen,
> >> > > > >
> >> > > > > With the high-level consumer, partition re-assignment is
> >> > > > > automatic upon consumer failures.
> >> > > > >
> >> > > > > Guozhang
> >> > > > >
> >> > > > >
> >> > > > > On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.solr@gmail.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Folks,
> >> > > > > > I have a process that starts at a specific time and reads from
> >> > > > > > a specific topic. I am currently using the High Level API
> >> > > > > > (consumer group) to read from Kafka (and it will stop once there
> >> > > > > > is nothing left in the topic, by specifying a timeout). I am
> >> > > > > > mostly concerned about error recovery in a multi-threaded
> >> > > > > > context. If one thread dies, will the other running bolt threads
> >> > > > > > pick up the failed message? Or do I have to start another thread
> >> > > > > > in order to pick up the failed message? What would be a good
> >> > > > > > practice to ensure the message is processed at least once?
> >> > > > > >
> >> > > > > > Note that all threads are using the same group id.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Chen
> >> > > > > >
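
For the setup described in this original question, a rough sketch of the usual
multi-threaded layout on the 0.8 high-level consumer: one stream per worker
thread, all under one group id. It reuses the `connector`, topic "T", and
process() placeholders from the sketches above and assumes consumer.timeout.ms
is set so iteration ends once the topic is drained:

    // also needs java.util.concurrent.{ExecutorService, Executors} and
    // kafka.consumer.ConsumerTimeoutException
    int numThreads = 10;
    Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(Collections.singletonMap("T", numThreads));
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    for (final KafkaStream<byte[], byte[]> stream : streams.get("T")) {
        pool.submit(new Runnable() {
            public void run() {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                try {
                    while (it.hasNext()) {
                        process(it.next().message());  // hypothetical processing step
                    }
                } catch (ConsumerTimeoutException e) {
                    // nothing arrived within consumer.timeout.ms; let this worker exit
                }
            }
        });
    }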
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > --
> >> > > > > -- Guozhang
> >> > > > >
> >> > > >
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang
