kafka-users mailing list archives

From Evan Huus <evan.h...@shopify.com>
Subject Re: New and old producers partition messages differently
Date Sun, 26 Apr 2015 15:51:05 GMT
Related to this topic: why the choice of murmur2 over murmur3? I'm not
super-familiar with the differences between the two, but I'd assume murmur3
would be faster or have a more even distribution or something.

Evan

P.S. Also, there appear to be many murmur3 implementations for other
languages, whereas murmur2 is much less common.

On Sun, Apr 26, 2015 at 10:57 AM, Jay Kreps <jay.kreps@gmail.com> wrote:

> This was actually intentional.
>
> The problem with relying on hashCode is that
> (1) it is often a very bad hash function,
> (2) it is not guaranteed to be consistent from run to run (i.e. if you
> restart the jvm the value of hashing the same key can change!),
> (3) it is not available outside the jvm so non-java producers can't use the
> same function.
>
> In general, at the moment different producers don't use the same hash
> function, so I think this is not quite as bad as it sounds. Though it would
> be good to standardize things.
>
> I think the most obvious thing we could do here would be to do a much
> better job of advertising this in the docs, though, so people don't get
> bitten by it.
>
> -Jay
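
Point (2) above is easy to reproduce with byte-array keys. The following is a minimal Java sketch, not code from Kafka: the class name `HashCodeStability` and the helper `contentHash` are hypothetical. It relies on two documented facts: arrays inherit the identity-based `Object.hashCode()`, whose value is not tied to the array's contents, while `String.hashCode()` and `Arrays.hashCode(byte[])` are computed from the contents alone.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration class; not part of Kafka.
class HashCodeStability {
    // Content-based hash: depends only on the bytes of the key.
    static int contentHash(byte[] key) {
        return Arrays.hashCode(key);
    }

    public static void main(String[] args) {
        byte[] a = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] b = "user-42".getBytes(StandardCharsets.UTF_8);

        // Arrays use Object.hashCode(), which is identity-based. Two arrays
        // with equal contents usually report different values, and the values
        // can differ between JVM runs.
        System.out.println("a.hashCode() = " + a.hashCode());
        System.out.println("b.hashCode() = " + b.hashCode());

        // A content-based hash agrees for equal contents.
        System.out.println("content hashes equal: "
                + (contentHash(a) == contentHash(b)));
    }
}
```

Even where `hashCode` is content-based, point (3) still applies: a non-JVM producer would have to reimplement the same function to land on the same partition.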
>
> On Fri, Apr 24, 2015 at 5:48 PM, James Cheng <jcheng@tivo.com> wrote:
>
> > Hi,
> >
> > I was playing with the new producer in 0.8.2.1 using partition keys
> > ("semantic partitioning" I believe is the phrase?). I noticed that the
> > default partitioner in 0.8.2.1 does not partition items the same way as
> > the old 0.8.1.1 default partitioner was doing. For a test item, the old
> > producer was sending it to partition 0, whereas the new producer was
> > sending it to partition 4.
> >
> > Digging in the code, it appears that the partitioning logic is different
> > between the old and new producers. Both of them hash the key, but they
> > use different hashing algorithms.
> >
> > Old partitioner:
> > ./core/src/main/scala/kafka/producer/DefaultPartitioner.scala:
> >
> >   def partition(key: Any, numPartitions: Int): Int = {
> >     Utils.abs(key.hashCode) % numPartitions
> >   }
> >
> > New partitioner:
> > ./clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java:
> >
> >         } else {
> >             // hash the key to choose a partition
> >             return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
> >         }
> >
> > Where murmur2 is a custom hashing algorithm. (I'm assuming that murmur2
> > isn't the same logic as hashCode, especially since hashCode is
> > overrideable).
> >
> > Was it intentional that the hashing algorithm would change between the
> > old and new producer? If so, was this documented? I don't know if anyone
> > was relying on the old default partitioner, as opposed to going
> > round-robin or using their own custom partitioner. Do you expect it to
> > change in the future? I'm guessing that one of the main reasons to have a
> > custom hashing algorithm is so that you are in full control of the
> > partitioning and can keep it stable (as opposed to being reliant on
> > hashCode()).
> >
> > Thanks,
> > -James
> >
> >
>
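
The two partitioning rules quoted in this thread can be compared side by side. The following is a rough Java sketch under stated assumptions, not the Kafka source. The class `PartitionSketch` and all of its method names are hypothetical. The `murmur2` body is a 32-bit MurmurHash2 written from memory, and its seed and constants are an assumption about what `Utils.murmur2` uses. The two helpers standing in for `Utils.abs` are also assumptions, since the thread does not show either implementation.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of Kafka.
class PartitionSketch {
    // 32-bit MurmurHash2 over a byte array. Seed and constants are assumed.
    static int murmur2(byte[] data) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int length = data.length;
        int h = 0x9747b28c ^ length; // assumed seed, mixed with the length

        // Mix four bytes at a time, little-endian.
        for (int i = 0; i + 4 <= length; i += 4) {
            int k = (data[i] & 0xff)
                    | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16)
                    | ((data[i + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Fold in the 1 to 3 trailing bytes; the fall-through is intentional.
        int tail = length & ~3;
        switch (length % 4) {
            case 3: h ^= (data[tail + 2] & 0xff) << 16;
            case 2: h ^= (data[tail + 1] & 0xff) << 8;
            case 1: h ^= data[tail] & 0xff;
                    h *= m;
        }

        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Assumed stand-in for the old Scala Utils.abs. Math.abs alone is not
    // enough because Math.abs(Integer.MIN_VALUE) is still negative.
    static int safeAbs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Assumed stand-in for the new Java Utils.abs: clear the sign bit.
    static int clearSignBit(int n) {
        return n & 0x7fffffff;
    }

    // Old-style rule: depends on the key object's hashCode().
    static int hashCodePartition(Object key, int numPartitions) {
        return safeAbs(key.hashCode()) % numPartitions;
    }

    // New-style rule: depends only on the serialized key bytes.
    static int murmur2Partition(byte[] keyBytes, int numPartitions) {
        return clearSignBit(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "test-key"; // hypothetical key
        int numPartitions = 6;   // hypothetical partition count
        System.out.println("hashCode rule: "
                + hashCodePartition(key, numPartitions));
        System.out.println("murmur2 rule:  "
                + murmur2Partition(key.getBytes(StandardCharsets.UTF_8), numPartitions));
    }
}
```

Nothing forces the two rules to agree for a given key, which matches the partition 0 versus partition 4 observation above. A producer in another language would need to port the same byte-level function to choose the same partitions.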
