beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Add cross-SDK implementations and tests of WindowedValueCoder
Date Wed, 22 Feb 2017 18:19:11 GMT
Repository: beam
Updated Branches:
  refs/heads/master fbaac0fc8 -> ede77c1b5


Add cross-SDK implementations and tests of WindowedValueCoder


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f22f7e56
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f22f7e56
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f22f7e56

Branch: refs/heads/master
Commit: f22f7e56ae61d0d14761e8201e5d18dce420b5e2
Parents: fbaac0f
Author: Vikas Kedigehalli <vikasrk@google.com>
Authored: Wed Feb 15 18:13:12 2017 -0800
Committer: Ahmet Altay <altay@google.com>
Committed: Wed Feb 22 09:38:13 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/fn/v1/standard_coders.yaml  | 56 +++++++++++++++++-
 .../apache/beam/sdk/coders/CommonCoderTest.java | 49 ++++++++++++++--
 sdks/python/apache_beam/coders/coder_impl.py    | 60 +++++++++++++++++---
 .../apache_beam/coders/coders_test_common.py    |  9 ++-
 .../apache_beam/coders/standard_coders_test.py  | 16 +++++-
 .../apache_beam/tests/data/standard_coders.yaml | 54 +++++++++++++++++-
 sdks/python/apache_beam/transforms/window.py    |  2 +-
 7 files changed, 226 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f22f7e56/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml b/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
index 58a2a90..172b6d8 100644
--- a/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
+++ b/sdks/common/fn-api/src/test/resources/org/apache/beam/fn/v1/standard_coders.yaml
@@ -100,7 +100,7 @@ examples:
 ---
 
 coder:
-  urn: "urn:beam:coders:intervalwindow:0.1"
+  urn: "urn:beam:coders:interval_window:0.1"
 examples:
   "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000,
span: 3600000}
   "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end:
1456881825000, span: 2592000000}
@@ -125,4 +125,56 @@ coder:
 examples:
   "\0\0\0\u0001\u0003abc": ["abc"]
   "\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"]
-  "\0\0\0\0": []
\ No newline at end of file
+  "\0\0\0\0": []
+
+---
+
+coder:
+  urn: "urn:beam:coders:stream:0.1"
+  components: [{urn: "urn:beam:coders:global_window:0.1"}]
+examples:
+  "\0\0\0\u0001": [""]
+
+---
+
+coder:
+  urn: "urn:beam:coders:global_window:0.1"
+examples:
+  "": ""
+
+---
+
+# All windowed values consist of pane infos that represent NO_FIRING until full support is added
+# in the Python SDK (BEAM-1522).
+coder:
+  urn: "urn:beam:coders:windowed_value:0.1"
+  components: [{urn: "urn:beam:coders:varint:0.1"},
+               {urn: "urn:beam:coders:global_window:0.1"}]
+examples:
+  "\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": {
+    value: 2,
+    timestamp: 1454293425000,
+    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+    windows: ["global"]
+  }
+
+---
+
+coder:
+  urn: "urn:beam:coders:windowed_value:0.1"
+  components: [{urn: "urn:beam:coders:varint:0.1"},
+               {urn: "urn:beam:coders:interval_window:0.1"}]
+examples:
+  "\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004":
{
+    value: 4,
+    timestamp: -400000,
+    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+    windows: [{end: 1454293425000, span: 280000}]
+  }
+
+  "\u007f\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u009c\0\0\0\u0002\u0080\0\u0001R\u009a\u00a4\u009bh\u0080\u00dd\u00db\u0001\u007f\u00df;dZ\u001c\u00adv\u00ed\u0002\u000f\u0002":
{
+    value: 2,
+    timestamp: -100,
+    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+    windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}]
+  }

