kafka-users mailing list archives

From Mayuresh Gharat <gharatmayures...@gmail.com>
Subject Re: Problems getting offsets
Date Tue, 03 Nov 2015 01:53:43 GMT
Hi David,

My bad. I did not understand your question correctly. Thanks, Stevo, for
the detailed explanation.

Just in case you want to check out Kafka-based offset management, this is a
very good presentation:
http://www.slideshare.net/jjkoshy/offset-management-in-kafka

Thanks,

Mayuresh

On Mon, Nov 2, 2015 at 12:51 AM, Stevo Slavić <sslavic@gmail.com> wrote:

> Here is a somewhat longer and more detailed, though not necessarily easier
> to understand, explanation.
>
> When Kafka is used for offsets storage, consumer offsets are stored in the
> (compacted) consumer offsets topic ("__consumer_offsets"). Every partition
> of the consumer offsets topic can store offsets from multiple consumer
> groups, but the offsets of a single consumer group are always stored in
> (mapped to) the same partition of the consumer offsets topic. For example,
> the offsets of consumer groups x and y could both be stored in partition 0,
> while the offsets of consumer group z could be stored in partition 49; even
> if consumer group x consumes two different topics, its offsets are stored
> in the same partition of the consumer offsets topic.
>
> As with any other Kafka topic, one can read/write consumer offsets topic
> partitions only through their lead brokers, and every partition has only
> one lead broker. The lead broker of the partition where the offsets of a
> given consumer group are stored is called the consumer coordinator broker
> for that consumer group. To fetch/commit offsets for a consumer group, you
> first need to resolve the consumer coordinator broker for that group, and
> then send the fetch/commit offsets request to that broker. (Btw, this is
> dynamic: the coordinator broker for a given consumer group can change even
> right after it has been resolved, e.g. when the initial lead broker is lost
> and a replica becomes the new lead.) Until a leader is assigned for a
> partition and the broker that is the leader is aware of that assignment,
> the topic partition cannot be read/written; it is offline, and read/write
> requests made to that broker will return an error code in the response.
>
> The consumer offsets topic is not created automatically on broker/cluster
> startup. Instead, it gets created on the first (direct or indirect) request
> for consumer offsets topic metadata. That lookup triggers creation of the
> consumer offsets topic; parts of the topic creation process happen
> asynchronously to the request, and it can take time for topic creation to
> actually finish, especially if you leave the consumer offsets topic at its
> default settings (replication factor of 3, 50 partitions). These are nice
> defaults for a production cluster, but not for a single-broker cluster used
> in integration tests.
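
The group-to-partition mapping described above can be sketched roughly as
follows. This is only an illustration: the class and method names are made
up, and the exact hashing (non-negative String hash modulo the partition
count) is an assumption about broker internals, not something stated in
this thread.

```java
final class OffsetsPartitionSketch {

    // Assumed mapping: non-negative hash of the group id, modulo the number
    // of partitions of the consumer offsets topic (50 by default).
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so handle it explicitly.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        int partitions = 50; // default partition count mentioned above
        for (String group : new String[] {"x", "y", "z"}) {
            System.out.println(group + " -> partition " + partitionFor(group, partitions));
        }
    }
}
```

Whatever the exact hash is, the point is that the result depends only on the
group id and the partition count, so a group always lands on the same
partition no matter how many topics it consumes.
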
>
> When you try to fetch (or commit) an offset for the first time, the broker
> internally retrieves metadata about the consumer offsets topic, which
> initiates creation of that topic if it doesn't exist. That first request to
> fetch the offset (usually) fails, since the chosen consumer coordinator
> broker is not yet aware that it is the coordinator for that group, and it
> sends back NotCoordinatorForConsumerCode/NotCoordinatorForConsumerException.
> The docs (see
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> ) state that "The broker returns this error code if it receives an offset
> fetch or commit request for a consumer group that it is not a coordinator
> for."
>
> You could modify your code to retry fetching offsets (but not
> indefinitely), or you could trigger consumer offsets topic init before
> fetching offsets. For the init option you have (at least) two alternatives.
>
> The first is to modify your test so that, after creating your topic but
> before reading/committing offsets, it inits (triggers creation of) the
> consumer offsets topic by looking up consumer offsets topic metadata, and
> then adds some delay to let the consumer offsets topic be fully created
> (all of its default 50 partitions get a leader assigned, the brokers become
> aware of it, etc.).
>
> Alternatively, you could do this consumer offsets topic initialization
> before creating your topic, but then make sure that in the broker
> configuration the replication factor for the consumer offsets topic is not
> higher than the number of brokers (1 in the scenario you described);
> otherwise consumer offsets topic creation will fail. A fresh broker in a
> single-broker cluster that receives a request for consumer offsets topic
> metadata will fail to create the consumer offsets topic, no matter how long
> you wait after the metadata lookup: it is not aware of how many live
> brokers there are in the cluster, so it just tries to create the consumer
> offsets topic with the default replication factor of 3. That fails because
> the initial replica assignment of topic partitions happens during topic
> creation, and the replication factor has to be <= the number of live
> brokers when the topic is created.
>
> Consumer offsets topic creation does not fail (although it takes time to
> finish) if it is done after some other topic creation has been requested,
> because that other topic creation request makes the broker aware (updates
> its cache) that it is the sole live broker in the cluster. Consumer offsets
> topic creation will then ignore the requested/configured replication factor
> of 3 and will (silently) fall back to a replication factor of 1 (= the
> number of live brokers in the cluster).
>
> Maybe things would be cleaner if topic creation allowed non-live brokers to
> be used in replica assignment. Then not only would (consumer offsets) topic
> creation not fail when there are not enough live brokers to meet the
> configured replication factor, but, even better, the Kafka cluster
> controller broker could do this initialization of the consumer offsets
> topic if it doesn't already exist.
>
> Btw, looking up topic metadata in ZooKeeper and checking that all of its
> partitions have a leader assigned is not enough for the topic partitions to
> be writable/readable/online: if a broker is still not aware that it is the
> leader for a given partition (e.g. the broker metadata cache is outdated),
> that partition would still not be writable/readable. One could look up
> topic metadata from the Kafka brokers, and more specifically from the lead
> brokers of the partitions, to check whether they are aware that they are
> the lead brokers for those partitions. Not even that is a guarantee that
> publishing will be successful, since leadership can change at any moment
> and other things can fail (e.g. unreliable network), so one has to handle
> errors. But, as you can see, the problem is not with your topic, but with
> the internal consumer offsets topic.
>
>
> On Mon, Nov 2, 2015 at 1:56 AM, Stevo Slavić <sslavic@gmail.com> wrote:
>
> > Hello David,
> >
> > In short, the problem is not with your topic; it is with consumer offsets
> > topic initialization.
> >
> > You could modify your code to just retry fetching offsets (until
> > successful, where a returned offset of -1 also counts as success, or
> > until a timeout), or alternatively you could trigger consumer offsets
> > topic init (by fetching consumer offsets topic metadata from the broker)
> > before issuing the offset fetch request.
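
A minimal sketch of the bounded retry suggested above, not tied to the real
client API: NotCoordinatorException is a hypothetical stand-in for
NotCoordinatorForConsumerException, and the LongSupplier stands in for
whatever call actually fetches the offset (e.g. a wrapper around
SimpleConsumer#fetchOffsets()). The timeout and backoff values are
arbitrary.

```java
import java.util.function.LongSupplier;

final class OffsetFetchRetrySketch {

    /** Hypothetical stand-in for NotCoordinatorForConsumerException. */
    static final class NotCoordinatorException extends RuntimeException {}

    /**
     * Calls fetchOffset until it returns or the timeout expires. Any returned
     * value, including -1 (no offset committed yet), counts as success; only
     * the "not coordinator" failure is retried.
     */
    static long fetchOffsetWithRetry(LongSupplier fetchOffset, long timeoutMs, long backoffMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return fetchOffset.getAsLong();
            } catch (NotCoordinatorException e) {
                // Give up if another backoff would take us past the deadline.
                if (System.currentTimeMillis() + backoffMs > deadline) {
                    throw new IllegalStateException("offset fetch timed out", e);
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while retrying", ie);
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated fetch: fails twice, then reports "no committed offset".
        long offset = fetchOffsetWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new NotCoordinatorException();
            }
            return -1L;
        }, 5_000, 10);
        System.out.println("offset=" + offset + " after " + calls[0] + " attempts");
    }
}
```

In a real test the supplier would probably also need to re-resolve the
coordinator broker between attempts, since it can change.
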
> >
> > For the init option you have (at least) two alternatives: do it after or
> > before the request to create your topic.
> >
> > If you choose to do consumer offsets topic init before the request to
> > create your (first) topic, make sure to configure the broker with a
> > replication factor of 1 for the consumer offsets topic (the
> > offsets.topic.replication.factor config property); otherwise consumer
> > offsets topic init will fail. Make this config change only for an
> > integration test with a single broker, not for a production cluster,
> > which should have 3 or more brokers anyway.
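
For a test that starts its own single broker, the override above might be
collected like this. It is a sketch only: the class and method names are
hypothetical, and offsets.topic.num.partitions is an extra broker setting
not mentioned in this thread, included on the assumption that fewer
partitions make the offsets topic quicker to create.

```java
import java.util.Properties;

final class SingleBrokerTestConfigSketch {

    // Broker overrides intended only for a single-broker integration test.
    static Properties brokerOverrides() {
        Properties props = new Properties();
        // From the thread: replication factor must not exceed the broker count.
        props.setProperty("offsets.topic.replication.factor", "1");
        // Assumption, not from the thread: shrink the default of 50 partitions.
        props.setProperty("offsets.topic.num.partitions", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties overrides = brokerOverrides();
        for (String name : overrides.stringPropertyNames()) {
            System.out.println(name + "=" + overrides.getProperty(name));
        }
    }
}
```

These would be merged into whatever properties the test uses to start the
broker; a production cluster should keep the defaults.
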
> >
> > Kind regards,
> > Stevo Slavic.
> >
> >
> >
> > On Sun, Nov 1, 2015 at 9:38 PM, David Corbin <DCorbin@lancope.com> wrote:
> >
> >> Yes.  I know all of that.  I guess my original message was not clear.
> >>
> >> The topic metadata indicates that there is a leader.
> >>
> >> Please see my comments interspersed below.
> >> Thanks
> >> David Corbin
> >>
> >> On 10/29/15, 17:53, "Mayuresh Gharat" <gharatmayuresh15@gmail.com> wrote:
> >>
> >> > NotCoordinatorForConsumerException is for consumer side.
> >>
> >> I am attempting to consume messages, so this means I'm getting an
> >> exception that might be reasonable for what I'm doing.
> >>
> >> >I think if you
> >> >are using simpleConsumer, then you have to do your own offset
> >> >management.
> >>
> >> I am attempting to manage my offsets myself; that's why I called
> >> SimpleConsumer#fetchOffsets() which is throwing the exception.  I assume
> >> you are not suggesting that my offset management should be done entirely
> >> "outside the scope" of Kafka.  If that is true, why do several of the
> >> methods on SimpleConsumer refer to offsets?
> >>
> >>
> >> >
> >> >TopicMetadata tells you whether there is a leader for the topic
> >> >partitions,
> >> >replicas, ISR.
> >>
> >>
> >> Yes.  After I create the topic, I "poll" the metadata until the one
> >> topic/partition has a leader, before moving on.  If the metadata says
> >> there is a leader, I don't understand why I would get
> >> NotCoordinatorForConsumerException
> >>
> >> >
> >> >Thanks,
> >> >
> >> >Mayuresh
> >> >
> >> >On Wed, Oct 28, 2015 at 8:23 AM, David Corbin <DCorbin@lancope.com> wrote:
> >> >
> >> >> I'm working on a project on which I hope to use the SimpleConsumer.
> >> >> I'm trying to write some test code around our code that wraps the
> >> >> SimpleConsumer.
> >> >>
> >> >> The environment is 1 kafka broker, and 1 zookeeper
> >> >> The test goes like this:
> >> >>
> >> >> Create a topic ("foo", 1 partition, 1 replica)
> >> >> Create a Producer, and send a message
> >> >> Create a SimpleConsumer, and try to read the offset
> >> >>
> >> >> Failure with: NotCoordinatorForConsumerException
> >> >>
> >> >> If I continue to retry for an extended period, I continue to get that
> >> >> exception.
> >> >>
> >> >> As near as I can tell, the topic metadata says there is a leader, but
> >> >> the broker thinks it's being rebalanced.
> >> >> I've done the above test immediately after stopping, cleaning out old
> >> >> data, and restarting both zookeeper and kafka.
> >> >>
> >> >> Suggestions welcome.
> >> >>
> >> >>
> >> >
> >> >
> >> >--
> >> >-Regards,
> >> >Mayuresh R. Gharat
> >> >(862) 250-7125
> >>
> >>
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125
