kafka-users mailing list archives

From Chen Wang <chen.apache.s...@gmail.com>
Subject Re: error recovery in multiple thread reading from Kafka with HighLevel api
Date Fri, 08 Aug 2014 20:41:42 GMT
Guozhang,
I just ran a simple test, and Kafka does not seem to behave the way it is
supposed to:
I put 20 messages, numbered 1 to 20, into a topic with 3 partitions, and
throw a RuntimeException on every even-numbered message (2, 4, 6, ...).

    while (it.hasNext()) {
        String message = new String(it.next().message());
        System.out.println("message received" + message);
        int messageInt = Integer.parseInt(message);
        if (messageInt % 2 == 0) {
            // crash on every even-numbered message
            throw new RuntimeException("message " + message + " failed");
        }
    }

My config is like this:

    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("consumer.timeout.ms", "6000");
    props.put("autocommit.interval.ms", "360000");
    props.put("auto.offset.reset", "smallest");


I started 10 threads, but it seems that whenever a thread gets an
even-numbered message, it crashes. When I then restart the threads, they
start reading from the next message. So in the first batch:

message received1

message received2

Then I start again:

message received3

message received4


As you can see, message 2 is not replayed. Is this expected? I ran
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
chen_test_6 --topic test_20 -zkconnect localhost:2182, and its output is
consistent with the test result (the failed even-numbered messages are not
retrieved again).

What am I missing here?

Chen
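
A sketch of one way to read this result, under assumptions that are not
confirmed in this thread: if the consumer counts a message as consumed as
soon as the iterator returns it, and that position is what later gets
committed (by auto commit or at shutdown), then a message whose handler
throws is already behind the committed offset and is not replayed. The Java
below is an in-memory model of a single partition, not the Kafka API;
CommitAfterProcessDemo, numberedLog and runUntilFailure are hypothetical
names. It contrasts committing when a message is handed out with committing
only after the handler succeeds.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of one partition plus one committed offset.
// NOT the Kafka API: every name in this class is hypothetical.
class CommitAfterProcessDemo {

    // Builds a "partition" holding the messages 1..count.
    static int[] numberedLog(int count) {
        int[] log = new int[count];
        for (int i = 0; i < count; i++) {
            log[i] = i + 1;
        }
        return log;
    }

    // Mimics the test handler: every even-numbered message fails.
    static void process(int message) {
        if (message % 2 == 0) {
            throw new RuntimeException("message " + message + " failed");
        }
    }

    // Consumes from the committed offset until the handler throws (the
    // "thread" dies) or the log ends, and returns the new committed offset.
    // commitOnDelivery = true : commit as soon as a message is handed out.
    // commitOnDelivery = false: commit only after process() succeeds.
    static int runUntilFailure(int[] log, int committed, boolean commitOnDelivery,
                               List<Integer> delivered) {
        int offset = committed;
        while (offset < log.length) {
            int message = log[offset];
            delivered.add(message);
            if (commitOnDelivery) {
                committed = offset + 1;
            }
            try {
                process(message);
            } catch (RuntimeException e) {
                // The consumer dies here; a restart resumes from `committed`.
                return committed;
            }
            committed = offset + 1;
            offset++;
        }
        return committed;
    }

    public static void main(String[] args) {
        int[] log = numberedLog(20);
        for (boolean commitOnDelivery : new boolean[] {true, false}) {
            int committed = 0;
            for (int run = 1; run <= 2; run++) {
                List<Integer> delivered = new ArrayList<>();
                committed = runUntilFailure(log, committed, commitOnDelivery, delivered);
                System.out.println("commitOnDelivery=" + commitOnDelivery
                        + " run " + run + " delivered " + delivered);
            }
        }
    }
}
```

With commitOnDelivery set to true, two consecutive runs deliver 1, 2 and
then 3, 4, which matches the output above. With it set to false, the second
run delivers message 2 again. In that mode a handler that always fails on
the same message would be retried on every restart, so real code would also
need some policy for skipping or parking such a message.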






On Fri, Aug 8, 2014 at 1:09 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Chen,
>
> You can use the ConsumerOffsetChecker tool.
>
> http://kafka.apache.org/documentation.html#basic_ops_consumer_lag
>
> Guozhang
>
>
> On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <chen.apache.solr@gmail.com>
> wrote:
>
> > Sounds like a good idea! I think I will go with the high-level consumer
> > then.
> > Another question about this design: is there a way to check the lag of a
> > consumer group for a topic? After machine crashes and restarts, I want to
> > continue reading from a given topic only if the lag is NOT 0. I know I
> > could rely on the timeout ("consumer.timeout.ms") to check whether there
> > is still data in the topic, but I wonder whether there is a more elegant
> > way.
> > Thanks much for the help, Guozhang!
> >
> > Chen
> >
> >
> > On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> >
> > > With the simple consumer you need to take care of consumer failure
> > > detection and partition reassignment yourself, but you get more
> > > flexibility over the offsets.
> > >
> > > If every processing error also makes the corresponding consumer thread
> > > fail (i.e. it will not be involved in the rebalance and hence will not
> > > commit offsets), and you can live with duplicate data, then you can just
> > > enable auto offset commits with a period of, say, 10 secs. We usually
> > > use an even larger period, like minutes.
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.solr@gmail.com>
> > > wrote:
> > > >
> > > > > Maybe I could batch the messages before committing, e.g. committing
> > > > > every 10 seconds. This is what auto commit does anyway, and I could
> > > > > live with duplicate data.
> > > > > What do you think?
> > > > >
> > > > > It seems I would then also need a monitoring daemon that checks the
> > > > > lag and restarts the consumer after machine crashes.
> > > >
> > > >
> > > > > On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.solr@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thanks, Guozhang.
> > > > > > So if I switch to SimpleConsumer, will these problems already be
> > > > > > taken care of? I assume I would need to manage all the offsets
> > > > > > myself, including the error recovery logic, right?
> > > > > Chen
> > > > >
> > > > >
> > > > > > On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangguoz@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > >> Hello Chen,
> > > > > >>
> > > > > >> 1. Manually committing offsets does carry a risk of duplicates.
> > > > > >> Consider the following pattern:
> > > > > >>
> > > > > >> message = consumer.next();
> > > > > >> process(message);
> > > > > >> consumer.commit();
> > > > > >>
> > > > > >> A rebalance can happen between lines 2 and 3, when the message has
> > > > > >> been processed but its offset has not yet been committed. If another
> > > > > >> consumer picks up this partition after the rebalance, it may consume
> > > > > >> this message again. With auto.commit turned on, offsets will always
> > > > > >> be committed before the consumers release ownership of partitions
> > > > > >> during rebalances.
> > > > >>
> > > > > >> In the 0.9 consumer design, we have fixed this issue by introducing
> > > > > >> the onPartitionDeassigned callback. You can take a look at its
> > > > > >> current API here:
> > > > > >>
> > > > > >> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > >>
> > > > > >> 2. Committing offsets too often does have an overhead, since each
> > > > > >> commit goes to ZooKeeper, and ZK is not write-scalable. We are also
> > > > > >> fixing that issue by moving offset management from ZK to the Kafka
> > > > > >> servers. This is already checked in to trunk and will be included in
> > > > > >> the 0.8.2 release.
> > > > >>
> > > > >> Guozhang
> > > > >>
> > > > >>
> > > > > >> On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.solr@gmail.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Guozhang,
> > > > > >> > Just to make it clear:
> > > > > >> > Say I have 10 threads with the same consumer group id reading
> > > > > >> > topic T. Auto commit is turned off, and commitOffset is called
> > > > > >> > only when a message is processed successfully.
> > > > > >> > If thread 1 dies while processing a message from partition P1, and
> > > > > >> > the last offset is Offset1, will Kafka ensure that one of the other
> > > > > >> > 9 running threads automatically picks up the messages on partition
> > > > > >> > P1 from Offset1? Will that thread risk reading the same message
> > > > > >> > more than once?
> > > > > >> >
> > > > > >> > Also, I assume committing the offset for each message is a bit
> > > > > >> > heavy. What do you usually do for error handling when reading from
> > > > > >> > Kafka?
> > > > >> > Thanks much!
> > > > >> > Chen
> > > > >> >
> > > > >> >
> > > > >> >
> > > > > >> > On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangguoz@gmail.com>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > Yes, in that case you can turn off auto commit and call
> > > > > >> > > commitOffsets manually after processing is finished.
> > > > > >> > > commitOffsets() will only write the offsets of the partitions
> > > > > >> > > that the consumer is currently fetching, so there is no need to
> > > > > >> > > coordinate this operation.
> > > > >> > >
> > > > >> > >
> > > > > >> > > On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.solr@gmail.com>
> > > > > >> > > wrote:
> > > > > >> > >
> > > > > >> > > > But with auto commit turned on, I risk losing the failed
> > > > > >> > > > message, right? Should I turn off auto commit and only commit
> > > > > >> > > > the offset when the message is processed successfully? But that
> > > > > >> > > > would require coordination between threads in order to know the
> > > > > >> > > > right time to commit the offset.
> > > > >> > > >
> > > > >> > > >
> > > > >> > > >
> > > > > >> > > > On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangguoz@gmail.com>
> > > > > >> > > > wrote:
> > > > > >> > > >
> > > > > >> > > > > Hello Chen,
> > > > > >> > > > >
> > > > > >> > > > > With the high-level consumer, partition re-assignment is
> > > > > >> > > > > automatic upon consumer failures.
> > > > >> > > > >
> > > > >> > > > > Guozhang
> > > > >> > > > >
> > > > >> > > > >
> > > > > >> > > > > On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.solr@gmail.com>
> > > > > >> > > > > wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Folks,
> > > > > >> > > > > > I have a process that starts at a specific time and reads
> > > > > >> > > > > > from a specific topic. I am currently using the High Level
> > > > > >> > > > > > API (consumer group) to read from Kafka (and will stop once
> > > > > >> > > > > > there is nothing left in the topic, by specifying a
> > > > > >> > > > > > timeout). I am most concerned about error recovery in a
> > > > > >> > > > > > multi-threaded context. If one thread dies, will the other
> > > > > >> > > > > > running bolt threads pick up the failed message? Or do I
> > > > > >> > > > > > have to start another thread in order to pick up the failed
> > > > > >> > > > > > message? What would be a good practice to ensure that each
> > > > > >> > > > > > message is processed at least once?
> > > > > >> > > > > >
> > > > > >> > > > > > Note that all threads are using the same group id.
> > > > >> > > > > >
> > > > >> > > > > > Thanks,
> > > > >> > > > > > Chen
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > --
> > > > >> > > > > -- Guozhang
> > > > >> > > > >
> > > > >> > > >
> > > > >> > >
> > > > >> > >
> > > > >> > >
> > > > >> > > --
> > > > >> > > -- Guozhang
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> -- Guozhang
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
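
The lag check discussed in the quoted thread can be sketched as follows. Per
partition, lag is the log size minus the group's committed offset, which is
the relationship between the Offset, logSize and Lag columns printed by
ConsumerOffsetChecker. The helper below is hypothetical and assumes those two
numbers per partition have already been obtained by some other means; it does
not talk to Kafka or ZooKeeper, and the values in main are made up.

```java
// Hypothetical helper, not part of Kafka: computes consumer lag from
// per-partition numbers that were obtained elsewhere.
class ConsumerLagSketch {

    // Lag of one partition: messages written but not yet committed as consumed.
    static long partitionLag(long logSize, long committedOffset) {
        return Math.max(0L, logSize - committedOffset);
    }

    // Sum of the per-partition lags; index i of both arrays is partition i.
    static long totalLag(long[] logSizes, long[] committedOffsets) {
        if (logSizes.length != committedOffsets.length) {
            throw new IllegalArgumentException("arrays must cover the same partitions");
        }
        long total = 0;
        for (int i = 0; i < logSizes.length; i++) {
            total += partitionLag(logSizes[i], committedOffsets[i]);
        }
        return total;
    }

    // The restart rule from the thread: only continue reading if the lag is not 0.
    static boolean shouldKeepReading(long[] logSizes, long[] committedOffsets) {
        return totalLag(logSizes, committedOffsets) > 0;
    }

    public static void main(String[] args) {
        long[] logSizes = {7, 7, 6};   // made-up values for a 3-partition topic
        long[] committed = {7, 5, 6};  // made-up committed offsets
        System.out.println("total lag = " + totalLag(logSizes, committed));
        System.out.println("keep reading = " + shouldKeepReading(logSizes, committed));
    }
}
```

A restart script could call shouldKeepReading once per topic and start the
consumer threads only when it returns true, instead of waiting for
consumer.timeout.ms to expire on an empty topic.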
