beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] beam git commit: [BEAM-1226] Add support for well known coder types to Apache Beam python SDK
Date Wed, 28 Dec 2016 22:28:17 GMT
Repository: beam
Updated Branches:
  refs/heads/python-sdk eff50118b -> 2999e2290


[BEAM-1226] Add support for well known coder types to Apache Beam python SDK

This uses specific cloud object representations for the following types:
kind:pair (TupleCoder with two components, previously pickled)
kind:stream (IterableCoder with a single component, previously ignored)
kind:global_window (GlobalWindowCoder, previously SingletonCoder)
kind:length_prefix (A new type of coder which always prefixes the encoded value with its
length in bytes; has a single component)
kind:windowed_value (A wrapper coder with two components: the value coder and the window coder)

This also drops the ability to configure the timestamp coder on WindowedValueCoder.

These changes work towards a common binary representation for certain well-known coders
across multiple languages.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6272e296
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6272e296
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6272e296

Branch: refs/heads/python-sdk
Commit: 6272e296c946311e28f3edc848eaa6caf794ef9d
Parents: eff5011
Author: Luke Cwik <lcwik@google.com>
Authored: Wed Dec 28 13:54:58 2016 -0800
Committer: Luke Cwik <lcwik@google.com>
Committed: Wed Dec 28 13:54:58 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py    | 35 +++++--
 sdks/python/apache_beam/coders/coders.py        | 99 +++++++++++++++++---
 .../apache_beam/coders/coders_test_common.py    | 87 ++++++++++++++++-
 .../apache_beam/runners/dataflow_runner.py      |  3 +-
 sdks/python/apache_beam/transforms/window.py    |  4 +-
 .../apache_beam/transforms/window_test.py       |  9 ++
 6 files changed, 208 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6272e296/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 47a837f..fcdc441 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -60,7 +60,7 @@ class CoderImpl(object):
     raise NotImplementedError
 
   def decode(self, encoded):
-    """Encodes an object to an unnested string."""
+    """Decodes an object from an unnested string."""
     raise NotImplementedError
 
   def estimate_size(self, value, nested=False):
@@ -535,7 +535,7 @@ class WindowedValueCoderImpl(StreamCoderImpl):
   """A coder for windowed values."""
 
   def __init__(self, value_coder, timestamp_coder, window_coder):
-    # TODO(robertwb): Do we need the ability to customize timestamp_coder?
+    # TODO(lcwik): Remove the timestamp coder field
     self._value_coder = value_coder
     self._timestamp_coder = timestamp_coder
     self._windows_coder = TupleSequenceCoderImpl(window_coder)
@@ -543,20 +543,15 @@ class WindowedValueCoderImpl(StreamCoderImpl):
   def encode_to_stream(self, value, out, nested):
     wv = value  # type cast
     self._value_coder.encode_to_stream(wv.value, out, True)
-    if isinstance(self._timestamp_coder, TimestampCoderImpl):
-      # Avoid creation of Timestamp object.
-      out.write_bigendian_int64(wv.timestamp_micros)
-    else:
-      self._timestamp_coder.encode_to_stream(wv.timestamp, out, True)
+    # Avoid creation of Timestamp object.
+    out.write_bigendian_int64(wv.timestamp_micros)
     self._windows_coder.encode_to_stream(wv.windows, out, True)
 
   def decode_from_stream(self, in_stream, nested):
     return windowed_value.create(
         self._value_coder.decode_from_stream(in_stream, True),
         # Avoid creation of Timestamp object.
-        in_stream.read_bigendian_int64()
-        if isinstance(self._timestamp_coder, TimestampCoderImpl)
-        else self._timestamp_coder.decode_from_stream(in_stream, True).micros,
+        in_stream.read_bigendian_int64(),
         self._windows_coder.decode_from_stream(in_stream, True))
 
   def get_estimated_size_and_observables(self, value, nested=False):
@@ -577,3 +572,23 @@ class WindowedValueCoderImpl(StreamCoderImpl):
     estimated_size += (
         self._windows_coder.estimate_size(value.windows, nested=True))
     return estimated_size, observables
