samza-dev mailing list archives

From jeremy p <athomewithagroove...@gmail.com>
Subject Re: How to deal with bootstrapping
Date Thu, 16 Apr 2015 20:01:53 GMT
Thanks to everybody for the responses!

Yi : The queue must be processed in order, which means that I cannot use
Ben and Guozhang's approach.

However, it is not necessary that all rules be processed at the same offset
and at the same speed.  This is why I considered a solution where we had a
separate job for each rule.  The problem with that solution is that we
could have thousands of these rules, which would mean thousands of jobs.
These jobs would be really lightweight and would require very few system
resources.  However, I don't know if having thousands of jobs would break
YARN.

For now, it sounds like Yan's solution would be the best. However, I have a
few questions about it.  Let's call the original job the
Old-Rules-Job, and the bootstrap job the All-Rules-Job. This is the
solution, as I understand it :

The Old-Rules-Job exposes the Last-Processed-Old-Rules offset.  We start
the All-Rules-Job.  The All-Rules-Job will only apply new rules until it
gets to the Last-Processed-Old-Rules offset.  Once the All-Rules-Job gets
to the Last-Processed-Old-Rules offset, it sends a kill signal to
Old-Rules-Job along a control stream.  Old-Rules-Job terminates itself.
Then the All-Rules-Job applies both old and new rules to every message that
comes in.
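
To check that I have the handover right, here is a minimal sketch in plain
Python. It is not Samza code: the AllRulesJob class, the in-memory "control"
list standing in for the control stream, and the example rules are all
hypothetical, and it models a single partition only.

```python
from dataclasses import dataclass, field


@dataclass
class AllRulesJob:
    """Toy model of the All-Rules-Job for one partition (hypothetical names)."""
    old_rules: dict   # rule name -> predicate
    new_rules: dict   # rule name -> predicate
    caught_up: bool = False
    output: list = field(default_factory=list)   # (offset, rule name) pairs
    control: list = field(default_factory=list)  # stands in for the control stream

    def process(self, offset, value, last_processed_old_rules):
        # Before the handover only the new rules run, so old-rule output is
        # not duplicated while the Old-Rules-Job is still alive.
        rules = dict(self.new_rules)
        if self.caught_up:
            rules.update(self.old_rules)
        for name, matches in rules.items():
            if matches(value):
                self.output.append((offset, name))
        # On reaching the Old-Rules-Job's offset, send the kill signal once
        # and apply every rule from the next message onwards.
        if not self.caught_up and offset >= last_processed_old_rules:
            self.control.append("shutdown Old-Rules-Job")
            self.caught_up = True


numbers = [4, 9, 6, 7, 12, 3]   # messages at offsets 0..5
job = AllRulesJob(
    old_rules={"even": lambda n: n % 2 == 0},
    new_rules={"divisible_by_three": lambda n: n % 3 == 0},
)
LAST_PROCESSED_OLD_RULES = 3    # assumed: Old-Rules-Job has handled offsets 0..3
for offset, n in enumerate(numbers):
    job.process(offset, n, LAST_PROCESSED_OLD_RULES)

print(job.output)
print(job.control)
```

The sketch assumes the Old-Rules-Job stops exactly at the offset it last
exposed.  In a real deployment it would keep processing until the kill signal
arrives, so messages between the exposed offset and the actual stopping point
would need separate handling.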

My questions :

Does the Old-Rules-Job update the Last-Processed-Old-Rules offset every
time it processes a message?  How does the Old-Rules-Job expose the
Last-Processed-Old-Rules offset to the All-Rules-Job?  Would the
Last-Processed-Old-Rules offset be the absolute offset within a topic, and
not the offset within a partition?  Is there a way to find out a message's
absolute offset within a topic?
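
For reference, Kafka assigns offsets independently within each partition, so
a position in a topic is usually represented as a map from partition to
offset rather than as one number.  A minimal sketch of a per-partition
catch-up check, assuming both jobs consume the same partitions (the function
name and the example offsets are hypothetical):

```python
def caught_up(bootstrap_position, old_rules_position):
    """Return True once every partition has reached the Old-Rules-Job's offset.

    Both arguments map a partition number to the last processed offset.
    A partition the bootstrap job has not read yet counts as offset -1.
    """
    return all(
        bootstrap_position.get(partition, -1) >= offset
        for partition, offset in old_rules_position.items()
    )


# partition -> last offset processed by the Old-Rules-Job (made-up values)
old_rules_position = {0: 1500, 1: 1320, 2: 1498}

print(caught_up({0: 1500, 1: 1400, 2: 1200}, old_rules_position))  # False
print(caught_up({0: 1500, 1: 1400, 2: 1498}, old_rules_position))  # True
```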

Thanks again for all the help!

--Jeremy


On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan <nickpan47@gmail.com> wrote:

> Hi, Jeremy,
>
> I saw the following requirements from your use case:
>
> 1) New rules need to be dynamically added w/o creating too many Samza jobs
> (e.g. 1 Samza job per new rule is too much)
> 2) Old rules need to continue processing when new rules are added
>
> I want to ask a few more questions regarding your requirements:
>
> Q.1) Is it required that for a new rule, the bootstrap processing of
> messages from offset 0 to Last-Processed-Old-Rules has to be done before
> the new rules can be applied to messages from offset
> Last-Processed-Old-Rules?
> Q.2) Is it required that after bootstrap, all rules are processing the
> message at the same offset w/ the same speed?
>
> If the answers to both questions (i.e. Q.1 and Q.2) above are yes, we will
> have to slow down or stop the jobs for the old rules s.t. the jobs running
> both new and old rules can catch up, as Yan pointed out. If answers to both
> questions above are no (which I doubt since you need to build-up certain
> "history" for the new rule before you can apply it to later messages), you
> can take Ben/Guozhang's approach w/o coordination between the two jobs.
>
> Now the interesting case is that your answer to Q.1 is yes, and to Q.2 is
> no, which essentially poses a requirement that your job keep
> multiple independent consumer offsets per rule and let them move at their
> own speed. Or, at least one bootstrap consumer, and one normal processing
> consumer on the same system stream partition within a single job. I don't
> think that Samza supports this now. And the only workaround is Yan's
> solution, which requires coordination between two jobs.
>
> -Yi
>
> On Thu, Apr 16, 2015 at 11:21 AM, Yan Fang <yanfang724@gmail.com> wrote:
>
> > you are able to call coordinator.shutdown to shut the job down after it
> > reaches the offset.
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang724@gmail.com
> >
> > On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > > I feel Ben's solution is a bit simpler: you just need to restart your
> > > current job with both rules at the checkpointed offset, and start a new
> > > job from offset 0 with only the new rule, which will stop at the
> > > checkpointed offset. But of course it requires the second job to be able
> > > to shut itself down upon reaching a specific offset, which I am not sure
> > > is already supported.
> > >
> > > Guozhang
> > >
> > > On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang <yanfang724@gmail.com> wrote:
> > >
> > > > Hi Jeremy,
> > > >
> > > > In order to reach this goal, we have to assume that the job with new
> > > > rules can always catch up with the one with old rules. Otherwise, I
> > > > think we have no choice but to run a lot of jobs simultaneously.
> > > >
> > > > Under our assumption, we have job1 with old rules running, and now add
> > > > job2, which integrates old rules and new rules. Job2 frequently checks
> > > > the Last-Processed-Old-Rules offset from job1 (because job1 is running
> > > > too), and it only applies the new rules to the data until it catches up
> > > > with the Last-Processed-Old-Rules offset. Then it sends a signal to job1
> > > > to shut it down, and applies all rules to the stream.
> > > >
> > > > In terms of how to shut down job1, here is one solution
> > > > <http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3CCFE93D17.2D24B%25criccomini@linkedin.com%3E>
> > > > provided by Chris - e.g. you can have a control stream to get job1
> > > > shut down. Samza will provide this kind of stream after SAMZA-348
> > > > <https://issues.apache.org/jira/browse/SAMZA-348>, which is under active
> > > > development.
> > > >
> > > > Thanks,
> > > >
> > > > Fang, Yan
> > > > yanfang724@gmail.com
> > > >
> > > > On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <athomewithagroovebox@gmail.com> wrote:
> > > >
> > > > > Hello Yan,
> > > > >
> > > > > Thank you for the suggestion!  I think your solution would work,
> > > > > however, I am afraid it would create a performance problem for our
> > > > > users.
> > > > >
> > > > > Let's say we kill the Classifier task, and create a new Classifier task
> > > > > with both the existing rules and new rules. We get the offset of the
> > > > > latest-processed message for the old rules.  Let's call this offset
> > > > > Last-Processed-Old-Rules.  We ignore messages before
> > > > > Last-Processed-Old-Rules for the old rules.  We configure the new
> > > > > Classifier task to be a bootstrap task.
> > > > >
> > > > > Let's say we have users who are watching the output topics, and they are
> > > > > expecting near-realtime updates.  They won't see any updates for the old
> > > > > rules until our task has passed the Last-Processed-Old-Rules offset.  If
> > > > > we have a lot of messages in that topic, that could take a long time.
> > > > > This is why I was hoping there would be a way to bootstrap the new rules
> > > > > while we're still processing the old rules.  Do you think there is a way
> > > > > to do that?
> > > > >
> > > > > On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <yanfang724@gmail.com> wrote:
> > > > >
> > > > > > Hi Jeremy,
> > > > > >
> > > > > > If my understanding is correct, whenever you add a new rule, you want
> > > > > > to apply this rule to the historical data. Right?
> > > > > >
> > > > > > If you do not care about duplication, you can create a new task that
> > > > > > contains existing rules and new rules. Configure bootstrap. This will
> > > > > > apply all the rules from the beginning of the input stream. The
> > > > > > shortcoming is you will get duplicated results for old rules.
> > > > > >
> > > > > > If you can not tolerate the shortcoming, 1) get the offset of the
> > > > > > latest-processed message of old rules. 2) In your new task, ignore
> > > > > > messages before that offset for the old rules. 3) bootstrap.
> > > > > >
> > > > > > Hope this helps. Maybe your use case is more complicated?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Fang, Yan
> > > > > > yanfang724@gmail.com
> > > > > >
> > > > > > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <athomewithagroovebox@gmail.com> wrote:
> > > > > >
> > > > > > > So, I'm wanting to use Samza for a project I'm working on, but I
> > > > > > > keep running into a problem with bootstrapping.
> > > > > > >
> > > > > > > Let's say there's a Kafka topic called Numbers that I want to
> > > > > > > consume with Samza.  Let's say each message has a single integer
> > > > > > > in it, and I want to classify it as even or odd.  So I have two
> > > > > > > topics that I'm using for output, one called Even and one called
> > > > > > > Odd.  I write a simple stream task called Classifier that consumes
> > > > > > > the Numbers topic, examines each incoming integer and writes it
> > > > > > > back out to Even or Odd.
> > > > > > >
> > > > > > > Now, let's say I want to be able to add classifications
> > > > > > > dynamically, like : "divisible by three", "divisible by four", or
> > > > > > > "numbers that appear in my date of birth".  And let's say I have
> > > > > > > an API I can query that gives me all the assignment rules, such as
> > > > > > > "when a number is divisible by 3, write it out to a topic called
> > > > > > > 'divisible_by_three'", or "when a number appears in the string
> > > > > > > 12/12/1981, write it to the 'my_birthday' topic".  So now I
> > > > > > > rewrite my stream task to query this API for assignment rules.  It
> > > > > > > reads integers from the Numbers topic and writes them back out to
> > > > > > > one or more output topics, according to the assignment rules.
> > > > > > >
> > > > > > > Now, let's make this even more complicated.  When I add a new
> > > > > > > classification, I want to go back to the very beginning of the
> > > > > > > Numbers topic and classify them accordingly.  Once we've consumed
> > > > > > > all the old "historical" integers, I want to apply this
> > > > > > > classification to new integers as they come in.
> > > > > > >
> > > > > > > And this is where I get stuck.
> > > > > > >
> > > > > > > One thing I can do : when I want to add a new classification, I
> > > > > > > can create a bootstrap job by setting the
> > > > > > > "systems.kafka.streams.numbers.samza.offset.default" property to
> > > > > > > "oldest".  And that's great, but the problem is, once I've "caught
> > > > > > > up", I'd like to kill the bootstrap job and just let the
> > > > > > > Classifier handle this new assignment.  So, I'd want to do some
> > > > > > > kind of handover from the bootstrap job to the Classifier job.
> > > > > > > But how to do this?
> > > > > > >
> > > > > > > So, the question I must ask is this : Is Samza even an appropriate
> > > > > > > way to solve this problem?  Has this problem ever come up for
> > > > > > > anybody else?  How have they solved it?  I would really like to
> > > > > > > use Samza because it seems like an appropriate technology, and I'd
> > > > > > > really really really really like to avoid re-inventing the wheel.
> > > > > > >
> > > > > > > A couple solutions I came up with :
> > > > > > >
> > > > > > > 1) The simple solution.  Have a separate Samza job for each
> > > > > > > classification.  If I want to add a new classification, I create a
> > > > > > > new job and set it up as a bootstrap job.  This would solve the
> > > > > > > problem.  However, we may want to have many, many classifications.
> > > > > > > It could be as many as 1,000,000, which would mean up to 1,000,000
> > > > > > > simultaneously running jobs.  This could create a lot of overhead
> > > > > > > for YARN and Kafka.
> > > > > > >
> > > > > > > 2) My overly-complicated workaround solution.  Each assignment
> > > > > > > rule has an "isnew" flag.  If it's a new classification that
> > > > > > > hasn't fully bootstrapped yet, the "isnew" flag is set to TRUE.
> > > > > > > When my classifier queries the API for assignment rules, it
> > > > > > > ignores any rule whose "isnew" flag is TRUE.  When I want to add a
> > > > > > > new classification, I create a new bootstrap job for that
> > > > > > > classification.  Every so often, maybe every few days or so, if
> > > > > > > all of my bootstrap jobs have "caught up", I kill all of the
> > > > > > > bootstrap jobs and classifier jobs.  I set all the "isnew" flags
> > > > > > > to FALSE.  Then I restart the classifier job.  This is kind of an
> > > > > > > ugly solution, and I'm not even sure it would work.  For one
> > > > > > > thing, I'd need some way of knowing if a bootstrap job has "caught
> > > > > > > up".  Secondly, I'd essentially be restarting the classifier job
> > > > > > > periodically, which just seems like an ugly solution.  I don't
> > > > > > > like it.
> > > > > > >
> > > > > > > 3) Some other kind of really complicated solution I haven't
> > > > > > > thought of yet, probably involving locks, transactions,
> > > > > > > concurrency, and interprocess communication.
> > > > > > >
> > > > > > > Thanks for reading this whole thing.  Please let me know if you
> > > > > > > have any suggestions.
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
