beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: Support unknown length iterables for IterableCoder in python SDK
Date Tue, 28 Feb 2017 08:08:56 GMT
Repository: beam
Updated Branches:
  refs/heads/master 16736a6b6 -> 13db84bb0


Support unknown length iterables for IterableCoder in python SDK


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/06535afa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/06535afa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/06535afa

Branch: refs/heads/master
Commit: 06535afa29fd922a0e52724fbb8f4dc7671b46ee
Parents: 16736a6
Author: Vikas Kedigehalli <vikasrk@google.com>
Authored: Wed Feb 22 19:36:33 2017 -0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Feb 28 00:08:42 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/fn/v1/standard_coders.yaml  | 15 ++++
 .../beam/sdk/coders/IterableLikeCoder.java      |  6 +-
 .../apache/beam/sdk/coders/CommonCoderTest.java | 56 ++++++++++++++-
 sdks/python/apache_beam/coders/coder_impl.py    | 72 ++++++++++++++++++--
 .../apache_beam/coders/coders_test_common.py    | 20 ++++++
 sdks/python/apache_beam/coders/slow_stream.py   |  3 +
 .../apache_beam/coders/standard_coders_test.py  | 13 ++--
 sdks/python/apache_beam/coders/stream.pxd       |  4 +-
 sdks/python/apache_beam/coders/stream.pyx       | 21 +++---
 .../apache_beam/tests/data/standard_coders.yaml | 16 +++++
 10 files changed, 198 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
b/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
index 172b6d8..f37b2d3 100644
--- a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
+++ b/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
@@ -33,6 +33,8 @@
 #
 # It is expected that future work will move the `coder` field into a format that it would
be
 # represented by the Runner API, so that it can be understood by all SDKs and harnesses.
+#
+# If a coder is marked non-deterministic in the coder spec, then only the decoding should
be validated.
 
 
 coder:
@@ -131,6 +133,19 @@ examples:
 
 coder:
   urn: "urn:beam:coders:stream:0.1"
+  components: [{urn: "urn:beam:coders:bytes:0.1"}]
+  # This is for iterables of unknown length, where the encoding is not
+  # deterministic.
+  non_deterministic: True
+examples:
+  "\u00ff\u00ff\u00ff\u00ff\u0000": []
+  "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"]
+  "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"]
+
+---
+
+coder:
+  urn: "urn:beam:coders:stream:0.1"
   components: [{urn: "urn:beam:coders:global_window:0.1"}]
 examples:
   "\0\0\0\u0001": [""]

