kafka-users mailing list archives

From Chen Wang <chen.apache.s...@gmail.com>
Subject Re: error recovery in multiple thread reading from Kafka with HighLevel api
Date Fri, 08 Aug 2014 21:03:56 GMT
Ah, my bad. I didn't notice I had put two auto.commit.interval.ms entries in
the config. After fixing that, it now behaves as expected. :-)
Thanks again!!
Chen

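A minimal sketch of a configuration with a single commit-interval entry, assuming the other settings from the quoted message below stay unchanged. The thread does not say which of the two entries was removed or what value was kept; 10000 here simply follows Guozhang's suggestion, and the ZooKeeper address and group id are placeholder values.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Builds high-level consumer properties with exactly one commit-interval entry.
    static Properties build(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        // Assumption: 10 seconds, as suggested in the thread; no second interval key.
        props.put("auto.commit.interval.ms", "10000");
        props.put("consumer.timeout.ms", "6000");
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder connection values, not taken from the thread.
        Properties props = build("localhost:2181", "example_group");
        System.out.println(props.getProperty("auto.commit.interval.ms")); // prints 10000
    }
}
```
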

On Fri, Aug 8, 2014 at 1:58 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Chen,
>
> Your auto.commit.interval.ms is set to 1 sec, which may be too small. Could
> you try with larger numbers, like 10000?
>
> Guozhang
>
>
> On Fri, Aug 8, 2014 at 1:41 PM, Chen Wang <chen.apache.solr@gmail.com>
> wrote:
>
> > Guozhang,
> > I just did a simple test, and Kafka does not seem to do what it is supposed
> > to do:
> > I put 20 messages numbered from 1 to 20 into a topic with 3 partitions, and
> > throw a RuntimeException on all the even-numbered messages (2, 4, 6, ...).
> >
> >   while (it.hasNext()) {
> >       String message = new String(it.next().message());
> >       System.out.println("message received" + message);
> >       int messageInt = Integer.parseInt(message);
> >       if (messageInt % 2 == 0) {
> >           // crash on every even-numbered message
> >           throw new RuntimeException("message " + message + " failed");
> >       }
> >   }
> >
> > My config is like this;
> >
> >     props.put("zookeeper.connect", a_zookeeper);
> >     props.put("group.id", a_groupId);
> >     props.put("zookeeper.session.timeout.ms", "4000");
> >     props.put("zookeeper.sync.time.ms", "200");
> >     props.put("auto.commit.interval.ms", "1000");
> >     props.put("consumer.timeout.ms", "6000");
> >     props.put("autocommit.interval.ms", "360000");
> >     props.put("auto.offset.reset", "smallest");
> >
> >
> > I started 10 threads, but it seems that whenever a thread gets an
> > even-numbered message it crashes, and when I restart the threads, reading
> > resumes from the next message. So in the first batch:
> >
> > message received1
> >
> > message received2
> >
> > Then I start again:
> >
> > message received3
> >
> > message received4
> >
> >
> > As you can see, message 2 is not replayed. Is this expected? I
> > ran bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
> > chen_test_6 --topic test_20 -zkconnect localhost:2182, and it's consistent
> > with the test result (the failed even-numbered messages are not
> > re-retrieved).
> >
> > What am I missing here?
> >
> > Chen
> >
> >
> >
> >
> >
> >
> > On Fri, Aug 8, 2014 at 1:09 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > > Chen,
> > >
> > > You can use the ConsumerOffsetChecker tool.
> > >
> > > http://kafka.apache.org/documentation.html#basic_ops_consumer_lag
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <chen.apache.solr@gmail.com>
> > > wrote:
> > >
> > > > Sounds like a good idea! I think I will go with the high-level consumer
> > > > then.
> > > > Another question along with this design: is there a way to check the
> > > > lag for a consumer group on a topic? Upon machine crashes and restarts,
> > > > I want to continue reading from a certain topic only if the lag is NOT
> > > > 0. I know I could depend on the timeout ("consumer.timeout.ms") to check
> > > > whether there is still data in the topic, but I am wondering whether
> > > > there is a more elegant way.
> > > > Thanks much for the help, Guozhang!
> > > >
> > > > Chen
> > > >
> > > >
> > > > On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wangguoz@gmail.com>
> > > > wrote:
> > > >
> > > > > Using the simple consumer, you then need to take care of consumer
> > > > > failure detection and partition reassignment yourself. But you would
> > > > > have more flexibility with the offsets.
> > > > >
> > > > > If, each time processing incurs errors, the corresponding consumer
> > > > > thread will also fail (i.e. will not be involved in the rebalance and
> > > > > hence will not commit offsets), and you could live with data
> > > > > duplicates, then you can just enable auto offset commits with, say, a
> > > > > 10 sec period. We usually have an even larger period, like minutes.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.solr@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Maybe I could batch the messages before committing, e.g. committing
> > > > > > every 10 seconds. This is what the auto commit does anyway, and I
> > > > > > could live with duplicate data.
> > > > > > What do you think?
> > > > > >
> > > > > > It seems I would then also need a monitoring daemon to check the
> > > > > > lag and restart the consumer after machine crashes.
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.solr@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks, Guozhang,
> > > > > > > So if I switch to SimpleConsumer, will these problems be taken
> > > > > > > care of already? I would assume that I will need to manage all
> > > > > > > the offsets by myself, including the error recovery logic, right?
> > > > > > > Chen
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangguoz@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hello Chen,
> > > > > > >>
> > > > > > >> 1. Manually committing offsets does have the risk of duplicates.
> > > > > > >> Consider the following pattern:
> > > > > > >>
> > > > > > >> message = consumer.next();
> > > > > > >> process(message);
> > > > > > >> consumer.commit();
> > > > > > >>
> > > > > > >> A rebalance can happen between lines 2 and 3, where the message has
> > > > > > >> been processed but the offset has not been committed. If another
> > > > > > >> consumer picks up this partition after the rebalance, it may
> > > > > > >> re-consume this message. With auto.commit turned on, offsets will
> > > > > > >> always be committed before the consumers release ownership of
> > > > > > >> partitions during rebalances.
> > > > > > >>
> > > > > > >> In the 0.9 consumer design, we have fixed this issue by introducing
> > > > > > >> the onPartitionDeassigned callback. You can take a look at its
> > > > > > >> current API here:
> > > > > > >>
> > > > > > >> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > > >>
> > > > > > >> 2. Committing offsets too often does have an overhead, since each
> > > > > > >> commit goes to ZooKeeper, and ZK is not write-scalable. We are also
> > > > > > >> fixing that issue by moving the offset management from ZK to the
> > > > > > >> Kafka servers. This is already checked in to trunk, and will be
> > > > > > >> included in the 0.8.2 release.
> > > > > > >>
> > > > > > >> Guozhang
> > > > > > >>
> > > > > > >>
> > > > > > >> On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.solr@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > Guozhang,
> > > > > > >> > Just to make it clear:
> > > > > > >> > Say I have 10 threads with the same consumer group id reading
> > > > > > >> > the topic T. Auto commit is turned off, and commitOffset is
> > > > > > >> > called only when the message is processed successfully.
> > > > > > >> > If thread 1 dies while processing a message from partition P1,
> > > > > > >> > and the last offset is Offset1, will Kafka ensure that one of
> > > > > > >> > the other 9 running threads automatically picks up the message
> > > > > > >> > on partition P1 from Offset1? Will the thread have the risk of
> > > > > > >> > reading the same message more than once?
> > > > > > >> >
> > > > > > >> > Also, I would assume committing the offset for each message is
> > > > > > >> > a bit heavy. What do you guys usually do for error handling
> > > > > > >> > when reading from Kafka?
> > > > > > >> > Thanks much!
> > > > > > >> > Chen
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangguoz@gmail.com>
> > > > > > >> > wrote:
> > > > > > >> >
> > > > > > >> > > Yes, in that case you can turn off auto commit and call
> > > > > > >> > > commitOffsets manually after processing is finished.
> > > > > > >> > > commitOffsets() will only write the offsets of the
> > > > > > >> > > partitions that the consumer is currently fetching, so
> > > > > > >> > > there is no need to coordinate this operation.
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.solr@gmail.com>
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > But with auto commit turned on, I am at risk of losing
> > > > > > >> > > > the failed message, right? Should I turn off auto
> > > > > > >> > > > commit, and only commit the offset when the message is
> > > > > > >> > > > processed successfully? But that would require
> > > > > > >> > > > coordination between threads in order to know the right
> > > > > > >> > > > time to commit the offset.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangguoz@gmail.com>
> > > > > > >> > > > wrote:
> > > > > > >> > > >
> > > > > > >> > > > > Hello Chen,
> > > > > > >> > > > >
> > > > > > >> > > > > With the high-level consumer, partition re-assignment
> > > > > > >> > > > > is automatic upon consumer failures.
> > > > > > >> > > > >
> > > > > > >> > > > > Guozhang
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.solr@gmail.com>
> > > > > > >> > > > > wrote:
> > > > > > >> > > > >
> > > > > > >> > > > > > Folks,
> > > > > > >> > > > > > I have a process that starts at a specific time and
> > > > > > >> > > > > > reads from a specific topic. I am currently using
> > > > > > >> > > > > > the High Level API (consumer group) to read from
> > > > > > >> > > > > > Kafka (and it will stop once there is nothing in the
> > > > > > >> > > > > > topic, by specifying a timeout). I am most concerned
> > > > > > >> > > > > > about error recovery in a multi-threaded context. If
> > > > > > >> > > > > > one thread dies, will the other running bolt threads
> > > > > > >> > > > > > pick up the failed message? Or do I have to start
> > > > > > >> > > > > > another thread in order to pick up the failed
> > > > > > >> > > > > > message? What would be a good practice to ensure the
> > > > > > >> > > > > > message is processed at least once?
> > > > > > >> > > > > >
> > > > > > >> > > > > > Note that all threads are using the same group id.
> > > > > > >> > > > > >
> > > > > > >> > > > > > Thanks,
> > > > > > >> > > > > > Chen
> > > > > > >> > > > > >
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > --
> > > > > > >> > > > > -- Guozhang
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > --
> > > > > > >> > > -- Guozhang
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> --
> > > > > > >> -- Guozhang
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

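The commit-after-processing pattern discussed in the thread can be illustrated without a Kafka dependency. The following is only a sketch under stated assumptions: the class, the in-memory list standing in for a partition, and the `committedOffset` field are all hypothetical and are not part of the Kafka API. It shows why a message whose processing fails is read again on restart when the offset is committed only after success.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AtLeastOnceSketch {
    // Hypothetical stand-in for the offset a consumer group has committed
    // for one partition.
    private int committedOffset = 0;

    // Reads the in-memory "partition" from the committed offset onward.
    // The offset is advanced only after a message is processed successfully,
    // so a message that fails is seen again on the next run.
    List<Integer> runOnce(List<Integer> partition, boolean failOnEven) {
        List<Integer> seen = new ArrayList<>();
        for (int pos = committedOffset; pos < partition.size(); pos++) {
            int message = partition.get(pos);
            seen.add(message);
            if (failOnEven && message % 2 == 0) {
                // Simulated crash: return before committing this message.
                return seen;
            }
            committedOffset = pos + 1; // commit after successful processing
        }
        return seen;
    }

    public static void main(String[] args) {
        AtLeastOnceSketch consumer = new AtLeastOnceSketch();
        List<Integer> partition = Arrays.asList(1, 2, 3, 4);
        System.out.println(consumer.runOnce(partition, true));  // prints [1, 2]
        System.out.println(consumer.runOnce(partition, false)); // prints [2, 3, 4]
    }
}
```

In this sketch, message 2 appears in both runs. That is the duplicate-on-failure behaviour that at-least-once processing accepts in exchange for not losing the failed message.
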