beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/4] beam git commit: [BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types
Date Fri, 05 May 2017 21:45:13 GMT
Repository: beam
Updated Branches:
  refs/heads/master 650598836 -> 947fa68a5


http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 5107355..9568324 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -23,13 +23,11 @@ import static org.junit.Assert.assertEquals;
 
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.reflect.Type;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -74,28 +72,10 @@ public class CoderRegistryTest {
   private static class NotSerializableClass { }
 
   @Test
-  public void testSerializableFallbackCoderProvider() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    registry.setFallbackCoderProvider(SerializableCoder.PROVIDER);
-    Coder<?> serializableCoder = registry.getDefaultCoder(SerializableClass.class);
-
-    assertEquals(serializableCoder, SerializableCoder.of(SerializableClass.class));
-  }
-
-  @Test
-  public void testAvroFallbackCoderProvider() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    registry.setFallbackCoderProvider(AvroCoder.PROVIDER);
-    Coder<?> avroCoder = registry.getDefaultCoder(NotSerializableClass.class);
-
-    assertEquals(avroCoder, AvroCoder.of(NotSerializableClass.class));
-  }
-
-  @Test
   public void testRegisterInstantiatedCoder() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
-    registry.registerCoder(MyValue.class, MyValueCoder.of());
-    assertEquals(registry.getDefaultCoder(MyValue.class), MyValueCoder.of());
+    registry.registerCoderForClass(MyValue.class, MyValueCoder.of());
+    assertEquals(registry.getCoder(MyValue.class), MyValueCoder.of());
   }
 
   @SuppressWarnings("rawtypes") // this class exists to fail a test because of its rawtypes
@@ -121,17 +101,9 @@ public class CoderRegistryTest {
   }
 
   @Test
-  public void testRegisterInstantiatedCoderInvalidRawtype() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("may not be used with unspecialized generic classes");
-    CoderRegistry registry = CoderRegistry.createDefault();
-    registry.registerCoder(List.class, new MyListCoder());
-  }
-
-  @Test
   public void testSimpleDefaultCoder() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
-    assertEquals(StringUtf8Coder.of(), registry.getDefaultCoder(String.class));
+    assertEquals(StringUtf8Coder.of(), registry.getCoder(String.class));
   }
 
   @Test
@@ -139,11 +111,9 @@ public class CoderRegistryTest {
     CoderRegistry registry = CoderRegistry.createDefault();
     thrown.expect(CannotProvideCoderException.class);
     thrown.expectMessage(allOf(
-        containsString(UnknownType.class.getCanonicalName()),
-        containsString("No CoderFactory has been registered"),
-        containsString("does not have a @DefaultCoder annotation"),
-        containsString("does not implement Serializable")));
-    registry.getDefaultCoder(UnknownType.class);
+        containsString(UnknownType.class.getName()),
+        containsString("Unable to provide a Coder for")));
+    registry.getCoder(UnknownType.class);
   }
 
   @Test
@@ -151,14 +121,15 @@ public class CoderRegistryTest {
     CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<List<Integer>> listToken = new TypeDescriptor<List<Integer>>() {};
     assertEquals(ListCoder.of(VarIntCoder.of()),
-                 registry.getDefaultCoder(listToken));
+                 registry.getCoder(listToken));
 
-    registry.registerCoder(MyValue.class, MyValueCoder.class);
+    registry.registerCoderProvider(
+        CoderProviders.fromStaticMethods(MyValue.class, MyValueCoder.class));
     TypeDescriptor<KV<String, List<MyValue>>> kvToken =
         new TypeDescriptor<KV<String, List<MyValue>>>() {};
     assertEquals(KvCoder.of(StringUtf8Coder.of(),
                             ListCoder.of(MyValueCoder.of())),
-                 registry.getDefaultCoder(kvToken));
+                 registry.getCoder(kvToken));
 
   }
 
@@ -167,7 +138,7 @@ public class CoderRegistryTest {
     CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<Map<Integer, String>> mapToken = new TypeDescriptor<Map<Integer, String>>() {};
     assertEquals(MapCoder.of(VarIntCoder.of(), StringUtf8Coder.of()),
-                 registry.getDefaultCoder(mapToken));
+                 registry.getCoder(mapToken));
   }
 
   @Test
@@ -177,21 +148,21 @@ public class CoderRegistryTest {
         new TypeDescriptor<Map<Integer, Map<String, Double>>>() {};
     assertEquals(
         MapCoder.of(VarIntCoder.of(), MapCoder.of(StringUtf8Coder.of(), DoubleCoder.of())),
-        registry.getDefaultCoder(mapToken));
+        registry.getCoder(mapToken));
   }
 
   @Test
   public void testParameterizedDefaultSetCoder() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<Set<Integer>> setToken = new TypeDescriptor<Set<Integer>>() {};
-    assertEquals(SetCoder.of(VarIntCoder.of()), registry.getDefaultCoder(setToken));
+    assertEquals(SetCoder.of(VarIntCoder.of()), registry.getCoder(setToken));
   }
 
   @Test
   public void testParameterizedDefaultNestedSetCoder() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<Set<Set<Integer>>> setToken = new TypeDescriptor<Set<Set<Integer>>>() {};
-    assertEquals(SetCoder.of(SetCoder.of(VarIntCoder.of())), registry.getDefaultCoder(setToken));
+    assertEquals(SetCoder.of(SetCoder.of(VarIntCoder.of())), registry.getCoder(setToken));
   }
 
   @Test
