kafka-users mailing list archives

From Jack <jac...@gmail.com>
Subject Re: Kafka question
Date Tue, 07 Apr 2015 03:39:07 GMT
How about the first run then? If we use "largest" as the "auto.offset.reset"
value, what offset will these consumers get? I assume it will point to the
latest position in the log. Is that true? Just so you know, we can't have a
warm-up run so that later runs can use the offset committed by that run.

To give you a little bit more context: for every run we create a unique
group.id, so essentially we want the offset to point to a safe position so
that the consumer won't miss any messages appended after that point. Is there
a way other than setting "auto.offset.reset" to "smallest", which we know
works, but which takes forever to get through the data (since the log is long)?
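To state my understanding of the reset policy concretely: a committed offset always wins, and "largest"/"smallest" only apply when the group has no committed offset at all. A small sketch in Java (the helper and the offset numbers are made up for illustration, not part of the Kafka API):

```java
public class OffsetResetSketch {
    // Hypothetical helper mirroring my understanding of auto.offset.reset:
    // a previously committed offset always takes precedence; the reset policy
    // only kicks in for a group with no committed offset at all.
    static long startingOffset(Long committed, long logStart, long logEnd,
                               String reset) {
        if (committed != null) {
            return committed; // resume where the group left off
        }
        return "smallest".equals(reset) ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // Fresh group.id, log spanning offsets 0..5000:
        System.out.println(startingOffset(null, 0L, 5000L, "largest"));  // 5000 (log end)
        System.out.println(startingOffset(null, 0L, 5000L, "smallest")); // 0 (full replay)
        // Existing committed offset wins regardless of the reset setting:
        System.out.println(startingOffset(1234L, 0L, 5000L, "largest")); // 1234
    }
}
```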

Thanks again.

-Jack
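P.S. For reference, the config we pass to the high-level consumer looks roughly like this (a Java sketch; the host, group id, and the builder itself are placeholders rather than our exact code — only the keys mirror the real 0.8 settings):

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Hypothetical builder for the high-level consumer config discussed in
    // this thread. Values are placeholders; keys match the 0.8 settings.
    static Properties buildConfig(String zkConnect, String groupId,
                                  String offsetReset) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);              // unique per run in our setup
        props.put("auto.offset.reset", offsetReset); // "largest" or "smallest"
        props.put("auto.commit.enable", "false");
        props.put("consumer.timeout.ms", "-1");      // block forever on next()
        props.put("fetch.message.max.bytes", String.valueOf(10 * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        Properties p = buildConfig("zkhost:2181", "run-0001", "largest");
        System.out.println(p.getProperty("auto.offset.reset")); // prints: largest
    }
}
```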

On Mon, Apr 6, 2015 at 5:34 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Did you turn on automatic offset committing? If yes then this issue should
> not happen as later runs will just consume data from the last committed
> offset.
>
> Guozhang
>
> On Mon, Apr 6, 2015 at 5:16 PM, Jack <jacklu@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > When I switch auto.offset.reset to smallest, it works. However, it
> > generates a lot of data and slows down the verification.
> >
> > Thanks,
> >
> > -Jack
> >
> > On Mon, Apr 6, 2015 at 5:07 PM, Guozhang Wang <wangguoz@gmail.com>
> wrote:
> >
> > > Jack,
> > >
> > > Could you just change "auto.offset.reset" to smallest and see if this
> > issue
> > > goes away? It is not related to the producer end.
> > >
> > > Guozhang
> > >
> > > On Mon, Apr 6, 2015 at 4:14 PM, Jack <jacklu@gmail.com> wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Thanks so much for replying, first of all.
> > > >
> > > > Here is the config we have:
> > > >
> > > > group.id -> 'some unique id'
> > > > zookeeper.connect -> 'zookeeper host'
> > > > auto.commit.enable -> false
> > > > auto.offset.reset -> largest
> > > > consumer.timeout.ms -> -1
> > > > fetch.message.max.bytes -> 10M
> > > >
> > > > So it seems like we need to make sure the submitted future returns
> > > > before performing the actions which eventually generate the messages
> > > > we expect.
> > > >
> > > > Cheers,
> > > >
> > > > -Jack
> > > >
> > > >
> > > >
> > > > On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang <wangguoz@gmail.com>
> > > wrote:
> > > >
> > > > > Jack,
> > > > >
> > > > > Your theory is correct if your consumer config sets auto.offset.reset
> > > > > to largest and you do not have any committed offsets before. Could
> > > > > you list your consumer configs and see if that is the case?
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Apr 6, 2015 at 3:15 PM, Jack <jacklu@gmail.com> wrote:
> > > > >
> > > > > > Hi folks,
> > > > > >
> > > > > > I have a quick question.
> > > > > >
> > > > > > We are using 0.8.1 and running into this weird problem. We are
> > > > > > using the HighLevelConsumer for this topic, which we created with
> > > > > > 64 partitions.
> > > > > >
> > > > > > In our service, we first create a Consumer object as usual, and
> > > > > > then call 'createMessageStreams' with Map('topic_name' -> 64). It
> > > > > > returns a Seq[KafkaStream]. For each stream object in the sequence,
> > > > > > we submit a task like the following to the pool.
> > > > > >
> > > > > > threadpool.submit(new Runnable {
> > > > > >   override def run() = {
> > > > > >     stream.iterator().foreach { msg => ... }
> > > > > >   }
> > > > > > })
> > > > > >
> > > > > > The problem we ran into is that after all of the above is
> > > > > > established, any message showing up in Kafka should be received on
> > > > > > the consumer side. But in reality, for some reason, occasionally,
> > > > > > we don't see some of these messages (we do see them in the broker
> > > > > > log, though).
> > > > > >
> > > > > > Some team members believe that the stream might get a later offset
> > > > > > and thus miss the earlier messages.
> > > > > >
> > > > > > I really doubt that statement and want to see if anyone could shed
> > > > > > some light on this.
> > > > > >
> > > > > > One possible theory of mine is that the offset won't be assigned
> > > > > > until stream.iterator().next is called, but since the task
> > > > > > submission is asynchronous (we don't wait for each submission
> > > > > > before producing messages to Kafka), that could get us a later
> > > > > > offset, which might not contain the messages we want. One possible
> > > > > > solution is to perform any action which produces messages to Kafka
> > > > > > only after all these submitted tasks have returned.
> > > > > >
> > > > > > Any thoughts?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > -Jack
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
