kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: error recovery in multiple thread reading from Kafka with HighLevel api
Date Sat, 09 Aug 2014 16:57:26 GMT
We do not have a Java implementation of the operational tools yet.

Guozhang
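
In the meantime, the check that ConsumerOffsetChecker performs can be
approximated directly from Java. A minimal sketch, assuming Kafka 0.8's
ZooKeeper offset layout (/consumers/<group>/offsets/<topic>/<partition>), the
javaapi SimpleConsumer, and that brokerHost/brokerPort point at the leader of
the partition; the class name and client id are illustrative:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.zookeeper.ZooKeeper;

    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class LagCheckSketch {
        // Lag of one partition: log-end offset minus the group's committed offset.
        public static long lag(String zkConnect, String brokerHost, int brokerPort,
                               String group, String topic, int partition) throws Exception {
            // Committed offset, as the high-level consumer stores it in ZooKeeper.
            ZooKeeper zk = new ZooKeeper(zkConnect, 4000, null);
            long committed = Long.parseLong(new String(zk.getData(
                    "/consumers/" + group + "/offsets/" + topic + "/" + partition,
                    false, null)));
            zk.close();

            // Log-end offset, fetched from the partition leader via an OffsetRequest.
            SimpleConsumer consumer =
                    new SimpleConsumer(brokerHost, brokerPort, 10000, 64 * 1024, "lagCheck");
            Map<TopicAndPartition, PartitionOffsetRequestInfo> info = Collections.singletonMap(
                    new TopicAndPartition(topic, partition),
                    new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
            OffsetResponse response = consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(
                    info, kafka.api.OffsetRequest.CurrentVersion(), "lagCheck"));
            long logEnd = response.offsets(topic, partition)[0];
            consumer.close();

            return logEnd - committed;
        }
    }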


On Fri, Aug 8, 2014 at 4:29 PM, Chen Wang <chen.apache.solr@gmail.com> wrote:

> Guozhang,
> Just curious, do you guys already have a Java version of the
> ConsumerOffsetChecker
> (https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala)
> so that I could use it in my Storm topology?
> Chen
>
>
> On Fri, Aug 8, 2014 at 2:03 PM, Chen Wang <chen.apache.solr@gmail.com> wrote:
>
> > Ah.. my bad.. I didn't notice I had put two auto.commit.interval.ms entries
> > in the config. After fixing that, it now behaves as expected. :-)
> > Thanks again!!
> > Chen
> >
> >
> >
> > On Fri, Aug 8, 2014 at 1:58 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> >> Chen,
> >>
> >> Your auto.commit.interval.ms is set to 1 sec, which may be too small.
> >> Could you try with larger numbers, like 10000?
> >>
> >> Guozhang
> >>
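
For reference, the config from the test quoted below, with the duplicate
commit-interval key removed and the single remaining key set to the larger
suggested value (a sketch; a_zookeeper and a_groupId as in the quoted code):

    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "10000"); // one key only, 10 sec
    props.put("consumer.timeout.ms", "6000");
    props.put("auto.offset.reset", "smallest");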
> >>
> >> On Fri, Aug 8, 2014 at 1:41 PM, Chen Wang <chen.apache.solr@gmail.com> wrote:
> >>
> >> > Guozhang,
> >> > I just did a simple test, and Kafka does not seem to do what it is
> >> > supposed to do:
> >> > I put 20 messages numbered from 1 to 20 into a topic with 3 partitions,
> >> > and throw a RuntimeException on all the even numbered messages (2, 4,
> >> > 6, ...):
> >> >
> >> >   while (it.hasNext()) {
> >> >       String message = new String(it.next().message());
> >> >       System.out.println("message received" + message);
> >> >       int messageInt = Integer.parseInt(message);
> >> >       if (messageInt % 2 == 0) {
> >> >           // crash on all the even numbered messages
> >> >           throw new RuntimeException("message " + message + " failed");
> >> >       }
> >> >   }
> >> >
> >> > My config is like this:
> >> >
> >> >     props.put("zookeeper.connect", a_zookeeper);
> >> >     props.put("group.id", a_groupId);
> >> >     props.put("zookeeper.session.timeout.ms", "4000");
> >> >     props.put("zookeeper.sync.time.ms", "200");
> >> >     props.put("auto.commit.interval.ms", "1000");
> >> >     props.put("consumer.timeout.ms", "6000");
> >> >     props.put("autocommit.interval.ms", "360000");
> >> >     props.put("auto.offset.reset", "smallest");
> >> >
> >> > I started 10 threads, but it seems that whenever a thread gets an even
> >> > numbered message, it crashes; when I then restart the threads, they
> >> > start reading from the next message. So in the first batch:
> >> >
> >> > message received1
> >> >
> >> > message received2
> >> >
> >> > Then I start again:
> >> >
> >> > message received3
> >> >
> >> > message received4
> >> >
> >> > As you can see, message 2 is not replayed. Is this expected? I ran
> >> > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
> >> > chen_test_6 --topic test_20 -zkconnect localhost:2182, and it is
> >> > consistent with the testing result (the even numbered failed messages
> >> > are not retrieved again).
> >> >
> >> > What am I missing here?
> >> >
> >> > Chen
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > On Fri, Aug 8, 2014 at 1:09 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >> >
> >> > > Chen,
> >> > >
> >> > > You can use the ConsumerOffsetChecker tool.
> >> > >
> >> > > http://kafka.apache.org/documentation.html#basic_ops_consumer_lag
> >> > >
> >> > > Guozhang
> >> > >
> >> > >
> >> > > On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <chen.apache.solr@gmail.com> wrote:
> >> > >
> >> > > > sounds like a good idea! I think I will go with the high level
> >> > > > consumer then.
> >> > > > Another question along with this design: is there a way to check
> >> > > > the lag of a consumer group for a topic? Upon machine crashes and
> >> > > > restarts, I want to continue reading from a certain topic only if
> >> > > > the lag is NOT 0. I know I could depend on the timeout
> >> > > > ("consumer.timeout.ms") to check whether there is still data in the
> >> > > > topic, but I am wondering whether there is a more elegant way.
> >> > > > Thanks much for the help, Guozhang!
> >> > > >
> >> > > > Chen
> >> > > >
> >> > > >
> >> > > > On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >> > > >
> >> > > > > Using the simple consumer you then need to take care of consumer
> >> > > > > failure detection and partition reassignment yourself, but you
> >> > > > > would have more flexibility with the offsets.
> >> > > > >
> >> > > > > If each time processing incurs an error the corresponding consumer
> >> > > > > thread also fails (i.e. it will not be involved in the rebalance
> >> > > > > and hence will not commit offsets), and you can live with data
> >> > > > > duplicates, then you can just enable auto offset commits with,
> >> > > > > say, a 10 sec period. We usually use an even larger period, like
> >> > > > > minutes.
> >> > > > >
> >> > > > > Guozhang
> >> > > > >
> >> > > > >
> >> > > > > On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.solr@gmail.com> wrote:
> >> > > > >
> >> > > > > > Maybe I could batch the messages before committing, e.g.
> >> > > > > > committing every 10 seconds. This is what the auto commit does
> >> > > > > > anyway, and I could live with duplicate data. What do you think?
> >> > > > > >
> >> > > > > > I would then also seem to need a monitoring daemon to check the
> >> > > > > > lag and restart the consumer after machine crashes..
> >> > > > > >
> >> > > > > >
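
A sketch of that batching idea with the high-level consumer: auto commit is
assumed off, it is the stream's ConsumerIterator, consumer the
ConsumerConnector behind it, and process() a made-up processing hook:

    // Commit manually at most every 10 seconds instead of once per message.
    // After a crash, anything processed since the last commit is replayed,
    // so duplicates are possible, as noted above.
    long lastCommit = System.currentTimeMillis();
    while (it.hasNext()) {
        process(it.next().message());
        if (System.currentTimeMillis() - lastCommit >= 10000) {
            consumer.commitOffsets();
            lastCommit = System.currentTimeMillis();
        }
    }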
> >> > > > > > On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.solr@gmail.com> wrote:
> >> > > > > >
> >> > > > > > > Thanks, Guozhang.
> >> > > > > > > So if I switch to the SimpleConsumer, will these problems be
> >> > > > > > > taken care of already? I would assume that I will need to
> >> > > > > > > manage all the offsets by myself, including the error
> >> > > > > > > recovery logic, right?
> >> > > > > > > Chen
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >> > > > > > >
> >> > > > > > >> Hello Chen,
> >> > > > > > >>
> >> > > > > > >> 1. Manually committing offsets does have the risk of
> >> > > > > > >> duplicates. Consider the following pattern:
> >> > > > > > >>
> >> > > > > > >> message = consumer.next();
> >> > > > > > >> process(message);
> >> > > > > > >> consumer.commit();
> >> > > > > > >>
> >> > > > > > >> A rebalance can happen between lines 2 and 3, where the
> >> > > > > > >> message has been processed but its offset has not been
> >> > > > > > >> committed; if another consumer picks up this partition after
> >> > > > > > >> the rebalance, it may re-consume this message again. With
> >> > > > > > >> auto.commit turned on, offsets will always be committed
> >> > > > > > >> before the consumers release ownership of partitions during
> >> > > > > > >> rebalances.
> >> > > > > > >>
> >> > > > > > >> In the 0.9 consumer design, we have fixed this issue by
> >> > > > > > >> introducing the onPartitionDeassigned callback; you can take
> >> > > > > > >> a look at its current API here:
> >> > > > > > >>
> >> > > > > > >> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> >> > > > > > >>
> >> > > > > > >> 2. Committing offsets too often does have an overhead, since
> >> > > > > > >> the commits go to Zookeeper, and ZK is not write-scalable. We
> >> > > > > > >> are also fixing that issue by moving the offset management
> >> > > > > > >> from ZK to the Kafka servers. This is already checked in on
> >> > > > > > >> trunk, and will be included in the 0.8.2 release.
> >> > > > > > >>
> >> > > > > > >> Guozhang
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >> On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.solr@gmail.com> wrote:
> >> > > > > > >>
> >> > > > > > >> > Guozhang,
> >> > > > > > >> > Just to make it clear:
> >> > > > > > >> > I have 10 threads with the same consumer group id reading
> >> > > > > > >> > the topic T. Auto commit is turned off, and commitOffsets
> >> > > > > > >> > is called only when a message is processed successfully.
> >> > > > > > >> > If thread 1 dies while processing a message from partition
> >> > > > > > >> > P1, and the last committed offset is Offset1, will Kafka
> >> > > > > > >> > then ensure that one of the other 9 running threads
> >> > > > > > >> > automatically picks up the messages on partition P1 from
> >> > > > > > >> > Offset1? And will that thread risk reading the same message
> >> > > > > > >> > more than once?
> >> > > > > > >> >
> >> > > > > > >> > Also, I would assume that committing the offset for each
> >> > > > > > >> > message is a bit heavy. What do you guys usually do for
> >> > > > > > >> > error handling when reading from Kafka?
> >> > > > > > >> > Thanks much!
> >> > > > > > >> > Chen
> >> > > > > > >> >
> >> > > > > > >> >
> >> > > > > > >> >
> >> > > > > > >> > On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >> > > > > > >> >
> >> > > > > > >> > > Yes, in that case you can turn off auto commit and call
> >> > > > > > >> > > commitOffsets manually after processing is finished.
> >> > > > > > >> > > commitOffsets() will only write the offsets of the
> >> > > > > > >> > > partitions that the consumer is currently fetching, so
> >> > > > > > >> > > there is no need to coordinate this operation.
> >> > > > > > >> > >
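
Concretely, that pattern with the 0.8 high-level consumer looks like this
(a sketch; "auto.commit.enable" is assumed set to "false", consumer is the
ConsumerConnector that created the stream, and process() is user-supplied):

    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        byte[] payload = it.next().message();
        process(payload);           // may throw; offset then stays uncommitted
        consumer.commitOffsets();   // reached only after successful processing
    }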
> >> > > > > > >> > >
> >> > > > > > >> > > On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.solr@gmail.com> wrote:
> >> > > > > > >> > >
> >> > > > > > >> > > > But with auto commit turned on, I am risking losing
> >> > > > > > >> > > > the failed messages, right? Should I turn off auto
> >> > > > > > >> > > > commit, and only commit the offset when a message is
> >> > > > > > >> > > > processed successfully? But that would require
> >> > > > > > >> > > > coordination between the threads in order to know the
> >> > > > > > >> > > > right timing to commit an offset..
> >> > > > > > >> > > >
> >> > > > > > >> > > >
> >> > > > > > >> > > >
> >> > > > > > >> > > > On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >> > > > > > >> > > >
> >> > > > > > >> > > > > Hello Chen,
> >> > > > > > >> > > > >
> >> > > > > > >> > > > > With high-level consumer, the partition
> >> > > > > > >> > > > > re-assignment is automatic upon consumer failures.
> >> > > > > > >> > > > >
> >> > > > > > >> > > > > Guozhang
> >> > > > > > >> > > > >
> >> > > > > > >> > > > >
> >> > > > > > >> > > > > On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.solr@gmail.com> wrote:
> >> > > > > > >> > > > >
> >> > > > > > >> > > > > > Folks,
> >> > > > > > >> > > > > > I have a process that starts at a specific time
> >> > > > > > >> > > > > > and reads from a specific topic. I am currently
> >> > > > > > >> > > > > > using the High Level API (consumer group) to read
> >> > > > > > >> > > > > > from Kafka (and will stop once there is nothing
> >> > > > > > >> > > > > > left in the topic, by specifying a timeout). I am
> >> > > > > > >> > > > > > most concerned about error recovery in a multiple
> >> > > > > > >> > > > > > thread context. If one thread dies, will the other
> >> > > > > > >> > > > > > running bolt threads pick up the failed message?
> >> > > > > > >> > > > > > Or do I have to start another thread in order to
> >> > > > > > >> > > > > > pick up the failed message? What would be a good
> >> > > > > > >> > > > > > practice to ensure the message is processed at
> >> > > > > > >> > > > > > least once?
> >> > > > > > >> > > > > >
> >> > > > > > >> > > > > > Note that all threads are using the same group id.
> >> > > > > > >> > > > > >
> >> > > > > > >> > > > > > Thanks,
> >> > > > > > >> > > > > > Chen
> >> > > > > > >> > > > > >
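
For completeness, a sketch of the multi-threaded high-level consumer setup the
question describes; topic name, group id, ZooKeeper address, and thread count
are illustrative:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ConsumerGroupSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "my_group");        // same group id for all threads
            props.put("consumer.timeout.ms", "6000"); // hasNext() throws
                                                      // ConsumerTimeoutException when drained

            ConsumerConnector consumer =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // Ask for 10 streams of the topic; each stream gets its own thread.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("my_topic", 10);
            List<KafkaStream<byte[], byte[]>> streams =
                    consumer.createMessageStreams(topicCountMap).get("my_topic");

            ExecutorService executor = Executors.newFixedThreadPool(10);
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new Runnable() {
                    public void run() {
                        ConsumerIterator<byte[], byte[]> it = stream.iterator();
                        while (it.hasNext()) {
                            System.out.println("message received"
                                    + new String(it.next().message()));
                        }
                    }
                });
            }
        }
    }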
> >> > > > > > >> > > > >
> >> > > > > > >> > > > >
> >> > > > > > >> > > > >
> >> > > > > > >> > > > > --
> >> > > > > > >> > > > > -- Guozhang
> >> > > > > > >> > > > >
> >> > > > > > >> > > >
> >> > > > > > >> > >
> >> > > > > > >> > >
> >> > > > > > >> > >
> >> > > > > > >> > > --
> >> > > > > > >> > > -- Guozhang
> >> > > > > > >> > >
> >> > > > > > >> >
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >> --
> >> > > > > > >> -- Guozhang
> >> > > > > > >>
> >> > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > --
> >> > > > > -- Guozhang
> >> > > > >
> >> > > >
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang
