samza-dev mailing list archives

From Chris Riccomini <criccom...@linkedin.com.INVALID>
Subject Re: How to deal with scaling?
Date Fri, 26 Sep 2014 16:10:20 GMT
Hey Roger,

> If the job's input topics are partitioned by key, then you cannot add
>more partitions without corrupting existing state.

This is correct.
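
To make the failure mode concrete: keyed producers choose a partition with
something along the lines of hash(key) % numPartitions, so bumping the
partition count silently remaps most keys away from the task (and local
state) that currently owns them. A rough sketch of the idea, not Kafka's
actual partitioner implementation:

// Rough sketch of hash-mod partition assignment (illustration only, not
// Kafka's exact partitioner). Changing numPartitions remaps most keys.
public class PartitioningSketch {
  static int partitionFor(byte[] keyBytes, int numPartitions) {
    int hash = java.util.Arrays.hashCode(keyBytes);
    return (hash & 0x7fffffff) % numPartitions;
  }

  public static void main(String[] args) {
    byte[] key = "member-1234".getBytes();
    // The same key usually lands on a different partition after expansion,
    // so its existing state stays behind on the old task.
    System.out.println(partitionFor(key, 8));
    System.out.println(partitionFor(key, 16));
  }
}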

> Does this come up for people in practice?

It does come up occasionally for us. Thus far, we usually just run a Kafka
topic-partition expansion (thereby trashing the semantics of the
partitioning) and restart the job. Inconsistent output is then emitted for
a while. We do this only when we agree that inconsistent output is
tolerable.

Another thing we do for this is over-partition our Kafka topics when we're
concerned about growth.

Both of these solutions are admittedly hacky. As you said, the ideal
solution would be some kind of automatic migration. It seems possible that
the AM (job coordinator) might be able to manage this, especially if we
had a pre-packaged "repartition job" that it could trigger. I haven't
thought about this in detail, though.

> Deploy jobs to repartition inputs and changelog topics into the new
>topics

The changelog topic seems problematic to me. It seems that the key used
in the changelog might not always be directly related to the partitioning
of the input topic. For example, if you have a StreamTask that is
consuming a single input partition, and keeping a count in the state store
of all messages that it sees, how do you repartition this changelog? In
the new world, the keys for the single partition that it's consuming could
be spread across many different partitions, and the count is pretty much
meaningless, since it can't be split up by key.
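
A minimal sketch of the kind of task I mean, assuming a key-value store
named "counts" that's configured with a changelog (names are made up):

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

// Counts every message seen by this task's input partition. The single
// "count" entry in the changelog is tied to the old partition boundaries,
// so there's no correct way to split it after a partition expansion.
public class PartitionCountTask implements StreamTask, InitableTask {
  private KeyValueStore<String, Integer> store;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    // Assumes a store named "counts" is configured with a changelog.
    store = (KeyValueStore<String, Integer>) context.getStore("counts");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    Integer count = store.get("count");
    store.put("count", count == null ? 1 : count + 1);
  }
}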

It almost feels like state has to be totally reset to safely do an input
partition expansion in all cases. In a sense, you have to treat the
expanded job as a completely new job, and start it from scratch.

> Change job config to point to new topics and restart the job

One problem with this is going to be the case where you don't control the
producers for the old input topic. They'd either have to be migrated to
produce to the new input topic for your job, or you'd have to permanently
run the repartition job to move data from the original topic to the
newly expanded topic. Keeping the repartition job is not all that wild
of an idea. Most Samza topologies we run have some form of a repartition
job that runs permanently at the beginning of their flow.
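
For reference, such a repartition task can be pretty small. Something like
the sketch below, where the output system and stream names are placeholders
and the incoming key is just carried through (you'd swap in a different key
if you wanted to change the partitioning):

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

// Forwards each message to a wider output topic, keyed so that the
// producer's partitioner spreads it over the new partition count.
public class RepartitionTask implements StreamTask {
  private static final SystemStream OUTPUT =
      new SystemStream("kafka", "page-views-repartitioned");

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // Re-key here if the desired partitioning key differs from the
    // original one; this sketch carries the incoming key through.
    collector.send(new OutgoingMessageEnvelope(OUTPUT,
        envelope.getKey(), envelope.getMessage()));
  }
}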

> All meaningfully-partitioned topics would need to include their keys in
>the stream

True. Somewhat tangential to this is the case where the key that's been
used is not the one your job wishes to partition by. In this case, a
repartition job would be required as well.

> This would be problematic as the order of the dictionary keys can change
>but would still mean the same thing.  In order to use JSON as a serde for
>keys, you'd need to enforce a sort order on dictionaries.

I struggled with this as well. We basically need a forced ordering for the
JSON keys in SAMZA-348. Originally, I was thinking of making the key/value
messages just a simple string with a delimiter, something like
<type>:<key> for the key and <host>:<source>:<blah> for the value. That
approach is also much more compact than JSON. The problem with the
delimited-string approach, though, is that it doesn't easily allow for
hierarchical key/value pairs.
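
If we do stay with JSON keys, one option is to force a canonical ordering
at serialization time. A sketch using Jackson (assuming Jackson 2.x is on
the classpath; the class and names here are just for illustration):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class CanonicalJsonKeySerde {
  // Sort map entries before writing, so logically-equal keys always
  // serialize to identical bytes and hash to the same partition.
  private final ObjectMapper mapper = new ObjectMapper()
      .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

  public byte[] toBytes(Map<String, Object> key) throws Exception {
    return mapper.writeValueAsBytes(key);
  }

  public static void main(String[] args) throws Exception {
    Map<String, Object> a = new LinkedHashMap<String, Object>();
    a.put("type", "offset");
    a.put("key", "my-system.my-stream.1000");

    Map<String, Object> b = new LinkedHashMap<String, Object>();
    b.put("key", "my-system.my-stream.1000");
    b.put("type", "offset");

    CanonicalJsonKeySerde serde = new CanonicalJsonKeySerde();
    // Both insertion orders now produce the same bytes: prints "true".
    System.out.println(Arrays.equals(serde.toBytes(a), serde.toBytes(b)));
  }
}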

Cheers,
Chris

On 9/24/14 4:55 PM, "Roger Hoover" <roger.hoover@gmail.com> wrote:

>Hi all,
>
>So it seems like one of the first decisions that you have to make when
>creating a Samza job is how many partitions to have in your input topics.
>This will dictate how many tasks are created and how many changelog
>partitions get created.  It's great that you can independently change the
>number of Samza containers that get deployed but what do you do once you
>reach the max (# containers == # tasks)?
>
>If the job's input topics are partitioned by key, then you cannot add more
>partitions without corrupting existing state.  Does this come up for
>people
>in practice?  How do you handle it?
>
>Just trying to think it through, it seems like you need a procedure
>something like this:
>
>1) Create new topics to hold the same data but with more partitions
>(inputs, outputs, and changelog topics)
>2) Deploy jobs to repartition inputs and changelog topics into the new
>topics
>3) When caught up, stop the running job
>4) Change job config to point to new topics and restart the job (if all
>topics are new, this can be done while previous job run is still active
>using new job.id)
>5) Change downstream jobs to use new output topic if necessary.  Doing
>this
>in a safe way might be hard.
>
>Ideally at some point, this process could be automated.  I was wondering
>whether a generic task could be written for step #2 but I think it would
>require a couple of constraints:
>
>1) All meaningfully-partitioned topics would need to include their keys in
>the stream.  In Kafka, this is optional unless you enable compaction but
>for this to work generically, it would have to be mandatory in Samza for
>any stream for which partitions have meaning (not using random or
>round-robin partitioning).
>2) The partition keys should be re-hashable based on their raw byte
>representation so that the repartition task would not have to know how to
>deserialize the keys in order to compute their new partition.  At first
>glance, this doesn't seem too onerous but I saw in the Config Stream
>proposal (SAMZA-348) that keys might be JSON:
>
>{"type":"offset","key","my-long-system-name.my-even-longer-stream-name-tha
>t-is-really-long.1000"
>}
>
>This would be problematic as the order of the dictionary keys can change
>but would still mean the same thing.  In order to use JSON as a serde for
>keys, you'd need to enforce a sort order on dictionaries.
>
>I'm curious what others do about this or what your thoughts are.  Thanks,
>
>Roger