http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
index da64a93..61402ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
@@ -211,9 +211,9 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
           elementCoder.registerByteSizeObserver(elem, observer, nestedContext);
         }
       } else {
-        // TODO: Update to use an accurate count depending on size and count, currently we
-        // are under estimating the size by up to 10 bytes per block of data since we are
-        // not encoding the count prefix which occurs at most once per 64k of data and is
upto
+        // TODO: (BEAM-1537) Update to use an accurate count depending on size and count,
+        // currently we are under estimating the size by up to 10 bytes per block of data
since we
+        // are not encoding the count prefix which occurs at most once per 64k of data and
is upto
         // 10 bytes long. Since we include the total count we can upper bound the underestimate
         // to be 10 / 65536 ~= 0.0153% of the actual size.
         observer.update(4L);

http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java
index 660d608..1db7a2b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java
@@ -21,6 +21,9 @@ import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -40,6 +43,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -83,13 +87,16 @@ public class CommonCoderTest {
   abstract static class CommonCoder {
     abstract String getUrn();
     abstract List<CommonCoder> getComponents();
+    abstract Boolean getNonDeterministic();
     @JsonCreator
     static CommonCoder create(
         @JsonProperty("urn") String urn,
-        @JsonProperty("components") @Nullable List<CommonCoder> components) {
+        @JsonProperty("components") @Nullable List<CommonCoder> components,
+        @JsonProperty("non_deterministic") @Nullable Boolean nonDeterministic) {
       return new AutoValue_CommonCoderTest_CommonCoder(
           checkNotNull(urn, "urn"),
-          firstNonNull(components, Collections.<CommonCoder>emptyList()));
+          firstNonNull(components, Collections.<CommonCoder>emptyList()),
+          firstNonNull(nonDeterministic, Boolean.FALSE));
     }
   }
 
@@ -280,7 +287,50 @@ public class CommonCoderTest {
     Object testValue = convertValue(testSpec.getValue(), testSpec.getCoder(), coder);
     Context context = testSpec.getNested() ? Context.NESTED : Context.OUTER;
     byte[] encoded = CoderUtils.encodeToByteArray(coder, testValue, context);
-    assertThat(testSpec.toString(), encoded, equalTo(testSpec.getSerialized()));
+    Object decodedValue = CoderUtils.decodeFromByteArray(coder, testSpec.getSerialized(),
context);
+
+    if (!testSpec.getCoder().getNonDeterministic()) {
+      assertThat(testSpec.toString(), encoded, equalTo(testSpec.getSerialized()));
+    }
+    verifyDecodedValue(testSpec.getCoder(), decodedValue, testValue);
+  }
+
+  private void verifyDecodedValue(CommonCoder coder, Object expectedValue, Object actualValue)
{
+    switch (coder.getUrn()) {
+      case "urn:beam:coders:bytes:0.1":
+        assertThat(expectedValue, equalTo(actualValue));
+        break;
+      case "urn:beam:coders:kv:0.1":
+        assertThat(actualValue, instanceOf(KV.class));
+        verifyDecodedValue(coder.getComponents().get(0),
+            ((KV) expectedValue).getKey(), ((KV) actualValue).getKey());
+        verifyDecodedValue(coder.getComponents().get(0),
+            ((KV) expectedValue).getValue(), ((KV) actualValue).getValue());
+        break;
+      case "urn:beam:coders:varint:0.1":
+        assertEquals(expectedValue, actualValue);
+        break;
+      case "urn:beam:coders:interval_window:0.1":
+        assertEquals(expectedValue, actualValue);
+        break;
+      case "urn:beam:coders:stream:0.1":
+        assertThat(actualValue, instanceOf(Iterable.class));
+        CommonCoder componentCoder = coder.getComponents().get(0);
+        Iterator<Object> expectedValueIterator = ((Iterable<Object>) expectedValue).iterator();
+        for (Object value: (Iterable<Object>) actualValue) {
+          verifyDecodedValue(componentCoder, expectedValueIterator.next(), value);
+        }
+        assertFalse(expectedValueIterator.hasNext());
+        break;
+      case "urn:beam:coders:global_window:0.1":
+        assertEquals(expectedValue, actualValue);
+        break;
+      case "urn:beam:coders:windowed_value:0.1":
+        assertEquals(expectedValue, actualValue);
+        break;
+      default:
+        throw new IllegalStateException("Unknown coder URN: " + coder.getUrn());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 2dbfae7..137d1be 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -509,7 +509,29 @@ class TupleCoderImpl(AbstractComponentCoderImpl):
 
 
 class SequenceCoderImpl(StreamCoderImpl):
-  """A coder for sequences of known length."""
+  """A coder for sequences.
+
+  If the length of the sequence is known we encode the length as a 32 bit
+  ``int`` followed by the encoded bytes.
+
+  If the length of the sequence is unknown, we encode the length as ``-1``
+  followed by the encoding of elements buffered up to 64K bytes before prefixing
+  the count of number of elements. A ``0`` is encoded at the end to indicate the
+  end of stream.
+
+  The resulting encoding would look like this::
+
+    -1
+    countA element(0) element(1) ... element(countA - 1)
+    countB element(0) element(1) ... element(countB - 1)
+    ...
+    countX element(0) element(1) ... element(countX - 1)
+    0
+
+  """
+
+  # Default buffer size of 64kB for handling iterables of unknown length.
+  _DEFAULT_BUFFER_SIZE = 64 * 1024
 
   def __init__(self, elem_coder):
     self._elem_coder = elem_coder
@@ -519,15 +541,46 @@ class SequenceCoderImpl(StreamCoderImpl):
 
   def encode_to_stream(self, value, out, nested):
     # Compatible with Java's IterableLikeCoder.
-    out.write_bigendian_int32(len(value))
-    for elem in value:
-      self._elem_coder.encode_to_stream(elem, out, True)
+    if hasattr(value, '__len__'):
+      out.write_bigendian_int32(len(value))
+      for elem in value:
+        self._elem_coder.encode_to_stream(elem, out, True)
+    else:
+      # We don't know the size without traversing it so use a fixed size buffer
+      # and encode as many elements as possible into it before outputting
+      # the size followed by the elements.
+
+      # -1 to indicate that the length is not known.
+      out.write_bigendian_int32(-1)
+      buffer = create_OutputStream()
+      prev_index = index = -1
+      for index, elem in enumerate(value):
+        self._elem_coder.encode_to_stream(elem, buffer, True)
+        if out.size() > self._DEFAULT_BUFFER_SIZE:
+          out.write_var_int64(index - prev_index)
+          out.write(buffer.get())
+          prev_index = index
+          buffer = create_OutputStream()
+      if index > prev_index:
+        out.write_var_int64(index - prev_index)
+        out.write(buffer.get())
+      out.write_var_int64(0)
 
   def decode_from_stream(self, in_stream, nested):
     size = in_stream.read_bigendian_int32()
-    return self._construct_from_sequence(
-        [self._elem_coder.decode_from_stream(in_stream, True)
-         for _ in range(size)])
+
+    if size >= 0:
+      elements = [self._elem_coder.decode_from_stream(in_stream, True)
+                  for _ in range(size)]
+    else:
+      elements = []
+      count = in_stream.read_var_int64()
+      while count > 0:
+        for _ in range(count):
+          elements.append(self._elem_coder.decode_from_stream(in_stream, True))
+        count = in_stream.read_var_int64()
+
+    return self._construct_from_sequence(elements)
 
   def estimate_size(self, value, nested=False):
     """Estimates the encoded size of the given value, in bytes."""
@@ -551,6 +604,11 @@ class SequenceCoderImpl(StreamCoderImpl):
                 elem, nested=True))
         estimated_size += child_size
         observables += child_observables
+      # TODO: (BEAM-1537) Update to use an accurate count depending on size and
+      # count, currently we are underestimating the size by up to 10 bytes
+      # per block of data since we are not including the count prefix which
+      # occurs at most once per 64k of data and is upto 10 bytes long. The upper
+      # bound of the underestimate is 10 / 65536 ~= 0.0153% of the actual size.
       return estimated_size, observables
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 338f89e..6491ea8 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -249,6 +249,26 @@ class CodersTest(unittest.TestCase):
                            coders.IterableCoder(coders.VarIntCoder()))),
         (1, [1, 2, 3]))
 