@@ -201,11 +172,11 @@ public class CoderRegistryTest {
 
     thrown.expect(CannotProvideCoderException.class);
     thrown.expectMessage(String.format(
-        "Cannot provide coder for parameterized type %s: Unable to provide a default Coder for %s",
+        "Cannot provide coder for parameterized type %s: Unable to provide a Coder for %s",
         listUnknownToken,
-        UnknownType.class.getCanonicalName()));
+        UnknownType.class.getName()));
 
-    registry.getDefaultCoder(listUnknownToken);
+    registry.getCoder(listUnknownToken);
   }
 
   @Test
@@ -214,7 +185,7 @@ public class CoderRegistryTest {
     MyGenericClass<MyValue, List<MyValue>> instance =
         new MyGenericClass<MyValue, List<MyValue>>() {};
 
-    Coder<?> bazCoder = registry.getDefaultCoder(
+    Coder<?> bazCoder = registry.getCoder(
         instance.getClass(),
         MyGenericClass.class,
         Collections.<Type, Coder<?>>singletonMap(
@@ -230,7 +201,7 @@ public class CoderRegistryTest {
     MyGenericClass<MyValue, List<MyValue>> instance =
         new MyGenericClass<MyValue, List<MyValue>>() {};
 
-    Coder<?> fooCoder = registry.getDefaultCoder(
+    Coder<?> fooCoder = registry.getCoder(
         instance.getClass(),
         MyGenericClass.class,
         Collections.<Type, Coder<?>>singletonMap(
@@ -242,49 +213,6 @@ public class CoderRegistryTest {
   }
 
   @Test
-  public void testGetDefaultCoderFromIntegerValue() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    Integer i = 13;
-    Coder<Integer> coder = registry.getDefaultCoder(i);
-    assertEquals(VarIntCoder.of(), coder);
-  }
-
-  @Test
-  public void testGetDefaultCoderFromNullValue() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    assertEquals(VoidCoder.of(), registry.getDefaultCoder((Void) null));
-  }
-
-  @Test
-  public void testGetDefaultCoderFromKvValue() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    KV<Integer, String> kv = KV.of(13, "hello");
-    Coder<KV<Integer, String>> coder = registry.getDefaultCoder(kv);
-    assertEquals(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()),
-        coder);
-  }
-
-  @Test
-  public void testGetDefaultCoderFromKvNullValue() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    KV<Void, Void> kv = KV.of((Void) null, (Void) null);
-    assertEquals(KvCoder.of(VoidCoder.of(), VoidCoder.of()),
-        registry.getDefaultCoder(kv));
-  }
-
-  @Test
-  public void testGetDefaultCoderFromNestedKvValue() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    KV<Integer, KV<Long, KV<String, String>>> kv = KV.of(13, KV.of(17L, KV.of("hello", "goodbye")));
-    Coder<KV<Integer, KV<Long, KV<String, String>>>> coder = registry.getDefaultCoder(kv);
-    assertEquals(
-        KvCoder.of(VarIntCoder.of(),
-            KvCoder.of(VarLongCoder.of(),
-                KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))),
-        coder);
-  }
-
-  @Test
   public void testTypeCompatibility() throws Exception {
     CoderRegistry.verifyCompatible(BigEndianIntegerCoder.of(), Integer.class);
     CoderRegistry.verifyCompatible(
@@ -335,7 +263,7 @@ public class CoderRegistryTest {
   public void testDefaultCoderAnnotationGenericRawtype() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
     assertEquals(
-        registry.getDefaultCoder(MySerializableGeneric.class),
+        registry.getCoder(MySerializableGeneric.class),
         SerializableCoder.of(MySerializableGeneric.class));
   }
 
@@ -343,7 +271,7 @@ public class CoderRegistryTest {
   public void testDefaultCoderAnnotationGeneric() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
     assertEquals(
-        registry.getDefaultCoder(new TypeDescriptor<MySerializableGeneric<String>>() {}),
+        registry.getCoder(new TypeDescriptor<MySerializableGeneric<String>>() {}),
         SerializableCoder.of(MySerializableGeneric.class));
   }
 
@@ -371,11 +299,8 @@ public class CoderRegistryTest {
     CoderRegistry registry = CoderRegistry.createDefault();
 
     thrown.expect(CannotProvideCoderException.class);
-    thrown.expectMessage(allOf(
-        containsString("No CoderFactory has been registered"),
-        containsString("does not have a @DefaultCoder annotation"),
-        containsString("does not implement Serializable")));
-    registry.getDefaultCoder(TypeDescriptor.of(
+    thrown.expectMessage("Unable to provide a Coder");
+    registry.getCoder(TypeDescriptor.of(
         TestGenericClass.class.getTypeParameters()[0]));
   }
 
@@ -388,8 +313,7 @@ public class CoderRegistryTest {
 
     TypeDescriptor type = TypeDescriptor.of(
         TestSerializableGenericClass.class.getTypeParameters()[0]);
-    assertEquals(registry.getDefaultCoder(type),
-        SerializableCoder.of(type));
+    assertEquals(SerializableCoder.of(type), registry.getCoder(type));
   }
 
   private static class TestSerializableGenericClass<TestGenericT extends Serializable> {}
@@ -450,12 +374,6 @@ public class CoderRegistryTest {
       return INSTANCE;
     }
 
-    @SuppressWarnings("unused")
-    public static List<Object> getInstanceComponents(
-        @SuppressWarnings("unused") MyValue exampleValue) {
-      return Arrays.asList();
-    }
-
     @Override
     public void encode(MyValue value, OutputStream outStream, Context context)
         throws CoderException, IOException {
@@ -498,7 +416,12 @@ public class CoderRegistryTest {
     }
   }
 
-  private static class UnknownType { }
+  /**
+   * This type is incompatible with all known coder providers, such as Serializable and
+   * {@code @DefaultCoder}, which allows testing scenarios where coder lookup fails.
+   */
+  private static class UnknownType {
+  }
 
   @DefaultCoder(SerializableCoder.class)
   private static class MySerializableGeneric<T extends Serializable> implements Serializable {
@@ -506,20 +429,34 @@ public class CoderRegistryTest {
     private T foo;
   }
 
+  /**
+   * This type is incompatible with all known coder providers, such as Serializable and
+   * {@code @DefaultCoder}, which allows testing the automatic registration mechanism.
+   */
+  private static class AutoRegistrationClass {
+  }
+
+  private static class AutoRegistrationClassCoder extends CustomCoder<AutoRegistrationClass> {
+    private static final AutoRegistrationClassCoder INSTANCE = new AutoRegistrationClassCoder();
+  }
+
   @Test
-  public void testAutomaticRegistrationOfCoders() throws Exception {
-    assertEquals(CoderRegistry.createDefault().getDefaultCoder(MyValue.class), MyValueCoder.of());
+  public void testAutomaticRegistrationOfCoderProviders() throws Exception {
+    assertEquals(AutoRegistrationClassCoder.INSTANCE,
+        CoderRegistry.createDefault().getCoder(AutoRegistrationClass.class));
   }
 
   /**
-   * A {@link CoderRegistrar} to demonstrate default {@link Coder} registration.
+   * A {@link CoderProviderRegistrar} to demonstrate default {@link Coder} registration.
    */
-  @AutoService(CoderRegistrar.class)
-  public static class RegisteredTestCoderRegistrar implements CoderRegistrar {
+  @AutoService(CoderProviderRegistrar.class)
+  public static class RegisteredTestCoderProviderRegistrar implements CoderProviderRegistrar {
     @Override
-    public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
-      return ImmutableMap.<Class<?>, CoderFactory>of(
-          MyValue.class, CoderFactories.forCoder(MyValueCoder.of()));
+    public List<CoderProvider> getCoderProviders() {
+      return ImmutableList.of(
+          CoderProviders.forCoder(
+              TypeDescriptor.of(AutoRegistrationClass.class),
+              AutoRegistrationClassCoder.INSTANCE));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
index d335b18..aa8d94c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
@@ -18,13 +18,15 @@
 package org.apache.beam.sdk.coders;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
+import org.apache.beam.sdk.coders.DefaultCoder.DefaultCoderProviderRegistrar.DefaultCoderProvider;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -32,15 +34,12 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Tests of Coder defaults.
+ * Tests for {@link DefaultCoder}.
  */
 @RunWith(JUnit4.class)
 public class DefaultCoderTest {
 
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  public CoderRegistry registry = CoderRegistry.createDefault();
+  @Rule public ExpectedException thrown = ExpectedException.none();
 
   @DefaultCoder(AvroCoder.class)
   private static class AvroRecord {
@@ -75,6 +74,17 @@ public class DefaultCoderTest {
     protected CustomSerializableCoder() {
       super(CustomRecord.class, TypeDescriptor.of(CustomRecord.class));
     }
+
+    @SuppressWarnings("unused")
+    public static CoderProvider getCoderProvider() {
+      return new CoderProvider() {
+        @Override
+        public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+            List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+          return CustomSerializableCoder.of((TypeDescriptor) typeDescriptor);
+        }
+      };
+    }
   }
 
   private static class OldCustomSerializableCoder extends SerializableCoder<OldCustomRecord> {
@@ -89,33 +99,52 @@ public class DefaultCoderTest {
     protected OldCustomSerializableCoder() {
       super(OldCustomRecord.class, TypeDescriptor.of(OldCustomRecord.class));
     }
+
+    @SuppressWarnings("unused")
+    public static CoderProvider getCoderProvider() {
+      return new CoderProvider() {
+        @Override
+        public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+            List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+          return OldCustomSerializableCoder.of((Class) typeDescriptor.getRawType());
+        }
+      };
+    }
   }
 
   @Test
-  public void testDefaultCoderClasses() throws Exception {
-    assertThat(registry.getDefaultCoder(AvroRecord.class), instanceOf(AvroCoder.class));
-    assertThat(registry.getDefaultCoder(SerializableBase.class),
-        instanceOf(SerializableCoder.class));
-    assertThat(registry.getDefaultCoder(SerializableRecord.class),
+  public void testCodersWithoutComponents() throws Exception {
+    CoderRegistry registry = CoderRegistry.createDefault();
+    registry.registerCoderProvider(
+        new DefaultCoderProvider());
+    assertThat(registry.getCoder(AvroRecord.class),
+        instanceOf(AvroCoder.class));
+    assertThat(registry.getCoder(SerializableRecord.class),
         instanceOf(SerializableCoder.class));
-    assertThat(registry.getDefaultCoder(CustomRecord.class),
+    assertThat(registry.getCoder(CustomRecord.class),
         instanceOf(CustomSerializableCoder.class));
-    assertThat(registry.getDefaultCoder(OldCustomRecord.class),
+    assertThat(registry.getCoder(OldCustomRecord.class),
         instanceOf(OldCustomSerializableCoder.class));
   }
 
   @Test
   public void testDefaultCoderInCollection() throws Exception {
-    assertThat(registry.getDefaultCoder(new TypeDescriptor<List<AvroRecord>>(){}),
-        instanceOf(ListCoder.class));
-    assertThat(registry.getDefaultCoder(new TypeDescriptor<List<SerializableRecord>>(){}),
-        equalTo((Coder<List<SerializableRecord>>)
+    CoderRegistry registry = CoderRegistry.createDefault();
+    registry.registerCoderProvider(
+        new DefaultCoderProvider());
+    Coder<List<AvroRecord>> avroRecordCoder =
+        registry.getCoder(new TypeDescriptor<List<AvroRecord>>(){});
+    assertThat(avroRecordCoder, instanceOf(ListCoder.class));
+    assertThat(((ListCoder) avroRecordCoder).getElemCoder(), instanceOf(AvroCoder.class));
+    assertThat(registry.getCoder(new TypeDescriptor<List<SerializableRecord>>(){}),
+        Matchers.<Coder<List<SerializableRecord>>>equalTo(
             ListCoder.of(SerializableCoder.of(SerializableRecord.class))));
   }
 
   @Test
   public void testUnknown() throws Exception {
     thrown.expect(CannotProvideCoderException.class);
-    registry.getDefaultCoder(Unknown.class);
+    new DefaultCoderProvider().coderFor(
+        TypeDescriptor.of(Unknown.class), Collections.<Coder<?>>emptyList());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
index 1e56135..1141fb5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
@@ -18,8 +18,6 @@
 package org.apache.beam.sdk.coders;
 
 import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import java.util.Arrays;
@@ -62,21 +60,6 @@ public class IterableCoderTest {
   }
 
   @Test
-  public void testGetInstanceComponentsNonempty() {
-    Iterable<Integer> iterable = Arrays.asList(2, 58, 99, 5);
-    List<Object> components = IterableCoder.getInstanceComponents(iterable);
-    assertEquals(1, components.size());
-    assertEquals(2, components.get(0));
-  }
-
-  @Test
-  public void testGetInstanceComponentsEmpty() {
-    Iterable<Integer> iterable = Arrays.asList();
-    List<Object> components = IterableCoder.getInstanceComponents(iterable);
-    assertNull(components);
-  }
-
-  @Test
   public void testCoderSerializable() throws Exception {
     CoderProperties.coderSerializable(TEST_CODER);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
index 35239d6..bcc4400 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
@@ -18,8 +18,6 @@
 package org.apache.beam.sdk.coders;
 
 import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import java.util.Arrays;
@@ -62,21 +60,6 @@ public class ListCoderTest {
   }
 
   @Test
-  public void testGetInstanceComponentsNonempty() throws Exception {
-    List<Integer> list = Arrays.asList(21, 5, 3, 5);
-    List<Object> components = ListCoder.getInstanceComponents(list);
-    assertEquals(1, components.size());
-    assertEquals(21, components.get(0));
-  }
-
-  @Test
-  public void testGetInstanceComponentsEmpty() throws Exception {
-    List<Integer> list = Arrays.asList();
-    List<Object> components = ListCoder.getInstanceComponents(list);
-    assertNull(components);
-  }
-
-  @Test
   public void testEmptyList() throws Exception {
     List<Integer> list = Collections.emptyList();
     Coder<List<Integer>> coder = ListCoder.of(VarIntCoder.of());

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
index a52e6cb..19c895e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
@@ -18,14 +18,11 @@
 package org.apache.beam.sdk.coders;
 
 import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -64,23 +61,6 @@ public class MapCoderTest {
         MapCoder.of(GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
   }
 
-  @Test
-  public void testGetInstanceComponentsNonempty() {
-    Map<Integer, String> map = new HashMap<>();
-    map.put(17, "foozle");
-    List<Object> components = MapCoder.getInstanceComponents(map);
-    assertEquals(2, components.size());
-    assertEquals(17, components.get(0));
-    assertEquals("foozle", components.get(1));
-  }
-
-  @Test
-  public void testGetInstanceComponentsEmpty() {
-    Map<Integer, String> map = new HashMap<>();
-    List<Object> components = MapCoder.getInstanceComponents(map);
-    assertNull(components);
-  }
-
   /**
    * Generated data to check that the wire format has not changed. To regenerate, see
    * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index ef6df34..d97eea6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -230,4 +231,14 @@ public class SerializableCoderTest implements Serializable {
         SerializableCoder.of(MyRecord.class).getEncodedTypeDescriptor(),
         Matchers.equalTo(TypeDescriptor.of(MyRecord.class)));
   }
+
+  private static class AutoRegistration implements Serializable {
+    private static final long serialVersionUID = 42L;
+  }
+
+  @Test
+  public void testSerializableCoderProviderIsRegistered() throws Exception {
+    assertThat(CoderRegistry.createDefault().getCoder(AutoRegistration.class),
+        instanceOf(SerializableCoder.class));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 62edac9..8a4d563 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -104,7 +104,7 @@ public class  CombineFnsTest {
   @Test
   @Category(ValidatesRunner.class)
   public void testComposedCombine() {
-    p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of());
+    p.getCoderRegistry().registerCoderForClass(UserString.class, UserStringCoder.of());
 
     PCollection<KV<String, KV<Integer, UserString>>> perKeyInput = p.apply(
         Create.timestamped(
@@ -156,7 +156,7 @@ public class  CombineFnsTest {
   @Test
   @Category(ValidatesRunner.class)
   public void testComposedCombineWithContext() {
-    p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of());
+    p.getCoderRegistry().registerCoderForClass(UserString.class, UserStringCoder.of());
 
     PCollectionView<String> view = p
         .apply(Create.of("I"))
@@ -218,8 +218,10 @@ public class  CombineFnsTest {
   @Test
   @Category(ValidatesRunner.class)
   public void testComposedCombineNullValues() {
-    p.getCoderRegistry().registerCoder(UserString.class, NullableCoder.of(UserStringCoder.of()));
-    p.getCoderRegistry().registerCoder(String.class, NullableCoder.of(StringUtf8Coder.of()));
+    p.getCoderRegistry().registerCoderForClass(
+        UserString.class, NullableCoder.of(UserStringCoder.of()));
+    p.getCoderRegistry().registerCoderForClass(
+        String.class, NullableCoder.of(StringUtf8Coder.of()));
 
     PCollection<KV<String, KV<Integer, UserString>>> perKeyInput = p.apply(
         Create.timestamped(

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 76f61b3..a458812 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -321,7 +321,7 @@ public class CreateTest {
   @Test
   public void testCreateTimestampedDefaultOutputCoderUsingTypeDescriptor() throws Exception {
     Coder<Record> coder = new RecordCoder();
-    p.getCoderRegistry().registerCoder(Record.class, coder);
+    p.getCoderRegistry().registerCoderForClass(Record.class, coder);
     PBegin pBegin = PBegin.in(p);
     Create.TimestampedValues<Record> values =
         Create.timestamped(
@@ -364,7 +364,7 @@ public class CreateTest {
   @Test
   public void testCreateDefaultOutputCoderUsingInference() throws Exception {
     Coder<Record> coder = new RecordCoder();
-    p.getCoderRegistry().registerCoder(Record.class, coder);
+    p.getCoderRegistry().registerCoderForClass(Record.class, coder);
     PBegin pBegin = PBegin.in(p);
     Create.Values<Record> values = Create.of(new Record(), new Record(), new Record());
     Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
@@ -384,7 +384,7 @@ public class CreateTest {
   @Test
   public void testCreateDefaultOutputCoderUsingTypeDescriptor() throws Exception {
     Coder<Record> coder = new RecordCoder();
-    p.getCoderRegistry().registerCoder(Record.class, coder);
+    p.getCoderRegistry().registerCoderForClass(Record.class, coder);
     PBegin pBegin = PBegin.in(p);
     Create.Values<Record> values =
         Create.of(new Record(), new Record2()).withType(new TypeDescriptor<Record>() {});

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index b24071e..11f284f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -94,8 +94,8 @@ public class FlatMapElementsTest implements Serializable {
 
     assertThat(output.getTypeDescriptor(),
         equalTo((TypeDescriptor<String>) new TypeDescriptor<String>() {}));
-    assertThat(pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor()),
-        equalTo(pipeline.getCoderRegistry().getDefaultCoder(new TypeDescriptor<String>() {})));
+    assertThat(pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor()),
+        equalTo(pipeline.getCoderRegistry().getCoder(new TypeDescriptor<String>() {})));
 
     // Make sure the pipeline runs
     pipeline.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 6982e01..aba33eb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -392,7 +393,8 @@ public class GroupByKeyTest {
     final int numValues = 10;
     final int numKeys = 5;
 
-    p.getCoderRegistry().registerCoder(BadEqualityKey.class, DeterministicKeyCoder.class);
+    p.getCoderRegistry().registerCoderProvider(
+        CoderProviders.fromStaticMethods(BadEqualityKey.class, DeterministicKeyCoder.class));
 
     // construct input data
     List<KV<BadEqualityKey, Long>> input = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 7bf94a0..241b60e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -187,8 +187,8 @@ public class MapElementsTest implements Serializable {
         }));
     assertThat(output.getTypeDescriptor(),
         equalTo((TypeDescriptor<String>) new TypeDescriptor<String>() {}));
-    assertThat(pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor()),
-        equalTo(pipeline.getCoderRegistry().getDefaultCoder(new TypeDescriptor<String>() {})));
+    assertThat(pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor()),
+        equalTo(pipeline.getCoderRegistry().getCoder(new TypeDescriptor<String>() {})));
 
     // Make sure the pipeline runs too
     pipeline.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 073957f..cbbe7f1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -985,11 +985,6 @@ public class ParDoTest implements Serializable {
       return INSTANCE;
     }
 
-    @SuppressWarnings("unused") // used to create a CoderFactory
-    public static List<Object> getInstanceComponents(TestDummy exampleValue) {
-      return Collections.emptyList();
-    }
-
     @Override
     public void encode(TestDummy value, OutputStream outStream, Context context)
         throws CoderException, IOException {
@@ -1657,7 +1652,7 @@ public class ParDoTest implements Serializable {
   public void testValueStateCoderInference() {
     final String stateId = "foo";
     MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
-    pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+    pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
 
     DoFn<KV<String, Integer>, MyInteger> fn =
         new DoFn<KV<String, Integer>, MyInteger>() {
@@ -1750,7 +1745,7 @@ public class ParDoTest implements Serializable {
   public void testCoderInferenceOfList() {
     final String stateId = "foo";
     MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
-    pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+    pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
 
     DoFn<KV<String, Integer>, List<MyInteger>> fn =
         new DoFn<KV<String, Integer>, List<MyInteger>>() {
@@ -1966,7 +1961,7 @@ public class ParDoTest implements Serializable {
   public void testBagStateCoderInference() {
     final String stateId = "foo";
     Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
-    pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+    pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
 
     DoFn<KV<String, Integer>, List<MyInteger>> fn =
         new DoFn<KV<String, Integer>, List<MyInteger>>() {
@@ -2085,7 +2080,7 @@ public class ParDoTest implements Serializable {
     final String stateId = "foo";
     final String countStateId = "count";
     Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
-    pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+    pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
 
     DoFn<KV<String, Integer>, Set<MyInteger>> fn =
         new DoFn<KV<String, Integer>, Set<MyInteger>>() {
@@ -2217,7 +2212,7 @@ public class ParDoTest implements Serializable {
     final String stateId = "foo";
     final String countStateId = "count";
     Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
-    pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+    pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
 
     DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn =
         new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
@@ -2345,7 +2340,7 @@ public class ParDoTest implements Serializable {
   @Test
   @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testCombiningStateCoderInference() {
-    pipeline.getCoderRegistry().registerCoder(MyInteger.class, MyIntegerCoder.of());
+    pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, MyIntegerCoder.of());
 
     final String stateId = "foo";
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index f26dd59..489493a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.state.StateSpec;
@@ -422,7 +423,8 @@ public class DoFnInvokersTest {
     DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
 
     CoderRegistry coderRegistry = CoderRegistry.createDefault();
-    coderRegistry.registerCoder(RestrictionWithDefaultTracker.class, CoderForDefaultTracker.class);
+    coderRegistry.registerCoderProvider(CoderProviders.fromStaticMethods(
+        RestrictionWithDefaultTracker.class, CoderForDefaultTracker.class));
     assertThat(
         invoker.<RestrictionWithDefaultTracker>invokeGetRestrictionCoder(coderRegistry),
         instanceOf(CoderForDefaultTracker.class));

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index f752b1c..e1eb2fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -77,14 +78,10 @@ public class TypedPValueTest {
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("No Coder has been manually specified");
-    thrown.expectMessage(
-        containsString("Building a Coder using a registered CoderFactory failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the @DefaultCoder annotation failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the fallback CoderProvider failed"));
+    thrown.expectMessage("Building a Coder using a registered CoderProvider failed");
 
-    tuple.get(untypedOutputTag).getCoder();
+    Coder<?> coder = tuple.get(untypedOutputTag).getCoder();
+    System.out.println(coder);
   }
 
   @Test
@@ -96,12 +93,7 @@ public class TypedPValueTest {
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("No Coder has been manually specified");
-    thrown.expectMessage(
-        containsString("Building a Coder using a registered CoderFactory failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the @DefaultCoder annotation failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the fallback CoderProvider failed"));
+    thrown.expectMessage("Building a Coder using a registered CoderProvider failed");
 
     tuple.get(untypedOutputTag).getCoder();
   }
@@ -126,7 +118,10 @@ public class TypedPValueTest {
     assertThat(tuple.get(typedOutputTag).getCoder(), instanceOf(VarIntCoder.class));
   }
 
-  // A simple class for which there should be no obvious Coder.
+  /**
+   * This type is incompatible with all known coder providers, such as Serializable and
+   * {@code @DefaultCoder}, which allows testing coder registry lookup failure cases.
+   */
   static class EmptyClass {
   }
 
@@ -149,10 +144,7 @@ public class TypedPValueTest {
     thrown.expectMessage(not(containsString("erasure")));
     thrown.expectMessage(not(containsString("see TupleTag Javadoc")));
     // Instead, expect output suggesting other possible fixes.
-    thrown.expectMessage(containsString("Building a Coder using a registered CoderFactory failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the @DefaultCoder annotation failed"));
-    thrown.expectMessage(containsString("Building a Coder from the fallback CoderProvider failed"));
+    thrown.expectMessage("Building a Coder using a registered CoderProvider failed");
 
     input.getCoder();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
index 9577c6e..968a2fa 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -31,6 +31,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.SortedSet;
@@ -41,6 +42,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderProvider;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -105,13 +107,6 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  */
 public class ProtoCoder<T extends Message> extends CustomCoder<T> {
 
-  /**
-   * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty
-   * {@link ExtensionRegistry}.
-   */
-  public static CoderProvider coderProvider() {
-    return PROVIDER;
-  }
 
   /**
    * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}.
@@ -291,36 +286,47 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> {
     return memoizedParser;
   }
 
-  static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() {};
+  /**
+   * Returns a {@link CoderProvider} which uses the {@link ProtoCoder} for
+   * {@link Message proto messages}.
+   *
+   * <p>This method is invoked reflectively from {@link DefaultCoder}.
+   */
+  public static CoderProvider getCoderProvider() {
+    return new ProtoCoderProvider();
+  }
+
+  static final TypeDescriptor<Message> MESSAGE_TYPE = new TypeDescriptor<Message>() {};
 
   /**
-   * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by
-   * {@link #coderProvider()}.
+   * A {@link CoderProvider} for {@link Message proto messages}.
    */
-  private static final CoderProvider PROVIDER =
-      new CoderProvider() {
-        @Override
-        public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
-          if (!type.isSubtypeOf(CHECK)) {
-            throw new CannotProvideCoderException(
-                String.format(
-                    "Cannot provide %s because %s is not a subclass of %s",
-                    ProtoCoder.class.getSimpleName(),
-                    type,
-                    Message.class.getName()));
-          }
+  private static class ProtoCoderProvider extends CoderProvider {
 
-          @SuppressWarnings("unchecked")
-          TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? extends Message>) type;
-          try {
-            @SuppressWarnings("unchecked")
-            Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
-            return coder;
-          } catch (IllegalArgumentException e) {
-            throw new CannotProvideCoderException(e);
-          }
-        }
-      };
+    @Override
+    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+      if (!typeDescriptor.isSubtypeOf(MESSAGE_TYPE)) {
+        throw new CannotProvideCoderException(
+            String.format(
+                "Cannot provide %s because %s is not a subclass of %s",
+                ProtoCoder.class.getSimpleName(),
+                typeDescriptor,
+                Message.class.getName()));
+      }
+
+      @SuppressWarnings("unchecked")
+      TypeDescriptor<? extends Message> messageType =
+          (TypeDescriptor<? extends Message>) typeDescriptor;
+      try {
+        @SuppressWarnings("unchecked")
+        Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
+        return coder;
+      } catch (IllegalArgumentException e) {
+        throw new CannotProvideCoderException(e);
+      }
+    }
+  }
 
   private SortedSet<String> getSortedExtensionClasses() {
     SortedSet<String> ret = new TreeSet<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderProviderRegistrar.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderProviderRegistrar.java
new file mode 100644
index 0000000..1cb79d7
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderProviderRegistrar.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link CoderProviderRegistrar} for standard types used with Google Protobuf.
+ */
+@AutoService(CoderProviderRegistrar.class)
+public class ProtobufCoderProviderRegistrar implements CoderProviderRegistrar {
+  @Override
+  public List<CoderProvider> getCoderProviders() {
+    return ImmutableList.of(
+        CoderProviders.forCoder(TypeDescriptor.of(ByteString.class), ByteStringCoder.of()),
+        ProtoCoder.getCoderProvider());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderRegistrar.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderRegistrar.java
deleted file mode 100644
index fb39a14..0000000
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderRegistrar.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.protobuf;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.ByteString;
-import java.util.Map;
-import org.apache.beam.sdk.coders.CoderFactories;
-import org.apache.beam.sdk.coders.CoderFactory;
-import org.apache.beam.sdk.coders.CoderRegistrar;
-
-/**
- * A {@link CoderRegistrar} for standard types used with Google Protobuf.
- */
-@AutoService(CoderRegistrar.class)
-public class ProtobufCoderRegistrar implements CoderRegistrar {
-  @Override
-  public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
-    return ImmutableMap.<Class<?>, CoderFactory>of(
-        ByteString.class, CoderFactories.forCoder(ByteStringCoder.of()));
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java
index 6d00b86..d79bf4c 100644
--- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java
+++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 import com.google.common.collect.ImmutableList;
+import java.util.Collections;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
@@ -53,7 +54,8 @@ public class ProtoCoderTest {
 
     assertEquals(
         ProtoCoder.of(new TypeDescriptor<MessageA>() {}),
-        ProtoCoder.coderProvider().getCoder(new TypeDescriptor<MessageA>() {}));
+        ProtoCoder.getCoderProvider().coderFor(
+            new TypeDescriptor<MessageA>() {}, Collections.<Coder<?>>emptyList()));
   }
 
   @Test
@@ -61,7 +63,8 @@ public class ProtoCoderTest {
     thrown.expect(CannotProvideCoderException.class);
     thrown.expectMessage("java.lang.Integer is not a subclass of com.google.protobuf.Message");
 
-    ProtoCoder.coderProvider().getCoder(new TypeDescriptor<Integer>() {});
+    ProtoCoder.getCoderProvider().coderFor(
+        new TypeDescriptor<Integer>() {}, Collections.<Coder<?>>emptyList());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrar.java
new file mode 100644
index 0000000..c6fd849
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrar.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link CoderProviderRegistrar} for standard types used with {@link BigQueryIO}.
+ */
+@AutoService(CoderProviderRegistrar.class)
+public class BigQueryCoderProviderRegistrar implements CoderProviderRegistrar {
+  @Override
+  public List<CoderProvider> getCoderProviders() {
+    return ImmutableList.of(
+        CoderProviders.forCoder(TypeDescriptor.of(TableRow.class), TableRowJsonCoder.of()),
+        CoderProviders.forCoder(TypeDescriptor.of(TableRowInfo.class), TableRowInfoCoder.of()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java
deleted file mode 100644
index 847c7b5..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
-import java.util.Map;
-import org.apache.beam.sdk.coders.CoderFactories;
-import org.apache.beam.sdk.coders.CoderFactory;
-import org.apache.beam.sdk.coders.CoderRegistrar;
-
-/**
- * A {@link CoderRegistrar} for standard types used with {@link BigQueryIO}.
- */
-@AutoService(CoderRegistrar.class)
-public class BigQueryCoderRegistrar implements CoderRegistrar {
-  @Override
-  public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
-    return ImmutableMap.of(
-        TableRow.class, CoderFactories.forCoder(TableRowJsonCoder.of()),
-        TableRowInfo.class, CoderFactories.forCoder(TableRowInfoCoder.of()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index dc8bcff..edb1e0d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -168,7 +168,7 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab
           Type parameter = parameterized.getActualTypeArguments()[1];
           @SuppressWarnings("unchecked")
           Class<DestinationT> parameterClass = (Class<DestinationT>) parameter;
-          return registry.getDefaultCoder(parameterClass);
+          return registry.getCoder(parameterClass);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java
new file mode 100644
index 0000000..57bc903
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** A {@link CoderProviderRegistrar} for standard types used with {@link PubsubIO}. */
+@AutoService(CoderProviderRegistrar.class)
+public class PubsubCoderProviderRegistrar implements CoderProviderRegistrar {
+  @Override
+  public List<CoderProvider> getCoderProviders() {
+    return ImmutableList.of(
+        CoderProviders.forCoder(TypeDescriptor.of(PubsubMessage.class),
+            PubsubMessageWithAttributesCoder.of()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
deleted file mode 100644
index 062f350..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
-import java.util.Map;
-import org.apache.beam.sdk.coders.CoderFactories;
-import org.apache.beam.sdk.coders.CoderFactory;
-import org.apache.beam.sdk.coders.CoderRegistrar;
-
-/** A {@link CoderRegistrar} for standard types used with {@link PubsubIO}. */
-@AutoService(CoderRegistrar.class)
-public class PubsubCoderRegistrar implements CoderRegistrar {
-  @Override
-  public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
-    return ImmutableMap.<Class<?>, CoderFactory>of(
-        PubsubMessage.class, CoderFactories.forCoder(PubsubMessageWithAttributesCoder.of()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrarTest.java
new file mode 100644
index 0000000..bd5246a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrarTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link BigQueryCoderProviderRegistrar}: default-registry coder lookups must not throw.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryCoderProviderRegistrarTest {
+  @Test
+  public void testTableRowCoderIsRegistered() throws Exception {
+    CoderRegistry.createDefault().getCoder(TableRow.class);
+  }
+
+  @Test
+  public void testTableRowInfoCoderIsRegistered() throws Exception {
+    CoderRegistry.createDefault().getCoder(TableRowInfo.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java
deleted file mode 100644
index e7e9fe1..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableRow;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link BigQueryCoderRegistrar}.
- */
-@RunWith(JUnit4.class)
-public class BigQueryCoderRegistrarTest {
-  @Test
-  public void testTableRowCoderIsRegistered() throws Exception {
-    CoderRegistry.createDefault().getDefaultCoder(TableRow.class);
-  }
-
-  @Test
-  public void testTableRowInfoCoderIsRegistered() throws Exception {
-    CoderRegistry.createDefault().getDefaultCoder(TableRowInfo.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml
index ebb4b39..9054f99 100644
--- a/sdks/java/io/hadoop-common/pom.xml
+++ b/sdks/java/io/hadoop-common/pom.xml
@@ -77,5 +77,12 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <!-- build dependencies -->
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
index 15877b0..8fddfe0 100644
--- a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.hadoop;
 
+import com.google.auto.service.AutoService;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -25,9 +26,14 @@ import java.io.OutputStream;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
 import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -110,4 +116,54 @@ public class WritableCoder<T extends Writable> extends CustomCoder<T> {
     return type.hashCode();
   }
 
+  /**
+   * Returns a new {@link CoderProvider} that supplies a {@link WritableCoder} for types
+   * implementing Hadoop's {@link Writable} and declines every other type.
+   *
+   * <p>This method is invoked reflectively from {@link DefaultCoder}.
+   */
+  public static CoderProvider getCoderProvider() {
+    return new WritableCoderProvider();
+  }
+
+  /**
+   * A {@link CoderProviderRegistrar}, annotated with {@link AutoService}, that registers the
+   * {@link CoderProvider} for {@link Writable writable types} from {@link #getCoderProvider()}.
+   */
+  @AutoService(CoderProviderRegistrar.class)
+  public static class WritableCoderProviderRegistrar implements CoderProviderRegistrar {
+
+    @Override
+    public List<CoderProvider> getCoderProviders() {
+      return Collections.singletonList(getCoderProvider());
+    }
+  }
+
+  /**
+   * A {@link CoderProvider} supplying {@link WritableCoder}s for {@link Writable} subtypes only.
+   */
+  private static class WritableCoderProvider extends CoderProvider {
+    private static final TypeDescriptor<Writable> WRITABLE_TYPE = new TypeDescriptor<Writable>() {};
+
+    @Override
+    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+      if (!typeDescriptor.isSubtypeOf(WRITABLE_TYPE)) {
+        throw new CannotProvideCoderException(
+            String.format(
+                "Cannot provide %s because %s does not implement the interface %s",
+                WritableCoder.class.getSimpleName(),
+                typeDescriptor,
+                Writable.class.getName()));
+      }
+
+      try {
+        @SuppressWarnings("unchecked") // presumably safe: Writable subtype was checked above
+        Coder<T> coder = WritableCoder.of((Class) typeDescriptor.getRawType());
+        return coder;
+      } catch (IllegalArgumentException e) {
+        throw new CannotProvideCoderException(e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
index 8127773..9f2a54d 100644
--- a/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
+++ b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.beam.sdk.io.hadoop;
 
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -42,4 +46,10 @@ public class WritableCoderTest {
 
     CoderProperties.coderDecodeEncodeEqual(coder, value);
   }
+
+  @Test
+  public void testAutomaticRegistrationOfCoderProvider() throws Exception {
+    assertThat(CoderRegistry.createDefault().getCoder(NullWritable.class),
+        instanceOf(WritableCoder.class));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index e676455..ba84c2a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -304,7 +304,7 @@ public class KafkaIO {
         Class<T> clazz = (Class<T>) parameter;
 
         try {
-          return NullableCoder.of(coderRegistry.getDefaultCoder(clazz));
+          return NullableCoder.of(coderRegistry.getCoder(clazz));
         } catch (CannotProvideCoderException e) {
           throw new RuntimeException(
                   String.format("Unable to automatically infer a Coder for "

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
index 790f51e..61df23b 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
@@ -89,13 +88,6 @@ public class DistinctJava8Test {
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes");
-    thrown.expectMessage("No Coder has been manually specified");
-    thrown.expectMessage(
-        containsString("Building a Coder using a registered CoderFactory failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the @DefaultCoder annotation failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the fallback CoderProvider failed"));
 
     // Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is
     // implemented with

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
index a3481e1..b38250e 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
@@ -91,7 +91,7 @@ public class FilterJava8Test implements Serializable {
         .apply(Filter.by(s -> true));
 
     thrown.expect(CannotProvideCoderException.class);
-    pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor());
+    pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java
index 7d97740..94353a5 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java
@@ -69,6 +69,6 @@ public class PartitionJava8Test implements Serializable {
         .apply(Partition.of(1, (element, numPartitions) -> 0));
 
     thrown.expect(CannotProvideCoderException.class);
-    pipeline.getCoderRegistry().getDefaultCoder(output.get(0).getTypeDescriptor());
+    pipeline.getCoderRegistry().getCoder(output.get(0).getTypeDescriptor());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
index 4f0361e..260dd24 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static org.hamcrest.Matchers.containsString;
-
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
@@ -69,13 +67,6 @@ public class WithKeysJava8Test {
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys");
-    thrown.expectMessage("No Coder has been manually specified");
-    thrown.expectMessage(
-        containsString("Building a Coder using a registered CoderFactory failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the @DefaultCoder annotation failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the fallback CoderProvider failed"));
 
     p.run();
   }


Mime
View raw message