samza-dev mailing list archives

From Roger Hoover <roger.hoo...@gmail.com>
Subject Re: How to deal with scaling?
Date Mon, 29 Sep 2014 20:35:24 GMT
Very helpful.  Thanks for sharing, Chris.

On Mon, Sep 29, 2014 at 1:08 PM, Chris Riccomini <
criccomini@linkedin.com.invalid> wrote:

> Hey Roger,
>
> Sorry for the late reply. Trying to load balance across multiple
> obligations.
>
> > Just to be more explicit about  "starting it from scratch".  The only
> >way to do this theoretically correctly, I think, would be to have the
> >newly partitioned job start with no state and play back its input topics
> >from the beginning of time.
>
> Correct. Though, it's worth considering what "beginning of time" means.
> When you start any Samza job for the first time, you don't generally replay
> all data since the beginning of time. It usually just picks up from
> time T, and moves forward from there. In this case, I think the time T
> that you'd want to pick up from would probably depend on the logic of the
> job. It could be "now", "last checkpoint of the old job", "a week ago",
> etc.
>
> > This brings me to another question about deployment.  Do you recommend
> >having two separate Kafka clusters?  In the "public" cluster, brokers
> >would be deployed on machines by themselves. Then you have another Kafka
> >cluster for Samza in which the brokers are co-located with YARN
> >NodeManagers on each machine.  With this approach, Samza topologies would
> >consume from and ultimately publish to topics on the "public" cluster.
> >All of the internal topics like repartitioning, changelog, etc. would be
> >hidden away in the Kafka cluster dedicated to Samza.
>
> As Jakob said, this is how we've been running over the past couple of
> years. We use MirrorMaker to pull data back and forth between the two
> clusters (be careful of cycles, though). Recently, we moved the NMs off
> of the Kafka broker boxes for the Samza grid. The main reason for this was
> that the stateful Samza jobs were using page cache as a side-effect of
> interacting with the LevelDB state. This caused a degradation in
> performance for the Kafka brokers that were running alongside the Samza
> jobs. Kafka is very page cache sensitive, since it uses it as an in-memory
> buffer for the most recent N minutes of messages. By pulling the jobs off
> into their own boxes, we were able to solve some performance issues that
> we were seeing with the Kafka brokers.
>
> The locality gains that we were seeing by running both jobs and brokers
> together were never measured. I believe they're probably pretty negligible
> (and degrade as the cluster size increases). Thoughts on locality are
> here:
>
> https://issues.apache.org/jira/browse/SAMZA-335
>
>
> Kafka's partitioning model does not lend itself all that well to locality
> optimization (vs. a block store like HDFS).
>
> Anyway, food for thought.
>
> Cheers,
> Chris
>
> On 9/26/14 11:20 AM, "Roger Hoover" <roger.hoover@gmail.com> wrote:
>
> >Chris,
> >
> >Thanks for the great answers.  It's helping me clear up my thinking...
> >
> >On Fri, Sep 26, 2014 at 9:10 AM, Chris Riccomini <
> >criccomini@linkedin.com.invalid> wrote:
> >
> >> Hey Roger,
> >>
> >> > If the job's input topics are partitioned by key, then you cannot add
> >> >more partitions without corrupting existing state.
> >>
> >> This is correct.
> >>
> >> > Does this come up for people in practice?
> >>
> >> It does come up occasionally for us. Thus far, we usually just run a
> >> Kafka topic-partition expansion (thereby trashing the semantics of the
> >> partitioning) and restart the job. Inconsistent output is then emitted
> >> for a while. We do this only when we agree that inconsistent output is
> >> tolerable.
> >>
> >
> >Thanks.  This might be reasonable in many cases (not sure yet).
> >
> >
> >>
> >> Another thing we do for this is over-partition our Kafka topics when
> >> we're concerned about growth.
> >>
> >> Both of these solutions are admittedly hacky. As you said, the ideal
> >> solution would be some kind of automatic migration. It seems possible
> >> that the AM (job coordinator) might be able to manage this, especially
> >> if we had a pre-packaged "repartition job" that it could trigger. I
> >> haven't thought about this in detail, though.
> >>
> >> > Deploy jobs to repartition inputs and changelog topics into the new
> >> >topics
> >>
> >> The changelog topic seems problematic to me. It seems that the key used
> >> in the changelog might not always be directly related to the
> >> partitioning of the input topic. For example, if you have a StreamTask
> >> that is consuming a single input partition, and keeping a count in the
> >> state store of all messages that it sees, how do you repartition this
> >> changelog? In the new world, the keys for the single partition that it's
> >> consuming could be spread across many different partitions, and the
> >> count is pretty much meaningless, since it can't be split up by key.
> >>
> >> It almost feels like state has to be totally reset to safely do an input
> >> partition expansion under all cases. In a sense, you have to treat the
> >> new job as a job that's completely new, and start it from scratch.
> >>
> >
> >Ah, you're right.  I think there's no way to migrate state in general.  If
> >a job is saving any kind of aggregate state then that's an irreversible
> >operation that was done on the old partition.  There's not enough
> >information to "repartition" the results.
> >
> >Just to be more explicit about  "starting it from scratch".  The only way
> >to do this theoretically correctly, I think, would be to have the newly
> >partitioned job start with no state and play back its input topics from
> >the beginning of time.
> >
> >
> >
> >>
> >> > Change job config to point to new topics and restart the job
> >>
> >> One problem with this is going to be the case where you don't control
> >> the producers for the old input topic. They'd either have to be migrated
> >> to produce to the new input topic for your job, or you'd have to
> >> permanently run the repartition job to move data from the original topic
> >> to the currently expanded topic. Keeping the repartition job is not all
> >> that wild of an idea. Most Samza topologies we run have some form of a
> >> repartition job that runs permanently at the beginning of their flow.
> >>
> >
> >I was thinking about repartitioning as a good design pattern as well.
> >Having your job always repartition the input decouples it from its
> >upstream topic dependencies.  This brings me to another question about
> >deployment.  Do you recommend having two separate Kafka clusters?  In the
> >"public" cluster, brokers would be deployed on machines by themselves.
> >Then you have another Kafka cluster for Samza in which the brokers are
> >co-located with YARN NodeManagers on each machine.  With this approach,
> >Samza topologies would consume from and ultimately publish to topics on
> >the "public" cluster.  All of the internal topics like repartitioning,
> >changelog, etc. would be hidden away in the Kafka cluster dedicated to
> >Samza.
> >
> >
> >>
> >> > All meaningfully-partitioned topics would need to include their keys
> >> >in the stream
> >>
> >> True. Somewhat tangential to this is the case where the key that's been
> >> used is not the one your job wishes to partition by. In this case, a
> >> repartition job would be required as well.
> >>
> >> > This would be problematic as the order of the dictionary keys can
> >> >change but would still mean the same thing.  In order to use JSON as a
> >> >serde for keys, you'd need to enforce a sort order on dictionaries.
> >>
> >> I struggled with this as well. We basically need a forced ordering for
> >> the JSON keys in SAMZA-348. Originally, I was thinking of making the
> >> key/value messages just a simple string with a delimiter. Something like
> >> <type>:<key> for the key and <host>:<source>:<blah> for the value. This
> >> approach is also much more compact than JSON. The problem with the
> >> latter approach is that it doesn't easily allow for hierarchical
> >> key/value pairs.
> >>
> >
> >I've been constructing string keys in my jobs so far as you mentioned but
> >it adds extra boilerplate to the code.  It would be nice if there were an
> >automatic way to do it.
> >
> >
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 9/24/14 4:55 PM, "Roger Hoover" <roger.hoover@gmail.com> wrote:
> >>
> >> >Hi all,
> >> >
> >> >So it seems like one of the first decisions that you have to make when
> >> >creating a Samza job is how many partitions to have in your input
> >> >topics. This will dictate how many tasks are created and how many
> >> >changelog partitions get created.  It's great that you can
> >> >independently change the number of Samza containers that get deployed
> >> >but what do you do once you reach the max (# containers == # tasks)?
> >> >
> >> >If the job's input topics are partitioned by key, then you cannot add
> >> >more partitions without corrupting existing state.  Does this come up
> >> >for people in practice?  How do you handle it?
> >> >
> >> >Just trying to think it through, it seems like you need a procedure
> >> >something like this:
> >> >
> >> >1) Create new topics to hold the same data but with more partitions
> >> >(inputs, outputs, and changelog topics)
> >> >2) Deploy jobs to repartition inputs and changelog topics into the new
> >> >topics
> >> >3) When caught up, stop the running job
> >> >4) Change job config to point to new topics and restart the job (if all
> >> >topics are new, this can be done while previous job run is still active
> >> >using new job.id)
> >> >5) Change downstream jobs to use new output topic if necessary.  Doing
> >> >this in a safe way might be hard.
> >> >
> >> >Ideally at some point, this process could be automated.  I was
> >> >wondering whether a generic task could be written for step #2 but I
> >> >think it would require a couple of constraints:
> >> >
> >> >1) All meaningfully-partitioned topics would need to include their
> >> >keys in the stream.  In Kafka, this is optional unless you enable
> >> >compaction but for this to work generically, it would have to be
> >> >mandatory in Samza for any stream for which partitions have meaning
> >> >(not using random or round-robin partitioning).
> >> >2) The partition keys should be re-hashable based on their raw byte
> >> >representation so that the repartition task would not have to know how
> >> >to deserialize the keys in order to compute their new partition.  At
> >> >first glance, this doesn't seem too onerous but I saw in the Config
> >> >Stream proposal (SAMZA-348) that keys might be JSON:
> >> >
> >> >{"type":"offset","key":"my-long-system-name.my-even-longer-stream-name-that-is-really-long.1000"}
> >> >
> >> >This would be problematic as the order of the dictionary keys can
> >> >change but would still mean the same thing.  In order to use JSON as a
> >> >serde for keys, you'd need to enforce a sort order on dictionaries.
> >> >
> >> >I'm curious what others do about this or what your thoughts are.
> >> >Thanks,
> >> >
> >> >Roger
> >>
> >>
>
>
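
Two points from the thread can be illustrated with a minimal Python sketch: why expanding a key-partitioned topic moves keys away from the state that was built for them, and why a JSON key needs a forced field order before its raw bytes can be re-hashed. Everything here is an assumption made for illustration: `partition_for` and `canonical_key` are hypothetical helpers, CRC32 stands in for whatever hash the producer actually uses (it is not Kafka's partitioner), and the sample keys are invented.

```python
import json
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Hypothetical partitioner: hash the raw key bytes, then take the remainder.

    CRC32 is only a stand-in hash; any hash-mod-N scheme behaves the same way
    when N changes.
    """
    return zlib.crc32(key) % num_partitions


def canonical_key(fields: dict) -> bytes:
    """Serialize a JSON key with sorted field names and fixed separators,
    so that logically equal keys always produce identical bytes."""
    return json.dumps(fields, sort_keys=True, separators=(",", ":")).encode("utf-8")


# 1) Expanding from 8 to 16 partitions: count keys whose partition changes.
#    Any state (changelog, counts) built for those keys stays on the old partition.
keys = [f"user-{i}".encode("utf-8") for i in range(1000)]
moved = sum(1 for k in keys if partition_for(k, 8) != partition_for(k, 16))
print(f"{moved} of {len(keys)} keys change partition when going from 8 to 16")

# 2) The same logical key written with two field orders. Without canonical
#    serialization the bytes differ, and a byte-level re-hash could send the
#    two forms to different partitions.
a = canonical_key({"type": "offset", "key": "my-system.my-stream.1000"})
b = canonical_key({"key": "my-system.my-stream.1000", "type": "offset"})
print(a == b)
print(partition_for(a, 16) == partition_for(b, 16))
```

With sorted field names the repartition step can stay serde-agnostic, since it only ever hashes bytes. The delimiter-separated string keys mentioned above get the same property without needing a canonical form.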
