kafka-users mailing list archives

From Daniel Compton <daniel.compton.li...@gmail.com>
Subject Re: New and old producers partition messages differently
Date Sun, 26 Apr 2015 08:54:39 GMT
I would support adding a configuration flag in the short term, say
until 0.9. In the long term, hashCode may change out from underneath people
anyway, so delaying the move to Murmur for too long is still likely to end
in pain.

Leaving that configuration around long term increases code and test surface
area, which isn't great, although I can certainly see that it could be
necessary for some orgs.

It would probably also be worth documenting the scenarios under which the
old default partitioner may change how it partitions items, so people are
aware of its limitations.
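
One scenario that might be worth documenting, sketched below as an
illustration rather than as the producer's actual code: Java arrays inherit
Object.hashCode(), which is identity-based, so two byte[] keys with the same
contents can hash differently. The class and method names here are
hypothetical, and the MIN_VALUE handling is an assumption about what the
abs helper does.

```java
import java.util.Arrays;

class ArrayKeyHashing {

    // Assumption: map a hash to a non-negative int, sending Integer.MIN_VALUE to 0
    // because Math.abs(Integer.MIN_VALUE) is still negative.
    static int nonNegative(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // hashCode() on an array is the identity hash, so the result depends on
    // the object instance, not on the bytes it holds.
    static int identityBasedPartition(byte[] key, int numPartitions) {
        return nonNegative(key.hashCode()) % numPartitions;
    }

    // Arrays.hashCode() is computed from the contents, so equal contents
    // always give the same partition.
    static int contentBasedPartition(byte[] key, int numPartitions) {
        return nonNegative(Arrays.hashCode(key)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3}; // same contents, different instance
        int numPartitions = 8;

        System.out.println("identity-based: " + identityBasedPartition(a, numPartitions)
                + " vs " + identityBasedPartition(b, numPartitions));
        System.out.println("content-based:  " + contentBasedPartition(a, numPartitions)
                + " vs " + contentBasedPartition(b, numPartitions));
    }
}
```

The identity-based pair may or may not match on a given run, while the
content-based pair always matches. String keys are less exposed, since
String.hashCode() is specified in the Javadoc.
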
On Sun, 26 Apr 2015 at 5:22 pm Gwen Shapira <gshapira@cloudera.com> wrote:

> Ouch. That can be a painful discovery after a client upgrade. It can
> break a lot of app code.
>
> I can see the reason for a custom hash algorithm. Lots of db products do
> this, usually for stability, but sometimes for other hash properties
> (Oracle has some cool guarantees around modifying the number of partitions
> and data movement).
>
> I'm wondering if, in the interest of painless upgrades, we should add
> a configuration flag for topics - old.hash.algorithm that will keep
> existing behavior. Sounds like a rather ugly hack (and things can
> still break in new versions of Java), but I can't see a better
> alternative at the moment.
>
> Gwen
>
> On Fri, Apr 24, 2015 at 5:48 PM, James Cheng <jcheng@tivo.com> wrote:
> > Hi,
> >
> > I was playing with the new producer in 0.8.2.1 using partition keys
> > ("semantic partitioning" I believe is the phrase?). I noticed that the
> > default partitioner in 0.8.2.1 does not partition items the same way as the
> > old 0.8.1.1 default partitioner was doing. For a test item, the old
> > producer was sending it to partition 0, whereas the new producer was
> > sending it to partition 4.
> >
> > Digging in the code, it appears that the partitioning logic is different
> > between the old and new producers. Both of them hash the key, but they use
> > different hashing algorithms.
> >
> > Old partitioner:
> > ./core/src/main/scala/kafka/producer/DefaultPartitioner.scala:
> >
> >   def partition(key: Any, numPartitions: Int): Int = {
> >     Utils.abs(key.hashCode) % numPartitions
> >   }
> >
> > New partitioner:
> > ./clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java:
> >
> >         } else {
> >             // hash the key to choose a partition
> >             return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
> >         }
> >
> > Where murmur2 is a custom hashing algorithm. (I'm assuming that murmur2
> > isn't the same logic as hashCode, especially since hashCode is
> > overrideable).
> >
> > Was it intentional that the hashing algorithm would change between the
> > old and new producer? If so, was this documented? I don't know if anyone
> > was relying on the old default partitioner, as opposed to going round-robin
> > or using their own custom partitioner. Do you expect it to change in the
> > future? I'm guessing that one of the main reasons to have a custom hashing
> > algorithm is so that you are in full control of the partitioning and can keep
> > it stable (as opposed to being reliant on hashCode()).
> >
> > Thanks,
> > -James
> >
>
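
To make the difference described in the thread concrete, here is a rough
side-by-side sketch of the two strategies. It is not the Kafka source: the
class and helper names are hypothetical, the murmur2 body is a
MurmurHash2-style function with an assumed seed and is not guaranteed to be
bit-identical to Utils.murmur2, and hashing the UTF-8 bytes of a String key
stands in for whatever serialized form the new producer actually sees.

```java
import java.nio.charset.StandardCharsets;

class PartitionerComparison {

    // Assumption: non-negative mapping that sends Integer.MIN_VALUE to 0,
    // since Math.abs(Integer.MIN_VALUE) is still negative.
    static int nonNegative(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Old-style: hash the key object itself via hashCode().
    static int hashCodePartition(Object key, int numPartitions) {
        return nonNegative(key.hashCode()) % numPartitions;
    }

    // 32-bit MurmurHash2-style hash over a byte array (seed is an assumption).
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;

        // Mix four bytes at a time, little-endian.
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Fold in the remaining 1 to 3 bytes; the fall-through is intentional.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
        }

        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // New-style: hash the key bytes with the murmur2-style function.
    static int murmur2Partition(byte[] keyBytes, int numPartitions) {
        return nonNegative(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "test-key";
        int numPartitions = 8;
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);

        System.out.println("hashCode-based partition: " + hashCodePartition(key, numPartitions));
        System.out.println("murmur2-based partition:  " + murmur2Partition(keyBytes, numPartitions));
    }
}
```

The two functions are unrelated, so the same key will generally land on
different partitions, as in the partition 0 versus partition 4 case reported
above. The murmur2-style path depends only on the key bytes and the
partition count, not on any hashCode() implementation.
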
