beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [03/13] beam git commit: get it compiling
Date Tue, 09 May 2017 04:21:23 GMT
get it compiling


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/45e09b2d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/45e09b2d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/45e09b2d

Branch: refs/heads/master
Commit: 45e09b2df7a97cfbbe0d8013c16cfdd7a55afbde
Parents: b7f3341
Author: Robert Bradshaw <robertwb@gmail.com>
Authored: Fri May 5 17:27:13 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Mon May 8 20:17:56 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/BatchViewOverrides.java    |  4 +--
 .../org/apache/beam/sdk/transforms/Combine.java |  3 +-
 .../beam/sdk/transforms/join/CoGbkResult.java   |  2 +-
 .../beam/sdk/transforms/windowing/PaneInfo.java |  1 -
 .../org/apache/beam/sdk/util/WindowedValue.java |  3 +-
 .../beam/sdk/coders/CoderRegistryTest.java      |  6 ++++
 .../beam/sdk/testing/CoderPropertiesTest.java   | 36 ++++++++++----------
 .../sdk/testing/SerializableMatchersTest.java   |  1 -
 .../apache/beam/sdk/transforms/CombineTest.java | 19 +++--------
 .../apache/beam/sdk/transforms/ParDoTest.java   | 17 ++-------
 10 files changed, 37 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 34609df..d640f6e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -1351,9 +1351,9 @@ class BatchViewOverrides {
     }
 
     @Override
