samza-dev mailing list archives

From Jay Kreps <jay.kr...@gmail.com>
Subject Re: state management docs
Date Thu, 12 Sep 2013 00:28:38 GMT
Thanks for the feedback, comments inline!


On Mon, Sep 9, 2013 at 6:46 PM, Chris Riccomini <criccomini@linkedin.com> wrote:

> Hey Jay,
>
> Here are my notes.
>
> 1. I don't follow this:
>
> "However this means we have to recover up to a full window on fail-over.
> But this will not be very slow for large windows because of the amount of
> reprocessing. For larger windows or for effectively infinite windows it is
> better to make the in-process aggregation fault-tolerant rather than try
> to recompute it."
>
> Was this supposed to be, "But this will not be very slow for small
> windows..."?
>

Yeah, this had one too many negations. Fixed.


> 2. I'm not sure I understand what the example (user settings - user
> profiles) would be used for in the table-table join section.
>

Tried to improve, take a look and let me know what you think.

> 3. Recommend reversing the order of the phrasing in the table-stream join
> section to, "Example: Join page view data to user region information." For
> some reason my brain grasps it better when it's posed as stream->table
> join in the example.
>

Good point, clarified what is being "joined on".


> 4. "systems is to simply to periodically" .. one to too many? (ho ho :)
>

Fixed.

> 5. "mobile ui" -> "mobile UI"
>

Fixed.

> 6. "general feature: in general" .. Eliminate the second "in general".
>

Fixed.

> 7. I think this statement needs to be expanded upon: "In cases where we
> were querying the external database on each input message to join on
> additional data for our output stream we would now instead create an input
> stream coming from the remote database that captures the changes to the
> database."
>
> This is kind of key for table-table and table-stream joins, and it's not
> something that most people think about, or have. An example would suffice.
>

Agreed, I moved up the "Databases as input streams" section to right after
this example.

> 8. "isolation issues goes away" -> "isolation issue goes away"
>

Fixed.


> 9. Should have a draw-backs section for the Samza approach. Namely lack of
> isolation within the machine (disk usage, iops, etc), and potentially slow
> restart time when recovering large state.
>

Added


> 10. "The store can abide by the same delivery and fault-tolerance
> guarantees that the Samza task itself does." .. Since its change log is
> modeled as a stream.
>

Added that clarification.

> 11. "out-of-the-box and gives" -> "out-of-the-box that gives"
>

fixed


> 12. "let's us" -> "lets us"
>

fixed


> 13. "because is it an aggregation" -> "because it is an aggregation"
>

fixed


> 14. "stores.my-store.changelog=my-stream-name" Was there anything special
> about the change log stream? I recall there being some strange-ness with
> serdes, but I don't remember exactly what it was. Something like, if no
> serde is defined and the msg is a byte array, we don't double serialize?
>

Nothing that is relevant to the user, right?


> 15. Call out that the LevelDB-Java implementation that we're using is
> running with JNI. Just good to be aware, in case people see weird off-heap
> issues, or segfaults in their tasks.
>

Will do.


