samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Roger Hoover <roger.hoo...@gmail.com>
Subject Re: Trying to achieve deterministic behavior on recovery/rewind
Date Tue, 09 Sep 2014 22:53:22 GMT
I realized that I'm not clear on the terminology. Is a job a packaged
collection of tasks?  If so, then the config called "job.name" is a little
confusing and perhaps should be task.name?



On Tue, Sep 9, 2014 at 3:43 PM, Roger Hoover <roger.hoover@gmail.com> wrote:

> Hi Chris,
>
> I think the config is not too bad as it.  Hey, it's not XML!!!
>
> Maybe there are ways to make it better.  These are some things that come
> to mind for me but I haven't really though through:
>
> - What about a way to specify a DAG for the job? From the developer's
> point of view, she mostly cares of the data flow.  Maybe there could a
> pluggable naming schema for topics in between jobs so that you don't have
> to explicitly name them???  You'd want a nice way to specify this.  YAML??
>  Using job-name:
>
> wikipedia-feed
>   - wikipedia-parser
>       - wikipedia-stats
>
> Ideally, that would be enough to wire everything together???
>
> - Support a programatic, code-level API for building, validating and
> deploying jobs?  Hopefully, this would make it possible to build
> higher-level frameworks on top that could dynamically generate jobs.  I
> don't know if I'd ever want to do this but if the API is there, you never
> know what will spring up.
> - Support for validation during build and during runtime initialization to
> catch errors early.
> - Can sensible defaults make the config less verbose?
>   - What about on/off switches for things like metrics and checkpointing?
>  If don't specify otherwise, you get the default metrics package and Kafka
> checkpointing.
>
>
> On Mon, Sep 8, 2014 at 3:36 PM, Chris Riccomini <
> criccomini@linkedin.com.invalid> wrote:
>
>> Hey Roger,
>>
>> This summary sounds pretty reasonable to me, on the Samza side, anyway. I
>> can't comment on the others, as I don't have a great understanding of
>> their internals.
>>
>> I agree with you about the knock on Samza's config system. It's something
>> I think we'll need to resolve, but we don't have a concrete proposal on
>> how, yet. Given that you've had a look at these three systems, do you have
>> any feedback on what you'd like to see. The topic-name issue is a
>> relatively specific problem, but maybe you have thoughts on a higher-level
>> design that you'd like to have?
>>
>> Right now, we have:
>>
>> 1. We mix wiring and config in config files. Wiring in configs leads to
>> runtime errors. We have no code-level "builder" for assembling a job, and
>> no DI framework.
>> 2. Everything is a k-v string pair. Nothing is hierarchical.
>> 3. Config is exposed to developers via a single Config object, that's
>> nothing more than a Map<String, String> with some helper methods.
>>
>> Do you have any thoughts on what you liked from the other frameworks, and
>> what you'd like to see Samza do in this area?
>>
>> Cheers,
>> Chris
>>
>> On 9/8/14 2:23 PM, "Roger Hoover" <roger.hoover@gmail.com> wrote:
>>
>> >Hi Chris,
>> >
>> >We've been looking at Samza for a number of use cases ranging from system
>> >monitoring to user activity analysis for a pretty large retail website.
>> > For system monitoring, for example, we want to push system performance
>> >events into Kafka then have Samza jobs enrich these events with
>> additional
>> >context such as information about the deployment topology and/or the user
>> >who triggered it.  Then we want to feed the enriched streams into various
>> >data stores including Elasticsearch and Hadoop for analysis.
>> >
>> >I've been diving into Samza, Spark Streaming, and Storm.  So far, I'm
>> >impressed with the simplicity of Samza.  I feel like it's the easiest to
>> >reason about and can scale up to larger teams where not everyone is an
>> >expert on the framework.  I may be wrong about some of my conclusions so
>> >I'm interested in anyone else wants to share their experience.
>> >
>> >Samza
>> >  - The simplest to reason about from a developer's perspective.  Tasks
>> >are
>> >single threaded and the API is very simple to understand.  Local state
>> >management API simplifies the developer's job a lot and should give
>> >excellent performance.  Scalability is well defined (parallelism is
>> >determined by number of Kafka partitions + number of tasks).  Topologies
>> >are explicit.  You create tasks for each step and wire them together.
>> >  - The simplest to operate because each task is independent from the
>> >others.  Kafka protects against backpressure and provides fine grain
>> fault
>> >tolerance.  Seems unlikely that a poorly written job/task will hurt more
>> >than itself.  Other than Kafka, there are minimal network dependencies.
>> > There are relatively few moving parts.
>> >  - The framework is pretty lightweight, relying Kafka and YARN to do
>> most
>> >of the work.  Fantastic documentation!
>> >  - The primary downside that I see is that there is a lot of wiring
>> >involved and may be easy to mess it up.  For example, topic names need to
>> >specified both in the code and the config and need to match across
>> >tasks/jobs.  You probably won't know that you've screwed up until you
>> find
>> >a task is not receiving any messages.  You also have to change both the
>> >code and the config if you change the name of a topic.  In larger
>> >organizations, these activities might be done by people on different
>> >teams.
>> >
>> >Storm
>> >- Trident will do some auto-wiring of topologies but it's hard to reason
>> >able the generated topologies, especially transactional behavior.
>> >- Managing non-local state can be tricky to get right.
>> >- There are lots of moving parts that have to be tuned (LMAX disruptor
>> >queues, 0mq queues)
>> >- Ships closures around the cluster.  Makes it hard to work with things
>> >like JRuby.
>> >- Framework provides things like cluster management/scheduling that can
>> be
>> >done elsewhere (YARN or Mesos)
>> >
>> >Spark Streaming
>> >- Quite a few things are implicit.  You write jobs as if they were going
>> >to
>> >execute in a single process.  These get broken into stages and scheduled
>> >on
>> >the cluster.  You need a pretty good understanding of this process to
>> >write, debug, and tune these jobs.  I think it's fairly easy to
>> >accidentally include a reference to a large Java object in worker code
>> and
>> >have it shipped across the network unintentionally.
>> >- All the code/objects that run on workers must be serializable.  I'm
>> >guessing this can be quite annoying.  Samza ships the jar to the worker
>> so
>> >I think it should be able to run anything (JRuby for example).
>> >- The driver process is a single point of failure, the only place were
>> >some
>> >job state is saved.
>> >- Seems operationally risky.  As far as I can tell, a slow consumer tie
>> up
>> >the entire cluster.  This is because there is no backpressure that stops
>> a
>> >driver from producing more and more DStreams, even if consumers cannot
>> >keep
>> >up.  The tuning guide suggests that everytime you make a change to a job,
>> >you need to tune your settings again (
>> >
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#perf
>> >ormance-tuning
>> >).
>> >
>> >Cheers,
>> >
>> >Roger
>> >
>> >
>> >On Fri, Sep 5, 2014 at 4:14 PM, Chris Riccomini <
>> >criccomini@linkedin.com.invalid> wrote:
>> >
>> >> Hey Roger,
>> >>
>> >> > After thinking more about it, I don't think we can get to the
>> >> >deterministic behavior we talked about until Kafka supports idempotent
>> >> >producers.
>> >>
>> >> Yes, I agree. To get fully deterministic behavior, you do need
>> >>idempotent
>> >> producers + deterministic message ordering (or Kafka transactionality)
>> >>at
>> >> the Kafka level. We plan on relying on Kafka transactionality when the
>> >> patch is committed. Without this, it's possible to output two different
>> >> "correct" answers (for my definition of correct), or two of the same
>> >> "correct" answers (for your definition of correct).
>> >>
>> >> > I created a JIRA ticket in case it helps spur action or keep the
>> >> >conversation from getting lost.
>> >>
>> >> Awesome, thanks!
>> >>
>> >> Out of curiosity, what are you using Samza for? You seem to have quite
>> a
>> >> deep understanding of it. :)
>> >>
>> >> Cheers,
>> >> Chris
>> >>
>> >> On 9/4/14 10:06 AM, "Roger Hoover" <roger.hoover@gmail.com> wrote:
>> >>
>> >> >Chris, Chinmay,
>> >> >
>> >> >After thinking more about it, I don't think we can get to the
>> >> >deterministic
>> >> >behavior we talked about until Kafka supports idempotent producers.
>> >>The
>> >> >reason is that duplicate messages mean that we can't rely on strong
>> >> >ordering.  If a duplicate of update U1 can show up anytime, then we
>> can
>> >> >never rely on ordered updates because we might see U1 then U2 then U1
>> >> >again.  I guess we could try to handle this at the application layer
>> >>but
>> >> >not at the Samza layer yet, I think.
>> >> >
>> >> >Nonetheless, some of the changes we discussed may help get closer and
>> >> >still
>> >> >make sense to implement.  I created a JIRA ticket in case it helps
>> spur
>> >> >action or keep the conversation from getting lost.  If you don't find
>> >>the
>> >> >ticket useful at this time, feel free to close it.
>> >> >
>> >> >https://issues.apache.org/jira/browse/SAMZA-405
>> >> >
>> >> >Cheers,
>> >> >
>> >> >Roger
>> >> >
>> >> >
>> >> >On Wed, Sep 3, 2014 at 11:26 AM, Chinmay Soman
>> >> ><csoman@linkedin.com.invalid>
>> >> >wrote:
>> >> >
>> >> >> > bootstrapping is ever necessary on recovery or rewind.  Seems
like
>> >> >>it's
>> >> >> only needed for cold start.
>> >> >>
>> >> >> I think you're right. Either ways, it looks like there should be
>> >> >> additional support for this.
>> >> >> ________________________________________
>> >> >> From: Roger Hoover [roger.hoover@gmail.com]
>> >> >> Sent: Wednesday, September 03, 2014 9:43 AM
>> >> >> To: dev@samza.incubator.apache.org
>> >> >> Subject: Re: Trying to achieve deterministic behavior on
>> >>recovery/rewind
>> >> >>
>> >> >> Chinmay,
>> >> >>
>> >> >> Thank you for the feedback.  Responses inline.
>> >> >>
>> >> >> On Tue, Sep 2, 2014 at 2:09 PM, Chinmay Soman
>> >> >><csoman@linkedin.com.invalid
>> >> >> >
>> >> >> wrote:
>> >> >>
>> >> >> > That's interesting !
>> >> >> >
>> >> >> > 1) Deterministic reading from a bootstrap stream:
>> >> >> > We could define a changelog for the local state (which in
turn is
>> >> >> > populated using a bootstrap stream). If the job fails at this
>> >>point,
>> >> >> > ideally, it should be restored using a changelog stream (instead
>> of
>> >> >> > bootstrapping again) in order for the job to be deterministic
(as
>> >>you
>> >> >> > suggest). Thus there could be a check which either selects
the
>> >> >>bootstrap
>> >> >> > mode or the changelog restore mode (depending on whether a
>> >>changelog
>> >> >> > exists).  I'm not sure if this check exists today (I would
guess
>> >>no).
>> >> >> >
>> >> >>
>> >> >> Yes, I was wondering about this for event-table join.   If you're
>> >>only
>> >> >> storing the join table in the local store then the change log stream
>> >>and
>> >> >> the bootstrap stream are duplicates of each other.  One of them
is
>> >> >> unnecessary unless you add additional state to the local store.
 In
