beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [2/6] beam git commit: [BEAM-2166] Use contextless encode/decode by default.
Date Tue, 09 May 2017 17:47:13 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
index 4467faa..3e1ff7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
@@ -38,12 +38,12 @@ public class VoidCoder extends AtomicCoder<Void> {
   private VoidCoder() {}
 
   @Override
-  public void encode(Void value, OutputStream outStream, Context context) {
+  public void encode(Void value, OutputStream outStream) {
     // Nothing to write!
   }
 
   @Override
-  public Void decode(InputStream inStream, Context context) {
+  public Void decode(InputStream inStream) {
     // Nothing to read!
     return null;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 20fab9b..32aa9c3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -947,25 +947,24 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
 
     @Override
-    public void encode(FileResult value, OutputStream outStream, Context context)
+    public void encode(FileResult value, OutputStream outStream)
         throws IOException {
       if (value == null) {
         throw new CoderException("cannot encode a null value");
       }
-      stringCoder.encode(value.getFilename().toString(), outStream, context.nested());
+      stringCoder.encode(value.getFilename().toString(), outStream);
       if (value.getDestinationFilename() == null) {
-        stringCoder.encode(null, outStream, context);
+        stringCoder.encode(null, outStream);
       } else {
-        stringCoder.encode(value.getDestinationFilename().toString(), outStream, context);
+        stringCoder.encode(value.getDestinationFilename().toString(), outStream);
       }
     }
 
     @Override
-    public FileResult decode(InputStream inStream, Context context)
-        throws IOException {
-      String filename = stringCoder.decode(inStream, context.nested());
+    public FileResult decode(InputStream inStream) throws IOException {
+      String filename = stringCoder.decode(inStream);
       assert filename != null;  // fixes a compiler warning
-      @Nullable String destinationFilename = stringCoder.decode(inStream, context);
+      @Nullable String destinationFilename = stringCoder.decode(inStream);
       return new FileResult(
           FileSystems.matchNewResource(filename, false /* isDirectory */),
           destinationFilename == null

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index 37d5a55..d12d193 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -679,59 +679,55 @@ public class ApproximateQuantiles {
 
     @Override
     public void encode(
-        QuantileState<T, ComparatorT> state, OutputStream outStream, Coder.Context context)
+        QuantileState<T, ComparatorT> state, OutputStream outStream)
         throws CoderException, IOException {
-      Coder.Context nestedContext = context.nested();
-      intCoder.encode(state.numQuantiles, outStream, nestedContext);
-      intCoder.encode(state.bufferSize, outStream, nestedContext);
-      elementCoder.encode(state.min, outStream, nestedContext);
-      elementCoder.encode(state.max, outStream, nestedContext);
+      intCoder.encode(state.numQuantiles, outStream);
+      intCoder.encode(state.bufferSize, outStream);
+      elementCoder.encode(state.min, outStream);
+      elementCoder.encode(state.max, outStream);
       elementListCoder.encode(
-          state.unbufferedElements, outStream, nestedContext);
+          state.unbufferedElements, outStream);
       BigEndianIntegerCoder.of().encode(
-          state.buffers.size(), outStream, nestedContext);
+          state.buffers.size(), outStream);
       for (QuantileBuffer<T> buffer : state.buffers) {
-        encodeBuffer(buffer, outStream, nestedContext);
+        encodeBuffer(buffer, outStream);
       }
     }
 
     @Override
-    public QuantileState<T, ComparatorT> decode(InputStream inStream, Coder.Context context)
+    public QuantileState<T, ComparatorT> decode(InputStream inStream)
         throws CoderException, IOException {
-      Coder.Context nestedContext = context.nested();
-      int numQuantiles = intCoder.decode(inStream, nestedContext);
-      int bufferSize = intCoder.decode(inStream, nestedContext);
-      T min = elementCoder.decode(inStream, nestedContext);
-      T max = elementCoder.decode(inStream, nestedContext);
+      int numQuantiles = intCoder.decode(inStream);
+      int bufferSize = intCoder.decode(inStream);
+      T min = elementCoder.decode(inStream);
+      T max = elementCoder.decode(inStream);
       List<T> unbufferedElements =
-          elementListCoder.decode(inStream, nestedContext);
+          elementListCoder.decode(inStream);
       int numBuffers =
-          BigEndianIntegerCoder.of().decode(inStream, nestedContext);
+          BigEndianIntegerCoder.of().decode(inStream);
       List<QuantileBuffer<T>> buffers = new ArrayList<>(numBuffers);
       for (int i = 0; i < numBuffers; i++) {
-        buffers.add(decodeBuffer(inStream, nestedContext));
+        buffers.add(decodeBuffer(inStream));
       }
       return new QuantileState<T, ComparatorT>(
           compareFn, numQuantiles, min, max, numBuffers, bufferSize, unbufferedElements, buffers);
     }
 
-    private void encodeBuffer(
-        QuantileBuffer<T> buffer, OutputStream outStream, Coder.Context context)
+    private void encodeBuffer(QuantileBuffer<T> buffer, OutputStream outStream)
         throws CoderException, IOException {
       DataOutputStream outData = new DataOutputStream(outStream);
       outData.writeInt(buffer.level);
       outData.writeLong(buffer.weight);
-      elementListCoder.encode(buffer.elements, outStream, context);
+      elementListCoder.encode(buffer.elements, outStream);
     }
 
-    private QuantileBuffer<T> decodeBuffer(
-        InputStream inStream, Coder.Context context)
+    private QuantileBuffer<T> decodeBuffer(InputStream inStream)
         throws IOException, CoderException {
       DataInputStream inData = new DataInputStream(inStream);
       return new QuantileBuffer<>(
           inData.readInt(),
           inData.readLong(),
-          elementListCoder.decode(inStream, context));
+          elementListCoder.decode(inStream));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index b9cdbd5..9e1cc71 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -532,6 +532,12 @@ public class Combine {
     }
 
     @Override
+    public void encode(Holder<V> accumulator, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(accumulator, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(Holder<V> accumulator, OutputStream outStream, Context context)
         throws CoderException, IOException {
       if (accumulator.present) {
@@ -543,6 +549,11 @@ public class Combine {
     }
 
     @Override
+    public Holder<V> decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public Holder<V> decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       if (inStream.read() == 1) {
@@ -1971,6 +1982,12 @@ public class Combine {
         }
 
         @Override
+        public void encode(InputOrAccum<InputT, AccumT> value, OutputStream outStream)
+            throws CoderException, IOException {
+          encode(value, outStream, Coder.Context.NESTED);
+        }
+
+        @Override
         public void encode(
             InputOrAccum<InputT, AccumT> value, OutputStream outStream, Coder.Context context)
             throws CoderException, IOException {
@@ -1984,6 +2001,12 @@ public class Combine {
         }
 
         @Override
+        public InputOrAccum<InputT, AccumT> decode(InputStream inStream)
+            throws CoderException, IOException {
+          return decode(inStream, Coder.Context.NESTED);
+        }
+
+        @Override
         public InputOrAccum<InputT, AccumT> decode(InputStream inStream, Coder.Context context)
             throws CoderException, IOException {
           if (inStream.read() == 0) {

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index 0515ed5..c619783 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -534,6 +534,12 @@ public class CombineFns {
     }
 
     @Override
+    public void encode(Object[] value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(Object[] value, OutputStream outStream, Context context)
         throws CoderException, IOException {
       checkArgument(value.length == codersCount);
@@ -541,14 +547,18 @@ public class CombineFns {
         return;
       }
       int lastIndex = codersCount - 1;
-      Context nestedContext = context.nested();
       for (int i = 0; i < lastIndex; ++i) {
-        coders.get(i).encode(value[i], outStream, nestedContext);
+        coders.get(i).encode(value[i], outStream);
       }
       coders.get(lastIndex).encode(value[lastIndex], outStream, context);
     }
 
     @Override
+    public Object[] decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public Object[] decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       Object[] ret = new Object[codersCount];
@@ -556,9 +566,8 @@ public class CombineFns {
         return ret;
       }
       int lastIndex = codersCount - 1;
-      Context nestedContext = context.nested();
       for (int i = 0; i < lastIndex; ++i) {
-        ret[i] = coders.get(i).decode(inStream, nestedContext);
+        ret[i] = coders.get(i).decode(inStream);
       }
       ret[lastIndex] = coders.get(lastIndex).decode(inStream, context);
       return ret;

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 497d62b..b405dd1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -169,13 +169,13 @@ public class Count {
                                              Coder<T> inputCoder) {
       return new AtomicCoder<long[]>() {
         @Override
-        public void encode(long[] value, OutputStream outStream, Context context)
+        public void encode(long[] value, OutputStream outStream)
             throws IOException {
           VarInt.encode(value[0], outStream);
         }
 
         @Override
-        public long[] decode(InputStream inStream, Context context)
+        public long[] decode(InputStream inStream)
             throws IOException, CoderException {
           try {
             return new long[] {VarInt.decodeLong(inStream)};

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index a309954..8932b03 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -185,18 +185,18 @@ public class Mean {
      private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
 
      @Override
-     public void encode(CountSum<NumT> value, OutputStream outStream, Coder.Context context)
+     public void encode(CountSum<NumT> value, OutputStream outStream)
          throws CoderException, IOException {
-       LONG_CODER.encode(value.count, outStream, context.nested());
-       DOUBLE_CODER.encode(value.sum, outStream, context);
+       LONG_CODER.encode(value.count, outStream);
+       DOUBLE_CODER.encode(value.sum, outStream);
      }
 
      @Override
-     public CountSum<NumT> decode(InputStream inStream, Coder.Context context)
+     public CountSum<NumT> decode(InputStream inStream)
          throws CoderException, IOException {
        return new CountSum<>(
-           LONG_CODER.decode(inStream, context.nested()),
-           DOUBLE_CODER.decode(inStream, context));
+           LONG_CODER.decode(inStream),
+           DOUBLE_CODER.decode(inStream));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index c0381a7..dd8bc5f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -539,15 +539,15 @@ public class Top {
 
     @Override
     public void encode(
-        BoundedHeap<T, ComparatorT> value, OutputStream outStream, Context context)
+        BoundedHeap<T, ComparatorT> value, OutputStream outStream)
         throws CoderException, IOException {
-      listCoder.encode(value.asList(), outStream, context);
+      listCoder.encode(value.asList(), outStream);
     }
 
     @Override
-    public BoundedHeap<T, ComparatorT> decode(InputStream inStream, Coder.Context context)
+    public BoundedHeap<T, ComparatorT> decode(InputStream inStream)
         throws CoderException, IOException {
-      return new BoundedHeap<>(maximumSize, compareFn, listCoder.decode(inStream, context));
+      return new BoundedHeap<>(maximumSize, compareFn, listCoder.decode(inStream));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index e9a3571..877bb07 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -242,8 +242,7 @@ public class CoGbkResult {
     @SuppressWarnings("unchecked")
     public void encode(
         CoGbkResult value,
-        OutputStream outStream,
-        Context context) throws CoderException,
+        OutputStream outStream) throws CoderException,
         IOException {
       if (!schema.equals(value.getSchema())) {
         throw new CoderException("input schema does not match coder schema");
@@ -251,27 +250,22 @@ public class CoGbkResult {
       if (schema.size() == 0) {
         return;
       }
-      int lastIndex = schema.size() - 1;
-      for (int unionTag = 0; unionTag < lastIndex; unionTag++) {
-        tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, context.nested());
+      for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
+        tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream);
       }
-      tagListCoder(lastIndex).encode(value.valueMap.get(lastIndex), outStream, context);
     }
 
     @Override
     public CoGbkResult decode(
-        InputStream inStream,
-        Context context)
+        InputStream inStream)
         throws CoderException, IOException {
       if (schema.size() == 0) {
         return new CoGbkResult(schema, ImmutableList.<Iterable<?>>of());
       }
-      int lastIndex = schema.size() - 1;
       List<Iterable<?>> valueMap = Lists.newArrayListWithExpectedSize(schema.size());
-      for (int unionTag = 0; unionTag < lastIndex; unionTag++) {
-        valueMap.add(tagListCoder(unionTag).decode(inStream, context.nested()));
+      for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
+        valueMap.add(tagListCoder(unionTag).decode(inStream));
       }
-      valueMap.add(tagListCoder(lastIndex).decode(inStream, context));
       return new CoGbkResult(schema, valueMap);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
index 3194a37..66959d3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
@@ -54,6 +54,12 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> {
     return index;
   }
 
+  @Override
+  public void encode(RawUnionValue union, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(union, outStream, Context.NESTED);
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void encode(
@@ -74,6 +80,11 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> {
   }
 
   @Override
+  public RawUnionValue decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public RawUnionValue decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     int index = VarInt.decodeInt(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
index 0bfb875..078cbee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
@@ -67,10 +67,10 @@ public class GlobalWindow extends BoundedWindow {
     public static final Coder INSTANCE = new Coder();
 
     @Override
-    public void encode(GlobalWindow window, OutputStream outStream, Context context) {}
+    public void encode(GlobalWindow window, OutputStream outStream) {}
 
     @Override
-    public GlobalWindow decode(InputStream inStream, Context context) {
+    public GlobalWindow decode(InputStream inStream) {
       return GlobalWindow.INSTANCE;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index 46ece09..f25a208 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -179,17 +179,17 @@ public class IntervalWindow extends BoundedWindow
     }
 
     @Override
-    public void encode(IntervalWindow window, OutputStream outStream, Context context)
+    public void encode(IntervalWindow window, OutputStream outStream)
         throws IOException, CoderException {
-      instantCoder.encode(window.end, outStream, context.nested());
-      durationCoder.encode(new Duration(window.start, window.end), outStream, context);
+      instantCoder.encode(window.end, outStream);
+      durationCoder.encode(new Duration(window.start, window.end), outStream);
     }
 
     @Override
-    public IntervalWindow decode(InputStream inStream, Context context)
+    public IntervalWindow decode(InputStream inStream)
         throws IOException, CoderException {
-      Instant end = instantCoder.decode(inStream, context.nested());
-      ReadableDuration duration = durationCoder.decode(inStream, context);
+      Instant end = instantCoder.decode(inStream);
+      ReadableDuration duration = durationCoder.decode(inStream);
       return new IntervalWindow(end.minus(duration), end);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index 79ce2f5..1e9a187 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -27,7 +27,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Objects;
 import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
@@ -347,7 +346,7 @@ public final class PaneInfo {
     private PaneInfoCoder() {}
 
     @Override
-    public void encode(PaneInfo value, final OutputStream outStream, Coder.Context context)
+    public void encode(PaneInfo value, final OutputStream outStream)
         throws CoderException, IOException {
       Encoding encoding = chooseEncoding(value);
       switch (chooseEncoding(value)) {
@@ -369,7 +368,7 @@ public final class PaneInfo {
     }
 
     @Override
-    public PaneInfo decode(final InputStream inStream, Coder.Context context)
+    public PaneInfo decode(final InputStream inStream)
         throws CoderException, IOException {
       byte keyAndTag = (byte) inStream.read();
       PaneInfo base = BYTE_TO_PANE_INFO.get((byte) (keyAndTag & 0x0F));

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
index a0896f5..b202065 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
@@ -43,12 +43,23 @@ public class BitSetCoder extends AtomicCoder<BitSet> {
   }
 
   @Override
+  public void encode(BitSet value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(BitSet value, OutputStream outStream, Context context)
       throws CoderException, IOException {
     BYTE_ARRAY_CODER.encodeAndOwn(value.toByteArray(), outStream, context);
   }
 
   @Override
+  public BitSet decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public BitSet decode(InputStream inStream, Context context)
       throws CoderException, IOException {
     return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 1b7e335..444521a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -636,26 +636,34 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
+    public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(windowedElem, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(WindowedValue<T> windowedElem,
                        OutputStream outStream,
                        Context context)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
-      InstantCoder.of().encode(
-          windowedElem.getTimestamp(), outStream, nestedContext);
-      windowsCoder.encode(windowedElem.getWindows(), outStream, nestedContext);
-      PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext);
+      InstantCoder.of().encode(windowedElem.getTimestamp(), outStream);
+      windowsCoder.encode(windowedElem.getWindows(), outStream);
+      PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream);
       valueCoder.encode(windowedElem.getValue(), outStream, context);
     }
 
     @Override
+    public WindowedValue<T> decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public WindowedValue<T> decode(InputStream inStream, Context context)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
-      Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
+      Instant timestamp = InstantCoder.of().decode(inStream);
       Collection<? extends BoundedWindow> windows =
-          windowsCoder.decode(inStream, nestedContext);
-      PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream, nestedContext);
+          windowsCoder.decode(inStream);
+      PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream);
       T value = valueCoder.decode(inStream, context);
       return WindowedValue.of(value, timestamp, windows, pane);
     }
@@ -710,12 +718,23 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
+    public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(windowedElem, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(WindowedValue<T> windowedElem, OutputStream outStream, Context context)
         throws CoderException, IOException {
       valueCoder.encode(windowedElem.getValue(), outStream, context);
     }
 
     @Override
+    public WindowedValue<T> decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public WindowedValue<T> decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       T value = valueCoder.decode(inStream, context);

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index c172885..a4c8b3f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -101,19 +101,18 @@ public class TimestampedValue<V> {
 
     @Override
     public void encode(TimestampedValue<T> windowedElem,
-                       OutputStream outStream,
-                       Context context)
+                       OutputStream outStream)
         throws IOException {
-      valueCoder.encode(windowedElem.getValue(), outStream, context.nested());
+      valueCoder.encode(windowedElem.getValue(), outStream);
       InstantCoder.of().encode(
-          windowedElem.getTimestamp(), outStream, context);
+          windowedElem.getTimestamp(), outStream);
     }
 
     @Override
-    public TimestampedValue<T> decode(InputStream inStream, Context context)
+    public TimestampedValue<T> decode(InputStream inStream)
         throws IOException {
-      T value = valueCoder.decode(inStream, context.nested());
-      Instant timestamp = InstantCoder.of().decode(inStream, context);
+      T value = valueCoder.decode(inStream);
+      Instant timestamp = InstantCoder.of().decode(inStream);
       return TimestampedValue.of(value, timestamp);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
index 3ecbaa2..24c3c38 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
@@ -75,21 +75,30 @@ public abstract class ValueInSingleWindow<T> {
     }
 
     @Override
+    public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream)
+        throws IOException {
+      encode(windowedElem, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream, Context context)
         throws IOException {
-      Context nestedContext = context.nested();
-      InstantCoder.of().encode(windowedElem.getTimestamp(), outStream, nestedContext);
-      windowCoder.encode(windowedElem.getWindow(), outStream, nestedContext);
-      PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext);
+      InstantCoder.of().encode(windowedElem.getTimestamp(), outStream);
+      windowCoder.encode(windowedElem.getWindow(), outStream);
+      PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream);
       valueCoder.encode(windowedElem.getValue(), outStream, context);
     }
 
     @Override
+    public ValueInSingleWindow<T> decode(InputStream inStream) throws IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public ValueInSingleWindow<T> decode(InputStream inStream, Context context) throws IOException {
-      Context nestedContext = context.nested();
-      Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
-      BoundedWindow window = windowCoder.decode(inStream, nestedContext);
-      PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, nestedContext);
+      Instant timestamp = InstantCoder.of().decode(inStream);
+      BoundedWindow window = windowCoder.decode(inStream);
+      PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);
       T value = valueCoder.decode(inStream, context);
       return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, pane);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
index 3f057e1..96a5f1d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
@@ -101,17 +101,28 @@ public class ValueWithRecordId<ValueT> {
     }
 
     @Override
+    public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream)
+        throws IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream, Context context)
         throws IOException {
-      valueCoder.encode(value.value, outStream, context.nested());
+      valueCoder.encode(value.value, outStream);
       idCoder.encode(value.id, outStream, context);
     }
 
     @Override
+    public ValueWithRecordId<ValueT> decode(InputStream inStream) throws IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public ValueWithRecordId<ValueT> decode(InputStream inStream, Context context)
         throws IOException {
       return new ValueWithRecordId<ValueT>(
-          valueCoder.decode(inStream, context.nested()),
+          valueCoder.decode(inStream),
           idCoder.decode(inStream, context));
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 7ca7fb9..d1113f7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -81,12 +81,12 @@ public class CoderRegistryTest {
   @SuppressWarnings("rawtypes") // this class exists to fail a test because of its rawtypes
   private class MyListCoder extends AtomicCoder<List> {
     @Override
-    public void encode(List value, OutputStream outStream, Context context)
+    public void encode(List value, OutputStream outStream)
         throws CoderException, IOException {
     }
 
     @Override
-    public List decode(InputStream inStream, Context context)
+    public List decode(InputStream inStream)
         throws CoderException, IOException {
       return Collections.emptyList();
     }
@@ -375,12 +375,12 @@ public class CoderRegistryTest {
     }
 
     @Override
-    public void encode(MyValue value, OutputStream outStream, Context context)
+    public void encode(MyValue value, OutputStream outStream)
         throws CoderException, IOException {
     }
 
     @Override
-    public MyValue decode(InputStream inStream, Context context)
+    public MyValue decode(InputStream inStream)
         throws CoderException, IOException {
       return new MyValue();
     }
@@ -438,6 +438,14 @@ public class CoderRegistryTest {
 
   private static class AutoRegistrationClassCoder extends CustomCoder<AutoRegistrationClass> {
     private static final AutoRegistrationClassCoder INSTANCE = new AutoRegistrationClassCoder();
+
+    @Override
+    public void encode(AutoRegistrationClass value, OutputStream outStream) {}
+
+    @Override
+    public AutoRegistrationClass decode(InputStream inStream) {
+      return null;
+    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java
index dfd4ea2..13a7261 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java
@@ -48,13 +48,13 @@ public class CustomCoderTest {
     }
 
     @Override
-    public void encode(KV<String, Long> kv, OutputStream out, Context context)
+    public void encode(KV<String, Long> kv, OutputStream out)
             throws IOException {
       new DataOutputStream(out).writeLong(kv.getValue());
     }
 
     @Override
-    public KV<String, Long> decode(InputStream inStream, Context context)
+    public KV<String, Long> decode(InputStream inStream)
         throws IOException {
       return KV.of(key, new DataInputStream(inStream).readLong());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index d6d7de8..9fb0b82 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -167,6 +167,12 @@ public class NullableCoderTest {
 
   private static class EntireStreamExpectingCoder extends AtomicCoder<String> {
     @Override
+    public void encode(String value, OutputStream outStream)
+        throws IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(
         String value, OutputStream outStream, Context context) throws IOException {
       checkArgument(context.isWholeStream, "Expected to get entire stream");
@@ -174,6 +180,11 @@ public class NullableCoderTest {
     }
 
     @Override
+    public String decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public String decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       checkArgument(context.isWholeStream, "Expected to get entire stream");

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index d97eea6..adb6652 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -182,15 +182,15 @@ public class SerializableCoderTest implements Serializable {
     // Encode both strings into NESTED form.
     byte[] nestedEncoding;
     try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
-      coder.encode(source, os, Coder.Context.NESTED);
-      coder.encode(source2, os, Coder.Context.NESTED);
+      coder.encode(source, os);
+      coder.encode(source2, os);
       nestedEncoding = os.toByteArray();
     }
 
     // Decode from NESTED form.
     try (ByteArrayInputStream is = new ByteArrayInputStream(nestedEncoding)) {
-      assertEquals(source, coder.decode(is, Coder.Context.NESTED));
-      assertEquals(source2, coder.decode(is, Coder.Context.NESTED));
+      assertEquals(source, coder.decode(is));
+      assertEquals(source2, coder.decode(is));
       assertEquals(0, is.available());
     }
   }
@@ -207,20 +207,20 @@ public class SerializableCoderTest implements Serializable {
     Coder<String> coder = SerializableCoder.of(String.class);
     byte[] encodedBytes;
     try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
-      coder.encode(null, os, Coder.Context.NESTED);
-      coder.encode("TestValue", os, Coder.Context.NESTED);
-      coder.encode(null, os, Coder.Context.NESTED);
-      coder.encode("TestValue2", os, Coder.Context.NESTED);
-      coder.encode(null, os, Coder.Context.NESTED);
+      coder.encode(null, os);
+      coder.encode("TestValue", os);
+      coder.encode(null, os);
+      coder.encode("TestValue2", os);
+      coder.encode(null, os);
       encodedBytes = os.toByteArray();
     }
 
     try (ByteArrayInputStream is = new ByteArrayInputStream(encodedBytes)) {
-      assertNull(coder.decode(is, Coder.Context.NESTED));
-      assertEquals("TestValue", coder.decode(is,  Coder.Context.NESTED));
-      assertNull(coder.decode(is, Coder.Context.NESTED));
-      assertEquals("TestValue2", coder.decode(is,  Coder.Context.NESTED));
-      assertNull(coder.decode(is, Coder.Context.NESTED));
+      assertNull(coder.decode(is));
+      assertEquals("TestValue", coder.decode(is));
+      assertNull(coder.decode(is));
+      assertEquals("TestValue2", coder.decode(is));
+      assertNull(coder.decode(is));
       assertEquals(0, is.available());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
index af2c94e..7aa2080 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
@@ -47,7 +47,7 @@ public class StructuredCoderTest {
     private static final long serialVersionUID = 0L;
 
     @Override
-    public void encode(@Nullable Boolean value, OutputStream outStream, Context context)
+    public void encode(@Nullable Boolean value, OutputStream outStream)
         throws CoderException, IOException {
       if (value == null) {
         outStream.write(2);
@@ -61,7 +61,7 @@ public class StructuredCoderTest {
     @Override
     @Nullable
     public Boolean decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
       int value = inStream.read();
       if (value == 0) {
@@ -110,7 +110,7 @@ public class StructuredCoderTest {
 
     @Override
     public void encode(
-        @Nullable ObjectIdentityBoolean value, OutputStream outStream, Context context)
+        @Nullable ObjectIdentityBoolean value, OutputStream outStream)
         throws CoderException, IOException {
       if (value == null) {
         outStream.write(2);
@@ -124,7 +124,7 @@ public class StructuredCoderTest {
     @Override
     @Nullable
     public ObjectIdentityBoolean decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
       int value = inStream.read();
       if (value == 0) {
@@ -213,13 +213,13 @@ public class StructuredCoderTest {
   private static class Foo<T> extends StructuredCoder<T> {
 
     @Override
-    public void encode(T value, OutputStream outStream, Coder.Context context)
+    public void encode(T value, OutputStream outStream)
         throws CoderException, IOException {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public T decode(InputStream inStream, Coder.Context context)
+    public T decode(InputStream inStream)
         throws CoderException, IOException {
       throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
index 164d221..ce78411 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
@@ -50,15 +50,15 @@ public class CoderPropertiesTest {
   /** A coder that says it is not deterministic but actually is. */
   public static class NonDeterministicCoder extends AtomicCoder<String> {
     @Override
-    public void encode(String value, OutputStream outStream, Context context)
+    public void encode(String value, OutputStream outStream)
         throws CoderException, IOException {
-      StringUtf8Coder.of().encode(value, outStream, context);
+      StringUtf8Coder.of().encode(value, outStream);
     }
 
     @Override
-    public String decode(InputStream inStream, Context context)
+    public String decode(InputStream inStream)
         throws CoderException, IOException {
-      return StringUtf8Coder.of().decode(inStream, context);
+      return StringUtf8Coder.of().decode(inStream);
     }
 
     public void verifyDeterministic() throws NonDeterministicException {
@@ -96,15 +96,15 @@ public class CoderPropertiesTest {
     }
 
     @Override
-    public void encode(String value, OutputStream outStream, Context context)
+    public void encode(String value, OutputStream outStream)
         throws IOException, CoderException {
-      StringUtf8Coder.of().encode(value + System.nanoTime(), outStream, context);
+      StringUtf8Coder.of().encode(value + System.nanoTime(), outStream);
     }
 
     @Override
-    public String decode(InputStream inStream, Context context)
+    public String decode(InputStream inStream)
         throws CoderException, IOException {
-      return StringUtf8Coder.of().decode(inStream, context);
+      return StringUtf8Coder.of().decode(inStream);
     }
 
     @Override
@@ -136,16 +136,16 @@ public class CoderPropertiesTest {
     }
 
     @Override
-    public void encode(String value, OutputStream outStream, Context context)
+    public void encode(String value, OutputStream outStream)
         throws CoderException, IOException {
       changedState += 1;
-      StringUtf8Coder.of().encode(value + Strings.repeat("A", changedState), outStream, context);
+      StringUtf8Coder.of().encode(value + Strings.repeat("A", changedState), outStream);
     }
 
     @Override
-    public String decode(InputStream inStream, Context context)
+    public String decode(InputStream inStream)
         throws CoderException, IOException {
-      String decodedValue = StringUtf8Coder.of().decode(inStream, context);
+      String decodedValue = StringUtf8Coder.of().decode(inStream);
       return decodedValue.substring(0, decodedValue.length() - changedState);
     }
 
@@ -180,18 +180,18 @@ public class CoderPropertiesTest {
     }
 
     @Override
-    public void encode(String value, OutputStream outStream, Context context)
+    public void encode(String value, OutputStream outStream)
         throws CoderException, IOException {
       if (lostState == 0) {
         throw new RuntimeException("I forgot something...");
       }
-      StringUtf8Coder.of().encode(value, outStream, context);
+      StringUtf8Coder.of().encode(value, outStream);
     }
 
     @Override
-    public String decode(InputStream inStream, Context context)
+    public String decode(InputStream inStream)
         throws CoderException, IOException {
-      return StringUtf8Coder.of().decode(inStream, context);
+      return StringUtf8Coder.of().decode(inStream);
     }
 
     @Override
@@ -216,12 +216,12 @@ public class CoderPropertiesTest {
   /** A coder which closes the underlying stream during encoding and decoding. */
   public static class ClosingCoder extends AtomicCoder<String> {
     @Override
-    public void encode(String value, OutputStream outStream, Context context) throws IOException {
+    public void encode(String value, OutputStream outStream) throws IOException {
       outStream.close();
     }
 
     @Override
-    public String decode(InputStream inStream, Context context) throws IOException {
+    public String decode(InputStream inStream) throws IOException {
       inStream.close();
       return null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 83f348c..37db4ef 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -98,12 +98,12 @@ public class PAssertTest implements Serializable {
     }
 
     @Override
-    public void encode(NotSerializableObject value, OutputStream outStream, Context context)
+    public void encode(NotSerializableObject value, OutputStream outStream)
         throws CoderException, IOException {
     }
 
     @Override
-    public NotSerializableObject decode(InputStream inStream, Context context)
+    public NotSerializableObject decode(InputStream inStream)
         throws CoderException, IOException {
       return new NotSerializableObject();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
index db5ff2e..6b17696 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
@@ -30,7 +30,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
 import org.hamcrest.Matchers;
@@ -153,11 +152,11 @@ public class SerializableMatchersTest implements Serializable {
 
   private static class NotSerializableClassCoder extends AtomicCoder<NotSerializableClass> {
     @Override
-    public void encode(NotSerializableClass value, OutputStream outStream, Coder.Context context) {
+    public void encode(NotSerializableClass value, OutputStream outStream) {
     }
 
     @Override
-    public NotSerializableClass decode(InputStream inStream, Coder.Context context) {
+    public NotSerializableClass decode(InputStream inStream) {
       return new NotSerializableClass();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
index 546683b..3939800 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
@@ -75,14 +75,14 @@ public class WindowSupplierTest {
   private static class FailingCoder extends AtomicCoder<BoundedWindow> {
     @Override
     public void encode(
-        BoundedWindow value, OutputStream outStream, Context context)
+        BoundedWindow value, OutputStream outStream)
         throws CoderException, IOException {
       throw new CoderException("Test Encode Exception");
     }
 
     @Override
     public BoundedWindow decode(
-        InputStream inStream, Context context) throws CoderException, IOException {
+        InputStream inStream) throws CoderException, IOException {
       throw new CoderException("Test Decode Exception");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 8a4d563..33c652a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -336,12 +336,23 @@ public class  CombineFnsTest {
     private static final UserStringCoder INSTANCE = new UserStringCoder();
 
     @Override
+    public void encode(UserString value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(UserString value, OutputStream outStream, Context context)
         throws CoderException, IOException {
       StringUtf8Coder.of().encode(value.strValue, outStream, context);
     }
 
     @Override
+    public UserString decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public UserString decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       return UserString.of(StringUtf8Coder.of().decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index a70af94..dc9788f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -876,17 +876,17 @@ public class CombineTest implements Serializable {
      */
     private class CountSumCoder extends AtomicCoder<CountSum> {
       @Override
-      public void encode(CountSum value, OutputStream outStream,
-          Context context) throws CoderException, IOException {
-        LONG_CODER.encode(value.count, outStream, context.nested());
-        DOUBLE_CODER.encode(value.sum, outStream, context);
+      public void encode(CountSum value, OutputStream outStream)
+          throws CoderException, IOException {
+        LONG_CODER.encode(value.count, outStream);
+        DOUBLE_CODER.encode(value.sum, outStream);
       }
 
       @Override
-      public CountSum decode(InputStream inStream, Coder.Context context)
+      public CountSum decode(InputStream inStream)
           throws CoderException, IOException {
-        long count = LONG_CODER.decode(inStream, context.nested());
-        double sum = DOUBLE_CODER.decode(inStream, context);
+        long count = LONG_CODER.decode(inStream);
+        double sum = DOUBLE_CODER.decode(inStream);
         return new CountSum(count, sum);
       }
 
@@ -925,12 +925,23 @@ public class CombineTest implements Serializable {
       public static Coder<Accumulator> getCoder() {
         return new AtomicCoder<Accumulator>() {
           @Override
+          public void encode(Accumulator accumulator, OutputStream outStream)
+              throws CoderException, IOException {
+            encode(accumulator, outStream, Coder.Context.NESTED);
+          }
+
+          @Override
           public void encode(Accumulator accumulator, OutputStream outStream, Coder.Context context)
               throws CoderException, IOException {
             StringUtf8Coder.of().encode(accumulator.value, outStream, context);
           }
 
           @Override
+          public Accumulator decode(InputStream inStream) throws CoderException, IOException {
+            return decode(inStream, Coder.Context.NESTED);
+          }
+
+          @Override
           public Accumulator decode(InputStream inStream, Coder.Context context)
               throws CoderException, IOException {
             return new Accumulator(StringUtf8Coder.of().decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index a458812..a05d31c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -134,11 +134,11 @@ public class CreateTest {
 
   private static class RecordCoder extends AtomicCoder<Record> {
     @Override
-    public void encode(Record value, OutputStream outStream, Context context)
+    public void encode(Record value, OutputStream outStream)
         throws CoderException, IOException {}
 
     @Override
-    public Record decode(InputStream inStream, Context context) throws CoderException, IOException {
+    public Record decode(InputStream inStream) throws CoderException, IOException {
       return null;
     }
   }
@@ -207,17 +207,16 @@ public class CreateTest {
       @Override
       public void encode(
           UnserializableRecord value,
-          OutputStream outStream,
-          org.apache.beam.sdk.coders.Coder.Context context)
+          OutputStream outStream)
           throws CoderException, IOException {
-        stringCoder.encode(value.myString, outStream, context.nested());
+        stringCoder.encode(value.myString, outStream);
       }
 
       @Override
       public UnserializableRecord decode(
-          InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+          InputStream inStream)
           throws CoderException, IOException {
-        return new UnserializableRecord(stringCoder.decode(inStream, context.nested()));
+        return new UnserializableRecord(stringCoder.decode(inStream));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index aba33eb..0cd885c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -469,13 +469,13 @@ public class GroupByKeyTest {
     private DeterministicKeyCoder() {}
 
     @Override
-    public void encode(BadEqualityKey value, OutputStream outStream, Context context)
+    public void encode(BadEqualityKey value, OutputStream outStream)
         throws IOException {
       new DataOutputStream(outStream).writeLong(value.key);
     }
 
     @Override
-    public BadEqualityKey decode(InputStream inStream, Context context)
+    public BadEqualityKey decode(InputStream inStream)
         throws IOException {
       return new BadEqualityKey(new DataInputStream(inStream).readLong());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d2cb980..ef27f4c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -986,12 +986,12 @@ public class ParDoTest implements Serializable {
     }
 
     @Override
-    public void encode(TestDummy value, OutputStream outStream, Context context)
+    public void encode(TestDummy value, OutputStream outStream)
         throws CoderException, IOException {
     }
 
     @Override
-    public TestDummy decode(InputStream inStream, Context context)
+    public TestDummy decode(InputStream inStream)
         throws CoderException, IOException {
       return new TestDummy();
     }
@@ -1090,15 +1090,15 @@ public class ParDoTest implements Serializable {
     }
 
     @Override
-    public void encode(MyInteger value, OutputStream outStream, Context context)
+    public void encode(MyInteger value, OutputStream outStream)
         throws CoderException, IOException {
-      delegate.encode(value.getValue(), outStream, context);
+      delegate.encode(value.getValue(), outStream);
     }
 
     @Override
-    public MyInteger decode(InputStream inStream, Context context) throws CoderException,
+    public MyInteger decode(InputStream inStream) throws CoderException,
         IOException {
-      return new MyInteger(delegate.decode(inStream, context));
+      return new MyInteger(delegate.decode(inStream));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 84f3d69..cdd03d9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -507,12 +507,23 @@ public class ViewTest implements Serializable {
 
   private static class NonDeterministicStringCoder extends AtomicCoder<String> {
     @Override
+    public void encode(String value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Coder.Context.NESTED);
+    }
+
+    @Override
     public void encode(String value, OutputStream outStream, Coder.Context context)
         throws CoderException, IOException {
       StringUtf8Coder.of().encode(value, outStream, context);
     }
 
     @Override
+    public String decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Coder.Context.NESTED);
+    }
+
+    @Override
     public String decode(InputStream inStream, Coder.Context context)
         throws CoderException, IOException {
       return StringUtf8Coder.of().decode(inStream, context);

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 489493a..a8cd35e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -277,10 +277,10 @@ public class DoFnInvokersTest {
     }
 
     @Override
-    public void encode(SomeRestriction value, OutputStream outStream, Context context) {}
+    public void encode(SomeRestriction value, OutputStream outStream) {}
 
     @Override
-    public SomeRestriction decode(InputStream inStream, Context context) {
+    public SomeRestriction decode(InputStream inStream) {
       return null;
     }
   }
@@ -400,10 +400,10 @@ public class DoFnInvokersTest {
 
     @Override
     public void encode(
-        RestrictionWithDefaultTracker value, OutputStream outStream, Context context) {}
+        RestrictionWithDefaultTracker value, OutputStream outStream) {}
 
     @Override
-    public RestrictionWithDefaultTracker decode(InputStream inStream, Context context) {
+    public RestrictionWithDefaultTracker decode(InputStream inStream) {
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
index 314b969..9ae5d68 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
@@ -35,7 +35,7 @@ public class GlobalWindowTest {
     CountingOutputStream out = new CountingOutputStream(ByteStreams.nullOutputStream());
     GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.OUTER);
     assertEquals(0, out.getCount());
-    GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.NESTED);
+    GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out);
     assertEquals(0, out.getCount());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
index 36f7028..894d8a9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
@@ -32,7 +32,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder.Context;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Rule;
 import org.junit.Test;
@@ -180,7 +179,7 @@ public class BufferedElementCountingOutputStreamTest {
     do {
       count = VarInt.decodeLong(is);
       for (int i = 0; i < count; ++i) {
-        values.add(ByteArrayCoder.of().decode(is, Context.NESTED));
+        values.add(ByteArrayCoder.of().decode(is));
       }
     } while(count > 0);
 
@@ -198,7 +197,7 @@ public class BufferedElementCountingOutputStreamTest {
 
     for (byte[] value : values) {
       os.markElementStart();
-      ByteArrayCoder.of().encode(value, os, Context.NESTED);
+      ByteArrayCoder.of().encode(value, os);
     }
     return os;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
index 7230a8b..f36e5e1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
@@ -50,12 +50,12 @@ public class CoderUtilsTest {
     }
 
     @Override
-    public void encode(Integer value, OutputStream outStream, Context context) {
+    public void encode(Integer value, OutputStream outStream) {
       throw new RuntimeException("not expecting to be called");
     }
 
     @Override
-    public Integer decode(InputStream inStream, Context context) {
+    public Integer decode(InputStream inStream) {
       throw new RuntimeException("not expecting to be called");
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
index 6ba1d4a..9a80730 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
@@ -89,12 +89,12 @@ public class SerializableUtilsTest {
     private final Object unserializableField = new Object();
 
     @Override
-    public void encode(Object value, OutputStream outStream, Context context)
+    public void encode(Object value, OutputStream outStream)
         throws CoderException, IOException {
     }
 
     @Override
-    public Object decode(InputStream inStream, Context context)
+    public Object decode(InputStream inStream)
         throws CoderException, IOException {
       return unserializableField;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
index 325c69d..73c7977 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
@@ -49,6 +49,12 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
   private ByteStringCoder() {}
 
   @Override
+  public void encode(ByteString value, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(ByteString value, OutputStream outStream, Context context)
       throws IOException, CoderException {
     if (value == null) {
@@ -63,6 +69,11 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
   }
 
   @Override
+  public ByteString decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public ByteString decode(InputStream inStream, Context context) throws IOException {
     if (context.isWholeStream) {
       return ByteString.readFrom(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
index 968a2fa..f73bf2b 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -168,6 +168,12 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> {
   }
 
   @Override
+  public void encode(T value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(T value, OutputStream outStream, Context context) throws IOException {
     if (value == null) {
       throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
@@ -180,6 +186,11 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> {
   }
 
   @Override
+  public T decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public T decode(InputStream inStream, Context context) throws IOException {
     if (context.isWholeStream) {
       return getParser().parseFrom(inStream, getExtensionRegistry());

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
index 18e0d95..7223e87 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
@@ -27,7 +27,6 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -110,7 +109,7 @@ public class BeamFnDataBufferingOutboundObserver<T>
 
   @Override
   public void accept(WindowedValue<T> t) throws IOException {
-    coder.encode(t, bufferedElements, Context.NESTED);
+    coder.encode(t, bufferedElements);
     counter += 1;
     if (bufferedElements.size() >= bufferLimit) {
       outboundObserver.onNext(convertBufferForTransmission().build());

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
index 24365d8..ac603bd 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
@@ -23,7 +23,6 @@ import java.util.function.Consumer;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,7 +70,7 @@ public class BeamFnDataInboundObserver<T> implements Consumer<BeamFnApi.Elements
       InputStream inputStream = t.getData().newInput();
       while (inputStream.available() > 0) {
         counter += 1;
-        WindowedValue<T> value = coder.decode(inputStream, Context.NESTED);
+        WindowedValue<T> value = coder.decode(inputStream);
         consumer.accept(value);
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
index 7cbf8eb..c2b4542 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
@@ -33,7 +33,6 @@ import org.apache.beam.fn.harness.test.TestStreams;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -135,7 +134,7 @@ public class BeamFnDataBufferingOutboundObserverTest {
   private static BeamFnApi.Elements messageWithData(byte[] ... datum) throws IOException {
     ByteString.Output output = ByteString.newOutput();
     for (byte[] data : datum) {
-      CODER.encode(valueInGlobalWindow(data), output, Context.NESTED);
+      CODER.encode(valueInGlobalWindow(data), output);
     }
     return BeamFnApi.Elements.newBuilder()
         .addData(BeamFnApi.Elements.Data.newBuilder()

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
index c53f99d..54aba8b 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
@@ -34,7 +34,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -108,7 +107,7 @@ public class BeamFnDataInboundObserverTest {
             .setName("Test"));
     ByteString.Output output = ByteString.newOutput();
     for (String value : values) {
-      CODER.encode(valueInGlobalWindow(value), output, Context.NESTED);
+      CODER.encode(valueInGlobalWindow(value), output);
     }
     builder.setData(output.toByteString());
     return builder.build();

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
index 7aefcfa..c2b62b7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
@@ -53,17 +53,18 @@ class ShardedKeyCoder<KeyT>
   }
 
   @Override
-  public void encode(ShardedKey<KeyT> key, OutputStream outStream, Context context)
+  public void encode(ShardedKey<KeyT> key, OutputStream outStream)
       throws IOException {
-    keyCoder.encode(key.getKey(), outStream, context.nested());
-    shardNumberCoder.encode(key.getShardNumber(), outStream, context);
+    keyCoder.encode(key.getKey(), outStream);
+    shardNumberCoder.encode(key.getShardNumber(), outStream);
   }
 
   @Override
-  public ShardedKey<KeyT> decode(InputStream inStream, Context context)
+  public ShardedKey<KeyT> decode(InputStream inStream)
       throws IOException {
     return new ShardedKey<>(
-        keyCoder.decode(inStream, context.nested()), shardNumberCoder.decode(inStream, context));
+        keyCoder.decode(inStream),
+        shardNumberCoder.decode(inStream));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index 01bc558..f034a03 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -38,19 +38,19 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> {
   }
 
   @Override
-  public void encode(TableDestination value, OutputStream outStream, Context context)
+  public void encode(TableDestination value, OutputStream outStream)
       throws IOException {
     if (value == null) {
       throw new CoderException("cannot encode a null value");
     }
-    tableSpecCoder.encode(value.getTableSpec(), outStream, context.nested());
-    tableDescriptionCoder.encode(value.getTableDescription(), outStream, context);
+    tableSpecCoder.encode(value.getTableSpec(), outStream);
+    tableDescriptionCoder.encode(value.getTableDescription(), outStream);
   }
 
   @Override
-  public TableDestination decode(InputStream inStream, Context context) throws IOException {
-    String tableSpec = tableSpecCoder.decode(inStream, context.nested());
-    String tableDescription = tableDescriptionCoder.decode(inStream, context);
+  public TableDestination decode(InputStream inStream) throws IOException {
+    String tableSpec = tableSpecCoder.decode(inStream);
+    String tableDescription = tableDescriptionCoder.decode(inStream);
     return new TableDestination(tableSpec, tableDescription);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
index 2b1988a..c4707da 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -38,20 +38,31 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
   }
 
   @Override
+  public void encode(TableRowInfo value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(TableRowInfo value, OutputStream outStream, Context context)
       throws IOException {
     if (value == null) {
       throw new CoderException("cannot encode a null value");
     }
-    tableRowCoder.encode(value.tableRow, outStream, context.nested());
+    tableRowCoder.encode(value.tableRow, outStream);
     idCoder.encode(value.uniqueId, outStream, context);
   }
 
   @Override
+  public TableRowInfo decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public TableRowInfo decode(InputStream inStream, Context context)
       throws IOException {
     return new TableRowInfo(
-        tableRowCoder.decode(inStream, context.nested()),
+        tableRowCoder.decode(inStream),
         idCoder.decode(inStream, context));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index cfec991..e4b6f1f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -38,6 +38,12 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
   }
 
   @Override
+  public void encode(TableRow value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(TableRow value, OutputStream outStream, Context context)
       throws IOException {
     String strValue = MAPPER.writeValueAsString(value);
@@ -45,6 +51,11 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
   }
 
   @Override
+  public TableRow decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public TableRow decode(InputStream inStream, Context context)
       throws IOException {
     String strValue = StringUtf8Coder.of().decode(inStream, context);

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 890979b..f014039 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -101,21 +101,21 @@ class WriteBundlesToFiles<DestinationT>
     }
 
     @Override
-    public void encode(Result<DestinationT> value, OutputStream outStream, Context context)
+    public void encode(Result<DestinationT> value, OutputStream outStream)
         throws IOException {
       if (value == null) {
         throw new CoderException("cannot encode a null value");
       }
-      stringCoder.encode(value.filename, outStream, context.nested());
-      longCoder.encode(value.fileByteSize, outStream, context.nested());
-      destinationCoder.encode(value.destination, outStream, context.nested());
+      stringCoder.encode(value.filename, outStream);
+      longCoder.encode(value.fileByteSize, outStream);
+      destinationCoder.encode(value.destination, outStream);
     }
 
     @Override
-    public Result<DestinationT> decode(InputStream inStream, Context context) throws IOException {
-      String filename = stringCoder.decode(inStream, context.nested());
-      long fileByteSize = longCoder.decode(inStream, context.nested());
-      DestinationT destination = destinationCoder.decode(inStream, context.nested());
+    public Result<DestinationT> decode(InputStream inStream) throws IOException {
+      String filename = stringCoder.decode(inStream);
+      long fileByteSize = longCoder.decode(inStream);
+      DestinationT destination = destinationCoder.decode(inStream);
       return new Result<>(filename, fileByteSize, destination);
     }
 


Mime
View raw message