samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeremy p <athomewithagroove...@gmail.com>
Subject Re: How to deal with bootstrapping
Date Thu, 16 Apr 2015 20:51:12 GMT
Thank you for the response.  Does this mean the Old-Rules-Job would need to
maintain a Last-Processed-Old-Rules offset for each partition?

On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black <b@b3k.us> wrote:

> Offsets are per partition. The alternative would have poor scaling behavior
> for both brokers and consumers.
>
> On Thu, Apr 16, 2015 at 1:01 PM, jeremy p <athomewithagroovebox@gmail.com>
> wrote:
>
> > Thanks to everybody for the responses!
> >
> > Yi : The queue must be processed in order, which means that I cannot use
> > Ben and Guozhang's approach.
> >
> > However, it is not necessary that all rules be processed at the same
> offset
> > and at the same speed.  This is why I considered a solution where we had
> a
> > separate job for each rule.  The problem with that solution is that we
> > could have thousands of these rules, which would mean thousands of jobs.
> > These jobs would be really lightweight and would require very few system
> > resources.  However, I don't know if having thousands of jobs would break
> > YARN.
> >
> > For now, it sounds like Yan's solution would be the best. However, I
> have a
> > few questions about it.  For now, let's call the original job the
> > Old-Rules-Job, and the boostrap job the All-Rules-Job. This is the
> > solution, as I understand it :
> >
> > The Old-Rules-Job exposes the Last-Processed-Old-Rules offset.  We start
> > the All-Rules-Job.  The All-Rules-Job will only apply new rules until it
> > gets to the Last-Processed-Old-Rules offset.  Once the All-Rules-Job gets
> > to the Last-Processed-Old-Rules offset, it sends a kill signal to
> > Old-Rules-Job along a control stream.  Old-Rules-Job terminates itself.
> > Then the All-Rules-Job applies both old and new rules to every message
> that
> > comes in.
> >
> > My questions :
> >
> > Does the Old-Rules-Job update the Last-Processed-Old-Rules offset every
> > time it processes a message?  How does the Old-Rules-Job expose the
> > Last-Processed-Rules offset to the All-Rules-Job?  Would the
> > Last-Processed-Rules offset be the absolute offset within a topic, and
> not
> > the offset within a partition?  Is there a way to find out a message's
> > absolute offset within a topic?
> >
> > Thanks again for all the help!
> >
> > --Jeremy
> >
> >
> > On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan <nickpan47@gmail.com> wrote:
> >
> > > Hi, Jeremy,
> > >
> > > I saw the following requirements from your use case:
> > >
> > > 1) New rules need to be dynamically added w/ creating too many Samza
> jobs
> > > (e.g. 1 Samza job per new rule is too much)
> > > 2) Old rules need to continue processing when new rules are added
> > >
> > > I want to ask a few more questions regarding to your requirements:
> > >
> > > Q.1) Is it required that for a new rule, the bootstrap processing of
> > > messages from offset 0 to Last-Processed-Old-Rules has to be done
> before
> > > the new rules can be applied to messages from offset
> > > Last-Processed-Old-Rules?
> > > Q.2) Is it required that after bootstrap, all rules are processing the
> > > message at the same offset w/ the same speed?
> > >
> > > If the answers to both questions (i.e. Q.1 and Q.2) above are yes, we
> > will
> > > have to slow down or stop the jobs for the old rules s.t. the jobs
> > running
> > > both new and old rules can catch up, as Yan pointed out. If answers to
> > both
> > > questions above are no (which I doubt since you need to build-up
> certain
> > > "history" for the new rule before you can apply it to later messages),
> > you
> > > can take Ben/Guozhang's approach w/o coordination between the two jobs.
> > >
> > > Now the interesting case is that your answer to Q.1 is yes, and to Q.2
> is
> > > no, which essentially post a request that your job will need to keep
> > > multiple independent consumer offsets per rule and let them move w/
> their
> > > own speed. Or, at least one bootstrap consumer, and one normal
> processing
> > > consumer on the same system stream partition within a single job. I
> don't
> > > think that Samza support this now. And the only work around is Yan's
> > > solution which requires coordination between two jobs.
> > >
> > > -Yi
> > >
> > > On Thu, Apr 16, 2015 at 11:21 AM, Yan Fang <yanfang724@gmail.com>
> wrote:
> > >
> > > > you are able to call coordinator.shutdown to shut the job down after
> it
> > > > reaches the offset.
> > > >
> > > > Thanks,
> > > >
> > > > Fang, Yan
> > > > yanfang724@gmail.com
> > > >
> > > > On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang <wangguoz@gmail.com>
> > > wrote:
> > > >
> > > > > I feel Ben's solution a bit simpler that you just need to restart
> > your
> > > > > current job with both rules on the check pointed offset, and start
> a
> > > new
> > > > > job from offset 0 with only the new rule and it will stop at the
> > > checkout
> > > > > pointed offset. But of course it requires the second job to be able
> > to
> > > > > shutdown itself upon some specific offset which I am not sure if
it
> > is
> > > > > already supported.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang <yanfang724@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi Jeremy,
> > > > > >
> > > > > > In order to reach this goal, we have to assume that the job
with
> > new
> > > > > rules
> > > > > > can always catch up with the one with old rules. Otherwise,
I
> think
> > > we
> > > > do
> > > > > > not have the choice but running a lot of jobs simultaneously.
> > > > > >
> > > > > > Under our assumption, we have job1 with old rules running, and
> now
> > > add
> > > > > job2
> > > > > > which integrates old rules and new rules to run. Job2 frequently
> > > > > > checks the Last-Processed-Old-Rules
> > > > > > offset from job1 (because job1 is running too), and it only
> applies
> > > new
> > > > > > rule to the data until catch up with the Last-Processed-Old-Rules
> > > > offset.
> > > > > > Then it sends signal to the job1 and shutdown job1, and applies
> all
> > > > rules
> > > > > > to the stream.
> > > > > >
> > > > > > In terms of how to shutdown the job1, here is one solution
> > > > > > <
> > > > > >
> > > > >
> > > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3CCFE93D17.2D24B%25criccomini@linkedin.com%3E
> > > > > > >
> > > > > > provided by Chris - e.g. you can have a control stream to get
> job1
> > > > > > shutdown. Samza will provide this kind of stream after SAMZA-348
> > > > > > <https://issues.apache.org/jira/browse/SAMZA-348>, which
is
> under
> > > > active
> > > > > > development.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Fang, Yan
> > > > > > yanfang724@gmail.com
> > > > > >
> > > > > > On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <
> > > > > athomewithagroovebox@gmail.com
> > > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hello Yan,
> > > > > > >
> > > > > > > Thank you for the suggestion!  I think your solution would
> work,
> > > > > > however, I
> > > > > > > am afraid it would create a performance problem for our
users.
> > > > > > >
> > > > > > > Let's say we kill the Classifier task, and create a new
> > Classifier
> > > > task
> > > > > > > with both the existing rules and new rules. We get the
offset
> of
> > > the
> > > > > > > latest-processed message for the old rules.  Let's call
this
> > offset
> > > > > > > Last-Processed-Old-Rules.  We ignore messages
> > > > > > > before Last-Processed-Old-Rules for the old rules.  We
> configure
> > > the
> > > > > new
> > > > > > > Classifier task to be a bootstrap task.
> > > > > > >
> > > > > > > Let's say we have users who are watching the output topics,
and
> > > they
> > > > > are
> > > > > > > expecting near-realtime updates.  They won't see any updates
> for
> > > the
> > > > > old
> > > > > > > rules until our task has passed the Last-Processed-Old-Rules
> > > offset.
> > > > > If
> > > > > > we
> > > > > > > have a lot of messages in that topic, that could take a
long
> > time.
> > > > > This
> > > > > > is
> > > > > > > why I was hoping there would be a way to bootstrap the
new
> rules
> > > > while
> > > > > > > we're still processing the old rules.  Do you think there
is a
> > way
> > > to
> > > > > do
> > > > > > > that?
> > > > > > >
> > > > > > > On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <
> yanfang724@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi Jeremy,
> > > > > > > >
> > > > > > > > If my understanding is correct, whenever you add a
new rule,
> > you
> > > > want
> > > > > > to
> > > > > > > > apply this rule to the historical data. Right?
> > > > > > > >
> > > > > > > > If you do not care about duplication, you can create
a new
> task
> > > > that
> > > > > > > > contains existing rules and new rules. Configure bootstrap.
> > This
> > > > will
> > > > > > > apply
> > > > > > > > all the rules from the beginning of the input stream.
The
> > > > shortcoming
> > > > > > is
> > > > > > > > you will get duplicated results for old rules.
> > > > > > > >
> > > > > > > > If you can not tolerate the shortcoming, 1) get the
offset of
> > the
> > > > > > > > latest-processed message of old rules. 2) In your
new task,
> > > ignore
> > > > > > > messages
> > > > > > > > before that offset for the old rules. 3) bootstrap.
> > > > > > > >
> > > > > > > > Hope this helps. Maybe your use case is more complicated?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Fang, Yan
> > > > > > > > yanfang724@gmail.com
> > > > > > > >
> > > > > > > > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <
> > > > > > > athomewithagroovebox@gmail.com
> > > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > So, I'm wanting to use Samza for a project I'm
working on,
> > but
> > > I
> > > > > keep
> > > > > > > > > running into a problem with bootstrapping.
> > > > > > > > >
> > > > > > > > > Let's say there's a Kafka topic called Numbers
that I want
> to
> > > > > consume
> > > > > > > > with
> > > > > > > > > Samza.  Let's say each message has a single integer
in it,
> > and
> > > I
> > > > > want
> > > > > > > to
> > > > > > > > > classify it as even or odd.  So I have two topics
that I'm
> > > using
> > > > > for
> > > > > > > > > output, one called Even and one called Odd. 
I write a
> simple
> > > > > stream
> > > > > > > task
> > > > > > > > > called Classifier that consumes the Numbers topic,
examines
> > > each
> > > > > > > incoming
> > > > > > > > > integer and writes it back out to Even or Odd.
> > > > > > > > >
> > > > > > > > > Now, let's say I want to be able to add classifications
> > > > > dynamically,
> > > > > > > > like :
> > > > > > > > > "divisible by three", "divisible by four", or
"numbers that
> > > > appear
> > > > > in
> > > > > > > my
> > > > > > > > > date of birth".  And let's say I have an API
I can query
> that
> > > > gives
> > > > > > me
> > > > > > > > all
> > > > > > > > > the assignment rules, such as "when a number
is divisble by
> > 3,
> > > > > write
> > > > > > it
> > > > > > > > out
> > > > > > > > > to a topic called 'divisible_by_three'", or "when
a number
> > > > appears
> > > > > in
> > > > > > > the
> > > > > > > > > string 12/12/1981, write it to the 'my_birthday'
topic".
> So
> > > now
> > > > I
> > > > > > > > rewrite
> > > > > > > > > my stream task to query this API for assignment
rules.  It
> > > reads
> > > > > > > integers
> > > > > > > > > from the Numbers topic and writes them back out
to one or
> > more
> > > > > output
> > > > > > > > > topics, according to the assignment rules.
> > > > > > > > >
> > > > > > > > > Now, let's make this even more complicated. 
When I add a
> new
> > > > > > > > > classification, I want to go back to the very
beginning of
> > the
> > > > > > Numbers
> > > > > > > > > topic and classify them accordingly.  Once we've
consumed
> all
> > > the
> > > > > old
> > > > > > > > > "historical" integers, I want to apply this classification
> > new
> > > > > > integers
> > > > > > > > as
> > > > > > > > > they come in.
> > > > > > > > >
> > > > > > > > > And this is where I get stuck.
> > > > > > > > >
> > > > > > > > > One thing I can do : when I want to add a new
> > classification, I
> > > > can
> > > > > > > > create
> > > > > > > > > a bootstrap job by setting the
> > > > > > > > > "systems.kafka.streams.numbers.samza.offset.default"
> property
> > > to
> > > > > > > > "oldest".
> > > > > > > > > And that's great, but the problem is, once I've
"caught
> up",
> > > I'd
> > > > > like
> > > > > > > to
> > > > > > > > > kill the bootstrap job and just let the Classifier
handle
> > this
> > > > new
> > > > > > > > > assignment.  So, I'd want to do some kind of
handover from
> > the
> > > > > > > bootstrap
> > > > > > > > > job to the Classifier job.  But how to do this?
> > > > > > > > >
> > > > > > > > > So, the question I must ask is this : Is Samza
even an
> > > appopriate
> > > > > way
> > > > > > > to
> > > > > > > > > solve this problem?  Has this problem ever come
up for
> > anybody
> > > > > else?
> > > > > > > How
> > > > > > > > > have they solved it?  I would really like to
use Samza
> > because
> > > it
> > > > > > seems
> > > > > > > > > like an appopriate technology, and I'd really
really really
> > > > really
> > > > > > like
> > > > > > > > to
> > > > > > > > > avoid re-inventing the wheel.
> > > > > > > > >
> > > > > > > > > A couple solutions I came up with :
> > > > > > > > >
> > > > > > > > > 1) The simple solution.  Have a separate Samza
job for each
> > > > > > > > > classification.  If I want to add a new classification,
I
> > > create
> > > > a
> > > > > > new
> > > > > > > > job
> > > > > > > > > and set it up as a bootstrap job.  This would
solve the
> > > problem.
> > > > > > > > However,
> > > > > > > > > we may want to have many, many classifications.
 It could
