beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [05/13] beam git commit: automated context removal or redirection
Date Tue, 09 May 2017 04:21:25 GMT
automated context removal or redirection


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b7f3341e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b7f3341e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b7f3341e

Branch: refs/heads/master
Commit: b7f3341ed4fc073a834a55773bcb4a2c7f821c52
Parents: 996dce3
Author: Robert Bradshaw <robertwb@gmail.com>
Authored: Fri May 5 17:24:02 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Mon May 8 20:17:56 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/ApexStreamTuple.java | 11 ++++++++
 .../UnboundedReadFromBoundedSource.java         |  4 +--
 .../runners/core/construction/CodersTest.java   |  4 +--
 .../core/ElementAndRestrictionCoder.java        |  4 +--
 .../beam/runners/core/KeyedWorkItemCoder.java   |  6 ++---
 .../beam/runners/core/TimerInternals.java       |  6 ++---
 .../direct/CloningBundleFactoryTest.java        | 20 ++++++--------
 .../UnboundedReadEvaluatorFactoryTest.java      |  5 ++--
 .../streaming/SingletonKeyedWorkItemCoder.java  | 11 ++++++++
 .../runners/dataflow/BatchViewOverrides.java    | 11 ++++++++
 .../runners/dataflow/internal/IsmFormat.java    | 28 +++++++++++++-------
 .../runners/dataflow/util/RandomAccessData.java | 11 ++++++++
 .../org/apache/beam/sdk/coders/AvroCoder.java   |  4 +--
 .../apache/beam/sdk/coders/BigDecimalCoder.java | 11 ++++++++
 .../beam/sdk/coders/BigEndianIntegerCoder.java  |  4 +--
 .../beam/sdk/coders/BigEndianLongCoder.java     |  4 +--
 .../apache/beam/sdk/coders/BigIntegerCoder.java | 11 ++++++++
 .../org/apache/beam/sdk/coders/BitSetCoder.java | 11 ++++++++
 .../apache/beam/sdk/coders/ByteArrayCoder.java  | 11 ++++++++
 .../org/apache/beam/sdk/coders/ByteCoder.java   |  4 +--
 .../apache/beam/sdk/coders/DelegateCoder.java   | 11 ++++++++
 .../org/apache/beam/sdk/coders/DoubleCoder.java |  4 +--
 .../apache/beam/sdk/coders/DurationCoder.java   |  4 +--
 .../apache/beam/sdk/coders/InstantCoder.java    |  4 +--
 .../beam/sdk/coders/IterableLikeCoder.java      |  6 ++---
 .../org/apache/beam/sdk/coders/KvCoder.java     | 11 ++++++++
 .../beam/sdk/coders/LengthPrefixCoder.java      |  4 +--
 .../org/apache/beam/sdk/coders/MapCoder.java    | 11 ++++++++
 .../apache/beam/sdk/coders/NullableCoder.java   | 11 ++++++++
 .../beam/sdk/coders/SerializableCoder.java      |  4 +--
 .../beam/sdk/coders/StringDelegateCoder.java    | 11 ++++++++
 .../apache/beam/sdk/coders/StringUtf8Coder.java | 11 ++++++++
 .../beam/sdk/coders/TextualIntegerCoder.java    | 11 ++++++++
 .../org/apache/beam/sdk/coders/VarIntCoder.java |  4 +--
 .../apache/beam/sdk/coders/VarLongCoder.java    |  4 +--
 .../org/apache/beam/sdk/coders/VoidCoder.java   |  4 +--
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 11 ++++++++
 .../sdk/transforms/ApproximateQuantiles.java    |  4 +--
 .../org/apache/beam/sdk/transforms/Combine.java | 22 +++++++++++++++
 .../apache/beam/sdk/transforms/CombineFns.java  | 13 +++++++--
 .../org/apache/beam/sdk/transforms/Count.java   |  4 +--
 .../org/apache/beam/sdk/transforms/Mean.java    |  4 +--
 .../org/apache/beam/sdk/transforms/Top.java     |  4 +--
 .../beam/sdk/transforms/join/CoGbkResult.java   |  6 ++---
 .../beam/sdk/transforms/join/UnionCoder.java    | 11 ++++++++
 .../sdk/transforms/windowing/GlobalWindow.java  |  4 +--
 .../transforms/windowing/IntervalWindow.java    |  4 +--
 .../beam/sdk/transforms/windowing/PaneInfo.java |  4 +--
 .../org/apache/beam/sdk/util/BitSetCoder.java   | 11 ++++++++
 .../org/apache/beam/sdk/util/WindowedValue.java | 24 +++++++++++++++--
 .../beam/sdk/values/TimestampedValue.java       |  5 ++--
 .../beam/sdk/values/ValueInSingleWindow.java    | 13 +++++++--
 .../beam/sdk/values/ValueWithRecordId.java      | 11 ++++++++
 .../beam/sdk/coders/CoderRegistryTest.java      |  8 +++---
 .../apache/beam/sdk/coders/CustomCoderTest.java |  4 +--
 .../beam/sdk/coders/NullableCoderTest.java      | 11 ++++++++
 .../beam/sdk/coders/StructuredCoderTest.java    | 12 ++++-----
 .../apache/beam/sdk/testing/PAssertTest.java    |  4 +--
 .../sdk/testing/SerializableMatchersTest.java   |  4 +--
 .../beam/sdk/testing/WindowSupplierTest.java    |  4 +--
 .../beam/sdk/transforms/CombineFnsTest.java     | 11 ++++++++
 .../apache/beam/sdk/transforms/CombineTest.java | 22 +++++++++++++++
 .../apache/beam/sdk/transforms/CreateTest.java  |  9 +++----
 .../beam/sdk/transforms/GroupByKeyTest.java     |  4 +--
 .../apache/beam/sdk/transforms/ParDoTest.java   | 15 +++++++++--
 .../apache/beam/sdk/transforms/ViewTest.java    | 11 ++++++++
 .../transforms/reflect/DoFnInvokersTest.java    |  8 +++---
 .../apache/beam/sdk/util/CoderUtilsTest.java    |  4 +--
 .../beam/sdk/util/SerializableUtilsTest.java    |  4 +--
 .../extensions/protobuf/ByteStringCoder.java    | 11 ++++++++
 .../sdk/extensions/protobuf/ProtoCoder.java     | 11 ++++++++
 .../io/gcp/bigquery/TableDestinationCoder.java  |  2 +-
 .../sdk/io/gcp/bigquery/TableRowInfoCoder.java  | 11 ++++++++
 .../sdk/io/gcp/bigquery/TableRowJsonCoder.java  | 11 ++++++++
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  4 +--
 .../pubsub/PubsubMessagePayloadOnlyCoder.java   | 11 ++++++++
 .../PubsubMessageWithAttributesCoder.java       | 11 ++++++++
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |  4 +--
 .../io/gcp/pubsub/PubsubUnboundedSource.java    | 11 ++++++++
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 11 ++++++++
 .../beam/sdk/io/hadoop/WritableCoder.java       |  4 +--
 .../beam/sdk/io/hbase/HBaseMutationCoder.java   |  6 ++---
 .../beam/sdk/io/hbase/HBaseResultCoder.java     |  4 +--
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  4 +--
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     | 11 ++++++++
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  4 +--
 .../org/apache/beam/sdk/io/xml/JAXBCoder.java   | 25 ++++++++++-------
 .../apache/beam/sdk/io/xml/JAXBCoderTest.java   | 13 +++++++--
 88 files changed, 598 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