+
+
+class LengthPrefixCoderImpl(StreamCoderImpl):
+  """Coder which prefixes the length of the encoded object in the stream."""
+
+  def __init__(self, value_coder):
+    self._value_coder = value_coder
+
+  def encode_to_stream(self, value, out, nested):
+    encoded_value = self._value_coder.encode(value)
+    out.write_var_int64(len(encoded_value))
+    out.write(encoded_value)
+
+  def decode_from_stream(self, in_stream, nested):
+    value_length = in_stream.read_var_int64()
+    return self._value_coder.decode(in_stream.read(value_length))
+
+  def estimate_size(self, value, nested=False):
+    value_size = self._value_coder.estimate_size(value)
+    return get_varint_size(value_size) + value_size

http://git-wip-us.apache.org/repos/asf/beam/blob/6272e296/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 1e78b1d..67bbbe6 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -23,6 +23,13 @@ import google.protobuf
 
 from apache_beam.coders import coder_impl
 
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  from stream import get_varint_size
+except ImportError:
+  from slow_stream import get_varint_size
+# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
+
 
 # pylint: disable=wrong-import-order, wrong-import-position
 # Avoid dependencies on the full SDK.
@@ -161,7 +168,8 @@ class Coder(object):
         '@type': serialize_coder(self),
         'component_encodings': list(
             component.as_cloud_object()
-            for component in self._get_component_coders())
+            for component in self._get_component_coders()
+        ),
     }
     return value
 
@@ -501,9 +509,17 @@ class TupleCoder(FastCoder):
     return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types])
 
   def as_cloud_object(self):
-    value = super(TupleCoder, self).as_cloud_object()
-    value['is_pair_like'] = True
-    return value
+    if self.is_kv_coder():
+      return {
+          '@type': 'kind:pair',
+          'is_pair_like': True,
+          'component_encodings': list(
+              component.as_cloud_object()
+              for component in self._get_component_coders()
+          ),
+      }
+
+    return super(TupleCoder, self).as_cloud_object()
 
   def _get_component_coders(self):
     return self.coders()
@@ -563,6 +579,16 @@ class IterableCoder(FastCoder):
   def is_deterministic(self):
     return self._elem_coder.is_deterministic()
 
+  def as_cloud_object(self):
+    return {
+        '@type': 'kind:stream',
+        'is_stream_like': True,
+        'component_encodings': [self._elem_coder.as_cloud_object()],
+    }
+
+  def value_coder(self):
+    return self._elem_coder
+
   @staticmethod
   def from_type_hint(typehint, registry):
     return IterableCoder(registry.get_coder(typehint.inner_type))
@@ -590,17 +616,27 @@ class WindowCoder(PickleCoder):
     return super(WindowCoder, self).as_cloud_object(is_pair_like=False)
 
 
+class GlobalWindowCoder(SingletonCoder):
+  """Coder for global windows."""
+
+  def __init__(self):
+    from apache_beam.transforms import window
+    super(GlobalWindowCoder, self).__init__(window.GlobalWindow())
+
+  def as_cloud_object(self):
+    return {
+        '@type': 'kind:global_window',
+    }
+
+
 class WindowedValueCoder(FastCoder):
   """Coder for windowed values."""
 
-  def __init__(self, wrapped_value_coder, timestamp_coder=None,
-               window_coder=None):
-    if not timestamp_coder:
-      timestamp_coder = TimestampCoder()
+  def __init__(self, wrapped_value_coder, window_coder=None):
     if not window_coder:
       window_coder = PickleCoder()
     self.wrapped_value_coder = wrapped_value_coder
-    self.timestamp_coder = timestamp_coder
+    self.timestamp_coder = TimestampCoder()
     self.window_coder = window_coder
 
   def _create_impl(self):
@@ -615,12 +651,16 @@ class WindowedValueCoder(FastCoder):
                                               self.window_coder])
 
   def as_cloud_object(self):
-    value = super(WindowedValueCoder, self).as_cloud_object()
-    value['is_wrapper'] = True
-    return value
+    return {
+        '@type': 'kind:windowed_value',
+        'is_wrapper': True,
+        'component_encodings': [
+            component.as_cloud_object()
+            for component in self._get_component_coders()],
+    }
 
   def _get_component_coders(self):
-    return [self.wrapped_value_coder, self.timestamp_coder, self.window_coder]
+    return [self.wrapped_value_coder, self.window_coder]
 
   def is_kv_coder(self):
     return self.wrapped_value_coder.is_kv_coder()
