beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [beam] branch master updated: [BEAM-7121] Add deterministic proto coder (#8377)
Date Wed, 15 May 2019 08:59:45 GMT
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 98a4d2b  [BEAM-7121] Add deterministic proto coder (#8377)
98a4d2b is described below

commit 98a4d2bffadf8ad540a45b141270f8b00bba9c6e
Author: Yifan Mai <yifanmai@google.com>
AuthorDate: Wed May 15 01:59:33 2019 -0700

    [BEAM-7121] Add deterministic proto coder (#8377)
---
 sdks/python/apache_beam/coders/coder_impl.py       |  7 +++++
 sdks/python/apache_beam/coders/coders.py           | 22 ++++++++++++++++
 sdks/python/apache_beam/coders/coders_test.py      | 28 ++++++++++++++++++++
 .../apache_beam/coders/coders_test_common.py       |  1 +
 .../apache_beam/tools/coders_microbenchmark.py     | 30 ++++++++++++++++++++++
 5 files changed, 88 insertions(+)

diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 1895fd3..45e9b4a 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -282,6 +282,13 @@ class ProtoCoderImpl(SimpleCoderImpl):
     return proto_message
 
 
+class DeterministicProtoCoderImpl(ProtoCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees."""
+
+  def encode(self, value):
+    return value.SerializeToString(deterministic=True)  # stable map order
+
+
 UNKNOWN_TYPE = 0xFF
 NONE_TYPE = 0
 INT_TYPE = 1
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 3d47d85..df3ac6f 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -751,6 +751,9 @@ class ProtoCoder(FastCoder):
     # a Map.
     return False
 
+  def as_deterministic_coder(self, step_label, error_message=None):
+    return DeterministicProtoCoder(self.proto_message_type)  # same type
+
   def __eq__(self, other):
     return (type(self) == type(other)
             and self.proto_message_type == other.proto_message_type)
@@ -767,6 +770,25 @@ class ProtoCoder(FastCoder):
                         ', but got a %s' % typehint))
 
 
+class DeterministicProtoCoder(ProtoCoder):
+  """A deterministic Coder for Google Protocol Buffers.
+
+  It supports both Protocol Buffers syntax versions 2 and 3. However,
+  the runtime version of the Python protobuf library must exactly match the
+  version of the protoc compiler that was used to generate the protobuf
+  messages. Map fields are encoded in a stable, insertion-independent order.
+  """
+
+  def _create_impl(self):
+    return coder_impl.DeterministicProtoCoderImpl(self.proto_message_type)
+
+  def is_deterministic(self):
+    return True  # the impl serializes with deterministic=True
+
+  def as_deterministic_coder(self, step_label, error_message=None):
+    return self  # already deterministic
+
+
 class TupleCoder(FastCoder):
   """Coder of tuple objects."""
 
diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py
index dcbdb73..c37df18 100644
--- a/sdks/python/apache_beam/coders/coders_test.py
+++ b/sdks/python/apache_beam/coders/coders_test.py
@@ -84,6 +84,34 @@ class ProtoCoderTest(unittest.TestCase):
     self.assertEqual(ma, real_coder.decode(real_coder.encode(ma)))
 
 
+class DeterministicProtoCoderTest(unittest.TestCase):
+
+  def test_deterministic_proto_coder(self):
+    ma = test_message.MessageA()
+    mb = ma.field2.add()
+    mb.field1 = True
+    ma.field1 = u'hello world'
+    expected_coder = coders.DeterministicProtoCoder(ma.__class__)
+    real_coder = (coders_registry.get_coder(ma.__class__)
+                  .as_deterministic_coder(step_label='unused'))
+    self.assertTrue(real_coder.is_deterministic())
+    self.assertEqual(expected_coder, real_coder)
+    self.assertEqual(real_coder.encode(ma), expected_coder.encode(ma))
+    self.assertEqual(ma, real_coder.decode(real_coder.encode(ma)))
+
+  def test_deterministic_proto_coder_determinism(self):
+    for _ in range(10):  # NOTE(review): every iteration is identical
+      keys = list(range(20))
+      mm_forward = test_message.MessageWithMap()
+      for key in keys:
+        mm_forward.field1[str(key)].field1 = str(key)
+      mm_reverse = test_message.MessageWithMap()  # reversed insertion order
+      for key in reversed(keys):
+        mm_reverse.field1[str(key)].field1 = str(key)
+      coder = coders.DeterministicProtoCoder(mm_forward.__class__)
+      self.assertEqual(coder.encode(mm_forward), coder.encode(mm_reverse))
+
+
 class DummyClass(object):
   """A class with no registered coder."""
   def __init__(self):
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 4dd1cf1..94dfe8a 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -68,6 +68,7 @@ class CodersTest(unittest.TestCase):
                    if isinstance(c, type) and issubclass(c, coders.Coder) and
                    'Base' not in c.__name__)
     standard -= set([coders.Coder,
+                     coders.DeterministicProtoCoder,
                      coders.FastCoder,
                      coders.ProtoCoder,
                      coders.RunnerAPICoderHolder,
diff --git a/sdks/python/apache_beam/tools/coders_microbenchmark.py b/sdks/python/apache_beam/tools/coders_microbenchmark.py
index 695bdd1..29bdd0e 100644
--- a/sdks/python/apache_beam/tools/coders_microbenchmark.py
+++ b/sdks/python/apache_beam/tools/coders_microbenchmark.py
@@ -39,6 +39,7 @@ import sys
 
 from past.builtins import unicode
 
+from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
 from apache_beam.coders import coders
 from apache_beam.tools import utils
 from apache_beam.transforms import window
@@ -131,6 +132,23 @@ def large_iterable():
     yield k
 
 
+def random_message_with_map(size):
+  message = test_message.MessageWithMap()
+  keys = list_int(size)
+  random.shuffle(keys)  # randomize map insertion order
+  for key in keys:
+    message.field1[str(key)].field1 = small_string()
+  return message
+
+
+def small_message_with_map():
+  return random_message_with_map(5)
+
+
+def large_message_with_map():
+  return random_message_with_map(20)
+
+
 def globally_windowed_value():
   return windowed_value.WindowedValue(
       value=small_int(),
@@ -199,6 +217,18 @@ def run_coder_benchmarks(
           coders.FastPrimitivesCoder(),
           large_dict),
       coder_benchmark_factory(
+          coders.ProtoCoder(test_message.MessageWithMap),
+          small_message_with_map),
+      coder_benchmark_factory(
+          coders.ProtoCoder(test_message.MessageWithMap),
+          large_message_with_map),
+      coder_benchmark_factory(
+          coders.DeterministicProtoCoder(test_message.MessageWithMap),
+          small_message_with_map),
+      coder_benchmark_factory(
+          coders.DeterministicProtoCoder(test_message.MessageWithMap),
+          large_message_with_map),
+      coder_benchmark_factory(
           coders.WindowedValueCoder(coders.FastPrimitivesCoder()),
           wv_with_one_window),
       coder_benchmark_factory(


Mime
View raw message