beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [beam] branch master updated: [BEAM-7389] Created elementwise for consistency with docs
Date Fri, 04 Oct 2019 22:29:23 GMT
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c0b3fda  [BEAM-7389] Created elementwise for consistency with docs
     new 50c5281  Merge pull request #9664 from davidcavazos/normalize-filenames
c0b3fda is described below

commit c0b3fdaf49d4c9f96da4570259ce4d85e4609422
Author: David Cavazos <dcavazos@google.com>
AuthorDate: Thu Oct 3 09:31:19 2019 -0700

    [BEAM-7389] Created elementwise for consistency with docs
---
 .../snippets/transforms/elementwise/__init__.py    |  18 ++
 .../snippets/transforms/elementwise/filter.py      | 182 +++++++++++++++
 .../snippets/transforms/elementwise/filter_test.py |  81 +++++++
 .../snippets/transforms/elementwise/flatmap.py     | 248 +++++++++++++++++++++
 .../transforms/elementwise/flatmap_test.py         |  92 ++++++++
 .../snippets/transforms/elementwise/keys.py        |  42 ++++
 .../snippets/transforms/elementwise/keys_test.py   |  56 +++++
 .../snippets/transforms/elementwise/kvswap.py      |  42 ++++
 .../snippets/transforms/elementwise/kvswap_test.py |  56 +++++
 .../snippets/transforms/elementwise/map.py         | 226 +++++++++++++++++++
 .../snippets/transforms/elementwise/map_test.py    |  90 ++++++++
 .../snippets/transforms/elementwise/pardo.py       | 126 +++++++++++
 .../snippets/transforms/elementwise/pardo_test.py  | 120 ++++++++++
 .../snippets/transforms/elementwise/partition.py   | 136 +++++++++++
 .../transforms/elementwise/partition_test.py       |  84 +++++++
 .../snippets/transforms/elementwise/regex.py       | 236 ++++++++++++++++++++
 .../snippets/transforms/elementwise/regex_test.py  | 173 ++++++++++++++
 .../snippets/transforms/elementwise/tostring.py    |  86 +++++++
 .../transforms/elementwise/tostring_test.py        | 105 +++++++++
 .../snippets/transforms/elementwise/values.py      |  42 ++++
 .../snippets/transforms/elementwise/values_test.py |  56 +++++
 .../transforms/elementwise/withtimestamps.py       | 128 +++++++++++
 .../transforms/elementwise/withtimestamps_test.py  | 103 +++++++++
 23 files changed, 2528 insertions(+)

diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/__init__.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/__init__.py
new file mode 100644
index 0000000..6569e3f
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/filter.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/filter.py
new file mode 100644
index 0000000..44b11b8
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/filter.py
@@ -0,0 +1,182 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def filter_function(test=None):
+  # [START filter_function]
+  import apache_beam as beam
+
+  def is_perennial(plant):
+    return plant['duration'] == 'perennial'
+
+  with beam.Pipeline() as pipeline:
+    perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Filter perennials' >> beam.Filter(is_perennial)
+        | beam.Map(print)
+    )
+    # [END filter_function]
+    if test:
+      test(perennials)
+
+
+def filter_lambda(test=None):
+  # [START filter_lambda]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Filter perennials' >> beam.Filter(
+            lambda plant: plant['duration'] == 'perennial')
+        | beam.Map(print)
+    )
+    # [END filter_lambda]
+    if test:
+      test(perennials)
+
+
+def filter_multiple_arguments(test=None):
+  # [START filter_multiple_arguments]
+  import apache_beam as beam
+
+  def has_duration(plant, duration):
+    return plant['duration'] == duration
+
+  with beam.Pipeline() as pipeline:
+    perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Filter perennials' >> beam.Filter(has_duration, 'perennial')
+        | beam.Map(print)
+    )
+    # [END filter_multiple_arguments]
+    if test:
+      test(perennials)
+
+
+def filter_side_inputs_singleton(test=None):
+  # [START filter_side_inputs_singleton]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    perennial = pipeline | 'Perennial' >> beam.Create(['perennial'])
+
+    perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Filter perennials' >> beam.Filter(
+            lambda plant, duration: plant['duration'] == duration,
+            duration=beam.pvalue.AsSingleton(perennial),
+        )
+        | beam.Map(print)
+    )
+    # [END filter_side_inputs_singleton]
+    if test:
+      test(perennials)
+
+
+def filter_side_inputs_iter(test=None):
+  # [START filter_side_inputs_iter]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    valid_durations = pipeline | 'Valid durations' >> beam.Create([
+        'annual',
+        'biennial',
+        'perennial',
+    ])
+
+    valid_plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'PERENNIAL'},
+        ])
+        | 'Filter valid plants' >> beam.Filter(
+            lambda plant, valid_durations: plant['duration'] in valid_durations,
+            valid_durations=beam.pvalue.AsIter(valid_durations),
+        )
+        | beam.Map(print)
+    )
+    # [END filter_side_inputs_iter]
+    if test:
+      test(valid_plants)
+
+
+def filter_side_inputs_dict(test=None):
+  # [START filter_side_inputs_dict]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    keep_duration = pipeline | 'Duration filters' >> beam.Create([
+        ('annual', False),
+        ('biennial', False),
+        ('perennial', True),
+    ])
+
+    perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Filter plants by duration' >> beam.Filter(
+            lambda plant, keep_duration: keep_duration[plant['duration']],
+            keep_duration=beam.pvalue.AsDict(keep_duration),
+        )
+        | beam.Map(print)
+    )
+    # [END filter_side_inputs_dict]
+    if test:
+      test(perennials)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/filter_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/filter_test.py
new file mode 100644
index 0000000..d989e43
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/filter_test.py
@@ -0,0 +1,81 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+from . import filter
+
+
+def check_perennials(actual):
+  # [START perennials]
+  perennials = [
+      {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+      {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+      {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
+  ]
+  # [END perennials]
+  assert_that(actual, equal_to(perennials))
+
+
+def check_valid_plants(actual):
+  # [START valid_plants]
+  valid_plants = [
+      {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+      {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+      {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+      {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+  ]
+  # [END valid_plants]
+  assert_that(actual, equal_to(valid_plants))
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.elementwise.filter.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class FilterTest(unittest.TestCase):
+  def test_filter_function(self):
+    filter.filter_function(check_perennials)
+
+  def test_filter_lambda(self):
+    filter.filter_lambda(check_perennials)
+
+  def test_filter_multiple_arguments(self):
+    filter.filter_multiple_arguments(check_perennials)
+
+  def test_filter_side_inputs_singleton(self):
+    filter.filter_side_inputs_singleton(check_perennials)
+
+  def test_filter_side_inputs_iter(self):
+    filter.filter_side_inputs_iter(check_valid_plants)
+
+  def test_filter_side_inputs_dict(self):
+    filter.filter_side_inputs_dict(check_perennials)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap.py
new file mode 100644
index 0000000..50ffe7a
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap.py
@@ -0,0 +1,248 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def flatmap_simple(test=None):
+  # [START flatmap_simple]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            'πŸ“Strawberry πŸ₯•Carrot πŸ†Eggplant',
+            'πŸ…Tomato πŸ₯”Potato',
+        ])
+        | 'Split words' >> beam.FlatMap(str.split)
+        | beam.Map(print)
+    )
+    # [END flatmap_simple]
+    if test:
+      test(plants)
+
+
+def flatmap_function(test=None):
+  # [START flatmap_function]
+  import apache_beam as beam
+
+  def split_words(text):
+    return text.split(',')
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            'πŸ“Strawberry,πŸ₯•Carrot,πŸ†Eggplant',
+            'πŸ…Tomato,πŸ₯”Potato',
+        ])
+        | 'Split words' >> beam.FlatMap(split_words)
+        | beam.Map(print)
+    )
+    # [END flatmap_function]
+    if test:
+      test(plants)
+
+
+def flatmap_lambda(test=None):
+  # [START flatmap_lambda]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            ['πŸ“Strawberry', 'πŸ₯•Carrot', 'πŸ†Eggplant'],
+            ['πŸ…Tomato', 'πŸ₯”Potato'],
+        ])
+        | 'Flatten lists' >> beam.FlatMap(lambda elements: elements)
+        | beam.Map(print)
+    )
+    # [END flatmap_lambda]
+    if test:
+      test(plants)
+
+
+def flatmap_generator(test=None):
+  # [START flatmap_generator]
+  import apache_beam as beam
+
+  def generate_elements(elements):
+    for element in elements:
+      yield element
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            ['πŸ“Strawberry', 'πŸ₯•Carrot', 'πŸ†Eggplant'],
+            ['πŸ…Tomato', 'πŸ₯”Potato'],
+        ])
+        | 'Flatten lists' >> beam.FlatMap(generate_elements)
+        | beam.Map(print)
+    )
+    # [END flatmap_generator]
+    if test:
+      test(plants)
+
+
+def flatmap_multiple_arguments(test=None):
+  # [START flatmap_multiple_arguments]
+  import apache_beam as beam
+
+  def split_words(text, delimiter=None):
+    return text.split(delimiter)
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            'πŸ“Strawberry,πŸ₯•Carrot,πŸ†Eggplant',
+            'πŸ…Tomato,πŸ₯”Potato',
+        ])
+        | 'Split words' >> beam.FlatMap(split_words, delimiter=',')
+        | beam.Map(print)
+    )
+    # [END flatmap_multiple_arguments]
+    if test:
+      test(plants)
+
+
+def flatmap_tuple(test=None):
+  # [START flatmap_tuple]
+  import apache_beam as beam
+
+  def format_plant(icon, plant):
+    if icon:
+      yield '{}{}'.format(icon, plant)
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            ('πŸ“', 'Strawberry'),
+            ('πŸ₯•', 'Carrot'),
+            ('πŸ†', 'Eggplant'),
+            ('πŸ…', 'Tomato'),
+            ('πŸ₯”', 'Potato'),
+            (None, 'Invalid'),
+        ])
+        | 'Format' >> beam.FlatMapTuple(format_plant)
+        | beam.Map(print)
+    )
+    # [END flatmap_tuple]
+    if test:
+      test(plants)
+
+
+def flatmap_side_inputs_singleton(test=None):
+  # [START flatmap_side_inputs_singleton]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    delimiter = pipeline | 'Create delimiter' >> beam.Create([','])
+
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            'πŸ“Strawberry,πŸ₯•Carrot,πŸ†Eggplant',
+            'πŸ…Tomato,πŸ₯”Potato',
+        ])
+        | 'Split words' >> beam.FlatMap(
+            lambda text, delimiter: text.split(delimiter),
+            delimiter=beam.pvalue.AsSingleton(delimiter),
+        )
+        | beam.Map(print)
+    )
+    # [END flatmap_side_inputs_singleton]
+    if test:
+      test(plants)
+
+
+def flatmap_side_inputs_iter(test=None):
+  # [START flatmap_side_inputs_iter]
+  import apache_beam as beam
+
+  def normalize_and_validate_durations(plant, valid_durations):
+    plant['duration'] = plant['duration'].lower()
+    if plant['duration'] in valid_durations:
+      yield plant
+
+  with beam.Pipeline() as pipeline:
+    valid_durations = pipeline | 'Valid durations' >> beam.Create([
+        'annual',
+        'biennial',
+        'perennial',
+    ])
+
+    valid_plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'Perennial'},
+            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'BIENNIAL'},
+            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'unknown'},
+        ])
+        | 'Normalize and validate durations' >> beam.FlatMap(
+            normalize_and_validate_durations,
+            valid_durations=beam.pvalue.AsIter(valid_durations),
+        )
+        | beam.Map(print)
+    )
+    # [END flatmap_side_inputs_iter]
+    if test:
+      test(valid_plants)
+
+
+def flatmap_side_inputs_dict(test=None):
+  # [START flatmap_side_inputs_dict]
+  import apache_beam as beam
+
+  def replace_duration_if_valid(plant, durations):
+    if plant['duration'] in durations:
+      plant['duration'] = durations[plant['duration']]
+      yield plant
+
+  with beam.Pipeline() as pipeline:
+    durations = pipeline | 'Durations dict' >> beam.Create([
+        (0, 'annual'),
+        (1, 'biennial'),
+        (2, 'perennial'),
+    ])
+
+    valid_plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 2},
+            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 1},
+            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 2},
+            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 0},
+            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': -1},
+        ])
+        | 'Replace duration if valid' >> beam.FlatMap(
+            replace_duration_if_valid,
+            durations=beam.pvalue.AsDict(durations),
+        )
+        | beam.Map(print)
+    )
+    # [END flatmap_side_inputs_dict]
+    if test:
+      test(valid_plants)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_test.py
new file mode 100644
index 0000000..718dcee
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_test.py
@@ -0,0 +1,92 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+from . import flatmap
+
+
+def check_plants(actual):
+  # [START plants]
+  plants = [
+      'πŸ“Strawberry',
+      'πŸ₯•Carrot',
+      'πŸ†Eggplant',
+      'πŸ…Tomato',
+      'πŸ₯”Potato',
+  ]
+  # [END plants]
+  assert_that(actual, equal_to(plants))
+
+
+def check_valid_plants(actual):
+  # [START valid_plants]
+  valid_plants = [
+      {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+      {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+      {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+      {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+  ]
+  # [END valid_plants]
+  assert_that(actual, equal_to(valid_plants))
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.elementwise.flatmap.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class FlatMapTest(unittest.TestCase):
+  def test_flatmap_simple(self):
+    flatmap.flatmap_simple(check_plants)
+
+  def test_flatmap_function(self):
+    flatmap.flatmap_function(check_plants)
+
+  def test_flatmap_lambda(self):
+    flatmap.flatmap_lambda(check_plants)
+
+  def test_flatmap_generator(self):
+    flatmap.flatmap_generator(check_plants)
+
+  def test_flatmap_multiple_arguments(self):
+    flatmap.flatmap_multiple_arguments(check_plants)
+
+  def test_flatmap_tuple(self):
+    flatmap.flatmap_tuple(check_plants)
+
+  def test_flatmap_side_inputs_singleton(self):
+    flatmap.flatmap_side_inputs_singleton(check_plants)
+
+  def test_flatmap_side_inputs_iter(self):
+    flatmap.flatmap_side_inputs_iter(check_valid_plants)
+
+  def test_flatmap_side_inputs_dict(self):
+    flatmap.flatmap_side_inputs_dict(check_valid_plants)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/keys.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/keys.py
new file mode 100644
index 0000000..01c9d6b
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/keys.py
@@ -0,0 +1,42 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def keys(test=None):
+  # [START keys]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    icons = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            ('πŸ“', 'Strawberry'),
+            ('πŸ₯•', 'Carrot'),
+            ('πŸ†', 'Eggplant'),
+            ('πŸ…', 'Tomato'),
+            ('πŸ₯”', 'Potato'),
+        ])
+        | 'Keys' >> beam.Keys()
+        | beam.Map(print)
+    )
+    # [END keys]
+    if test:
+      test(icons)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/keys_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/keys_test.py
new file mode 100644
index 0000000..780c5e4
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/keys_test.py
@@ -0,0 +1,56 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+from . import keys
+
+
+def check_icons(actual):
+  # [START icons]
+  icons = [
+      'πŸ“',
+      'πŸ₯•',
+      'πŸ†',
+      'πŸ…',
+      'πŸ₯”',
+  ]
+  # [END icons]
+  assert_that(actual, equal_to(icons))
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.elementwise.keys.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class KeysTest(unittest.TestCase):
+  def test_keys(self):
+    keys.keys(check_icons)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/kvswap.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/kvswap.py
new file mode 100644
index 0000000..2107fd5
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/kvswap.py
@@ -0,0 +1,42 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def kvswap(test=None):
+  # [START kvswap]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            ('πŸ“', 'Strawberry'),
+            ('πŸ₯•', 'Carrot'),
+            ('πŸ†', 'Eggplant'),
+            ('πŸ…', 'Tomato'),
+            ('πŸ₯”', 'Potato'),
+        ])
+        | 'Key-Value swap' >> beam.KvSwap()
+        | beam.Map(print)
+    )
+    # [END kvswap]
+    if test:
+      test(plants)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/kvswap_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/kvswap_test.py
new file mode 100644
index 0000000..ea7698b
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/kvswap_test.py
@@ -0,0 +1,56 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+from . import kvswap
+
+
+def check_plants(actual):
+  # [START plants]
+  plants = [
+      ('Strawberry', 'πŸ“'),
+      ('Carrot', 'πŸ₯•'),
+      ('Eggplant', 'πŸ†'),
+      ('Tomato', 'πŸ…'),
+      ('Potato', 'πŸ₯”'),
+  ]
+  # [END plants]
+  assert_that(actual, equal_to(plants))
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.elementwise.kvswap.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class KvSwapTest(unittest.TestCase):
+  def test_kvswap(self):
+    kvswap.kvswap(check_plants)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/map.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/map.py
new file mode 100644
index 0000000..9defd47
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/map.py
@@ -0,0 +1,226 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def map_simple(test=None):
+  # [START map_simple]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            '   πŸ“Strawberry   \n',
+            '   πŸ₯•Carrot   \n',
+            '   πŸ†Eggplant   \n',
+            '   πŸ…Tomato   \n',
+            '   πŸ₯”Potato   \n',
+        ])
+        | 'Strip' >> beam.Map(str.strip)
+        | beam.Map(print)
+    )
+    # [END map_simple]
+    if test:
+      test(plants)
+
+
+def map_function(test=None):
+  # [START map_function]
+  import apache_beam as beam
+
+  def strip_header_and_newline(text):
+    return text.strip('# \n')
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            '# πŸ“Strawberry\n',
+            '# πŸ₯•Carrot\n',
+            '# πŸ†Eggplant\n',
+            '# πŸ…Tomato\n',
+            '# πŸ₯”Potato\n',
+        ])
+        | 'Strip header' >> beam.Map(strip_header_and_newline)
+        | beam.Map(print)
+    )
+    # [END map_function]
+    if test:
+      test(plants)
+
+
+def map_lambda(test=None):
+  # [START map_lambda]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            '# πŸ“Strawberry\n',
+            '# πŸ₯•Carrot\n',
+            '# πŸ†Eggplant\n',
+            '# πŸ…Tomato\n',
+            '# πŸ₯”Potato\n',
+        ])
+        | 'Strip header' >> beam.Map(lambda text: text.strip('# \n'))
+        | beam.Map(print)
+    )
+    # [END map_lambda]
+    if test:
+      test(plants)
+
+
+def map_multiple_arguments(test=None):
+  # [START map_multiple_arguments]
+  import apache_beam as beam
+
+  def strip(text, chars=None):
+    return text.strip(chars)
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            '# πŸ“Strawberry\n',
+            '# πŸ₯•Carrot\n',
+            '# πŸ†Eggplant\n',
+            '# πŸ…Tomato\n',
+            '# πŸ₯”Potato\n',
+        ])
+        | 'Strip header' >> beam.Map(strip, chars='# \n')
+        | beam.Map(print)
+    )
+    # [END map_multiple_arguments]
+    if test:
+      test(plants)
+
+
+def map_tuple(test=None):
+  # [START map_tuple]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            ('πŸ“', 'Strawberry'),
+            ('πŸ₯•', 'Carrot'),
+            ('πŸ†', 'Eggplant'),
+            ('πŸ…', 'Tomato'),
+            ('πŸ₯”', 'Potato'),
+        ])
+        | 'Format' >> beam.MapTuple(
+            lambda icon, plant: '{}{}'.format(icon, plant))
+        | beam.Map(print)
+    )
+    # [END map_tuple]
+    if test:
+      test(plants)
+
+
+def map_side_inputs_singleton(test=None):
+  # [START map_side_inputs_singleton]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    chars = pipeline | 'Create chars' >> beam.Create(['# \n'])
+
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            '# πŸ“Strawberry\n',
+            '# πŸ₯•Carrot\n',
+            '# πŸ†Eggplant\n',
+            '# πŸ…Tomato\n',
+            '# πŸ₯”Potato\n',
+        ])
+        | 'Strip header' >> beam.Map(
+            lambda text, chars: text.strip(chars),
+            chars=beam.pvalue.AsSingleton(chars),
+        )
+        | beam.Map(print)
+    )
+    # [END map_side_inputs_singleton]
+    if test:
+      test(plants)
+
+
+def map_side_inputs_iter(test=None):
+  # [START map_side_inputs_iter]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    chars = pipeline | 'Create chars' >> beam.Create(['#', ' ', '\n'])
+
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            '# πŸ“Strawberry\n',
+            '# πŸ₯•Carrot\n',
+            '# πŸ†Eggplant\n',
+            '# πŸ…Tomato\n',
+            '# πŸ₯”Potato\n',
+        ])
+        | 'Strip header' >> beam.Map(
+            lambda text, chars: text.strip(''.join(chars)),
+            chars=beam.pvalue.AsIter(chars),
+        )
+        | beam.Map(print)
+    )
+    # [END map_side_inputs_iter]
+    if test:
+      test(plants)
+
+
+def map_side_inputs_dict(test=None):
+  # [START map_side_inputs_dict]
+  import apache_beam as beam
+
+  def replace_duration(plant, durations):
+    plant['duration'] = durations[plant['duration']]
+    return plant
+
+  with beam.Pipeline() as pipeline:
+    durations = pipeline | 'Durations' >> beam.Create([
+        (0, 'annual'),
+        (1, 'biennial'),
+        (2, 'perennial'),
+    ])
+
+    plant_details = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 2},
+            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 1},
+            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 2},
+            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 0},
+            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 2},
+        ])
+        | 'Replace duration' >> beam.Map(
+            replace_duration,
+            durations=beam.pvalue.AsDict(durations),
+        )
+        | beam.Map(print)
+    )
+    # [END map_side_inputs_dict]
+    if test:
+      test(plant_details)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/map_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/map_test.py
new file mode 100644
index 0000000..4186176
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/map_test.py
@@ -0,0 +1,90 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+from . import map
+
+
+def check_plants(actual):
+  # [START plants]
+  plants = [
+      'πŸ“Strawberry',
+      'πŸ₯•Carrot',
+      'πŸ†Eggplant',
+      'πŸ…Tomato',
+      'πŸ₯”Potato',
+  ]
+  # [END plants]
+  assert_that(actual, equal_to(plants))
+
+
+def check_plant_details(actual):
+  # [START plant_details]
+  plant_details = [
+      {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+      {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+      {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+      {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+      {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
+  ]
+  # [END plant_details]
+  assert_that(actual, equal_to(plant_details))
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.elementwise.map.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class MapTest(unittest.TestCase):
+  def test_map_simple(self):
+    map.map_simple(check_plants)
+
+  def test_map_function(self):
+    map.map_function(check_plants)
+
+  def test_map_lambda(self):
+    map.map_lambda(check_plants)
+
+  def test_map_multiple_arguments(self):
+    map.map_multiple_arguments(check_plants)
+
+  def test_map_tuple(self):
+    map.map_tuple(check_plants)
+
+  def test_map_side_inputs_singleton(self):
+    map.map_side_inputs_singleton(check_plants)
+
+  def test_map_side_inputs_iter(self):
+    map.map_side_inputs_iter(check_plants)
+
+  def test_map_side_inputs_dict(self):
+    map.map_side_inputs_dict(check_plant_details)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py
new file mode 100644
index 0000000..971e9f0
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py
@@ -0,0 +1,126 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+from __future__ import unicode_literals
+
+
+def pardo_dofn(test=None):
+  # [START pardo_dofn]
+  import apache_beam as beam
+
+  class SplitWords(beam.DoFn):
+    def __init__(self, delimiter=','):
+      self.delimiter = delimiter
+
+    def process(self, text):
+      for word in text.split(self.delimiter):
+        yield word
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            'πŸ“Strawberry,πŸ₯•Carrot,πŸ†Eggplant',
+            'πŸ…Tomato,πŸ₯”Potato',
+        ])
+        | 'Split words' >> beam.ParDo(SplitWords(','))
+        | beam.Map(print)
+    )
+    # [END pardo_dofn]
+    if test:
+      test(plants)
+
+
+def pardo_dofn_params(test=None):
+  # pylint: disable=line-too-long
+  # [START pardo_dofn_params]
+  import apache_beam as beam
+
+  class AnalyzeElement(beam.DoFn):
+    def process(self, elem, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
+      yield '\n'.join([
+          '# timestamp',
+          'type(timestamp) -> ' + repr(type(timestamp)),
+          'timestamp.micros -> ' + repr(timestamp.micros),
+          'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
+          'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
+          '',
+          '# window',
+          'type(window) -> ' + repr(type(window)),
+          'window.start -> {} ({})'.format(window.start, window.start.to_utc_datetime()),
+          'window.end -> {} ({})'.format(window.end, window.end.to_utc_datetime()),
+          'window.max_timestamp() -> {} ({})'.format(window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
+      ])
+
+  with beam.Pipeline() as pipeline:
+    dofn_params = (
+        pipeline
+        | 'Create a single test element' >> beam.Create([':)'])
+        | 'Add timestamp (Spring equinox 2020)' >> beam.Map(
+            lambda elem: beam.window.TimestampedValue(elem, 1584675660))
+        | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
+        | 'Analyze element' >> beam.ParDo(AnalyzeElement())
+        | beam.Map(print)
+    )
+    # [END pardo_dofn_params]
+    # pylint: enable=line-too-long
+    if test:
+      test(dofn_params)
+
+
+def pardo_dofn_methods(test=None):
+  # [START pardo_dofn_methods]
+  import apache_beam as beam
+
+  class DoFnMethods(beam.DoFn):
+    def __init__(self):
+      print('__init__')
+      self.window = beam.window.GlobalWindow()
+
+    def setup(self):
+      print('setup')
+
+    def start_bundle(self):
+      print('start_bundle')
+
+    def process(self, element, window=beam.DoFn.WindowParam):
+      self.window = window
+      yield '* process: ' + element
+
+    def finish_bundle(self):
+      yield beam.utils.windowed_value.WindowedValue(
+          value='* finish_bundle: 🌱🌳🌍',
+          timestamp=0,
+          windows=[self.window],
+      )
+
+    def teardown(self):
+      print('teardown')
+
+  with beam.Pipeline() as pipeline:
+    results = (
+        pipeline
+        | 'Create inputs' >> beam.Create(['πŸ“', 'πŸ₯•', 'πŸ†', 'πŸ…', 'πŸ₯”'])
+        | 'DoFn methods' >> beam.ParDo(DoFnMethods())
+        | beam.Map(print)
+    )
+    # [END pardo_dofn_methods]
+    if test:
+      return test(results)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py
new file mode 100644
index 0000000..8507e01
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py
@@ -0,0 +1,120 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import io
+import platform
+import sys
+import unittest
+
+import mock
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+from . import pardo
+
+
+def check_plants(actual):
+  # [START plants]
+  plants = [
+      'πŸ“Strawberry',
+      'πŸ₯•Carrot',
+      'πŸ†Eggplant',
+      'πŸ…Tomato',
+      'πŸ₯”Potato',
+  ]
+  # [END plants]
+  assert_that(actual, equal_to(plants))
+
+
+def check_dofn_params(actual):
+  # pylint: disable=line-too-long
+  dofn_params = '\n'.join('''[START dofn_params]
+# timestamp
+type(timestamp) -> <class 'apache_beam.utils.timestamp.Timestamp'>
+timestamp.micros -> 1584675660000000
+timestamp.to_rfc3339() -> '2020-03-20T03:41:00Z'
+timestamp.to_utc_datetime() -> datetime.datetime(2020, 3, 20, 3, 41)
+
+# window
+type(window) -> <class 'apache_beam.transforms.window.IntervalWindow'>
+window.start -> Timestamp(1584675660) (2020-03-20 03:41:00)
+window.end -> Timestamp(1584675690) (2020-03-20 03:41:30)
+window.max_timestamp() -> Timestamp(1584675689.999999) (2020-03-20 03:41:29.999999)
+[END dofn_params]'''.splitlines()[1:-1])
+  # pylint: enable=line-too-long
+  assert_that(actual, equal_to([dofn_params]))
+
+
+def check_dofn_methods(actual):
+  # Return the expected stdout to check the ordering of the called methods.
+  return '''[START results]
+__init__
+setup
+start_bundle
+* process: πŸ“
+* process: πŸ₯•
+* process: πŸ†
+* process: πŸ…
+* process: πŸ₯”
+* finish_bundle: 🌱🌳🌍
+teardown
+[END results]'''.splitlines()[1:-1]
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.elementwise.pardo.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class ParDoTest(unittest.TestCase):
+  def test_pardo_dofn(self):
+    pardo.pardo_dofn(check_plants)
+
+  # TODO: Remove this after Python 2 deprecation.
+  # https://issues.apache.org/jira/browse/BEAM-8124
+  @unittest.skipIf(sys.version_info[0] < 3 and platform.system() == 'Windows',
+                   'Python 2 on Windows uses `long` rather than `int`')
+  def test_pardo_dofn_params(self):
+    pardo.pardo_dofn_params(check_dofn_params)
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+@mock.patch('sys.stdout', new_callable=io.StringIO)
+class ParDoStdoutTest(unittest.TestCase):
+  def test_pardo_dofn_methods(self, mock_stdout):
+    expected = pardo.pardo_dofn_methods(check_dofn_methods)
+    actual = mock_stdout.getvalue().splitlines()
+
+    # For the stdout, check the ordering of the methods, not of the elements.
+    actual_stdout = [line.split(':')[0] for line in actual]
+    expected_stdout = [line.split(':')[0] for line in expected]
+    self.assertEqual(actual_stdout, expected_stdout)
+
+    # For the elements, ignore the stdout and just make sure all elements match.
+    actual_elements = {line for line in actual if line.startswith('*')}
+    expected_elements = {line for line in expected if line.startswith('*')}
+    self.assertEqual(actual_elements, expected_elements)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition.py
new file mode 100644
index 0000000..6f839d4
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition.py
@@ -0,0 +1,136 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def partition_function(test=None):
+  # [START partition_function]
+  import apache_beam as beam
+
+  durations = ['annual', 'biennial', 'perennial']
+
+  def by_duration(plant, num_partitions):
+    return durations.index(plant['duration'])
+
+  with beam.Pipeline() as pipeline:
+    annuals, biennials, perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Partition' >> beam.Partition(by_duration, len(durations))
+    )
+    _ = (
+        annuals
+        | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))
+    )
+    _ = (
+        biennials
+        | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))
+    )
+    _ = (
+        perennials
+        | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))
+    )
+    # [END partition_function]
+    if test:
+      test(annuals, biennials, perennials)
+
+
+def partition_lambda(test=None):
+  # [START partition_lambda]
+  import apache_beam as beam
+
+  durations = ['annual', 'biennial', 'perennial']
+
+  with beam.Pipeline() as pipeline:
+    annuals, biennials, perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Partition' >> beam.Partition(
+            lambda plant, num_partitions: durations.index(plant['duration']),
+            len(durations),
+        )
+    )
+    _ = (
+        annuals
+        | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))
+    )
+    _ = (
+        biennials
+        | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))
+    )
+    _ = (
+        perennials
+        | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))
+    )
+    # [END partition_lambda]
+    if test:
+      test(annuals, biennials, perennials)
+
+
+def partition_multiple_arguments(test=None):
+  # [START partition_multiple_arguments]
+  import apache_beam as beam
+  import json
+
+  def split_dataset(plant, num_partitions, ratio):
+    assert num_partitions == len(ratio)
+    bucket = sum(map(ord, json.dumps(plant))) % sum(ratio)
+    total = 0
+    for i, part in enumerate(ratio):
+      total += part
+      if bucket < total:
+        return i
+    return len(ratio) - 1
+
+  with beam.Pipeline() as pipeline:
+    train_dataset, test_dataset = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Partition' >> beam.Partition(split_dataset, 2, ratio=[8, 2])
+    )
+    _ = (
+        train_dataset
+        | 'Train' >> beam.Map(lambda x: print('train: ' + str(x)))
+    )
+    _ = (
+        test_dataset
+        | 'Test'  >> beam.Map(lambda x: print('test: ' + str(x)))
+    )
+    # [END partition_multiple_arguments]
+    if test:
+      test(train_dataset, test_dataset)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition_test.py
new file mode 100644
index 0000000..0b8ae3d
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition_test.py
@@ -0,0 +1,84 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+from . import partition
+
+
+def check_partitions(actual1, actual2, actual3):
+  # [START partitions]
+  annuals = [
+      {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+  ]
+  biennials = [
+      {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+  ]
+  perennials = [
+      {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+      {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+      {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
+  ]
+  # [END partitions]
+  assert_that(actual1, equal_to(annuals), label='assert annuals')
+  assert_that(actual2, equal_to(biennials), label='assert biennials')
+  assert_that(actual3, equal_to(perennials), label='assert perennials')
+
+
+def check_split_datasets(actual1, actual2):
+  # [START train_test]
+  train_dataset = [
+      {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
+      {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
+      {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
+  ]
+  test_dataset = [
+      {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
+      {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
+  ]
+  # [END train_test]
+  assert_that(actual1, equal_to(train_dataset), label='assert train')
+  assert_that(actual2, equal_to(test_dataset), label='assert test')
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.elementwise.partition.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class PartitionTest(unittest.TestCase):
+  def test_partition_function(self):
+    partition.partition_function(check_partitions)
+
+  def test_partition_lambda(self):
+    partition.partition_lambda(check_partitions)
+
+  def test_partition_multiple_arguments(self):
+    partition.partition_multiple_arguments(check_split_datasets)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/regex.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/regex.py
new file mode 100644
index 0000000..b39b534
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/regex.py
@@ -0,0 +1,236 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def regex_matches(test=None):
+  # [START regex_matches]
+  import apache_beam as beam
+
+  # Matches a named group 'icon', and then two comma-separated groups.
+  regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
+  with beam.Pipeline() as pipeline:
+    plants_matches = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            'πŸ“, Strawberry, perennial',
+            'πŸ₯•, Carrot, biennial ignoring trailing words',
+            'πŸ†, Eggplant, perennial',
+            'πŸ…, Tomato, annual',
+            'πŸ₯”, Potato, perennial',
+            '# 🍌, invalid, format',
+            'invalid, πŸ‰, format',
+        ])
+        | 'Parse plants' >> beam.Regex.matches(regex)
+        | beam.Map(print)
+    )
+    # [END regex_matches]
+    if test:
+      test(plants_matches)
+
+
+def regex_all_matches(test=None):
+  # [START regex_all_matches]
+  import apache_beam as beam
+
+  # Matches a named group 'icon', and then two comma-separated groups.
+  regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
+  with beam.Pipeline() as pipeline:
+    plants_all_matches = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            'πŸ“, Strawberry, perennial',
+            'πŸ₯•, Carrot, biennial ignoring trailing words',
+            'πŸ†, Eggplant, perennial',
+            'πŸ…, Tomato, annual',
+            'πŸ₯”, Potato, perennial',
+            '# 🍌, invalid, format',
+            'invalid, πŸ‰, format',
+        ])
+        | 'Parse plants' >> beam.Regex.all_matches(regex)
+        | beam.Map(print)
+    )
+    # [END regex_all_matches]
+    if test:
+      test(plants_all_matches)
+
+
+def regex_matches_kv(test=None):
+  # [START regex_matches_kv]
+  import apache_beam as beam
+
+  # Matches a named group 'icon', and then two comma-separated groups.
+  regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
+  with beam.Pipeline() as pipeline:
+    plants_matches_kv = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            'πŸ“, Strawberry, perennial',
+            'πŸ₯•, Carrot, biennial ignoring trailing words',
+            'πŸ†, Eggplant, perennial',
+            'πŸ…, Tomato, annual',
+            'πŸ₯”, Potato, perennial',
+            '# 🍌, invalid, format',
+            'invalid, πŸ‰, format',
+        ])
+        | 'Parse plants' >> beam.Regex.matches_kv(regex, keyGroup='icon')
+        | beam.Map(print)
+    )
+    # [END regex_matches_kv]
+    if test:
+      test(plants_matches_kv)
+
+
+def regex_find(test=None):
+  # [START regex_find]
+  import apache_beam as beam
+
+  # Matches a named group 'icon', and then two comma-separated groups.
+  regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
+  with beam.Pipeline() as pipeline:
+    plants_matches = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            '# πŸ“, Strawberry, perennial',
+            '# πŸ₯•, Carrot, biennial ignoring trailing words',
+            '# πŸ†, Eggplant, perennial - 🍌, Banana, perennial',
+            '# πŸ…, Tomato, annual - πŸ‰, Watermelon, annual',
+            '# πŸ₯”, Potato, perennial',
+        ])
+        | 'Parse plants' >> beam.Regex.find(regex)
+        | beam.Map(print)
+    )
+    # [END regex_find]
+    if test:
+      test(plants_matches)
+
+
+def regex_find_all(test=None):
+  # [START regex_find_all]
+  import apache_beam as beam
+
+  # Matches a named group 'icon', and then two comma-separated groups.
+  regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
+  with beam.Pipeline() as pipeline:
+    plants_find_all = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            '# πŸ“, Strawberry, perennial',
+            '# πŸ₯•, Carrot, biennial ignoring trailing words',
+            '# πŸ†, Eggplant, perennial - 🍌, Banana, perennial',
+            '# πŸ…, Tomato, annual - πŸ‰, Watermelon, annual',
+            '# πŸ₯”, Potato, perennial',
+        ])
+        | 'Parse plants' >> beam.Regex.find_all(regex)
+        | beam.Map(print)
+    )
+    # [END regex_find_all]
+    if test:
+      test(plants_find_all)
+
+
+def regex_find_kv(test=None):
+  # [START regex_find_kv]
+  import apache_beam as beam
+
+  # Matches a named group 'icon', and then two comma-separated groups.
+  regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
+  with beam.Pipeline() as pipeline:
+    plants_matches_kv = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            '# πŸ“, Strawberry, perennial',
+            '# πŸ₯•, Carrot, biennial ignoring trailing words',
+            '# πŸ†, Eggplant, perennial - 🍌, Banana, perennial',
+            '# πŸ…, Tomato, annual - πŸ‰, Watermelon, annual',
+            '# πŸ₯”, Potato, perennial',
+        ])
+        | 'Parse plants' >> beam.Regex.find_kv(regex, keyGroup='icon')
+        | beam.Map(print)
+    )
+    # [END regex_find_kv]
+    if test:
+      test(plants_matches_kv)
+
+
+def regex_replace_all(test=None):
+  # [START regex_replace_all]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants_replace_all = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            'πŸ“ : Strawberry : perennial',
+            'πŸ₯• : Carrot : biennial',
+            'πŸ†\t:\tEggplant\t:\tperennial',
+            'πŸ… : Tomato : annual',
+            'πŸ₯” : Potato : perennial',
+        ])
+        | 'To CSV' >> beam.Regex.replace_all(r'\s*:\s*', ',')
+        | beam.Map(print)
+    )
+    # [END regex_replace_all]
+    if test:
+      test(plants_replace_all)
+
+
+def regex_replace_first(test=None):
+  # [START regex_replace_first]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants_replace_first = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            'πŸ“, Strawberry, perennial',
+            'πŸ₯•, Carrot, biennial',
+            'πŸ†,\tEggplant, perennial',
+            'πŸ…, Tomato, annual',
+            'πŸ₯”, Potato, perennial',
+        ])
+        | 'As dictionary' >> beam.Regex.replace_first(r'\s*,\s*', ': ')
+        | beam.Map(print)
+    )
+    # [END regex_replace_first]
+    if test:
+      test(plants_replace_first)
+
+
+def regex_split(test=None):
+  # [START regex_split]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants_split = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            'πŸ“ : Strawberry : perennial',
+            'πŸ₯• : Carrot : biennial',
+            'πŸ†\t:\tEggplant : perennial',
+            'πŸ… : Tomato : annual',
+            'πŸ₯” : Potato : perennial',
+        ])
+        | 'Parse plants' >> beam.Regex.split(r'\s*:\s*')
+        | beam.Map(print)
+    )
+    # [END regex_split]
+    if test:
+      test(plants_split)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/regex_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/regex_test.py
new file mode 100644
index 0000000..8312312
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/regex_test.py
@@ -0,0 +1,173 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+from . import regex
+
+
+def check_matches(actual):
+  # [START plants_matches]
+  plants_matches = [
+      'πŸ“, Strawberry, perennial',
+      'πŸ₯•, Carrot, biennial',
+      'πŸ†, Eggplant, perennial',
+      'πŸ…, Tomato, annual',
+      'πŸ₯”, Potato, perennial',
+  ]
+  # [END plants_matches]
+  assert_that(actual, equal_to(plants_matches))
+
+
+def check_all_matches(actual):
+  # [START plants_all_matches]
+  plants_all_matches = [
+      ['πŸ“, Strawberry, perennial', 'πŸ“', 'Strawberry', 'perennial'],
+      ['πŸ₯•, Carrot, biennial', 'πŸ₯•', 'Carrot', 'biennial'],
+      ['πŸ†, Eggplant, perennial', 'πŸ†', 'Eggplant', 'perennial'],
+      ['πŸ…, Tomato, annual', 'πŸ…', 'Tomato', 'annual'],
+      ['πŸ₯”, Potato, perennial', 'πŸ₯”', 'Potato', 'perennial'],
+  ]
+  # [END plants_all_matches]
+  assert_that(actual, equal_to(plants_all_matches))
+
+
+def check_matches_kv(actual):
+  # [START plants_matches_kv]
+  plants_matches_kv = [
+      ('πŸ“', 'πŸ“, Strawberry, perennial'),
+      ('πŸ₯•', 'πŸ₯•, Carrot, biennial'),
+      ('πŸ†', 'πŸ†, Eggplant, perennial'),
+      ('πŸ…', 'πŸ…, Tomato, annual'),
+      ('πŸ₯”', 'πŸ₯”, Potato, perennial'),
+  ]
+  # [END plants_matches_kv]
+  assert_that(actual, equal_to(plants_matches_kv))
+
+
+def check_find_all(actual):
+  # [START plants_find_all]
+  plants_find_all = [
+      ['πŸ“, Strawberry, perennial'],
+      ['πŸ₯•, Carrot, biennial'],
+      ['πŸ†, Eggplant, perennial', '🍌, Banana, perennial'],
+      ['πŸ…, Tomato, annual', 'πŸ‰, Watermelon, annual'],
+      ['πŸ₯”, Potato, perennial'],
+  ]
+  # [END plants_find_all]
+  assert_that(actual, equal_to(plants_find_all))
+
+
+def check_find_kv(actual):
+  # [START plants_find_kv]
+  plants_find_all = [
+      ('πŸ“', 'πŸ“, Strawberry, perennial'),
+      ('πŸ₯•', 'πŸ₯•, Carrot, biennial'),
+      ('πŸ†', 'πŸ†, Eggplant, perennial'),
+      ('🍌', '🍌, Banana, perennial'),
+      ('πŸ…', 'πŸ…, Tomato, annual'),
+      ('πŸ‰', 'πŸ‰, Watermelon, annual'),
+      ('πŸ₯”', 'πŸ₯”, Potato, perennial'),
+  ]
+  # [END plants_find_kv]
+  assert_that(actual, equal_to(plants_find_all))
+
+
+def check_replace_all(actual):
+  # [START plants_replace_all]
+  plants_replace_all = [
+      'πŸ“,Strawberry,perennial',
+      'πŸ₯•,Carrot,biennial',
+      'πŸ†,Eggplant,perennial',
+      'πŸ…,Tomato,annual',
+      'πŸ₯”,Potato,perennial',
+  ]
+  # [END plants_replace_all]
+  assert_that(actual, equal_to(plants_replace_all))
+
+
+def check_replace_first(actual):
+  # [START plants_replace_first]
+  plants_replace_first = [
+      'πŸ“: Strawberry, perennial',
+      'πŸ₯•: Carrot, biennial',
+      'πŸ†: Eggplant, perennial',
+      'πŸ…: Tomato, annual',
+      'πŸ₯”: Potato, perennial',
+  ]
+  # [END plants_replace_first]
+  assert_that(actual, equal_to(plants_replace_first))
+
+
+def check_split(actual):
+  # [START plants_split]
+  plants_split = [
+      ['πŸ“', 'Strawberry', 'perennial'],
+      ['πŸ₯•', 'Carrot', 'biennial'],
+      ['πŸ†', 'Eggplant', 'perennial'],
+      ['πŸ…', 'Tomato', 'annual'],
+      ['πŸ₯”', 'Potato', 'perennial'],
+  ]
+  # [END plants_split]
+  assert_that(actual, equal_to(plants_split))
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.elementwise.regex.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class RegexTest(unittest.TestCase):
+  def test_matches(self):
+    regex.regex_matches(check_matches)
+
+  def test_all_matches(self):
+    regex.regex_all_matches(check_all_matches)
+
+  def test_matches_kv(self):
+    regex.regex_matches_kv(check_matches_kv)
+
+  def test_find(self):
+    regex.regex_find(check_matches)
+
+  def test_find_all(self):
+    regex.regex_find_all(check_find_all)
+
+  def test_find_kv(self):
+    regex.regex_find_kv(check_find_kv)
+
+  def test_replace_all(self):
+    regex.regex_replace_all(check_replace_all)
+
+  def test_replace_first(self):
+    regex.regex_replace_first(check_replace_first)
+
+  def test_split(self):
+    regex.regex_split(check_split)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/tostring.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/tostring.py
new file mode 100644
index 0000000..1d0b7dd
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/tostring.py
@@ -0,0 +1,86 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def tostring_kvs(test=None):
+  # [START tostring_kvs]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            ('πŸ“', 'Strawberry'),
+            ('πŸ₯•', 'Carrot'),
+            ('πŸ†', 'Eggplant'),
+            ('πŸ…', 'Tomato'),
+            ('πŸ₯”', 'Potato'),
+        ])
+        | 'To string' >> beam.ToString.Kvs()
+        | beam.Map(print)
+    )
+    # [END tostring_kvs]
+    if test:
+      test(plants)
+
+
+def tostring_element(test=None):
+  # [START tostring_element]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plant_lists = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            ['πŸ“', 'Strawberry', 'perennial'],
+            ['πŸ₯•', 'Carrot', 'biennial'],
+            ['πŸ†', 'Eggplant', 'perennial'],
+            ['πŸ…', 'Tomato', 'annual'],
+            ['πŸ₯”', 'Potato', 'perennial'],
+        ])
+        | 'To string' >> beam.ToString.Element()
+        | beam.Map(print)
+    )
+    # [END tostring_element]
+    if test:
+      test(plant_lists)
+
+
+def tostring_iterables(test=None):
+  # [START tostring_iterables]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants_csv = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            ['πŸ“', 'Strawberry', 'perennial'],
+            ['πŸ₯•', 'Carrot', 'biennial'],
+            ['πŸ†', 'Eggplant', 'perennial'],
+            ['πŸ…', 'Tomato', 'annual'],
+            ['πŸ₯”', 'Potato', 'perennial'],
+        ])
+        | 'To string' >> beam.ToString.Iterables()
+        | beam.Map(print)
+    )
+    # [END tostring_iterables]
+    if test:
+      test(plants_csv)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/tostring_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/tostring_test.py
new file mode 100644
index 0000000..b253ea1
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/tostring_test.py
@@ -0,0 +1,105 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import sys
+import unittest
+
+import mock
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+from . import tostring
+
+
+def check_plants(actual):
+  # [START plants]
+  plants = [
+      'πŸ“,Strawberry',
+      'πŸ₯•,Carrot',
+      'πŸ†,Eggplant',
+      'πŸ…,Tomato',
+      'πŸ₯”,Potato',
+  ]
+  # [END plants]
+  assert_that(actual, equal_to(plants))
+
+
+def check_plant_lists(actual):
+  # [START plant_lists]
+  plant_lists = [
+      "['πŸ“', 'Strawberry', 'perennial']",
+      "['πŸ₯•', 'Carrot', 'biennial']",
+      "['πŸ†', 'Eggplant', 'perennial']",
+      "['πŸ…', 'Tomato', 'annual']",
+      "['πŸ₯”', 'Potato', 'perennial']",
+  ]
+  # [END plant_lists]
+
+  # Some unicode characters become escaped with double backslashes.
+  import apache_beam as beam
+
+  def normalize_escaping(elem):
+    # In Python 2 all utf-8 characters are escaped with double backslashes.
+    # TODO: Remove this after Python 2 deprecation.
+    # https://issues.apache.org/jira/browse/BEAM-8124
+    if sys.version_info.major == 2:
+      return elem.decode('string-escape')
+
+    # In Python 3.5 some utf-8 characters are escaped with double backslashes.
+    if '\\' in elem:
+      return bytes(elem, 'utf-8').decode('unicode-escape')
+    return elem
+  actual = actual | beam.Map(normalize_escaping)
+  assert_that(actual, equal_to(plant_lists))
+
+
+def check_plants_csv(actual):
+  # [START plants_csv]
+  plants_csv = [
+      'πŸ“,Strawberry,perennial',
+      'πŸ₯•,Carrot,biennial',
+      'πŸ†,Eggplant,perennial',
+      'πŸ…,Tomato,annual',
+      'πŸ₯”,Potato,perennial',
+  ]
+  # [END plants_csv]
+  assert_that(actual, equal_to(plants_csv))
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.elementwise.tostring.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class ToStringTest(unittest.TestCase):
+  def test_tostring_kvs(self):
+    tostring.tostring_kvs(check_plants)
+
+  def test_tostring_element(self):
+    tostring.tostring_element(check_plant_lists)
+
+  def test_tostring_iterables(self):
+    tostring.tostring_iterables(check_plants_csv)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/values.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/values.py
new file mode 100644
index 0000000..8504ff4
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/values.py
@@ -0,0 +1,42 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def values(test=None):
+  # [START values]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    plants = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            ('πŸ“', 'Strawberry'),
+            ('πŸ₯•', 'Carrot'),
+            ('πŸ†', 'Eggplant'),
+            ('πŸ…', 'Tomato'),
+            ('πŸ₯”', 'Potato'),
+        ])
+        | 'Values' >> beam.Values()
+        | beam.Map(print)
+    )
+    # [END values]
+    if test:
+      test(plants)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/values_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/values_test.py
new file mode 100644
index 0000000..06abef6
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/values_test.py
@@ -0,0 +1,56 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+from . import values
+
+
+def check_plants(actual):
+  # [START plants]
+  plants = [
+      'Strawberry',
+      'Carrot',
+      'Eggplant',
+      'Tomato',
+      'Potato',
+  ]
+  # [END plants]
+  assert_that(actual, equal_to(plants))
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.elementwise.values.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class ValuesTest(unittest.TestCase):
+  def test_values(self):
+    values.values(check_plants)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps.py
new file mode 100644
index 0000000..d1e2e3f
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps.py
@@ -0,0 +1,128 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def withtimestamps_event_time(test=None):
+  # [START withtimestamps_event_time]
+  import apache_beam as beam
+
+  class GetTimestamp(beam.DoFn):
+    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
+      yield '{} - {}'.format(timestamp.to_utc_datetime(), plant['name'])
+
+  with beam.Pipeline() as pipeline:
+    plant_timestamps = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            {'name': 'Strawberry', 'season': 1585699200}, # April, 2020
+            {'name': 'Carrot', 'season': 1590969600},     # June, 2020
+            {'name': 'Artichoke', 'season': 1583020800},  # March, 2020
+            {'name': 'Tomato', 'season': 1588291200},     # May, 2020
+            {'name': 'Potato', 'season': 1598918400},     # September, 2020
+        ])
+        | 'With timestamps' >> beam.Map(
+            lambda plant: beam.window.TimestampedValue(plant, plant['season']))
+        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
+        | beam.Map(print)
+    )
+    # [END event_time]
+    if test:
+      test(plant_timestamps)
+
+
+def withtimestamps_logical_clock(test=None):
+  # [START withtimestamps_logical_clock]
+  import apache_beam as beam
+
+  class GetTimestamp(beam.DoFn):
+    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
+      event_id = int(timestamp.micros / 1e6)  # equivalent to seconds
+      yield '{} - {}'.format(event_id, plant['name'])
+
+  with beam.Pipeline() as pipeline:
+    plant_events = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            {'name': 'Strawberry', 'event_id': 1},
+            {'name': 'Carrot', 'event_id': 4},
+            {'name': 'Artichoke', 'event_id': 2},
+            {'name': 'Tomato', 'event_id': 3},
+            {'name': 'Potato', 'event_id': 5},
+        ])
+        | 'With timestamps' >> beam.Map(lambda plant: \
+            beam.window.TimestampedValue(plant, plant['event_id']))
+        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
+        | beam.Map(print)
+    )
+    # [END logical_clock]
+    if test:
+      test(plant_events)
+
+
+def withtimestamps_processing_time(test=None):
+  # [START withtimestamps_processing_time]
+  import apache_beam as beam
+  import time
+
+  class GetTimestamp(beam.DoFn):
+    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
+      yield '{} - {}'.format(timestamp.to_utc_datetime(), plant['name'])
+
+  with beam.Pipeline() as pipeline:
+    plant_processing_times = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            {'name': 'Strawberry'},
+            {'name': 'Carrot'},
+            {'name': 'Artichoke'},
+            {'name': 'Tomato'},
+            {'name': 'Potato'},
+        ])
+        | 'With timestamps' >> beam.Map(lambda plant: \
+            beam.window.TimestampedValue(plant, time.time()))
+        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
+        | beam.Map(print)
+    )
+    # [END processing_time]
+    if test:
+      test(plant_processing_times)
+
+
+def time_tuple2unix_time():
+  # [START time_tuple2unix_time]
+  import time
+
+  time_tuple = time.strptime('2020-03-19 20:50:00', '%Y-%m-%d %H:%M:%S')
+  unix_time = time.mktime(time_tuple)
+  # [END time_tuple2unix_time]
+  return unix_time
+
+
+def datetime2unix_time():
+  # [START datetime2unix_time]
+  import time
+  import datetime
+
+  now = datetime.datetime.now()
+  time_tuple = now.timetuple()
+  unix_time = time.mktime(time_tuple)
+  # [END datetime2unix_time]
+  return unix_time
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps_test.py
new file mode 100644
index 0000000..53fa7e2
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps_test.py
@@ -0,0 +1,103 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+from . import withtimestamps
+
+
+def check_plant_timestamps(actual):
+  # [START plant_timestamps]
+  plant_timestamps = [
+      '2020-04-01 00:00:00 - Strawberry',
+      '2020-06-01 00:00:00 - Carrot',
+      '2020-03-01 00:00:00 - Artichoke',
+      '2020-05-01 00:00:00 - Tomato',
+      '2020-09-01 00:00:00 - Potato',
+  ]
+  # [END plant_timestamps]
+  assert_that(actual, equal_to(plant_timestamps))
+
+
+def check_plant_events(actual):
+  # [START plant_events]
+  plant_events = [
+      '1 - Strawberry',
+      '4 - Carrot',
+      '2 - Artichoke',
+      '3 - Tomato',
+      '5 - Potato',
+  ]
+  # [END plant_events]
+  assert_that(actual, equal_to(plant_events))
+
+
+def check_plant_processing_times(actual):
+  import apache_beam as beam
+
+  # [START plant_processing_times]
+  plant_processing_times = [
+      '2020-03-20 20:12:42.145594 - Strawberry',
+      '2020-03-20 20:12:42.145827 - Carrot',
+      '2020-03-20 20:12:42.145962 - Artichoke',
+      '2020-03-20 20:12:42.146093 - Tomato',
+      '2020-03-20 20:12:42.146216 - Potato',
+  ]
+  # [END plant_processing_times]
+
+  # Since `time.time()` will always give something different, we'll
+  # simply strip the timestamp information before testing the results.
+  actual = actual | beam.Map(lambda row: row.split('-')[-1].strip())
+  expected = [row.split('-')[-1].strip() for row in plant_processing_times]
+  assert_that(actual, equal_to(expected))
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
+@mock.patch('apache_beam.examples.snippets.transforms.elementwise.withtimestamps.print', lambda elem: elem)
+# pylint: enable=line-too-long
+class WithTimestampsTest(unittest.TestCase):
+  def test_event_time(self):
+    withtimestamps.withtimestamps_event_time(check_plant_timestamps)
+
+  def test_logical_clock(self):
+    withtimestamps.withtimestamps_logical_clock(check_plant_events)
+
+  def test_processing_time(self):
+    withtimestamps.withtimestamps_processing_time(check_plant_processing_times)
+
+  def test_time_tuple2unix_time(self):
+    unix_time = withtimestamps.time_tuple2unix_time()
+    self.assertIsInstance(unix_time, float)
+
+  def test_datetime2unix_time(self):
+    unix_time = withtimestamps.datetime2unix_time()
+    self.assertIsInstance(unix_time, float)
+
+
+if __name__ == '__main__':
+  unittest.main()


Mime
View raw message