samza-dev mailing list archives

From Roger Hoover <roger.hoo...@gmail.com>
Subject How to deal with scaling?
Date Wed, 24 Sep 2014 23:55:19 GMT
Hi all,

So it seems like one of the first decisions that you have to make when
creating a Samza job is how many partitions to have in your input topics.
This will dictate how many tasks are created and how many changelog
partitions get created.  It's great that you can independently change the
number of Samza containers that get deployed, but what do you do once you
reach the max (# containers == # tasks)?
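
For concreteness, here's roughly what I mean in config terms.  The topic,
store, and count values below are made up; the property names are the ones
I believe Samza uses for YARN deployments:

# The input's partition count fixes the number of tasks (and therefore
# the number of changelog partitions).  Say page-views has 8 partitions:
task.inputs=kafka.page-views
stores.my-store.changelog=kafka.my-job-my-store-changelog

# Containers can be scaled up or down independently, but only up to the
# task count, so anything above 8 here buys nothing:
yarn.container.count=4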

If the job's input topics are partitioned by key, then you cannot add more
partitions without corrupting existing state.  Does this come up for people
in practice?  How do you handle it?
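
To make the corruption point concrete: with the usual hash-mod-N
partitioning, bumping N silently reassigns keys to different partitions
(and therefore different tasks), so the local state a key accumulated
under the old layout is stranded in some other task's store.  A toy
illustration (Arrays.hashCode here is just a stand-in, not necessarily
the hash your producer uses):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionShift {
  // Stand-in for a producer's default partitioner: hash the raw key
  // bytes and mod by the partition count.
  static int partition(byte[] key, int numPartitions) {
    return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
  }

  public static void main(String[] args) {
    byte[] key = "member-42".getBytes(StandardCharsets.UTF_8);
    // The same key usually lands in a different partition once the
    // count changes.
    System.out.println("8 partitions  -> " + partition(key, 8));
    System.out.println("12 partitions -> " + partition(key, 12));
  }
}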

Just trying to think it through, it seems like you need a procedure
something like this:

1) Create new topics to hold the same data but with more partitions
(inputs, outputs, and changelog topics)
2) Deploy jobs to repartition the inputs and changelog topics into the new
topics (a sketch of such a task follows after this list)
3) When caught up, stop the running job
4) Change the job config to point to the new topics and restart the job (if
all topics are new, this can be done while the previous job run is still
active by using a new job.id)
5) Change downstream jobs to use the new output topic if necessary.  Doing
this in a safe way might be hard.
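
For step #2, I imagine the repartitioning job itself could be close to
trivial: read each message and forward the key and payload untouched to the
wider topic, letting the producer's partitioner pick the new partition.
Something like this (the class and topic names are made up):

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class RepartitionTask implements StreamTask {
  // Hypothetical destination; in practice this would come from config.
  private static final SystemStream OUTPUT =
      new SystemStream("kafka", "page-views-repartitioned");

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // Pass the raw key and message through unchanged; the output topic's
    // (larger) partition count determines where each key ends up.
    collector.send(
        new OutgoingMessageEnvelope(OUTPUT, envelope.getKey(), envelope.getMessage()));
  }
}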

Ideally at some point, this process could be automated.  I was wondering
whether a generic task could be written for step #2, but I think it would
require a couple of constraints:

1) All meaningfully-partitioned topics would need to include their keys in
the stream.  In Kafka, this is optional unless you enable compaction, but
for this to work generically it would have to be mandatory in Samza for
any stream for which partitions have meaning (i.e. not using random or
round-robin partitioning).
2) The partition keys should be re-hashable based on their raw byte
representation, so that the repartition task would not have to know how to
deserialize the keys in order to compute their new partition.  At first
glance, this doesn't seem too onerous, but I saw in the Config Stream
proposal (SAMZA-348) that keys might be JSON:

{"type":"offset","key","my-long-system-name.my-even-longer-stream-name-that-is-really-long.1000"
}

This would be problematic because the order of the dictionary keys can
change while the meaning stays the same, so hashing the raw bytes could send
logically-equal keys to different partitions.  In order to use JSON as a
serde for keys, you'd need to enforce a sort order on dictionaries.
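
If the JSON serde enforced a canonical form (say, sorted map entries), then
hashing the serialized bytes, as in constraint #2, would at least be
deterministic.  A rough sketch with Jackson (the choice of Jackson and all
the names below are just assumptions on my part):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class CanonicalJsonKey {
  private static final ObjectMapper MAPPER = new ObjectMapper()
      // Sort map entries so logically-equal keys always serialize to the
      // same bytes, regardless of insertion order.
      .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

  static byte[] toKeyBytes(Map<String, Object> key) throws Exception {
    return MAPPER.writeValueAsString(key).getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws Exception {
    Map<String, Object> key = new HashMap<String, Object>();
    key.put("type", "offset");
    key.put("key", "my-long-system-name.my-even-longer-stream-name-that-is-really-long.1000");

    // With a canonical byte form, a repartitioner can hash the key bytes
    // directly, without ever deserializing them.
    byte[] bytes = toKeyBytes(key);
    int partition = (Arrays.hashCode(bytes) & 0x7fffffff) % 12;
    System.out.println("partition " + partition);
  }
}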

I'm curious what others do about this or what your thoughts are.  Thanks,

Roger
