beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/2] incubator-beam git commit: Fix a couple more coder vs. element-coder changes for element sizing.
Date Tue, 01 Nov 2016 07:06:38 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk b33fdfdad -> 7e870613f


Fix a couple more coder vs. element-coder changes for element sizing.

These are not actually correct, but are needed to support the
Dataflow service that was built to work with code before PR 1224.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/beb28453
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/beb28453
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/beb28453

Branch: refs/heads/python-sdk
Commit: beb284539608cce3800ef77741a7d7cb9766b490
Parents: b33fdfd
Author: Robert Bradshaw <robertwb@gmail.com>
Authored: Mon Oct 31 22:31:08 2016 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Mon Oct 31 22:31:08 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/beb28453/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 1ff3a44..40fc1fd 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -152,6 +152,14 @@ class CallbackCoderImpl(CoderImpl):
   def estimate_size(self, value, nested=False):
     return self._get_nested_size(self._size_estimator(value), nested)
 
+  def get_estimated_size_and_observables(self, value, nested=False):
+    # TODO(robertwb): Remove this once all coders are correct.
+    if isinstance(value, observable.ObservableMixin):
+      # CallbackCoderImpl can presumably encode the elements too.
+      return 1, [(value, self)]
+    else:
+      return self.estimate_size(value, nested), []
+
 
 class DeterministicPickleCoderImpl(CoderImpl):
 
@@ -523,6 +531,10 @@ class WindowedValueCoderImpl(StreamCoderImpl):
 
   def get_estimated_size_and_observables(self, value, nested=False):
     """Returns estimated size of value along with any nested observables."""
+    if isinstance(value, observable.ObservableMixin):
+      # Should never be here.
+      # TODO(robertwb): Remove when coders are set correctly.
+      return 0, [(value, self._value_coder)]
     estimated_size = 0
     observables = []
     value_estimated_size, value_observables = (


Mime
View raw message