kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Kafka question
Date Tue, 07 Apr 2015 17:06:06 GMT
Jack,

Okay, I see your point now. I was originally thinking that in each run you
1) first create the topic, 2) start producing to the topic, 3) start
consuming from the topic, and then 4) delete the topic and stop the
producers / consumers before the run completes, but it sounds like you
actually create the topic only once.

If that is the case and you always use a different group id, then yes, with
the current consumer you have to make sure that at the boundary of each
run, when you stop the consumers, you also halt the producers so that they
do not keep producing until the next run starts. The new consumer that we
are currently developing lets you specify the starting offset for your
consumption, so you could checkpoint offsets outside Kafka on the consumer
side and resume from the checkpointed offsets in each run.
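
A minimal sketch of such a checkpoint kept outside Kafka, assuming one small
text file per consumer. `OffsetCheckpoint`, the file location, and the
`partition=offset` line format are hypothetical and not part of any Kafka
API; the loaded offsets are what a run would pass to the consumer's seek()
before it starts consuming.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper (not part of Kafka): keeps "partition -> next offset
// to read" in a plain text file so that a later run can resume from it.
object OffsetCheckpoint {

  // Write one "partition=offset" line per partition. The data is written to
  // a temporary sibling file and then moved over the old checkpoint, so the
  // previous checkpoint is not truncated while the new one is being written.
  def save(file: Path, offsets: Map[Int, Long]): Unit = {
    val body = offsets.toSeq.sortBy(_._1)
      .map { case (partition, offset) => s"$partition=$offset" }
      .mkString("\n")
    val tmp = file.resolveSibling(file.getFileName.toString + ".tmp")
    Files.write(tmp, body.getBytes(UTF_8))
    Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING)
  }

  // Read the checkpoint back; a missing file means "no checkpoint yet".
  def load(file: Path): Map[Int, Long] =
    if (!Files.exists(file)) Map.empty
    else new String(Files.readAllBytes(file), UTF_8)
      .split("\n").iterator
      .map(_.trim).filter(_.nonEmpty)
      .map { line =>
        val parts = line.split("=", 2)
        parts(0).toInt -> parts(1).toLong
      }
      .toMap
}
```

A run would call `OffsetCheckpoint.load(...)` before consuming and
`OffsetCheckpoint.save(...)` with the consumer's current positions when it
stops.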

You can find the new consumer's API here (check position() / seek()
specifically) and let me know if you think that will work for your case.

http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
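
To illustrate how position() / seek() could close the gap between runs, here
is a toy model that runs without a broker. `InMemoryLog`, `ToyConsumer`, and
`ResumeSketch.runOnce` are hypothetical names used only for illustration;
just the method names position() and seek() come from the new consumer, and
their real signatures (which identify a partition by topic and partition
number rather than a bare Int) should be checked against the javadoc.

```scala
import scala.collection.mutable

// Toy in-memory log, used only so the sketch runs without a broker.
final class InMemoryLog(partitions: Int) {
  private val data = Vector.fill(partitions)(mutable.ArrayBuffer.empty[String])
  def append(partition: Int, msg: String): Unit = data(partition) += msg
  def endOffset(partition: Int): Long = data(partition).size.toLong
  def read(partition: Int, from: Long): Seq[String] =
    data(partition).drop(from.toInt).toList
}

// Hypothetical consumer exposing position() and seek(). It mirrors the
// method names mentioned above, not the real KafkaConsumer signatures.
final class ToyConsumer(log: InMemoryLog, partitions: Int) {
  // Like auto.offset.reset=largest: start at the current end of each partition.
  private val next =
    mutable.Map((0 until partitions).map(p => p -> log.endOffset(p)): _*)

  def position(partition: Int): Long = next(partition)
  def seek(partition: Int, offset: Long): Unit = next(partition) = offset

  // Return everything available from the current positions and advance them.
  def poll(): Seq[String] = (0 until partitions).flatMap { p =>
    val msgs = log.read(p, next(p))
    next(p) += msgs.size
    msgs
  }
}

object ResumeSketch {
  // One run: seek to the checkpointed offsets where there are any, consume
  // what is available, and return the positions to checkpoint for next time.
  def runOnce(log: InMemoryLog, partitions: Int,
              checkpoint: Map[Int, Long]): (Seq[String], Map[Int, Long]) = {
    val consumer = new ToyConsumer(log, partitions)
    checkpoint.foreach { case (p, offset) => consumer.seek(p, offset) }
    val msgs = consumer.poll()
    (msgs, (0 until partitions).map(p => p -> consumer.position(p)).toMap)
  }
}
```

In this model, messages appended between two runs are still delivered to the
second run, because it seeks to the positions recorded by the first instead
of starting at the end of the log.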

Guozhang

On Mon, Apr 6, 2015 at 8:39 PM, Jack <jacklu@gmail.com> wrote:

> How about the first run then? If we use "largest" as the "auto.offset.reset"
> value, what value will these consumers get? I assume it will point to the
> latest position in the log. Is that true? Just so you know, we can't have a
> warm-up run so that the later runs can use the offset committed by that
> run.
>
> To give you a little bit more context, for every run we create a unique
> group.id, so essentially we want the offset to point to a safe position so
> that the consumer won't miss any messages appended after that point. So is
> there a way other than setting "auto.offset.reset" to "smallest"? We know
> that works, but it takes forever to get the data (since the log is long).
>
> Thanks again.
>
> -Jack
>
> On Mon, Apr 6, 2015 at 5:34 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Did you turn on automatic offset committing? If yes, then this issue
> > should not happen, as later runs will just consume data from the last
> > committed offset.
> >
> > Guozhang
> >
> > On Mon, Apr 6, 2015 at 5:16 PM, Jack <jacklu@gmail.com> wrote:
> >
> > > Hi Guozhang,
> > >
> > > When I switched auto.offset.reset to smallest, it worked. However, it
> > > generates a lot of data and slows down the verification.
> > >
> > > Thanks,
> > >
> > > -Jack
> > >
> > > On Mon, Apr 6, 2015 at 5:07 PM, Guozhang Wang <wangguoz@gmail.com>
> > > wrote:
> > >
> > > > Jack,
> > > >
> > > > Could you just change "auto.offset.reset" to smallest and see if this
> > > > issue goes away? It is not related to the producer end.
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Apr 6, 2015 at 4:14 PM, Jack <jacklu@gmail.com> wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > Thanks so much for replying, first of all.
> > > > >
> > > > > Here is the config we have:
> > > > >
> > > > > group.id -> 'some unique id'
> > > > > zookeeper.connect -> 'zookeeper host'
> > > > > auto.commit.enabled -> false
> > > > > 'auto.offset.reset' -> largest
> > > > > consumer.timeout.ms -> -1
> > > > > fetch.message.max.bytes -> 10M
> > > > >
> > > > > So it seems like we need to make sure the submitted future returns
> > > > > before performing the actions which eventually generate the
> > > > > messages we expect.
> > > > >
> > > > > Cheers,
> > > > >
> > > > > -Jack
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang <wangguoz@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Jack,
> > > > > >
> > > > > > Your theory is correct if your consumer config sets
> > > > > > auto.offset.reset to largest and you do not have any committed
> > > > > > offsets before. Could you list your consumer configs and see if
> > > > > > that is the case?
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Mon, Apr 6, 2015 at 3:15 PM, Jack <jacklu@gmail.com> wrote:
> > > > > >
> > > > > > > Hi folks,
> > > > > > >
> > > > > > > I have a quick question.
> > > > > > >
> > > > > > > We are using 0.8.1 and running into this weird problem. We are
> > > > > > > using the HighLevelConsumer for this topic, which we created
> > > > > > > with 64 partitions.
> > > > > > >
> > > > > > > In our service, we first create a Consumer object as usual and
> > > > > > > then call 'createMessageStreams' with Map('topic_name'->64). It
> > > > > > > returns a Seq[KafkaStream]. For each stream object in the
> > > > > > > sequence, we submit a task like the following to the pool.
> > > > > > >
> > > > > > > threadpool.submit(new Runnable {
> > > > > > >   override def run(): Unit = {
> > > > > > >     stream.iterator().foreach { msg => ... }
> > > > > > >   }
> > > > > > > })
> > > > > > >
> > > > > > > The problem we ran into is that once all of the above is
> > > > > > > established, we should be able to get any message showing up
> > > > > > > in kafka from the consumer side. But in reality, for some
> > > > > > > reason, we occasionally don't see these messages (we do see
> > > > > > > them in the log, though).
> > > > > > >
> > > > > > > Some team members believe that the stream might get a later
> > > > > > > offset, thus not being able to see the earlier messages.
> > > > > > >
> > > > > > > I really doubt that statement and want to see if anyone could
> > > > > > > shed any light upon this?
> > > > > > >
> > > > > > > One possible theory from me is that the offset won't be given
> > > > > > > until stream.iterator().next is called. Since the task
> > > > > > > submission is asynchronous (we don't wait for each submission
> > > > > > > before producing messages to kafka), that could get us a later
> > > > > > > offset, which might not contain the messages we want. One
> > > > > > > possible solution is to perform any action which produces
> > > > > > > messages to kafka only after all these submitted tasks return.
> > > > > > >
> > > > > > > Any thoughts?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > -Jack
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