http://git-wip-us.apache.org/repos/asf/beam/blob/f22f7e56/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java
index 7eafbe2..660d608 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CommonCoderTest.java
@@ -45,9 +45,13 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -69,8 +73,10 @@ public class CommonCoderTest {
       .put("urn:beam:coders:bytes:0.1", ByteCoder.class)
       .put("urn:beam:coders:kv:0.1", KvCoder.class)
       .put("urn:beam:coders:varint:0.1", VarLongCoder.class)
-      .put("urn:beam:coders:intervalwindow:0.1", IntervalWindowCoder.class)
+      .put("urn:beam:coders:interval_window:0.1", IntervalWindowCoder.class)
       .put("urn:beam:coders:stream:0.1", IterableCoder.class)
+      .put("urn:beam:coders:global_window:0.1", GlobalWindow.Coder.class)
+      .put("urn:beam:coders:windowed_value:0.1", WindowedValue.FullWindowedValueCoder.class)
       .build();
 
   @AutoValue
@@ -191,15 +197,16 @@ public class CommonCoderTest {
         Object v = convertValue(kvMap.get("value"), coderSpec.getComponents().get(1), valueCoder);
         return KV.of(k, v);
       }
-      case "urn:beam:coders:varint:0.1":
+      case "urn:beam:coders:varint:0.1": {
         return ((Number) value).longValue();
-      case "urn:beam:coders:intervalwindow:0.1": {
+      }
+      case "urn:beam:coders:interval_window:0.1": {
         Map<String, Object> kvMap = (Map<String, Object>) value;
         Instant end = new Instant(((Number) kvMap.get("end")).longValue());
         Duration span = Duration.millis(((Number) kvMap.get("span")).longValue());
         return new IntervalWindow(end.minus(span), span);
       }
-      case "urn:beam:coders:stream:0.1":
+      case "urn:beam:coders:stream:0.1": {
         Coder elementCoder = ((IterableCoder) coder).getElemCoder();
         List<Object> elements = (List<Object>) value;
         List<Object> convertedElements = new LinkedList<>();
@@ -208,6 +215,33 @@ public class CommonCoderTest {
               convertValue(element, coderSpec.getComponents().get(0), elementCoder));
         }
         return convertedElements;
+      }
+      case "urn:beam:coders:global_window:0.1": {
+        return GlobalWindow.INSTANCE;
+      }
+      case "urn:beam:coders:windowed_value:0.1": {
+        Map<String, Object> kvMap = (Map<String, Object>) value;
+        Coder valueCoder = ((WindowedValue.FullWindowedValueCoder) coder).getValueCoder();
+        Coder windowCoder = ((WindowedValue.FullWindowedValueCoder) coder).getWindowCoder();
+        Object windowValue = convertValue(
+            kvMap.get("value"), coderSpec.getComponents().get(0), valueCoder);
+        Instant timestamp = new Instant(((Number) kvMap.get("timestamp")).longValue());
+        List<BoundedWindow> windows = new LinkedList<>();
+        for (Object window : ((List<Object>) kvMap.get("windows"))) {
+          windows.add((BoundedWindow) convertValue(window, coderSpec.getComponents().get(1),
+              windowCoder));
+        }
+
+        Map<String, Object> paneInfoMap = (Map<String, Object>) kvMap.get("pane");
+        PaneInfo paneInfo = PaneInfo.createPane(
+            (boolean) paneInfoMap.get("is_first"),
+            (boolean) paneInfoMap.get("is_last"),
+            PaneInfo.Timing.valueOf((String) paneInfoMap.get("timing")),
+            (int) paneInfoMap.get("index"),
+            (int) paneInfoMap.get("on_time_index"));
+
+        return WindowedValue.of(windowValue, timestamp, windows, paneInfo);
+      }
       default:
         throw new IllegalStateException("Unknown coder URN: " + coderSpec.getUrn());
     }
