beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: [BEAM-1283] Finish bundle should only emit windowed values
Date Tue, 09 May 2017 16:48:42 GMT
Repository: beam
Updated Branches:
  refs/heads/master 395d14e33 -> 9575694ca


[BEAM-1283] Finish bundle should only emit windowed values


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bf88605
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bf88605
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bf88605

Branch: refs/heads/master
Commit: 3bf886056680ca50e11d36dc0da402bb0196a7c7
Parents: 395d14e
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Mon May 1 15:00:24 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Tue May 9 09:48:03 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets_test.py     | 10 ++++++++--
 sdks/python/apache_beam/io/iobase.py                   |  4 +++-
 sdks/python/apache_beam/runners/common.py              | 13 ++-----------
 sdks/python/apache_beam/transforms/ptransform_test.py  |  8 +++++---
 4 files changed, 18 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3bf88605/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 0148096..da0a962 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -33,6 +33,7 @@ from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.examples.snippets import snippets
+from apache_beam.utils.windowed_value import WindowedValue
 
 # pylint: disable=expression-not-assigned
 from apache_beam.test_pipeline import TestPipeline
@@ -366,6 +367,7 @@ class TypeHintsTest(unittest.TestCase):
 
 class SnippetsTest(unittest.TestCase):
   # Replacing text read/write transforms with dummy transforms for testing.
+
   class DummyReadTransform(beam.PTransform):
     """A transform that will replace iobase.ReadFromText.
 
@@ -387,16 +389,20 @@ class SnippetsTest(unittest.TestCase):
         pass
 
       def finish_bundle(self):
+        from apache_beam.transforms import window
+
         assert self.file_to_read
         for file_name in glob.glob(self.file_to_read):
           if self.compression_type is None:
             with open(file_name) as file:
               for record in file:
-                yield self.coder.decode(record.rstrip('\n'))
+                value = self.coder.decode(record.rstrip('\n'))
+                yield WindowedValue(value, -1, [window.GlobalWindow()])
           else:
             with gzip.open(file_name, 'r') as file:
               for record in file:
-                yield self.coder.decode(record.rstrip('\n'))
+                value = self.coder.decode(record.rstrip('\n'))
+                yield WindowedValue(value, -1, [window.GlobalWindow()])
 
     def expand(self, pcoll):
       return pcoll | beam.Create([None]) | 'DummyReadForTesting' >> beam.ParDo(

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf88605/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 312542a..d47ef5b 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -44,6 +44,7 @@ from apache_beam.transforms import ptransform
 from apache_beam.transforms import window
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.utils.windowed_value import WindowedValue
 
 
 # Encapsulates information about a bundle of a source generated when method
@@ -931,7 +932,8 @@ class _WriteBundleDoFn(core.DoFn):
 
   def finish_bundle(self):
     if self.writer is not None:
-      yield window.TimestampedValue(self.writer.close(), window.MAX_TIMESTAMP)
+      yield WindowedValue(self.writer.close(), window.MAX_TIMESTAMP,
+                          [window.GlobalWindow()])
 
 
 class _WriteKeyedBundleDoFn(core.DoFn):

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf88605/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 74c61ab..ec1f5dc 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -487,18 +487,9 @@ class OutputProcessor(object):
 
       if isinstance(result, WindowedValue):
         windowed_value = result
-      elif isinstance(result, TimestampedValue):
-        value = result.value
-        timestamp = result.timestamp
-        assign_context = NoContext(value, timestamp)
-        windowed_value = WindowedValue(
-            value, timestamp, self.window_fn.assign(assign_context))
       else:
-        value = result
-        timestamp = -1
-        assign_context = NoContext(value)
-        windowed_value = WindowedValue(
-            value, timestamp, self.window_fn.assign(assign_context))
+        raise RuntimeError('Finish Bundle should only output WindowedValue ' +\
+                           'type but got %s' % type(result))
 
       if tag is None:
         self.main_receivers.receive(windowed_value)

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf88605/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index e712661..5948460 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -30,8 +30,10 @@ import apache_beam as beam
 from apache_beam.metrics import Metrics
 from apache_beam.metrics.metric import MetricsFilter
 from apache_beam.io.iobase import Read
-from apache_beam.test_pipeline import TestPipeline
+from apache_beam.options.pipeline_options import TypeOptions
 import apache_beam.pvalue as pvalue
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.transforms import window
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.display import DisplayData, DisplayDataItem
 from apache_beam.transforms.ptransform import PTransform
@@ -40,7 +42,7 @@ import apache_beam.typehints as typehints
 from apache_beam.typehints import with_input_types
 from apache_beam.typehints import with_output_types
 from apache_beam.typehints.typehints_test import TypeHintTestCase
-from apache_beam.options.pipeline_options import TypeOptions
+from apache_beam.utils.windowed_value import WindowedValue
 
 
 # Disable frequent lint warning due to pipe operator for chaining transforms.
@@ -280,7 +282,7 @@ class PTransformTest(unittest.TestCase):
         pass
 
       def finish_bundle(self):
-        yield 'finish'
+        yield WindowedValue('finish', -1, [window.GlobalWindow()])
 
     pipeline = TestPipeline()
     pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])


Mime
View raw message