beam-commits mailing list archives

From al...@apache.org
Subject [beam] branch master updated: [BEAM-10112] Add state and timer python examples to website
Date Fri, 05 Jun 2020 18:06:10 GMT
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fef8cfe  [BEAM-10112] Add state and timer python examples to website
     new d4ad5d4  Merge pull request #11882 from y1chi/BEAM-10112
fef8cfe is described below

commit fef8cfe01ce5b0053de5e8b8dbe4b78886191c2f
Author: Yichi Zhang <zyichi@google.com>
AuthorDate: Mon Jun 1 13:33:09 2020 -0700

    [BEAM-10112] Add state and timer python examples to website
---
 .../content/en/documentation/programming-guide.md  | 136 ++++++++++++++++++++-
 1 file changed, 132 insertions(+), 4 deletions(-)

diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 82bd053..ae3d955 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -4337,7 +4337,7 @@ written for that key. State is always fully scoped only to the current processin
 
 Windowing can still be used together with stateful processing. All state for a key is scoped to the current window. This
 means that the first time a key is seen for a given window any state reads will return empty, and that a runner can
-garbage collect state when a window is completed. It's also often useful to use Beam's windowed aggegations prior to
+garbage collect state when a window is completed. It's also often useful to use Beam's windowed aggregations prior to
 the stateful operator. For example, using a combiner to preaggregate data, and then storing aggregated data inside of
 state. Merging windows are not currently supported when using state and timers.
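
The scoping rule described in this hunk (state is addressed per key and per window, reads in a window the key has not been seen in return empty, and a window's state can be dropped once the window completes) can be pictured with a small in-memory model. This is a conceptual sketch only, not Beam's state API and not how any runner actually stores state; the class `ToyWindowedState` and the use of plain strings as window labels are hypothetical.

```python
from collections import defaultdict


class ToyWindowedState:
    """Hypothetical in-memory model of key- and window-scoped bag state.

    Not Beam's API or a runner's storage layer: it only illustrates that
    each (key, window) pair gets its own independent state cell.
    """

    def __init__(self):
        # Maps (key, window) -> list of buffered values.
        self._cells = defaultdict(list)

    def add(self, key, window, value):
        # Writes only touch the cell for this key in this window.
        self._cells[(key, window)].append(value)

    def read(self, key, window):
        # A cell that was never written reads as empty, like the first
        # time a key is seen in a given window.
        return list(self._cells.get((key, window), []))

    def expire_window(self, window):
        # Drop the state of every key in this window, as a runner may do
        # once the window is complete.
        for cell in [c for c in self._cells if c[1] == window]:
            del self._cells[cell]


if __name__ == "__main__":
    state = ToyWindowedState()
    state.add("alice", "2020-06-01", 3)
    state.add("alice", "2020-06-01", 4)
    print(state.read("alice", "2020-06-01"))  # [3, 4]
    print(state.read("alice", "2020-06-02"))  # [] (new window, nothing stored)
    state.expire_window("2020-06-01")
    print(state.read("alice", "2020-06-01"))  # [] (window's state dropped)
```
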
 
@@ -4346,9 +4346,17 @@ care must be taken to remember that the elements in input PCollection have no gu
 program logic is resilient to this. Unit tests written using the DirectRunner will shuffle the order of element
 processing, and are recommended to test for correctness.
 
+{{< paragraph class="language-java" >}}
 In Java DoFn declares states to be accessed by creating final `StateSpec` member variables representing each state. Each
 state must be named using the `StateId` annotation; this name is unique to a ParDo in the graph and has no relation
 to other nodes in the graph. A `DoFn` can declare multiple state variables.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+In Python, a `DoFn` declares the states it accesses by creating `StateSpec` class member variables representing each state.
+Each `StateSpec` is initialized with a name; this name is unique to a ParDo in the graph and has no relation
+to other nodes in the graph. A `DoFn` can declare multiple state variables.
+{{< /paragraph >}}
 
 ### 11.1 Types of state {#types-of-state}
 
@@ -4404,6 +4412,17 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
 }));
 {{< /highlight >}}
 
+{{< highlight python >}}
+class CombiningStateDoFn(DoFn):
+  SUM_TOTAL = CombiningValueStateSpec('total', sum)
+  
+  def process(self, element, state=DoFn.StateParam(SUM_TOTAL)):
+    state.add(1)
+    
+_ = (p | 'Read per user' >> ReadPerUser()
+       | 'Combine state pardo' >> beam.ParDo(CombiningStateDoFn()))
+{{< /highlight >}}
+
 #### BagState
 
 A common use case for state is to accumulate multiple elements. `BagState` allows for accumulating an unordered set
@@ -4431,6 +4450,21 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
 }));
 {{< /highlight >}}
 
+{{< highlight python >}}
+class BagStateDoFn(DoFn):
+  ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder())
+  
+  def process(self, element_pair, state=DoFn.StateParam(ALL_ELEMENTS)):
+    state.add(element_pair[1])
+    if should_fetch():
+      all_elements = list(state.read())
+      process_values(all_elements)
+      state.clear()
+    
+_ = (p | 'Read per user' >> ReadPerUser()
+       | 'Bag state pardo' >> beam.ParDo(BagStateDoFn()))
+{{< /highlight >}}
+
 ### 11.2 Deferred state reads {#deferred-state-reads}
 
 When a `DoFn` contains multiple state specifications, reading each one in order can be slow. Calling the `read()` function
@@ -4478,7 +4512,7 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
     }
    
     // The runner can now batch all three states into a single read, reducing latency.
-     processState1(state1.read());
+    processState1(state1.read());
     processState2(state2.read());
     processState3(state3.read());
   }
@@ -4520,6 +4554,28 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
 }));
 {{< /highlight >}}
 
+{{< highlight python >}}
+class EventTimerDoFn(DoFn):
+  ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder())
+  TIMER = TimerSpec('timer', TimeDomain.WATERMARK)
+  
+  def process(self, 
+              element_pair, 
+              t = DoFn.TimestampParam,
+              buffer = DoFn.StateParam(ALL_ELEMENTS), 
+              timer = DoFn.TimerParam(TIMER)):
+    buffer.add(element_pair[1])
+    # Set an event-time timer to the element timestamp.
+    timer.set(t)
+  
+  @on_timer(TIMER)
+  def expiry_callback(self, buffer = DoFn.StateParam(ALL_ELEMENTS)):
+    buffer.clear()
+    
+_ = (p | 'Read per user' >> ReadPerUser()
+       | 'EventTime timer pardo' >> beam.ParDo(EventTimerDoFn()))
+{{< /highlight >}}
+
 #### 11.3.2 Processing-time timers {#processing-time-timers}
 
 Processing-time timers fire when the real wall-clock time passes. This is often used to create larger batches of data
@@ -4527,7 +4583,7 @@ before processing. It can also be used to schedule events that should occur at a
 event-time timers, processing-time timers are per key - each key has a separate copy of the timer.
 
 While processing-time timers can be set to an absolute timestamp, it is very common to set them to an offset relative
-to the current time. The `Timer.offset` and `Timer.setRelative` methods can be used to accomplish this.
+to the current time. In Java, the `Timer.offset` and `Timer.setRelative` methods can be used to accomplish this.
 
 {{< highlight java >}}
 PCollection<KV<String, ValueT>> perUser = readPerUser();
@@ -4546,6 +4602,28 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
 }));
 {{< /highlight >}}
 
+{{< highlight python >}}
+class ProcessingTimerDoFn(DoFn):
+  ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder())
+  TIMER = TimerSpec('timer', TimeDomain.REAL_TIME)
+  
+  def process(self, 
+              element_pair, 
+              buffer = DoFn.StateParam(ALL_ELEMENTS), 
+              timer = DoFn.TimerParam(TIMER)):
+    buffer.add(element_pair[1])
+    # Set a timer to go off 30 seconds in the future.
+    timer.set(Timestamp.now() + Duration(seconds=30))
+  
+  @on_timer(TIMER)
+  def expiry_callback(self, buffer = DoFn.StateParam(ALL_ELEMENTS)):
+    # Process timer.
+    buffer.clear()
+    
+_ = (p | 'Read per user' >> ReadPerUser()
+       | 'ProcessingTime timer pardo' >> beam.ParDo(ProcessingTimerDoFn()))
+{{< /highlight >}}
+
 #### 11.3.3 Dynamic timer tags {#dynamic-timer-tags}
 
 Beam also supports dynamically setting a timer tag using `TimerMap`. This allows for setting multiple different timers
@@ -4573,6 +4651,9 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
 }));
 {{< /highlight >}}
 
+{{< highlight python >}}
+# Not yet supported in Python; see BEAM-9602.
+{{< /highlight >}}
 #### 11.3.4 Timer output timestamps {#timer-output-timestamps}
 
 By default, event-time timers will hold the output watermark of the `ParDo` to the timestamp of the timer. This means
@@ -4684,7 +4765,7 @@ performance. There are two common strategies for garbage collecting state.
 All state and timers for a key is scoped to the window it is in. This means that depending on the timestamp of the
 input element the ParDo will see different values for the state depending on the window that element falls into. In
 addition, once the input watermark passes the end of the window, the runner should garbage collect all state for that
-window. (note: if allowed lateness is set to a positive value for the window, the runner must wait for the watemark to
+window. (note: if allowed lateness is set to a positive value for the window, the runner must wait for the watermark to
 pass the end of the window plus the allowed lateness before garbage collecting state). This can be used as a
 garbage-collection strategy.
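
The window-based strategy above comes down to two pieces of arithmetic: which fixed window a timestamp falls into, and when that window's state becomes collectable. The sketch below assumes integer-second timestamps, half-open `[start, end)` windows, and that collection is allowed once the watermark reaches `end + allowed_lateness`; the function names are hypothetical, and runners may use slightly different boundary conventions.

```python
DAY_SECONDS = 60 * 60 * 24  # same size as FixedWindows(60 * 60 * 24)


def fixed_window(ts, size=DAY_SECONDS, offset=0):
    """Return (start, end) of the half-open fixed window containing ts.

    Hypothetical helper illustrating fixed-window arithmetic; it is not
    Beam's FixedWindows implementation.
    """
    # Python's % is non-negative for a positive size, so this also works
    # for timestamps before the epoch.
    start = ts - (ts - offset) % size
    return start, start + size


def state_collectable(window_end, watermark, allowed_lateness=0):
    """True once the watermark has reached the window end plus allowed lateness.

    Assumption: end-exclusive windows, so reaching `window_end` means no
    on-time elements for the window remain.
    """
    return watermark >= window_end + allowed_lateness


if __name__ == "__main__":
    start, end = fixed_window(100000)
    print(start, end)  # 86400 172800
    print(state_collectable(end, watermark=end))  # True
    print(state_collectable(end, watermark=end, allowed_lateness=3600))  # False
```

With a positive allowed lateness, the same window's state simply lives longer: the check only succeeds once the watermark is `allowed_lateness` seconds past the window end.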
 
@@ -4704,6 +4785,20 @@ perUser.apply(Window.into(CalendarWindows.days(1)
          }));
 {{< /highlight >}}
 
+{{< highlight python >}}
+class StateDoFn(DoFn):
+  ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder())
+  
+  def process(self, 
+              element_pair, 
+              buffer = DoFn.StateParam(ALL_ELEMENTS)):
+    ...
+    
+_ = (p | 'Read per user' >> ReadPerUser()
+       | 'Windowing' >> beam.WindowInto(FixedWindows(60 * 60 * 24))
+       | 'DoFn' >> beam.ParDo(StateDoFn()))
+{{< /highlight >}}
+
 This `ParDo` stores state per day. Once the pipeline is done processing data for a given day, all the state for that
 day is garbage collected.
 
@@ -4751,6 +4846,39 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  }
 {{< /highlight >}}
 
+{{< highlight python >}}
+class UserDoFn(DoFn):
+  ALL_ELEMENTS = BagStateSpec('state', coders.VarIntCoder())
+  MAX_TIMESTAMP = CombiningValueStateSpec('max_timestamp_seen', max)
+  TIMER = TimerSpec('gc-timer', TimeDomain.WATERMARK)
+  
+  def process(self, 
+              element, 
+              t = DoFn.TimestampParam,
+              state = DoFn.StateParam(ALL_ELEMENTS), 
+              max_timestamp = DoFn.StateParam(MAX_TIMESTAMP),
+              timer = DoFn.TimerParam(TIMER)):
+    update_state(state, element)
+    max_timestamp.add(t.micros)
+    
+    # Set the timer to be one hour after the maximum timestamp seen. This will keep overwriting the same timer, so
+    # as long as there is activity on this key the state will stay active. Once the key goes inactive for one hour's
+    # worth of event time (as measured by the watermark), then the gc timer will fire.
+    expiration_time = Timestamp(micros=max_timestamp.read()) + Duration(seconds=60*60)
+    timer.set(expiration_time)
+  
+  @on_timer(TIMER)
+  def expiry_callback(self, 
+                      state = DoFn.StateParam(ALL_ELEMENTS),
+                      max_timestamp = DoFn.StateParam(MAX_TIMESTAMP)):
+    state.clear()
+    max_timestamp.clear()
+  
+    
+_ = (p | 'Read per user' >> ReadPerUser()
+       | 'User DoFn' >> beam.ParDo(UserDoFn()))
+{{< /highlight >}}
+
 ### 11.5 State and timers examples {#state-timers-examples}
 
 Following are some example uses of state and timers
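
The timer-based garbage-collection pattern in the `UserDoFn` example above works because each new element re-sets the same timer to one hour past the largest timestamp seen, so the timer only fires after a quiet hour of event time. The single-key simulation below traces that control flow in plain Python. It is a sketch, not Beam's API, and it assumes integer-second timestamps and that an event-time timer fires once the watermark moves strictly past the timer's timestamp; `simulate_gc_timer` and the event-tuple format are hypothetical.

```python
GC_DELAY = 60 * 60  # one hour, as in Duration(seconds=60*60)


def simulate_gc_timer(events):
    """Trace the max-timestamp GC timer pattern for a single key.

    `events` is an ordered list of ("element", timestamp) and
    ("watermark", value) tuples, in seconds. Returns (fired_at, buffered):
    the watermark values at which the timer fired, and what is left in the
    buffer at the end. Hypothetical helper, not a Beam API.
    """
    buffered = []    # stands in for the BagState
    max_ts = None    # stands in for the max-combining state
    timer_at = None  # the single event-time timer; setting it overwrites it
    fired_at = []
    for kind, value in events:
        if kind == "element":
            buffered.append(value)
            max_ts = value if max_ts is None else max(max_ts, value)
            # Every element pushes the timer to one hour past the max timestamp.
            timer_at = max_ts + GC_DELAY
        elif kind == "watermark" and timer_at is not None and value > timer_at:
            # Assumed firing rule: the watermark has moved past the timer.
            # The expiry callback clears all state for the key.
            buffered.clear()
            max_ts = None
            timer_at = None
            fired_at.append(value)
    return fired_at, buffered


if __name__ == "__main__":
    fired, left = simulate_gc_timer([
        ("element", 0),
        ("element", 1800),    # timer moves to 5400
        ("watermark", 4000),  # not past 5400 yet
        ("element", 3000),    # activity: timer moves to 6600
        ("watermark", 7000),  # past 6600: timer fires, state cleared
    ])
    print(fired, left)  # [7000] []
```

Because the timer tracks the maximum timestamp rather than the most recently arrived one, an out-of-order (older) element in this model does not pull the expiration earlier.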