@@ -225,10 +259,15 @@ public class CommonCoderTest {
         return KvCoder.of(components.get(0), components.get(1));
       case "urn:beam:coders:varint:0.1":
         return VarLongCoder.of();
-      case "urn:beam:coders:intervalwindow:0.1":
+      case "urn:beam:coders:interval_window:0.1":
         return IntervalWindowCoder.of();
       case "urn:beam:coders:stream:0.1":
         return IterableCoder.of(components.get(0));
+      case "urn:beam:coders:global_window:0.1":
+        return GlobalWindow.Coder.INSTANCE;
+      case "urn:beam:coders:windowed_value:0.1":
+        return WindowedValue.FullWindowedValueCoder.of(components.get(0),
+            (Coder<BoundedWindow>) components.get(1));
       default:
         throw new IllegalStateException("Unknown coder URN: " + coder.getUrn());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f22f7e56/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 840397a..2dbfae7 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -24,11 +24,12 @@ encode many elements with minimal overhead.
 This module may be optionally compiled with Cython, using the corresponding
 coder_impl.pxd file for type hints.
 """
-
 from types import NoneType
 
 from apache_beam.coders import observable
 from apache_beam.utils.timestamp import Timestamp
+from apache_beam.utils.timestamp import MAX_TIMESTAMP
+from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.utils import windowed_value
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -570,6 +571,18 @@ class IterableCoderImpl(SequenceCoderImpl):
 class WindowedValueCoderImpl(StreamCoderImpl):
   """A coder for windowed values."""
 
+  # Ensure that lexicographic ordering of the bytes corresponds to
+  # chronological order of timestamps.
+  # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
+  # byte representation of timestamps.
+  def _to_normal_time(self, value):
+    """Convert "lexicographically ordered unsigned" to signed."""
+    return value - (1 << 63)
+
+  def _from_normal_time(self, value):
+    """Convert signed to "lexicographically ordered unsigned"."""
+    return value + (1 << 63)
+
   def __init__(self, value_coder, timestamp_coder, window_coder):
     # TODO(lcwik): Remove the timestamp coder field
     self._value_coder = value_coder
@@ -578,17 +591,48 @@ class WindowedValueCoderImpl(StreamCoderImpl):
 
   def encode_to_stream(self, value, out, nested):
     wv = value  # type cast
-    self._value_coder.encode_to_stream(wv.value, out, True)
     # Avoid creation of Timestamp object.
-    out.write_bigendian_int64(wv.timestamp_micros)
+    restore_sign = -1 if wv.timestamp_micros < 0 else 1
+    out.write_bigendian_uint64(
+        # Convert to positive number and divide, since python rounds off to the
+        # lower negative number. For ex: -3 / 2 = -2, but we expect it to be -1,
+        # to be consistent across SDKs.
+        # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
+        # precision of timestamps.
+        self._from_normal_time(
+            restore_sign * (abs(wv.timestamp_micros) / 1000)))
     self._windows_coder.encode_to_stream(wv.windows, out, True)
+    # Default PaneInfo encoded byte representing NO_FIRING.
+    # TODO(BEAM-1522): Remove the hard coding here once PaneInfo is supported.
+    out.write_byte(0xF)
+    self._value_coder.encode_to_stream(wv.value, out, nested)
 
   def decode_from_stream(self, in_stream, nested):
+    timestamp = self._to_normal_time(in_stream.read_bigendian_uint64())
+    # Restore MIN/MAX timestamps to their actual values as encoding incurs loss
+    # of precision while converting to millis.
+    # Note: This is only a best effort here as there is no way to know if these
+    # were indeed MIN/MAX timestamps.
+    # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
+    # precision of timestamps.
+    if timestamp == -(abs(MIN_TIMESTAMP.micros) / 1000):
+      timestamp = MIN_TIMESTAMP.micros
+    elif timestamp == (MAX_TIMESTAMP.micros / 1000):
+      timestamp = MAX_TIMESTAMP.micros
+    else:
+      timestamp *= 1000
+
+    windows = self._windows_coder.decode_from_stream(in_stream, True)
+    # Read PaneInfo encoded byte.
+    # TODO(BEAM-1522): Ignored for now but should be converted to pane info once
+    # it is supported.
+    in_stream.read_byte()
+    value = self._value_coder.decode_from_stream(in_stream, nested)
     return windowed_value.create(
-        self._value_coder.decode_from_stream(in_stream, True),
+        value,
         # Avoid creation of Timestamp object.
-        in_stream.read_bigendian_int64(),
-        self._windows_coder.decode_from_stream(in_stream, True))
+        timestamp,
+        windows)
 
   def get_estimated_size_and_observables(self, value, nested=False):
     """Returns estimated size of value along with any nested observables."""
@@ -600,13 +644,15 @@ class WindowedValueCoderImpl(StreamCoderImpl):
     observables = []
     value_estimated_size, value_observables = (
         self._value_coder.get_estimated_size_and_observables(
-            value.value, nested=True))
+            value.value, nested=nested))
     estimated_size += value_estimated_size
     observables += value_observables
     estimated_size += (
         self._timestamp_coder.estimate_size(value.timestamp, nested=True))
     estimated_size += (
         self._windows_coder.estimate_size(value.windows, nested=True))
+    # for pane info
+    estimated_size += 1
     return estimated_size, observables
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f22f7e56/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 7284287..338f89e 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -264,13 +264,20 @@ class CodersTest(unittest.TestCase):
         },
         coder.as_cloud_object())
     # Test binary representation
-    self.assertEqual('\x01\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01',
+    self.assertEqual('\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01',
                      coder.encode(window.GlobalWindows.windowed_value(1)))
     # Test unnested
     self.check_coder(
         coders.WindowedValueCoder(coders.VarIntCoder()),
         windowed_value.WindowedValue(3, -100, ()),
         windowed_value.WindowedValue(-1, 100, (1, 2, 3)))
+
+    # Test Global Window
+    self.check_coder(
+        coders.WindowedValueCoder(coders.VarIntCoder(),
+                                  coders.GlobalWindowCoder()),
+        window.GlobalWindows.windowed_value(1))
+
     # Test nested
     self.check_coder(
         coders.TupleCoder((

http://git-wip-us.apache.org/repos/asf/beam/blob/f22f7e56/sdks/python/apache_beam/coders/standard_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 1fbf1a1..9df81ac 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -27,8 +27,10 @@ import yaml
 
 from apache_beam import coders
 from apache_beam.coders import coder_impl
+from apache_beam.utils import windowed_value
 from apache_beam.utils.timestamp import Timestamp
 from apache_beam.transforms.window import IntervalWindow
+from apache_beam.transforms import window
 
 from nose_parameterized import parameterized
 
@@ -55,8 +57,11 @@ class StandardCodersTest(unittest.TestCase):
       'urn:beam:coders:bytes:0.1': coders.BytesCoder,
       'urn:beam:coders:varint:0.1': coders.VarIntCoder,
       'urn:beam:coders:kv:0.1': lambda k, v: coders.TupleCoder((k, v)),
-      'urn:beam:coders:intervalwindow:0.1': coders.IntervalWindowCoder,
+      'urn:beam:coders:interval_window:0.1': coders.IntervalWindowCoder,
       'urn:beam:coders:stream:0.1': lambda t: coders.IterableCoder(t),
+      'urn:beam:coders:global_window:0.1': coders.GlobalWindowCoder,
+      'urn:beam:coders:windowed_value:0.1':
+          lambda v, w: coders.WindowedValueCoder(v, w)
   }
 
   _urn_to_json_value_parser = {
@@ -65,11 +70,16 @@ class StandardCodersTest(unittest.TestCase):
       'urn:beam:coders:kv:0.1':
           lambda x, key_parser, value_parser: (key_parser(x['key']),
                                                value_parser(x['value'])),
-      'urn:beam:coders:intervalwindow:0.1':
+      'urn:beam:coders:interval_window:0.1':
           lambda x: IntervalWindow(
               start=Timestamp(micros=(x['end'] - x['span']) * 1000),
               end=Timestamp(micros=x['end'] * 1000)),
-      'urn:beam:coders:stream:0.1': lambda x, parser: map(parser, x)
+      'urn:beam:coders:stream:0.1': lambda x, parser: map(parser, x),
+      'urn:beam:coders:global_window:0.1': lambda x: window.GlobalWindow(),
+      'urn:beam:coders:windowed_value:0.1':
+          lambda x, value_parser, window_parser: windowed_value.create(
+              value_parser(x['value']), x['timestamp'] * 1000,
+              tuple([window_parser(w) for w in x['windows']]))
   }
 
   @parameterized.expand(_load_test_cases(STANDARD_CODERS_YAML))

http://git-wip-us.apache.org/repos/asf/beam/blob/f22f7e56/sdks/python/apache_beam/tests/data/standard_coders.yaml
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/data/standard_coders.yaml b/sdks/python/apache_beam/tests/data/standard_coders.yaml
index ea0b11b..172b6d8 100644
--- a/sdks/python/apache_beam/tests/data/standard_coders.yaml
+++ b/sdks/python/apache_beam/tests/data/standard_coders.yaml
@@ -100,7 +100,7 @@ examples:
 ---
 
 coder:
-  urn: "urn:beam:coders:intervalwindow:0.1"
+  urn: "urn:beam:coders:interval_window:0.1"
 examples:
   "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000,
span: 3600000}
   "\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end:
1456881825000, span: 2592000000}
@@ -126,3 +126,55 @@ examples:
   "\0\0\0\u0001\u0003abc": ["abc"]
   "\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"]
   "\0\0\0\0": []
+
+---
+
+coder:
+  urn: "urn:beam:coders:stream:0.1"
+  components: [{urn: "urn:beam:coders:global_window:0.1"}]
+examples:
+  "\0\0\0\u0001": [""]
+
+---
+
+coder:
+  urn: "urn:beam:coders:global_window:0.1"
+examples:
+  "": ""
+
+---
+
+# All windowed values consist of pane infos that represent NO_FIRING until full support is added
+# in the Python SDK (BEAM-1522).
+coder:
+  urn: "urn:beam:coders:windowed_value:0.1"
+  components: [{urn: "urn:beam:coders:varint:0.1"},
+               {urn: "urn:beam:coders:global_window:0.1"}]
+examples:
+  "\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": {
+    value: 2,
+    timestamp: 1454293425000,
+    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+    windows: ["global"]
+  }
+
+---
+
+coder:
+  urn: "urn:beam:coders:windowed_value:0.1"
+  components: [{urn: "urn:beam:coders:varint:0.1"},
+               {urn: "urn:beam:coders:interval_window:0.1"}]
+examples:
+  "\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004":
{
+    value: 4,
+    timestamp: -400000,
+    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+    windows: [{end: 1454293425000, span: 280000}]
+  }
+
+  "\u007f\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u009c\0\0\0\u0002\u0080\0\u0001R\u009a\u00a4\u009bh\u0080\u00dd\u00db\u0001\u007f\u00df;dZ\u001c\u00adv\u00ed\u0002\u000f\u0002":
{
+    value: 2,
+    timestamp: -100,
+    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+    windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}]
+  }

http://git-wip-us.apache.org/repos/asf/beam/blob/f22f7e56/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 67ca83c..14cf2f6 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -230,7 +230,7 @@ class GlobalWindows(WindowFn):
 
   @classmethod
   def windowed_value(cls, value, timestamp=MIN_TIMESTAMP):
-    return WindowedValue(value, timestamp, [GlobalWindow()])
+    return WindowedValue(value, timestamp, (GlobalWindow(),))
 
   def assign(self, assign_context):
     return [GlobalWindow()]


Mime
View raw message