beam-commits mailing list archives

From git-site-r...@apache.org
Subject [beam] branch asf-site updated: Publishing website 2021/01/26 06:03:13 at commit dde38b6
Date Tue, 26 Jan 2021 06:03:44 GMT
This is an automated email from the ASF dual-hosted git repository.

git-site-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new ab8dcc0  Publishing website 2021/01/26 06:03:13 at commit dde38b6
ab8dcc0 is described below

commit ab8dcc0cceaa215064010e5e1a77be3c37ee8c7e
Author: jenkins <builds@apache.org>
AuthorDate: Tue Jan 26 06:03:13 2021 +0000

    Publishing website 2021/01/26 06:03:13 at commit dde38b6
---
 website/generated-content/documentation/index.xml  | 138 ++++++++++++++++++---
 .../documentation/programming-guide/index.html     | 134 +++++++++++++++++---
 website/generated-content/sitemap.xml              |   2 +-
 3 files changed, 239 insertions(+), 35 deletions(-)

diff --git a/website/generated-content/documentation/index.xml b/website/generated-content/documentation/index.xml
index 04e9667..4faffd6 100644
--- a/website/generated-content/documentation/index.xml
+++ b/website/generated-content/documentation/index.xml
@@ -6455,6 +6455,7 @@ read and modified inside the DoFn&amp;rsquo;s &lt;code>@ProcessElement&lt;/code>
 registered, then Beam will automatically infer the coder for the state value. Otherwise, a coder can be explicitly
 specified when creating the ValueState. For example, the following ParDo creates a single state variable that
 accumulates the number of elements seen.&lt;/p>
+&lt;p>Note: &lt;code>ValueState&lt;/code> is called &lt;code>ReadModifyWriteState&lt;/code> in the Python SDK.&lt;/p>
 &lt;div class=language-java>
 &lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-java" data-lang="java">&lt;span class="n">PCollection&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">KV&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">String&lt;/span>&lt;span class="o">,&lt;/span> &lt;span class="n">ValueT&lt;/span>&lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">perUser&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">readPerUser&lt;/ [...]
 &lt;span class="n">perUser&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="na">apply&lt;/span>&lt;span class="o">(&lt;/span>&lt;span class="n">ParDo&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="na">of&lt;/span>&lt;span class="o">(&lt;/span>&lt;span class="k">new&lt;/span> &lt;span class="n">DoFn&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">KV&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">String&lt;/span>&lt;span class="o">,&lt;/span [...]
@@ -6476,6 +6477,16 @@ accumulates the number of elements seen.&lt;/p>
 &lt;span class="o">...&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" data-lang="py">&lt;span class="k">class&lt;/span> &lt;span class="nc">ReadModifyWriteStateDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;span class="n">STATE_SPEC&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">ReadModifyWriteStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;num_elements&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">element&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">state&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">STATE_SPEC&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="c1"># Read the number element seen so far for this user key.&lt;/span>
+&lt;span class="n">current_value&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">read&lt;/span>&lt;span class="p">()&lt;/span> &lt;span class="ow">or&lt;/span> &lt;span class="mi">0&lt;/span>
+&lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">write&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">current_value&lt;/span>&lt;span class="o">+&lt;/span>&lt;span class="mi">1&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Read per user&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">ReadPerUser&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;state pardo&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">ReadModifyWriteStateDoFn&lt;/span>&lt;span class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;h4 id="combiningstate">CombiningState&lt;/h4>
 &lt;p>&lt;code>CombiningState&lt;/code> allows you to create a state object that is updated using a Beam combiner. For example, the previous
 &lt;code>ValueState&lt;/code> example could be rewritten to use &lt;code>CombiningState&lt;/code>&lt;/p>
@@ -6489,10 +6500,10 @@ accumulates the number of elements seen.&lt;/p>
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> &lt;span class="nc">CombiningStateDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" data-lang="py">&lt;span class="k">class&lt;/span> &lt;span class="nc">CombiningStateDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
 &lt;span class="n">SUM_TOTAL&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">CombiningValueStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;total&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="nb">sum&lt;/span>&lt;span class="p">)&lt;/span>
-&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">element&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">state&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">SoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">SUM_TOTAL&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">element&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">state&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">SUM_TOTAL&lt;/span>&lt;span class="p">)):&lt;/span>
 &lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">add&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="mi">1&lt;/span>&lt;span class="p">)&lt;/span>
 &lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Read per user&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">ReadPerUser&lt;/span>&lt;span class="p">()&lt;/span>
 &lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Combine state pardo&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">CombiningStateDofn&lt;/span>&lt;span class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
@@ -6520,8 +6531,8 @@ bags larger than available memory.&lt;/p>
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> &lt;span class="nc">BagStateDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" data-lang="py">&lt;span class="k">class&lt;/span> &lt;span class="nc">BagStateDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
 &lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
 &lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">element_pair&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">state&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">ALL_ELEMENTS&lt;/span>&lt;span class="p" [...]
 &lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">add&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">element_pair&lt;/span>&lt;span class="p">[&lt;/span>&lt;span class="mi">1&lt;/span>&lt;span class="p">])&lt;/span>
@@ -6553,6 +6564,9 @@ runner can prefetch all of the states necessary. For example:&lt;/p>
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" data-lang="py">&lt;span class="n">This&lt;/span> &lt;span class="ow">is&lt;/span> &lt;span class="ow">not&lt;/span> &lt;span class="n">supported&lt;/span> &lt;span class="n">yet&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">see&lt;/span> &lt;span class="n">BEAM&lt;/span>&lt;span class="o">-&lt;/span>&lt;span class="mf">11506.&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;p>If however there are code paths in which the states are not fetched, then annotating with @AlwaysFetched will add
 unnecessary fetching for those paths. In this case, the readLater method allows the runner to know that the state will
 be read in the future, allowing multiple state reads to be batched together.&lt;/p>
@@ -6606,8 +6620,8 @@ allows for event-time aggregations.&lt;/p>
 &lt;/span>&lt;span class="c1">&lt;/span> &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> &lt;span class="nc">EventTimerDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" data-lang="py">&lt;span class="k">class&lt;/span> &lt;span class="nc">EventTimerDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
 &lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
 &lt;span class="n">TIMER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;timer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">WATERMARK&lt;/span>&lt;span class="p">)&lt;/span>
 &lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span class="p">,&lt;/span>
@@ -6644,8 +6658,8 @@ to the current time. In Java, the &lt;code>Timer.offset&lt;/code> and &lt;code>T
 &lt;/span>&lt;span class="c1">&lt;/span> &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> &lt;span class="nc">ProcessingTimerDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" data-lang="py">&lt;span class="k">class&lt;/span> &lt;span class="nc">ProcessingTimerDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
 &lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
 &lt;span class="n">TIMER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;timer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">REAL_TIME&lt;/span>&lt;span class="p">)&lt;/span>
 &lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span class="p">,&lt;/span>
@@ -6663,11 +6677,13 @@ to the current time. In Java, the &lt;code>Timer.offset&lt;/code> and &lt;code>T
 &lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;ProcessingTime timer pardo&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">ProcessingTimerDoFn&lt;/span>&lt;span class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
 &lt;h4 id="dynamic-timer-tags">11.3.3. Dynamic timer tags&lt;/h4>
-&lt;p>Beam also supports dynamically setting a timer tag using &lt;code>TimerMap&lt;/code>. This allows for setting multiple different timers
+&lt;p>Beam also supports dynamically setting a timer tag using &lt;code>TimerMap&lt;/code> in the Java SDK. This allows for setting multiple different timers
 in a &lt;code>DoFn&lt;/code> and allowing for the timer tags to be dynamically chosen - e.g. based on data in the input elements. A
 timer with a specific tag can only be set to a single timestamp, so setting the timer again has the effect of
 overwriting the previous expiration time for the timer with that tag. Each &lt;code>TimerMap&lt;/code> is identified with a timer family
 id, and timers in different timer families are independent.&lt;/p>
+&lt;p>In the Python SDK, a dynamic timer tag can be specified while calling &lt;code>set()&lt;/code> or &lt;code>clear()&lt;/code>. By default, the timer
+tag is an empty string if not specified.&lt;/p>
 &lt;div class=language-java>
 &lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-java" data-lang="java">&lt;span class="n">PCollection&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">KV&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">String&lt;/span>&lt;span class="o">,&lt;/span> &lt;span class="n">ValueT&lt;/span>&lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">perUser&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">readPerUser&lt;/ [...]
 &lt;span class="n">perUser&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="na">apply&lt;/span>&lt;span class="o">(&lt;/span>&lt;span class="n">ParDo&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="na">of&lt;/span>&lt;span class="o">(&lt;/span>&lt;span class="k">new&lt;/span> &lt;span class="n">DoFn&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">KV&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">String&lt;/span>&lt;span class="o">,&lt;/span [...]
@@ -6684,8 +6700,28 @@ id, and timers in different timer families are independent.&lt;/p>
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-python" data-lang="python">&lt;span class="n">To&lt;/span> &lt;span class="n">be&lt;/span> &lt;span class="n">supported&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">See&lt;/span> &lt;span class="n">BEAM&lt;/span>&lt;span class="o">-&lt;/span>&lt;span class="mi">9602&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" data-lang="py">&lt;span class="k">class&lt;/span> &lt;span class="nc">TimerDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="n">TIMER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;timer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">REAL_TIME&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="n">element_pair&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="nb">buffer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">ALL_ELEMENTS&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">timer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">TimerParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">TIMER&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">add&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">element_pair&lt;/span>&lt;span class="p">[&lt;/span>&lt;span class="mi">1&lt;/span>&lt;span class="p">])&lt;/span>
+&lt;span class="c1"># Set a timer to go off 30 seconds in the future with dynamic timer tag &amp;#39;first_timer&amp;#39;.&lt;/span>
+&lt;span class="c1"># And set a timer to go off 60 seconds in the future with dynamic timer tag &amp;#39;second_timer&amp;#39;.&lt;/span>
+&lt;span class="n">timer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">set&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">Timestamp&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">now&lt;/span>&lt;span class="p">()&lt;/span> &lt;span class="o">+&lt;/span> &lt;span class="n">Duration&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">seconds&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="mi">30&lt;/span>&lt;span class="p">),&lt;/span> &lt;sp [...]
+&lt;span class="n">timer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">set&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">Timestamp&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">now&lt;/span>&lt;span class="p">()&lt;/span> &lt;span class="o">+&lt;/span> &lt;span class="n">Duration&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">seconds&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="mi">60&lt;/span>&lt;span class="p">),&lt;/span> &lt;sp [...]
+&lt;span class="c1"># Note that a timer can also be explicitly cleared if previously set with a dynamic timer tag:&lt;/span>
+&lt;span class="c1"># timer.clear(dynamic_timer_tag=...)&lt;/span>
+&lt;span class="nd">@on_timer&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">TIMER&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">expiry_callback&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="nb">buffer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">ALL_ELEMENTS&lt;/span>&lt;span class="p">),&lt;/span> &lt;span class="n">timer_tag&lt;/span>&lt;span [...]
+&lt;span class="c1"># Process timer, the dynamic timer tag associated with expiring timer can be read back with DoFn.DynamicTimerTagParam.&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="k">yield&lt;/span> &lt;span class="p">(&lt;/span>&lt;span class="n">timer_tag&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="s1">&amp;#39;fired&amp;#39;&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Read per user&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">ReadPerUser&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;ProcessingTime timer pardo&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">TimerDoFn&lt;/span>&lt;span class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
 &lt;h4 id="timer-output-timestamps">11.3.4. Timer output timestamps&lt;/h4>
 &lt;p>By default, event-time timers will hold the output watermark of the &lt;code>ParDo&lt;/code> to the timestamp of the timer. This means
@@ -6778,6 +6814,9 @@ past the timestamp of the minimum element. The following code demonstrates this.
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" data-lang="py">&lt;span class="n">Timer&lt;/span> &lt;span class="n">output&lt;/span> &lt;span class="n">timestamps&lt;/span> &lt;span class="ow">is&lt;/span> &lt;span class="ow">not&lt;/span> &lt;span class="n">yet&lt;/span> &lt;span class="n">supported&lt;/span> &lt;span class="ow">in&lt;/span> &lt;span class="n">Python&lt;/span> &lt;span class="n">SDK&lt;/span>&lt;span class="o">.&lt;/span> &lt;span class="n [...]
+&lt;/div>
 &lt;h3 id="garbage-collecting-state">11.4. Garbage collecting state&lt;/h3>
 &lt;p>Per-key state needs to be garbage collected, or eventually the increasing size of state may negatively impact
 performance. There are two common strategies for garbage collecting state.&lt;/p>
@@ -6802,8 +6841,8 @@ garbage-collection strategy.&lt;/p>
 &lt;/span>&lt;span class="c1">&lt;/span> &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> &lt;span class="nc">StateDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" data-lang="py">&lt;span class="k">class&lt;/span> &lt;span class="nc">StateDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
 &lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
 &lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span class="p">,&lt;/span>
 &lt;span class="n">element_pair&lt;/span>&lt;span class="p">,&lt;/span>
@@ -6851,8 +6890,8 @@ This can be done by updating a timer that garbage collects state. For example&lt
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> &lt;span class="nc">UserDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" data-lang="py">&lt;span class="k">class&lt;/span> &lt;span class="nc">UserDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
 &lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;state&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
 &lt;span class="n">MAX_TIMESTAMP&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">CombiningValueStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;max_timestamp_seen&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="nb">max&lt;/span>&lt;span class="p">)&lt;/span>
 &lt;span class="n">TIMER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;gc-timer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">WATERMARK&lt;/span>&lt;span class="p">)&lt;/span>
@@ -6896,7 +6935,7 @@ event before the view event. The one hour join timeout should be based on event
 &lt;/span>&lt;span class="c1">&lt;/span>&lt;span class="n">PCollection&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">KV&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">String&lt;/span>&lt;span class="o">,&lt;/span> &lt;span class="n">Event&lt;/span>&lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">eventsPerLinkId&lt;/span> &lt;span class="o">=&lt;/span>
 &lt;span class="n">readEvents&lt;/span>&lt;span class="o">()&lt;/span>
 &lt;span class="o">.&lt;/span>&lt;span class="na">apply&lt;/span>&lt;span class="o">(&lt;/span>&lt;span class="n">WithKeys&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="na">of&lt;/span>&lt;span class="o">(&lt;/span>&lt;span class="n">Event&lt;/span>&lt;span class="o">::&lt;/span>&lt;span class="n">getLinkId&lt;/span>&lt;span class="o">).&lt;/span>&lt;span class="na">withKeyType&lt;/span>&lt;span class="o">(&lt;/span>&lt;span class="n">TypeDescriptors&lt;/span>&lt;span class="o"> [...]
-&lt;span class="n">perUser&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="na">apply&lt;/span>&lt;span class="o">(&lt;/span>&lt;span class="n">ParDo&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="na">of&lt;/span>&lt;span class="o">(&lt;/span>&lt;span class="k">new&lt;/span> &lt;span class="n">DoFn&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">KV&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">String&lt;/span>&lt;span class="o">,&lt;/span [...]
+&lt;span class="n">eventsPerLinkId&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="na">apply&lt;/span>&lt;span class="o">(&lt;/span>&lt;span class="n">ParDo&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="na">of&lt;/span>&lt;span class="o">(&lt;/span>&lt;span class="k">new&lt;/span> &lt;span class="n">DoFn&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">KV&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">String&lt;/span>&lt;span class="o">,& [...]
 &lt;span class="c1">// Store the view event.
 &lt;/span>&lt;span class="c1">&lt;/span> &lt;span class="nd">@StateId&lt;/span>&lt;span class="o">(&lt;/span>&lt;span class="s">&amp;#34;view&amp;#34;&lt;/span>&lt;span class="o">)&lt;/span> &lt;span class="kd">private&lt;/span> &lt;span class="kd">final&lt;/span> &lt;span class="n">StateSpec&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">ValueState&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span class="n">Event&lt;/span>&lt;span class="o">&amp;gt;&amp;gt;&lt;/sp [...]
 &lt;span class="c1">// Store the click event.
@@ -6951,6 +6990,50 @@ event before the view event. The one hour join timeout should be based on event
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" data-lang="py">&lt;span class="k">class&lt;/span> &lt;span class="nc">JoinDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;span class="c1"># stores the view event.&lt;/span>
+&lt;span class="n">VIEW_STATE_SPEC&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">ReadModifyWriteStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;view&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">EventCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="c1"># stores the click event.&lt;/span>
+&lt;span class="n">CLICK_STATE_SPEC&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">ReadModifyWriteStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;click&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">EventCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="c1"># The maximum element timestamp value seen so far.&lt;/span>
+&lt;span class="n">MAX_TIMESTAMP&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">CombiningValueStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;max_timestamp_seen&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="nb">max&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="c1"># Timer that fires when an hour goes by with an incomplete join.&lt;/span>
+&lt;span class="n">GC_TIMER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;gc&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">WATERMARK&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="n">element&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="n">view&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">VIEW_STATE_SPEC&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">click&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">CLICK_STATE_SPEC&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">max_timestamp_seen&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">MAX_TIMESTAMP&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">ts&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">TimestampParam&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="n">gc&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">TimerParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">GC_TIMER&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="n">event&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">element&lt;/span>
+&lt;span class="k">if&lt;/span> &lt;span class="n">event&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">type&lt;/span> &lt;span class="o">==&lt;/span> &lt;span class="s1">&amp;#39;view&amp;#39;&lt;/span>&lt;span class="p">:&lt;/span>
+&lt;span class="n">view&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">write&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">event&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">else&lt;/span>&lt;span class="p">:&lt;/span>
+&lt;span class="n">click&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">write&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">event&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="n">previous_view&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">view&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">read&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">previous_click&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">click&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">read&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="c1"># We&amp;#39;ve seen both a view and a click. Output a joined event and clear state.&lt;/span>
+&lt;span class="k">if&lt;/span> &lt;span class="n">previous_view&lt;/span> &lt;span class="ow">and&lt;/span> &lt;span class="n">previous_click&lt;/span>&lt;span class="p">:&lt;/span>
+&lt;span class="k">yield&lt;/span> &lt;span class="p">(&lt;/span>&lt;span class="n">previous_view&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">previous_click&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="n">view&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">click&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">max_timestamp_seen&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="k">else&lt;/span>&lt;span class="p">:&lt;/span>
+&lt;span class="n">max_timestamp_seen&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">add&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">ts&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="n">gc&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">set&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">max_timestamp_seen&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">read&lt;/span>&lt;span class="p">()&lt;/span> &lt;span class="o">+&lt;/span> &lt;span class="n">Duration&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">seconds&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="mi">3600&lt;/span>&lt;span class="p">))&lt;/span>
+&lt;span class="nd">@on_timer&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">GC_TIMER&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">gc_callback&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="n">view&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">VIEW_STATE_SPEC&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">click&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">CLICK_STATE_SPEC&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">max_timestamp_seen&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">MAX_TIMESTAMP&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="n">view&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">click&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">max_timestamp_seen&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;EventsPerLinkId&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">ReadPerLinkEvents&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Join DoFn&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">JoinDoFn&lt;/span>&lt;span class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;h4 id="batching-rpcs">11.5.2. Batching RPCs&lt;/h4>
 &lt;p>In this example, input elements are being forwarded to an external RPC service. The RPC accepts batch requests -
 multiple events for the same user can be batched in a single RPC call. Since this RPC service also imposes rate limits,
@@ -6987,6 +7070,27 @@ we want to batch ten seconds worth of events together in order to reduce the num
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" data-lang="py">&lt;span class="k">class&lt;/span> &lt;span class="nc">BufferDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;span class="n">BUFFER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">EventCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="n">IS_TIMER_SET&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">ReadModifyWriteStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;is_timer_set&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">BooleanCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="n">OUTPUT&lt;/span> &lt;span class="o">=&lt;/span> &lt;span class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="s1">&amp;#39;output&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">REAL_TIME&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span class="p">,&lt;/span> &lt;span class="n">element&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">BUFFER&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">is_timer_set&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">IS_TIMER_SET&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">timer&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">TimerParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">OUTPUT&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">add&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">element&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">if&lt;/span> &lt;span class="ow">not&lt;/span> &lt;span class="n">is_timer_set&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">read&lt;/span>&lt;span class="p">():&lt;/span>
+&lt;span class="n">timer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">set&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">Timestamp&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">now&lt;/span>&lt;span class="p">()&lt;/span> &lt;span class="o">+&lt;/span> &lt;span class="n">Duration&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">seconds&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="mi">10&lt;/span>&lt;span class="p">))&lt;/span>
+&lt;span class="n">is_timer_set&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">write&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">True&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="nd">@on_timer&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">OUTPUT&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">output_callback&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">BUFFER&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">is_timer_set&lt;/span>&lt;span class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="n">IS_TIMER_SET&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="n">send_rpc&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="nb">list&lt;/span>&lt;span class="p">(&lt;/span>&lt;span class="nb">buffer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">read&lt;/span>&lt;span class="p">()))&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">is_timer_set&lt;/span>&lt;span class="o">.&lt;/span>&lt;span class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;h2 id="splittable-dofns">12. Splittable &lt;code>DoFns&lt;/code>&lt;/h2>
 &lt;p>A Splittable &lt;code>DoFn&lt;/code> (SDF) enables users to create modular components containing I/Os (and some advanced
 &lt;a href="https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv">non I/O use cases&lt;/a>). Having modular
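Note on the Python BufferDoFn example added in the hunk above: the timer is set only while is_timer_set is unset, so the first element of a batch fixes the flush deadline and later elements do not extend it. The following is a minimal plain-Python sketch of that control flow under stated assumptions. It is not Beam code: BatchingBuffer, process, and fire_due_timers are hypothetical stand-ins for the per-key bag state, the is_timer_set flag, and the processing-time timer, and the clock is passed in explicitly as a number of seconds.

```python
from collections import defaultdict


class BatchingBuffer:
    """Plain-Python stand-in for the per-key state used by BufferDoFn.

    Hypothetical illustration only; none of these names are Beam APIs.
    """

    def __init__(self, delay_seconds=10):
        self.delay_seconds = delay_seconds
        # Plays the role of the 'buffer' bag state, one list per key.
        self.buffers = defaultdict(list)
        # Plays the role of 'is_timer_set' plus the timer itself:
        # a key is present only while a flush is pending.
        self.deadlines = {}

    def process(self, key, element, now):
        """Buffer the element; arm the timer only if none is pending."""
        self.buffers[key].append(element)
        if key not in self.deadlines:
            self.deadlines[key] = now + self.delay_seconds

    def fire_due_timers(self, now):
        """Return {key: batch} for every expired timer and reset its state."""
        due_keys = sorted(k for k, d in self.deadlines.items() if d <= now)
        batches = {}
        for key in due_keys:
            batches[key] = self.buffers.pop(key, [])  # like buffer.clear()
            del self.deadlines[key]                   # like is_timer_set.clear()
        return batches
```

In this sketch each returned batch corresponds to one send_rpc call in the example above. Clearing the deadline after a flush is what allows the next element for that key to start a new ten-second window.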
diff --git a/website/generated-content/documentation/programming-guide/index.html b/website/generated-content/documentation/programming-guide/index.html
index 637506d..b1d044c 100644
--- a/website/generated-content/documentation/programming-guide/index.html
+++ b/website/generated-content/documentation/programming-guide/index.html
@@ -2280,7 +2280,7 @@ to other nodes in the graph. A <code>DoFn</code> can declare multiple state vari
 read and modified inside the DoFn&rsquo;s <code>@ProcessElement</code> or <code>@OnTimer</code> methods. If the type of the ValueState has a coder
 registered, then Beam will automatically infer the coder for the state value. Otherwise, a coder can be explicitly
 specified when creating the ValueState. For example, the following ParDo creates a single state variable that
-accumulates the number of elements seen.</p><div class=language-java><div class=highlight><pre class=chroma><code class=language-java data-lang=java><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;&gt;</span> <span class=n>perUser</span> <span class=o>=</span> <span class=n>readPerUser</span><span class=o>();</span>
+accumulates the number of elements seen.</p><p>Note: <code>ValueState</code> is called <code>ReadModifyWriteState</code> in the Python SDK.</p><div class=language-java><div class=highlight><pre class=chroma><code class=language-java data-lang=java><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;&gt;</span> <span class=n>perUser</span> <sp [...]
 <span class=n>perUser</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;,</span> <span class=n>OutputT</span><span class=o>&gt;()</span> <span class=o>{</span>
   <span class=nd>@StateId</span><span class=o>(</span><span class=s>&#34;state&#34;</span><span class=o>)</span> <span class=kd>private</span> <span class=kd>final</span> <span class=n>StateSpec</span><span class=o>&lt;</span><span class=n>ValueState</span><span class=o>&lt;</span><span class=n>Integer</span><span class=o>&gt;&gt;</span> <span class=n>numElements</span> <span class=o>=</span> <span class=n>StateSpecs</span><span class=o>.</span><span class=na>value</span><span class=o>() [...]
 
@@ -2295,7 +2295,16 @@ accumulates the number of elements seen.</p><div class=language-java><div class=
 <span class=n>perUser</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;,</span> <span class=n>OutputT</span><span class=o>&gt;()</span> <span class=o>{</span>
   <span class=nd>@StateId</span><span class=o>(</span><span class=s>&#34;state&#34;</span><span class=o>)</span> <span class=kd>private</span> <span class=kd>final</span> <span class=n>StateSpec</span><span class=o>&lt;</span><span class=n>ValueState</span><span class=o>&lt;</span><span class=n>MyType</span><span class=o>&gt;&gt;</span> <span class=n>numElements</span> <span class=o>=</span> <span class=n>StateSpecs</span><span class=o>.</span><span class=na>value</span><span class=o>(</ [...]
                  <span class=o>...</span>
-<span class=o>}));</span></code></pre></div></div><h4 id=combiningstate>CombiningState</h4><p><code>CombiningState</code> allows you to create a state object that is updated using a Beam combiner. For example, the previous
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=k>class</span> <span class=nc>ReadModifyWriteStateDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+  <span class=n>STATE_SPEC</span> <span class=o>=</span> <span class=n>ReadModifyWriteStateSpec</span><span class=p>(</span><span class=s1>&#39;num_elements&#39;</span><span class=p>,</span> <span class=n>VarIntCoder</span><span class=p>())</span>
+
+  <span class=k>def</span> <span class=nf>process</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>element</span><span class=p>,</span> <span class=n>state</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>STATE_SPEC</span><span class=p>)):</span>
+    <span class=c1># Read the number of elements seen so far for this user key.</span>
+    <span class=n>current_value</span> <span class=o>=</span> <span class=n>state</span><span class=o>.</span><span class=n>read</span><span class=p>()</span> <span class=ow>or</span> <span class=mi>0</span>
+    <span class=n>state</span><span class=o>.</span><span class=n>write</span><span class=p>(</span><span class=n>current_value</span><span class=o>+</span><span class=mi>1</span><span class=p>)</span>
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per user&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>ReadPerUser</span><span class=p>()</span>
+       <span class=o>|</span> <span class=s1>&#39;state pardo&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span class=n>ReadModifyWriteStateDoFn</span><span class=p>()))</span></code></pre></div></div><h4 id=combiningstate>CombiningState</h4><p><code>CombiningState</code> allows you to create a state object that is updated using a Beam combiner. For example, the previous
 <code>ValueState</code> example could be rewritten to use <code>CombiningState</code></p><div class=language-java><div class=highlight><pre class=chroma><code class=language-java data-lang=java><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;&gt;</span> <span class=n>perUser</span> <span class=o>=</span> <span class=n>readPerUser</span><s [...]
 <span class=n>perUser</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;,</span> <span class=n>OutputT</span><span class=o>&gt;()</span> <span class=o>{</span>
   <span class=nd>@StateId</span><span class=o>(</span><span class=s>&#34;state&#34;</span><span class=o>)</span> <span class=kd>private</span> <span class=kd>final</span> <span class=n>StateSpec</span><span class=o>&lt;</span><span class=n>CombiningState</span><span class=o>&lt;</span><span class=n>Integer</span><span class=o>,</span> <span class=kt>int</span><span class=o>[],</span> <span class=n>Integer</span><span class=o>&gt;&gt;</span> <span class=n>numElements</span> <span class=o> [...]
@@ -2304,10 +2313,10 @@ accumulates the number of elements seen.</p><div class=language-java><div class=
   <span class=nd>@ProcessElement</span> <span class=kd>public</span> <span class=kt>void</span> <span class=nf>process</span><span class=o>(</span><span class=nd>@StateId</span><span class=o>(</span><span class=s>&#34;state&#34;</span><span class=o>)</span> <span class=n>CombiningState</span><span class=o>&lt;</span><span class=n>Integer</span><span class=o>,</span> <span class=kt>int</span><span class=o>[],</span> <span class=n>Integer</span><span class=o>&gt;</span> <span class=n>state</span><span class=o>)</span> <span class=o>{</span>
     <span class=n>state</span><span class=o>.</span><span class=na>add</span><span class=o>(</span><span class=n>1</span><span class=o>);</span>
   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div class=language-python><div class=highlight><pre class=chroma><code class=language-python data-lang=python><span class=k>class</span> <span class=nc>CombiningStateDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=k>class</span> <span class=nc>CombiningStateDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
   <span class=n>SUM_TOTAL</span> <span class=o>=</span> <span class=n>CombiningValueStateSpec</span><span class=p>(</span><span class=s1>&#39;total&#39;</span><span class=p>,</span> <span class=nb>sum</span><span class=p>)</span>
 
-  <span class=k>def</span> <span class=nf>process</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>element</span><span class=p>,</span> <span class=n>state</span><span class=o>=</span><span class=n>SoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>SUM_TOTAL</span><span class=p>)):</span>
+  <span class=k>def</span> <span class=nf>process</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>element</span><span class=p>,</span> <span class=n>state</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>SUM_TOTAL</span><span class=p>)):</span>
     <span class=n>state</span><span class=o>.</span><span class=n>add</span><span class=p>(</span><span class=mi>1</span><span class=p>)</span>
 
 <span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per user&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>ReadPerUser</span><span class=p>()</span>
@@ -2330,7 +2339,7 @@ bags larger than available memory.</p><div class=language-java><div class=highli
       <span class=n>state</span><span class=o>.</span><span class=na>clear</span><span class=o>();</span>  <span class=c1>// Clear the state for this key.
 </span><span class=c1></span>    <span class=o>}</span>
   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div class=language-python><div class=highlight><pre class=chroma><code class=language-python data-lang=python><span class=k>class</span> <span class=nc>BagStateDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=k>class</span> <span class=nc>BagStateDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
   <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span class=n>BagStateSpec</span><span class=p>(</span><span class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span class=n>coders</span><span class=o>.</span><span class=n>VarIntCoder</span><span class=p>())</span>
 
   <span class=k>def</span> <span class=nf>process</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>element_pair</span><span class=p>,</span> <span class=n>state</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>)):</span>
@@ -2358,7 +2367,7 @@ runner can prefetch all of the states necessary. For example:</p><div class=lang
     <span class=n>state2</span><span class=o>.</span><span class=na>read</span><span class=o>();</span>
     <span class=n>state3</span><span class=o>.</span><span class=na>read</span><span class=o>();</span>
   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><p>If however there are code paths in which the states are not fetched, then annotating with @AlwaysFetched will add
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=n>This</span> <span class=ow>is</span> <span class=ow>not</span> <span class=n>supported</span> <span class=n>yet</span><span class=p>,</span> <span class=n>see</span> <span class=n>BEAM</span><span class=o>-</span><span class=mf>11506.</span></code></pre></div></div><p>If however there are code paths in which the states are not  [...]
 unnecessary fetching for those paths. In this case, the readLater method allows the runner to know that the state will
 be read in the future, allowing multiple state reads to be batched together.</p><div class=language-java><div class=highlight><pre class=chroma><code class=language-java data-lang=java><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;&gt;</span> <span class=n>perUser</span> <span class=o>=</span> <span class=n>readPerUser</span><span class [...]
 <span class=n>perUser</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;,</span> <span class=n>OutputT</span><span class=o>&gt;()</span> <span class=o>{</span>
@@ -2404,7 +2413,7 @@ allows for event-time aggregations.</p><div class=language-java><div class=highl
    <span class=nd>@OnTimer</span><span class=o>(</span><span class=s>&#34;timer&#34;</span><span class=o>)</span> <span class=kd>public</span> <span class=kt>void</span> <span class=nf>onTimer</span><span class=o>()</span> <span class=o>{</span>
       <span class=c1>//Process timer.
 </span><span class=c1></span>   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div class=language-python><div class=highlight><pre class=chroma><code class=language-python data-lang=python><span class=k>class</span> <span class=nc>EventTimerDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=k>class</span> <span class=nc>EventTimerDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
   <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span class=n>BagStateSpec</span><span class=p>(</span><span class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span class=n>coders</span><span class=o>.</span><span class=n>VarIntCoder</span><span class=p>())</span>
   <span class=n>TIMER</span> <span class=o>=</span> <span class=n>TimerSpec</span><span class=p>(</span><span class=s1>&#39;timer&#39;</span><span class=p>,</span> <span class=n>TimeDomain</span><span class=o>.</span><span class=n>WATERMARK</span><span class=p>)</span>
 
@@ -2438,7 +2447,7 @@ to the current time. In Java, the <code>Timer.offset</code> and <code>Timer.setR
    <span class=nd>@OnTimer</span><span class=o>(</span><span class=s>&#34;timer&#34;</span><span class=o>)</span> <span class=kd>public</span> <span class=kt>void</span> <span class=nf>onTimer</span><span class=o>()</span> <span class=o>{</span>
       <span class=c1>//Process timer.
 </span><span class=c1></span>   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div class=language-python><div class=highlight><pre class=chroma><code class=language-python data-lang=python><span class=k>class</span> <span class=nc>ProcessingTimerDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=k>class</span> <span class=nc>ProcessingTimerDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
   <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span class=n>BagStateSpec</span><span class=p>(</span><span class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span class=n>coders</span><span class=o>.</span><span class=n>VarIntCoder</span><span class=p>())</span>
   <span class=n>TIMER</span> <span class=o>=</span> <span class=n>TimerSpec</span><span class=p>(</span><span class=s1>&#39;timer&#39;</span><span class=p>,</span> <span class=n>TimeDomain</span><span class=o>.</span><span class=n>REAL_TIME</span><span class=p>)</span>
 
@@ -2456,11 +2465,12 @@ to the current time. In Java, the <code>Timer.offset</code> and <code>Timer.setR
     <span class=n>state</span><span class=o>.</span><span class=n>clear</span><span class=p>()</span>
 
 <span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per user&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>ReadPerUser</span><span class=p>()</span>
-       <span class=o>|</span> <span class=s1>&#39;ProcessingTime timer pardo&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span class=n>ProcessingTimerDoFn</span><span class=p>()))</span></code></pre></div></div><h4 id=dynamic-timer-tags>11.3.3. Dynamic timer tags</h4><p>Beam also supports dynamically setting a timer tag using <code>TimerMap</code>. This allows for setting multiple different timers
+       <span class=o>|</span> <span class=s1>&#39;ProcessingTime timer pardo&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span class=n>ProcessingTimerDoFn</span><span class=p>()))</span></code></pre></div></div><h4 id=dynamic-timer-tags>11.3.3. Dynamic timer tags</h4><p>Beam also supports dynamically setting a timer tag using <code>TimerMap</code> in the Java SDK. This allows for setting multiple [...]
 in a <code>DoFn</code> and allowing for the timer tags to be dynamically chosen - e.g. based on data in the input elements. A
 timer with a specific tag can only be set to a single timestamp, so setting the timer again has the effect of
 overwriting the previous expiration time for the timer with that tag. Each <code>TimerMap</code> is identified with a timer family
-id, and timers in different timer families are independent.</p><div class=language-java><div class=highlight><pre class=chroma><code class=language-java data-lang=java><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;&gt;</span> <span class=n>perUser</span> <span class=o>=</span> <span class=n>readPerUser</span><span class=o>();</span>
+id, and timers in different timer families are independent.</p><p>In the Python SDK, a dynamic timer tag can be specified while calling <code>set()</code> or <code>clear()</code>. By default, the timer
+tag is an empty string if not specified.</p><div class=language-java><div class=highlight><pre class=chroma><code class=language-java data-lang=java><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;&gt;</span> <span class=n>perUser</span> <span class=o>=</span> <span class=n>readPerUser</span><span class=o>();</span>
 <span class=n>perUser</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;,</span> <span class=n>OutputT</span><span class=o>&gt;()</span> <span class=o>{</span>
   <span class=nd>@TimerFamily</span><span class=o>(</span><span class=s>&#34;actionTimers&#34;</span><span class=o>)</span> <span class=kd>private</span> <span class=kd>final</span> <span class=n>TimerSpec</span> <span class=n>timer</span> <span class=o>=</span>
     <span class=n>TimerSpecs</span><span class=o>.</span><span class=na>timerMap</span><span class=o>(</span><span class=n>TimeDomain</span><span class=o>.</span><span class=na>EVENT_TIME</span><span class=o>);</span>
@@ -2475,7 +2485,30 @@ id, and timers in different timer families are independent.</p><div class=langua
    <span class=nd>@OnTimerFamily</span><span class=o>(</span><span class=s>&#34;actionTimers&#34;</span><span class=o>)</span> <span class=kd>public</span> <span class=kt>void</span> <span class=nf>onTimer</span><span class=o>(</span><span class=nd>@TimerId</span> <span class=n>String</span> <span class=n>timerId</span><span class=o>)</span> <span class=o>{</span>
      <span class=n>LOG</span><span class=o>.</span><span class=na>info</span><span class=o>(</span><span class=s>&#34;Timer fired with id &#34;</span> <span class=o>+</span> <span class=n>timerId</span><span class=o>);</span>
    <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div class=language-python><div class=highlight><pre class=chroma><code class=language-python data-lang=python><span class=n>To</span> <span class=n>be</span> <span class=n>supported</span><span class=p>,</span> <span class=n>See</span> <span class=n>BEAM</span><span class=o>-</span><span class=mi>9602</span></code></pre></div></div><h4 id=timer-output-timestamps>11.3.4. Timer output timestamps</h4><p>By default, event-time timers will ho [...]
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=k>class</span> <span class=nc>TimerDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+  <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span class=n>BagStateSpec</span><span class=p>(</span><span class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span class=n>coders</span><span class=o>.</span><span class=n>VarIntCoder</span><span class=p>())</span>
+  <span class=n>TIMER</span> <span class=o>=</span> <span class=n>TimerSpec</span><span class=p>(</span><span class=s1>&#39;timer&#39;</span><span class=p>,</span> <span class=n>TimeDomain</span><span class=o>.</span><span class=n>REAL_TIME</span><span class=p>)</span>
+
+  <span class=k>def</span> <span class=nf>process</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span>
+              <span class=n>element_pair</span><span class=p>,</span>
+              <span class=nb>buffer</span> <span class=o>=</span> <span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>),</span>
+              <span class=n>timer</span> <span class=o>=</span> <span class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span class=p>(</span><span class=n>TIMER</span><span class=p>)):</span>
+    <span class=nb>buffer</span><span class=o>.</span><span class=n>add</span><span class=p>(</span><span class=n>element_pair</span><span class=p>[</span><span class=mi>1</span><span class=p>])</span>
+    <span class=c1># Set a timer to go off 30 seconds in the future with dynamic timer tag &#39;first_timer&#39;.</span>
+    <span class=c1># And set a timer to go off 60 seconds in the future with dynamic timer tag &#39;second_timer&#39;.</span>
+    <span class=n>timer</span><span class=o>.</span><span class=n>set</span><span class=p>(</span><span class=n>Timestamp</span><span class=o>.</span><span class=n>now</span><span class=p>()</span> <span class=o>+</span> <span class=n>Duration</span><span class=p>(</span><span class=n>seconds</span><span class=o>=</span><span class=mi>30</span><span class=p>),</span> <span class=n>dynamic_timer_tag</span><span class=o>=</span><span class=s1>&#39;first_timer&#39;</span><span class=p>)</span>
+    <span class=n>timer</span><span class=o>.</span><span class=n>set</span><span class=p>(</span><span class=n>Timestamp</span><span class=o>.</span><span class=n>now</span><span class=p>()</span> <span class=o>+</span> <span class=n>Duration</span><span class=p>(</span><span class=n>seconds</span><span class=o>=</span><span class=mi>60</span><span class=p>),</span> <span class=n>dynamic_timer_tag</span><span class=o>=</span><span class=s1>&#39;second_timer&#39;</span><span class=p>)</span>
+    <span class=c1># Note that a timer can also be explicitly cleared if previously set with a dynamic timer tag:</span>
+    <span class=c1># timer.clear(dynamic_timer_tag=...)</span>
+
+  <span class=nd>@on_timer</span><span class=p>(</span><span class=n>TIMER</span><span class=p>)</span>
+  <span class=k>def</span> <span class=nf>expiry_callback</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=nb>buffer</span> <span class=o>=</span> <span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>),</span> <span class=n>timer_tag</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>DynamicTimerTagParam</span><span cl [...]
+    <span class=c1># Process timer, the dynamic timer tag associated with expiring timer can be read back with DoFn.DynamicTimerTagParam.</span>
+    <span class=nb>buffer</span><span class=o>.</span><span class=n>clear</span><span class=p>()</span>
+    <span class=k>yield</span> <span class=p>(</span><span class=n>timer_tag</span><span class=p>,</span> <span class=s1>&#39;fired&#39;</span><span class=p>)</span>
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per user&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>ReadPerUser</span><span class=p>()</span>
+       <span class=o>|</span> <span class=s1>&#39;ProcessingTime timer pardo&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span class=n>TimerDoFn</span><span class=p>()))</span></code></pre></div></div><h4 id=timer-output-timestamps>11.3.4. Timer output timestamps</h4><p>By default, event-time timers will hold the output watermark of the <code>ParDo</code> to the timestamp of the timer. This means
 that if a timer is set to 12pm, any windowed aggregations or event-time timers later in the pipeline graph that finish
 after 12pm will not expire. The timestamp of the timer is also the default output timestamp for the timer callback. This
 means that any elements output from the onTimer method will have a timestamp equal to the timestamp of the timer firing.
@@ -2562,7 +2595,7 @@ past the timestamp of the minimum element. The following code demonstrates this.
     <span class=c1>// Note that the timer has now fired.
 </span><span class=c1></span>    <span class=n>timerTimestamp</span><span class=o>.</span><span class=na>clear</span><span class=o>();</span>
   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h3 id=garbage-collecting-state>11.4. Garbage collecting state</h3><p>Per-key state needs to be garbage collected, or eventually the increasing size of state may negatively impact
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=n>Timer</span> <span class=n>output</span> <span class=n>timestamps</span> <span class=ow>is</span> <span class=ow>not</span> <span class=n>yet</span> <span class=n>supported</span> <span class=ow>in</span> <span class=n>Python</span> <span class=n>SDK</span><span class=o>.</span> <span class=n>See</span> <span class=n>BEAM</span [...]
 performance. There are two common strategies for garbage collecting state.</p><h5 id=using-windows-for-garbage-collection>11.4.1. <strong>Using windows for garbage collection</strong></h5><p>All state and timers for a key are scoped to the window they are in. This means that depending on the timestamp of the
 input element the ParDo will see different values for the state depending on the window that element falls into. In
 addition, once the input watermark passes the end of the window, the runner should garbage collect all state for that
@@ -2578,7 +2611,7 @@ garbage-collection strategy.</p><p>For example, given the following:</p><div cla
               <span class=c1>// The state is scoped to a calendar day window. That means that if the input timestamp ts is after
 </span><span class=c1></span>              <span class=c1>// midnight PST, then a new copy of the state will be seen for the next day.
 </span><span class=c1></span>           <span class=o>}</span>
-         <span class=o>}));</span></code></pre></div></div><div class=language-python><div class=highlight><pre class=chroma><code class=language-python data-lang=python><span class=k>class</span> <span class=nc>StateDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+         <span class=o>}));</span></code></pre></div></div><div class=language-py><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=k>class</span> <span class=nc>StateDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
   <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span class=n>BagStateSpec</span><span class=p>(</span><span class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span class=n>coders</span><span class=o>.</span><span class=n>VarIntCoder</span><span class=p>())</span>
 
   <span class=k>def</span> <span class=nf>process</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span>
@@ -2625,7 +2658,7 @@ This can be done by updating a timer that garbage collects state. For example</p
 </span><span class=c1></span>       <span class=n>state</span><span class=o>.</span><span class=na>clear</span><span class=o>();</span>
        <span class=n>maxTimestamp</span><span class=o>.</span><span class=na>clear</span><span class=o>();</span>
     <span class=o>}</span>
- <span class=o>}</span></code></pre></div></div><div class=language-python><div class=highlight><pre class=chroma><code class=language-python data-lang=python><span class=k>class</span> <span class=nc>UserDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+ <span class=o>}</span></code></pre></div></div><div class=language-py><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=k>class</span> <span class=nc>UserDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
   <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span class=n>BagStateSpec</span><span class=p>(</span><span class=s1>&#39;state&#39;</span><span class=p>,</span> <span class=n>coders</span><span class=o>.</span><span class=n>VarIntCoder</span><span class=p>())</span>
   <span class=n>MAX_TIMESTAMP</span> <span class=o>=</span> <span class=n>CombiningValueStateSpec</span><span class=p>(</span><span class=s1>&#39;max_timestamp_seen&#39;</span><span class=p>,</span> <span class=nb>max</span><span class=p>)</span>
   <span class=n>TIMER</span> <span class=o>=</span> <span class=n>TimerSpec</span><span class=p>(</span><span class=s1>&#39;gc-timer&#39;</span><span class=p>,</span> <span class=n>TimeDomain</span><span class=o>.</span><span class=n>WATERMARK</span><span class=p>)</span>
@@ -2667,7 +2700,7 @@ event before the view event. The one hour join timeout should be based on event
     <span class=n>readEvents</span><span class=o>()</span>
     <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>WithKeys</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>Event</span><span class=o>::</span><span class=n>getLinkId</span><span class=o>).</span><span class=na>withKeyType</span><span class=o>(</span><span class=n>TypeDescriptors</span><span class=o>.</span><span class=na>strings</span><span class=o>()));</span>
 
-<span class=n>perUser</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Event</span><span class=o>&gt;,</span> <span class=n>JoinedEvent</span><span class=o>&gt;()</span> <span class=o [...]
+<span class=n>eventsPerLinkId</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Event</span><span class=o>&gt;,</span> <span class=n>JoinedEvent</span><span class=o>&gt;()</span> <span [...]
   <span class=c1>// Store the view event.
 </span><span class=c1></span>  <span class=nd>@StateId</span><span class=o>(</span><span class=s>&#34;view&#34;</span><span class=o>)</span> <span class=kd>private</span> <span class=kd>final</span> <span class=n>StateSpec</span><span class=o>&lt;</span><span class=n>ValueState</span><span class=o>&lt;</span><span class=n>Event</span><span class=o>&gt;&gt;</span> <span class=n>viewState</span> <span class=o>=</span> <span class=n>StateSpecs</span><span class=o>.</span><span class=na>valu [...]
   <span class=c1>// Store the click event.
@@ -2726,7 +2759,54 @@ event before the view event. The one hour join timeout should be based on event
       <span class=n>clickState</span><span class=o>.</span><span class=na>clear</span><span class=o>();</span>
       <span class=n>maxTimestampState</span><span class=o>.</span><span class=na>clear</span><span class=o>();</span>
     <span class=o>}</span>
- <span class=o>}));</span></code></pre></div></div><h4 id=batching-rpcs>11.5.2. Batching RPCs</h4><p>In this example, input elements are being forwarded to an external RPC service. The RPC accepts batch requests -
+ <span class=o>}));</span></code></pre></div></div><div class=language-py><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=k>class</span> <span class=nc>JoinDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+  <span class=c1># stores the view event.</span>
+  <span class=n>VIEW_STATE_SPEC</span> <span class=o>=</span> <span class=n>ReadModifyWriteStateSpec</span><span class=p>(</span><span class=s1>&#39;view&#39;</span><span class=p>,</span> <span class=n>EventCoder</span><span class=p>())</span>
+  <span class=c1># stores the click event.</span>
+  <span class=n>CLICK_STATE_SPEC</span> <span class=o>=</span> <span class=n>ReadModifyWriteStateSpec</span><span class=p>(</span><span class=s1>&#39;click&#39;</span><span class=p>,</span> <span class=n>EventCoder</span><span class=p>())</span>
+  <span class=c1># The maximum element timestamp value seen so far.</span>
+  <span class=n>MAX_TIMESTAMP</span> <span class=o>=</span> <span class=n>CombiningValueStateSpec</span><span class=p>(</span><span class=s1>&#39;max_timestamp_seen&#39;</span><span class=p>,</span> <span class=nb>max</span><span class=p>)</span>
+  <span class=c1># Timer that fires when an hour goes by with an incomplete join.</span>
+  <span class=n>GC_TIMER</span> <span class=o>=</span> <span class=n>TimerSpec</span><span class=p>(</span><span class=s1>&#39;gc&#39;</span><span class=p>,</span> <span class=n>TimeDomain</span><span class=o>.</span><span class=n>WATERMARK</span><span class=p>)</span>
+
+  <span class=k>def</span> <span class=nf>process</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span>
+              <span class=n>element</span><span class=p>,</span>
+              <span class=n>view</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>VIEW_STATE_SPEC</span><span class=p>),</span>
+              <span class=n>click</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>CLICK_STATE_SPEC</span><span class=p>),</span>
+              <span class=n>max_timestamp_seen</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>MAX_TIMESTAMP</span><span class=p>),</span>
+              <span class=n>ts</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>TimestampParam</span><span class=p>,</span>
+              <span class=n>gc</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span class=p>(</span><span class=n>GC_TIMER</span><span class=p>)):</span>
+    <span class=n>event</span> <span class=o>=</span> <span class=n>element</span>
+    <span class=k>if</span> <span class=n>event</span><span class=o>.</span><span class=n>type</span> <span class=o>==</span> <span class=s1>&#39;view&#39;</span><span class=p>:</span>
+      <span class=n>view</span><span class=o>.</span><span class=n>write</span><span class=p>(</span><span class=n>event</span><span class=p>)</span>
+    <span class=k>else</span><span class=p>:</span>
+      <span class=n>click</span><span class=o>.</span><span class=n>write</span><span class=p>(</span><span class=n>event</span><span class=p>)</span>
+
+    <span class=n>previous_view</span> <span class=o>=</span> <span class=n>view</span><span class=o>.</span><span class=n>read</span><span class=p>()</span>
+    <span class=n>previous_click</span> <span class=o>=</span> <span class=n>click</span><span class=o>.</span><span class=n>read</span><span class=p>()</span>
+
+    <span class=c1># We&#39;ve seen both a view and a click. Output a joined event and clear state.</span>
+    <span class=k>if</span> <span class=n>previous_view</span> <span class=ow>and</span> <span class=n>previous_click</span><span class=p>:</span>
+      <span class=k>yield</span> <span class=p>(</span><span class=n>previous_view</span><span class=p>,</span> <span class=n>previous_click</span><span class=p>)</span>
+      <span class=n>view</span><span class=o>.</span><span class=n>clear</span><span class=p>()</span>
+      <span class=n>click</span><span class=o>.</span><span class=n>clear</span><span class=p>()</span>
+      <span class=n>max_timestamp_seen</span><span class=o>.</span><span class=n>clear</span><span class=p>()</span>
+    <span class=k>else</span><span class=p>:</span>
+      <span class=n>max_timestamp_seen</span><span class=o>.</span><span class=n>add</span><span class=p>(</span><span class=n>ts</span><span class=p>)</span>
+      <span class=n>gc</span><span class=o>.</span><span class=n>set</span><span class=p>(</span><span class=n>max_timestamp_seen</span><span class=o>.</span><span class=n>read</span><span class=p>()</span> <span class=o>+</span> <span class=n>Duration</span><span class=p>(</span><span class=n>seconds</span><span class=o>=</span><span class=mi>3600</span><span class=p>))</span>
+
+  <span class=nd>@on_timer</span><span class=p>(</span><span class=n>GC_TIMER</span><span class=p>)</span>
+  <span class=k>def</span> <span class=nf>gc_callback</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span>
+                  <span class=n>view</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>VIEW_STATE_SPEC</span><span class=p>),</span>
+                  <span class=n>click</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>CLICK_STATE_SPEC</span><span class=p>),</span>
+                  <span class=n>max_timestamp_seen</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>MAX_TIMESTAMP</span><span class=p>)):</span>
+    <span class=n>view</span><span class=o>.</span><span class=n>clear</span><span class=p>()</span>
+    <span class=n>click</span><span class=o>.</span><span class=n>clear</span><span class=p>()</span>
+    <span class=n>max_timestamp_seen</span><span class=o>.</span><span class=n>clear</span><span class=p>()</span>
+
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span class=n>p</span> <span class=o>|</span> <span class=s1>&#39;EventsPerLinkId&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>ReadPerLinkEvents</span><span class=p>()</span>
+       <span class=o>|</span> <span class=s1>&#39;Join DoFn&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span class=n>JoinDoFn</span><span class=p>()))</span></code></pre></div></div><h4 id=batching-rpcs>11.5.2. Batching RPCs</h4><p>In this example, input elements are being forwarded to an external RPC service. The RPC accepts batch requests -
 multiple events for the same user can be batched in a single RPC call. Since this RPC service also imposes rate limits,
 we want to batch ten seconds worth of events together in order to reduce the number of calls.</p><div class=language-java><div class=highlight><pre class=chroma><code class=language-java data-lang=java><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;&gt;</span> <span class=n>perUser</span> <span class=o>=</span> <span class=n>readPerUser< [...]
 <span class=n>perUser</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;,</span> <span class=n>OutputT</span><span class=o>&gt;()</span> <span class=o>{</span>
@@ -2759,7 +2839,27 @@ we want to batch ten seconds worth of events together in order to reduce the num
     <span class=n>elementsState</span><span class=o>.</span><span class=na>clear</span><span class=o>();</span>
     <span class=n>isTimerSetState</span><span class=o>.</span><span class=na>clear</span><span class=o>();</span>
   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h2 id=splittable-dofns>12. Splittable <code>DoFns</code></h2><p>A Splittable <code>DoFn</code> (SDF) enables users to create modular components containing I/Os (and some advanced
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=k>class</span> <span class=nc>BufferDoFn</span><span class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+  <span class=n>BUFFER</span> <span class=o>=</span> <span class=n>BagStateSpec</span><span class=p>(</span><span class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span class=n>EventCoder</span><span class=p>())</span>
+  <span class=n>IS_TIMER_SET</span> <span class=o>=</span> <span class=n>ReadModifyWriteStateSpec</span><span class=p>(</span><span class=s1>&#39;is_timer_set&#39;</span><span class=p>,</span> <span class=n>BooleanCoder</span><span class=p>())</span>
+  <span class=n>OUTPUT</span> <span class=o>=</span> <span class=n>TimerSpec</span><span class=p>(</span><span class=s1>&#39;output&#39;</span><span class=p>,</span> <span class=n>TimeDomain</span><span class=o>.</span><span class=n>REAL_TIME</span><span class=p>)</span>
+
+  <span class=k>def</span> <span class=nf>process</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>element</span><span class=p>,</span>
+              <span class=nb>buffer</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>BUFFER</span><span class=p>),</span>
+              <span class=n>is_timer_set</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>IS_TIMER_SET</span><span class=p>),</span>
+              <span class=n>timer</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span class=p>(</span><span class=n>OUTPUT</span><span class=p>)):</span>
+    <span class=nb>buffer</span><span class=o>.</span><span class=n>add</span><span class=p>(</span><span class=n>element</span><span class=p>)</span>
+    <span class=k>if</span> <span class=ow>not</span> <span class=n>is_timer_set</span><span class=o>.</span><span class=n>read</span><span class=p>():</span>
+      <span class=n>timer</span><span class=o>.</span><span class=n>set</span><span class=p>(</span><span class=n>Timestamp</span><span class=o>.</span><span class=n>now</span><span class=p>()</span> <span class=o>+</span> <span class=n>Duration</span><span class=p>(</span><span class=n>seconds</span><span class=o>=</span><span class=mi>10</span><span class=p>))</span>
+      <span class=n>is_timer_set</span><span class=o>.</span><span class=n>write</span><span class=p>(</span><span class=bp>True</span><span class=p>)</span>
+
+  <span class=nd>@on_timer</span><span class=p>(</span><span class=n>OUTPUT</span><span class=p>)</span>
+  <span class=k>def</span> <span class=nf>output_callback</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span>
+                      <span class=nb>buffer</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>BUFFER</span><span class=p>),</span>
+                      <span class=n>is_timer_set</span><span class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span class=n>IS_TIMER_SET</span><span class=p>)):</span>
+    <span class=n>send_rpc</span><span class=p>(</span><span class=nb>list</span><span class=p>(</span><span class=nb>buffer</span><span class=o>.</span><span class=n>read</span><span class=p>()))</span>
+    <span class=nb>buffer</span><span class=o>.</span><span class=n>clear</span><span class=p>()</span>
+    <span class=n>is_timer_set</span><span class=o>.</span><span class=n>clear</span><span class=p>()</span></code></pre></div></div><h2 id=splittable-dofns>12. Splittable <code>DoFns</code></h2><p>A Splittable <code>DoFn</code> (SDF) enables users to create modular components containing I/Os (and some advanced
 <a href="https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv">non I/O use cases</a>). Having modular
 I/O components that can be connected to each other simplifies typical patterns that users want.
 For example, a popular use case is to read filenames from a message queue followed by parsing those
diff --git a/website/generated-content/sitemap.xml b/website/generated-content/sitemap.xml
index cc6607f..4bbf3a9 100644
--- a/website/generated-content/sitemap.xml
+++ b/website/generated-content/sitemap.xml
@@ -1 +1 @@
-<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9" xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/categories/blog/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/blog/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/categories/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/blog/kafka-to-pubsub-example/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url> [...]
\ No newline at end of file
+<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9" xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/categories/blog/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/blog/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/categories/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/blog/kafka-to-pubsub-example/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url> [...]
\ No newline at end of file

