kafka-users mailing list archives

From Jack <jac...@gmail.com>
Subject Re: Kafka question
Date Tue, 07 Apr 2015 00:16:00 GMT
Hi Guozhang,

When I switched auto.offset.reset to smallest, it worked. However, it
replays a lot of data and slows down the verification.

Thanks,

-Jack

On Mon, Apr 6, 2015 at 5:07 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Jack,
>
> Could you just change "auto.offset.reset" to smallest and see if this issue
> goes away? It is not related to the producer end.
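>
> For reference, it is a single consumer property (a sketch, assuming your
> config lives in a java.util.Properties object called props):
>
>   // with no committed offsets, "smallest" starts from the beginning
>   // of the log instead of the tail ("largest", the default)
>   props.put("auto.offset.reset", "smallest")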
>
> Guozhang
>
> On Mon, Apr 6, 2015 at 4:14 PM, Jack <jacklu@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > Thanks so much for replying, first of all.
> >
> > Here is the config we have:
> >
> > group.id -> 'some unique id'
> > zookeeper.connect -> 'zookeeper host'
> > auto.commit.enable -> false
> > auto.offset.reset -> largest
> > consumer.timeout.ms -> -1
> > fetch.message.max.bytes -> 10M
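> >
> > In code it looks roughly like this (an untested sketch; the host is a
> > placeholder, and fetch.message.max.bytes is in bytes, so 10M means
> > 10485760):
> >
> >   import java.util.Properties
> >   import kafka.consumer.{Consumer, ConsumerConfig}
> >
> >   val props = new Properties()
> >   props.put("group.id", "some-unique-id")
> >   props.put("zookeeper.connect", "zkhost:2181")
> >   props.put("auto.commit.enable", "false")
> >   props.put("auto.offset.reset", "largest")
> >   props.put("consumer.timeout.ms", "-1")
> >   props.put("fetch.message.max.bytes", (10 * 1024 * 1024).toString)
> >   val consumer = Consumer.create(new ConsumerConfig(props))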
> >
> > So it seems like we need to make sure the submitted futures return before
> > performing the actions which eventually generate the messages we expect.
> >
> > Cheers,
> >
> > -Jack
> >
> >
> >
> > On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > > Jack,
> > >
> > > Your theory is correct if your consumer config sets auto.offset.reset to
> > > largest and you do not have any committed offsets yet. Could you list
> > > your consumer configs so we can check whether that is the case?
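> > >
> > > (One quick way to check for committed offsets, given that the 0.8.1
> > > high-level consumer stores them in ZooKeeper: in zkCli.sh, run
> > >
> > >   get /consumers/<group.id>/offsets/<topic>/<partition>
> > >
> > > with your own group, topic and partition substituted. If the node does
> > > not exist, no offset has been committed for that partition.)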
> > >
> > > Guozhang
> > >
> > > On Mon, Apr 6, 2015 at 3:15 PM, Jack <jacklu@gmail.com> wrote:
> > >
> > > > Hi folks,
> > > >
> > > > I have a quick question.
> > > >
> > > > We are using 0.8.1 and running into a weird problem. We are using the
> > > > high-level consumer for this topic, which we created with 64
> > > > partitions.
> > > >
> > > > In our service, we first create a Consumer object as usual, and then
> > > > call 'createMessageStreams' with Map('topic_name' -> 64). It returns
> > > > us a Seq[KafkaStream]. For each stream object in the sequence, we
> > > > submit a task like the following to the pool.
> > > >
> > > > threadpool.submit(new Runnable {
> > > >   override def run() = {
> > > >     stream.iterator().foreach { msg => ... }
> > > >   }
> > > > })
> > > >
> > > > The problem we ran into is that once all of the above is established,
> > > > any message showing up in kafka should be received on the consumer
> > > > side. But in reality, for some reason, we occasionally don't see some
> > > > of these messages (we do see them in the log, though).
> > > >
> > > > Some team members believe that the stream might get a later offset,
> > > > and thus never see the earlier messages.
> > > >
> > > > I really doubt that explanation, and want to see if anyone could shed
> > > > any light on this.
> > > >
> > > > One possible theory from me is that the offset won't be assigned until
> > > > stream.iterator().next is called, but since the task submission is
> > > > asynchronous (we don't wait for each submission before producing
> > > > messages to kafka), that could get us a later offset, which might not
> > > > contain the messages we want. One possible solution is to perform any
> > > > action that produces messages to kafka only after all the submitted
> > > > tasks have started, as in the sketch below.
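> > > >
> > > > Something like this is what I have in mind (an untested sketch; the
> > > > CountDownLatch and the names are made up, and note that run() loops
> > > > forever, so we signal "started" rather than wait for the futures to
> > > > complete):
> > > >
> > > >   import java.util.concurrent.{CountDownLatch, Executors}
> > > >
> > > >   val threadpool = Executors.newFixedThreadPool(streams.size)
> > > >   val started = new CountDownLatch(streams.size)
> > > >
> > > >   streams.foreach { stream =>
> > > >     threadpool.submit(new Runnable {
> > > >       override def run() = {
> > > >         val it = stream.iterator() // per the theory, claim the offset first
> > > >         started.countDown()        // signal this stream is being consumed
> > > >         it.foreach { msg => ... }
> > > >       }
> > > >     })
> > > >   }
> > > >
> > > >   started.await() // produce test messages only after this returns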
> > > >
> > > > Any thoughts?
> > > >
> > > > Thanks,
> > > >
> > > > -Jack
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
