kafka-users mailing list archives

From Chen Wang <chen.apache.s...@gmail.com>
Subject Re: error recovery in multiple thread reading from Kafka with HighLevel api
Date Fri, 08 Aug 2014 19:18:36 GMT
Sounds like a good idea! I think I will go with the high level consumer
then.
Another question along with this design: is there a way to check the lag
for a consumer group on a topic? Upon machine crashes and restarts, I want
to continue reading from a topic only if the lag is NOT 0. I know I could
depend on the timeout ("consumer.timeout.ms") to check whether there is
still data in the topic, but I am wondering whether there is a more
elegant way.
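[For reference: in 0.8 the bundled kafka.tools.ConsumerOffsetChecker (run via
kafka-run-class.sh with --zookeeper and --group) prints per-partition lag for
a group, and the computation itself is just log-end offset minus committed
offset. A minimal Python sketch of that arithmetic, with hypothetical offset
values standing in for what a real check would fetch from the brokers and
from ZooKeeper:]

```python
# Lag per partition = latest (log-end) offset - last committed offset.
# The offset values below are hypothetical stand-ins for what you would
# fetch from the broker (log-end) and from ZooKeeper (committed) in 0.8.

def consumer_lag(log_end_offsets, committed_offsets):
    """Return {partition: lag} for one consumer group on one topic."""
    return {
        p: log_end_offsets[p] - committed_offsets.get(p, 0)
        for p in log_end_offsets
    }

log_end = {0: 1500, 1: 980, 2: 2100}    # latest offset per partition
committed = {0: 1500, 1: 950, 2: 2100}  # group's committed offsets

lags = consumer_lag(log_end, committed)
total_lag = sum(lags.values())
# Resume consuming only if total_lag > 0.
```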
Thanks much for the help, Guozhang!

Chen


On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Using simple consumer you then need to take care of consumer failure
> detection and partition reassignment yourself. But you would have more
> flexibility of the offsets.
>
> If each time processing incurs an error the corresponding consumer thread
> also fails (i.e. it will not be involved in the rebalance and hence will
> not commit offsets), and you can live with data duplicates, then you can
> just enable auto offset commits with, say, a 10 second period. We usually
> use an even larger period, like minutes.
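[The periodic commit suggested above can be sketched without a broker.
PeriodicCommitter and commit_fn are illustrative names, not Kafka API; the
clock is injectable so the interval logic can be exercised deterministically:]

```python
import time

COMMIT_INTERVAL_SECS = 10  # commit period suggested above; larger is cheaper

class PeriodicCommitter:
    """Commit offsets at most once per interval. On a crash, duplicates are
    bounded by roughly one interval's worth of messages (at-least-once)."""

    def __init__(self, commit_fn, interval=COMMIT_INTERVAL_SECS, clock=time.time):
        self.commit_fn = commit_fn   # stand-in for commitOffsets() in 0.8
        self.interval = interval
        self.clock = clock           # injectable for testing
        self.last_commit = clock()

    def maybe_commit(self):
        """Call after each processed message; commits only when due."""
        now = self.clock()
        if now - self.last_commit >= self.interval:
            self.commit_fn()
            self.last_commit = now
            return True
        return False
```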
>
> Guozhang
>
>
> On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.solr@gmail.com>
> wrote:
>
> > Maybe I could batch the messages before committing, e.g. committing
> > every 10 seconds. This is what the auto commit does anyway, and I could
> > live with duplicate data.
> > What do you think?
> >
> > I would then also seem to need a monitoring daemon to check the lag and
> > restart the consumer during machine crashes..
> >
> >
> > On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.solr@gmail.com>
> > wrote:
> >
> > > Thanks,Guozhang,
> > > So if I switch to SimpleConsumer, will these problems be taken care of
> > > already? I would assume that I will need to manage all the offset by
> > > myself, including the error recovery logic, right?
> > > Chen
> > >
> > >
> > > On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangguoz@gmail.com>
> > > wrote:
> > >
> > >> Hello Chen,
> > >>
> > >> 1. Manually committing offsets does have the risk of duplicates;
> > >> consider the following pattern:
> > >>
> > >> message = consumer.next();
> > >> process(message);
> > >> consumer.commit();
> > >>
> > >> the rebalance can happen between line 2 and 3, where the message has
> > >> been processed but the offset has not been committed; if another
> > >> consumer picks up this partition after the rebalance, it may
> > >> re-consume this message again. With auto.commit turned on, offsets
> > >> will always be committed before the consumers release ownership of
> > >> partitions during rebalances.
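[The duplicate scenario described in the quoted pattern can be reproduced
without a broker. In the sketch below (all names illustrative; plain Python
stands in for the consumer API), the first consumer crashes between
process() and commit(), so the consumer that takes over the partition
resumes from the committed offset and re-processes a message:]

```python
# Simulate the consume -> process -> commit pattern quoted above. If a
# consumer crashes between process() and commit(), the next owner of the
# partition resumes from the committed offset and sees the same message.

messages = ["m0", "m1", "m2"]
committed_offset = 0   # shared via ZooKeeper in the real 0.8 system
processed = []         # what actually got processed (with duplicates)

def run_consumer(crash_before_commit_at=None):
    """Consume from the committed offset; optionally die mid-pattern."""
    global committed_offset
    offset = committed_offset
    while offset < len(messages):
        processed.append(messages[offset])   # process(message)
        if offset == crash_before_commit_at:
            return                           # died before consumer.commit()
        committed_offset = offset + 1        # consumer.commit()
        offset += 1

run_consumer(crash_before_commit_at=1)  # first consumer dies after m1
run_consumer()                          # rebalanced consumer takes over
# processed == ["m0", "m1", "m1", "m2"]: m1 was delivered twice.
```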
> > >>
> > >> In the 0.9 consumer design, we have fixed this issue by introducing
> > >> the onPartitionDeassigned callback; you can take a look at its current
> > >> API here:
> > >>
> > >> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > >>
> > >> 2. Committing offsets too often does have an overhead, since commits
> > >> go to ZooKeeper and ZK is not write-scalable. We are also fixing that
> > >> issue by moving offset management from ZK to the Kafka servers. This
> > >> is already checked in to trunk, and will be included in the 0.8.2
> > >> release.
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.solr@gmail.com>
> > >> wrote:
> > >>
> > >> > Guozhang,
> > >> > Just to make it clear:
> > >> > If I have 10 threads with the same consumer group id reading from
> > >> > topic T, auto commit is turned off, and commitOffset is called only
> > >> > when a message is processed successfully.
> > >> > If thread 1 dies when processing a message from partition P1, and
> > >> > the last committed offset is Offset1, will Kafka then ensure that
> > >> > one of the other 9 running threads automatically picks up the
> > >> > messages on partition P1 from Offset1? Will that thread risk reading
> > >> > the same message more than once?
> > >> >
> > >> > Also, I would assume committing the offset for each message is a bit
> > >> > heavy. What do you usually do for error handling when reading from
> > >> > Kafka?
> > >> > Thanks much!
> > >> > Chen
> > >> >
> > >> >
> > >> >
> > >> > On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangguoz@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > Yes, in that case you can turn off auto commit and call
> > >> > > commitOffsets manually after processing is finished.
> > >> > > commitOffsets() will only write the offsets of the partitions that
> > >> > > the consumer is currently fetching, so there is no need to
> > >> > > coordinate this operation.
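[The commit-after-process approach quoted above amounts to at-least-once
delivery. A broker-free sketch with stand-in process/commit functions (the
real 0.8 call would be commitOffsets() on the consumer connector): on a
processing error the offset is left uncommitted, so the failed message is
seen again when consumption resumes:]

```python
def consume_loop(messages, process, commit, start_offset=0):
    """Commit only after successful processing: at-least-once delivery.
    On a processing error the offset is not advanced, so the failed
    message is seen again on restart or rebalance."""
    offset = start_offset
    for msg in messages[start_offset:]:
        try:
            process(msg)
        except Exception:
            break            # leave offset uncommitted; this thread stops
        offset += 1
        commit(offset)       # stand-in for commitOffsets() in 0.8
    return offset

# flaky() fails the first time it sees "bad", succeeds on the retry.
seen = []
def flaky(msg):
    seen.append(msg)
    if msg == "bad" and seen.count("bad") == 1:
        raise RuntimeError("processing error")

msgs = ["a", "bad", "c"]
off = consume_loop(msgs, flaky, lambda o: None)        # stops at offset 1
off = consume_loop(msgs, flaky, lambda o: None, off)   # retries "bad", then "c"
```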
> > >> > >
> > >> > >
> > >> > > On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.solr@gmail.com>
> > >> > > wrote:
> > >> > >
> > >> > > > But with the auto commit turned on, I am risking losing the
> > >> > > > failed message, right? Should I turn off the auto commit, and
> > >> > > > only commit the offset when the message is processed
> > >> > > > successfully? But that would require coordination between
> > >> > > > threads in order to know the right timing to commit the offset..
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > > On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangguoz@gmail.com>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > Hello Chen,
> > >> > > > >
> > >> > > > > With the high-level consumer, partition re-assignment is
> > >> > > > > automatic upon consumer failures.
> > >> > > > >
> > >> > > > > Guozhang
> > >> > > > >
> > >> > > > >
> > >> > > > > On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.solr@gmail.com>
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > Folks,
> > >> > > > > >  I have a process started at a specific time that reads from
> > >> > > > > > a specific topic. I am currently using the High Level API
> > >> > > > > > (consumer group) to read from Kafka (and will stop once there
> > >> > > > > > is nothing left in the topic, by specifying a timeout). I am
> > >> > > > > > most concerned about error recovery in a multiple-thread
> > >> > > > > > context. If one thread dies, will the other running bolt
> > >> > > > > > threads pick up the failed message? Or do I have to start
> > >> > > > > > another thread in order to pick up the failed message? What
> > >> > > > > > would be a good practice to ensure the message is processed
> > >> > > > > > at least once?
> > >> > > > > >
> > >> > > > > > Note that all threads are using the same group id.
> > >> > > > > >
> > >> > > > > > Thanks,
> > >> > > > > > Chen
> > >> > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > --
> > >> > > > > -- Guozhang
> > >> > > > >
> > >> > > >
> > >> > >
> > >> > >
> > >> > >
> > >> > > --
> > >> > > -- Guozhang
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
