samza-dev mailing list archives

From Roger Hoover <roger.hoo...@gmail.com>
Subject Re: How to deal with scaling?
Date Fri, 26 Sep 2014 21:34:28 GMT
Thanks, Jakob.

On Fri, Sep 26, 2014 at 2:21 PM, Jakob Homan <jghoman@gmail.com> wrote:

> This is pretty close to how things are laid out.  The data from the 'public
> facing' Kafka clusters is mirror-made into Samza-specific Kafka clusters,
> which are colocated (though not necessarily on the same box) with the YARN
> resources.  Data produced by the Samza jobs is written to the Samza
> cluster and then mirror-made to other clusters for consumption.  This
> approach has the advantage of keeping the Samza processes separate,
> controlled, and out of the production path.  The disadvantage is more
> complexity, more machines, and a tiny bit of latency via the mirror making,
> but overall this approach is pretty rock solid.
>
> -Jakob
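For concreteness, a minimal sketch of the mirroring step Jakob describes, assuming the 0.8-era MirrorMaker tool; the cluster addresses, group id, and topic whitelist below are made up:

    # consumer.properties -- read from the public-facing cluster
    zookeeper.connect=zk-public:2181
    group.id=samza-mirror

    # producer.properties -- write into the Samza-dedicated cluster
    metadata.broker.list=samza-broker1:9092,samza-broker2:9092

    # 0.8-era invocation of the stock MirrorMaker tool
    bin/kafka-run-class.sh kafka.tools.MirrorMaker \
      --consumer.config consumer.properties \
      --producer.config producer.properties \
      --whitelist 'page-views|ad-clicks'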
>
>
> On Fri, Sep 26, 2014 at 2:14 PM, Roger Hoover <roger.hoover@gmail.com>
> wrote:
>
> > Chris,
> >
> > Would you mind giving some advice on my deployment question below?
> >
> > "Do you recommend having two separate Kafka clusters?  In the "public"
> > cluster, brokers would be deployed on machines by themselves.  Then you
> > have another Kafka cluster for Samza in which the brokers are co-located
> > with YARN NodeManagers on each machine.  With this approach, Samza
> > topologies would consume from and ultimately publish to topics on the
> > "public" cluster.  All of the internal topics like repartitioning,
> > changelog, etc. would be hidden away in the Kafka cluster dedicated to
> > Samza."
> >
> > Thanks,
> >
> > Roger
> >
> > On Fri, Sep 26, 2014 at 11:20 AM, Roger Hoover <roger.hoover@gmail.com>
> > wrote:
> >
> > > Chris,
> > >
> > > Thanks for the great answers.  It's helping me clear up my thinking...
> > >
> > > On Fri, Sep 26, 2014 at 9:10 AM, Chris Riccomini <
> > > criccomini@linkedin.com.invalid> wrote:
> > >
> > >> Hey Roger,
> > >>
> > >> > If the job's input topics are partitioned by key, then you cannot add
> > >> > more partitions without corrupting existing state.
> > >>
> > >> This is correct.
> > >>
> > >> > Does this come up for people in practice?
> > >>
> > >> It does come up occasionally for us. Thus far, we usually just run a
> > >> Kafka topic-partition expansion (thereby trashing the semantics of the
> > >> partitioning) and restart the job. Inconsistent output is then emitted
> > >> for a while. We do this only when we agree that inconsistent output is
> > >> tolerable.
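To see concretely why a partition expansion trashes the partitioning semantics: with the usual hash partitioning, a key's partition is hash(key) mod the partition count, so changing the count silently moves existing keys to different partitions (and therefore to different tasks and state stores). A toy Java illustration, not tied to any particular Kafka partitioner implementation:

    import java.util.Arrays;
    import java.util.List;

    public class PartitionRemapDemo {
        // Simplified stand-in for a hash partitioner: hash(key) mod numPartitions.
        static int partitionFor(String key, int numPartitions) {
            return Math.abs(key.hashCode() % numPartitions);
        }

        public static void main(String[] args) {
            List<String> keys = Arrays.asList("member-1", "member-2", "member-3", "member-4");
            for (String key : keys) {
                // The same key lands on a different partition once the count changes,
                // so state built under the old assignment no longer lines up.
                System.out.printf("%s: 8 partitions -> %d, 12 partitions -> %d%n",
                    key, partitionFor(key, 8), partitionFor(key, 12));
            }
        }
    }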
> > >>
> > >
> > > Thanks.  This might be reasonable in many cases (not sure yet).
> > >
> > >
> > >>
> > >> Another thing we do for this is over-partition our Kafka topics when
> > >> we're concerned about growth.
> > >>
> > >> Both of these solutions are admittedly hacky. As you said, the ideal
> > >> solution would be some kind of automatic migration. It seems possible
> > >> that the AM (job coordinator) might be able to manage this, especially
> > >> if we had a pre-packaged "repartition job" that it could trigger. I
> > >> haven't thought about this in detail, though.
> > >>
> > >> > Deploy jobs to repartition inputs and changelog topics into the new
> > >> > topics
> > >>
> > >> The changelog topic seems problematic to me. It seems that the key used
> > >> in the changelog might not always be directly related to the
> > >> partitioning of the input topic. For example, if you have a StreamTask
> > >> that is consuming a single input partition, and keeping a count in the
> > >> state store of all messages that it sees, how do you repartition this
> > >> changelog? In the new world, the keys for the single partition that it's
> > >> consuming could be spread across many different partitions, and the
> > >> count is pretty much meaningless, since it can't be split up by key.
> > >>
> > >> It almost feels like state has to be totally reset to safely do an input
> > >> partition expansion in all cases. In a sense, you have to treat the new
> > >> job as a job that's completely new, and start it from scratch.
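A sketch of the kind of job described here, written against the Samza StreamTask API of the time; the store name "count-store" is hypothetical and would be declared in the job config with a changelog behind it. The point is that the only changelog key is "total", which carries no information about how to split the count across new input partitions:

    import org.apache.samza.config.Config;
    import org.apache.samza.storage.kv.KeyValueStore;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskContext;
    import org.apache.samza.task.TaskCoordinator;

    public class MessageCountTask implements StreamTask, InitableTask {
        private KeyValueStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(Config config, TaskContext context) {
            // "count-store" is a hypothetical store name declared via
            // stores.count-store.* in the job config, with a changelog topic.
            store = (KeyValueStore<String, Long>) context.getStore("count-store");
        }

        @Override
        public void process(IncomingMessageEnvelope envelope,
                            MessageCollector collector,
                            TaskCoordinator coordinator) {
            // One running total for everything this task sees, i.e. for the one
            // input partition assigned to it. The changelog entry's key ("total")
            // says nothing about the input keys, which is why it cannot be
            // re-split when the input partition count changes.
            Long count = store.get("total");
            store.put("total", (count == null ? 0L : count) + 1);
        }
    }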
> > >>
> > >
> > > Ah, you're right.  I think there's no way to migrate state in general.
> > > If a job is saving any kind of aggregate state then that's an
> > > irreversible operation that was done on the old partitioning.  There's
> > > not enough information to "repartition" the results.
> > >
> > > Just to be more explicit about "starting it from scratch": the only way
> > > to do this theoretically correctly, I think, would be to have the newly
> > > partitioned job start with no state and play back its input topics from
> > > the beginning of time.
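If you do start the repartitioned job from scratch, the replay itself is mostly configuration (as far as I know, for the Samza versions of this era); a hedged sketch, assuming a Kafka system named "kafka":

    # Replay inputs from the earliest available offset when no checkpoint exists.
    systems.kafka.samza.offset.default=oldest
    systems.kafka.consumer.auto.offset.reset=smallest
    # A new job.id gives the job a fresh checkpoint topic, so the old run's
    # committed offsets are not picked up.
    job.id=2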
> > >
> > >
> > >
> > >>
> > >> > Change job config to point to new topics and restart the job
> > >>
> > >> One problem with this is going to be the case where you don't control
> > >> the producers for the old input topic. They'd either have to be migrated
> > >> to produce to the new input topic for your job, or you'd have to
> > >> permanently run the repartition job to move data from the original topic
> > >> to the currently expanded topic. Keeping the repartition job is not all
> > >> that wild of an idea. Most Samza topologies we run have some form of a
> > >> repartition job that runs permanently at the beginning of their flow.
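A minimal sketch of such a permanently running repartition job, again using the StreamTask API; the output topic "page-views-by-member" and the "memberId" field are hypothetical choices:

    import java.util.Map;

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    public class RepartitionTask implements StreamTask {
        // Hypothetical output topic, created up front with the desired partition count.
        private static final SystemStream OUTPUT =
            new SystemStream("kafka", "page-views-by-member");

        @Override
        @SuppressWarnings("unchecked")
        public void process(IncomingMessageEnvelope envelope,
                            MessageCollector collector,
                            TaskCoordinator coordinator) {
            // Assumes the message deserializes to a map and that "memberId" is the
            // field the downstream job wants the stream partitioned by.
            Map<String, Object> event = (Map<String, Object>) envelope.getMessage();
            Object key = event.get("memberId");

            // Re-emit keyed by memberId; the Kafka producer's partitioner then
            // places it on the right partition of the new topic.
            collector.send(new OutgoingMessageEnvelope(OUTPUT, key, event));
        }
    }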
> > >>
> > >
> > > I was thinking about repartitioning as a good design pattern as well.
> > > Having your job always repartition the input decouples it from its
> > > upstream topic dependencies.  This brings me to another question about
> > > deployment.  Do you recommend having two separate Kafka clusters?  In the
> > > "public" cluster, brokers would be deployed on machines by themselves.
> > > Then you have another Kafka cluster for Samza in which the brokers are
> > > co-located with YARN NodeManagers on each machine.  With this approach,
> > > Samza topologies would consume from and ultimately publish to topics on
> > > the "public" cluster.  All of the internal topics like repartitioning,
> > > changelog, etc. would be hidden away in the Kafka cluster dedicated to
> > > Samza.
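From a single job's point of view, that split would look roughly like the config below; the system names, addresses, and topic names are made up, and the exact producer/consumer keys depend on the Kafka version in use:

    # The shared, public-facing cluster: consume the job's inputs from here.
    systems.public-kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
    systems.public-kafka.consumer.zookeeper.connect=zk-public:2181
    systems.public-kafka.producer.metadata.broker.list=public-broker1:9092

    # The Samza-dedicated cluster, co-located with the YARN NodeManagers:
    # repartitioned inputs, changelogs, and checkpoints live here.
    systems.samza-kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
    systems.samza-kafka.consumer.zookeeper.connect=zk-samza:2181
    systems.samza-kafka.producer.metadata.broker.list=samza-broker1:9092

    task.inputs=public-kafka.page-views
    task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
    task.checkpoint.system=samza-kafka
    stores.my-store.changelog=samza-kafka.my-job-my-store-changelog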
> > >
> > >
> > >>
> > >> > All meaningfully-partitioned topics would need to include their keys
> > >> > in the stream
> > >> >
> > >> True. Somewhat tangential to this is the case where the key that's been
> > >> used is not the one your job wishes to partition by. In this case, a
> > >> repartition job would be required as well.
> > >>
> > >> > This would be problematic as the order of the dictionary keys can
> > >> > change but would still mean the same thing.  In order to use JSON as a
> > >> > serde for keys, you'd need to enforce a sort order on dictionaries.
> > >> >
> > >> I struggled with this as well. We basically need a forced ordering for
> > >> the JSON keys in SAMZA-348. Originally, I was thinking of making the
> > >> key/value messages just a simple string with a delimiter. Something like
> > >> <type>:<key> for the key and <host>:<source>:<blah> for the value. This
> > >> approach is also much more compact than JSON. The problem with the
> > >> latter approach is that it doesn't easily allow for hierarchical
> > >> key/value pairs.
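One way to get that forced ordering while keeping JSON keys is to serialize from a sorted map, so that logically equal keys always produce byte-identical JSON. A small sketch using Jackson; any JSON library with a deterministic ordering option would do:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class CanonicalJsonKey {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Serialize fields in sorted key order, so {"type":...,"key":...} and
        // {"key":...,"type":...} yield the same bytes and hash to the same partition.
        public static String serialize(Map<String, String> fields) throws Exception {
            return MAPPER.writeValueAsString(new TreeMap<>(fields));
        }

        public static void main(String[] args) throws Exception {
            Map<String, String> key = new HashMap<>();
            key.put("type", "offset");
            key.put("key", "my-system.my-stream.1000");
            // Prints {"key":"my-system.my-stream.1000","type":"offset"}
            // regardless of the insertion order above.
            System.out.println(serialize(key));
        }
    }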
> > >>
> > >
> > > I've been constructing string keys in my jobs so far, as you mentioned,
> > > but it adds extra boilerplate to the code.  It would be nice if there
> > > were an automatic way to do it.
> > >
> > >
> > >>
> > >> Cheers,
> > >> Chris
> > >>
> > >> On 9/24/14 4:55 PM, "Roger Hoover" <roger.hoover@gmail.com> wrote:
> > >>
> > >> >Hi all,
> > >> >
> > >> >So it seems like one of the first decisions that you have to make when
> > >> >creating a Samza job is how many partitions to have in your input
> > >> >topics.  This will dictate how many tasks are created and how many
> > >> >changelog partitions get created.  It's great that you can independently
> > >> >change the number of Samza containers that get deployed but what do you
> > >> >do once you reach the max (# containers == # tasks)?
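For reference, the knob in question: in the YARN deployment the container count is an independent piece of job config, but it only helps up to the task count fixed by the input partitions. A hypothetical example for an 8-partition input topic:

    # 8 input partitions -> 8 tasks; containers can scale from 1 up to 8, no further.
    yarn.container.count=8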
> > >> >
> > >> >If the job's input topics are partitioned by key, then you cannot add
> > >> >more partitions without corrupting existing state.  Does this come up
> > >> >for people in practice?  How do you handle it?
> > >> >
> > >> >Just trying to think it through, it seems like you need a procedure
> > >> >something like this:
> > >> >
> > >> >1) Create new topics to hold the same data but with more partitions
> > >> >(inputs, outputs, and changelog topics)
> > >> >2) Deploy jobs to repartition inputs and changelog topics into the new
> > >> >topics
> > >> >3) When caught up, stop the running job
> > >> >4) Change job config to point to new topics and restart the job (if all
> > >> >topics are new, this can be done while the previous job run is still
> > >> >active, using a new job.id)
> > >> >5) Change downstream jobs to use the new output topic if necessary.
> > >> >Doing this in a safe way might be hard.
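A sketch of the config change in step 4, with hypothetical topic names; because the topic names and job.id differ, the new run's checkpoints stay separate and it can come up while the old run is still active:

    # Before (old partition count)
    job.id=1
    task.inputs=kafka.page-views
    stores.counts.changelog=kafka.counts-changelog

    # After (repartitioned topics, new job.id)
    job.id=2
    task.inputs=kafka.page-views-16
    stores.counts.changelog=kafka.counts-changelog-16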
> > >> >
> > >> >Ideally at some point, this process could be automated.  I was
> > >> >wondering whether a generic task could be written for step #2 but I
> > >> >think it would require a couple of constraints:
> > >> >
> > >> >1) All meaningfully-partitioned topics would need to include their keys
> > >> >in the stream.  In Kafka, this is optional unless you enable compaction
> > >> >but for this to work generically, it would have to be mandatory in Samza
> > >> >for any stream for which partitions have meaning (not using random or
> > >> >round-robin partitioning).
> > >> >2) The partition keys should be re-hashable based on their raw byte
> > >> >representation so that the repartition task would not have to know how
> > >> >to deserialize the keys in order to compute their new partition.  At
> > >> >first glance, this doesn't seem too onerous but I saw in the Config
> > >> >Stream proposal (SAMZA-348) that keys might be JSON:
> > >> >
> > >> >{"type":"offset","key":"my-long-system-name.my-even-longer-stream-name-that-is-really-long.1000"}
> > >> >
> > >> >This would be problematic as the order of the dictionary keys can
> > >> >change but would still mean the same thing.  In order to use JSON as a
> > >> >serde for keys, you'd need to enforce a sort order on dictionaries.
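What constraint #2 would buy is that the generic repartition task could pick the destination partition from the raw key bytes alone, with no serde. The hash below is only an illustration; a real implementation would have to match whatever partitioner the producers use, or old and new data for a key would land on different partitions:

    import java.util.Arrays;

    public class RawKeyPartitioner {
        // Hash the raw key bytes and map onto the new partition count,
        // without ever deserializing the key.
        public static int partitionFor(byte[] rawKey, int numPartitions) {
            return (Arrays.hashCode(rawKey) & Integer.MAX_VALUE) % numPartitions;
        }

        public static void main(String[] args) {
            byte[] key = "member-42".getBytes();
            System.out.println(partitionFor(key, 16));
        }
    }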
> > >> >
> > >> >I'm curious what others do about this or what your thoughts are.
> > >> >Thanks,
> > >> >
> > >> >Roger
> > >>
> > >>
> > >
> >
>