-    public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream, OutputStream outStream)
+    public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream)
         throws CoderException, IOException {
-      encode(outStream, outStream, Coder.Context.NESTED);
+      encode(value, outStream, Coder.Context.NESTED);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 7e43564..9e1cc71 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -2001,7 +2001,8 @@ public class Combine {
         }
 
         @Override
-        public InputOrAccum<InputT, AccumT> decode(InputStream inStream) throws CoderException, IOException {
+        public InputOrAccum<InputT, AccumT> decode(InputStream inStream)
+            throws CoderException, IOException {
           return decode(inStream, Coder.Context.NESTED);
         }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index d42de82..877bb07 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -264,7 +264,7 @@ public class CoGbkResult {
       }
       List<Iterable<?>> valueMap = Lists.newArrayListWithExpectedSize(schema.size());
       for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
-        valueMap.add(tagListCoder(unionTag).decode(inStream, Coder.Context.NESTED));
+        valueMap.add(tagListCoder(unionTag).decode(inStream));
       }
       return new CoGbkResult(schema, valueMap);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index 75df220..1e9a187 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -27,7 +27,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Objects;
 import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.WindowedContext;

http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 963886b..444521a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -646,8 +646,7 @@ public abstract class WindowedValue<T> {
                        OutputStream outStream,
                        Context context)
         throws CoderException, IOException {
-      InstantCoder.of().encode(
-          windowedElem.getTimestamp(), outStream, nestedContext);
+      InstantCoder.of().encode(windowedElem.getTimestamp(), outStream);
       windowsCoder.encode(windowedElem.getWindows(), outStream);
       PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream);
       valueCoder.encode(windowedElem.getValue(), outStream, context);

http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index c883ca0..b199a06 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -438,6 +438,12 @@ public class CoderRegistryTest {
 
   private static class AutoRegistrationClassCoder extends CustomCoder<AutoRegistrationClass> {
     private static final AutoRegistrationClassCoder INSTANCE = new AutoRegistrationClassCoder();
+
+    public void encode(AutoRegistrationClass value, OutputStream outStream) {}
+
+    public AutoRegistrationClass decode(InputStream inStream) {
+      return null;
+    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
index 164d221..ce78411 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
@@ -50,15 +50,15 @@ public class CoderPropertiesTest {
   /** A coder that says it is not deterministic but actually is. */
   public static class NonDeterministicCoder extends AtomicCoder<String> {
     @Override
-    public void encode(String value, OutputStream outStream, Context context)
+    public void encode(String value, OutputStream outStream)
         throws CoderException, IOException {
-      StringUtf8Coder.of().encode(value, outStream, context);
+      StringUtf8Coder.of().encode(value, outStream);
     }
 
     @Override
-    public String decode(InputStream inStream, Context context)
+    public String decode(InputStream inStream)
         throws CoderException, IOException {
-      return StringUtf8Coder.of().decode(inStream, context);
+      return StringUtf8Coder.of().decode(inStream);
     }
 
     public void verifyDeterministic() throws NonDeterministicException {
@@ -96,15 +96,15 @@ public class CoderPropertiesTest {
     }
 
     @Override
-    public void encode(String value, OutputStream outStream, Context context)
+    public void encode(String value, OutputStream outStream)
         throws IOException, CoderException {
-      StringUtf8Coder.of().encode(value + System.nanoTime(), outStream, context);
+      StringUtf8Coder.of().encode(value + System.nanoTime(), outStream);
     }
 
     @Override
-    public String decode(InputStream inStream, Context context)
+    public String decode(InputStream inStream)
         throws CoderException, IOException {
-      return StringUtf8Coder.of().decode(inStream, context);
+      return StringUtf8Coder.of().decode(inStream);
     }
 
     @Override
@@ -136,16 +136,16 @@ public class CoderPropertiesTest {
     }
 
     @Override
-    public void encode(String value, OutputStream outStream, Context context)
+    public void encode(String value, OutputStream outStream)
         throws CoderException, IOException {
       changedState += 1;
-      StringUtf8Coder.of().encode(value + Strings.repeat("A", changedState), outStream, context);
+      StringUtf8Coder.of().encode(value + Strings.repeat("A", changedState), outStream);
     }
 
     @Override
-    public String decode(InputStream inStream, Context context)
+    public String decode(InputStream inStream)
         throws CoderException, IOException {
-      String decodedValue = StringUtf8Coder.of().decode(inStream, context);
+      String decodedValue = StringUtf8Coder.of().decode(inStream);
       return decodedValue.substring(0, decodedValue.length() - changedState);
     }
 
@@ -180,18 +180,18 @@ public class CoderPropertiesTest {
     }
 
     @Override
-    public void encode(String value, OutputStream outStream, Context context)
+    public void encode(String value, OutputStream outStream)
         throws CoderException, IOException {
       if (lostState == 0) {
         throw new RuntimeException("I forgot something...");
       }
-      StringUtf8Coder.of().encode(value, outStream, context);
+      StringUtf8Coder.of().encode(value, outStream);
     }
 
     @Override
-    public String decode(InputStream inStream, Context context)
+    public String decode(InputStream inStream)
         throws CoderException, IOException {
-      return StringUtf8Coder.of().decode(inStream, context);
+      return StringUtf8Coder.of().decode(inStream);
     }
 
     @Override
@@ -216,12 +216,12 @@ public class CoderPropertiesTest {
   /** A coder which closes the underlying stream during encoding and decoding. */
   public static class ClosingCoder extends AtomicCoder<String> {
     @Override
-    public void encode(String value, OutputStream outStream, Context context) throws IOException {
+    public void encode(String value, OutputStream outStream) throws IOException {
       outStream.close();
     }
 
     @Override
-    public String decode(InputStream inStream, Context context) throws IOException {
+    public String decode(InputStream inStream) throws IOException {
       inStream.close();
       return null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
index 375be33..6b17696 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
@@ -30,7 +30,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
 import org.hamcrest.Matchers;

http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index bd8aee4..dc9788f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -876,28 +876,17 @@ public class CombineTest implements Serializable {
      */
     private class CountSumCoder extends AtomicCoder<CountSum> {
       @Override
-      public void encode(CountSum value, OutputStream outStream, OutputStream outStream)
+      public void encode(CountSum value, OutputStream outStream)
           throws CoderException, IOException {
-        encode(outStream, outStream, Context.NESTED);
-      }
-
-      @Override
-      public void encode(CountSum value, OutputStream outStream,
-          Context context) throws CoderException, IOException {
         LONG_CODER.encode(value.count, outStream);
-        DOUBLE_CODER.encode(value.sum, outStream, context);
-      }
-
-      @Override
-      public CountSum decode(InputStream inStream) throws CoderException, IOException {
-        return decode(inStream, Coder.Context.NESTED);
+        DOUBLE_CODER.encode(value.sum, outStream);
       }
 
       @Override
-      public CountSum decode(InputStream inStream, Coder.Context context)
+      public CountSum decode(InputStream inStream)
           throws CoderException, IOException {
         long count = LONG_CODER.decode(inStream);
-        double sum = DOUBLE_CODER.decode(inStream, context);
+        double sum = DOUBLE_CODER.decode(inStream);
         return new CountSum(count, sum);
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/45e09b2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 3697211..ef27f4c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1092,24 +1092,13 @@ public class ParDoTest implements Serializable {
     @Override
     public void encode(MyInteger value, OutputStream outStream)
         throws CoderException, IOException {
-      encode(value, outStream, Context.NESTED);
+      delegate.encode(value.getValue(), outStream);
     }
 
     @Override
-    public void encode(MyInteger value, OutputStream outStream, Context context)
-        throws CoderException, IOException {
-      delegate.encode(value.getValue(), outStream, context);
-    }
-
-    @Override
-    public MyInteger decode(InputStream inStream) throws CoderException {
-      return decode(inStream, Context.NESTED);
-    }
-
-    @Override
-    public MyInteger decode(InputStream inStream, Context context) throws CoderException,
+    public MyInteger decode(InputStream inStream) throws CoderException,
         IOException {
-      return new MyInteger(delegate.decode(inStream, context));
+      return new MyInteger(delegate.decode(inStream));
     }
   }
 


Mime
View raw message