beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [3/6] beam git commit: [BEAM-2166] Use contextless encode/decode by default.
Date Tue, 09 May 2017 17:47:14 GMT
[BEAM-2166] Use contextless encode/decode by default.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/09d1affd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/09d1affd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/09d1affd

Branch: refs/heads/release-2.0.0
Commit: 09d1affd5d64cc3693eddc8e54c5e0c7f90069b2
Parents: f476d8a
Author: Robert Bradshaw <robertwb@gmail.com>
Authored: Fri May 5 16:20:37 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Tue May 9 10:46:22 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/ApexStreamTuple.java | 11 +++++
 .../UnboundedReadFromBoundedSource.java         | 12 ++---
 .../runners/core/construction/CodersTest.java   |  4 +-
 .../core/construction/PCollectionsTest.java     | 12 ++---
 .../core/ElementAndRestrictionCoder.java        | 12 ++---
 .../beam/runners/core/KeyedWorkItemCoder.java   | 18 +++----
 .../beam/runners/core/TimerInternals.java       | 22 ++++-----
 .../direct/CloningBundleFactoryTest.java        | 20 +++-----
 .../beam/runners/direct/DirectRunnerTest.java   |  4 +-
 .../UnboundedReadEvaluatorFactoryTest.java      |  5 +-
 .../translation/types/CoderTypeSerializer.java  |  4 +-
 .../streaming/SingletonKeyedWorkItemCoder.java  | 16 +++++-
 .../state/FlinkKeyGroupStateInternals.java      |  9 ++--
 .../runners/dataflow/BatchViewOverrides.java    | 16 +++---
 .../runners/dataflow/internal/IsmFormat.java    | 51 ++++++++++----------
 .../runners/dataflow/util/RandomAccessData.java | 11 +++++
 .../runners/dataflow/util/CloudObjectsTest.java |  8 +--
 .../spark/aggregators/NamedAggregators.java     |  4 +-
 .../beam/sdk/annotations/Experimental.java      |  3 ++
 .../org/apache/beam/sdk/coders/AvroCoder.java   |  4 +-
 .../apache/beam/sdk/coders/BigDecimalCoder.java | 15 +++++-
 .../beam/sdk/coders/BigEndianIntegerCoder.java  |  4 +-
 .../beam/sdk/coders/BigEndianLongCoder.java     |  4 +-
 .../apache/beam/sdk/coders/BigIntegerCoder.java | 11 +++++
 .../org/apache/beam/sdk/coders/BitSetCoder.java | 11 +++++
 .../apache/beam/sdk/coders/ByteArrayCoder.java  | 11 +++++
 .../org/apache/beam/sdk/coders/ByteCoder.java   |  4 +-
 .../java/org/apache/beam/sdk/coders/Coder.java  | 38 +++++----------
 .../org/apache/beam/sdk/coders/CustomCoder.java | 47 ------------------
 .../apache/beam/sdk/coders/DelegateCoder.java   | 11 +++++
 .../org/apache/beam/sdk/coders/DoubleCoder.java |  4 +-
 .../apache/beam/sdk/coders/DurationCoder.java   |  8 +--
 .../apache/beam/sdk/coders/InstantCoder.java    |  8 +--
 .../beam/sdk/coders/IterableLikeCoder.java      | 14 +++---
 .../org/apache/beam/sdk/coders/KvCoder.java     | 15 +++++-
 .../beam/sdk/coders/LengthPrefixCoder.java      |  4 +-
 .../org/apache/beam/sdk/coders/MapCoder.java    | 23 ++++++---
 .../apache/beam/sdk/coders/NullableCoder.java   | 11 +++++
 .../beam/sdk/coders/SerializableCoder.java      |  4 +-
 .../beam/sdk/coders/StringDelegateCoder.java    | 11 +++++
 .../apache/beam/sdk/coders/StringUtf8Coder.java | 11 +++++
 .../apache/beam/sdk/coders/StructuredCoder.java | 47 ------------------
 .../beam/sdk/coders/TextualIntegerCoder.java    | 11 +++++
 .../org/apache/beam/sdk/coders/VarIntCoder.java |  4 +-
 .../apache/beam/sdk/coders/VarLongCoder.java    |  4 +-
 .../org/apache/beam/sdk/coders/VoidCoder.java   |  4 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 15 +++---
 .../sdk/transforms/ApproximateQuantiles.java    | 44 ++++++++---------
 .../org/apache/beam/sdk/transforms/Combine.java | 23 +++++++++
 .../apache/beam/sdk/transforms/CombineFns.java  | 17 +++++--
 .../org/apache/beam/sdk/transforms/Count.java   |  4 +-
 .../org/apache/beam/sdk/transforms/Mean.java    | 12 ++---
 .../org/apache/beam/sdk/transforms/Top.java     |  8 +--
 .../beam/sdk/transforms/join/CoGbkResult.java   | 18 +++----
 .../beam/sdk/transforms/join/UnionCoder.java    | 11 +++++
 .../sdk/transforms/windowing/GlobalWindow.java  |  4 +-
 .../transforms/windowing/IntervalWindow.java    | 12 ++---
 .../beam/sdk/transforms/windowing/PaneInfo.java |  5 +-
 .../org/apache/beam/sdk/util/BitSetCoder.java   | 11 +++++
 .../org/apache/beam/sdk/util/WindowedValue.java | 37 ++++++++++----
 .../beam/sdk/values/TimestampedValue.java       | 13 +++--
 .../beam/sdk/values/ValueInSingleWindow.java    | 25 +++++++---
 .../beam/sdk/values/ValueWithRecordId.java      | 15 +++++-
 .../beam/sdk/coders/CoderRegistryTest.java      | 16 ++++--
 .../apache/beam/sdk/coders/CustomCoderTest.java |  4 +-
 .../beam/sdk/coders/NullableCoderTest.java      | 11 +++++
 .../beam/sdk/coders/SerializableCoderTest.java  | 28 +++++------
 .../beam/sdk/coders/StructuredCoderTest.java    | 12 ++---
 .../beam/sdk/testing/CoderPropertiesTest.java   | 36 +++++++-------
 .../apache/beam/sdk/testing/PAssertTest.java    |  4 +-
 .../sdk/testing/SerializableMatchersTest.java   |  5 +-
 .../beam/sdk/testing/WindowSupplierTest.java    |  4 +-
 .../beam/sdk/transforms/CombineFnsTest.java     | 11 +++++
 .../apache/beam/sdk/transforms/CombineTest.java | 25 +++++++---
 .../apache/beam/sdk/transforms/CreateTest.java  | 13 +++--
 .../beam/sdk/transforms/GroupByKeyTest.java     |  4 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   | 12 ++---
 .../apache/beam/sdk/transforms/ViewTest.java    | 11 +++++
 .../transforms/reflect/DoFnInvokersTest.java    |  8 +--
 .../transforms/windowing/GlobalWindowTest.java  |  2 +-
 ...BufferedElementCountingOutputStreamTest.java |  5 +-
 .../apache/beam/sdk/util/CoderUtilsTest.java    |  4 +-
 .../beam/sdk/util/SerializableUtilsTest.java    |  4 +-
 .../extensions/protobuf/ByteStringCoder.java    | 11 +++++
 .../sdk/extensions/protobuf/ProtoCoder.java     | 11 +++++
 .../BeamFnDataBufferingOutboundObserver.java    |  3 +-
 .../harness/data/BeamFnDataInboundObserver.java |  3 +-
 ...BeamFnDataBufferingOutboundObserverTest.java |  3 +-
 .../data/BeamFnDataInboundObserverTest.java     |  3 +-
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java    | 11 +++--
 .../io/gcp/bigquery/TableDestinationCoder.java  | 12 ++---
 .../sdk/io/gcp/bigquery/TableRowInfoCoder.java  | 15 +++++-
 .../sdk/io/gcp/bigquery/TableRowJsonCoder.java  | 11 +++++
 .../io/gcp/bigquery/WriteBundlesToFiles.java    | 16 +++---
 .../pubsub/PubsubMessagePayloadOnlyCoder.java   | 11 +++++
 .../PubsubMessageWithAttributesCoder.java       | 15 +++++-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  | 20 ++++----
 .../io/gcp/pubsub/PubsubUnboundedSource.java    | 13 +++--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 11 +++++
 .../beam/sdk/io/hadoop/WritableCoder.java       |  4 +-
 .../beam/sdk/io/hbase/HBaseMutationCoder.java   |  6 +--
 .../beam/sdk/io/hbase/HBaseResultCoder.java     |  4 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  4 +-
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     | 11 +++++
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java | 38 +++++++--------
 .../org/apache/beam/sdk/io/xml/JAXBCoder.java   | 25 ++++++----
 .../apache/beam/sdk/io/xml/JAXBCoderTest.java   | 21 +++++---
 107 files changed, 809 insertions(+), 551 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
