samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chris Riccomini <>
Subject Re: Map-side join/broadcast streams
Date Mon, 15 Dec 2014 20:59:08 GMT
Hey Roger,

I believe that the OutgoingMessageEnvelope API allows you to specify a
partition key independent of the key for the message.

  public OutgoingMessageEnvelope(SystemStream systemStream, String
keySerializerName, String messageSerializerName, Object partitionKey,
Object key, Object message)

This API can be used to directly specify a partition. For example, if you
want to force the message to go to partition 7, you'd specify partitionKey
as 7. Kafka's default partitioner does a hash/mod:

This is a noop for your use case, and should then pass directly down to
the proper partition. This should allow you to forcibly set an partition,
while still keeping the message's key separately.


On 12/15/14 10:17 AM, "Roger Hoover" <> wrote:

>Hi all,
>I appreciate any advice on how best to do this:
>I have a very small dimension table that I want to join with a
>event stream.  Partitioning the event stream for the join seems overly
>complicated for this use case.  What I really want is for all tasks of my
>stream join job to consume all partitions of the dimension stream.
>I see that there's an issue open for this and that it's not supported yet:
>As a work around, I was thinking of creating another topic for the
>dimension stream with an equal number of partitions as the event stream
>having each partition of that stream contain a full copy of the dimension
>table.  To do this, I think I need a job to copy (fan out) messages from
>the single upstream partition of the dimension stream to all partitions of
>this new topic.
>The issue is that the Samza API for OutgoingMessageEnvelope doesn't let me
>specify a partition id for an outbound message, only a partition key.
>What's the best way to ensure that a given message goes to a particular
>partition id?
>1) Take advantage of the Kafka default partitioner using the key object's
>hashCode and make the key object hash to the partition id.  Is this too
>brittle in relying on the DefaultPartitioner implementation of the old
>Kafka producer API?
>2) Create a custom Kafka partitioner and enable it using the
>"partitioner.class" setting?
>3) Is there another way?

View raw message