kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Wesley Chow <...@chartbeat.com>
Subject Re: Hash partition of key with skew
Date Tue, 03 May 2016 15:51:08 GMT
I’m not the OP, but in our case, we sometimes want data locality. For example, suppose that
we have 100 consumers that are building up a cache of customer -> data mapping. If customer
data is spread randomly across all partitions then a query for that customer’s data would
have to hit all 100 consumers. If customer data exhibits some locality, then queries for that
data only hit a subset of consumers.

Wes


> On May 3, 2016, at 11:18 AM, Tauzell, Dave <Dave.Tauzell@surescripts.com> wrote:
> 
> Do you need the messages to be ordered in some way?   Why pass a key if you don't want
all the messages to go to one partition?
> 
> -Dave
> 
> Dave Tauzell | Senior Software Engineer | Surescripts
> O: 651.855.3042 | www.surescripts.com <http://www.surescripts.com/> |   Dave.Tauzell@surescripts.com
<mailto:Dave.Tauzell@surescripts.com>
> Connect with us: Twitter I LinkedIn I Facebook I YouTube
> 
> 
> -----Original Message-----
> From: Wesley Chow [mailto:wes@chartbeat.com <mailto:wes@chartbeat.com>]
> Sent: Tuesday, May 03, 2016 9:51 AM
> To: users@kafka.apache.org <mailto:users@kafka.apache.org>
> Subject: Re: Hash partition of key with skew
> 
> I’ve come up with a couple solutions since we too have a power law distribution. However,
we have not put anything into practice.
> 
> Fixed Slicing
> 
> One simple thing to do is to take each key and slice it into some fixed number of partitions.
So your function might be:
> 
> (hash(key) % num) + (hash(key) % 10)
> 
> In order to distribute it across 10 partitions. Or:
> 
> hash(key + ‘0’) % num
> hash(key + ‘1’) % num
> …
> hash(key + ‘9’) % num
> 
> 
> Hyperspace Hashing
> 
> If your data is multi-dimensional, then you might find hyperspace hashing useful. I’ll
give a simple example, but it’s easy to generalize. Suppose that you have two dimensions
you’d like to partition on: customer id (C) and city location (L). You’d like to be able
to subscribe to all data for some subset of customers, and you’d also like to be able to
subscribe to all data for some subset of locations. Suppose that this data goes into a topic
with 256 partitions.
> 
> For any piece of data, you’d construct the partition it goes to like so:
> 
> ((hash(C) % 16) << 4) + ((hash(L) % 16)
> 
> What that is basically saying is take C and map them to 16 different spots, and set it
as the high 4 bits of an 8 bit int. Then take the location, map it to 16 different spots,
and set it as the lower 4 bits of the int. The resulting number is the partition that piece
of data goes to.
> 
> Now if you want one particular C, you subscribe to the 16 partitions that contain that
C. If you want some particular L, you subscribe to the 16 partitions that contain that L.
> 
> You can extend this scheme to an arbitrary number of dimensions subject to the number
of partitions in the topic, and you can vary the number of bits that any particular dimension
takes. This scheme suffers from a combinatorial explosion of partitions if you really want
to query on lots of different dimensions, but you can see the Hyperdex paper for clues on
how to deal with this.
> 
> 
> Unbalanced Hashing
> 
> It’s easy to generate ok but not great hash functions. One is DJB hash, which relies
on two empirically determined constants:
> 
> http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
<http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function><http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
<http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function>>
> 
> (5381 and 33 in the above example)
> 
> If you can do offline analysis, and your distribution doesn’t change over time, then
you can basically exhaustively search for two values that produce a hash function that better
distributes the load.
> 
> 
> Greedy Knapsack
> 
> But if you’re ok doing offline analysis and generating your own hash function, then
you can create one that’s simply a hard coded list of mappings for the heaviest keys, and
then defaults to a regular hash for the rest. The easiest way to programmatically do this
is to use a greedy algorithm:
> 
>  for each heavy key, k:
>    assign k to the partition with the least assigned weight
> 
> 
> The advantage to fixed slicing and hyperspace hashing is that you don’t have to know
your distribution a priori, and it generally scales well as you increase the number of keys.
The disadvantage is that one key’s data is split across multiple partitions.
> 
> The advantage to unbalanced hashing and greedy knapsack is that you can get close to
an optimal partitioning scheme and all of one key resides in one partition. The downside is
that you need to do partition mapping management as your distribution changes over time.
> 
> Hopefully that gives you some ideas!
> 
> Wes
> 
> 
> 
>> On May 3, 2016, at 9:09 AM, Jens Rantil <jens.rantil@tink.se> wrote:
>> 
>> Hi,
>> 
>> Not sure if this helps, but the way Loggly seem to do it is to have a
>> separate topic for "noisy neighbors". See [1].
>> 
>> [1]
>> https://www.loggly.com/blog/loggly-loves-apache-kafka-use-unbreakable-
>> messaging-better-log-management/
>> 
>> Cheers,
>> Jens
>> 
>> On Wed, Apr 27, 2016 at 9:11 PM Srikanth <srikanth.ht@gmail.com> wrote:
>> 
>>> Hello,
>>> 
>>> Is there a recommendation for handling producer side partitioning
>>> based on a key with skew?
>>> We want to partition on something like clientId. Problem is, this key
>>> has an uniform distribution.
>>> Its equally likely to see a key with 3k occurrence/day vs 100k/day vs
>>> 65million/day.
>>> Cardinality of key is around 1500 and there are approx 1 billion
>>> records per day.
>>> Partitioning by hashcode(key)%numOfPartition will create a few "hot
>>> partitions" and cause a few brokers(and consumer threads) to be overloaded.
>>> May be these partitions with heavy load are evenly distributed among
>>> brokers, may be they are not.
>>> 
>>> I read KIP-22
>>> <
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-22+-+Expose+a+P
>>> artitioner+interface+in+the+new+producer
>>>> 
>>> that
>>> explains how one could write a custom partitioner.
>>> I'd like to know how it was used to solve such data skew.
>>> We can compute some statistics on key distribution offline and use it
>>> in the partitioner.
>>> Is that a good idea? Or is it way too much logic for a partitioner?
>>> Anything else to consider?
>>> Any thoughts or reference will be helpful.
>>> 
>>> Thanks,
>>> Srikanth
>>> 
>> --
>> 
>> Jens Rantil
>> Backend Developer @ Tink
>> 
>> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden For urgent matters
>> you can reach me at +46-708-84 18 32.
> 
> This e-mail and any files transmitted with it are confidential, may contain sensitive
information, and are intended solely for the use of the individual or entity to whom they
are addressed. If you have received this e-mail in error, please notify the sender by reply
e-mail immediately and destroy all copies of the e-mail and any attachments.


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message