kafka-users mailing list archives

From Chen Wang <chen.apache.s...@gmail.com>
Subject Re: error recovery in multiple thread reading from Kafka with HighLevel api
Date Fri, 08 Aug 2014 23:29:20 GMT
Guozhang,
Just curious, do you guys already have a Java version of the
ConsumerOffsetChecker
https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
so that I could use it in my Storm topology?
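
In the meantime I can invoke the Scala tool from Java, something like this
(just a sketch; the Scala object compiles to a class with a static main, and
the group/topic/zkconnect values are the ones from my test below):

    // Sketch: call the CLI tool's entry point from Java.
    // It prints the per-partition offset, log size, and lag to stdout.
    kafka.tools.ConsumerOffsetChecker.main(new String[] {
        "--group", "chen_test_6",
        "--topic", "test_20",
        "--zkconnect", "localhost:2182"
    });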
Chen


On Fri, Aug 8, 2014 at 2:03 PM, Chen Wang <chen.apache.solr@gmail.com>
wrote:

> ah.. my bad.. I didn't notice I had put two auto.commit.interval.ms entries
> in the config. After fixing it, it now behaves as expected. :-)
> Thanks again!!
> Chen
>
>
>
> On Fri, Aug 8, 2014 at 1:58 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
>> Chen,
>>
>> Your auto.commit.interval.ms is set to 1 sec, which may be too small.
>> Could you try with larger numbers, like 10000?
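>>
>> For example (just a sketch):
>>
>>     props.put("auto.commit.interval.ms", "10000");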
>>
>> Guozhang
>>
>>
>> On Fri, Aug 8, 2014 at 1:41 PM, Chen Wang <chen.apache.solr@gmail.com>
>> wrote:
>>
>> > Guozhang,
>> > I just did a simple test, and Kafka does not seem to do what it is
>> > supposed to do:
>> > I put 20 messages numbered from 1 to 20 into a topic with 3 partitions, and
>> > throw a RuntimeException on all the even-numbered messages (2, 4, 6, ..):
>> >
>> >   // it is the ConsumerIterator<byte[], byte[]> from the consumer stream
>> >   while (it.hasNext()) {
>> >       String message = new String(it.next().message());
>> >       System.out.println("message received " + message);
>> >       int messageInt = Integer.parseInt(message);
>> >       if (messageInt % 2 == 0) {
>> >           // crash on all the even-numbered messages
>> >           throw new RuntimeException("message " + message + " failed");
>> >       }
>> >   }
>> >
>> > My config is like this:
>> >
>> >     props.put("zookeeper.connect", a_zookeeper);
>> >     props.put("group.id", a_groupId);
>> >     props.put("zookeeper.session.timeout.ms", "4000");
>> >     props.put("zookeeper.sync.time.ms", "200");
>> >     props.put("auto.commit.interval.ms", "1000");
>> >     props.put("consumer.timeout.ms", "6000");
>> >     props.put("autocommit.interval.ms", "360000");
>> >     props.put("auto.offset.reset", "smallest");
>> >
>> > I started 10 threads, but it seems that whenever a thread gets an
>> > even-numbered message it crashes; then, when I restart the threads, they
>> > start reading from the next message. So in the first batch:
>> >
>> > message received 1
>> >
>> > message received 2
>> >
>> > Then I start again:
>> >
>> > message received 3
>> >
>> > message received 4
>> >
>> > As you can see, message 2 is not replayed. Is this expected? I
>> > ran bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
>> > chen_test_6 --topic test_20 --zkconnect localhost:2182, and it is
>> > consistent with the testing result (the even-numbered failed messages
>> > are not re-retrieved).
>> >
>> > What am I missing here?
>> >
>> > Chen
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Fri, Aug 8, 2014 at 1:09 PM, Guozhang Wang <wangguoz@gmail.com>
>> wrote:
>> >
>> > > Chen,
>> > >
>> > > You can use the ConsumerOffsetChecker tool.
>> > >
>> > > http://kafka.apache.org/documentation.html#basic_ops_consumer_lag
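>> > >
>> > > If you want the lag programmatically rather than via the CLI, a rough
>> > > sketch for 0.8.x with ZK-stored offsets (the broker host/port, client
>> > > id, and single-partition scope below are assumptions): the lag is the
>> > > broker's log-end offset minus the committed offset stored in ZK.
>> > >
>> > >     import java.util.Collections;
>> > >     import java.util.Map;
>> > >     import kafka.api.PartitionOffsetRequestInfo;
>> > >     import kafka.common.TopicAndPartition;
>> > >     import kafka.javaapi.OffsetResponse;
>> > >     import kafka.javaapi.consumer.SimpleConsumer;
>> > >     import org.apache.zookeeper.ZooKeeper;
>> > >
>> > >     public class LagCheck {
>> > >         public static void main(String[] args) throws Exception {
>> > >             // Log-end offset for partition 0, asked from its broker.
>> > >             TopicAndPartition tp = new TopicAndPartition("test_20", 0);
>> > >             SimpleConsumer sc =
>> > >                 new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "lag-check");
>> > >             Map<TopicAndPartition, PartitionOffsetRequestInfo> req =
>> > >                 Collections.singletonMap(tp, new PartitionOffsetRequestInfo(
>> > >                     kafka.api.OffsetRequest.LatestTime(), 1));
>> > >             OffsetResponse resp = sc.getOffsetsBefore(new kafka.javaapi.OffsetRequest(
>> > >                 req, kafka.api.OffsetRequest.CurrentVersion(), "lag-check"));
>> > >             long logEnd = resp.offsets("test_20", 0)[0];
>> > >
>> > >             // Committed offset, as the high-level consumer stores it in ZK.
>> > >             // (Production code would wait for the ZK connection first.)
>> > >             ZooKeeper zk = new ZooKeeper("localhost:2182", 4000, null);
>> > >             byte[] data = zk.getData("/consumers/chen_test_6/offsets/test_20/0",
>> > >                                      false, null);
>> > >             long committed = Long.parseLong(new String(data, "UTF-8"));
>> > >             System.out.println("lag for partition 0 = " + (logEnd - committed));
>> > >             zk.close();
>> > >             sc.close();
>> > >         }
>> > >     }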
>> > >
>> > > Guozhang
>> > >
>> > >
>> > > On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <chen.apache.solr@gmail.com>
>> > > wrote:
>> > >
>> > > > sounds like a good idea! I think I will go with the high-level
>> > > > consumer then.
>> > > > Another question along with this design: is there a way to check the
>> > > > lag for a consumer group on a topic? Upon machine crashes and restarts,
>> > > > I want to continue reading from a certain topic only if the lag is NOT
>> > > > 0. I know I could depend on the timeout ("consumer.timeout.ms") to
>> > > > check whether there is still data in the topic, but I am wondering
>> > > > whether there is a more elegant way.
>> > > > Thanks much for the help, Guozhang!
>> > > >
>> > > > Chen
>> > > >
>> > > >
>> > > > On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wangguoz@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Using the simple consumer you then need to take care of consumer
>> > > > > failure detection and partition reassignment yourself. But you would
>> > > > > have more flexibility with the offsets.
>> > > > >
>> > > > > If, each time processing incurs an error, the corresponding consumer
>> > > > > thread will fail as well (i.e. it will not be involved in the
>> > > > > rebalance and hence will not commit offsets), and you can live with
>> > > > > data duplicates, then you can just enable auto offset commits with,
>> > > > > say, a 10 sec period. We usually have an even larger period, like
>> > > > > minutes.
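>> > > > >
>> > > > > For example (a sketch; auto.commit.enable already defaults to true):
>> > > > >
>> > > > >     props.put("auto.commit.enable", "true");
>> > > > >     props.put("auto.commit.interval.ms", "10000"); // 10 secs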
>> > > > >
>> > > > > Guozhang
>> > > > >
>> > > > >
>> > > > > On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.solr@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Maybe I could batch the messages before committing, e.g.
>> > > > > > committing every 10 seconds. This is what the auto commit does
>> > > > > > anyway, and I could live with duplicate data.
>> > > > > > What do you think?
>> > > > > >
>> > > > > > I would then also seem to need a monitoring daemon to check the
>> > > > > > lag and restart the consumer after machine crashes..
>> > > > > >
>> > > > > >
>> > > > > > On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.solr@gmail.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Thanks, Guozhang.
>> > > > > > > So if I switch to SimpleConsumer, will these problems be taken
>> > > > > > > care of already? I would assume that I will need to manage all
>> > > > > > > the offsets myself, including the error recovery logic, right?
>> > > > > > > Chen
>> > > > > > >
>> > > > > > >
>> > > > > > > On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangguoz@gmail.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > >> Hello Chen,
>> > > > > > >>
>> > > > > > >> 1. Manually committing offsets does have the risk of duplicates;
>> > > > > > >> consider the following pattern:
>> > > > > > >>
>> > > > > > >> message = consumer.next();
>> > > > > > >> process(message);
>> > > > > > >> consumer.commit();
>> > > > > > >>
>> > > > > > >> A rebalance can happen between lines 2 and 3, where the message
>> > > > > > >> has been processed but its offset has not been committed. If
>> > > > > > >> another consumer picks up this partition after the rebalance, it
>> > > > > > >> may re-consume this message again. With auto.commit turned on,
>> > > > > > >> offsets will always be committed before the consumers release
>> > > > > > >> ownership of partitions during rebalances.
>> > > > > > >>
>> > > > > > >> In the 0.9 consumer design, we have fixed this issue by
>> > > > > > >> introducing the onPartitionDeassigned callback; you can take a
>> > > > > > >> look at its current API here:
>> > > > > > >>
>> > > > > > >> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
>> > > > > > >>
>> > > > > > >> 2. Committing offsets too often does have an overhead, since
>> > > > > > >> each commit goes to Zookeeper and ZK is not write-scalable. We
>> > > > > > >> are also fixing that issue by moving offset management from ZK
>> > > > > > >> to the Kafka servers. This is already checked in on trunk, and
>> > > > > > >> will be included in the 0.8.2 release.
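>> > > > > > >>
>> > > > > > >> Once that is released, opting in on the consumer side should
>> > > > > > >> look roughly like this (a sketch, based on the planned 0.8.2
>> > > > > > >> consumer configs):
>> > > > > > >>
>> > > > > > >>     props.put("offsets.storage", "kafka");     // offsets go to Kafka, not ZK
>> > > > > > >>     props.put("dual.commit.enabled", "false"); // skip double-writes to ZK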
>> > > > > > >>
>> > > > > > >> Guozhang
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.solr@gmail.com>
>> > > > > > >> wrote:
>> > > > > > >>
>> > > > > > >> > Guozhang,
>> > > > > > >> > Just to make it clear:
>> > > > > > >> > Say I have 10 threads with the same consumer group id reading
>> > > > > > >> > topic T. Auto commit is turned off, and commitOffsets is
>> > > > > > >> > called only when a message is processed successfully.
>> > > > > > >> > If thread 1 dies when processing a message from partition P1,
>> > > > > > >> > and the last committed offset is Offset1, will Kafka then
>> > > > > > >> > ensure that one of the other 9 running threads automatically
>> > > > > > >> > picks up the messages on partition P1 from Offset1? And will
>> > > > > > >> > that thread risk reading the same message more than once?
>> > > > > > >> >
>> > > > > > >> > Also, I would assume committing the offset for each message
>> > > > > > >> > is a bit heavy. What do you guys usually do for error
>> > > > > > >> > handling when reading from Kafka?
>> > > > > > >> > Thanks much!
>> > > > > > >> > Chen
>> > > > > > >> >
>> > > > > > >> >
>> > > > > > >> >
>> > > > > > >> > On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangguoz@gmail.com>
>> > > > > > >> > wrote:
>> > > > > > >> >
>> > > > > > >> > > Yes, in that case you can turn off auto commit and call
>> > > > > > >> > > commitOffsets manually after processing is finished.
>> > > > > > >> > > commitOffsets() will only write the offsets of the
>> > > > > > >> > > partitions that the consumer is currently fetching, so
>> > > > > > >> > > there is no need to coordinate this operation.
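>> > > > > > >> > >
>> > > > > > >> > > A minimal sketch of that pattern with the 0.8 high-level
>> > > > > > >> > > consumer (the group id, topic name, and single-stream setup
>> > > > > > >> > > are made up for illustration):
>> > > > > > >> > >
>> > > > > > >> > >     import java.util.Collections;
>> > > > > > >> > >     import java.util.Properties;
>> > > > > > >> > >     import kafka.consumer.Consumer;
>> > > > > > >> > >     import kafka.consumer.ConsumerConfig;
>> > > > > > >> > >     import kafka.consumer.ConsumerIterator;
>> > > > > > >> > >     import kafka.consumer.KafkaStream;
>> > > > > > >> > >     import kafka.javaapi.consumer.ConsumerConnector;
>> > > > > > >> > >     import kafka.message.MessageAndMetadata;
>> > > > > > >> > >
>> > > > > > >> > >     public class AtLeastOnceLoop {
>> > > > > > >> > >         public static void main(String[] args) {
>> > > > > > >> > >             Properties props = new Properties();
>> > > > > > >> > >             props.put("zookeeper.connect", "localhost:2181");
>> > > > > > >> > >             props.put("group.id", "my_group");        // hypothetical group
>> > > > > > >> > >             props.put("auto.commit.enable", "false"); // commit manually
>> > > > > > >> > >
>> > > > > > >> > >             ConsumerConnector connector =
>> > > > > > >> > >                 Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>> > > > > > >> > >             KafkaStream<byte[], byte[]> stream = connector
>> > > > > > >> > >                 .createMessageStreams(Collections.singletonMap("my_topic", 1))
>> > > > > > >> > >                 .get("my_topic").get(0);
>> > > > > > >> > >
>> > > > > > >> > >             ConsumerIterator<byte[], byte[]> it = stream.iterator();
>> > > > > > >> > >             while (it.hasNext()) {
>> > > > > > >> > >                 MessageAndMetadata<byte[], byte[]> mm = it.next();
>> > > > > > >> > >                 process(mm.message());     // if this throws, offset stays uncommitted
>> > > > > > >> > >                 connector.commitOffsets(); // commit after successful processing
>> > > > > > >> > >             }
>> > > > > > >> > >         }
>> > > > > > >> > >
>> > > > > > >> > >         private static void process(byte[] message) { /* app logic */ }
>> > > > > > >> > >     }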
>> > > > > > >> > >
>> > > > > > >> > >
>> > > > > > >> > > On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.solr@gmail.com>
>> > > > > > >> > > wrote:
>> > > > > > >> > >
>> > > > > > >> > > > But with auto commit turned on, I risk losing the failed
>> > > > > > >> > > > message, right? Should I turn off auto commit and only
>> > > > > > >> > > > commit the offset when a message is processed
>> > > > > > >> > > > successfully? But that would seem to require coordination
>> > > > > > >> > > > between threads in order to know the right time to commit
>> > > > > > >> > > > the offset..
>> > > > > > >> > > >
>> > > > > > >> > > >
>> > > > > > >> > > >
>> > > > > > >> > > > On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangguoz@gmail.com>
>> > > > > > >> > > > wrote:
>> > > > > > >> > > >
>> > > > > > >> > > > > Hello Chen,
>> > > > > > >> > > > >
>> > > > > > >> > > > > With the high-level consumer, partition re-assignment
>> > > > > > >> > > > > is automatic upon consumer failures.
>> > > > > > >> > > > >
>> > > > > > >> > > > > Guozhang
>> > > > > > >> > > > >
>> > > > > > >> > > > >
>> > > > > > >> > > > > On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.solr@gmail.com>
>> > > > > > >> > > > > wrote:
>> > > > > > >> > > > >
>> > > > > > >> > > > > > Folks,
>> > > > > > >> > > > > > I have a process that starts at a specific time and
>> > > > > > >> > > > > > reads from a specific topic. I am currently using the
>> > > > > > >> > > > > > high-level API (consumer group) to read from Kafka
>> > > > > > >> > > > > > (and it will stop once there is nothing left in the
>> > > > > > >> > > > > > topic, by specifying a timeout). I am most concerned
>> > > > > > >> > > > > > about error recovery in a multi-threaded context. If
>> > > > > > >> > > > > > one thread dies, will the other running bolt threads
>> > > > > > >> > > > > > pick up the failed message? Or do I have to start
>> > > > > > >> > > > > > another thread in order to pick it up? What would be
>> > > > > > >> > > > > > a good practice to ensure each message is processed
>> > > > > > >> > > > > > at least once?
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > Note that all threads are using the same group id.
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > Thanks,
>> > > > > > >> > > > > > Chen
>> > > > > > >> > > > > >
>> > > > > > >> > > > >
>> > > > > > >> > > > >
>> > > > > > >> > > > >
>> > > > > > >> > > > > --
>> > > > > > >> > > > > -- Guozhang
>> > > > > > >> > > > >
>> > > > > > >> > > >
>> > > > > > >> > >
>> > > > > > >> > >
>> > > > > > >> > >
>> > > > > > >> > > --
>> > > > > > >> > > -- Guozhang
>> > > > > > >> > >
>> > > > > > >> >
>> > > > > > >>
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> --
>> > > > > > >> -- Guozhang
>> > > > > > >>
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > -- Guozhang
>> > > > >
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
