samza-dev mailing list archives

From Chris Riccomini <criccom...@linkedin.com>
Subject Re: state management docs
Date Tue, 10 Sep 2013 01:46:11 GMT
Hey Jay,

Here are my notes.

1. I don't follow this:

"However this means we have to recover up to a full window on fail-over.
But this will not be very slow for large windows because of the amount of
reprocessing. For larger windows or for effectively infinite windows it is
better to make the in-process aggregation fault-tolerant rather than try
to recompute it."

Was this supposed to be, "But this will not be very slow for small
windows..."?

2. I'm not sure I understand what the example (user settings - user
profiles) would be used for in the table-table join section.

3. Recommend reversing the order of the phrasing in the table-stream join
section to, "Example: Join page view data to user region information." For
some reason my brain grasps it better when it's posed as stream->table
join in the example.

4. "systems is to simply to periodically" .. one to too many? (ho ho :)

5. "mobile ui" -> "mobile UI"

6. "general feature: in general" .. Eliminate the second "in general".

7. I think this statement needs to be expanded upon: "In cases where we
were querying the external database on each input message to join on
additional data for our output stream we would now instead create an input
stream coming from the remote database that captures the changes to the
database."

This is kind of key for table-table and table-stream joins, and it's not
something that most people think about, or have. An example would suffice.

8. "isolation issues goes away" -> "isolation issue goes away"

9. Should have a drawbacks section for the Samza approach. Namely lack of
isolation within the machine (disk usage, iops, etc), and potentially slow
restart time when recovering large state.

10. "The store can abide by the same delivery and fault-tolerance
guarantees that the Samza task itself does." .. Since its change log is
modeled as a stream.

11. "out-of-the-box and gives" -> "out-of-the-box that gives"

12. "let's us" -> "lets us"

13. "because is it an aggregation" -> "because it is an aggregation"

14. "stores.my-store.changelog=my-stream-name" Was there anything special
about the change log stream? I recall there being some strangeness with
serdes, but I don't remember exactly what it was. Something like, if no
serde is defined and the msg is a byte array, we don't double serialize?

15. Call out that the LevelDB-Java implementation that we're using is
running with JNI. Just good to be aware, in case people see weird off-heap
issues, or segfaults in their tasks.
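
To make note 7 concrete, here is a rough sketch of the kind of example I have in mind. It is plain Python rather than the Samza API, and every name in it (LocalJoinTask, on_profile_change, on_page_view, the field names) is made up for illustration. The idea is that the task consumes a stream of changes from the user database, keeps the latest region per user in a local table, and joins each page view against that table instead of querying the remote database.

```python
# Hypothetical sketch, not Samza code: all class, method, and field names
# here are assumptions made for illustration.
class LocalJoinTask:
    def __init__(self):
        # Local table built from the database's change stream:
        # user_id -> region
        self.regions = {}

    def on_profile_change(self, user_id, region):
        """Apply one change captured from the remote database."""
        if region is None:
            # Treat a None region as a delete of that user's row.
            self.regions.pop(user_id, None)
        else:
            self.regions[user_id] = region

    def on_page_view(self, user_id, url):
        """Join a page view against the local table (no remote query)."""
        return {
            "user_id": user_id,
            "url": url,
            "region": self.regions.get(user_id),
        }


task = LocalJoinTask()
task.on_profile_change("u1", "EU")         # change stream input
joined = task.on_page_view("u1", "/home")  # page view stream input
```

Both inputs would need to be partitioned by user_id so that a user's changes and page views reach the same task.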

Cheers,
Chris

On 9/9/13 12:58 PM, "Jay Kreps" <jay.kreps@gmail.com> wrote:

>1. Yeah I think maybe the confusion is that we give the examples of
>stateful processing without saying why. I tried to make it a little more
>clear.
>2. Tried to make the transition a little more clear. Samza kind of assumes
>at least passing familiarity with Kafka so to some extent this is
>unavoidable, but I think the problem is that it isn't clear why we are
>talking about kafka (a stream implementation).
>3. Not sure how to clarify the diagram (any suggestions?), but hopefully
>improved the text.
>
>Let me know if you feel this helps:
>
>diff --git a/docs/learn/documentation/0.7.0/container/state-management.md b/docs/learn/documentation/0.7.0/container/state-management.md
>index c23c7c3..2f9b740 100644
>--- a/docs/learn/documentation/0.7.0/container/state-management.md
>+++ b/docs/learn/documentation/0.7.0/container/state-management.md
>@@ -5,25 +5,25 @@ title: State Management
>
> One of the more interesting aspects of Samza is the ability for tasks to
>store data locally and execute rich queries against it.
>
>-Of course simple filtering or single-row transformations can be done
>without any need for collecting state. A simple analogy to SQL may make
>make this more obvious. The select- and where-clauses of a SQL query don't
>usually require state: these can be executed a row at a time on input data
>and maintain state between rows. The rest of SQL, multi-row aggregations
>and joins, require more support to execute correctly in a streaming
>fashion. Samza doesn't provide a high-level language like SQL but it does
>provide lower-level primitives that make streaming aggregation and joins
>and other stateful processing easy to implement.
>+Of course simple filtering or single-row transformations can be done
>without any need for collecting state. A simple analogy to SQL may make
>this more obvious. The select- and where-clauses of a SQL query don't
>usually require state: these can be executed a row at a time on input data
>and maintain state between rows. The rest of SQL, multi-row aggregations
>and joins, require more support to execute correctly in a streaming
>fashion. Samza doesn't provide a high-level language like SQL but it does
>provide lower-level primitives that make streaming aggregation and joins
>and other stateful processing easy to implement.
>
> Let's dive into how this works and why it is useful.
>
> ### Common use cases for stateful processing
>
>-First, let's look at some simplistic examples of stateful stream
>processing that might be seen on a consumer website.
>+First, let's look at some simplistic examples of stateful stream
>processing that might be seen on a consumer website. Later in this
>document
>we'll go through specific details of using Samza's built-in key-value
>storage capabilities to implement each of these applications.
>
> ##### Windowed aggregation
>
> Example: Counting the number of page views for each user per hour
>
>-This kind of windowed processing is common for ranking and relevance,
>"trending topics", as well as simple real-time reporting and monitoring.
>+This kind of windowed processing is common for ranking and relevance,
>"trending topics", as well as simple real-time reporting and monitoring.
>For small windows one can just maintain the aggregate in memory and
>manually commit the task position only at window boundaries. However this
>means we have to recover up to a full window on fail-over. But this will
>not be very slow for large windows because of the amount of reprocessing.
>For larger windows or for effectively infinite windows it is better to
>make
>the in-process aggregation fault-tolerant rather than try to recompute it.
>
> ##### Table-table join
>
> Example: Join a table of user profiles to a table of user\_settings by
>user\_id and emit the joined stream
>
>-This example is somewhat simplistic: one might wonder why you would want
>to join two tables in a stream processing system. However consider a more
>realistic example: real-time data normalization. E-commerce companies need
>to handle product imports, web-crawlers need to update their [database of
>the web](labs.yahoo.com/files/YahooWebmap.pdf), and social networks need
>to normalize and index social data for search. Each of these processing
>flows are emensely complex and contain many complex processing stages that
>effectively join together and normalize many data sources into a single
>clean feed.
>+This example is somewhat simplistic: one might wonder why you would want
>to join two tables in a stream processing system. However consider a more
>realistic example: real-time data normalization. E-commerce companies need
>to handle product imports, web-crawlers need to update their [database of
>the web](http://labs.yahoo.com/files/YahooWebmap.pdf), and social networks
>need to normalize and index social data for search. Each of these
>processing flows are immensely complex and contain many complex processing
>stages that effectively join together and normalize many data sources into
>a single clean feed.
>
> ##### Table-stream join
>
>@@ -53,7 +53,7 @@ This approach works well enough if the in-memory state
>consists of only a few va
>
> #### Using an external store
>
>-In the absence of built-in support a common pattern for stateful
>processing is to push any state that would be accumulated between rows
>into
>an external database or key-value store. You get something that looks like
>this:
>+In the absence of built-in support a common pattern for stateful
>processing is to push any state that would be accumulated between rows
>into
>an external database or key-value store. The database holds aggregates or
>the dataset being queried to enrich the incoming stream. You get something
>that looks like this:
>
> 
>![state-kv-store](/img/0.7.0/learn/documentation/container/stream_job_and_
>db.png)
>
>@@ -63,7 +63,7 @@ Samza allows this style of processing (nothing will stop
>you from querying a rem
>
> To understand why this is useful let's first understand some of the
>drawbacks of making remote queries in a stream processing job:
>
>-1. **Performance**: The first major drawback of making remote queries is
>that they are slow and expensive. A Kafka stream can deliver hundreds of
>thousands or even millions of messages per second per CPU core because it
>transfers large chunks of data at a time. But a remote database query is a
>more expensive proposition. Though the database may be partitioned and
>scalable this partitioning doesn't match the partitioning of the job into
>tasks so batching becomes much less effective. As a result you would
>expect
>to get a few thousand queries per second per core for remote requests.
>This
>means that adding a processing stage that uses an external database will
>often reduce the throughput by several orders of magnitude.
>+1. **Performance**: The first major drawback of making remote queries is
>that they are slow and expensive. For example, a Kafka stream can deliver
>hundreds of thousands or even millions of messages per second per CPU core
>because it transfers large chunks of data at a time. But a remote database
>query is a more expensive proposition. Though the database may be
>partitioned and scalable this partitioning doesn't match the partitioning
>of the job into tasks so batching becomes much less effective. As a result
>you would expect to get a few thousand queries per second per core for
>remote requests. This means that adding a processing stage that uses an
>external database will often reduce the throughput by several orders of
>magnitude.
> 1. **Isolation**: If your database or service is also running live
>processing, mixing in asynchronous processing can be quite dangerous. A
>scalable stream processing system can run with very high parallelism. If
>such a job comes down (say for a code push) it queues up data for
>processing, when it restarts it will potentially have a large backlog of
>data to process. Since the job may actually have very high parallelism
>this
>can result in huge load spikes, many orders of magnitude higher than
>steady
>state load. If this load is mixed with live queries (i.e. the queries used
>to build web pages or render mobile ui or anything else that has a user
>waiting on the other end) then you may end up causing a denial-of-service
>attack on your live service.
> 1. **Query Capabilities**: Many scalable databases expose very limited
>query interfaces--only supporting simple key-value lookups. Doing the
>equivalent of a "full table scan" or rich traversal may not be practical
>in
>this model.
> 1. **Correctness**: If your task keeps counts or otherwise modifies state
>in a remote store how is this rolled back if the task fails?
>@@ -80,6 +80,8 @@ You can think of this as taking the remote table out of
>the remote database and
>
> Note that now the state is physically on the same machine as the tasks,
>and each task has access only to its local partition. However the
>combination of stateful tasks with the normal partitioning capabilities
>Samza offers makes this a very general feature: in general you just
>repartition on the key by which you want to split your processing and then
>you have full local access to the data within storage in that partition.
>
>+In cases where we were querying the external database on each input
>message to join on additional data for our output stream we would now
>instead create an input stream coming from the remote database that
>captures the changes to the database.
>+
> Let's look at how this addresses the problems of the remote store:
>
> 1. This fixes the performance issues of remote queries because the data
>is
>now local, what would otherwise be a remote query may now just be a lookup
>against local memory or disk (we ship a [LevelDB](
>https://code.google.com/p/leveldb)-based store which is described in
>detail
>below).
>
>
>
>
>
>On Sun, Sep 8, 2013 at 12:21 PM, Tejas Patil <tejas.patil.cs@gmail.com> wrote:
>
>> I am probably the dumbest person on this list in terms of technical
>> know-how, but hey,.. if I can understand the doc, then most people would
>> understand easily :)
>>
>> Some comments:
>>
>> (1) What does a typical task state consist of ? An explicit example of
>> "task state" would be helpful. There are couple of good examples in the
>>doc
>> but none of them say "hey, for this use case the task state is .."
>>
>> (2) "The problems of remote stores" -> "Performance":
>> Before this point, there was no reference of Kafka at all in the doc and
>> you suddenly start comparing things with Kafka stream. People w/o any
>>Kafka
>> background would not get that part.
>>
>> (3) "Approaches to managing task state" -> "Using an external store"
>> The figure gave me an impression that tasks' o/p goes to 2 places: o/p
>> stream and external store. However reading further made me realize that
>>we
>> just the task state to the external DB
>>  which is different from o/p stream...right ?
>>
>> Trivial things:
>> - "A simple analogy to SQL may make make this more obvious." : Word
>>"make"
>> occurs twice
>> - The hyperlink for "database of the web" is not working
>>
>> Thanks,
>> Tejas
>>
>>
>> On Thu, Sep 5, 2013 at 5:14 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
>>
>> > I took a pass at improving the state management documentation
>>(talking to
>> > people, I don't think anyone understood what we were saying):
>> >
>> >
>> > http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
>> >
>> > I would love to get some feedback on this, especially from anyone who
>> > doesn't already know Samza. Does this make any sense? Does it tell you
>> what
>> > you need to know in the right order?
>> >
>> > -Jay
>> >
>>

