samza-dev mailing list archives

From Roger Hoover <roger.hoo...@gmail.com>
Subject Re: Trying to achieve deterministic behavior on recovery/rewind
Date Mon, 08 Sep 2014 21:23:56 GMT
Hi Chris,

We've been looking at Samza for a number of use cases ranging from system
monitoring to user activity analysis for a pretty large retail website.
 For system monitoring, for example, we want to push system performance
events into Kafka then have Samza jobs enrich these events with additional
context such as information about the deployment topology and/or the user
who triggered it.  Then we want to feed the enriched streams into various
data stores including Elasticsearch and Hadoop for analysis.
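As a concrete illustration of that enrichment pattern, here is a minimal, self-contained sketch in plain Java (not the Samza API; the user_info/orders names and email values mirror the hypothetical example discussed later in this thread, and stream plumbing, serialization, and fault tolerance are all elided):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of event-table enrichment: a user_info stream keeps a local
// key-value table up to date, and each order event is joined against that
// table as it arrives.
public class EnrichmentJoin {
  // Stand-in for a local KV store keyed by user_id.
  private final Map<String, String> emailByUser = new HashMap<>();

  // user_info stream handler: update the local table.
  void onUserInfo(String userId, String email) {
    emailByUser.put(userId, email);
  }

  // orders stream handler: enrich the order with the latest known email.
  String onOrder(String userId, int productId) {
    String email = emailByUser.get(userId);
    return "{product_id: " + productId + ", email: \"" + email + "\"}";
  }

  public static void main(String[] args) {
    EnrichmentJoin task = new EnrichmentJoin();
    // t1: user updates her email; t2: she buys a pair of jeans.
    task.onUserInfo("u1", "foo1@bar.com");
    System.out.println(task.onOrder("u1", 99)); // email as of t1
    // t3: a later email update does not retroactively change t2's output --
    // unless a rewind replays user_info ahead of orders, which is the
    // determinism problem discussed further down this thread.
    task.onUserInfo("u1", "foo2@bar.com");
  }
}
```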

I've been diving into Samza, Spark Streaming, and Storm.  So far, I'm
impressed with the simplicity of Samza.  I feel like it's the easiest to
reason about and can scale up to larger teams where not everyone is an
expert on the framework.  I may be wrong about some of my conclusions, so
I'm interested to hear if anyone else wants to share their experience.

Samza
  - The simplest to reason about from a developer's perspective.  Tasks are
single-threaded and the API is very simple to understand.  The local state
management API simplifies the developer's job a lot and should give
excellent performance.  Scalability is well defined (parallelism is
determined by the number of Kafka partitions and the number of tasks).
Topologies are explicit: you create tasks for each step and wire them together.
  - The simplest to operate because each task is independent from the
others.  Kafka protects against backpressure and provides fine-grained fault
tolerance.  It seems unlikely that a poorly written job/task will hurt more
than itself.  Other than Kafka, there are minimal network dependencies.
 There are relatively few moving parts.
  - The framework is pretty lightweight, relying on Kafka and YARN to do most
of the work.  Fantastic documentation!
  - The primary downside that I see is that there is a lot of wiring
involved, and it may be easy to mess it up.  For example, topic names need to
be specified both in the code and the config, and need to match across
tasks/jobs.  You probably won't know that you've screwed up until you find
a task is not receiving any messages.  You also have to change both the
code and the config if you change the name of a topic.  In larger
organizations, these activities might be done by people on different teams.
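To make the wiring concern concrete, here is a toy in-memory model (plain Java, not the Samza API; all topic names are made up) of how a mismatch between a topic name in code and one in config fails silently:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// The producing task names its output topic in code, while the consuming
// job names its input topic in config. If the two strings drift apart, no
// error is raised -- the consumer simply sees an empty topic.
public class TopicWiring {
  static final Map<String, Queue<String>> broker = new HashMap<>();

  static void send(String topic, String msg) {
    broker.computeIfAbsent(topic, t -> new ArrayDeque<>()).add(msg);
  }

  static Queue<String> poll(String topic) {
    // An unknown topic looks identical to a known-but-empty one.
    return broker.computeIfAbsent(topic, t -> new ArrayDeque<>());
  }

  public static void main(String[] args) {
    // Upstream task: the topic name lives in code.
    send("enriched-events", "{product_id: 99}");

    // Downstream job: the topic name lives in config, maintained separately
    // (perhaps by another team) -- note the subtle mismatch.
    String configuredInput = "enrichedEvents";

    System.out.println(poll(configuredInput).size());   // prints 0: silently empty
    System.out.println(poll("enriched-events").size()); // prints 1
  }
}
```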

Storm
- Trident will do some auto-wiring of topologies, but it's hard to reason
about the generated topologies, especially their transactional behavior.
- Managing non-local state can be tricky to get right.
- There are lots of moving parts that have to be tuned (LMAX disruptor
queues, 0mq queues).
- Ships closures around the cluster.  Makes it hard to work with things
like JRuby.
- The framework provides things like cluster management/scheduling that can
be done elsewhere (YARN or Mesos).

Spark Streaming
- Quite a few things are implicit.  You write jobs as if they were going to
execute in a single process.  These get broken into stages and scheduled on
the cluster.  You need a pretty good understanding of this process to
write, debug, and tune these jobs.  I think it's fairly easy to
accidentally include a reference to a large Java object in worker code and
have it shipped across the network unintentionally.
- All the code/objects that run on workers must be serializable.  I'm
guessing this can be quite annoying.  Samza ships the jar to the worker so
I think it should be able to run anything (JRuby for example).
- The driver process is a single point of failure; it is the only place where
some job state is saved.
- Seems operationally risky.  As far as I can tell, a slow consumer can tie
up the entire cluster.  This is because there is no backpressure that stops a
driver from producing more and more DStreams, even if consumers cannot keep
up.  The tuning guide suggests that every time you make a change to a job,
you need to tune your settings again (
https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
).
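The accidental-capture risk mentioned above can be demonstrated without Spark at all. This self-contained Java sketch shows how a lambda that references an instance field drags the whole enclosing object into the serialized closure (class and field names are made up for illustration):

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class ClosureCapture {
  static class Driver {                   // not Serializable
    byte[] hugeCache = new byte[1 << 20]; // 1 MB of unrelated state
    String suffix = "!";

    // Referencing the field `suffix` captures `this`, i.e. the whole
    // Driver object, including hugeCache.
    Function<String, String> makeMapper() {
      return (Function<String, String> & Serializable) s -> s + suffix;
    }

    // Copying the field to a local first captures only the String.
    Function<String, String> makeSafeMapper() {
      String localSuffix = suffix;
      return (Function<String, String> & Serializable) s -> s + localSuffix;
    }
  }

  // True if the object survives Java serialization (roughly what a
  // framework must do before shipping a closure to workers).
  static boolean serializes(Object o) {
    try (ObjectOutputStream out =
             new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    Driver d = new Driver();
    System.out.println(serializes(d.makeMapper()));     // prints false
    System.out.println(serializes(d.makeSafeMapper())); // prints true
  }
}
```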

Cheers,

Roger


On Fri, Sep 5, 2014 at 4:14 PM, Chris Riccomini <
criccomini@linkedin.com.invalid> wrote:

> Hey Roger,
>
> > After thinking more about it, I don't think we can get to the
> >deterministic behavior we talked about until Kafka supports idempotent
> >producers.
>
> Yes, I agree. To get fully deterministic behavior, you do need idempotent
> producers + deterministic message ordering (or Kafka transactionality) at
> the Kafka level. We plan on relying on Kafka transactionality when the
> patch is committed. Without this, it's possible to output two different
> "correct" answers (for my definition of correct), or two of the same
> "correct" answers (for your definition of correct).
>
> > I created a JIRA ticket in case it helps spur action or keep the
> >conversation from getting lost.
>
> Awesome, thanks!
>
> Out of curiosity, what are you using Samza for? You seem to have quite a
> deep understanding of it. :)
>
> Cheers,
> Chris
>
> On 9/4/14 10:06 AM, "Roger Hoover" <roger.hoover@gmail.com> wrote:
>
> >Chris, Chinmay,
> >
> >After thinking more about it, I don't think we can get to the
> >deterministic
> >behavior we talked about until Kafka supports idempotent producers.  The
> >reason is that duplicate messages mean that we can't rely on strong
> >ordering.  If a duplicate of update U1 can show up anytime, then we can
> >never rely on ordered updates because we might see U1 then U2 then U1
> >again.  I guess we could try to handle this at the application layer but
> >not at the Samza layer yet, I think.
> >
> >Nonetheless, some of the changes we discussed may help get closer and
> >still
> >make sense to implement.  I created a JIRA ticket in case it helps spur
> >action or keep the conversation from getting lost.  If you don't find the
> >ticket useful at this time, feel free to close it.
> >
> >https://issues.apache.org/jira/browse/SAMZA-405
> >
> >Cheers,
> >
> >Roger
> >
> >
> >On Wed, Sep 3, 2014 at 11:26 AM, Chinmay Soman
> ><csoman@linkedin.com.invalid>
> >wrote:
> >
> >> > bootstrapping is ever necessary on recovery or rewind.  Seems like it's
> >> > only needed for cold start.
> >>
> >> I think you're right. Either way, it looks like there should be
> >> additional support for this.
> >> ________________________________________
> >> From: Roger Hoover [roger.hoover@gmail.com]
> >> Sent: Wednesday, September 03, 2014 9:43 AM
> >> To: dev@samza.incubator.apache.org
> >> Subject: Re: Trying to achieve deterministic behavior on recovery/rewind
> >>
> >> Chinmay,
> >>
> >> Thank you for the feedback.  Responses inline.
> >>
> >> On Tue, Sep 2, 2014 at 2:09 PM, Chinmay Soman
> >><csoman@linkedin.com.invalid
> >> >
> >> wrote:
> >>
> >> > That's interesting !
> >> >
> >> > 1) Deterministic reading from a bootstrap stream:
> >> > We could define a changelog for the local state (which in turn is
> >> > populated using a bootstrap stream). If the job fails at this point,
> >> > ideally, it should be restored using a changelog stream (instead of
> >> > bootstrapping again) in order for the job to be deterministic (as you
> >> > suggest). Thus there could be a check which either selects the bootstrap
> >> > mode or the changelog restore mode (depending on whether a changelog
> >> > exists).  I'm not sure if this check exists today (I would guess no).
> >> >
> >>
> >> Yes, I was wondering about this for event-table join.   If you're only
> >> storing the join table in the local store then the change log stream and
> >> the bootstrap stream are duplicates of each other.  One of them is
> >> unnecessary unless you add additional state to the local store.  In any
> >> case, I'm wondering if bootstrapping is ever necessary on recovery or
> >> rewind.  Seems like it's only needed for cold start.
> >>
> >>
> >> >
> >> > 2) Deterministic changelog:
> >> > You're right - there could be a (smallish) window where we re-process some
> >> > of the input records on a container restart. This can happen since the
> >> > changelog can be a little ahead of the last checkpoint for a given input
> >> > stream. However, I would argue the changelog is still deterministic in this
> >> > case. Since currently Samza only guarantees at-least-once semantics, this
> >> > seems to be OK.
> >>
> >>
> >>
> >> Hmmm...seems like this will violate the correctness definition that Chris
> >> outlined, where there may be two or more "correct" choices but the system
> >> would guarantee that only one will be produced.  But now that you mention
> >> it, I don't know if that can ever be guaranteed with an at-least-once
> >> system.  If a task can always see duplicates, it may process the first with
> >> its local state in state S1, then modify its state to S2, and then process
> >> the duplicate.
> >>
> >>
> >>
> >> >
> >> > 3) Deterministic MessageChooser:
> >> > The in-order problem could be avoided if we restore the state from its
> >> > changelog - which was originally populated by a 'bootstrap stream'. The
> >> > task can then just pick up from where it left off (making the system
> >> > deterministic). Having said that, there might be value in writing an
> >> > 'EarliestFirstChooser'.
> >> >
> >>
> >> Agreed.
> >>
> >> >
> >> > Again, this is just my perception (which could be wrong - I'm still
> >> > learning).
> >> > C
> >> >
> >> > ________________________________________
> >> > From: Roger Hoover [roger.hoover@gmail.com]
> >> > Sent: Tuesday, September 02, 2014 8:52 AM
> >> > To: dev@samza.incubator.apache.org
> >> > Subject: Trying to achieve deterministic behavior on recovery/rewind
> >> >
> >> > Hi Samza devs,
> >> >
> >> > I think this project has the best documentation I've ever seen!  Amazing
> >> > job.  It's extremely well written, and Hello Samza is a really great
> >> > example that I was able to run + modify without issue.  It was a joy
> >> > reading the docs and playing around with the example.  Kudos!
> >> >
> >> > After thoroughly reading all the docs, I still have a few questions and
> >> > would appreciate any feedback.
> >> >
> >> > I was thinking about how to support deterministic behavior on recovery or
> >> > rewind.  Maybe it can't always be 100% deterministic, but I think we can
> >> > get close.  Have other people thought about this?  Is it desirable?
> >> >
> >> > For example, let's say we're joining two streams: orders and user_info.
> >> > As orders come in, we use the user_id field of the order to look up
> >> > additional information about the user and enrich the stream.  Say we're
> >> > keeping all the user_info state in the local KV store.
> >> >
> >> > t1: User updates her email to "foo1@bar.com"
> >> > t2: User buys a pair of jeans (product_id == 99)
> >> > t3: User updates her email to "foo2@bar.com"
> >> >
> >> > In the case of normal operation (no delays in the user_info stream), the
> >> > enriched record will be:
> >> >
> >> > {product_id: 99, email: "foo1@bar.com", ...}
> >> >
> >> > But say that our job fails before it can checkpoint and is configured to
> >> > bootstrap from user_info.  When it gets restarted and bootstraps from the
> >> > user_info stream, it will end up with the email set to "foo2@bar.com" in
> >> > the local KV store.  Then it will reprocess the order event and produce
> >> > the "wrong" output:
> >> >
> >> > {product_id: 99, email: "foo2@bar.com", ...}
> >> > I haven't verified that, but the documentation says "a bootstrap stream
> >> > waits for the consumer to explicitly confirm that the stream has been
> >> > fully consumed."  Shouldn't it wait until it has consumed up to the
> >> > checkpoint offset for the bootstrap stream instead (when there is a saved
> >> > checkpoint offset)?
> >> >
> >> > Likewise, for local state replicated in the changelog.  During the
> >> > checkpoint process, Samza could include its producer offset in the
> >> > checkpoint data so that during recovery, the local state will be restored
> >> > to a state that corresponds with its offsets for the input streams.
> >> > Everything would be coherent, rather than having the input streams
> >> > restored to the checkpoint and the local state restored to the most
> >> > recent value.  I'm assuming that changelog commits for local state and
> >> > checkpoints are not done together in an atomic transaction, so they may
> >> > not always match.
> >> >
> >> > The other missing piece is a nearly deterministic MessageChooser.  During
> >> > recovery + rewind, all the messages in both streams are already present in
> >> > Kafka, and we want a way to replay them in the same order as if they were
> >> > played in real time.  The only way to approximate this behavior that I can
> >> > see is to use Kafka broker timestamps for each message.  Is it possible to
> >> > write an "EarliestFirstChooser" that always chooses the oldest message
> >> > available, according to the timestamp at which it was received by the
> >> > Kafka broker?
> >> >
> >> > I don't know if Kafka stores a timestamp with each message, but I'm
> >> > assuming it does because it supports an API on the simple consumer called
> >> > getOffsetsBefore() that would seem to map from timestamps to offsets.
> >> >
> >> > Finally, a nitpick: I'm using Samza 0.7.0, but the metrics data has the
> >> > version as {"samza-version": "0.0.1"}.  Is this intentional?
> >> >
> >> > If it makes sense, I can put in some JIRA tickets for this stuff...
> >> >
> >> > Cheers,
> >> >
> >> > Roger
> >> >
> >>
>
>
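The "EarliestFirstChooser" discussed above can be sketched independently of the Samza API. This self-contained model buffers envelopes and always hands back the one with the oldest broker timestamp; Envelope and its brokerTimestampMs field are stand-ins, since Samza 0.7.0's IncomingMessageEnvelope does not expose a broker timestamp (that missing timestamp is exactly the gap raised in the thread):

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Stand-in for Samza's IncomingMessageEnvelope; the broker timestamp is
// hypothetical -- Kafka would need to expose it for this chooser to work.
class Envelope {
  final String stream;
  final long brokerTimestampMs;
  final Object message;

  Envelope(String stream, long brokerTimestampMs, Object message) {
    this.stream = stream;
    this.brokerTimestampMs = brokerTimestampMs;
    this.message = message;
  }
}

// Loosely mirrors the MessageChooser contract: the container offers
// envelopes via update(), then asks choose() for the next one to process.
class EarliestFirstChooser {
  private final PriorityQueue<Envelope> buffered = new PriorityQueue<>(
      Comparator.comparingLong((Envelope e) -> e.brokerTimestampMs));

  void update(Envelope envelope) {
    buffered.add(envelope);
  }

  // Returns the buffered envelope with the oldest broker timestamp,
  // or null when nothing is buffered.
  Envelope choose() {
    return buffered.poll();
  }
}

public class ChooserDemo {
  public static void main(String[] args) {
    EarliestFirstChooser chooser = new EarliestFirstChooser();
    chooser.update(new Envelope("orders", 200L, "jeans order"));
    chooser.update(new Envelope("user_info", 100L, "email update"));
    // On replay, the older user_info update comes out first, approximating
    // the original real-time interleaving of the two streams.
    System.out.println(chooser.choose().message); // prints email update
    System.out.println(chooser.choose().message); // prints jeans order
  }
}
```

A real implementation would still have to respect per-partition ordering within each stream; this sketch only captures the timestamp-ordering idea.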