> be
> > as
> > > > > many
> > > > > > as
> > > > > > > > > 1,000,000, which would mean up to 1,000,000 simultaneously
> > > > running
> > > > > > > jobs.
> > > > > > > > > This could create a lot of overhead for YARN
and Kafka.
> > > > > > > > >
> > > > > > > > > 2) My overly-complicated workaround solution.
 Each
> > assignment
> > > > rule
> > > > > > has
> > > > > > > > an
> > > > > > > > > "isnew" flag.  If it's a new classification that
hasn't
> fully
> > > > > > > > bootstrapped
> > > > > > > > > yet, the "isnew" flag is set to TRUE.  When my
classifier
> > > queries
> > > > > the
> > > > > > > API
> > > > > > > > > for assignment rules, it ignores any rule with
an "isnew"
> > flag.
> > > > > > When I
> > > > > > > > > want to add a new classification, I create a
new bootstrap
> > job
> > > > for
> > > > > > that
> > > > > > > > > classification.  Every so often, maybe every
few days or
> so,
> > if
> > > > all
> > > > > > of
> > > > > > > my
> > > > > > > > > bootstrap jobs have "caught up", I kill all of
the
> bootstrap
> > > jobs
> > > > > and
> > > > > > > > > classifier jobs.  I set all the "isnew" flags
to FALSE.
> > Then I
> > > > > > restart
> > > > > > > > the
> > > > > > > > > classifier job.  This is kind of an ugly solution,
and I'm
> > not
> > > > even
> > > > > > > sure
> > > > > > > > it
> > > > > > > > > would work.  For one thing, I'd need some way
of knowing
> if a
> > > > > > boostrap
> > > > > > > > job
> > > > > > > > > has "caught up".  Secondly, I'd essentially be
restarting
> the
> > > > > > > classifier
> > > > > > > > > job periodically, which just seems like an ugly
solution.
> I
> > > > don't
> > > > > > like
> > > > > > > > it.
> > > > > > > > >
> > > > > > > > > 3) Some other kind of really complicated solution
I haven't
> > > > thought
> > > > > > of
> > > > > > > > yet,
> > > > > > > > > probably involving locks, transactions, concurrancy,
and
> > > > > interprocess
> > > > > > > > > communication.
> > > > > > > > >
> > > > > > > > > Thanks for reading this whole thing.  Please
let me know if
> > you
> > > > > have
> > > > > > > any
> > > > > > > > > suggestions.
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message