kafka-users mailing list archives

From Evan Huus <evan.h...@shopify.com>
Subject Re: Topicmetadata response miss some partitions information sometimes
Date Mon, 02 Mar 2015 02:42:20 GMT
My concern is more with the partitioner that determines the partition of
the message. IIRC, it does something like "hash(key) mod #partitions" in
the normal case, which means if the # of partitions changes because some of
them are offline, then certain messages will be sent to the wrong (online)
partition, no?
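
To illustrate the concern, here is a minimal sketch in Java. It assumes a simplified partitioner built on String.hashCode(); the class HashPartitionSketch and the method partitionFor are hypothetical names chosen for this example, and this is not the actual Kafka partitioner code.

```java
import java.util.Arrays;
import java.util.List;

class HashPartitionSketch {
    // Hypothetical stand-in for "hash(key) mod #partitions".
    // Assumption: String.hashCode() is used here only for illustration;
    // a real client may use a different hash function.
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d");
        for (String key : keys) {
            // Same key, evaluated against the full partition count and
            // against the reduced count seen after a metadata refresh.
            int withFour = partitionFor(key, 4);
            int withTwo = partitionFor(key, 2);
            System.out.printf("%s: 4 partitions -> %d, 2 partitions -> %d%n",
                    key, withFour, withTwo);
        }
    }
}
```

With this stand-in, the key "b" maps to partition 2 when four partitions are visible but to partition 0 when only two are, so the same key would be routed to a different partition if the count were recomputed from the reduced metadata.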

Evan

On Sun, Mar 1, 2015 at 9:36 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Evan,
>
> In the Java producer, the partition id of a message is determined in the
> send() call, and the data is then appended to the corresponding batch buffer
> (one buffer for each partition), i.e. the partition id will never change
> once it is decided. If the partition goes offline after this, the send
> will fail and then be retried. In the end the message will either exhaust
> all retries and be dropped on the floor, or the partition will come online
> again, the metadata will be refreshed, and the retried send will succeed.
> Hence, if some of your partitions stay offline for too long, some data
> will be lost on the producer side (unless you set infinite retries, of
> course).
>
> Guozhang
>
> On Sun, Mar 1, 2015 at 5:25 AM, Evan Huus <evan.huus@shopify.com> wrote:
>
> > On Sun, Mar 1, 2015 at 1:46 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > > Hi Honghai,
> > >
> > > 1. If a partition has no leader (i.e. all of its replicas are down), it
> > > will become offline, and hence the metadata response will not have this
> > > partition's info.
> > >
> >
> > If I am understanding this correctly, then this is a problem (in certain
> > cases) for the producer because of an ambiguity.
> >
> > If a producer using hash-partitioning receives partitions 1-4 and begins
> > producing, then updates its metadata and receives only partitions 1-2,
> > there are two possible scenarios it cannot distinguish between:
> >
> > 1. The user reduced the number of partitions for this topic. The producer
> > should continue producing and distribute all new messages between the two
> > remaining partitions.
> > 2. Two of the four partitions are entirely offline. The producer should
> > continue to distribute messages among all four partitions (to maintain the
> > consistency of the hashing) but two of the four partitions will simply
> > fail.
> >
> > Whichever behaviour the producer chooses, in the other scenario it will
> > incorrectly distribute messages among the partitions, thus breaking the
> > hash-partitioner guarantee.
> >
> > If all of the replicas are down for a partition, why do metadata requests
> > not simply return that partition with LeaderNotAvailable?
> >
> > Thanks,
> > Evan
> >
> >
> > > 2. Any of the brokers cache metadata and hence can handle the metadata
> > > request. It's just that their caches are updated asynchronously, and
> > > hence when there is an update to the metadata, some brokers may get the
> > > new metadata value a bit earlier than others.
> > >
> > > On Thu, Feb 26, 2015 at 7:21 PM, ChenHongHai <waldenchenkafka@outlook.com> wrote:
> > >
> > > > We have one topic with 4 partitions, but sometimes we only get metadata
> > > > for 2 partitions. Has anyone met this kind of situation before? If some
> > > > partition has no leader at that moment, will it cause this problem? How
> > > > can a partition be made to have no leader? If 6 brokers hold some
> > > > partitions of the topic, will they all return the same result? Do I need
> > > > to try all of them and merge the results?
> > > >
> > > >     SimpleConsumer consumer = consumerPool.getConsumer(seed.host, seed.port,
> > > >             connectionTimeOut, consumerBufferSize, "refreshPartitionMeta");
> > > >     List<String> topics = new ArrayList<String>() {{
> > > >         add(topic);
> > > >     }};
> > > >     TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(topics));
> > > >     List<TopicMetadata> metaData = resp.topicsMetadata();
> > > >     for (TopicMetadata item : metaData) {
> > > >         if (item.errorCode() != kafka.common.ErrorMapping.NoError()) {
> > > >             LOG.error(String.format(
> > > >                     "Something wrong with topic metadata for topic: %s error code: %d ",
> > > >                     item.topic(), item.errorCode()));
> > > >         }
> > > >         for (PartitionMetadata part : item.partitionsMetadata()) {
> > > >             partitionMeta.put(part.partitionId(), part);
> > > >         }
> > > >     }
> > > >
> > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
