beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [5/5] beam git commit: Add known window serialization for Java.
Date Fri, 02 Jun 2017 16:17:12 GMT
Add known window serialization for Java.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b490e84e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b490e84e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b490e84e

Branch: refs/heads/master
Commit: b490e84ef0b4e56cabc091cfe2cc42f8f1e69caa
Parents: de75786
Author: Robert Bradshaw <robertwb@gmail.com>
Authored: Thu May 25 13:26:02 2017 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Fri Jun 2 09:16:43 2017 -0700

----------------------------------------------------------------------
 .../WindowingStrategyTranslation.java           | 105 +++++++++++++++----
 .../WindowingStrategyTranslationTest.java       |   9 ++
 .../src/main/proto/beam_known_payloads.proto    |  53 ----------
 .../src/main/proto/standard_window_fns.proto    |  53 ++++++++++
 sdks/python/apache_beam/transforms/window.py    |  14 +--
 5 files changed, 154 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index a226624..718efe7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApiPayloads;
+import org.apache.beam.sdk.common.runner.v1.StandardWindowFns;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -165,6 +165,9 @@ public class WindowingStrategyTranslation implements Serializable {
   // This URN says that the WindowFn is just a UDF blob the Java SDK understands
   // TODO: standardize such things
   public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1";
+  public static final String OLD_SERIALIZED_JAVA_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
+  // Remove this once the dataflow worker understands all the above formats.
+  private static final boolean USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN = true;
 
   /**
    * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
@@ -173,19 +176,80 @@ public class WindowingStrategyTranslation implements Serializable {
    */
   public static SdkFunctionSpec toProto(
       WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) {
-    return SdkFunctionSpec.newBuilder()
-        // TODO: Set environment ID
-        .setSpec(
-            FunctionSpec.newBuilder()
-                .setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(
-                                    SerializableUtils.serializeToByteArray(windowFn)))
-                            .build())))
-        .build();
+    // TODO: Set environment IDs
+    if (USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN) {
+      return SdkFunctionSpec.newBuilder()
+          .setSpec(
+              FunctionSpec.newBuilder()
+                  .setUrn(OLD_SERIALIZED_JAVA_WINDOWFN_URN)
+                  .setParameter(
+                      Any.pack(
+                          BytesValue.newBuilder()
+                              .setValue(
+                                  ByteString.copyFrom(
+                                      SerializableUtils.serializeToByteArray(windowFn)))
+                              .build())))
+          .build();
+    } else if (windowFn instanceof GlobalWindows) {
+      return SdkFunctionSpec.newBuilder()
+          .setSpec(FunctionSpec.newBuilder().setUrn(GLOBAL_WINDOWS_FN))
+          .build();
+    } else if (windowFn instanceof FixedWindows) {
+      return SdkFunctionSpec.newBuilder()
+          .setSpec(
+              FunctionSpec.newBuilder()
+                  .setUrn(FIXED_WINDOWS_FN)
+                  .setParameter(
+                      Any.pack(
+                          StandardWindowFns.FixedWindowsPayload.newBuilder()
+                              .setSize(Durations.fromMillis(
+                                  ((FixedWindows) windowFn).getSize().getMillis()))
+                              .setOffset(Timestamps.fromMillis(
+                                  ((FixedWindows) windowFn).getOffset().getMillis()))
+                              .build())))
+          .build();
+    } else if (windowFn instanceof SlidingWindows) {
+      return SdkFunctionSpec.newBuilder()
+          .setSpec(
+              FunctionSpec.newBuilder()
+                  .setUrn(SLIDING_WINDOWS_FN)
+                  .setParameter(
+                      Any.pack(
+                          StandardWindowFns.SlidingWindowsPayload.newBuilder()
+                              .setSize(Durations.fromMillis(
+                                  ((SlidingWindows) windowFn).getSize().getMillis()))
+                              .setOffset(Timestamps.fromMillis(
+                                  ((SlidingWindows) windowFn).getOffset().getMillis()))
+                              .setPeriod(Durations.fromMillis(
+                                  ((SlidingWindows) windowFn).getPeriod().getMillis()))
+                              .build())))
+          .build();
+    } else if (windowFn instanceof Sessions) {
+      return SdkFunctionSpec.newBuilder()
+          .setSpec(
+              FunctionSpec.newBuilder()
+                  .setUrn(SESSION_WINDOWS_FN)
+                  .setParameter(
+                      Any.pack(
+                          StandardWindowFns.SessionsPayload.newBuilder()
+                              .setGapSize(Durations.fromMillis(
+                                  ((Sessions) windowFn).getGapDuration().getMillis()))
+                              .build())))
+          .build();
+    } else {
+      return SdkFunctionSpec.newBuilder()
+          .setSpec(
+              FunctionSpec.newBuilder()
+                  .setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
+                  .setParameter(
+                      Any.pack(
+                          BytesValue.newBuilder()
+                              .setValue(
+                                  ByteString.copyFrom(
+                                      SerializableUtils.serializeToByteArray(windowFn)))
+                              .build())))
+          .build();
+    }
   }
 
   /**
@@ -274,27 +338,28 @@ public class WindowingStrategyTranslation implements Serializable {
       case GLOBAL_WINDOWS_FN:
         return new GlobalWindows();
       case FIXED_WINDOWS_FN:
-        RunnerApiPayloads.FixedWindowsPayload fixedParams =
+        StandardWindowFns.FixedWindowsPayload fixedParams =
             windowFnSpec.getSpec().getParameter().unpack(
-                RunnerApiPayloads.FixedWindowsPayload.class);
+                StandardWindowFns.FixedWindowsPayload.class);
         return FixedWindows.of(
             Duration.millis(Durations.toMillis(fixedParams.getSize())))
             .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
       case SLIDING_WINDOWS_FN:
-        RunnerApiPayloads.SlidingWindowsPayload slidingParams =
+        StandardWindowFns.SlidingWindowsPayload slidingParams =
             windowFnSpec.getSpec().getParameter().unpack(
-                RunnerApiPayloads.SlidingWindowsPayload.class);
+                StandardWindowFns.SlidingWindowsPayload.class);
         return SlidingWindows.of(
             Duration.millis(Durations.toMillis(slidingParams.getSize())))
             .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
             .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
       case SESSION_WINDOWS_FN:
-        RunnerApiPayloads.SessionsPayload sessionParams =
+        StandardWindowFns.SessionsPayload sessionParams =
             windowFnSpec.getSpec().getParameter().unpack(
-                RunnerApiPayloads.SessionsPayload.class);
+                StandardWindowFns.SessionsPayload.class);
         return Sessions.withGapDuration(
             Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
       case SERIALIZED_JAVA_WINDOWFN_URN:
+      case OLD_SERIALIZED_JAVA_WINDOWFN_URN:
         return (WindowFn<?, ?>) SerializableUtils.deserializeFromByteArray(
             windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
             "WindowFn");

http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
index 1e52803..e406545 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
@@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableList;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
@@ -62,6 +64,13 @@ public class WindowingStrategyTranslationTest {
   public static Iterable<ToProtoAndBackSpec> data() {
     return ImmutableList.of(
         toProtoAndBackSpec(WindowingStrategy.globalDefault()),
+        toProtoAndBackSpec(WindowingStrategy.of(
+            FixedWindows.of(Duration.millis(11)).withOffset(Duration.millis(3)))),
+        toProtoAndBackSpec(WindowingStrategy.of(
+            SlidingWindows.of(Duration.millis(37)).every(Duration.millis(3))
+                .withOffset(Duration.millis(2)))),
+        toProtoAndBackSpec(WindowingStrategy.of(
+            Sessions.withGapDuration(Duration.millis(389)))),
         toProtoAndBackSpec(
             WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
                 .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)

http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto b/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
deleted file mode 100644
index 446bd59..0000000
--- a/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Protocol Buffers describing the Runner API, which is the runner-independent,
- * SDK-independent definition of the Beam model.
- */
-
-syntax = "proto3";
-
-package org.apache.beam.runner_api.v1;
-
-option java_package = "org.apache.beam.sdk.common.runner.v1";
-option java_outer_classname = "RunnerApiPayloads";
-
-import "google/protobuf/duration.proto";
-import "google/protobuf/timestamp.proto";
-
-// beam:windowfn:global_windows:v0.1
-// empty payload
-
-// beam:windowfn:fixed_windows:v0.1
-message FixedWindowsPayload {
-  google.protobuf.Duration size = 1;
-  google.protobuf.Timestamp offset = 2;
-}
-
-// beam:windowfn:sliding_windows:v0.1
-message SlidingWindowsPayload {
-  google.protobuf.Duration size = 1;
-  google.protobuf.Timestamp offset = 2;
-  google.protobuf.Duration period = 3;
-}
-
-// beam:windowfn:session_windows:v0.1
-message SessionsPayload {
-  google.protobuf.Duration gap_size = 1;
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/sdks/common/runner-api/src/main/proto/standard_window_fns.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/standard_window_fns.proto b/sdks/common/runner-api/src/main/proto/standard_window_fns.proto
new file mode 100644
index 0000000..0682044
--- /dev/null
+++ b/sdks/common/runner-api/src/main/proto/standard_window_fns.proto
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the Runner API, which is the runner-independent,
+ * SDK-independent definition of the Beam model.
+ */
+
+syntax = "proto3";
+
+package org.apache.beam.runner_api.v1;
+
+option java_package = "org.apache.beam.sdk.common.runner.v1";
+option java_outer_classname = "StandardWindowFns";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+// beam:windowfn:global_windows:v0.1
+// empty payload
+
+// beam:windowfn:fixed_windows:v0.1
+message FixedWindowsPayload {
+  google.protobuf.Duration size = 1;
+  google.protobuf.Timestamp offset = 2;
+}
+
+// beam:windowfn:sliding_windows:v0.1
+message SlidingWindowsPayload {
+  google.protobuf.Duration size = 1;
+  google.protobuf.Timestamp offset = 2;
+  google.protobuf.Duration period = 3;
+}
+
+// beam:windowfn:session_windows:v0.1
+message SessionsPayload {
+  google.protobuf.Duration gap_size = 1;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index f74c8a9..e87a007 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -56,7 +56,7 @@ from google.protobuf import timestamp_pb2
 
 from apache_beam.coders import coders
 from apache_beam.runners.api import beam_runner_api_pb2
-from apache_beam.runners.api import beam_known_payloads_pb2
+from apache_beam.runners.api import standard_window_fns_pb2
 from apache_beam.transforms import timeutil
 from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
@@ -343,14 +343,14 @@ class FixedWindows(NonMergingWindowFn):
 
   def to_runner_api_parameter(self, context):
     return (urns.FIXED_WINDOWS_FN,
-            beam_known_payloads_pb2.FixedWindowsPayload(
+            standard_window_fns_pb2.FixedWindowsPayload(
                 size=proto_utils.from_micros(
                     duration_pb2.Duration, self.size.micros),
                 offset=proto_utils.from_micros(
                     timestamp_pb2.Timestamp, self.offset.micros)))
 
   @urns.RunnerApiFn.register_urn(
-      urns.FIXED_WINDOWS_FN, beam_known_payloads_pb2.FixedWindowsPayload)
+      urns.FIXED_WINDOWS_FN, standard_window_fns_pb2.FixedWindowsPayload)
   def from_runner_api_parameter(fn_parameter, unused_context):
     return FixedWindows(
         size=Duration(micros=fn_parameter.size.ToMicroseconds()),
@@ -398,7 +398,7 @@ class SlidingWindows(NonMergingWindowFn):
 
   def to_runner_api_parameter(self, context):
     return (urns.SLIDING_WINDOWS_FN,
-            beam_known_payloads_pb2.SlidingWindowsPayload(
+            standard_window_fns_pb2.SlidingWindowsPayload(
                 size=proto_utils.from_micros(
                     duration_pb2.Duration, self.size.micros),
                 offset=proto_utils.from_micros(
@@ -408,7 +408,7 @@ class SlidingWindows(NonMergingWindowFn):
 
   @urns.RunnerApiFn.register_urn(
       urns.SLIDING_WINDOWS_FN,
-      beam_known_payloads_pb2.SlidingWindowsPayload)
+      standard_window_fns_pb2.SlidingWindowsPayload)
   def from_runner_api_parameter(fn_parameter, unused_context):
     return SlidingWindows(
         size=Duration(micros=fn_parameter.size.ToMicroseconds()),
@@ -465,12 +465,12 @@ class Sessions(WindowFn):
 
   def to_runner_api_parameter(self, context):
     return (urns.SESSION_WINDOWS_FN,
-            beam_known_payloads_pb2.SessionsPayload(
+            standard_window_fns_pb2.SessionsPayload(
                 gap_size=proto_utils.from_micros(
                     duration_pb2.Duration, self.gap_size.micros)))
 
   @urns.RunnerApiFn.register_urn(
-      urns.SESSION_WINDOWS_FN, beam_known_payloads_pb2.SessionsPayload)
+      urns.SESSION_WINDOWS_FN, standard_window_fns_pb2.SessionsPayload)
   def from_runner_api_parameter(fn_parameter, unused_context):
     return Sessions(
         gap_size=Duration(micros=fn_parameter.gap_size.ToMicroseconds()))


Mime
View raw message