@@ -633,3 +673,36 @@ class WindowedValueCoder(FastCoder):
 
   def __repr__(self):
     return 'WindowedValueCoder[%s]' % self.wrapped_value_coder
+
+
+class LengthPrefixCoder(FastCoder):
+  """Coder which prefixes the length of the encoded object in the stream."""
+
+  def __init__(self, value_coder):
+    self._value_coder = value_coder
+
+  def _create_impl(self):
+    return coder_impl.LengthPrefixCoderImpl(self._value_coder)
+
+  def is_deterministic(self):
+    return self._value_coder.is_deterministic()
+
+  def estimate_size(self, value):
+    value_size = self._value_coder.estimate_size(value)
+    return get_varint_size(value_size) + value_size
+
+  def value_coder(self):
+    return self._value_coder
+
+  def as_cloud_object(self):
+    return {
+        '@type': 'kind:length_prefix',
+        'component_encodings': [self._value_coder.as_cloud_object()],
+    }
+
+  def _get_component_coders(self):
+    return (self._value_coder,)
+
+  def __repr__(self):
+    return 'LengthPrefixCoder[%r]' % self._value_coder
+

http://git-wip-us.apache.org/repos/asf/beam/blob/6272e296/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index bfd4d77..b2bcb96 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -25,6 +25,7 @@ import dill
 
 import coders
 import observable
+from apache_beam.transforms import window
 from apache_beam.utils import timestamp
 from apache_beam.utils import windowed_value
 
@@ -179,11 +180,28 @@ class CodersTest(unittest.TestCase):
         (timestamp.Timestamp.of(27), 'abc'))
 
   def test_tuple_coder(self):
+    kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder()))
+    # Verify cloud object representation
+    self.assertEqual(
+        {
+            '@type': 'kind:pair',
+            'is_pair_like': True,
+            'component_encodings': [
+                coders.VarIntCoder().as_cloud_object(),
+                coders.BytesCoder().as_cloud_object()],
+        },
+        kv_coder.as_cloud_object())
+    # Test binary representation
+    self.assertEqual(
+        '\x04\x03abc',
+        kv_coder.encode((4, 'abc')))
+    # Test unnested
     self.check_coder(
-        coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder())),
+        kv_coder,
         (1, 'a'),
         (-2, 'a' * 100),
         (300, 'abc\0' * 5))