index 4aa6ee8..1d402eb 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
@@ -162,6 +162,12 @@ public interface ApexStreamTuple<T> {
     }
 
     @Override
+    public void encode(ApexStreamTuple<T> value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context)
         throws CoderException, IOException {
       if (value instanceof WatermarkTuple) {
@@ -175,6 +181,11 @@ public interface ApexStreamTuple<T> {
     }
 
     @Override
+    public ApexStreamTuple<T> decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public ApexStreamTuple<T> decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       int b = inStream.read();

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index b74da80..24eb384 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -221,7 +221,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
       }
 
       @Override
-      public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
+      public void encode(Checkpoint<T> value, OutputStream outStream)
           throws CoderException, IOException {
         elemsCoder.encode(value.residualElements, outStream);
         sourceCoder.encode(value.residualSource, outStream);
@@ -229,7 +229,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
 
       @SuppressWarnings("unchecked")
       @Override
-      public Checkpoint<T> decode(InputStream inStream, Context context)
+      public Checkpoint<T> decode(InputStream inStream)
           throws CoderException, IOException {
         return new Checkpoint<>(
             elemsCoder.decode(inStream),

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index 765723c..42fba7c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -151,11 +151,11 @@ public class CodersTest {
 
     private static class RecordCoder extends AtomicCoder<Record> {
       @Override
-      public void encode(Record value, OutputStream outStream, Context context)
+      public void encode(Record value, OutputStream outStream)
           throws CoderException, IOException {}
 
       @Override
-      public Record decode(InputStream inStream, Context context)
+      public Record decode(InputStream inStream)
           throws CoderException, IOException {
         return new Record();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
index fcb1deb..4440b85 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -50,7 +50,7 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT>
 
   @Override
   public void encode(
-      ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream, Context context)
+      ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream)
       throws IOException {
     if (value == null) {
       throw new CoderException("cannot encode a null ElementAndRestriction");
@@ -60,7 +60,7 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT>
   }
 
   @Override
-  public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream, Context context)
+  public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream)
       throws IOException {
     ElementT key = elementCoder.decode(inStream);
     RestrictionT value = restrictionCoder.decode(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index 0869244..b1cb1a6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -69,18 +69,16 @@ public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem<
   }
 
   @Override
-  public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context)
+  public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream)
       throws CoderException, IOException {
-    Coder.Context nestedContext = context.nested();
     keyCoder.encode(value.key(), outStream);
     timersCoder.encode(value.timersIterable(), outStream);
     elemsCoder.encode(value.elementsIterable(), outStream);
   }
 
   @Override
-  public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context context)
+  public KeyedWorkItem<K, ElemT> decode(InputStream inStream)
       throws CoderException, IOException {
-    Coder.Context nestedContext = context.nested();
     K key = keyCoder.decode(inStream);
     Iterable<TimerData> timers = timersCoder.decode(inStream);
     Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index f0a62cd..f4a12d0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -238,9 +238,8 @@ public interface TimerInternals {
     }
 
     @Override
-    public void encode(TimerData timer, OutputStream outStream, Context context)
+    public void encode(TimerData timer, OutputStream outStream)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
       STRING_CODER.encode(timer.getTimerId(), outStream);
       STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
       INSTANT_CODER.encode(timer.getTimestamp(), outStream);
@@ -248,9 +247,8 @@ public interface TimerInternals {
     }
 
     @Override
-    public TimerData decode(InputStream inStream, Context context)
+    public TimerData decode(InputStream inStream)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
       String timerId = STRING_CODER.decode(inStream);
       StateNamespace namespace =
           StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index 33d171e..5bc8077 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -178,15 +178,14 @@ public class CloningBundleFactoryTest {
     @Override
     public void encode(
         Record value,
-        OutputStream outStream,
-        org.apache.beam.sdk.coders.Coder.Context context)
+        OutputStream outStream)
         throws IOException {
       throw new CoderException("Encode not allowed");
     }
 
     @Override
     public Record decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+        InputStream inStream)
         throws IOException {
       return null;
     }
@@ -196,13 +195,12 @@ public class CloningBundleFactoryTest {
     @Override
     public void encode(
         Record value,
-        OutputStream outStream,
-        org.apache.beam.sdk.coders.Coder.Context context)
+        OutputStream outStream)
         throws IOException {}
 
     @Override
     public Record decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+        InputStream inStream)
         throws IOException {
       throw new CoderException("Decode not allowed");
     }
@@ -212,13 +210,12 @@ public class CloningBundleFactoryTest {
     @Override
     public void encode(
         Record value,
-        OutputStream outStream,
-        org.apache.beam.sdk.coders.Coder.Context context)
+        OutputStream outStream)
         throws CoderException, IOException {}
 
     @Override
     public Record decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
       return new Record() {
         @Override
@@ -244,13 +241,12 @@ public class CloningBundleFactoryTest {
     @Override
     public void encode(
         Record value,
-        OutputStream outStream,
-        org.apache.beam.sdk.coders.Coder.Context context)
+        OutputStream outStream)
         throws CoderException, IOException {}
 
     @Override
     public Record decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
       return new Record() {
         @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index b9ba7f4..2a01db5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -590,15 +590,14 @@ public class UnboundedReadEvaluatorFactoryTest {
       @Override
       public void encode(
           TestCheckpointMark value,
-          OutputStream outStream,
-          org.apache.beam.sdk.coders.Coder.Context context)
+          OutputStream outStream)
           throws IOException {
         VarInt.encode(value.index, outStream);
       }
 
       @Override
       public TestCheckpointMark decode(
-          InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+          InputStream inStream)
           throws IOException {
         TestCheckpointMark decoded = new TestCheckpointMark(VarInt.decodeInt(inStream));
         decoded.decoded = true;

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index d7bae7e..b62fc16 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -66,6 +66,12 @@ public class SingletonKeyedWorkItemCoder<K, ElemT>
   }
 
   @Override
+  public void encode(SingletonKeyedWorkItem<K, ElemT> value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(SingletonKeyedWorkItem<K, ElemT> value,
                      OutputStream outStream,
                      Context context)
@@ -75,6 +81,11 @@ public class SingletonKeyedWorkItemCoder<K, ElemT>
   }
 
   @Override
+  public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context)
       throws CoderException, IOException {
     K key = keyCoder.decode(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 0e60fa0..34609df 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -1351,6 +1351,12 @@ class BatchViewOverrides {
     }
 
     @Override
+    public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(outStream, outStream, Coder.Context.NESTED);
+    }
+
+    @Override
     public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream,
         Coder.Context context) throws CoderException, IOException {
       transformCoder.encode(value.transform, outStream);
@@ -1358,6 +1364,11 @@ class BatchViewOverrides {
     }
 
     @Override
+    public TransformedMap<K, V1, V2> decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Coder.Context.NESTED);
+    }
+
+    @Override
     public TransformedMap<K, V1, V2> decode(
         InputStream inStream, Coder.Context context) throws CoderException, IOException {
       return new TransformedMap<>(

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 0f0cd4d..8cfae81 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -231,8 +231,7 @@ public class IsmFormat {
     }
 
     @Override
-    public void encode(IsmRecord<V> value, OutputStream outStream,
-        Coder.Context context) throws CoderException, IOException {
+    public void encode(IsmRecord<V> value, OutputStream outStream) throws CoderException, IOException {
       if (value.getKeyComponents().size() != keyComponentCoders.size()) {
         throw new CoderException(String.format(
             "Expected %s key component(s) but received key component(s) %s.",
@@ -249,7 +248,7 @@ public class IsmFormat {
     }
 
     @Override
-    public IsmRecord<V> decode(InputStream inStream, Coder.Context context)
+    public IsmRecord<V> decode(InputStream inStream)
         throws CoderException, IOException {
       List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size());
       for (Coder<?> keyCoder : keyComponentCoders) {
@@ -493,7 +492,7 @@ public class IsmFormat {
     }
 
     @Override
-    public void encode(K value, OutputStream outStream, Coder.Context context)
+    public void encode(K value, OutputStream outStream)
         throws CoderException, IOException {
       if (value == METADATA_KEY) {
         outStream.write(0);
@@ -504,7 +503,7 @@ public class IsmFormat {
     }
 
     @Override
-    public K decode(InputStream inStream, Coder.Context context)
+    public K decode(InputStream inStream)
         throws CoderException, IOException {
       int marker = inStream.read();
       if (marker == 0) {
@@ -621,6 +620,12 @@ public class IsmFormat {
     private IsmShardCoder() {}
 
     @Override
+    public void encode(IsmShard value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Coder.Context.NESTED);
+    }
+
+    @Override
     public void encode(IsmShard value, OutputStream outStream, Coder.Context context)
         throws CoderException, IOException {
       checkState(value.getIndexOffset() >= 0,
@@ -632,6 +637,11 @@ public class IsmFormat {
     }
 
     @Override
+    public IsmShard decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Coder.Context.NESTED);
+    }
+
+    @Override
     public IsmShard decode(
         InputStream inStream, Coder.Context context) throws CoderException, IOException {
       return IsmShard.of(
@@ -683,14 +693,14 @@ public class IsmFormat {
     }
 
     @Override
-    public void encode(KeyPrefix value, OutputStream outStream, Coder.Context context)
+    public void encode(KeyPrefix value, OutputStream outStream)
         throws CoderException, IOException {
       VarInt.encode(value.getSharedKeySize(), outStream);
       VarInt.encode(value.getUnsharedKeySize(), outStream);
     }
 
     @Override
-    public KeyPrefix decode(InputStream inStream, Coder.Context context)
+    public KeyPrefix decode(InputStream inStream)
         throws CoderException, IOException {
       return KeyPrefix.of(VarInt.decodeInt(inStream), VarInt.decodeInt(inStream));
     }
@@ -755,7 +765,7 @@ public class IsmFormat {
     }
 
     @Override
-    public void encode(Footer value, OutputStream outStream, Coder.Context context)
+    public void encode(Footer value, OutputStream outStream)
         throws CoderException, IOException {
       DataOutputStream dataOut = new DataOutputStream(outStream);
       dataOut.writeLong(value.getIndexPosition());
@@ -765,7 +775,7 @@ public class IsmFormat {
     }
 
     @Override
-    public Footer decode(InputStream inStream, Coder.Context context)
+    public Footer decode(InputStream inStream)
         throws CoderException, IOException {
       DataInputStream dataIn = new DataInputStream(inStream);
       Footer footer = Footer.of(dataIn.readLong(), dataIn.readLong(), dataIn.readLong());

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
index f36bd78..5ea9f07 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
@@ -63,6 +63,12 @@ public class RandomAccessData {
     }
 
     @Override
+    public void encode(RandomAccessData value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Coder.Context.NESTED);
+    }
+
+    @Override
     public void encode(RandomAccessData value, OutputStream outStream, Coder.Context context)
         throws CoderException, IOException {
       if (value == POSITIVE_INFINITY) {
@@ -75,6 +81,11 @@ public class RandomAccessData {
     }
 
     @Override
+    public RandomAccessData decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Coder.Context.NESTED);
+    }
+
+    @Override
     public RandomAccessData decode(InputStream inStream, Coder.Context context)
         throws CoderException, IOException {
       RandomAccessData rval = new RandomAccessData();

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index f82c065..bba669d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -300,7 +300,7 @@ public class AvroCoder<T> extends CustomCoder<T> {
   }
 
   @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
+  public void encode(T value, OutputStream outStream) throws IOException {
     // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it.
     BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
     // Save the potentially-new instance for reuse later.
@@ -310,7 +310,7 @@ public class AvroCoder<T> extends CustomCoder<T> {
   }
 
   @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
+  public T decode(InputStream inStream) throws IOException {
     // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it.
     BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get());
     // Save the potentially-new instance for later.

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index e2166cf..e890d11 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -48,6 +48,12 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
   private BigDecimalCoder() {}
 
   @Override
+  public void encode(BigDecimal value, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(BigDecimal value, OutputStream outStream, Context context)
       throws IOException, CoderException {
     checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
@@ -56,6 +62,11 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
   }
 
   @Override
+  public BigDecimal decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public BigDecimal decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     int scale = VAR_INT_CODER.decode(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
index a61f099..efb1e4b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
@@ -43,7 +43,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
   private BigEndianIntegerCoder() {}
 
   @Override
-  public void encode(Integer value, OutputStream outStream, Context context)
+  public void encode(Integer value, OutputStream outStream)
       throws IOException, CoderException {
     if (value == null) {
       throw new CoderException("cannot encode a null Integer");
@@ -52,7 +52,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
   }
 
   @Override
-  public Integer decode(InputStream inStream, Context context)
+  public Integer decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       return new DataInputStream(inStream).readInt();

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
index 868160a..ab85e17 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
@@ -43,7 +43,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
   private BigEndianLongCoder() {}
 
   @Override
-  public void encode(Long value, OutputStream outStream, Context context)
+  public void encode(Long value, OutputStream outStream)
       throws IOException, CoderException {
     if (value == null) {
       throw new CoderException("cannot encode a null Long");
@@ -52,7 +52,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
   }
 
   @Override
-  public Long decode(InputStream inStream, Context context)
+  public Long decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       return new DataInputStream(inStream).readLong();

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
index 3b038af..d54accf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
@@ -42,6 +42,12 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
   private BigIntegerCoder() {}
 
   @Override
+  public void encode(BigInteger value, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(BigInteger value, OutputStream outStream, Context context)
       throws IOException, CoderException {
     checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName()));
@@ -49,6 +55,11 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
   }
 
   @Override
+  public BigInteger decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public BigInteger decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     return new BigInteger(BYTE_ARRAY_CODER.decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
index f49776b..8115017 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
@@ -36,6 +36,12 @@ public class BitSetCoder extends AtomicCoder<BitSet> {
   }
 
   @Override
+  public void encode(BitSet value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(BitSet value, OutputStream outStream, Context context)
       throws CoderException, IOException {
     if (value == null) {
@@ -45,6 +51,11 @@ public class BitSetCoder extends AtomicCoder<BitSet> {
   }
 
   @Override
+  public BitSet decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public BitSet decode(InputStream inStream, Context context)
       throws CoderException, IOException {
     return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index c9393a1..3b38388 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -52,6 +52,12 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
   private ByteArrayCoder() {}
 
   @Override
+  public void encode(byte[] value, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(byte[] value, OutputStream outStream, Context context)
       throws IOException, CoderException {
     if (value == null) {
@@ -86,6 +92,11 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
   }
 
   @Override
+  public byte[] decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public byte[] decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     if (context.isWholeStream) {

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
index 7f449d6..2d23a64 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
@@ -41,7 +41,7 @@ public class ByteCoder extends AtomicCoder<Byte> {
   private ByteCoder() {}
 
   @Override
-  public void encode(Byte value, OutputStream outStream, Context context)
+  public void encode(Byte value, OutputStream outStream)
       throws IOException, CoderException {
     if (value == null) {
       throw new CoderException("cannot encode a null Byte");
@@ -50,7 +50,7 @@ public class ByteCoder extends AtomicCoder<Byte> {
   }
 
   @Override
-  public Byte decode(InputStream inStream, Context context)
+  public Byte decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       // value will be between 0-255, -1 for EOF

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
index 86077eb..f51b156 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
@@ -66,12 +66,23 @@ public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
   }
 
   @Override
+  public void encode(T value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(T value, OutputStream outStream, Context context)
       throws CoderException, IOException {
     coder.encode(applyAndWrapExceptions(toFn, value), outStream, context);
   }
 
   @Override
+  public T decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public T decode(InputStream inStream, Context context) throws CoderException, IOException {
     return applyAndWrapExceptions(fromFn, coder.decode(inStream, context));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
index 8eff6ba..deb18f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
@@ -43,7 +43,7 @@ public class DoubleCoder extends AtomicCoder<Double> {
   private DoubleCoder() {}
 
   @Override
-  public void encode(Double value, OutputStream outStream, Context context)
+  public void encode(Double value, OutputStream outStream)
       throws IOException, CoderException {
     if (value == null) {
       throw new CoderException("cannot encode a null Double");
@@ -52,7 +52,7 @@ public class DoubleCoder extends AtomicCoder<Double> {
   }
 
   @Override
-  public Double decode(InputStream inStream, Context context)
+  public Double decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       return new DataInputStream(inStream).readDouble();

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index b7db305..90de26f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -54,7 +54,7 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
   }
 
   @Override
-  public void encode(ReadableDuration value, OutputStream outStream, Context context)
+  public void encode(ReadableDuration value, OutputStream outStream)
       throws CoderException, IOException {
     if (value == null) {
       throw new CoderException("cannot encode a null ReadableDuration");
@@ -63,7 +63,7 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
   }
 
   @Override
-  public ReadableDuration decode(InputStream inStream, Context context)
+  public ReadableDuration decode(InputStream inStream)
       throws CoderException, IOException {
       return fromLong(LONG_CODER.decode(inStream));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index 22b11a3..648493e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -68,7 +68,7 @@ public class InstantCoder extends AtomicCoder<Instant> {
   }
 
   @Override
-  public void encode(Instant value, OutputStream outStream, Context context)
+  public void encode(Instant value, OutputStream outStream)
       throws CoderException, IOException {
     if (value == null) {
       throw new CoderException("cannot encode a null Instant");
@@ -77,7 +77,7 @@ public class InstantCoder extends AtomicCoder<Instant> {
   }
 
   @Override
-  public Instant decode(InputStream inStream, Context context)
+  public Instant decode(InputStream inStream)
       throws CoderException, IOException {
     return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
index 59d5424..248c26c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
@@ -84,12 +84,11 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
 
   @Override
   public void encode(
-      IterableT iterable, OutputStream outStream, Context context)
+      IterableT iterable, OutputStream outStream)
       throws IOException, CoderException  {
     if (iterable == null) {
       throw new CoderException("cannot encode a null " + iterableName);
     }
-    Context nestedContext = context.nested();
     DataOutputStream dataOutStream = new DataOutputStream(outStream);
     if (iterable instanceof Collection) {
       // We can know the size of the Iterable.  Use an encoding with a
@@ -117,9 +116,8 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
   }
 
   @Override
-  public IterableT decode(InputStream inStream, Context context)
+  public IterableT decode(InputStream inStream)
       throws IOException, CoderException {
-    Context nestedContext = context.nested();
     DataInputStream dataInStream = new DataInputStream(inStream);
     int size = dataInStream.readInt();
     if (size >= 0) {

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 0bb53ec..9c01886 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -58,6 +58,12 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
   }
 
   @Override
+  public void encode(KV<K, V> kv, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(kv, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(KV<K, V> kv, OutputStream outStream, Context context)
       throws IOException, CoderException  {
     if (kv == null) {
@@ -68,6 +74,11 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
   }
 
   @Override
+  public KV<K, V> decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public KV<K, V> decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     K key = keyCoder.decode(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index 7dd2a32..b24f66d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -53,7 +53,7 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
   }
 
   @Override
-  public void encode(T value, OutputStream outStream, Context context)
+  public void encode(T value, OutputStream outStream)
       throws CoderException, IOException {
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     valueCoder.encode(value, bos, Context.OUTER);
@@ -62,7 +62,7 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
   }
 
   @Override
-  public T decode(InputStream inStream, Context context) throws CoderException, IOException {
+  public T decode(InputStream inStream) throws CoderException, IOException {
     long size = VarInt.decodeLong(inStream);
     return valueCoder.decode(ByteStreams.limit(inStream, size), Context.OUTER);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index f20eb93..d8b3f1c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -69,6 +69,12 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
   }
 
   @Override
+  public void encode(Map<K, V> map, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(map, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(
       Map<K, V> map,
       OutputStream outStream,
@@ -100,6 +106,11 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
   }
 
   @Override
+  public Map<K, V> decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public Map<K, V> decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     DataInputStream dataInStream = new DataInputStream(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index e46591e..64229e8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -61,6 +61,12 @@ public class NullableCoder<T> extends StructuredCoder<T> {
   }
 
   @Override
+  public void encode(@Nullable T value, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(@Nullable T value, OutputStream outStream, Context context)
       throws IOException, CoderException  {
     if (value == null) {
@@ -72,6 +78,11 @@ public class NullableCoder<T> extends StructuredCoder<T> {
   }
 
   @Override
+  public T decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   @Nullable
   public T decode(InputStream inStream, Context context) throws IOException, CoderException {
     int b = inStream.read();

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index e3b2959..9aa8493 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -119,7 +119,7 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {
   }
 
   @Override
-  public void encode(T value, OutputStream outStream, Context context)
+  public void encode(T value, OutputStream outStream)
       throws IOException, CoderException {
     try {
       ObjectOutputStream oos = new ObjectOutputStream(outStream);
@@ -131,7 +131,7 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {
   }
 
   @Override
-  public T decode(InputStream inStream, Context context)
+  public T decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       ObjectInputStream ois = new ObjectInputStream(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
index 1f4538f..2161291 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
@@ -100,12 +100,23 @@ public final class StringDelegateCoder<T> extends CustomCoder<T> {
   }
 
   @Override
+  public void encode(T value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(T value, OutputStream outStream, Context context)
       throws CoderException, IOException {
     delegateCoder.encode(value, outStream, context);
   }
 
   @Override
+  public T decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public T decode(InputStream inStream, Context context) throws CoderException, IOException {
     return delegateCoder.decode(inStream, context);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
index 44856e8..3bbc983 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
@@ -67,6 +67,12 @@ public class StringUtf8Coder extends AtomicCoder<String> {
   private StringUtf8Coder() {}
 
   @Override
+  public void encode(String value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(String value, OutputStream outStream, Context context)
       throws IOException {
     if (value == null) {
@@ -85,6 +91,11 @@ public class StringUtf8Coder extends AtomicCoder<String> {
   }
 
   @Override
+  public String decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public String decode(InputStream inStream, Context context)
       throws IOException {
     if (context.isWholeStream) {

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
index 718811c..6078fa3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
@@ -39,6 +39,12 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> {
   protected TextualIntegerCoder() {}
 
   @Override
+  public void encode(Integer value, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(Integer value, OutputStream outStream, Context context)
       throws IOException, CoderException {
     if (value == null) {
@@ -49,6 +55,11 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> {
   }
 
   @Override
+  public Integer decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public Integer decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     String textualValue = StringUtf8Coder.of().decode(inStream, context);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
index bda66bb..3a9abe7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
@@ -44,7 +44,7 @@ public class VarIntCoder extends AtomicCoder<Integer> {
   private VarIntCoder() {}
 
   @Override
-  public void encode(Integer value, OutputStream outStream, Context context)
+  public void encode(Integer value, OutputStream outStream)
       throws IOException, CoderException {
     if (value == null) {
       throw new CoderException("cannot encode a null Integer");
@@ -53,7 +53,7 @@ public class VarIntCoder extends AtomicCoder<Integer> {
   }
 
   @Override
-  public Integer decode(InputStream inStream, Context context)
+  public Integer decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       return VarInt.decodeInt(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
index bf651c3..37ad8f6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
@@ -45,7 +45,7 @@ public class VarLongCoder extends StructuredCoder<Long> {
   private VarLongCoder() {}
 
   @Override
-  public void encode(Long value, OutputStream outStream, Context context)
+  public void encode(Long value, OutputStream outStream)
       throws IOException, CoderException {
     if (value == null) {
       throw new CoderException("cannot encode a null Long");
@@ -54,7 +54,7 @@ public class VarLongCoder extends StructuredCoder<Long> {
   }
 
   @Override
-  public Long decode(InputStream inStream, Context context)
+  public Long decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       return VarInt.decodeLong(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
index 4467faa..3e1ff7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
@@ -38,12 +38,12 @@ public class VoidCoder extends AtomicCoder<Void> {
   private VoidCoder() {}
 
   @Override
-  public void encode(Void value, OutputStream outStream, Context context) {
+  public void encode(Void value, OutputStream outStream) {
     // Nothing to write!
   }
 
   @Override
-  public Void decode(InputStream inStream, Context context) {
+  public Void decode(InputStream inStream) {
     // Nothing to read!
     return null;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index d8a98cd..3620c22 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -947,6 +947,12 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
 
     @Override
+    public void encode(FileResult value, OutputStream outStream)
+        throws IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(FileResult value, OutputStream outStream, Context context)
         throws IOException {
       if (value == null) {
@@ -961,6 +967,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
 
     @Override
+    public FileResult decode(InputStream inStream) throws IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public FileResult decode(InputStream inStream, Context context)
         throws IOException {
       String filename = stringCoder.decode(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index 9b9d3f8..d12d193 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -679,7 +679,7 @@ public class ApproximateQuantiles {
 
     @Override
     public void encode(
-        QuantileState<T, ComparatorT> state, OutputStream outStream, Coder.Context context)
+        QuantileState<T, ComparatorT> state, OutputStream outStream)
         throws CoderException, IOException {
       intCoder.encode(state.numQuantiles, outStream);
       intCoder.encode(state.bufferSize, outStream);
@@ -695,7 +695,7 @@ public class ApproximateQuantiles {
     }
 
     @Override
-    public QuantileState<T, ComparatorT> decode(InputStream inStream, Coder.Context context)
+    public QuantileState<T, ComparatorT> decode(InputStream inStream)
         throws CoderException, IOException {
       int numQuantiles = intCoder.decode(inStream);
       int bufferSize = intCoder.decode(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index b9cdbd5..7e43564 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -532,6 +532,12 @@ public class Combine {
     }
 
     @Override
+    public void encode(Holder<V> accumulator, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(accumulator, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(Holder<V> accumulator, OutputStream outStream, Context context)
         throws CoderException, IOException {
       if (accumulator.present) {
@@ -543,6 +549,11 @@ public class Combine {
     }
 
     @Override
+    public Holder<V> decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public Holder<V> decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       if (inStream.read() == 1) {
@@ -1971,6 +1982,12 @@ public class Combine {
         }
 
         @Override
+        public void encode(InputOrAccum<InputT, AccumT> value, OutputStream outStream)
+            throws CoderException, IOException {
+          encode(value, outStream, Coder.Context.NESTED);
+        }
+
+        @Override
         public void encode(
             InputOrAccum<InputT, AccumT> value, OutputStream outStream, Coder.Context context)
             throws CoderException, IOException {
@@ -1984,6 +2001,11 @@ public class Combine {
         }
 
         @Override
+        public InputOrAccum<InputT, AccumT> decode(InputStream inStream) throws CoderException, IOException {
+          return decode(inStream, Coder.Context.NESTED);
+        }
+
+        @Override
         public InputOrAccum<InputT, AccumT> decode(InputStream inStream, Coder.Context context)
             throws CoderException, IOException {
           if (inStream.read() == 0) {

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index c45df04..c619783 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -534,6 +534,12 @@ public class CombineFns {
     }
 
     @Override
+    public void encode(Object[] value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(Object[] value, OutputStream outStream, Context context)
         throws CoderException, IOException {
       checkArgument(value.length == codersCount);
@@ -541,7 +547,6 @@ public class CombineFns {
         return;
       }
       int lastIndex = codersCount - 1;
-      Context nestedContext = context.nested();
       for (int i = 0; i < lastIndex; ++i) {
         coders.get(i).encode(value[i], outStream);
       }
@@ -549,6 +554,11 @@ public class CombineFns {
     }
 
     @Override
+    public Object[] decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public Object[] decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       Object[] ret = new Object[codersCount];
@@ -556,7 +566,6 @@ public class CombineFns {
         return ret;
       }
       int lastIndex = codersCount - 1;
-      Context nestedContext = context.nested();
       for (int i = 0; i < lastIndex; ++i) {
         ret[i] = coders.get(i).decode(inStream);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 497d62b..b405dd1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -169,13 +169,13 @@ public class Count {
                                              Coder<T> inputCoder) {
       return new AtomicCoder<long[]>() {
         @Override
-        public void encode(long[] value, OutputStream outStream, Context context)
+        public void encode(long[] value, OutputStream outStream)
             throws IOException {
           VarInt.encode(value[0], outStream);
         }
 
         @Override
-        public long[] decode(InputStream inStream, Context context)
+        public long[] decode(InputStream inStream)
             throws IOException, CoderException {
           try {
             return new long[] {VarInt.decodeLong(inStream)};

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index c8e0d95..8932b03 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -185,14 +185,14 @@ public class Mean {
      private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
 
      @Override
-     public void encode(CountSum<NumT> value, OutputStream outStream, Coder.Context context)
+     public void encode(CountSum<NumT> value, OutputStream outStream)
          throws CoderException, IOException {
        LONG_CODER.encode(value.count, outStream);
        DOUBLE_CODER.encode(value.sum, outStream);
      }
 
      @Override
-     public CountSum<NumT> decode(InputStream inStream, Coder.Context context)
+     public CountSum<NumT> decode(InputStream inStream)
          throws CoderException, IOException {
        return new CountSum<>(
            LONG_CODER.decode(inStream),

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 7aec667..dd8bc5f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -539,13 +539,13 @@ public class Top {
 
     @Override
     public void encode(
-        BoundedHeap<T, ComparatorT> value, OutputStream outStream, Context context)
+        BoundedHeap<T, ComparatorT> value, OutputStream outStream)
         throws CoderException, IOException {
       listCoder.encode(value.asList(), outStream);
     }
 
     @Override
-    public BoundedHeap<T, ComparatorT> decode(InputStream inStream, Coder.Context context)
+    public BoundedHeap<T, ComparatorT> decode(InputStream inStream)
         throws CoderException, IOException {
       return new BoundedHeap<>(maximumSize, compareFn, listCoder.decode(inStream));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 6603325..d42de82 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -242,8 +242,7 @@ public class CoGbkResult {
     @SuppressWarnings("unchecked")
     public void encode(
         CoGbkResult value,
-        OutputStream outStream,
-        Context context) throws CoderException,
+        OutputStream outStream) throws CoderException,
         IOException {
       if (!schema.equals(value.getSchema())) {
         throw new CoderException("input schema does not match coder schema");
@@ -258,8 +257,7 @@ public class CoGbkResult {
 
     @Override
     public CoGbkResult decode(
-        InputStream inStream,
-        Context context)
+        InputStream inStream)
         throws CoderException, IOException {
       if (schema.size() == 0) {
         return new CoGbkResult(schema, ImmutableList.<Iterable<?>>of());

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
index 3194a37..66959d3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
@@ -54,6 +54,12 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> {
     return index;
   }
 
+  @Override
+  public void encode(RawUnionValue union, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(union, outStream, Context.NESTED);
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void encode(
@@ -74,6 +80,11 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> {
   }
 
   @Override
+  public RawUnionValue decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public RawUnionValue decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     int index = VarInt.decodeInt(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
index 0bfb875..078cbee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
@@ -67,10 +67,10 @@ public class GlobalWindow extends BoundedWindow {
     public static final Coder INSTANCE = new Coder();
 
     @Override
-    public void encode(GlobalWindow window, OutputStream outStream, Context context) {}
+    public void encode(GlobalWindow window, OutputStream outStream) {}
 
     @Override
-    public GlobalWindow decode(InputStream inStream, Context context) {
+    public GlobalWindow decode(InputStream inStream) {
       return GlobalWindow.INSTANCE;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index 318dc4c..f25a208 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -179,14 +179,14 @@ public class IntervalWindow extends BoundedWindow
     }
 
     @Override
-    public void encode(IntervalWindow window, OutputStream outStream, Context context)
+    public void encode(IntervalWindow window, OutputStream outStream)
         throws IOException, CoderException {
       instantCoder.encode(window.end, outStream);
       durationCoder.encode(new Duration(window.start, window.end), outStream);
     }
 
     @Override
-    public IntervalWindow decode(InputStream inStream, Context context)
+    public IntervalWindow decode(InputStream inStream)
         throws IOException, CoderException {
       Instant end = instantCoder.decode(inStream);
       ReadableDuration duration = durationCoder.decode(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index 79ce2f5..75df220 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -347,7 +347,7 @@ public final class PaneInfo {
     private PaneInfoCoder() {}
 
     @Override
-    public void encode(PaneInfo value, final OutputStream outStream, Coder.Context context)
+    public void encode(PaneInfo value, final OutputStream outStream)
         throws CoderException, IOException {
       Encoding encoding = chooseEncoding(value);
       switch (chooseEncoding(value)) {
@@ -369,7 +369,7 @@ public final class PaneInfo {
     }
 
     @Override
-    public PaneInfo decode(final InputStream inStream, Coder.Context context)
+    public PaneInfo decode(final InputStream inStream)
         throws CoderException, IOException {
       byte keyAndTag = (byte) inStream.read();
       PaneInfo base = BYTE_TO_PANE_INFO.get((byte) (keyAndTag & 0x0F));

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
index a0896f5..b202065 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
@@ -43,12 +43,23 @@ public class BitSetCoder extends AtomicCoder<BitSet> {
   }
 
   @Override
+  public void encode(BitSet value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(BitSet value, OutputStream outStream, Context context)
       throws CoderException, IOException {
     BYTE_ARRAY_CODER.encodeAndOwn(value.toByteArray(), outStream, context);
   }
 
   @Override
+  public BitSet decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public BitSet decode(InputStream inStream, Context context)
       throws CoderException, IOException {
     return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index e3e61cf..963886b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -636,11 +636,16 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
+    public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(windowedElem, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(WindowedValue<T> windowedElem,
                        OutputStream outStream,
                        Context context)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
       InstantCoder.of().encode(
           windowedElem.getTimestamp(), outStream, nestedContext);
       windowsCoder.encode(windowedElem.getWindows(), outStream);
@@ -649,9 +654,13 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
+    public WindowedValue<T> decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public WindowedValue<T> decode(InputStream inStream, Context context)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
       Instant timestamp = InstantCoder.of().decode(inStream);
       Collection<? extends BoundedWindow> windows =
           windowsCoder.decode(inStream);
@@ -710,12 +719,23 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
+    public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(windowedElem, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(WindowedValue<T> windowedElem, OutputStream outStream, Context context)
         throws CoderException, IOException {
       valueCoder.encode(windowedElem.getValue(), outStream, context);
     }
 
     @Override
+    public WindowedValue<T> decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public WindowedValue<T> decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       T value = valueCoder.decode(inStream, context);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index 95a3152..a4c8b3f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -101,8 +101,7 @@ public class TimestampedValue<V> {
 
     @Override
     public void encode(TimestampedValue<T> windowedElem,
-                       OutputStream outStream,
-                       Context context)
+                       OutputStream outStream)
         throws IOException {
       valueCoder.encode(windowedElem.getValue(), outStream);
       InstantCoder.of().encode(
@@ -110,7 +109,7 @@ public class TimestampedValue<V> {
     }
 
     @Override
-    public TimestampedValue<T> decode(InputStream inStream, Context context)
+    public TimestampedValue<T> decode(InputStream inStream)
         throws IOException {
       T value = valueCoder.decode(inStream);
       Instant timestamp = InstantCoder.of().decode(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
index e8a2dfd..24c3c38 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
@@ -75,9 +75,14 @@ public abstract class ValueInSingleWindow<T> {
     }
 
     @Override
+    public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream)
+        throws IOException {
+      encode(windowedElem, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream, Context context)
         throws IOException {
-      Context nestedContext = context.nested();
       InstantCoder.of().encode(windowedElem.getTimestamp(), outStream);
       windowCoder.encode(windowedElem.getWindow(), outStream);
       PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream);
@@ -85,8 +90,12 @@ public abstract class ValueInSingleWindow<T> {
     }
 
     @Override
+    public ValueInSingleWindow<T> decode(InputStream inStream) throws IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public ValueInSingleWindow<T> decode(InputStream inStream, Context context) throws IOException {
-      Context nestedContext = context.nested();
       Instant timestamp = InstantCoder.of().decode(inStream);
       BoundedWindow window = windowCoder.decode(inStream);
       PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
index f06317b..96a5f1d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
@@ -101,6 +101,12 @@ public class ValueWithRecordId<ValueT> {
     }
 
     @Override
+    public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream)
+        throws IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream, Context context)
         throws IOException {
       valueCoder.encode(value.value, outStream);
@@ -108,6 +114,11 @@ public class ValueWithRecordId<ValueT> {
     }
 
     @Override
+    public ValueWithRecordId<ValueT> decode(InputStream inStream) throws IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public ValueWithRecordId<ValueT> decode(InputStream inStream, Context context)
         throws IOException {
       return new ValueWithRecordId<ValueT>(

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 7ca7fb9..c883ca0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -81,12 +81,12 @@ public class CoderRegistryTest {
   @SuppressWarnings("rawtypes") // this class exists to fail a test because of its rawtypes
   private class MyListCoder extends AtomicCoder<List> {
     @Override
-    public void encode(List value, OutputStream outStream, Context context)
+    public void encode(List value, OutputStream outStream)
         throws CoderException, IOException {
     }
 
     @Override
-    public List decode(InputStream inStream, Context context)
+    public List decode(InputStream inStream)
         throws CoderException, IOException {
       return Collections.emptyList();
     }
@@ -375,12 +375,12 @@ public class CoderRegistryTest {
     }
 
     @Override
-    public void encode(MyValue value, OutputStream outStream, Context context)
+    public void encode(MyValue value, OutputStream outStream)
         throws CoderException, IOException {
     }
 
     @Override
-    public MyValue decode(InputStream inStream, Context context)
+    public MyValue decode(InputStream inStream)
         throws CoderException, IOException {
       return new MyValue();
     }


Mime
View raw message