Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 41bf6f8b4 -> b2728cf13
Assert transform without side inputs
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1c795c10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1c795c10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1c795c10
Branch: refs/heads/python-sdk
Commit: 1c795c10f0017766462cced1db1295c50bf2e32f
Parents: 41bf6f8
Author: Vikas Kedigehalli <vikasrk@google.com>
Authored: Tue Dec 27 11:09:26 2016 -0800
Committer: Vikas Kedigehalli <vikasrk@google.com>
Committed: Tue Dec 27 11:09:26 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/util.py | 24 +++++++++++++-----------
1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c795c10/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 9815996..ad63a02 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -20,10 +20,12 @@
from __future__ import absolute_import
-from apache_beam.pvalue import AsList
-from apache_beam.transforms import core
from apache_beam.transforms import window
-from apache_beam.transforms.core import CombinePerKey, Create, Flatten, GroupByKey, Map
+from apache_beam.transforms.core import CombinePerKey
+from apache_beam.transforms.core import Flatten
+from apache_beam.transforms.core import GroupByKey
+from apache_beam.transforms.core import Map
+from apache_beam.transforms.core import WindowInto
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.ptransform import ptransform_fn
@@ -217,17 +219,17 @@ def assert_that(actual, matcher, label='assert_that'):
Ignored.
"""
- def match(_, actual):
- matcher(actual)
-
class AssertThat(PTransform):
- def expand(self, pipeline):
- return pipeline | 'singleton' >> Create([None]) | Map(
- match,
- AsList(actual | core.WindowInto(window.GlobalWindows())))
+ def expand(self, pcoll):
+ return (pcoll
+ | WindowInto(window.GlobalWindows())
+ | "ToVoidKey" >> Map(lambda v: (None, v))
+ | "Group" >> GroupByKey()
+ | "UnKey" >> Map(lambda (k, v): v)
+ | "Match" >> Map(matcher))
def default_label(self):
return label
- actual.pipeline | AssertThat() # pylint: disable=expression-not-assigned
+ actual | AssertThat() # pylint: disable=expression-not-assigned
|