kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: error recovery in multiple thread reading from Kafka with HighLevel api
Date Fri, 08 Aug 2014 20:58:47 GMT
Chen,

Your auto.commit.interval.ms is set to 1 sec, which may be too small: the offset
of a message can then be auto-committed before its processing fails, so the failed
message is not replayed after a restart. Could you try a larger value, like 10000?
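
For example, a quick sketch of the change (illustrative only; a_zookeeper and
a_groupId are the variables from your config below, and the rest of the consumer
setup is assumed):

    Properties props = new Properties();  // java.util.Properties
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    // commit less frequently, so a message whose processing fails is more likely
    // to still be uncommitted and therefore be replayed after a restart
    props.put("auto.commit.interval.ms", "10000");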

Guozhang


On Fri, Aug 8, 2014 at 1:41 PM, Chen Wang <chen.apache.solr@gmail.com>
wrote:

> Guozhang,
> I just did a simple test, and Kafka does not seem to do what it is supposed
> to do:
> I put 20 messages numbered from 1 to 20 into a topic with 3 partitions, and
> throw a RuntimeException on all the even-numbered messages (2, 4, 6, ...):
>
>   while (it.hasNext()) {
>       String message = new String(it.next().message());
>       System.out.println("message received" + message);
>       int messageInt = Integer.parseInt(message);
>       if (messageInt % 2 == 0) {
>           // crash on every even-numbered message
>           throw new RuntimeException("message " + message + " failed");
>       }
>   }
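>
> (For reference, the iterator above comes from the standard high-level consumer
> setup, roughly like this; imports and per-thread stream handling are omitted,
> and props is the config shown below:)
>
>   ConsumerConnector consumer =
>       Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>   Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>   topicCountMap.put(topic, 10); // 10 streams, one per consumer thread
>   Map<String, List<KafkaStream<byte[], byte[]>>> streams =
>       consumer.createMessageStreams(topicCountMap);
>   ConsumerIterator<byte[], byte[]> it = streams.get(topic).get(0).iterator();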
>
> My config is like this:
>
>     props.put("zookeeper.connect", a_zookeeper);
>     props.put("group.id", a_groupId);
>     props.put("zookeeper.session.timeout.ms", "4000");
>     props.put("zookeeper.sync.time.ms", "200");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("consumer.timeout.ms", "6000");
>     props.put("autocommit.interval.ms", "360000");
>     props.put("auto.offset.reset", "smallest");
>
>
> I started 10 threads, and whenever a thread gets an even-numbered message it
> crashes. When I restart the threads, they start reading from the next message.
> So in the first batch:
>
> message received1
>
> message received2
>
> Then I start the threads again:
>
> message received3
>
> message received4
>
>
> As you can see, message 2 is not replayed. Is this expected? I ran
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group chen_test_6
> --topic test_20 -zkconnect localhost:2182, and it is consistent with the test
> result (the even-numbered failed messages are not re-retrieved).
>
> What am I missing here?
>
> Chen
>
>
>
>
>
>
> On Fri, Aug 8, 2014 at 1:09 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Chen,
> >
> > You can use the ConsumerOffsetChecker tool.
> >
> > http://kafka.apache.org/documentation.html#basic_ops_consumer_lag
> >
> > Guozhang
> >
> >
> > On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <chen.apache.solr@gmail.com>
> > wrote:
> >
> > > Sounds like a good idea! I think I will go with the high-level consumer then.
> > > Another question along with this design: is there a way to check the lag of a
> > > consumer group on a topic? Upon machine crashes and restarts, I want to
> > > continue reading from a certain topic only if the lag is NOT 0. I know I could
> > > depend on the timeout ("consumer.timeout.ms") to check whether there is still
> > > data in the topic, but I am wondering whether there is a more elegant way.
> > > Thanks much for the help, Guozhang!
> > >
> > > Chen
> > >
> > >
> > > On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > >
> > > > Using the simple consumer you then need to take care of consumer failure
> > > > detection and partition reassignment yourself, but you would have more
> > > > flexibility over the offsets.
> > > >
> > > > If each time processing incurs an error the corresponding consumer thread also
> > > > fails (i.e. it will not be involved in the rebalance and hence will not commit
> > > > offsets), and you can live with duplicate data, then you can just enable auto
> > > > offset commits with, say, a 10-second period. We usually use an even larger
> > > > period, like minutes.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.solr@gmail.com> wrote:
> > > >
> > > > > Maybe I could batch the messages before committing, e.g. committing every 10
> > > > > seconds. This is what the auto commit does anyway, and I could live with
> > > > > duplicate data. What do you think?
> > > > >
> > > > > I would then also seem to need a monitoring daemon to check the lag and
> > > > > restart the consumer after machine crashes.
> > > > >
> > > > >
> > > > > On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.solr@gmail.com> wrote:
> > > > >
> > > > > > Thanks, Guozhang.
> > > > > > So if I switch to the SimpleConsumer, will these problems already be taken
> > > > > > care of? I would assume that I will need to manage all the offsets myself,
> > > > > > including the error recovery logic, right?
> > > > > > Chen
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > >
> > > > > >> Hello Chen,
> > > > > >>
> > > > > >> 1. Manually committing offsets does have the risk of duplicates; consider
> > > > > >> the following pattern:
> > > > > >>
> > > > > >> message = consumer.next();
> > > > > >> process(message);
> > > > > >> consumer.commit();
> > > > > >>
> > > > > >> The rebalance can happen between lines 2 and 3, where the message has been
> > > > > >> processed but the offset has not been committed; if another consumer picks
> > > > > >> up this partition after the rebalance, it may consume this message again.
> > > > > >> With auto.commit turned on, offsets will always be committed before the
> > > > > >> consumers release ownership of partitions during rebalances.
> > > > > >>
> > > > > >> In the 0.9 consumer design, we have fixed this issue by introducing the
> > > > > >> onPartitionDeassigned callback; you can take a look at its current API here:
> > > > > >>
> > > > > >> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > >>
> > > > > >> 2. Committing offsets too often does have an overhead since it goes to
> > > > > >> ZooKeeper, and ZK is not write-scalable. We are also fixing that issue by
> > > > > >> moving offset management from ZK to the Kafka servers. This is already
> > > > > >> checked in to trunk, and will be included in the 0.8.2 release.
> > > > > >>
> > > > > >> Guozhang
> > > > > >>
> > > > > >>
> > > > > >> On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.solr@gmail.com> wrote:
> > > > > >>
> > > > > >> > Guozhang,
> > > > > >> > Just to make it clear:
> > > > > >> > If I have 10 threads with the same consumer group id reading topic T, the
> > > > > >> > auto commit is turned off, and commitOffsets is called only when a message
> > > > > >> > is processed successfully.
> > > > > >> > If thread 1 dies while processing a message from partition P1, and the last
> > > > > >> > offset is Offset1, will Kafka ensure that one of the other 9 running threads
> > > > > >> > automatically picks up the message on partition P1 from Offset1? Will that
> > > > > >> > thread run the risk of reading the same message more than once?
> > > > > >> >
> > > > > >> > Also, I would assume committing the offset for each message is a bit heavy.
> > > > > >> > What do you guys usually do for error handling when reading from Kafka?
> > > > > >> > Thanks much!
> > > > > >> > Chen
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> > On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > >> >
> > > > > >> > > Yes, in that case you can turn off auto commit and call commitOffsets
> > > > > >> > > manually after processing is finished. commitOffsets() will only write the
> > > > > >> > > offsets of the partitions that the consumer is currently fetching, so there
> > > > > >> > > is no need to coordinate this operation.
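> > > > > >> > >
> > > > > >> > > A minimal sketch of that flow (illustrative only; process() stands in for
> > > > > >> > > your handling logic, 'stream' is one of the KafkaStreams returned by
> > > > > >> > > createMessageStreams, and 'consumer' is the ConsumerConnector):
> > > > > >> > >
> > > > > >> > > ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > > > >> > > while (it.hasNext()) {
> > > > > >> > >     String message = new String(it.next().message());
> > > > > >> > >     process(message);          // may throw; offset then stays uncommitted
> > > > > >> > >     consumer.commitOffsets();  // commit only after successful processing
> > > > > >> > > }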
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.solr@gmail.com> wrote:
> > > > > >> > >
> > > > > >> > > > But with auto commit turned on, I am risking losing the failed message,
> > > > > >> > > > right? Should I turn off auto commit and only commit the offset when the
> > > > > >> > > > message is processed successfully? But that would seem to require
> > > > > >> > > > coordination between the threads in order to know the right time to
> > > > > >> > > > commit the offset.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > >> > > >
> > > > > >> > > > > Hello Chen,
> > > > > >> > > > >
> > > > > >> > > > > With the high-level consumer, partition re-assignment is automatic
> > > > > >> > > > > upon consumer failures.
> > > > > >> > > > >
> > > > > >> > > > > Guozhang
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.solr@gmail.com> wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Folks,
> > > > > >> > > > > > I have a process that starts at a specific time and reads from a
> > > > > >> > > > > > specific topic. I am currently using the High Level API (consumer
> > > > > >> > > > > > group) to read from Kafka (and will stop once there is nothing in
> > > > > >> > > > > > the topic, by specifying a timeout). I am most concerned about error
> > > > > >> > > > > > recovery in a multi-threaded context. If one thread dies, will the
> > > > > >> > > > > > other running bolt threads pick up the failed message? Or do I have
> > > > > >> > > > > > to start another thread in order to pick up the failed message? What
> > > > > >> > > > > > would be a good practice to ensure that each message is processed at
> > > > > >> > > > > > least once?
> > > > > >> > > > > >
> > > > > >> > > > > > Note that all threads are using the same group id.
> > > > > >> > > > > >
> > > > > >> > > > > > Thanks,
> > > > > >> > > > > > Chen
> > > > > >> > > > > >
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > --
> > > > > >> > > > > -- Guozhang
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > --
> > > > > >> > > -- Guozhang
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> -- Guozhang
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