>
> Cheers,
> Chris
>
> On 9/9/13 12:58 PM, "Jay Kreps" <jay.kreps@gmail.com> wrote:
>
> >1. Yeah I think maybe the confusion is that we give the examples of
> >stateful processing without saying why. I tried to make it a little more
> >clear.
> >2. Tried to make the transition a little more clear. Samza kind of assumes
> >at least passing familiarity with Kafka so to some extent this is
> >unavoidable, but I think the problem is that it isn't clear why we are
> >talking about kafka (a stream implementation).
> >3. Not sure how to clarify the diagram (any suggestions?), but hopefully
> >improved the text.
> >
> >Let me know if you feel this helps:
> >
> >diff --git a/docs/learn/documentation/0.7.0/container/state-management.md b/docs/learn/documentation/0.7.0/container/state-management.md
> >index c23c7c3..2f9b740 100644
> >--- a/docs/learn/documentation/0.7.0/container/state-management.md
> >+++ b/docs/learn/documentation/0.7.0/container/state-management.md
> >@@ -5,25 +5,25 @@ title: State Management
> >
> > One of the more interesting aspects of Samza is the ability for tasks to
> >store data locally and execute rich queries against it.
> >
> >-Of course simple filtering or single-row transformations can be done
> >without any need for collecting state. A simple analogy to SQL may make
> >make this more obvious. The select- and where-clauses of a SQL query don't
> >usually require state: these can be executed a row at a time on input data
> >and maintain state between rows. The rest of SQL, multi-row aggregations
> >and joins, require more support to execute correctly in a streaming
> >fashion. Samza doesn't provide a high-level language like SQL but it does
> >provide lower-level primitives that make streaming aggregation and joins
> >and other stateful processing easy to implement.
> >+Of course simple filtering or single-row transformations can be done
> >without any need for collecting state. A simple analogy to SQL may make
> >this more obvious. The select- and where-clauses of a SQL query don't
> >usually require state: these can be executed a row at a time on input data
> >and maintain state between rows. The rest of SQL, multi-row aggregations
> >and joins, require more support to execute correctly in a streaming
> >fashion. Samza doesn't provide a high-level language like SQL but it does
> >provide lower-level primitives that make streaming aggregation and joins
> >and other stateful processing easy to implement.
> >
> > Let's dive into how this works and why it is useful.
> >
> > ### Common use cases for stateful processing
> >
> >-First, let's look at some simplistic examples of stateful stream
> >processing that might be seen on a consumer website.
> >+First, let's look at some simplistic examples of stateful stream
> >processing that might be seen on a consumer website. Later in this
> >document
> >we'll go through specific details of using Samza's built-in key-value
> >storage capabilities to implement each of these applications.
> >
> > ##### Windowed aggregation
> >
> > Example: Counting the number of page views for each user per hour
> >
> >-This kind of windowed processing is common for ranking and relevance,
> >"trending topics", as well as simple real-time reporting and monitoring.
> >+This kind of windowed processing is common for ranking and relevance,
> >"trending topics", as well as simple real-time reporting and monitoring.
> >For small windows one can just maintain the aggregate in memory and
> >manually commit the task position only at window boundaries. However this
> >means we have to recover up to a full window on fail-over. But this will
> >not be very slow for large windows because of the amount of reprocessing.
> >For larger windows or for effectively infinite windows it is better to
> >make
> >the in-process aggregation fault-tolerant rather than try to recompute it.
> >
> > ##### Table-table join
> >
> > Example: Join a table of user profiles to a table of user\_settings by
> >user\_id and emit the joined stream
> >
> >-This example is somewhat simplistic: one might wonder why you would want
> >to join two tables in a stream processing system. However consider a more
> >realistic example: real-time data normalization. E-commerce companies need
> >to handle product imports, web-crawlers need to update their [database of
> >the web](labs.yahoo.com/files/YahooWebmap.pdf), and social networks need
> >to normalize and index social data for search. Each of these processing
> >flows are emensely complex and contain many complex processing stages that
> >effectively join together and normalize many data sources into a single
> >clean feed.
> >+This example is somewhat simplistic: one might wonder why you would want
> >to join two tables in a stream processing system. However consider a more
> >realistic example: real-time data normalization. E-commerce companies need
> >to handle product imports, web-crawlers need to update their [database of
> >the web](http://labs.yahoo.com/files/YahooWebmap.pdf), and social
> >networks
> >need to normalize and index social data for search. Each of these
> >processing flows are immensely complex and contain many complex processing
> >stages that effectively join together and normalize many data sources into
> >a single clean feed.
> >
> > ##### Table-stream join
> >
> >@@ -53,7 +53,7 @@ This approach works well enough if the in-memory state
> >consists of only a few va
> >
> > #### Using an external store
> >
> >-In the absence of built-in support a common pattern for stateful
> >processing is to push any state that would be accumulated between rows
> >into
> >an external database or key-value store. You get something that looks like
> >this:
> >+In the absence of built-in support a common pattern for stateful
> >processing is to push any state that would be accumulated between rows
> >into
> >an external database or key-value store. The database holds aggregates or
> >the dataset being queried to enrich the incoming stream. You get something
> >that looks like this:
> >
> >
> >![state-kv-store](/img/0.7.0/learn/documentation/container/stream_job_and_
> >db.png)
> >
> >@@ -63,7 +63,7 @@ Samza allows this style of processing (nothing will stop
> >you from querying a rem
> >
> > To understand why this is useful let's first understand some of the
> >drawbacks of making remote queries in a stream processing job:
> >
> >-1. **Performance**: The first major drawback of making remote queries is
> >that they are slow and expensive. A Kafka stream can deliver hundreds of
> >thousands or even millions of messages per second per CPU core because it
> >transfers large chunks of data at a time. But a remote database query is a
> >more expensive proposition. Though the database may be partitioned and
> >scalable this partitioning doesn't match the partitioning of the job into
> >tasks so batching becomes much less effective. As a result you would
> >expect
> >to get a few thousand queries per second per core for remote requests.
> >This
> >means that adding a processing stage that uses an external database will
> >often reduce the throughput by several orders of magnitude.
> >+1. **Performance**: The first major drawback of making remote queries is
> >that they are slow and expensive. For example, a Kafka stream can deliver
> >hundreds of thousands or even millions of messages per second per CPU core
> >because it transfers large chunks of data at a time. But a remote database
> >query is a more expensive proposition. Though the database may be
> >partitioned and scalable this partitioning doesn't match the partitioning
> >of the job into tasks so batching becomes much less effective. As a result
> >you would expect to get a few thousand queries per second per core for
> >remote requests. This means that adding a processing stage that uses an
> >external database will often reduce the throughput by several orders of
> >magnitude.
> > 1. **Isolation**: If your database or service is also running live
> >processing, mixing in asynchronous processing can be quite dangerous. A
> >scalable stream processing system can run with very high parallelism. If
> >such a job comes down (say for a code push) it queues up data for
> >processing, when it restarts it will potentially have a large backlog of
> >data to process. Since the job may actually have very high parallelism
> >this
> >can result in huge load spikes, many orders of magnitude higher than
> >steady
> >state load. If this load is mixed with live queries (i.e. the queries used
> >to build web pages or render mobile ui or anything else that has a user
> >waiting on the other end) then you may end up causing a denial-of-service
> >attack on your live service.
> > 1. **Query Capabilities**: Many scalable databases expose very limited
> >query interfaces--only supporting simple key-value lookups. Doing the
> >equivalent of a "full table scan" or rich traversal may not be practical
> >in
> >this model.
> > 1. **Correctness**: If your task keeps counts or otherwise modifies state
> >in a remote store how is this rolled back if the task fails?
> >@@ -80,6 +80,8 @@ You can think of this as taking the remote table out of
> >the remote database and
> >
> > Note that now the state is physically on the same machine as the tasks,
> >and each task has access only to its local partition. However the
> >combination of stateful tasks with the normal partitioning capabilities
> >Samza offers makes this a very general feature: in general you just
> >repartition on the key by which you want to split your processing and then
> >you have full local access to the data within storage in that partition.
> >
> >+In cases where we were querying the external database on each input
> >message to join on additional data for our output stream we would now
> >instead create an input stream coming from the remote database that
> >captures the changes to the database.
> >+
> > Let's look at how this addresses the problems of the remote store:
> >
> > 1. This fixes the performance issues of remote queries because the data
> >is
> >now local, what would otherwise be a remote query may now just be a lookup
> >against local memory or disk (we ship a [LevelDB](
> >https://code.google.com/p/leveldb)-based store which is described in
> >detail
> >below).
> >
> >
> >
> >
> >
> >On Sun, Sep 8, 2013 at 12:21 PM, Tejas Patil
> ><tejas.patil.cs@gmail.com> wrote:
> >
> >> I am probably the dumbest person on this list in terms of technical
> >> know-how, but hey,.. if I can understand the doc, then most people would
> >> understand easily :)
> >>
> >> Some comments:
> >>
> >> (1) What does a typical task state consist of ? An explicit example of
> >> "task state" would be helpful. There are couple of good examples in the
> >> doc
> >> but none of them say "hey, for this use case the task state is .."
> >>
> >> (2) "The problems of remote stores" -> "Performance":
> >> Before this point, there was no reference of Kafka at all in the doc and
> >> you suddenly start comparing things with Kafka stream. People w/o any
> >> Kafka
> >> background would not get that part.
> >>
> >> (3) "Approaches to managing task state" -> "Using an external store"
> >> The figure gave me an impression that tasks' o/p goes to 2 places: o/p
> >> stream and external store. However reading further made me realize that
> >> we just write the task state to the external DB,
> >> which is different from the o/p stream...right ?
> >>
> >> Trivial things:
> >> - "A simple analogy to SQL may make make this more obvious." : Word
> >> "make"
> >> occurs twice
> >> - The hyperlink for "database of the web" is not working
> >>
> >> Thanks,
> >> Tejas
> >>
> >>
> >> On Thu, Sep 5, 2013 at 5:14 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> >>
> >> > I took a pass at improving the state management documentation
> >> > (talking to
> >> > people, I don't think anyone understood what we were saying):
> >> >
> >> >
> >> > http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
> >> >
> >> > I would love to get some feedback on this, especially from anyone who
> >> > doesn't already know Samza. Does this make any sense? Does it tell you
> >> > what
> >> > you need to know in the right order?
> >> >
> >> > -Jay
> >> >
> >>
>
>
