samza-dev mailing list archives

From Dan Di Spaltro <dan.dispal...@gmail.com>
Subject Re: Trying to achieve deterministic behavior on recovery/rewind
Date Wed, 10 Sep 2014 20:15:17 GMT
As Roger said (this is a great exchange, btw), I was really attracted to
Samza by the idea that I could stitch together flows at runtime vs. defining
a bolt up front.  However, the config does seem verbose. I would imagine
that the stitching doesn't change between dev and production, which makes
me feel like it should be something more compile-time, maybe even a
fluent builder or some code generation. It kinda feels like you should
treat it like an AST.
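
To make that a bit more concrete, here is a rough sketch of the kind of
builder I have in mind. None of these names (JobBuilder, task, build,
WiringError) exist in Samza; they are hypothetical, the emitted config keys
are illustrative only, and it is Python just to keep it short. The point is
only that a mistyped topic name fails when the job is assembled rather than
showing up later as a task that never receives messages.

```python
class WiringError(Exception):
    """Raised when a task reads a topic that nothing produces."""


class JobBuilder:
    # Hypothetical fluent builder; not part of Samza's API.
    def __init__(self, sources):
        # Topics fed from outside the job graph (e.g. written by other systems).
        self._sources = set(sources)
        self._tasks = []

    def task(self, name, reads, writes=()):
        self._tasks.append((name, tuple(reads), tuple(writes)))
        return self  # returning self is what makes the calls chainable

    def build(self):
        # Every topic that some task or external source produces.
        produced = set(self._sources)
        for _, _, writes in self._tasks:
            produced.update(writes)
        # Fail at assembly time if a task reads a topic nobody writes.
        for name, reads, _ in self._tasks:
            missing = [t for t in reads if t not in produced]
            if missing:
                raise WiringError(f"task {name!r} reads unknown topics: {missing}")
        # Emit a flat key/value form; the key names are made up for this sketch.
        config = {}
        for name, reads, writes in self._tasks:
            config[f"{name}.inputs"] = ",".join(reads)
            config[f"{name}.outputs"] = ",".join(writes)
        return config


config = (
    JobBuilder(sources=["system-events"])
    .task("enrich", reads=["system-events"], writes=["enriched-events"])
    .task("index", reads=["enriched-events"])
    .build()
)
```

Topic names live in one place here, so renaming a topic is a single change,
and the flat k-v config becomes an output of the build rather than something
maintained by hand.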

On Mon, Sep 8, 2014 at 3:36 PM, Chris Riccomini <
criccomini@linkedin.com.invalid> wrote:

> Hey Roger,
>
> This summary sounds pretty reasonable to me, on the Samza side, anyway. I
> can't comment on the others, as I don't have a great understanding of
> their internals.
>
> I agree with you about the knock on Samza's config system. It's something
> I think we'll need to resolve, but we don't have a concrete proposal on
> how, yet. Given that you've had a look at these three systems, do you have
> any feedback on what you'd like to see? The topic-name issue is a
> relatively specific problem, but maybe you have thoughts on a higher-level
> design that you'd like to have?
>
> Right now, we have:
>
> 1. We mix wiring and config in config files. Wiring in configs leads to
> runtime errors. We have no code-level "builder" for assembling a job, and
> no DI framework.
> 2. Everything is a k-v string pair. Nothing is hierarchical.
> 3. Config is exposed to developers via a single Config object, that's
> nothing more than a Map<String, String> with some helper methods.
>
> Do you have any thoughts on what you liked from the other frameworks, and
> what you'd like to see Samza do in this area?
>
> Cheers,
> Chris
>
> On 9/8/14 2:23 PM, "Roger Hoover" <roger.hoover@gmail.com> wrote:
>
> >Hi Chris,
> >
> >We've been looking at Samza for a number of use cases ranging from system
> >monitoring to user activity analysis for a pretty large retail website.
> > For system monitoring, for example, we want to push system performance
> >events into Kafka then have Samza jobs enrich these events with additional
> >context such as information about the deployment topology and/or the user
> >who triggered it.  Then we want to feed the enriched streams into various
> >data stores including Elasticsearch and Hadoop for analysis.
> >
> >I've been diving into Samza, Spark Streaming, and Storm.  So far, I'm
> >impressed with the simplicity of Samza.  I feel like it's the easiest to
> >reason about and can scale up to larger teams where not everyone is an
> >expert on the framework.  I may be wrong about some of my conclusions so
> >I'm interested if anyone else wants to share their experience.
> >
> >Samza
> >  - The simplest to reason about from a developer's perspective.  Tasks are
> >single threaded and the API is very simple to understand.  Local state
> >management API simplifies the developer's job a lot and should give
> >excellent performance.  Scalability is well defined (parallelism is
> >determined by number of Kafka partitions + number of tasks).  Topologies
> >are explicit.  You create tasks for each step and wire them together.
> >  - The simplest to operate because each task is independent from the
> >others.  Kafka protects against backpressure and provides fine-grained fault
> >tolerance.  Seems unlikely that a poorly written job/task will hurt more
> >than itself.  Other than Kafka, there are minimal network dependencies.
> > There are relatively few moving parts.
> >  - The framework is pretty lightweight, relying on Kafka and YARN to do most
> >of the work.  Fantastic documentation!
> >  - The primary downside that I see is that there is a lot of wiring
> >involved and it may be easy to mess up.  For example, topic names need to
> >be specified both in the code and the config and need to match across
> >tasks/jobs.  You probably won't know that you've screwed up until you find
> >a task is not receiving any messages.  You also have to change both the
> >code and the config if you change the name of a topic.  In larger
> >organizations, these activities might be done by people on different
> >teams.
> >
> >Storm
> >- Trident will do some auto-wiring of topologies but it's hard to reason
> >about the generated topologies, especially transactional behavior.
> >- Managing non-local state can be tricky to get right.
> >- There are lots of moving parts that have to be tuned (LMAX disruptor
> >queues, 0mq queues)
> >- Ships closures around the cluster.  Makes it hard to work with things
> >like JRuby.
> >- Framework provides things like cluster management/scheduling that can be
> >done elsewhere (YARN or Mesos)
> >
> >Spark Streaming
> >- Quite a few things are implicit.  You write jobs as if they were going to
> >execute in a single process.  These get broken into stages and scheduled on
> >the cluster.  You need a pretty good understanding of this process to
> >write, debug, and tune these jobs.  I think it's fairly easy to
> >accidentally include a reference to a large Java object in worker code and
> >have it shipped across the network unintentionally.
> >- All the code/objects that run on workers must be serializable.  I'm
> >guessing this can be quite annoying.  Samza ships the jar to the worker so
> >I think it should be able to run anything (JRuby for example).
> >- The driver process is a single point of failure, the only place where
> >some job state is saved.
> >- Seems operationally risky.  As far as I can tell, a slow consumer can tie
> >up the entire cluster.  This is because there is no backpressure that stops a
> >driver from producing more and more DStreams, even if consumers cannot keep
> >up.  The tuning guide suggests that every time you make a change to a job,
> >you need to tune your settings again (
> >https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
> >).
> >
> >Cheers,
> >
> >Roger
> >
> >
> >On Fri, Sep 5, 2014 at 4:14 PM, Chris Riccomini <
> >criccomini@linkedin.com.invalid> wrote:
> >
> >> Hey Roger,
> >>
> >> > After thinking more about it, I don't think we can get to the
> >> >deterministic behavior we talked about until Kafka supports idempotent
> >> >producers.
> >>
> >> Yes, I agree. To get fully deterministic behavior, you do need idempotent
> >> producers + deterministic message ordering (or Kafka transactionality) at
> >> the Kafka level. We plan on relying on Kafka transactionality when the
> >> patch is committed. Without this, it's possible to output two different
> >> "correct" answers (for my definition of correct), or two of the same
> >> "correct" answers (for your definition of correct).
> >>
> >> > I created a JIRA ticket in case it helps spur action or keep the
> >> >conversation from getting lost.
> >>
> >> Awesome, thanks!
> >>
> >> Out of curiosity, what are you using Samza for? You seem to have quite a
> >> deep understanding of it. :)
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 9/4/14 10:06 AM, "Roger Hoover" <roger.hoover@gmail.com> wrote:
> >>
> >> >Chris, Chinmay,
> >> >
> >> >After thinking more about it, I don't think we can get to the
> >> >deterministic behavior we talked about until Kafka supports idempotent
> >> >producers.  The reason is that duplicate messages mean that we can't rely
> >> >on strong ordering.  If a duplicate of update U1 can show up anytime, then
> >> >we can never rely on ordered updates because we might see U1 then U2 then
> >> >U1 again.  I guess we could try to handle this at the application layer but
> >> >not at the Samza layer yet, I think.
> >> >
> >> >Nonetheless, some of the changes we discussed may help get closer and still
> >> >make sense to implement.  I created a JIRA ticket in case it helps spur
> >> >action or keep the conversation from getting lost.  If you don't find the
> >> >ticket useful at this time, feel free to close it.
> >> >
> >> >https://issues.apache.org/jira/browse/SAMZA-405
> >> >
> >> >Cheers,
> >> >
> >> >Roger
> >> >
> >> >
> >> >On Wed, Sep 3, 2014 at 11:26 AM, Chinmay Soman
> >> ><csoman@linkedin.com.invalid>
> >> >wrote:
> >> >
> >> >> > bootstrapping is ever necessary on recovery or rewind.  Seems like
> >> >> > it's only needed for cold start.
> >> >>
> >> >> I think you're right. Either way, it looks like there should be
> >> >> additional support for this.
> >> >> ________________________________________
> >> >> From: Roger Hoover [roger.hoover@gmail.com]
> >> >> Sent: Wednesday, September 03, 2014 9:43 AM
> >> >> To: dev@samza.incubator.apache.org
> >> >> Subject: Re: Trying to achieve deterministic behavior on recovery/rewind
> >> >>
> >> >> Chinmay,
> >> >>
> >> >> Thank you for the feedback.  Responses inline.
> >> >>
> >> >> On Tue, Sep 2, 2014 at 2:09 PM, Chinmay Soman <csoman@linkedin.com.invalid>
> >> >> wrote:
> >> >>
> >> >> > That's interesting !
> >> >> >
> >> >> > 1) Deterministic reading from a bootstrap stream:
> >> >> > We could define a changelog for the local state (which in turn is
> >> >> > populated using a bootstrap stream). If the job fails at this point,
> >> >> > ideally, it should be restored using a changelog stream (instead of
> >> >> > bootstrapping again) in order for the job to be deterministic (as you
> >> >> > suggest). Thus there could be a check which either selects the bootstrap
> >> >> > mode or the changelog restore mode (depending on whether a changelog
> >> >> > exists).  I'm not sure if this check exists today (I would guess no).
> >> >> >
> >> >>
> >> >> Yes, I was wondering about this for event-table join.  If you're only
> >> >> storing the join table in the local store then the change log stream and
> >> >> the bootstrap stream are duplicates of each other.  One of them is
> >> >> unnecessary unless you add additional state to the local store.  In any
> >> >> case, I'm wondering if bootstrapping is ever necessary on recovery or
> >> >> rewind.  Seems like it's only needed for cold start.
> >> >>
> >> >>
> >> >> >
> >> >> > 2) Deterministic changelog:
> >> >> > You're right - there could be a (smallish) window where we re-process
> >> >> > some of the input records on a container restart. This can happen since
> >> >> > the changelog can be (a little) ahead of the last checkpoint for a given
> >> >> > input stream. However, I would argue the changelog is still deterministic
> >> >> > in this case. Since currently Samza only guarantees at-least-once
> >> >> > semantics, this seems to be OK.
> >> >>
> >> >>
> >> >>
> >> >> Hmmm...seems like this will violate the correctness definition that Chris
> >> >> outlined where there may be two or more "correct" choices but the system
> >> >> would guarantee that only one will be produced.  But now that you mention
> >> >> it, I don't know if that can ever be guaranteed with an at-least-once
> >> >> system.  If a task can always see duplicates, it may process the first with
> >> >> its local state in state S1 then modify its state to S2 and then process
> >> >> the duplicate.
> >> >>
> >> >>
> >> >>
> >> >> >
> >> >> > 3) Deterministic MessageChooser:
> >> >> > The in-order problem could be avoided, if we restore the state from its
> >> >> > changelog - which was originally populated by a 'bootstrap stream'. The
> >> >> > task can then just pick up from where it left off (making the system
> >> >> > deterministic). Having said that, there might be value in writing an
> >> >> > 'EarliestFirstChooser'.
> >> >> >
> >> >>
> >> >> Agreed.
> >> >>
> >> >> >
> >> >> > Again, this is just my perception (which could be wrong - I'm still
> >> >> > learning).
> >> >> > C
> >> >> >
> >> >> > ________________________________________
> >> >> > From: Roger Hoover [roger.hoover@gmail.com]
> >> >> > Sent: Tuesday, September 02, 2014 8:52 AM
> >> >> > To: dev@samza.incubator.apache.org
> >> >> > Subject: Trying to achieve deterministic behavior on recovery/rewind
> >> >> >
> >> >> > Hi Samza devs,
> >> >> >
> >> >> > I think this project has the best documentation I've ever seen!  Amazing
> >> >> > job.  It's extremely well written and Hello Samza is a really great
> >> >> > example that I was able to run + modify without issue.  It was a joy
> >> >> > reading the docs and playing around with the example.  Kudos!
> >> >> >
> >> >> > After thoroughly reading all the docs, I still have a few questions and
> >> >> > would appreciate any feedback.
> >> >> >
> >> >> > I was thinking about how to support deterministic behavior on recovery or
> >> >> > rewind.  Maybe it can't always be 100% deterministic but I think we can
> >> >> > get close.  Have other people thought about this?  Is it desirable?
> >> >> >
> >> >> > For example, let's say we're joining two streams: orders and user_info.
> >> >> > As orders come in, we use the user_id field of the order to look up
> >> >> > additional information about the user and enrich the stream.  Say we're
> >> >> > keeping all the user_info state in the local KV store.
> >> >> >
> >> >> > t1: User updates her email to "foo1@bar.com"
> >> >> > t2: User buys a pair of jeans (product_id == 99)
> >> >> > t3: User updates her email to "foo2@bar.com"
> >> >> >
> >> >> > In the case of normal operation (no delays in the user_info stream), the
> >> >> > enriched record will be:
> >> >> >
> >> >> > {product_id: 99, email: "foo1@bar.com", ...}
> >> >> >
> >> >> > But say that our job fails before it can checkpoint and is configured to
> >> >> > bootstrap from user_info.  When it gets restarted and bootstraps from the
> >> >> > user_info stream, it will end up with the email set to "foo2@bar.com" in
> >> >> > the local KV store.  Then it will reprocess the order event and produce
> >> >> > the "wrong" output:
> >> >> >
> >> >> > {product_id: 99, email: "foo2@bar.com", ...}
> >> >> >
> >> >> > I haven't verified that but the documentation says "a bootstrap stream
> >> >> > waits for the consumer to explicitly confirm that the stream has been
> >> >> > fully consumed."  Shouldn't it wait until it's consumed up to the
> >> >> > checkpoint offset for the bootstrap stream instead (when there is a saved
> >> >> > checkpoint offset)?
> >> >> >
> >> >> > Likewise, for local state replicated in the change log.  During the
> >> >> > checkpoint process, Samza could include its producer offset in the
> >> >> > checkpoint data so that during recovery, the local state will be restored
> >> >> > to a state that corresponds with its offsets for the input streams.
> >> >> > Everything would be coherent rather than having the input streams
> >> >> > restored to checkpoint and local state restored to most recent value.
> >> >> > I'm assuming that change log commits for local state and checkpoint are
> >> >> > not done together in an atomic transaction, so they may not always match.
> >> >> >
> >> >> > The other missing piece is a nearly deterministic MessageChooser.  During
> >> >> > recovery + rewind, all the messages in both streams are already present
> >> >> > in Kafka and we want a way to replay them in the same order as if they
> >> >> > were played in real-time.  The only way to approximate this behavior that
> >> >> > I can see is to use Kafka broker timestamps for each message.  Is it
> >> >> > possible to write an "EarliestFirstChooser" that always chooses the oldest
> >> >> > message available according to the timestamp it was received by the Kafka
> >> >> > broker?
> >> >> >
> >> >> > I don't know if Kafka stores a timestamp with each message but I'm
> >> >> > assuming it does because it supports an API on the simple consumer called
> >> >> > getOffsetsBefore() that would seem to map from timestamps to offsets.
> >> >> >
> >> >> > Finally, a nit pick.  I'm using Samza 0.7.0 but the metrics data has the
> >> >> > version as {"samza-version": "0.0.1"}.  Is this intentional?
> >> >> >
> >> >> > If it makes sense, I can put in some JIRA tickets for this stuff...
> >> >> >
> >> >> > Cheers,
> >> >> >
> >> >> > Roger
> >> >> >
> >> >>
> >>
> >>
>
>
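
For anyone skimming the thread, the recovery scenario from Roger's original
message (quoted above) can be sketched as a toy model. This is not Samza
code: the event tuples, the integer offsets, and the restore_table/enrich
helpers are all made up for illustration. It compares rebuilding the
user_info table from the whole bootstrap stream against rebuilding it only
up to a saved checkpoint offset.

```python
# Toy model of the orders/user_info join; not Samza code.
# Each user_info event is (offset, user_id, email); offsets are illustrative.
user_info = [
    (0, "u1", "foo1@bar.com"),  # t1
    (1, "u1", "foo2@bar.com"),  # t3
]
order = {"user_id": "u1", "product_id": 99}  # t2, arrived between t1 and t3


def restore_table(events, up_to_offset=None):
    """Rebuild the local KV store, optionally stopping at a checkpoint offset."""
    table = {}
    for offset, user_id, email in events:
        if up_to_offset is not None and offset > up_to_offset:
            break
        table[user_id] = email
    return table


def enrich(order, table):
    """Join one order against the restored user_info table."""
    return {"product_id": order["product_id"], "email": table[order["user_id"]]}


# Full bootstrap on restart: the table already reflects t3 when the order is replayed.
full_bootstrap = enrich(order, restore_table(user_info))

# Restore only up to the user_info offset that was current when the order was first seen.
checkpointed = enrich(order, restore_table(user_info, up_to_offset=0))

print(full_bootstrap)
print(checkpointed)
```

The first result carries the t3 email and the second carries the t1 email,
which is the gap between a full bootstrap and a restore that stops at the
checkpointed offset.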


-- 
Dan Di Spaltro