index 4aa6ee8..1d402eb 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
@@ -162,6 +162,12 @@ public interface ApexStreamTuple<T> {
     }
 
     @Override
+    public void encode(ApexStreamTuple<T> value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context)
         throws CoderException, IOException {
       if (value instanceof WatermarkTuple) {
@@ -175,6 +181,11 @@ public interface ApexStreamTuple<T> {
     }
 
     @Override
+    public ApexStreamTuple<T> decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public ApexStreamTuple<T> decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       int b = inStream.read();

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 1424b8b..24eb384 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -221,19 +221,19 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
       }
 
       @Override
-      public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
+      public void encode(Checkpoint<T> value, OutputStream outStream)
           throws CoderException, IOException {
-        elemsCoder.encode(value.residualElements, outStream, context.nested());
-        sourceCoder.encode(value.residualSource, outStream, context);
+        elemsCoder.encode(value.residualElements, outStream);
+        sourceCoder.encode(value.residualSource, outStream);
       }
 
       @SuppressWarnings("unchecked")
       @Override
-      public Checkpoint<T> decode(InputStream inStream, Context context)
+      public Checkpoint<T> decode(InputStream inStream)
           throws CoderException, IOException {
         return new Checkpoint<>(
-            elemsCoder.decode(inStream, context.nested()),
-            sourceCoder.decode(inStream, context));
+            elemsCoder.decode(inStream),
+            sourceCoder.decode(inStream));
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index 765723c..42fba7c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -151,11 +151,11 @@ public class CodersTest {
 
     private static class RecordCoder extends AtomicCoder<Record> {
       @Override
-      public void encode(Record value, OutputStream outStream, Context context)
+      public void encode(Record value, OutputStream outStream)
           throws CoderException, IOException {}
 
       @Override
-      public Record decode(InputStream inStream, Context context)
+      public Record decode(InputStream inStream)
           throws CoderException, IOException {
         return new Record();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
index 2c45cbd..c38dbc0 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
@@ -130,13 +130,13 @@ public class PCollectionsTest {
   @AutoValue
   abstract static class CustomIntCoder extends CustomCoder<Integer> {
     @Override
-    public void encode(Integer value, OutputStream outStream, Context context) throws IOException {
-      VarInt.encode(value, outStream);
+    public Integer decode(InputStream inStream) throws IOException {
+      return VarInt.decodeInt(inStream);
     }
 
     @Override
-    public Integer decode(InputStream inStream, Context context) throws IOException {
-      return VarInt.decodeInt(inStream);
+    public void encode(Integer value, OutputStream outStream) throws IOException {
+      VarInt.encode(value, outStream);
     }
   }
 
@@ -163,13 +163,13 @@ public class PCollectionsTest {
         @Override public void verifyDeterministic() {}
 
         @Override
-        public void encode(BoundedWindow value, OutputStream outStream, Context context)
+        public void encode(BoundedWindow value, OutputStream outStream)
             throws IOException {
           VarInt.encode(value.maxTimestamp().getMillis(), outStream);
         }
 
         @Override
-        public BoundedWindow decode(InputStream inStream, Context context) throws IOException {
+        public BoundedWindow decode(InputStream inStream) throws IOException {
           final Instant ts = new Instant(VarInt.decodeLong(inStream));
           return new BoundedWindow() {
             @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
index 83c4e62..4440b85 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -50,20 +50,20 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT>
 
   @Override
   public void encode(
-      ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream, Context context)
+      ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream)
       throws IOException {
     if (value == null) {
       throw new CoderException("cannot encode a null ElementAndRestriction");
     }
-    elementCoder.encode(value.element(), outStream, context.nested());
-    restrictionCoder.encode(value.restriction(), outStream, context);
+    elementCoder.encode(value.element(), outStream);
+    restrictionCoder.encode(value.restriction(), outStream);
   }
 
   @Override
-  public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream, Context context)
+  public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream)
       throws IOException {
-    ElementT key = elementCoder.decode(inStream, context.nested());
-    RestrictionT value = restrictionCoder.decode(inStream, context);
+    ElementT key = elementCoder.decode(inStream);
+    RestrictionT value = restrictionCoder.decode(inStream);
     return ElementAndRestriction.of(key, value);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index e1872b5..b1cb1a6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -69,21 +69,19 @@ public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem<
   }
 
   @Override
-  public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context)
+  public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream)
       throws CoderException, IOException {
-    Coder.Context nestedContext = context.nested();
-    keyCoder.encode(value.key(), outStream, nestedContext);
-    timersCoder.encode(value.timersIterable(), outStream, nestedContext);
-    elemsCoder.encode(value.elementsIterable(), outStream, context);
+    keyCoder.encode(value.key(), outStream);
+    timersCoder.encode(value.timersIterable(), outStream);
+    elemsCoder.encode(value.elementsIterable(), outStream);
   }
 
   @Override
-  public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context context)
+  public KeyedWorkItem<K, ElemT> decode(InputStream inStream)
       throws CoderException, IOException {
-    Coder.Context nestedContext = context.nested();
-    K key = keyCoder.decode(inStream, nestedContext);
-    Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext);
-    Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, context);
+    K key = keyCoder.decode(inStream);
+    Iterable<TimerData> timers = timersCoder.decode(inStream);
+    Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream);
     return KeyedWorkItems.workItem(key, timers, elems);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 888c11f..f4a12d0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -238,24 +238,22 @@ public interface TimerInternals {
     }
 
     @Override
-    public void encode(TimerData timer, OutputStream outStream, Context context)
+    public void encode(TimerData timer, OutputStream outStream)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
-      STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext);
-      STRING_CODER.encode(timer.getNamespace().stringKey(), outStream, nestedContext);
-      INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext);
-      STRING_CODER.encode(timer.getDomain().name(), outStream, context);
+      STRING_CODER.encode(timer.getTimerId(), outStream);
+      STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
+      INSTANT_CODER.encode(timer.getTimestamp(), outStream);
+      STRING_CODER.encode(timer.getDomain().name(), outStream);
     }
 
     @Override