+  def test_iterable_coder_unknown_length(self):
+    # Empty
+    self._test_iterable_coder_of_unknown_length(0)
+    # Single element
+    self._test_iterable_coder_of_unknown_length(1)
+    # Multiple elements
+    self._test_iterable_coder_of_unknown_length(100)
+    # Multiple elements with underlying stream buffer overflow.
+    self._test_iterable_coder_of_unknown_length(80000)
+
+  def _test_iterable_coder_of_unknown_length(self, count):
+    def iter_generator(count):
+      for i in range(count):
+        yield i
+
+    iterable_coder = coders.IterableCoder(coders.VarIntCoder())
+    self.assertItemsEqual(list(iter_generator(count)),
+                          iterable_coder.decode(
+                              iterable_coder.encode(iter_generator(count))))
+
   def test_windowed_value_coder(self):
     coder = coders.WindowedValueCoder(coders.VarIntCoder(),
                                       coders.GlobalWindowCoder())

http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/coders/slow_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py
index 5462fae..a87495c 100644
--- a/sdks/python/apache_beam/coders/slow_stream.py
+++ b/sdks/python/apache_beam/coders/slow_stream.py
@@ -64,6 +64,9 @@ class OutputStream(object):
   def get(self):
     return ''.join(self.data)
 
+  def size(self):
+    return len(self.data)
+
 
 class ByteCountingOutputStream(OutputStream):
   """A pure Python implementation of stream.ByteCountingOutputStream."""

http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/coders/standard_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index e26d2c8..4a48ed9 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -94,13 +94,18 @@ class StandardCodersTest(unittest.TestCase):
       for expected_encoded, json_value in spec['examples'].items():
         value = parse_value(json_value)
         expected_encoded = expected_encoded.encode('latin1')
-        actual_encoded = encode_nested(coder, value, nested)
-        if self.fix and actual_encoded != expected_encoded:
-          self.to_fix[spec['index'], expected_encoded] = actual_encoded
+        if not spec['coder'].get('non_deterministic', False):
+          actual_encoded = encode_nested(coder, value, nested)
+          if self.fix and actual_encoded != expected_encoded:
+            self.to_fix[spec['index'], expected_encoded] = actual_encoded
+          else:
+            self.assertEqual(expected_encoded, actual_encoded)
+            self.assertEqual(decode_nested(coder, expected_encoded, nested),
+                             value)
         else:
+          # Only verify decoding for a non-deterministic coder
           self.assertEqual(decode_nested(coder, expected_encoded, nested),
                            value)
