kafka-users mailing list archives

From Stevo Slavić <ssla...@gmail.com>
Subject Re: New consumer - poll/seek javadoc confusing, need clarification
Date Thu, 23 Jul 2015 09:03:25 GMT
Thanks, Ewen, for the heads-up.

It's great that seek is not needed between poll calls when business goes as
usual.

In an edge case, when my logic detects that it needs to go back and reread
events from a given position in history, I use seek. I found that the next
poll after seek does not respect the offset used in seek. It is strange that
even Consumer.position returns the same offset that seek has set for the
consumer instance, yet poll still does not return messages starting from that
offset.

E.g. say there are 5 messages published to a single partition of a single
topic. The consumer subscribes to that topic partition with the
smallest/earliest offset reset strategy configured, and consumer.position
confirms that the consumer is at position 0.
Then poll is issued and it returns all 5 messages. The logic processes the
messages and detects that it needs to go back in history to position 0; it
does not commit, but calls seek to 0, and position confirms the consumer is
at offset 0. The next poll does not return any messages.

So seek is not doing what it should, according to its javadoc:

/**
 * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
 * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
 * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
 */
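To make the expected behavior concrete, here is a hypothetical in-memory toy model of what the javadoc quoted above promises for the 5-message scenario. It uses plain Java only; `ToyLog`, `ToyConsumer` and their methods are invented names, not Kafka classes, and the model reflects the documented contract rather than the client's actual internals: poll advances an in-memory position, and seek overrides the offset used by the next poll, so a poll after seek(0) should return all 5 messages again.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model (NOT the Kafka client API) of the poll/seek
 * contract for a single topic partition.
 */
public class SeekSemanticsSketch {

    /** Hypothetical append-only log for one topic partition. */
    public static final class ToyLog {
        public final List<String> messages = new ArrayList<>();
    }

    /** Hypothetical consumer that tracks its fetch position in memory only. */
    public static final class ToyConsumer {
        private final ToyLog log;
        private long position; // next offset the next poll() will fetch from

        public ToyConsumer(ToyLog log, long initialPosition) {
            this.log = log;
            this.position = initialPosition;
        }

        public long position() { return position; }

        /** Overrides the offset used by the next poll(); nothing is committed. */
        public void seek(long offset) { position = offset; }

        /** Returns every message from the current position onward and advances past them. */
        public List<String> poll() {
            int from = (int) Math.min(position, log.messages.size());
            List<String> batch = new ArrayList<>(log.messages.subList(from, log.messages.size()));
            position = from + batch.size();
            return batch;
        }
    }

    public static void main(String[] args) {
        ToyLog log = new ToyLog();
        for (int i = 0; i < 5; i++) log.messages.add("m" + i);

        ToyConsumer consumer = new ToyConsumer(log, 0); // earliest reset: start at offset 0
        System.out.println(consumer.poll().size());    // 5
        System.out.println(consumer.position());       // 5: position kept in memory, no seek needed
        System.out.println(consumer.poll().size());    // 0: nothing new in the partition
        consumer.seek(0);                               // go back in history without committing
        System.out.println(consumer.poll().size());    // 5: the poll after seek starts at offset 0
    }
}
```

In this model the last poll returns all 5 messages again, which is the outcome the javadoc describes and the one the scenario above did not observe.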

I've also checked that calling seek multiple times does not help: poll still
does not use the offset set by the last seek.
It could be that something is wrong with the poll implementation, making it
not respect the offset set with seek.

Kind regards,
Stevo Slavic.


On Wed, Jul 22, 2015 at 7:47 AM, Ewen Cheslack-Postava <ewen@confluent.io> wrote:

> It should just continue consuming using the existing offsets. It'll have to
> refresh metadata to pick up the leadership change, but once it does it can
> just pick up where consumption from the previous leader stopped -- all the
> ISRs should have the same data, so the new leader will have all the same
> data the previous leader had (assuming unclean leader election is not
> enabled).
>
> On Tue, Jul 21, 2015 at 9:11 PM, James Cheng <jcheng@tivo.com> wrote:
>
> >
> > > On Jul 21, 2015, at 9:15 AM, Ewen Cheslack-Postava <ewen@confluent.io> wrote:
> > >
> > > On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić <sslavic@gmail.com> wrote:
> > >
> > >> Hello Apache Kafka community,
> > >>
> > >> I find the new consumer poll/seek javadoc a bit confusing. Just by reading
> > >> the docs I'm not sure what the outcome will be, or what is expected, in the
> > >> following scenario:
> > >>
> > >> - kafkaConsumer is instantiated with auto-commit off
> > >> - kafkaConsumer.subscribe(someTopic)
> > >> - kafkaConsumer.position is called for every TopicPartition the HLC
> > >> (high-level consumer) is actively subscribed to
> > >>
> > >> and then, when doing multiple poll calls in succession (without calling
> > >> commit), does seek have to be called between poll calls to position the
> > >> HLC to skip what was read in the previous poll, or does the HLC keep that
> > >> state (position after poll) in memory, so that the next poll (without seek
> > >> between the two poll calls) will continue from where the last poll stopped?
> > >>
> > >
> > > The position is tracked in-memory within the consumer, so as long as there
> > > isn't a consumer rebalance, consumption will just proceed with subsequent
> > > messages (i.e. the behavior I think most people would find intuitive).
> > > However, if a rebalance occurs (another consumer instance joins the group
> > > or some leave), then a partition may be assigned to a different consumer
> > > instance that has no idea about the current position and will restart based
> > > on the offset reset setting (because attempting to fetch the committed
> > > offset will fail since no offsets have been committed).
> > >
> >
> > Ewen,
> >
> > What happens if there is a broker failure and a new broker becomes the
> > partition leader? Does the high level consumer start listening to the new
> > partition leader at the in-memory position, or does it restart based on
> > saved offsets?
> >
> > Thanks,
> > -James
> >
> > > -Ewen
> > >
> > >
> > >> It could be just me not understanding this from the javadoc. If not, maybe
> > >> the javadoc can be improved to make this (even) more obvious.
> > >>
> > >> Kind regards,
> > >> Stevo Slavic.
> > >>
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> >
> >
>
>
> --
> Thanks,
> Ewen
>
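Ewen's explanation in the quoted thread (position is kept in memory; a consumer newly assigned a partition after a rebalance cannot see that position and falls back to a committed offset or, failing that, the offset reset setting) can be sketched as a small decision function. This is a hypothetical plain-Java toy, not the Kafka API: `RebalanceStartSketch`, `ResetPolicy` and `startingOffset` are invented names, and "earliest" is modeled as offset 0, assuming no messages have been removed from the start of the log. Per Ewen's reply to James, a leader change does not go through this path; the consumer just refreshes metadata and continues from its in-memory position.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical toy model (NOT the Kafka API) of where a newly assigned
 * consumer starts reading a partition after a rebalance.
 */
public class RebalanceStartSketch {

    /** Hypothetical stand-in for the consumer's offset reset setting. */
    public enum ResetPolicy { EARLIEST, LATEST }

    /**
     * The previous owner's in-memory position is not visible here; only a
     * committed offset (if one exists) or the reset policy can be used.
     * Assumes the log starts at offset 0 (nothing deleted by retention).
     */
    public static long startingOffset(Map<String, Long> committedOffsets, String partition,
                                      ResetPolicy policy, long logEndOffset) {
        Long committed = committedOffsets.get(partition);
        if (committed != null) {
            return committed; // resume from the last committed offset
        }
        // No commit: fall back to the reset policy.
        return policy == ResetPolicy.EARLIEST ? 0L : logEndOffset;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();
        long logEnd = 5; // partition holds offsets 0..4

        // Previous owner read up to offset 3 in memory but never committed.
        System.out.println(startingOffset(committed, "someTopic-0", ResetPolicy.EARLIEST, logEnd)); // 0
        System.out.println(startingOffset(committed, "someTopic-0", ResetPolicy.LATEST, logEnd));   // 5

        // Had it committed offset 3, the new owner would resume there instead.
        committed.put("someTopic-0", 3L);
        System.out.println(startingOffset(committed, "someTopic-0", ResetPolicy.LATEST, logEnd));   // 3
    }
}
```

The point of the sketch is that, with auto-commit off and no explicit commits, any progress made by the previous owner is invisible to the new one.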
