samza-dev mailing list archives

From Jay Kreps <jay.kr...@gmail.com>
Subject Re: state management docs
Date Mon, 09 Sep 2013 19:58:04 GMT
1. Yeah I think maybe the confusion is that we give the examples of
stateful processing without saying why. I tried to make it a little more
clear.
2. Tried to make the transition a little more clear. Samza kind of assumes
at least passing familiarity with Kafka so to some extent this is
unavoidable, but I think the problem is that it isn't clear why we are
talking about Kafka (a stream implementation).
3. Not sure how to clarify the diagram (any suggestions?), but hopefully
improved the text.

Let me know if you feel this helps:

diff --git a/docs/learn/documentation/0.7.0/container/state-management.md b/docs/learn/documentation/0.7.0/container/state-management.md
index c23c7c3..2f9b740 100644
--- a/docs/learn/documentation/0.7.0/container/state-management.md
+++ b/docs/learn/documentation/0.7.0/container/state-management.md
@@ -5,25 +5,25 @@ title: State Management

 One of the more interesting aspects of Samza is the ability for tasks to
store data locally and execute rich queries against it.

-Of course simple filtering or single-row transformations can be done
without any need for collecting state. A simple analogy to SQL may make
make this more obvious. The select- and where-clauses of a SQL query don't
usually require state: these can be executed a row at a time on input data
and maintain state between rows. The rest of SQL, multi-row aggregations
and joins, require more support to execute correctly in a streaming
fashion. Samza doesn't provide a high-level language like SQL but it does
provide lower-level primitives that make streaming aggregation and joins
and other stateful processing easy to implement.
+Of course simple filtering or single-row transformations can be done
without any need for collecting state. A simple analogy to SQL may make
this more obvious. The select- and where-clauses of a SQL query don't
usually require state: these can be executed a row at a time on input data
and need not maintain state between rows. The rest of SQL, multi-row
aggregations and joins, requires more support to execute correctly in a streaming
fashion. Samza doesn't provide a high-level language like SQL but it does
provide lower-level primitives that make streaming aggregation and joins
and other stateful processing easy to implement.

 Let's dive into how this works and why it is useful.

 ### Common use cases for stateful processing

-First, let's look at some simplistic examples of stateful stream
processing that might be seen on a consumer website.
+First, let's look at some simplistic examples of stateful stream
processing that might be seen on a consumer website. Later in this document
we'll go through specific details of using Samza's built-in key-value
storage capabilities to implement each of these applications.

 ##### Windowed aggregation

 Example: Counting the number of page views for each user per hour

-This kind of windowed processing is common for ranking and relevance,
"trending topics", as well as simple real-time reporting and monitoring.
+This kind of windowed processing is common for ranking and relevance,
"trending topics", as well as simple real-time reporting and monitoring.
For small windows one can just maintain the aggregate in memory and
manually commit the task position only at window boundaries. However this
means we have to recover up to a full window on fail-over, which will be
very slow for large windows because of the amount of reprocessing.
For larger windows or for effectively infinite windows it is better to make
the in-process aggregation fault-tolerant rather than try to recompute it.

 ##### Table-table join

 Example: Join a table of user profiles to a table of user\_settings by
user\_id and emit the joined stream

-This example is somewhat simplistic: one might wonder why you would want
to join two tables in a stream processing system. However consider a more
realistic example: real-time data normalization. E-commerce companies need
to handle product imports, web-crawlers need to update their [database of
the web](labs.yahoo.com/files/YahooWebmap.pdf), and social networks need
to normalize and index social data for search. Each of these processing
flows are emensely complex and contain many complex processing stages that
effectively join together and normalize many data sources into a single
clean feed.
+This example is somewhat simplistic: one might wonder why you would want
to join two tables in a stream processing system. However consider a more
realistic example: real-time data normalization. E-commerce companies need
to handle product imports, web-crawlers need to update their [database of
the web](http://labs.yahoo.com/files/YahooWebmap.pdf), and social networks
need to normalize and index social data for search. Each of these
processing flows is immensely complex and contains many complex processing
stages that effectively join together and normalize many data sources into
a single clean feed.

 ##### Table-stream join

@@ -53,7 +53,7 @@ This approach works well enough if the in-memory state
consists of only a few va

 #### Using an external store

-In the absence of built-in support a common pattern for stateful
processing is to push any state that would be accumulated between rows into
an external database or key-value store. You get something that looks like
this:
+In the absence of built-in support a common pattern for stateful
processing is to push any state that would be accumulated between rows into
an external database or key-value store. The database holds aggregates or
the dataset being queried to enrich the incoming stream. You get something
that looks like this:

 ![state-kv-store](/img/0.7.0/learn/documentation/container/stream_job_and_db.png)

@@ -63,7 +63,7 @@ Samza allows this style of processing (nothing will stop
you from querying a rem

 To understand why this is useful let's first understand some of the
drawbacks of making remote queries in a stream processing job:

-1. **Performance**: The first major drawback of making remote queries is
that they are slow and expensive. A Kafka stream can deliver hundreds of
thousands or even millions of messages per second per CPU core because it
transfers large chunks of data at a time. But a remote database query is a
more expensive proposition. Though the database may be partitioned and
scalable this partitioning doesn't match the partitioning of the job into
tasks so batching becomes much less effective. As a result you would expect
to get a few thousand queries per second per core for remote requests. This
means that adding a processing stage that uses an external database will
often reduce the throughput by several orders of magnitude.
+1. **Performance**: The first major drawback of making remote queries is
that they are slow and expensive. For example, a Kafka stream can deliver
hundreds of thousands or even millions of messages per second per CPU core
because it transfers large chunks of data at a time. But a remote database
query is a more expensive proposition. Though the database may be
partitioned and scalable this partitioning doesn't match the partitioning
of the job into tasks so batching becomes much less effective. As a result
you would expect to get a few thousand queries per second per core for
remote requests. This means that adding a processing stage that uses an
external database will often reduce the throughput by several orders of
magnitude.
 1. **Isolation**: If your database or service is also running live
processing, mixing in asynchronous processing can be quite dangerous. A
scalable stream processing system can run with very high parallelism. If
such a job comes down (say for a code push) it queues up data for
processing, when it restarts it will potentially have a large backlog of
data to process. Since the job may actually have very high parallelism this
can result in huge load spikes, many orders of magnitude higher than steady
state load. If this load is mixed with live queries (i.e. the queries used
to build web pages or render mobile ui or anything else that has a user
waiting on the other end) then you may end up causing a denial-of-service
attack on your live service.
 1. **Query Capabilities**: Many scalable databases expose very limited
query interfaces--only supporting simple key-value lookups. Doing the
equivalent of a "full table scan" or rich traversal may not be practical in
this model.
 1. **Correctness**: If your task keeps counts or otherwise modifies state
in a remote store how is this rolled back if the task fails?
@@ -80,6 +80,8 @@ You can think of this as taking the remote table out of
the remote database and

 Note that now the state is physically on the same machine as the tasks,
and each task has access only to its local partition. However the
combination of stateful tasks with the normal partitioning capabilities
Samza offers makes this a very general feature: in general you just
repartition on the key by which you want to split your processing and then
you have full local access to the data within storage in that partition.

+In cases where we were querying the external database on each input
message to join on additional data for our output stream we would now
instead create an input stream coming from the remote database that
captures the changes to the database.
+
 Let's look at how this addresses the problems of the remote store:

 1. This fixes the performance issues of remote queries because the data is
now local, what would otherwise be a remote query may now just be a lookup
against local memory or disk (we ship a [LevelDB](
https://code.google.com/p/leveldb)-based store which is described in detail
below).
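
The paragraph added in the last hunk (replacing per-message remote queries with an input stream of database changes) can be sketched roughly as follows. This is a minimal illustration in plain Python, not Samza's API: the class `LocalJoinTask`, its method names, and the message shapes are all hypothetical, and a plain dict stands in for the local key-value store.

```python
from typing import Dict, Optional


class LocalJoinTask:
    """Hypothetical task that joins page views against locally held profiles."""

    def __init__(self) -> None:
        # Stand-in for the task's local store (assumption: a dict is enough
        # for illustration; a real store would be persistent and partitioned).
        self.profiles: Dict[str, dict] = {}

    def on_profile_change(self, user_id: str, profile: Optional[dict]) -> None:
        # Each message on the change stream updates the local copy.
        # A profile of None is treated here as a delete.
        if profile is None:
            self.profiles.pop(user_id, None)
        else:
            self.profiles[user_id] = profile

    def on_page_view(self, user_id: str, page: str) -> dict:
        # The join is a local lookup rather than a remote query.
        return {
            "user_id": user_id,
            "page": page,
            "profile": self.profiles.get(user_id),
        }
```

Both streams would need to be partitioned by `user_id` so that a page view always lands on the task that holds the matching profile.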

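The small-window case from the windowed aggregation hunk (keep the aggregate in memory and commit the task position only at window boundaries) might look something like the sketch below. Again this is plain Python under stated assumptions rather than Samza code: `HourlyPageViewCounter` is a hypothetical name, offsets are assumed to be contiguous integers, and timestamps are assumed to arrive in non-decreasing order.

```python
from collections import defaultdict
from typing import Dict, Optional, Tuple

WINDOW_SECONDS = 3600  # one-hour windows, as in the page-view example


class HourlyPageViewCounter:
    """Hypothetical in-memory counter of page views per user per hour."""

    def __init__(self) -> None:
        self.window_start: Optional[int] = None
        self.counts: Dict[str, int] = defaultdict(int)
        # Last input offset known to be fully reflected in emitted output.
        self.committed_offset: Optional[int] = None

    def process(
        self, offset: int, timestamp: int, user_id: str
    ) -> Optional[Tuple[int, Dict[str, int]]]:
        window = timestamp - timestamp % WINDOW_SECONDS
        emitted = None
        if self.window_start is None:
            self.window_start = window
        elif window > self.window_start:
            # Window boundary: emit the finished aggregate, then commit the
            # position of the last message that belonged to it.
            emitted = (self.window_start, dict(self.counts))
            self.counts.clear()
            self.window_start = window
            self.committed_offset = offset - 1
        self.counts[user_id] += 1
        return emitted
```

After a failure the task would resume from `committed_offset + 1` and rebuild the open window by reprocessing, which is why this approach only suits small windows.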

On Sun, Sep 8, 2013 at 12:21 PM, Tejas Patil <tejas.patil.cs@gmail.com> wrote:

> I am probably the dumbest person on this list in terms of technical
> know-how, but hey,.. if I can understand the doc, then most people would
> understand easily :)
>
> Some comments:
>
> (1) What does a typical task state consist of ? An explicit example of
> "task state" would be helpful. There are couple of good examples in the doc
> but none of them say "hey, for this use case the task state is .."
>
> (2) "The problems of remote stores" -> "Performance":
> Before this point, there was no reference of Kafka at all in the doc and
> you suddenly start comparing things with Kafka stream. People w/o any Kafka
> background would not get that part.
>
> (3) "Approaches to managing task state" -> "Using an external store"
> The figure gave me an impression that tasks' o/p goes to 2 places: o/p
> stream and external store. However reading further made me realize that we
> just push the task state to the external DB
>  which is different from o/p stream...right ?
>
> Trivial things:
> - "A simple analogy to SQL may make make this more obvious." : Word "make"
> occurs twice
> - The hyperlink for "database of the web" is not working
>
> Thanks,
> Tejas
>
>
> On Thu, Sep 5, 2013 at 5:14 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
>
> > I took a pass at improving the state management documentation (talking to
> > people, I don't think anyone understood what we were saying):
> >
> >
> > http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
> >
> > I would love to get some feedback on this, especially from anyone who
> > doesn't already know Samza. Does this make any sense? Does it tell you
> > what you need to know in the right order?
> >
> > -Jay
> >
>