-          self.assertEqual(expected_encoded, actual_encoded)
 
   def parse_coder(self, spec):
     return self._urn_to_coder_class[spec['urn']](

http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/coders/stream.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd
index 22ad8c1..a1eb9f7 100644
--- a/sdks/python/apache_beam/coders/stream.pxd
+++ b/sdks/python/apache_beam/coders/stream.pxd
@@ -20,7 +20,7 @@ cimport libc.stdint
 
 cdef class OutputStream(object):
   cdef char* data
-  cdef size_t size
+  cdef size_t _size
   cdef size_t pos
 
   cpdef write(self, bytes b, bint nested=*)
@@ -32,7 +32,7 @@ cdef class OutputStream(object):
   cpdef write_bigendian_double(self, double d)
 
   cpdef bytes get(self)
-
+  cpdef size_t size(self) except? -1
   cdef extend(self, size_t missing)
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/coders/stream.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx
index 845f289..ae24418 100644
--- a/sdks/python/apache_beam/coders/stream.pyx
+++ b/sdks/python/apache_beam/coders/stream.pyx
@@ -25,9 +25,9 @@ cdef class OutputStream(object):
   #TODO(robertwb): Consider using raw C++ streams.
 
   def __cinit__(self):
-    self.size = 1024
+    self.buffer_size = 1024
     self.pos = 0
-    self.data = <char*>libc.stdlib.malloc(self.size)
+    self.data = <char*>libc.stdlib.malloc(self.buffer_size)
     assert self.data, "OutputStream malloc failed."
 
   def __dealloc__(self):
@@ -38,13 +38,13 @@ cdef class OutputStream(object):
     cdef size_t blen = len(b)
     if nested:
       self.write_var_int64(blen)
-    if self.size < self.pos + blen:
+    if self.buffer_size < self.pos + blen:
       self.extend(blen)
     libc.string.memcpy(self.data + self.pos, <char*>b, blen)
     self.pos += blen
 
   cpdef write_byte(self, unsigned char val):
-    if  self.size < self.pos + 1:
+    if  self.buffer_size < self.pos + 1:
       self.extend(1)
     self.data[self.pos] = val
     self.pos += 1
@@ -66,7 +66,7 @@ cdef class OutputStream(object):
     self.write_bigendian_uint64(signed_v)
 
   cpdef write_bigendian_uint64(self, libc.stdint.uint64_t v):
-    if  self.size < self.pos + 8:
+    if  self.buffer_size < self.pos + 8:
       self.extend(8)
     self.data[self.pos    ] = <unsigned char>(v >> 56)
     self.data[self.pos + 1] = <unsigned char>(v >> 48)
@@ -80,7 +80,7 @@ cdef class OutputStream(object):
 
   cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v):
     cdef libc.stdint.uint32_t v = signed_v
-    if  self.size < self.pos + 4:
+    if  self.buffer_size < self.pos + 4:
       self.extend(4)
     self.data[self.pos    ] = <unsigned char>(v >> 24)
     self.data[self.pos + 1] = <unsigned char>(v >> 16)
@@ -94,10 +94,13 @@ cdef class OutputStream(object):
   cpdef bytes get(self):
     return self.data[:self.pos]
 
+  cpdef size_t size(self) except? -1:
+    return self.pos
+
   cdef extend(self, size_t missing):
-    while missing > self.size - self.pos:
-      self.size *= 2
-    self.data = <char*>libc.stdlib.realloc(self.data, self.size)
+    while missing > self.buffer_size - self.pos:
+      self.buffer_size *= 2
+    self.data = <char*>libc.stdlib.realloc(self.data, self.buffer_size)
     assert self.data, "OutputStream realloc failed."
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/06535afa/sdks/python/apache_beam/tests/data/standard_coders.yaml
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/data/standard_coders.yaml b/sdks/python/apache_beam/tests/data/standard_coders.yaml
index 172b6d8..790cacb 100644
--- a/sdks/python/apache_beam/tests/data/standard_coders.yaml
+++ b/sdks/python/apache_beam/tests/data/standard_coders.yaml
@@ -33,6 +33,9 @@
 #
 # It is expected that future work will move the `coder` field into a format that it would
be
 # represented by the Runner API, so that it can be understood by all SDKs and harnesses.
+#
+# If a coder is marked non-deterministic in the coder spec, then only the decoding should
be validated.
+
 
 
 coder:
@@ -131,6 +134,19 @@ examples:
 
 coder:
   urn: "urn:beam:coders:stream:0.1"
+  components: [{urn: "urn:beam:coders:bytes:0.1"}]
+  # This is for iterables of unknown length, where the encoding is not
+  # deterministic.
+  non_deterministic: True
+examples:
+  "\u00ff\u00ff\u00ff\u00ff\u0000": []
+  "\u00ff\u00ff\u00ff\u00ff\u0001\u0003abc\u0000": ["abc"]
+  "\u00ff\u00ff\u00ff\u00ff\u0002\u0004ab\u0000c\u0004de\u0000f\u0000": ["ab\0c", "de\0f"]
+
+---
+
+coder:
+  urn: "urn:beam:coders:stream:0.1"
   components: [{urn: "urn:beam:coders:global_window:0.1"}]
 examples:
   "\0\0\0\u0001": [""]


Mime
View raw message