-    public TimerData decode(InputStream inStream, Context context)
+    public TimerData decode(InputStream inStream)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
-      String timerId = STRING_CODER.decode(inStream, nestedContext);
+      String timerId = STRING_CODER.decode(inStream);
       StateNamespace namespace =
-          StateNamespaces.fromString(STRING_CODER.decode(inStream, nestedContext), windowCoder);
-      Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext);
-      TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, context));
+          StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
+      Instant timestamp = INSTANT_CODER.decode(inStream);
+      TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
       return TimerData.of(timerId, namespace, timestamp, domain);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index 33d171e..5bc8077 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -178,15 +178,14 @@ public class CloningBundleFactoryTest {
     @Override
     public void encode(
         Record value,
-        OutputStream outStream,
-        org.apache.beam.sdk.coders.Coder.Context context)
+        OutputStream outStream)
         throws IOException {
       throw new CoderException("Encode not allowed");
     }
 
     @Override
     public Record decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+        InputStream inStream)
         throws IOException {
       return null;
     }
@@ -196,13 +195,12 @@ public class CloningBundleFactoryTest {
     @Override
     public void encode(
         Record value,
-        OutputStream outStream,
-        org.apache.beam.sdk.coders.Coder.Context context)
+        OutputStream outStream)
         throws IOException {}
 
     @Override
     public Record decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+        InputStream inStream)
         throws IOException {
       throw new CoderException("Decode not allowed");
     }