>> >>any
>> >> >> case, I'm wondering if bootstrapping is ever necessary on recovery
>> or
>> >> >> rewind.  Seems like it's only needed for cold start.
>> >> >>
>> >> >>
>> >> >> >
>> >> >> > 2) Deterministic changelog:
>> >> >> > You're right - there could be a (smallish) window where we
>> >>re-process
>> >> >> some
>> >> >> > of the input records on a container restart. This can happen
since
>> >>the
>> >> >> > changelog can be (a little ahead) of the last checkpoint for
a
>> >>given
>> >> >> input
>> >> >> > stream. However, I would argue the changelog is still
>> >>deterministic in
>> >> >> this
>> >> >> > case. Since currently Samza only guarantees at-least-once
>> >>semantics,
>> >> >>this
>> >> >> > seems to be OK
>> >> >>
>> >> >>
>> >> >>
>> >> >> Hmmm...seems like this will violate the correctness definition
that
>> >> >>Chris
>> >> >> outlined where there may be two or more "correct" choices but the
>> >>system
>> >> >> would guarantee that only one will be produced.  But now that you
>> >> >>mention
>> >> >> it, I don't know if that can ever been guaranteed with an
>> >>at-least-once
>> >> >> system.  If a task can always see duplicates, it may process the
>> >>first
>> >> >>with
>> >> >> it's local state in state S1 then modify it's state to S2 and then
>> >> >>process
>> >> >> the duplicate.
>> >> >>
>> >> >>
>> >> >>
>> >> >> >
>> >> >> > 3) Deterministic MessageChooser:
>> >> >> > The in-order problem could be avoided, if we restore the state
>> from
>> >> >>its
>> >> >> > changelog - which was originally populated by a 'bootstrap
>> stream'.
>> >> >>The
>> >> >> > task can then just pick up from where it left off (making
the
>> >>system
>> >> >> > deterministic). Having said that, there might be value in
writing
>> >>an
>> >> >> > 'EarliestFirstChooser'.
>> >> >> >
>> >> >>
>> >> >> Agreed.
>> >> >>
>> >> >> >
>> >> >> > Again, this is just my perception (which could be wrong -
I'm
>> still
>> >> >> > learning).
>> >> >> > C
>> >> >> >
>> >> >> > ________________________________________
>> >> >> > From: Roger Hoover [roger.hoover@gmail.com]
>> >> >> > Sent: Tuesday, September 02, 2014 8:52 AM
>> >> >> > To: dev@samza.incubator.apache.org
>> >> >> > Subject: Trying to achieve deterministic behavior on
>> >>recovery/rewind
>> >> >> >
>> >> >> > Hi Samza devs,
>> >> >> >
>> >> >> > I think this project has the best documentation I've even
seen!
>> >> >>Amazing
>> >> >> > job.  It's extremely well written and Hello Samza is a really
>> great
>> >> >> example
>> >> >> > that I was able to run + modify without issue.  It was a joy
>> >>reading
>> >> >>the
>> >> >> > docs and playing around with example.  Kudos!
>> >> >> >
>> >> >> > After thoroughly reading all the docs, I still have a few
>> questions
>> >> >>and
>> >> >> > would appreciate any feedback.
>> >> >> >
>> >> >> > I was thinking about how to support deterministic behavior
on
>> >> >>recovery or
>> >> >> > rewind.  Maybe it can't always be 100% deterministic but I
think
>> we
>> >> >>can
>> >> >> get
>> >> >> > close.  Have other people thought about this?  Is it desirable?
>> >> >> >
>> >> >> > For example, let's say we're joining two streams: orders and
>> >> >>user_info.
>> >> >> As
>> >> >> > orders come in, we use the user_id field of the order to lookup
>> >> >> additional
>> >> >> > information about the user and enrich the stream.  Say we're
>> >>keeping
>> >> >>all
>> >> >> > the user_info state in the local KV store.
>> >> >> >
>> >> >> > t1: User updates her email to "foo1@bar.com"
>> >> >> > t2: User buys a pair of jeans (product_id == 99)
>> >> >> > t3: User updates her email to "foo2@bar.com"
>> >> >> >
>> >> >> > In the case of normal operation (no delays in the user_info
>> >>stream),
>> >> >>the
>> >> >> > enriched record will be:
>> >> >> >
>> >> >> > {product_id: 99, email: "foo1@bar.com", ...}
>> >> >> >
>> >> >> > But say that our job fails before it can checkpoint and is
>> >>configured
>> >> >>to
>> >> >> > bootstrap from user_info.  When it gets restarted and bootstraps
>> >>from
>> >> >>the
>> >> >> > user_info stream, it will end up with the email set to
>> >>"foo2@bar.com"
>> >> >>in
>> >> >> > the local KV store.  Then it will reprocess the order event
and
>> >> >>produce
>> >> >> the
>> >> >> > "wrong" output:
>> >> >> >
>> >> >> > {product_id: 99, email: "foo2@bar.com", ...}
>> >> >> >
>> >> >> > I haven't verified that but the documentation says "a bootstrap
>> >>stream
>> >> >> > waits for the consumer to explicitly confirm that the stream
has
>> >>been
>> >> >> fully
>> >> >> > consumed."  Shouldn't it wait until it's consumed up the the
>> >> >>checkpoint
>> >> >> > offset for the bootsrap stream instead (when there is saved
>> >>checkpoint
>> >> >> > offset)?
>> >> >> >
>> >> >> > Likewise, for local state replicated in the change log.  During
>> the
>> >> >> > checkpoint process, Samza could include it's producer offset
in
>> the
>> >> >> > checkpoint data so that during recovery, the local state will
be
>> >> >>restored
>> >> >> > to a state that corresponds with it's offsets for the input
>> >>streams.
>> >> >> >  Everything would be coherent rather than having the input
streams
>> >> >> restored
>> >> >> > to checkpoint and local state restored to most recent value.
 I'm