+    # Test nested
     self.check_coder(
         coders.TupleCoder(
             (coders.TupleCoder((coders.PickleCoder(), coders.VarIntCoder())),
@@ -206,18 +224,47 @@ class CodersTest(unittest.TestCase):
     self.check_coder(coders.StrUtf8Coder(), 'a', u'ab\u00FF', u'\u0101\0')
 
   def test_iterable_coder(self):
-    self.check_coder(coders.IterableCoder(coders.VarIntCoder()),
+    iterable_coder = coders.IterableCoder(coders.VarIntCoder())
+    # Verify cloud object representation
+    self.assertEqual(
+        {
+            '@type': 'kind:stream',
+            'is_stream_like': True,
+            'component_encodings': [coders.VarIntCoder().as_cloud_object()]
+        },
+        iterable_coder.as_cloud_object())
+    # Test unnested
+    self.check_coder(iterable_coder,
                      [1], [-1, 0, 100])
+    # Test nested
     self.check_coder(
         coders.TupleCoder((coders.VarIntCoder(),
                            coders.IterableCoder(coders.VarIntCoder()))),
         (1, [1, 2, 3]))
 
   def test_windowed_value_coder(self):
+    coder = coders.WindowedValueCoder(coders.VarIntCoder(),
+                                      coders.GlobalWindowCoder())
+    # Verify cloud object representation
+    self.assertEqual(
+        {
+            '@type': 'kind:windowed_value',
+            'is_wrapper': True,
+            'component_encodings': [
+                coders.VarIntCoder().as_cloud_object(),
+                coders.GlobalWindowCoder().as_cloud_object(),
+            ],
+        },
+        coder.as_cloud_object())
+    # Test binary representation
+    self.assertEqual('\x01\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01',
+                     coder.encode(window.GlobalWindows.windowed_value(1)))
+    # Test unnested
     self.check_coder(
         coders.WindowedValueCoder(coders.VarIntCoder()),
         windowed_value.WindowedValue(3, -100, ()),
         windowed_value.WindowedValue(-1, 100, (1, 2, 3)))
+    # Test nested
     self.check_coder(
         coders.TupleCoder((
             coders.WindowedValueCoder(coders.FloatCoder()),
@@ -241,6 +288,42 @@ class CodersTest(unittest.TestCase):
     self.check_coder(coders.TupleCoder((proto_coder, coders.BytesCoder())),
                      (ma, 'a'), (mb, 'b'))
 
+  def test_global_window_coder(self):
+    coder = coders.GlobalWindowCoder()
+    value = window.GlobalWindow()
+    # Verify cloud object representation
+    self.assertEqual({'@type': 'kind:global_window'},
+                     coder.as_cloud_object())
+    # Test binary representation
+    self.assertEqual('', coder.encode(value))
+    self.assertEqual(value, coder.decode(''))
+    # Test unnested
+    self.check_coder(coder, value)
+    # Test nested
+    self.check_coder(coders.TupleCoder((coder, coder)),
+                     (value, value))
+
+  def test_length_prefix_coder(self):
+    coder = coders.LengthPrefixCoder(coders.BytesCoder())
+    # Verify cloud object representation
+    self.assertEqual(
+        {
+            '@type': 'kind:length_prefix',
+            'component_encodings': [coders.BytesCoder().as_cloud_object()]
+        },
+        coder.as_cloud_object())
+    # Test binary representation
+    self.assertEqual('\x00', coder.encode(''))
+    self.assertEqual('\x01a', coder.encode('a'))
+    self.assertEqual('\x02bc', coder.encode('bc'))
+    self.assertEqual('\xff\x7f' + 'z' * 16383, coder.encode('z' * 16383))
+    # Test unnested
+    self.check_coder(coder, '', 'a', 'bc', 'def')
+    # Test nested
+    self.check_coder(coders.TupleCoder((coder, coder)),
+                     ('', 'a'),
+                     ('bc', 'def'))
+
   def test_nested_observables(self):
     class FakeObservableIterator(observable.ObservableMixin):
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6272e296/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 392a166..3ee95c5 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -206,8 +206,7 @@ class DataflowRunner(PipelineRunner):
     if window_coder:
       return coders.WindowedValueCoder(
           coders.registry.get_coder(typehint),
-          coders.TimestampCoder(),
-          window_coder)
+          window_coder=window_coder)
     else:
       return coders.registry.get_coder(typehint)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6272e296/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 9485032..70759e0 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -111,7 +111,7 @@ class WindowFn(object):
     raise NotImplementedError
 
   def get_window_coder(self):
-    return coders.PickleCoder()
+    return coders.WindowCoder()
 
   def get_transformed_output_time(self, window, input_timestamp):  # pylint: disable=unused-argument
     """Given input time and output window, returns output time for window.
@@ -240,7 +240,7 @@ class GlobalWindows(WindowFn):
     pass  # No merging.
 
   def get_window_coder(self):
-    return coders.SingletonCoder(GlobalWindow())
+    return coders.GlobalWindowCoder()
 
   def __hash__(self):
     return hash(type(self))

http://git-wip-us.apache.org/repos/asf/beam/blob/6272e296/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 6c3c98e..856d011 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -28,6 +28,8 @@ from apache_beam.transforms import GroupByKey
 from apache_beam.transforms import Map
 from apache_beam.transforms import window
 from apache_beam.transforms import WindowInto
+from apache_beam.transforms.timeutil import MAX_TIMESTAMP
+from apache_beam.transforms.timeutil import MIN_TIMESTAMP
 from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import IntervalWindow
@@ -55,6 +57,13 @@ reify_windows = core.ParDo(ReifyWindowsFn())
 
 class WindowTest(unittest.TestCase):
 
+  def test_global_window(self):
+    self.assertEqual(window.GlobalWindow(), window.GlobalWindow())
+    self.assertNotEqual(window.GlobalWindow(),
+                        window.IntervalWindow(MIN_TIMESTAMP, MAX_TIMESTAMP))
+    self.assertNotEqual(window.IntervalWindow(MIN_TIMESTAMP, MAX_TIMESTAMP),
+                        window.GlobalWindow())
+
   def test_fixed_windows(self):
     # Test windows with offset: 2, 7, 12, 17, ...
     windowfn = window.FixedWindows(size=5, offset=2)


Mime
View raw message