@@ -212,13 +210,12 @@ public class CloningBundleFactoryTest {
     @Override
     public void encode(
         Record value,
-        OutputStream outStream,
-        org.apache.beam.sdk.coders.Coder.Context context)
+        OutputStream outStream)
         throws CoderException, IOException {}
 
     @Override
     public Record decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
       return new Record() {
         @Override
@@ -244,13 +241,12 @@ public class CloningBundleFactoryTest {
     @Override
     public void encode(
         Record value,
-        OutputStream outStream,
-        org.apache.beam.sdk.coders.Coder.Context context)
+        OutputStream outStream)
         throws CoderException, IOException {}
 
     @Override
     public Record decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
       return new Record() {
         @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 85e55eb..943d27c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -526,11 +526,11 @@ public class DirectRunnerTest implements Serializable {
   private static class LongNoDecodeCoder extends AtomicCoder<Long> {
     @Override
     public void encode(
-        Long value, OutputStream outStream, Context context) throws IOException {
+        Long value, OutputStream outStream) throws IOException {
     }
 
     @Override
-    public Long decode(InputStream inStream, Context context) throws IOException {
+    public Long decode(InputStream inStream) throws IOException {
       throw new CoderException("Cannot decode a long");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index b9ba7f4..2a01db5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -590,15 +590,14 @@ public class UnboundedReadEvaluatorFactoryTest {
       @Override
       public void encode(
           TestCheckpointMark value,
-          OutputStream outStream,
-          org.apache.beam.sdk.coders.Coder.Context context)
+          OutputStream outStream)
           throws IOException {
         VarInt.encode(value.index, outStream);
       }
 
       @Override
       public TestCheckpointMark decode(
-          InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+          InputStream inStream)
           throws IOException {
         TestCheckpointMark decoded = new TestCheckpointMark(VarInt.decodeInt(inStream));
         decoded.decoded = true;

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index e210ed9..e003119 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -77,14 +77,14 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> {
   @Override
   public void serialize(T t, DataOutputView dataOutputView) throws IOException {
     DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView);
-    coder.encode(t, outputWrapper, Coder.Context.NESTED);
+    coder.encode(t, outputWrapper);
   }
 
   @Override
   public T deserialize(DataInputView dataInputView) throws IOException {
     try {
       DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView);
-      return coder.decode(inputWrapper, Coder.Context.NESTED);
+      return coder.decode(inputWrapper);
     } catch (CoderException e) {
       Throwable cause = e.getCause();
       if (cause instanceof EOFException) {

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index f218693..2ed2055 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -66,18 +66,30 @@ public class SingletonKeyedWorkItemCoder<K, ElemT>
   }
 
   @Override
+  public void encode(SingletonKeyedWorkItem<K, ElemT> value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(SingletonKeyedWorkItem<K, ElemT> value,
                      OutputStream outStream,
                      Context context)
       throws CoderException, IOException {
-    keyCoder.encode(value.key(), outStream, context.nested());
+    keyCoder.encode(value.key(), outStream);
     valueCoder.encode(value.value, outStream, context);
   }
 
   @Override
+  public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream)
+      throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context)
       throws CoderException, IOException {
-    K key = keyCoder.decode(inStream, context.nested());
+    K key = keyCoder.decode(inStream);
     WindowedValue<ElemT> value = valueCoder.decode(inStream, context);
     return new SingletonKeyedWorkItem<>(key, value);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index d6af4f9..512e4ef 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -31,7 +31,6 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -430,8 +429,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals {
       Map<String, ?> map = entry.getValue().f1;
       out.writeInt(map.size());
       for (Map.Entry<String, ?> entry1 : map.entrySet()) {
-        StringUtf8Coder.of().encode(entry1.getKey(), out, Context.NESTED);
-        coder.encode(entry1.getValue(), out, Context.NESTED);
+        StringUtf8Coder.of().encode(entry1.getKey(), out);
+        coder.encode(entry1.getValue(), out);
       }
     }
   }
@@ -463,8 +462,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals {
       Map<String, Object> map = (Map<String, Object>) tuple2.f1;
       int mapSize = in.readInt();
       for (int j = 0; j < mapSize; j++) {
-        String namespace = StringUtf8Coder.of().decode(in, Context.NESTED);
-        Object value = coder.decode(in, Context.NESTED);
+        String namespace = StringUtf8Coder.of().decode(in);
+        Object value = coder.decode(in);
         map.put(namespace, value);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index ecd0365..b4a6e64 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -1351,18 +1351,18 @@ class BatchViewOverrides {
     }
 
     @Override
-    public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream,
-        Coder.Context context) throws CoderException, IOException {
-      transformCoder.encode(value.transform, outStream, context.nested());
-      originalMapCoder.encode(value.originalMap, outStream, context);
+    public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream)
+        throws CoderException, IOException {
+      transformCoder.encode(value.transform, outStream);
+      originalMapCoder.encode(value.originalMap, outStream);
     }
 
     @Override
-    public TransformedMap<K, V1, V2> decode(
-        InputStream inStream, Coder.Context context) throws CoderException, IOException {
+    public TransformedMap<K, V1, V2> decode(InputStream inStream)
+        throws CoderException, IOException {
       return new TransformedMap<>(
-          transformCoder.decode(inStream, context.nested()),
-          originalMapCoder.decode(inStream, context));
+          transformCoder.decode(inStream),
+          originalMapCoder.decode(inStream));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 00e0c54..81ac2a2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -231,35 +231,35 @@ public class IsmFormat {
     }
 
     @Override
-    public void encode(IsmRecord<V> value, OutputStream outStream,
-        Coder.Context context) throws CoderException, IOException {
+    public void encode(IsmRecord<V> value, OutputStream outStream)
+        throws CoderException, IOException {
       if (value.getKeyComponents().size() != keyComponentCoders.size()) {
         throw new CoderException(String.format(
             "Expected %s key component(s) but received key component(s) %s.",
             keyComponentCoders.size(), value.getKeyComponents()));
       }
       for (int i = 0; i < keyComponentCoders.size(); ++i) {
-        getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream, context.nested());
+        getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream);
       }
       if (isMetadataKey(value.getKeyComponents())) {
-        ByteArrayCoder.of().encode(value.getMetadata(), outStream, context.nested());
+        ByteArrayCoder.of().encode(value.getMetadata(), outStream);
       } else {
-        valueCoder.encode(value.getValue(), outStream, context.nested());
+        valueCoder.encode(value.getValue(), outStream);
       }
     }
 
     @Override
-    public IsmRecord<V> decode(InputStream inStream, Coder.Context context)
+    public IsmRecord<V> decode(InputStream inStream)
         throws CoderException, IOException {
       List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size());
       for (Coder<?> keyCoder : keyComponentCoders) {
-        keyComponents.add(keyCoder.decode(inStream, context.nested()));
+        keyComponents.add(keyCoder.decode(inStream));
       }
       if (isMetadataKey(keyComponents)) {
         return IsmRecord.<V>meta(
-            keyComponents, ByteArrayCoder.of().decode(inStream, context.nested()));
+            keyComponents, ByteArrayCoder.of().decode(inStream));
       } else {
-        return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream, context.nested()));
+        return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream));
       }
     }
 
@@ -493,24 +493,24 @@ public class IsmFormat {
     }
 
     @Override
-    public void encode(K value, OutputStream outStream, Coder.Context context)
+    public void encode(K value, OutputStream outStream)
         throws CoderException, IOException {
       if (value == METADATA_KEY) {
         outStream.write(0);
       } else {
         outStream.write(1);
-        keyCoder.encode(value, outStream, context.nested());
+        keyCoder.encode(value, outStream);
       }
     }
 
     @Override
-    public K decode(InputStream inStream, Coder.Context context)
+    public K decode(InputStream inStream)
         throws CoderException, IOException {
       int marker = inStream.read();
       if (marker == 0) {
         return (K) getMetadataKey();
       } else if (marker == 1) {
-        return keyCoder.decode(inStream, context.nested());
+        return keyCoder.decode(inStream);
       } else {
         throw new CoderException(String.format("Expected marker but got %s.", marker));
       }
@@ -621,23 +621,22 @@ public class IsmFormat {
     private IsmShardCoder() {}
 
     @Override
-    public void encode(IsmShard value, OutputStream outStream, Coder.Context context)
+    public void encode(IsmShard value, OutputStream outStream)
         throws CoderException, IOException {
       checkState(value.getIndexOffset() >= 0,
           "%s attempting to be written without index offset.",
           value);
-      VarIntCoder.of().encode(value.getId(), outStream, context.nested());
-      VarLongCoder.of().encode(value.getBlockOffset(), outStream, context.nested());
-      VarLongCoder.of().encode(value.getIndexOffset(), outStream, context);
+      VarIntCoder.of().encode(value.getId(), outStream);
+      VarLongCoder.of().encode(value.getBlockOffset(), outStream);
+      VarLongCoder.of().encode(value.getIndexOffset(), outStream);
     }
 
     @Override
-    public IsmShard decode(
-        InputStream inStream, Coder.Context context) throws CoderException, IOException {
+    public IsmShard decode(InputStream inStream) throws CoderException, IOException {
       return IsmShard.of(
-          VarIntCoder.of().decode(inStream, context.nested()),
-          VarLongCoder.of().decode(inStream, context.nested()),
-          VarLongCoder.of().decode(inStream, context));
+          VarIntCoder.of().decode(inStream),
+          VarLongCoder.of().decode(inStream),
+          VarLongCoder.of().decode(inStream));
     }
 
     @Override
@@ -683,14 +682,14 @@ public class IsmFormat {
     }
 
     @Override
-    public void encode(KeyPrefix value, OutputStream outStream, Coder.Context context)
+    public void encode(KeyPrefix value, OutputStream outStream)
         throws CoderException, IOException {
       VarInt.encode(value.getSharedKeySize(), outStream);
       VarInt.encode(value.getUnsharedKeySize(), outStream);
     }
 
     @Override
-    public KeyPrefix decode(InputStream inStream, Coder.Context context)
+    public KeyPrefix decode(InputStream inStream)
         throws CoderException, IOException {
       return KeyPrefix.of(VarInt.decodeInt(inStream), VarInt.decodeInt(inStream));
     }
@@ -755,7 +754,7 @@ public class IsmFormat {
     }
 
     @Override
-    public void encode(Footer value, OutputStream outStream, Coder.Context context)
+    public void encode(Footer value, OutputStream outStream)
         throws CoderException, IOException {
       DataOutputStream dataOut = new DataOutputStream(outStream);
       dataOut.writeLong(value.getIndexPosition());
@@ -765,7 +764,7 @@ public class IsmFormat {
     }
 
     @Override
-    public Footer decode(InputStream inStream, Coder.Context context)
+    public Footer decode(InputStream inStream)
         throws CoderException, IOException {
       DataInputStream dataIn = new DataInputStream(inStream);
       Footer footer = Footer.of(dataIn.readLong(), dataIn.readLong(), dataIn.readLong());

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
index f36bd78..5ea9f07 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
@@ -63,6 +63,12 @@ public class RandomAccessData {
     }
 
     @Override
+    public void encode(RandomAccessData value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Coder.Context.NESTED);
+    }
+
+    @Override
     public void encode(RandomAccessData value, OutputStream outStream, Coder.Context context)
         throws CoderException, IOException {
       if (value == POSITIVE_INFINITY) {
@@ -75,6 +81,11 @@ public class RandomAccessData {
     }
 
     @Override
+    public RandomAccessData decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Coder.Context.NESTED);
+    }
+
+    @Override
     public RandomAccessData decode(InputStream inStream, Coder.Context context)
         throws CoderException, IOException {
       RandomAccessData rval = new RandomAccessData();

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index 64c0dbd..59a5431 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -171,12 +171,12 @@ public class CloudObjectsTest {
 
   private static class ObjectCoder extends CustomCoder<Object> {
     @Override
-    public void encode(Object value, OutputStream outStream, Context context)
+    public void encode(Object value, OutputStream outStream)
         throws CoderException, IOException {
     }
 
     @Override
-    public Object decode(InputStream inStream, Context context)
+    public Object decode(InputStream inStream)
         throws CoderException, IOException {
       return new Object();
     }
@@ -197,11 +197,11 @@ public class CloudObjectsTest {
    */
   private static class ArbitraryCoder extends StructuredCoder<Record> {
     @Override
-    public void encode(Record value, OutputStream outStream, Context context)
+    public void encode(Record value, OutputStream outStream)
         throws CoderException, IOException {}
 
     @Override
-    public Record decode(InputStream inStream, Context context) throws CoderException, IOException {
+    public Record decode(InputStream inStream) throws CoderException, IOException {
       return new Record();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index c836ca5..27f2ec8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -207,7 +207,7 @@ public class NamedAggregators implements Serializable {
       oos.writeObject(inCoder);
       try {
         combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
-            .encode(state, oos, Coder.Context.NESTED);
+            .encode(state, oos);
       } catch (CannotProvideCoderException e) {
         throw new IllegalStateException("Could not determine coder for accumulator", e);
       }
@@ -220,7 +220,7 @@ public class NamedAggregators implements Serializable {
       inCoder = (Coder<InputT>) ois.readObject();
       try {
         state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
-            .decode(ois, Coder.Context.NESTED);
+            .decode(ois);
       } catch (CannotProvideCoderException e) {
         throw new IllegalStateException("Could not determine coder for accumulator", e);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index 7255a01..2e3a711 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -84,6 +84,9 @@ public @interface Experimental {
     /** Metrics-related experimental APIs. */
     METRICS,
 
+    /** Experimental feature related to alternative, unnested encodings for coders. */
+    CODER_CONTEXT,
+
     /** Experimental runner APIs. Should not be used by pipeline authors. */
     CORE_RUNNERS_ONLY,
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index f82c065..bba669d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -300,7 +300,7 @@ public class AvroCoder<T> extends CustomCoder<T> {
   }
 
   @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
+  public void encode(T value, OutputStream outStream) throws IOException {
     // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it.
     BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
     // Save the potentially-new instance for reuse later.
@@ -310,7 +310,7 @@ public class AvroCoder<T> extends CustomCoder<T> {
   }
 
   @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
+  public T decode(InputStream inStream) throws IOException {
     // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it.
     BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get());
     // Save the potentially-new instance for later.

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index 97559a9..e890d11 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -48,17 +48,28 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
   private BigDecimalCoder() {}
 
   @Override
+  public void encode(BigDecimal value, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(BigDecimal value, OutputStream outStream, Context context)
       throws IOException, CoderException {
     checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
-    VAR_INT_CODER.encode(value.scale(), outStream, context.nested());
+    VAR_INT_CODER.encode(value.scale(), outStream);
     BIG_INT_CODER.encode(value.unscaledValue(), outStream, context);
   }
 
   @Override
+  public BigDecimal decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public BigDecimal decode(InputStream inStream, Context context)
       throws IOException, CoderException {
-    int scale = VAR_INT_CODER.decode(inStream, context.nested());
+    int scale = VAR_INT_CODER.decode(inStream);
     BigInteger bigInteger = BIG_INT_CODER.decode(inStream, context);
     return new BigDecimal(bigInteger, scale);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
index a61f099..efb1e4b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
@@ -43,7 +43,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
   private BigEndianIntegerCoder() {}
 
   @Override
-  public void encode(Integer value, OutputStream outStream, Context context)
+  public void encode(Integer value, OutputStream outStream)
       throws IOException, CoderException {
     if (value == null) {
       throw new CoderException("cannot encode a null Integer");
@@ -52,7 +52,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
   }
 
   @Override
-  public Integer decode(InputStream inStream, Context context)
+  public Integer decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       return new DataInputStream(inStream).readInt();

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
index 868160a..ab85e17 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
@@ -43,7 +43,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
   private BigEndianLongCoder() {}
 
   @Override
-  public void encode(Long value, OutputStream outStream, Context context)
+  public void encode(Long value, OutputStream outStream)
       throws IOException, CoderException {
     if (value == null) {
       throw new CoderException("cannot encode a null Long");
@@ -52,7 +52,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
   }
 
   @Override
-  public Long decode(InputStream inStream, Context context)
+  public Long decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       return new DataInputStream(inStream).readLong();

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
index 3b038af..d54accf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
@@ -42,6 +42,12 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
   private BigIntegerCoder() {}
 
   @Override
+  public void encode(BigInteger value, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(BigInteger value, OutputStream outStream, Context context)
       throws IOException, CoderException {
     checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName()));
@@ -49,6 +55,11 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
   }
 
   @Override
+  public BigInteger decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public BigInteger decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     return new BigInteger(BYTE_ARRAY_CODER.decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
index f49776b..8115017 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
@@ -36,6 +36,12 @@ public class BitSetCoder extends AtomicCoder<BitSet> {
   }
 
   @Override
+  public void encode(BitSet value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(BitSet value, OutputStream outStream, Context context)
       throws CoderException, IOException {
     if (value == null) {
@@ -45,6 +51,11 @@ public class BitSetCoder extends AtomicCoder<BitSet> {
   }
 
   @Override
+  public BitSet decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public BitSet decode(InputStream inStream, Context context)
       throws CoderException, IOException {
     return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index c9393a1..3b38388 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -52,6 +52,12 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
   private ByteArrayCoder() {}
 
   @Override
+  public void encode(byte[] value, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(byte[] value, OutputStream outStream, Context context)
       throws IOException, CoderException {
     if (value == null) {
@@ -86,6 +92,11 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
   }
 
   @Override
+  public byte[] decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public byte[] decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     if (context.isWholeStream) {

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
index 7f449d6..2d23a64 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
@@ -41,7 +41,7 @@ public class ByteCoder extends AtomicCoder<Byte> {
   private ByteCoder() {}
 
   @Override
-  public void encode(Byte value, OutputStream outStream, Context context)
+  public void encode(Byte value, OutputStream outStream)
       throws IOException, CoderException {
     if (value == null) {
       throw new CoderException("cannot encode a null Byte");
@@ -50,7 +50,7 @@ public class ByteCoder extends AtomicCoder<Byte> {
   }
 
   @Override
-  public Byte decode(InputStream inStream, Context context)
+  public Byte decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       // value will be between 0-255, -1 for EOF

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index d140e89..2ee532d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -64,6 +64,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 public abstract class Coder<T> implements Serializable {
   /** The context in which encoding or decoding is being done. */
   @Deprecated
+  @Experimental(Kind.CODER_CONTEXT)
   public static class Context {
     /**
      * The outer context: the value being encoded or decoded takes
@@ -127,18 +128,6 @@ public abstract class Coder<T> implements Serializable {
 
   /**
    * Encodes the given value of type {@code T} onto the given output stream
-   * in the outer context.
-   *
-   * @throws IOException if writing to the {@code OutputStream} fails
-   * for some reason
-   * @throws CoderException if the value could not be encoded for some reason
-   */
-  @Deprecated
-  public abstract void encodeOuter(T value, OutputStream outStream)
-      throws CoderException, IOException;
-
-  /**
-   * Encodes the given value of type {@code T} onto the given output stream
    * in the given context.
    *
    * @throws IOException if writing to the {@code OutputStream} fails
@@ -146,8 +135,11 @@ public abstract class Coder<T> implements Serializable {
    * @throws CoderException if the value could not be encoded for some reason
    */
   @Deprecated
-  public abstract void encode(T value, OutputStream outStream, Context context)
-      throws CoderException, IOException;
+  @Experimental(Kind.CODER_CONTEXT)
+  public void encode(T value, OutputStream outStream, Context context)
+      throws CoderException, IOException {
+    encode(value, outStream);
+  }
 
   /**
    * Decodes a value of type {@code T} from the given input stream in
@@ -161,17 +153,6 @@ public abstract class Coder<T> implements Serializable {
 
   /**
    * Decodes a value of type {@code T} from the given input stream in
-   * the outer context.  Returns the decoded value.
-   *
-   * @throws IOException if reading from the {@code InputStream} fails
-   * for some reason
-   * @throws CoderException if the value could not be decoded for some reason
-   */
-  @Deprecated
-  public abstract T decodeOuter(InputStream inStream) throws CoderException, IOException;
-
-  /**
-   * Decodes a value of type {@code T} from the given input stream in
    * the given context.  Returns the decoded value.
    *
    * @throws IOException if reading from the {@code InputStream} fails
@@ -179,8 +160,11 @@ public abstract class Coder<T> implements Serializable {
    * @throws CoderException if the value could not be decoded for some reason
    */
   @Deprecated
-  public abstract T decode(InputStream inStream, Context context)
-      throws CoderException, IOException;
+  @Experimental(Kind.CODER_CONTEXT)
+  public T decode(InputStream inStream, Context context)
+      throws CoderException, IOException {
+    return decode(inStream);
+  }
 
   /**
    * If this is a {@code Coder} for a parameterized type, returns the

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index edbaa7f..c581923 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
@@ -39,50 +36,6 @@ import java.util.List;
 public abstract class CustomCoder<T> extends Coder<T>
     implements Serializable {
 
-  @Override
-  public void encode(T value, OutputStream outStream)
-      throws CoderException, IOException {
-    encode(value, outStream, Coder.Context.NESTED);
-  }
-
-  @Deprecated
-  @Override
-  public void encodeOuter(T value, OutputStream outStream)
-      throws CoderException, IOException {
-    encode(value, outStream, Coder.Context.OUTER);
-  }
-
-  @Deprecated
-  public void encode(T value, OutputStream outStream, Coder.Context context)
-      throws CoderException, IOException {
-    if (context == Coder.Context.NESTED) {
-      encode(value, outStream);
-    } else {
-      encodeOuter(value, outStream);
-    }
-  }
-
-  @Override
-  public T decode(InputStream inStream) throws CoderException, IOException {
-    return decode(inStream, Coder.Context.NESTED);
-  }
-
-  @Deprecated
-  @Override
-  public T decodeOuter(InputStream inStream) throws CoderException, IOException {
-    return decode(inStream, Coder.Context.OUTER);
-  }
-
-  @Deprecated
-  public T decode(InputStream inStream, Coder.Context context)
-      throws CoderException, IOException {
-    if (context == Coder.Context.NESTED) {
-      return decode(inStream);
-    } else {
-      return decodeOuter(inStream);
-    }
-  }
-
   /**
    * {@inheritDoc}.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
index 86077eb..f51b156 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
@@ -66,12 +66,23 @@ public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
   }
 
   @Override
+  public void encode(T value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(T value, OutputStream outStream, Context context)
       throws CoderException, IOException {
     coder.encode(applyAndWrapExceptions(toFn, value), outStream, context);
   }
 
   @Override
+  public T decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public T decode(InputStream inStream, Context context) throws CoderException, IOException {
     return applyAndWrapExceptions(fromFn, coder.decode(inStream, context));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
index 8eff6ba..deb18f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
@@ -43,7 +43,7 @@ public class DoubleCoder extends AtomicCoder<Double> {
   private DoubleCoder() {}
 
   @Override
-  public void encode(Double value, OutputStream outStream, Context context)
+  public void encode(Double value, OutputStream outStream)
       throws IOException, CoderException {
     if (value == null) {
       throw new CoderException("cannot encode a null Double");
@@ -52,7 +52,7 @@ public class DoubleCoder extends AtomicCoder<Double> {
   }
 
   @Override
-  public Double decode(InputStream inStream, Context context)
+  public Double decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       return new DataInputStream(inStream).readDouble();

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index 8b4ae1d..90de26f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -54,18 +54,18 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
   }
 
   @Override
-  public void encode(ReadableDuration value, OutputStream outStream, Context context)
+  public void encode(ReadableDuration value, OutputStream outStream)
       throws CoderException, IOException {
     if (value == null) {
       throw new CoderException("cannot encode a null ReadableDuration");
     }
-    LONG_CODER.encode(toLong(value), outStream, context);
+    LONG_CODER.encode(toLong(value), outStream);
   }
 
   @Override
-  public ReadableDuration decode(InputStream inStream, Context context)
+  public ReadableDuration decode(InputStream inStream)
       throws CoderException, IOException {
-      return fromLong(LONG_CODER.decode(inStream, context));
+      return fromLong(LONG_CODER.decode(inStream));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index 000f406..648493e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -68,18 +68,18 @@ public class InstantCoder extends AtomicCoder<Instant> {
   }
 
   @Override
-  public void encode(Instant value, OutputStream outStream, Context context)
+  public void encode(Instant value, OutputStream outStream)
       throws CoderException, IOException {
     if (value == null) {
       throw new CoderException("cannot encode a null Instant");
     }
-    LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream, context);
+    LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream);
   }
 
   @Override
-  public Instant decode(InputStream inStream, Context context)
+  public Instant decode(InputStream inStream)
       throws CoderException, IOException {
-    return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream, context));
+    return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
index 9994b3f..248c26c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
@@ -84,12 +84,11 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
 
   @Override
   public void encode(
-      IterableT iterable, OutputStream outStream, Context context)
+      IterableT iterable, OutputStream outStream)
       throws IOException, CoderException  {
     if (iterable == null) {
       throw new CoderException("cannot encode a null " + iterableName);
     }
-    Context nestedContext = context.nested();
     DataOutputStream dataOutStream = new DataOutputStream(outStream);
     if (iterable instanceof Collection) {
       // We can know the size of the Iterable.  Use an encoding with a
@@ -97,7 +96,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
       Collection<T> collection = (Collection<T>) iterable;
       dataOutStream.writeInt(collection.size());
       for (T elem : collection) {
-        elementCoder.encode(elem, dataOutStream, nestedContext);
+        elementCoder.encode(elem, dataOutStream);
       }
     } else {
       // We don't know the size without traversing it so use a fixed size buffer
@@ -108,7 +107,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
           new BufferedElementCountingOutputStream(dataOutStream);
       for (T elem : iterable) {
         countingOutputStream.markElementStart();
-        elementCoder.encode(elem, countingOutputStream, nestedContext);
+        elementCoder.encode(elem, countingOutputStream);
       }
       countingOutputStream.finish();
     }
@@ -117,15 +116,14 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
   }
 
   @Override
-  public IterableT decode(InputStream inStream, Context context)
+  public IterableT decode(InputStream inStream)
       throws IOException, CoderException {
-    Context nestedContext = context.nested();
     DataInputStream dataInStream = new DataInputStream(inStream);
     int size = dataInStream.readInt();
     if (size >= 0) {
       List<T> elements = new ArrayList<>(size);
       for (int i = 0; i < size; i++) {
-        elements.add(elementCoder.decode(dataInStream, nestedContext));
+        elements.add(elementCoder.decode(dataInStream));
       }
       return decodeToIterable(elements);
     }
@@ -134,7 +132,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
     // each block of elements.
     long count = VarInt.decodeLong(dataInStream);
     while (count > 0L) {
-      elements.add(elementCoder.decode(dataInStream, nestedContext));
+      elements.add(elementCoder.decode(dataInStream));
       --count;
       if (count == 0L) {
           count = VarInt.decodeLong(dataInStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 1df4460..9c01886 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -58,19 +58,30 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
   }
 
   @Override
+  public void encode(KV<K, V> kv, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(kv, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(KV<K, V> kv, OutputStream outStream, Context context)
       throws IOException, CoderException  {
     if (kv == null) {
       throw new CoderException("cannot encode a null KV");
     }
-    keyCoder.encode(kv.getKey(), outStream, context.nested());
+    keyCoder.encode(kv.getKey(), outStream);
     valueCoder.encode(kv.getValue(), outStream, context);
   }
 
   @Override
+  public KV<K, V> decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public KV<K, V> decode(InputStream inStream, Context context)
       throws IOException, CoderException {
-    K key = keyCoder.decode(inStream, context.nested());
+    K key = keyCoder.decode(inStream);
     V value = valueCoder.decode(inStream, context);
     return KV.of(key, value);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index 7dd2a32..b24f66d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -53,7 +53,7 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
   }
 
   @Override
-  public void encode(T value, OutputStream outStream, Context context)
+  public void encode(T value, OutputStream outStream)
       throws CoderException, IOException {
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     valueCoder.encode(value, bos, Context.OUTER);
@@ -62,7 +62,7 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
   }
 
   @Override
-  public T decode(InputStream inStream, Context context) throws CoderException, IOException {
+  public T decode(InputStream inStream) throws CoderException, IOException {
     long size = VarInt.decodeLong(inStream);
     return valueCoder.decode(ByteStreams.limit(inStream, size), Context.OUTER);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index 7df9ca9..d8b3f1c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -69,6 +69,12 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
   }
 
   @Override
+  public void encode(Map<K, V> map, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(map, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(
       Map<K, V> map,
       OutputStream outStream,
@@ -89,17 +95,22 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
     Iterator<Entry<K, V>> iterator = map.entrySet().iterator();
     Entry<K, V> entry = iterator.next();
     while (iterator.hasNext()) {
-      keyCoder.encode(entry.getKey(), outStream, context.nested());
-      valueCoder.encode(entry.getValue(), outStream, context.nested());
+      keyCoder.encode(entry.getKey(), outStream);
+      valueCoder.encode(entry.getValue(), outStream);
       entry = iterator.next();
     }
 
-    keyCoder.encode(entry.getKey(), outStream, context.nested());
+    keyCoder.encode(entry.getKey(), outStream);
     valueCoder.encode(entry.getValue(), outStream, context);
     // no flush needed as DataOutputStream does not buffer
   }
 
   @Override
+  public Map<K, V> decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public Map<K, V> decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     DataInputStream dataInStream = new DataInputStream(inStream);
@@ -110,12 +121,12 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
 
     Map<K, V> retval = Maps.newHashMapWithExpectedSize(size);
     for (int i = 0; i < size - 1; ++i) {
-      K key = keyCoder.decode(inStream, context.nested());
-      V value = valueCoder.decode(inStream, context.nested());
+      K key = keyCoder.decode(inStream);
+      V value = valueCoder.decode(inStream);
       retval.put(key, value);
     }
 
-    K key = keyCoder.decode(inStream, context.nested());
+    K key = keyCoder.decode(inStream);
     V value = valueCoder.decode(inStream, context);
     retval.put(key, value);
     return retval;

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index e46591e..64229e8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -61,6 +61,12 @@ public class NullableCoder<T> extends StructuredCoder<T> {
   }
 
   @Override
+  public void encode(@Nullable T value, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(@Nullable T value, OutputStream outStream, Context context)
       throws IOException, CoderException  {
     if (value == null) {
@@ -72,6 +78,11 @@ public class NullableCoder<T> extends StructuredCoder<T> {
   }
 
   @Override
+  public T decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   @Nullable
   public T decode(InputStream inStream, Context context) throws IOException, CoderException {
     int b = inStream.read();

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index e3b2959..9aa8493 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -119,7 +119,7 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {
   }
 
   @Override
-  public void encode(T value, OutputStream outStream, Context context)
+  public void encode(T value, OutputStream outStream)
       throws IOException, CoderException {
     try {
       ObjectOutputStream oos = new ObjectOutputStream(outStream);
@@ -131,7 +131,7 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {
   }
 
   @Override
-  public T decode(InputStream inStream, Context context)
+  public T decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       ObjectInputStream ois = new ObjectInputStream(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
index 1f4538f..2161291 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
@@ -100,12 +100,23 @@ public final class StringDelegateCoder<T> extends CustomCoder<T> {
   }
 
   @Override
+  public void encode(T value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(T value, OutputStream outStream, Context context)
       throws CoderException, IOException {
     delegateCoder.encode(value, outStream, context);
   }
 
   @Override
+  public T decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public T decode(InputStream inStream, Context context) throws CoderException, IOException {
     return delegateCoder.decode(inStream, context);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
index 44856e8..3bbc983 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
@@ -67,6 +67,12 @@ public class StringUtf8Coder extends AtomicCoder<String> {
   private StringUtf8Coder() {}
 
   @Override
+  public void encode(String value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(String value, OutputStream outStream, Context context)
       throws IOException {
     if (value == null) {
@@ -85,6 +91,11 @@ public class StringUtf8Coder extends AtomicCoder<String> {
   }
 
   @Override
+  public String decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public String decode(InputStream inStream, Context context)
       throws IOException {
     if (context.isWholeStream) {

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
index 437f10d..42c0598 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
@@ -18,9 +18,6 @@
 package org.apache.beam.sdk.coders;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -101,50 +98,6 @@ public abstract class StructuredCoder<T> extends Coder<T> {
     return builder.toString();
   }
 
-  @Override
-  public void encode(T value, OutputStream outStream)
-      throws CoderException, IOException {
-    encode(value, outStream, Coder.Context.NESTED);
-  }
-
-  @Deprecated
-  @Override
-  public void encodeOuter(T value, OutputStream outStream)
-      throws CoderException, IOException {
-    encode(value, outStream, Coder.Context.OUTER);
-  }
-
-  @Deprecated
-  public void encode(T value, OutputStream outStream, Coder.Context context)
-      throws CoderException, IOException {
-    if (context == Coder.Context.NESTED) {
-      encode(value, outStream);
-    } else {
-      encodeOuter(value, outStream);
-    }
-  }
-
-  @Override
-  public T decode(InputStream inStream) throws CoderException, IOException {
-    return decode(inStream, Coder.Context.NESTED);
-  }
-
-  @Deprecated
-  @Override
-  public T decodeOuter(InputStream inStream) throws CoderException, IOException {
-    return decode(inStream, Coder.Context.OUTER);
-  }
-
-  @Deprecated
-  public T decode(InputStream inStream, Coder.Context context)
-      throws CoderException, IOException {
-    if (context == Coder.Context.NESTED) {
-      return decode(inStream);
-    } else {
-      return decodeOuter(inStream);
-    }
-  }
-
   protected void verifyDeterministic(String message, Iterable<Coder<?>> coders)
       throws NonDeterministicException {
     for (Coder<?> coder : coders) {

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
index 718811c..6078fa3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
@@ -39,6 +39,12 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> {
   protected TextualIntegerCoder() {}
 
   @Override
+  public void encode(Integer value, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(Integer value, OutputStream outStream, Context context)
       throws IOException, CoderException {
     if (value == null) {
@@ -49,6 +55,11 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> {
   }
 
   @Override
+  public Integer decode(InputStream inStream) throws IOException, CoderException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public Integer decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     String textualValue = StringUtf8Coder.of().decode(inStream, context);

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
index bda66bb..3a9abe7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
@@ -44,7 +44,7 @@ public class VarIntCoder extends AtomicCoder<Integer> {
   private VarIntCoder() {}
 
   @Override
-  public void encode(Integer value, OutputStream outStream, Context context)
+  public void encode(Integer value, OutputStream outStream)
       throws IOException, CoderException {
     if (value == null) {
       throw new CoderException("cannot encode a null Integer");
@@ -53,7 +53,7 @@ public class VarIntCoder extends AtomicCoder<Integer> {
   }
 
   @Override
-  public Integer decode(InputStream inStream, Context context)
+  public Integer decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       return VarInt.decodeInt(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
index bf651c3..37ad8f6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
@@ -45,7 +45,7 @@ public class VarLongCoder extends StructuredCoder<Long> {
   private VarLongCoder() {}
 
   @Override
-  public void encode(Long value, OutputStream outStream, Context context)
+  public void encode(Long value, OutputStream outStream)
       throws IOException, CoderException {
     if (value == null) {
       throw new CoderException("cannot encode a null Long");
@@ -54,7 +54,7 @@ public class VarLongCoder extends StructuredCoder<Long> {
   }
 
   @Override
-  public Long decode(InputStream inStream, Context context)
+  public Long decode(InputStream inStream)
       throws IOException, CoderException {
     try {
       return VarInt.decodeLong(inStream);


Mime
View raw message