>> >> >> assuming
>> >> >> > that change log commits for local state and checkpoint are
done
>> >> >>together
>> >> >> in
>> >> >> > an atomic transaction so that they may not always match.
>> >> >> >
>> >> >> > The other missing piece is a nearly deterministic MessageChooser.
>> >> >>During
>> >> >> > recovery + rewind, all the messages in both streams are already
>> >> >>present
>> >> >> in
>> >> >> > Kafka and we want a way to replay them in the same order as
if
>> they
>> >> >>were
>> >> >> > played in real-time.  The only way to approximate this behavior
>> >>that I
>> >> >> can
>> >> >> > see is to use Kafka broker timestamps for each message.  Is
it
>> >> >>possible
>> >> >> to
>> >> >> > write an "EarliestFirstChooser" that always chooses the oldest
>> >>message
>> >> >> > available according to the timestamp it was received by the
Kafka
>> >> >>broker?
>> >> >> >
>> >> >> > I don't know if Kafka stores a timestamp with each message
but I'm
>> >> >> assuming
>> >> >> > it does because it supports an API on the simple consumer
called
>> >> >> > getOffsetsBefore() that would seem to map from timestamps
to
>> >>offsets.
>> >> >> >
>> >> >> > Finally, a nit pick.  I'm using Samza 0.7.0 but the metrics
data
>> >>has
>> >> >>the
>> >> >> > version as {"samza-version": "0.0.1"}.  Is this intentional?
>> >> >> >
>> >> >> > If it makes sense, I can put in some JIRA tickets for this
>> stuff...
>> >> >> >
>> >> >> > Cheers,
>> >> >> >
>> >> >> > Roger
>> >> >> >
>> >> >>
>> >>
>> >>
>>
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message