beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [2/4] beam git commit: [BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types
Date Fri, 05 May 2017 21:45:14 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index 7f0f632..2ba548a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -20,12 +20,10 @@ package org.apache.beam.sdk.coders;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
@@ -37,12 +35,13 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -57,102 +56,118 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A {@link CoderRegistry} allows registering the default {@link Coder} to use for a Java class,
- * and looking up and instantiating the default {@link Coder} for a Java type.
+ * A {@link CoderRegistry} allows creating a {@link Coder} for a given Java {@link Class class} or
+ * {@link TypeDescriptor type descriptor}.
  *
- * <p>{@link CoderRegistry} uses the following mechanisms to determine a default {@link Coder} for a
- * Java class, in order of precedence:
- * <ol>
- *   <li>Registration:
- *     <ul>
- *       <li>A {@link CoderFactory} can be registered to handle a particular class via
- *           {@link #registerCoder(Class, CoderFactory)}.</li>
- *       <li>A {@link Coder} class with the static methods to satisfy
- *           {@link CoderFactories#fromStaticMethods} can be registered via
- *           {@link #registerCoder(Class, Class)}.</li>
- *       <li>Types can be automatically registered via {@link CoderRegistrar coder registrars}.</li>
- *     </ul>
- *   <li>Annotations: {@link DefaultCoder} can be used to annotate a type with
- *       the default {@code Coder} type. The {@link Coder} class must satisfy the requirements
- *       of {@link CoderProviders#fromStaticMethods}.
- *   <li>Fallback: A fallback {@link CoderProvider} is used to attempt to provide a {@link Coder}
- *       for any type. By default, there is {@link SerializableCoder#PROVIDER}, which can provide a
- *       {@link Coder} for any type that is serializable via Java serialization. The fallback
- *       {@link CoderProvider} can be get and set respectively using
- *       {@link #getFallbackCoderProvider()} and {@link #setFallbackCoderProvider}. Multiple
- *       fallbacks can be chained together using {@link CoderProviders#firstOf}.
- * </ol>
+ * <p>Creation of the {@link Coder} is delegated to one of the many registered
+ * {@link CoderProvider coder providers} based upon the registration order.
+ *
+ * <p>By default, the {@link CoderProvider coder provider} precedence order is as follows:
+ * <ul>
+ *   <li>Coder providers registered programmatically with
+ *       {@link CoderRegistry#registerCoderProvider(CoderProvider)}.</li>
+ *   <li>A default coder provider for common Java (Byte, Double, List, ...) and
+ *       Apache Beam (KV, ...) types.</li>
+ *   <li>Coder providers registered automatically through a {@link CoderProviderRegistrar} using
+ *       a {@link ServiceLoader}. Note that the {@link ServiceLoader} registration order is
+ *       consistent but may change due to the addition or removal of libraries exposed
+ *       to the application. This can impact the coder returned if multiple coder providers
+ *       are capable of supplying a coder for the specified type.</li>
+ * </ul>
+ *
+ * <p>Note that if multiple {@link CoderProvider coder providers} can provide a {@link Coder} for
+ * a given type, the precedence order above defines which {@link CoderProvider} is chosen.
  */
-public class CoderRegistry implements CoderProvider {
+public class CoderRegistry {
 
   private static final Logger LOG = LoggerFactory.getLogger(CoderRegistry.class);
-  private static final Map<Class<?>, CoderFactory> REGISTERED_CODER_FACTORIES_PER_CLASS;
+  private static final List<CoderProvider> REGISTERED_CODER_FACTORIES;
+
+  /** A {@link CoderProvider} for common Java SDK and Apache Beam SDK types. */
+  private static class CommonTypes extends CoderProvider {
+    private final Map<Class<?>, CoderProvider> commonTypesToCoderProviders;
+
+    private CommonTypes() {
+      ImmutableMap.Builder<Class<?>, CoderProvider> builder = ImmutableMap.builder();
+      builder.put(Byte.class,
+          CoderProviders.fromStaticMethods(Byte.class, ByteCoder.class));
+      builder.put(BitSet.class,
+          CoderProviders.fromStaticMethods(BitSet.class, BitSetCoder.class));
+      builder.put(Double.class,
+          CoderProviders.fromStaticMethods(Double.class, DoubleCoder.class));
+      builder.put(Instant.class,
+          CoderProviders.fromStaticMethods(Instant.class, InstantCoder.class));
+      builder.put(Integer.class,
+          CoderProviders.fromStaticMethods(Integer.class, VarIntCoder.class));
+      builder.put(Iterable.class,
+          CoderProviders.fromStaticMethods(Iterable.class, IterableCoder.class));
+      builder.put(KV.class,
+          CoderProviders.fromStaticMethods(KV.class, KvCoder.class));
+      builder.put(List.class,
+          CoderProviders.fromStaticMethods(List.class, ListCoder.class));
+      builder.put(Long.class,
+          CoderProviders.fromStaticMethods(Long.class, VarLongCoder.class));
+      builder.put(Map.class,
+          CoderProviders.fromStaticMethods(Map.class, MapCoder.class));
+      builder.put(Set.class,
+          CoderProviders.fromStaticMethods(Set.class, SetCoder.class));
+      builder.put(String.class,
+          CoderProviders.fromStaticMethods(String.class, StringUtf8Coder.class));
+      builder.put(TimestampedValue.class,
+          CoderProviders.fromStaticMethods(
+              TimestampedValue.class, TimestampedValue.TimestampedValueCoder.class));
+      builder.put(Void.class,
+          CoderProviders.fromStaticMethods(Void.class, VoidCoder.class));
+      builder.put(byte[].class,
+          CoderProviders.fromStaticMethods(byte[].class, ByteArrayCoder.class));
+      builder.put(IntervalWindow.class,
+          CoderProviders.forCoder(
+              TypeDescriptor.of(IntervalWindow.class), IntervalWindow.getCoder()));
+      commonTypesToCoderProviders = builder.build();
+    }
+
+    @Override
+    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+      CoderProvider factory = commonTypesToCoderProviders.get(typeDescriptor.getRawType());
+      if (factory == null) {
+        throw new CannotProvideCoderException(
+            String.format("%s is not one of the common types.", typeDescriptor));
+      }
+      return factory.coderFor(typeDescriptor, componentCoders);
+    }
+  }
 
   static {
-    // Register the standard coders first so they are chosen as the default
-    Multimap<Class<?>, CoderFactory> codersToRegister = HashMultimap.create();
-    codersToRegister.put(Byte.class, CoderFactories.fromStaticMethods(ByteCoder.class));
-    codersToRegister.put(BitSet.class, CoderFactories.fromStaticMethods(BitSetCoder.class));
-    codersToRegister.put(Double.class, CoderFactories.fromStaticMethods(DoubleCoder.class));
-    codersToRegister.put(Instant.class, CoderFactories.fromStaticMethods(InstantCoder.class));
-    codersToRegister.put(Integer.class, CoderFactories.fromStaticMethods(VarIntCoder.class));
-    codersToRegister.put(Iterable.class, CoderFactories.fromStaticMethods(IterableCoder.class));
-    codersToRegister.put(KV.class, CoderFactories.fromStaticMethods(KvCoder.class));
-    codersToRegister.put(List.class, CoderFactories.fromStaticMethods(ListCoder.class));
-    codersToRegister.put(Long.class, CoderFactories.fromStaticMethods(VarLongCoder.class));
-    codersToRegister.put(Map.class, CoderFactories.fromStaticMethods(MapCoder.class));
-    codersToRegister.put(Set.class, CoderFactories.fromStaticMethods(SetCoder.class));
-    codersToRegister.put(String.class, CoderFactories.fromStaticMethods(StringUtf8Coder.class));
-    codersToRegister.put(TimestampedValue.class,
-        CoderFactories.fromStaticMethods(TimestampedValue.TimestampedValueCoder.class));
-    codersToRegister.put(Void.class, CoderFactories.fromStaticMethods(VoidCoder.class));
-    codersToRegister.put(byte[].class, CoderFactories.fromStaticMethods(ByteArrayCoder.class));
-    codersToRegister.put(IntervalWindow.class, CoderFactories.forCoder(IntervalWindow.getCoder()));
+    // Register the standard coders first so they are chosen over ServiceLoader ones
+    List<CoderProvider> codersToRegister = new ArrayList<>();
+    codersToRegister.add(new CommonTypes());
 
     // Enumerate all the CoderRegistrars in a deterministic order, adding all coders to register
-    Set<CoderRegistrar> registrars = Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
+    Set<CoderProviderRegistrar> registrars = Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
     registrars.addAll(Lists.newArrayList(
-        ServiceLoader.load(CoderRegistrar.class, ReflectHelpers.findClassLoader())));
-    for (CoderRegistrar registrar : registrars) {
-      for (Map.Entry<Class<?>, CoderFactory> entry
-          : registrar.getCoderFactoriesToUseForClasses().entrySet()) {
-        codersToRegister.put(entry.getKey(), entry.getValue());
-      }
-    }
-
-    // Warn the user if multiple coders want to be registered for the same class
-    Map<Class<?>, Collection<CoderFactory>> multipleRegistrations =
-        Maps.filterValues(codersToRegister.asMap(), new Predicate<Collection<CoderFactory>>() {
-      @Override
-      public boolean apply(@Nonnull Collection<CoderFactory> input) {
-        return input.size() > 1;
-      }
-    });
-    for (Map.Entry<Class<?>, Collection<CoderFactory>> entry : multipleRegistrations.entrySet()) {
-      LOG.warn("Multiple CoderFactory registrations {} found for class {}, using {}.",
-          entry.getKey(), entry.getValue(), entry.getValue().iterator().next());
+        ServiceLoader.load(CoderProviderRegistrar.class, ReflectHelpers.findClassLoader())));
+    for (CoderProviderRegistrar registrar : registrars) {
+        codersToRegister.addAll(registrar.getCoderProviders());
     }
 
-    // Build a map choosing the first coder within the multimap as the default
-    ImmutableMap.Builder<Class<?>, CoderFactory> registeredCoderFactoriesPerClassBuilder =
-        ImmutableMap.builder();
-    for (Map.Entry<Class<?>, Collection<CoderFactory>> entry
-        : codersToRegister.asMap().entrySet()) {
-      registeredCoderFactoriesPerClassBuilder.put(
-          entry.getKey(), entry.getValue().iterator().next());
-    }
-    REGISTERED_CODER_FACTORIES_PER_CLASS = registeredCoderFactoriesPerClassBuilder.build();
+    REGISTERED_CODER_FACTORIES = ImmutableList.copyOf(codersToRegister);
   }
 
   /**
    * Creates a CoderRegistry containing registrations for all standard coders part of the core Java
-   * Apache Beam SDK and also any registrations provided by {@link CoderRegistrar coder registrars}.
+   * Apache Beam SDK and also any registrations provided by
+   * {@link CoderProviderRegistrar coder registrars}.
    *
-   * <p>Multiple registrations for the same class result in the (in order of precedence):
+   * <p>Multiple registrations which can produce a coder for a given type result in a Coder created
+   * by the (in order of precedence):
    * <ul>
-   *   <li>Standard coder part of the core Apache Beam Java SDK being used.</li>
-   *   <li>The coder from the {@link CoderRegistrar} with the lexicographically smallest
-   *   {@link Class#getName() class name} being used.</li>
+   *   <li>{@link CoderProvider coder providers} registered programmatically through
+   *   {@link CoderRegistry#registerCoderProvider}.</li>
+   *   <li>{@link CoderProvider coder providers} for core types found within the Apache Beam Java
+   *   SDK being used.</li>
+   *   <li>The {@link CoderProvider coder providers} from the {@link CoderProviderRegistrar}
+   *   with the lexicographically smallest {@link Class#getName() class name} being used.</li>
    * </ul>
    */
   public static CoderRegistry createDefault() {
@@ -160,97 +175,69 @@ public class CoderRegistry implements CoderProvider {
   }
 
   private CoderRegistry() {
-    coderFactoryMap = new HashMap<>(REGISTERED_CODER_FACTORIES_PER_CLASS);
-    setFallbackCoderProvider(
-        CoderProviders.firstOf(SerializableCoder.PROVIDER));
+    coderProviders = new LinkedList<>(REGISTERED_CODER_FACTORIES);
   }
 
   /**
-   * Registers {@code coderClazz} as the default {@link Coder} class to handle encoding and
-   * decoding instances of {@code clazz}, overriding prior registrations if any exist.
-   *
-   * <p>Supposing {@code T} is the static type corresponding to the {@code clazz}, then
-   * {@code coderClazz} should have a static factory method with the following signature:
-   *
-   * <pre> {@code
-   * public static Coder<T> of(Coder<X> argCoder1, Coder<Y> argCoder2, ...)
-   * } </pre>
+   * Registers {@code coderProvider} as a potential {@link CoderProvider} which can produce
+   * {@code Coder} instances.
    *
-   * <p>This method will be called to create instances of {@code Coder<T>} for values of type
-   * {@code T}, passing Coders for each of the generic type parameters of {@code T}.  If {@code T}
-   * takes no generic type parameters, then the {@code of()} factory method should have no
-   * arguments.
+   * <p>This method prioritizes this {@link CoderProvider} over all prior registered coders.
    *
-   * <p>If {@code T} is a parameterized type, then it should additionally have a method with the
-   * following signature:
-   *
-   * <pre> {@code
-   * public static List<Object> getInstanceComponents(T exampleValue);
-   * } </pre>
-   *
-   * <p>This method will be called to decompose a value during the {@link Coder} inference process,
-   * to automatically choose {@link Coder Coders} for the components.
-   *
-   * @param clazz the class of objects to be encoded
-   * @param coderClazz a class with static factory methods to provide {@link Coder Coders}
+   * <p>See {@link CoderProviders} for common {@link CoderProvider} patterns.
    */
-  public void registerCoder(Class<?> clazz, Class<?> coderClazz) {
-    registerCoder(clazz, CoderFactories.fromStaticMethods(coderClazz));
+  public void registerCoderProvider(CoderProvider coderProvider) {
+    coderProviders.addFirst(coderProvider);
   }
 
   /**
-   * Registers {@code coderFactory} as the default {@link CoderFactory} to produce {@code Coder}
-   * instances to decode and encode instances of {@code clazz}. This will override prior
-   * registrations if any exist.
+   * Registers the provided {@link Coder} for the given class.
+   *
+   * <p>Note that this is equivalent to {@code registerCoderForType(TypeDescriptor.of(clazz))}. See
+   * {@link #registerCoderForType(TypeDescriptor, Coder)} for further details.
    */
-  public void registerCoder(Class<?> clazz, CoderFactory coderFactory) {
-    coderFactoryMap.put(clazz, coderFactory);
+  public void registerCoderForClass(Class<?> clazz, Coder<?> coder) {
+    registerCoderForType(TypeDescriptor.of(clazz), coder);
   }
 
   /**
-   * Register the provided {@link Coder} for encoding all values of the specified {@code Class}.
-   * This will override prior registrations if any exist.
+   * Registers the provided {@link Coder} for the given type.
    *
-   * <p>Not for use with generic rawtypes. Instead, register a {@link CoderFactory} via
-   * {@link #registerCoder(Class, CoderFactory)} or ensure your {@code Coder} class has the
-   * appropriate static methods and register it directly via {@link #registerCoder(Class, Class)}.
+   * <p>Note that this is equivalent to
+   * {@code registerCoderProvider(CoderProviders.forCoder(type, coder))}. See
+   * {@link #registerCoderProvider} and {@link CoderProviders#forCoder} for further details.
    */
-  public <T> void registerCoder(Class<T> rawClazz, Coder<T> coder) {
-    checkArgument(
-        rawClazz.getTypeParameters().length == 0,
-        "CoderRegistry.registerCoder(Class<T>, Coder<T>) may not be used "
-            + "with unspecialized generic classes");
-
-    CoderFactory factory = CoderFactories.forCoder(coder);
-    registerCoder(rawClazz, factory);
+  public void registerCoderForType(TypeDescriptor<?> type, Coder<?> coder) {
+    registerCoderProvider(CoderProviders.forCoder(type, coder));
   }
 
   /**
-   * Returns the {@link Coder} to use by default for values of the given type.
-   *
-   * @throws CannotProvideCoderException if there is no default Coder.
+   * Returns the {@link Coder} to use for values of the given class.
+
+   * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
    */
-  public <T> Coder<T> getDefaultCoder(TypeDescriptor<T> typeDescriptor)
-      throws CannotProvideCoderException {
-    return getDefaultCoder(typeDescriptor, Collections.<Type, Coder<?>>emptyMap());
+  public <T> Coder<T> getCoder(Class<T> clazz) throws CannotProvideCoderException {
+    return getCoder(TypeDescriptor.of(clazz));
   }
 
   /**
-   * See {@link #getDefaultCoder(TypeDescriptor)}.
+   * Returns the {@link Coder} to use for values of the given type.
+   *
+   * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
    */
-  @Override
-  public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor)
-      throws CannotProvideCoderException {
-    return getDefaultCoder(typeDescriptor);
+  public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
+    return getCoderFromTypeDescriptor(type, Collections.<Type, Coder<?>>emptyMap());
   }
 
   /**
-   * Returns the {@link Coder} to use by default for values of the given type, where the given input
+   * Returns the {@link Coder} for values of the given type, where the given input
    * type uses the given {@link Coder}.
    *
-   * @throws CannotProvideCoderException if there is no default Coder.
+   * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
    */
-  public <InputT, OutputT> Coder<OutputT> getDefaultCoder(
+  @Deprecated
+  @Internal
+  public <InputT, OutputT> Coder<OutputT> getCoder(
       TypeDescriptor<OutputT> typeDescriptor,
       TypeDescriptor<InputT> inputTypeDescriptor,
       Coder<InputT> inputCoder)
@@ -258,22 +245,26 @@ public class CoderRegistry implements CoderProvider {
     checkArgument(typeDescriptor != null);
     checkArgument(inputTypeDescriptor != null);
     checkArgument(inputCoder != null);
-    return getDefaultCoder(
+    return getCoderFromTypeDescriptor(
         typeDescriptor, getTypeToCoderBindings(inputTypeDescriptor.getType(), inputCoder));
   }
 
   /**
    * Returns the {@link Coder} to use on elements produced by this function, given the {@link Coder}
    * used for its input elements.
+   *
+   * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
    */
-  public <InputT, OutputT> Coder<OutputT> getDefaultOutputCoder(
+  @Deprecated
+  @Internal
+  public <InputT, OutputT> Coder<OutputT> getOutputCoder(
       SerializableFunction<InputT, OutputT> fn, Coder<InputT> inputCoder)
       throws CannotProvideCoderException {
 
     ParameterizedType fnType = (ParameterizedType)
         TypeDescriptor.of(fn.getClass()).getSupertype(SerializableFunction.class).getType();
 
-    return getDefaultCoder(
+    return getCoder(
         fn.getClass(),
         SerializableFunction.class,
         ImmutableMap.of(fnType.getActualTypeArguments()[0], inputCoder),
@@ -284,9 +275,11 @@ public class CoderRegistry implements CoderProvider {
    * Returns the {@link Coder} to use for the specified type parameter specialization of the
    * subclass, given {@link Coder Coders} to use for all other type parameters (if any).
    *
-   * @throws CannotProvideCoderException if there is no default Coder.
+   * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
    */
-  public <T, OutputT> Coder<OutputT> getDefaultCoder(
+  @Deprecated
+  @Internal
+  public <T, OutputT> Coder<OutputT> getCoder(
       Class<? extends T> subClass,
       Class<T> baseClass,
       Map<Type, ? extends Coder<?>> knownCoders,
@@ -305,147 +298,11 @@ public class CoderRegistry implements CoderProvider {
     }
   }
 
-  /**
-   * Returns the {@link Coder} to use for the provided example value, if it can be determined.
-   *
-   * @throws CannotProvideCoderException if there is no default {@link Coder} or
-   * more than one {@link Coder} matches
-   */
-  public <T> Coder<T> getDefaultCoder(T exampleValue) throws CannotProvideCoderException {
-    Class<?> clazz = exampleValue == null ? Void.class : exampleValue.getClass();
-
-    if (clazz.getTypeParameters().length == 0) {
-      // Trust that getDefaultCoder returns a valid
-      // Coder<T> for non-generic clazz.
-      @SuppressWarnings("unchecked")
-      Coder<T> coder = (Coder<T>) getDefaultCoder(clazz);
-      return coder;
-    } else {
-      CoderFactory factory = getDefaultCoderFactory(clazz);
-
-      List<Object> components = factory.getInstanceComponents(exampleValue);
-      if (components == null) {
-        throw new CannotProvideCoderException(String.format(
-            "Cannot provide coder based on value with class %s: The registered CoderFactory with "
-            + "class %s failed to decompose the value, which is required in order to provide "
-            + "Coders for the components.",
-            clazz.getCanonicalName(), factory.getClass().getCanonicalName()));
-      }
-
-      // componentcoders = components.map(this.getDefaultCoder)
-      List<Coder<?>> componentCoders = new ArrayList<>();
-      for (Object component : components) {
-        try {
-          Coder<?> componentCoder = getDefaultCoder(component);
-          componentCoders.add(componentCoder);
-        } catch (CannotProvideCoderException exc) {
-          throw new CannotProvideCoderException(
-              String.format("Cannot provide coder based on value with class %s",
-                  clazz.getCanonicalName()),
-              exc);
-        }
-      }
-
-      // Trust that factory.create maps from valid component Coders
-      // to a valid Coder<T>.
-      @SuppressWarnings("unchecked")
-      Coder<T> coder = (Coder<T>) factory.create(componentCoders);
-      return coder;
-    }
-  }
-
-  /**
-   * Returns the {@link Coder} to use by default for values of the given class. The following three
-   * sources for a {@link Coder} will be attempted, in order:
-   *
-   * <ol>
-   *   <li>A {@link Coder} class registered explicitly via a call to {@link #registerCoder},
-   *   <li>A {@link DefaultCoder} annotation on the class,
-   *   <li>This registry's fallback {@link CoderProvider}, which may be able to generate a
-   *   {@link Coder} for an arbitrary class.
-   * </ol>
-   *
-   * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
-   */
-  public <T> Coder<T> getDefaultCoder(Class<T> clazz) throws CannotProvideCoderException {
-
-    CannotProvideCoderException factoryException;
-    try {
-      CoderFactory coderFactory = getDefaultCoderFactory(clazz);
-      LOG.debug("Default coder for {} found by factory", clazz);
-      @SuppressWarnings("unchecked")
-      Coder<T> coder = (Coder<T>) coderFactory.create(Collections.<Coder<?>>emptyList());
-      return coder;
-    } catch (CannotProvideCoderException exc) {
-      factoryException = exc;
-    }
-
-    CannotProvideCoderException annotationException;
-    try {
-      return getDefaultCoderFromAnnotation(clazz);
-    } catch (CannotProvideCoderException exc) {
-      annotationException = exc;
-    }
-
-    CannotProvideCoderException fallbackException;
-    if (getFallbackCoderProvider() != null) {
-      try {
-        return getFallbackCoderProvider().getCoder(TypeDescriptor.<T>of(clazz));
-      } catch (CannotProvideCoderException exc) {
-        fallbackException = exc;
-      }
-    } else {
-      fallbackException = new CannotProvideCoderException("no fallback CoderProvider configured");
-    }
-
-    // Build up the error message and list of causes.
-    StringBuilder messageBuilder = new StringBuilder()
-        .append("Unable to provide a default Coder for ").append(clazz.getCanonicalName())
-        .append(". Correct one of the following root causes:");
-
-    messageBuilder
-        .append("\n  Building a Coder using a registered CoderFactory failed: ")
-        .append(factoryException.getMessage());
-
-    messageBuilder
-        .append("\n  Building a Coder from the @DefaultCoder annotation failed: ")
-        .append(annotationException.getMessage());
-
-    messageBuilder
-        .append("\n  Building a Coder from the fallback CoderProvider failed: ")
-        .append(fallbackException.getMessage());
-
-    throw new CannotProvideCoderException(messageBuilder.toString());
-  }
-
-  /**
-   * Sets the fallback {@link CoderProvider} for this registry. If no other method succeeds in
-   * providing a {@code Coder<T>} for a type {@code T}, then the registry will attempt to create
-   * a {@link Coder} using this {@link CoderProvider}.
-   *
-   * <p>By default, this is set to {@link SerializableCoder#PROVIDER}.
-   *
-   * <p>See {@link #getFallbackCoderProvider}.
-   */
-  public void setFallbackCoderProvider(CoderProvider coderProvider) {
-    fallbackCoderProvider = coderProvider;
-  }
-
-  /**
-   * Returns the fallback {@link CoderProvider} for this registry.
-   *
-   * <p>See {@link #setFallbackCoderProvider}.
-   */
-  public CoderProvider getFallbackCoderProvider() {
-    return fallbackCoderProvider;
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   /**
    * Returns a {@code Map} from each of {@code baseClass}'s type parameters to the {@link Coder} to
-   * use by default for it, in the context of {@code subClass}'s specialization of
-   * {@code baseClass}.
+   * use for it, in the context of {@code subClass}'s specialization of {@code baseClass}.
    *
    * <p>If no {@link Coder} can be inferred for a particular type parameter, then that type variable
    * will be absent from the returned {@code Map}.
@@ -465,7 +322,7 @@ public class CoderRegistry implements CoderProvider {
    * <p>For this reason {@code getDefaultCoders} (plural) does not throw an exception if a
    * {@link Coder} for a particular type variable cannot be inferred, but merely omits the entry
    * from the returned {@code Map}. It is the responsibility of the caller (usually
-   * {@link #getDefaultCoder} to extract the desired coder or throw a
+   * {@link #getCoderFromTypeDescriptor} to extract the desired coder or throw a
    * {@link CannotProvideCoderException} when appropriate.
    *
    * @param subClass the concrete type whose specializations are being inferred
@@ -495,8 +352,7 @@ public class CoderRegistry implements CoderProvider {
 
   /**
    * Returns an array listing, for each of {@code baseClass}'s type parameters, the {@link Coder} to
-   * use by default for it, in the context of {@code subClass}'s specialization of
-   * {@code baseClass}.
+   * use for it, in the context of {@code subClass}'s specialization of {@code baseClass}.
    *
    * <p>If a {@link Coder} cannot be inferred for a type variable, its slot in the resulting array
    * will be {@code null}.
@@ -518,7 +374,7 @@ public class CoderRegistry implements CoderProvider {
    * <p>For this reason {@code getDefaultCoders} (plural) does not throw an exception if a
    * {@link Coder} for a particular type variable cannot be inferred. Instead, it results in a
    * {@code null} in the array. It is the responsibility of the caller (usually
-   * {@link #getDefaultCoder} to extract the desired coder or throw a
+   * {@link #getCoderFromTypeDescriptor} to extract the desired coder or throw a
    * {@link CannotProvideCoderException} when appropriate.
    *
    * @param subClass the concrete type whose specializations are being inferred
@@ -571,7 +427,7 @@ public class CoderRegistry implements CoderProvider {
         result[i] = knownCoders[i];
       } else {
         try {
-          result[i] = getDefaultCoder(typeArgs[i], context);
+          result[i] = getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgs[i]), context);
         } catch (CannotProvideCoderException exc) {
           result[i] = null;
         }
@@ -695,89 +551,31 @@ public class CoderRegistry implements CoderProvider {
   }
 
   /**
-   * The map of classes to the CoderFactories to use to create their
-   * default Coders.
+   * The list of {@link CoderProvider coder providers} to use to provide Coders.
    */
-  private Map<Class<?>, CoderFactory> coderFactoryMap;
-
-  /**
-   * A provider of coders for types where no coder is registered.
-   */
-  private CoderProvider fallbackCoderProvider;
-
-  /**
-   * Returns the {@link CoderFactory} to use to create default {@link Coder Coders} for instances of
-   * the given class, or {@code null} if there is no default {@link CoderFactory} registered.
-   */
-  private CoderFactory getDefaultCoderFactory(Class<?> clazz) throws CannotProvideCoderException {
-    CoderFactory coderFactoryOrNull = coderFactoryMap.get(clazz);
-    if (coderFactoryOrNull != null) {
-      return coderFactoryOrNull;
-    } else {
-      throw new CannotProvideCoderException(
-          String.format("Cannot provide coder based on value with class %s: No CoderFactory has "
-              + "been registered for the class.", clazz.getCanonicalName()));
-    }
-  }
-
-  /**
-   * Returns the {@link Coder} returned according to the {@link CoderProvider} from any
-   * {@link DefaultCoder} annotation on the given class.
-   */
-  private <T> Coder<T> getDefaultCoderFromAnnotation(Class<T> clazz)
-      throws CannotProvideCoderException {
-    DefaultCoder defaultAnnotation = clazz.getAnnotation(DefaultCoder.class);
-    if (defaultAnnotation == null) {
-      throw new CannotProvideCoderException(
-          String.format("Class %s does not have a @DefaultCoder annotation.",
-              clazz.getCanonicalName()));
-    }
-
-    LOG.debug("DefaultCoder annotation found for {} with value {}",
-        clazz, defaultAnnotation.value());
-    CoderProvider coderProvider = CoderProviders.fromStaticMethods(defaultAnnotation.value());
-    return coderProvider.getCoder(TypeDescriptor.of(clazz));
-  }
-
-  /**
-   * Returns the {@link Coder} to use by default for values of the given type,
-   * in a context where the given types use the given coders.
-   *
-   * @throws CannotProvideCoderException if a coder cannot be provided
-   */
-  private <T> Coder<T> getDefaultCoder(
-      TypeDescriptor<T> typeDescriptor,
-      Map<Type, Coder<?>> typeCoderBindings)
-      throws CannotProvideCoderException {
-
-    Coder<?> defaultCoder = getDefaultCoder(typeDescriptor.getType(), typeCoderBindings);
-    LOG.debug("Default coder for {}: {}", typeDescriptor, defaultCoder);
-    @SuppressWarnings("unchecked")
-    Coder<T> result = (Coder<T>) defaultCoder;
-    return result;
-  }
+  private LinkedList<CoderProvider> coderProviders;
 
   /**
-   * Returns the {@link Coder} to use by default for values of the given type,
+   * Returns a {@link Coder} to use for values of the given type,
    * in a context where the given types use the given coders.
    *
    * @throws CannotProvideCoderException if a coder cannot be provided
    */
-  private Coder<?> getDefaultCoder(Type type, Map<Type, Coder<?>> typeCoderBindings)
+  private <T> Coder<T> getCoderFromTypeDescriptor(
+      TypeDescriptor<T> typeDescriptor, Map<Type, Coder<?>> typeCoderBindings)
       throws CannotProvideCoderException {
-    Coder<?> coder = typeCoderBindings.get(type);
-    if (coder != null) {
-      return coder;
-    }
-    if (type instanceof Class<?>) {
-      Class<?> clazz = (Class<?>) type;
-      return getDefaultCoder(clazz);
+    Type type = typeDescriptor.getType();
+    Coder<?> coder;
+    if (typeCoderBindings.containsKey(type)) {
+      coder = typeCoderBindings.get(type);
+    } else if (type instanceof Class<?>) {
+      coder = getCoderFromFactories(typeDescriptor, Collections.<Coder<?>>emptyList());
     } else if (type instanceof ParameterizedType) {
-      return getDefaultCoder((ParameterizedType) type, typeCoderBindings);
+      coder = getCoderFromParameterizedType((ParameterizedType) type, typeCoderBindings);
     } else if (type instanceof TypeVariable) {
-      return getDefaultCoder(TypeDescriptor.of(type).getRawType());
+      coder = getCoderFromFactories(typeDescriptor, Collections.<Coder<?>>emptyList());
     } else if (type instanceof WildcardType) {
-      // No default coder for an unknown generic type.
+      // No coder for an unknown generic type.
       throw new CannotProvideCoderException(
           String.format("Cannot provide a coder for type variable %s"
           + " (declared by %s) because the actual type is unknown due to erasure.",
@@ -788,72 +586,70 @@ public class CoderRegistry implements CoderProvider {
       throw new RuntimeException(
           "Internal error: unexpected kind of Type: " + type);
     }
+
+    LOG.debug("Coder for {}: {}", typeDescriptor, coder);
+    @SuppressWarnings("unchecked")
+    Coder<T> result = (Coder<T>) coder;
+    return result;
   }
 
   /**
-   * Returns the {@link Coder} to use by default for values of the given
+   * Returns a {@link Coder} to use for values of the given
    * parameterized type, in a context where the given types use the
    * given {@link Coder Coders}.
    *
    * @throws CannotProvideCoderException if no coder can be provided
    */
-  private Coder<?> getDefaultCoder(
+  private Coder<?> getCoderFromParameterizedType(
       ParameterizedType type,
       Map<Type, Coder<?>> typeCoderBindings)
           throws CannotProvideCoderException {
 
-    CannotProvideCoderException factoryException;
-    try {
-      return getDefaultCoderFromFactory(type, typeCoderBindings);
-    } catch (CannotProvideCoderException exc) {
-      factoryException = exc;
-    }
-
-    CannotProvideCoderException annotationException;
-    try {
-      Class<?> rawClazz = (Class<?>) type.getRawType();
-      return getDefaultCoderFromAnnotation(rawClazz);
-    } catch (CannotProvideCoderException exc) {
-      annotationException = exc;
-    }
-
-    // Build up the error message and list of causes.
-    StringBuilder messageBuilder = new StringBuilder()
-        .append("Unable to provide a default Coder for ").append(type)
-        .append(". Correct one of the following root causes:");
-
-    messageBuilder
-        .append("\n  Building a Coder using a registered CoderFactory failed: ")
-        .append(factoryException.getMessage());
-
-    messageBuilder
-        .append("\n  Building a Coder from the @DefaultCoder annotation failed: ")
-        .append(annotationException.getMessage());
-
-    throw new CannotProvideCoderException(messageBuilder.toString());
-  }
-
-  private Coder<?> getDefaultCoderFromFactory(
-      ParameterizedType type,
-      Map<Type, Coder<?>> typeCoderBindings)
-          throws CannotProvideCoderException {
-    Class<?> rawClazz = (Class<?>) type.getRawType();
-    CoderFactory coderFactory = getDefaultCoderFactory(rawClazz);
     List<Coder<?>> typeArgumentCoders = new ArrayList<>();
     for (Type typeArgument : type.getActualTypeArguments()) {
       try {
-        Coder<?> typeArgumentCoder = getDefaultCoder(typeArgument,
-                                                     typeCoderBindings);
+        Coder<?> typeArgumentCoder =
+            getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings);
         typeArgumentCoders.add(typeArgumentCoder);
       } catch (CannotProvideCoderException exc) {
-         throw new CannotProvideCoderException(
-          String.format("Cannot provide coder for parameterized type %s: %s",
-              type,
-              exc.getMessage()),
-          exc);
+        throw new CannotProvideCoderException(
+            String.format("Cannot provide coder for parameterized type %s: %s",
+                type,
+                exc.getMessage()),
+            exc);
       }
     }
-    return coderFactory.create(typeArgumentCoders);
+    return getCoderFromFactories(TypeDescriptor.of(type), typeArgumentCoders);
+  }
+
+  /**
+   * Attempts to create a {@link Coder} from any registered {@link CoderProvider} returning
+   * the first successfully created instance.
+   */
+  private Coder<?> getCoderFromFactories(
+      TypeDescriptor<?> typeDescriptor, List<Coder<?>> typeArgumentCoders)
+      throws CannotProvideCoderException {
+    List<CannotProvideCoderException> suppressedExceptions = new ArrayList<>();
+    for (CoderProvider coderProvider : coderProviders) {
+      try {
+        return coderProvider.coderFor(typeDescriptor, typeArgumentCoders);
+      } catch (CannotProvideCoderException e) {
+        // Add all failures as suppressed exceptions.
+        suppressedExceptions.add(e);
+      }
+    }
+
+    // Build up the error message and list of causes.
+    StringBuilder messageBuilder = new StringBuilder()
+        .append("Unable to provide a Coder for ").append(typeDescriptor).append(".\n")
+        .append("  Building a Coder using a registered CoderProvider failed.\n")
+        .append("  See suppressed exceptions for detailed failures.");
+    CannotProvideCoderException exceptionOnFailure =
+        new CannotProvideCoderException(messageBuilder.toString());
+    for (CannotProvideCoderException suppressedException : suppressedExceptions) {
+      exceptionOnFailure.addSuppressed(suppressedException);
+    }
+    throw exceptionOnFailure;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
index 523b69b..1762308 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
@@ -46,15 +46,6 @@ public class CollectionCoder<T> extends IterableLikeCoder<T, Collection<T>> {
     return decodedElements;
   }
 
-  /**
-   * Returns the first element in this collection if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  public static <T> List<Object> getInstanceComponents(
-      Collection<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
   protected CollectionCoder(Coder<T> elemCoder) {
     super(elemCoder, "Collection");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index f33e210..edbaa7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -94,14 +94,6 @@ public abstract class CustomCoder<T> extends Coder<T>
   }
 
   /**
-   * Returns an empty list. A {@link CustomCoder} by default will not have component coders that are
-   * used for inference.
-   */
-  public static <T> List<Object> getInstanceComponents(T exampleValue) {
-    return Collections.emptyList();
-  }
-
-  /**
    * {@inheritDoc}
    *
    * @throws NonDeterministicException a {@link CustomCoder} is presumed

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
index 9a976f9..6eff9e9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
@@ -17,45 +17,33 @@
  */
 package org.apache.beam.sdk.coders;
 
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * The {@link DefaultCoder} annotation
- * specifies a default {@link Coder} class to handle encoding and decoding
- * instances of the annotated class.
+ * The {@link DefaultCoder} annotation specifies a {@link Coder} class to handle encoding and
+ * decoding instances of the annotated class.
  *
- * <p>The specified {@link Coder} must satisfy the requirements of
- * {@link CoderProviders#fromStaticMethods}. Two classes provided by the SDK that
- * are intended for use with this annotation include {@link SerializableCoder}
- * and {@link AvroCoder}.
+ * <p>The specified {@link Coder} must have the following method:
+ * <pre>
+ * {@code public static CoderProvider getCoderProvider()}.
+ * </pre>
  *
- * <p>To configure the use of Java serialization as the default
- * for a class, annotate the class to use
- * {@link SerializableCoder} as follows:
- *
- * <pre><code>{@literal @}DefaultCoder(SerializableCoder.class)
- * public class MyCustomDataType implements Serializable {
- *   // ...
- * }</code></pre>
- *
- * <p>Similarly, to configure the use of
- * {@link AvroCoder} as the default:
- * <pre><code>{@literal @}DefaultCoder(AvroCoder.class)
- * public class MyCustomDataType {
- *   public MyCustomDataType() {}  // Avro requires an empty constructor.
- *   // ...
- * }</code></pre>
- *
- * <p>Coders specified explicitly via
- * {@link PCollection#setCoder}
- * take precedence, followed by Coders registered at runtime via
- * {@link CoderRegistry#registerCoder}. See {@link CoderRegistry} for a more detailed discussion
- * of the precedence rules.
+ * <p>Coders specified explicitly via {@link PCollection#setCoder} take precedence, followed by
+ * Coders found at runtime via {@link CoderRegistry#getCoder}.
+ * See {@link CoderRegistry} for a more detailed discussion of the precedence rules.
  */
 @Documented
 @Retention(RetentionPolicy.RUNTIME)
@@ -63,4 +51,77 @@ import org.apache.beam.sdk.values.PCollection;
 @SuppressWarnings("rawtypes")
 public @interface DefaultCoder {
   Class<? extends Coder> value();
+
+  /**
+   * A {@link CoderProviderRegistrar} that registers a {@link CoderProvider} which can use
+   * the {@code @DefaultCoder} annotation to provide {@link CoderProvider coder providers} that
+   * creates {@link Coder}s.
+   */
+  @AutoService(CoderProviderRegistrar.class)
+  class DefaultCoderProviderRegistrar implements CoderProviderRegistrar {
+
+    @Override
+    public List<CoderProvider> getCoderProviders() {
+      return ImmutableList.<CoderProvider>of(new DefaultCoderProvider());
+    }
+
+    /**
+     * A {@link CoderProvider} that uses the {@code @DefaultCoder} annotation to provide
+     * {@link CoderProvider coder providers} that create {@link Coder}s.
+     */
+    static class DefaultCoderProvider extends CoderProvider {
+      private static final Logger LOG = LoggerFactory.getLogger(DefaultCoderProvider.class);
+
+      /**
+       * Returns the {@link Coder} returned according to the {@link CoderProvider} from any
+       * {@link DefaultCoder} annotation on the given class.
+       */
+      @Override
+      public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+          List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+
+        Class<?> clazz = typeDescriptor.getRawType();
+        DefaultCoder defaultAnnotation = clazz.getAnnotation(DefaultCoder.class);
+        if (defaultAnnotation == null) {
+          throw new CannotProvideCoderException(
+              String.format("Class %s does not have a @DefaultCoder annotation.",
+                  clazz.getName()));
+        }
+
+        if (defaultAnnotation.value() == null) {
+          throw new CannotProvideCoderException(
+              String.format("Class %s has a @DefaultCoder annotation with a null value.",
+                  clazz.getName()));
+        }
+
+        LOG.debug("DefaultCoder annotation found for {} with value {}",
+            clazz, defaultAnnotation.value());
+
+        Method coderProviderMethod;
+        try {
+          coderProviderMethod = defaultAnnotation.value().getMethod("getCoderProvider");
+        } catch (NoSuchMethodException e) {
+          throw new CannotProvideCoderException(String.format(
+              "Unable to find 'public static CoderProvider getCoderProvider()' on %s",
+              defaultAnnotation.value()),
+              e);
+        }
+
+        CoderProvider coderProvider;
+        try {
+          coderProvider = (CoderProvider) coderProviderMethod.invoke(null);
+        } catch (IllegalAccessException
+            | IllegalArgumentException
+            | InvocationTargetException
+            | NullPointerException
+            | ExceptionInInitializerError e) {
+          throw new CannotProvideCoderException(String.format(
+              "Unable to invoke 'public static CoderProvider getCoderProvider()' on %s",
+              defaultAnnotation.value()),
+              e);
+        }
+        return coderProvider.coderFor(typeDescriptor, componentCoders);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
index 02c3d0f..b600b1d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
@@ -41,15 +41,6 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
     return decodedElements;
   }
 
-  /**
-   * Returns the first element in this iterable if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  public static <T> List<Object> getInstanceComponents(
-      Iterable<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
   protected IterableCoder(Coder<T> elemCoder) {
     super(elemCoder, "Iterable");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
index 8e10ca2..52b9c34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
@@ -75,18 +75,6 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
   private final Coder<T> elementCoder;
   private final String iterableName;
 
-  /**
-   * Returns the first element in the iterable-like {@code exampleValue} if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  protected static <T, IterableT extends Iterable<T>>
-      List<Object> getInstanceComponentsHelper(IterableT exampleValue) {
-    for (T value : exampleValue) {
-      return Arrays.<Object>asList(value);
-    }
-    return null;
-  }
-
   protected IterableLikeCoder(Coder<T> elementCoder, String  iterableName) {
     checkArgument(elementCoder != null, "element Coder for IterableLikeCoder must not be null");
     checkArgument(iterableName != null, "iterable name for IterableLikeCoder must not be null");

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 35b7449..da7f03c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -39,13 +39,6 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
     return new KvCoder<>(keyCoder, valueCoder);
   }
 
-  public static <K, V> List<Object> getInstanceComponents(
-      KV<K, V> exampleValue) {
-    return Arrays.asList(
-        exampleValue.getKey(),
-        exampleValue.getValue());
-  }
-
   public Coder<K> getKeyCoder() {
     return keyCoder;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
index 70bbf93..25f3ee9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
@@ -40,14 +40,6 @@ public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
     return decodedElements;
   }
 
-  /**
-   * Returns the first element in this list if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  public static <T> List<Object> getInstanceComponents(List<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
   protected ListCoder(Coder<T> elemCoder) {
     super(elemCoder, "List");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index da2bf50..9e3c768 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -50,18 +50,6 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
     return new MapCoder<>(keyCoder, valueCoder);
   }
 
-  /**
-   * Returns the key and value for an arbitrary element of this map,
-   * if it is non-empty, otherwise returns {@code null}.
-   */
-   public static <K, V> List<Object> getInstanceComponents(
-       Map<K, V> exampleValue) {
-     for (Map.Entry<K, V> entry : exampleValue.entrySet()) {
-       return Arrays.asList(entry.getKey(), entry.getValue());
-     }
-     return null;
-   }
-
   public Coder<K> getKeyCoder() {
     return keyCoder;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index b52b9db..e3b2959 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -17,12 +17,15 @@
  */
 package org.apache.beam.sdk.coders;
 
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.List;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -63,29 +66,45 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {
   }
 
   /**
-   * A {@link CoderProvider} that constructs a {@link SerializableCoder}
-   * for any class that implements serializable.
+   * Returns a {@link CoderProvider} which uses the {@link SerializableCoder} if possible for
+   * all types.
+   *
+   * <p>This method is invoked reflectively from {@link DefaultCoder}.
    */
-  public static final CoderProvider PROVIDER = new CoderProvider() {
+  @SuppressWarnings("unused")
+  public static CoderProvider getCoderProvider() {
+    return new SerializableCoderProvider();
+  }
+
+  /**
+   * A {@link CoderProviderRegistrar} which registers a {@link CoderProvider} which can handle
+   * serializable types.
+   */
+  @AutoService(CoderProviderRegistrar.class)
+  public static class SerializableCoderProviderRegistrar implements CoderProviderRegistrar {
+
     @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor)
-        throws CannotProvideCoderException {
-      Class<?> clazz = typeDescriptor.getRawType();
-      if (Serializable.class.isAssignableFrom(clazz)) {
-        @SuppressWarnings("unchecked")
-        Class<? extends Serializable> serializableClazz =
-            (Class<? extends Serializable>) clazz;
-        @SuppressWarnings("unchecked")
-        Coder<T> coder = (Coder<T>) SerializableCoder.of(serializableClazz);
-        return coder;
-      } else {
-        throw new CannotProvideCoderException(
-            "Cannot provide SerializableCoder because " + typeDescriptor
-            + " does not implement Serializable");
-      }
+    public List<CoderProvider> getCoderProviders() {
+      return ImmutableList.of(getCoderProvider());
     }
-  };
+  }
 
+  /**
+   * A {@link CoderProvider} that constructs a {@link SerializableCoder} for any class that
+   * implements serializable.
+   */
+  static class SerializableCoderProvider extends CoderProvider {
+    @Override
+    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+      if (Serializable.class.isAssignableFrom(typeDescriptor.getRawType())) {
+        return SerializableCoder.of((TypeDescriptor) typeDescriptor);
+      }
+      throw new CannotProvideCoderException(
+          "Cannot provide SerializableCoder because " + typeDescriptor
+              + " does not implement Serializable");
+    }
+  }
 
   private final Class<T> type;
   private final TypeDescriptor<T> typeDescriptor;

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
index da16165..baec128 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
@@ -56,15 +56,6 @@ public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> {
         new TypeParameter<T>() {}, getElemCoder().getEncodedTypeDescriptor());
   }
 
-  /**
-   * Returns the first element in this set if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  public static <T> List<Object> getInstanceComponents(
-      Set<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
   /////////////////////////////////////////////////////////////////////////////
   // Internal operations below here.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
index 7fc094f..9a7b125 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
@@ -37,13 +37,6 @@ public class VarLongCoder extends StructuredCoder<Long> {
     return INSTANCE;
   }
 
-  /**
-   * Returns an empty list. {@link VarLongCoder} has no components.
-   */
-  public static <T> List<Object> getInstanceComponents(T ignored) {
-    return Collections.emptyList();
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   private static final VarLongCoder INSTANCE = new VarLongCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
index 29990cd..20ec300 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
@@ -125,14 +125,14 @@ public class CombineFnBase {
     @Override
     public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
         throws CannotProvideCoderException {
-      return registry.getDefaultCoder(getClass(), AbstractGlobalCombineFn.class,
+      return registry.getCoder(getClass(), AbstractGlobalCombineFn.class,
           ImmutableMap.<Type, Coder<?>>of(getInputTVariable(), inputCoder), getAccumTVariable());
     }
 
     @Override
     public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<InputT> inputCoder)
         throws CannotProvideCoderException {
-      return registry.getDefaultCoder(getClass(), AbstractGlobalCombineFn.class,
+      return registry.getCoder(getClass(), AbstractGlobalCombineFn.class,
           ImmutableMap.<Type, Coder<?>>of(getInputTVariable(), inputCoder, getAccumTVariable(),
               this.getAccumulatorCoder(registry, inputCoder)),
           getOutputTVariable());

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index d4c97bc..0515ed5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -339,7 +339,7 @@ public class CombineFns {
       List<Coder<Object>> coders = Lists.newArrayList();
       for (int i = 0; i < combineFnCount; ++i) {
         Coder<Object> inputCoder =
-            registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
+            registry.getOutputCoder(extractInputFns.get(i), dataCoder);
         coders.add(combineFns.get(i).getAccumulatorCoder(registry, inputCoder));
       }
       return new ComposedAccumulatorCoder(coders);
@@ -478,7 +478,7 @@ public class CombineFns {
       List<Coder<Object>> coders = Lists.newArrayList();
       for (int i = 0; i < combineFnCount; ++i) {
         Coder<Object> inputCoder =
-            registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
+            registry.getOutputCoder(extractInputFns.get(i), dataCoder);
         coders.add(combineFnWithContexts.get(i).getAccumulatorCoder(registry, inputCoder));
       }
       return new ComposedAccumulatorCoder(coders);

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 5624f2f..7af8fb8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -27,17 +27,25 @@ import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CollectionCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.OffsetBasedSource;
@@ -317,7 +325,7 @@ public class Create<T> {
       if (coder.isPresent()) {
         return coder.get();
       } else if (typeDescriptor.isPresent()) {
-        return input.getPipeline().getCoderRegistry().getDefaultCoder(typeDescriptor.get());
+        return input.getPipeline().getCoderRegistry().getCoder(typeDescriptor.get());
       } else {
         return getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), elems);
       }
@@ -566,7 +574,7 @@ public class Create<T> {
       if (elementCoder.isPresent()) {
         return elementCoder.get();
       } else if (typeDescriptor.isPresent()) {
-        return input.getPipeline().getCoderRegistry().getDefaultCoder(typeDescriptor.get());
+        return input.getPipeline().getCoderRegistry().getCoder(typeDescriptor.get());
       } else {
         Iterable<T> rawElements =
             Iterables.transform(
@@ -611,7 +619,7 @@ public class Create<T> {
     if (elementClazz.getTypeParameters().length == 0) {
       try {
         @SuppressWarnings("unchecked") // elementClazz is a wildcard type
-        Coder<T> coder = (Coder<T>) registry.getDefaultCoder(TypeDescriptor.of(elementClazz));
+        Coder<T> coder = (Coder<T>) registry.getCoder(TypeDescriptor.of(elementClazz));
         return coder;
       } catch (CannotProvideCoderException exc) {
         // Can't get a coder from the class of the elements, try with the elements next
@@ -619,11 +627,20 @@ public class Create<T> {
     }
 
     // If that fails, try to deduce a coder using the elements themselves
-    Optional<Coder<T>> coder = Optional.absent();
-    for (T elem : elems) {
-      Coder<T> c = registry.getDefaultCoder(elem);
+    return (Coder<T>) inferCoderFromObjects(registry, elems);
+  }
+
+  /**
+   * Attempts to infer the {@link Coder} of the elements ensuring that the returned coder is
+   * equivalent for all elements.
+   */
+  private static Coder<?> inferCoderFromObjects(
+      CoderRegistry registry, Iterable<?> elems) throws CannotProvideCoderException {
+    Optional<Coder<?>> coder = Optional.absent();
+    for (Object elem : elems) {
+      Coder<?> c = inferCoderFromObject(registry, elem);
       if (!coder.isPresent()) {
-        coder = Optional.of(c);
+        coder = (Optional) Optional.of(c);
       } else if (!Objects.equals(c, coder.get())) {
         throw new CannotProvideCoderException(
             "Cannot provide coder for elements of "
@@ -633,11 +650,48 @@ public class Create<T> {
                 + " Based on their values, they do not all default to the same Coder.");
       }
     }
+    if (coder.isPresent()) {
+      return coder.get();
+    }
 
-    if (!coder.isPresent()) {
-      throw new CannotProvideCoderException(
-          "Unable to infer a coder. Please register " + "a coder for ");
+    throw new CannotProvideCoderException("Cannot provide coder for elements of "
+        + Create.class.getSimpleName()
+        + ":"
+        + " For their common class, no coder could be provided."
+        + " Based on their values, no coder could be inferred.");
+  }
+
+  /**
+   * Attempt to infer the type for some very common Apache Beam parameterized types.
+   *
+   * <p>TODO: Instead, build a TypeDescriptor so that the {@link CoderRegistry} is invoked
+   * for the type instead of hard coding the coders for common types.
+   */
+  private static Coder<?> inferCoderFromObject(CoderRegistry registry, Object o)
+      throws CannotProvideCoderException {
+    if (o == null) {
+      return VoidCoder.of();
+    } else if (o instanceof TimestampedValue) {
+      return TimestampedValueCoder.of(
+          inferCoderFromObject(registry, ((TimestampedValue) o).getValue()));
+    } else if (o instanceof List) {
+      return ListCoder.of(inferCoderFromObjects(registry, ((Iterable) o)));
+    } else if (o instanceof Set) {
+      return SetCoder.of(inferCoderFromObjects(registry, ((Iterable) o)));
+    } else if (o instanceof Collection) {
+      return CollectionCoder.of(inferCoderFromObjects(registry, ((Iterable) o)));
+    } else if (o instanceof Iterable) {
+      return IterableCoder.of(inferCoderFromObjects(registry, ((Iterable) o)));
+    } else if (o instanceof Map) {
+      return MapCoder.of(
+          inferCoderFromObjects(registry, ((Map) o).keySet()),
+          inferCoderFromObjects(registry, ((Map) o).entrySet()));
+    } else if (o instanceof KV) {
+      return KvCoder.of(
+          inferCoderFromObject(registry, ((KV) o).getKey()),
+          inferCoderFromObject(registry, ((KV) o).getValue()));
+    } else {
+      return registry.getCoder(o.getClass());
     }
-    return coder.get();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 15abd98..d5df944 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -117,7 +117,7 @@ import org.apache.beam.sdk.values.TupleTag;
  * mapping from Java types to the default Coder to use, for a standard
  * set of Java types; users can extend this mapping for additional
  * types, via
- * {@link org.apache.beam.sdk.coders.CoderRegistry#registerCoder}.
+ * {@link org.apache.beam.sdk.coders.CoderRegistry#registerCoderProvider}.
  * If this inference process fails, either because the Java type was
  * not known at run-time (e.g., due to Java's "erasure" of generic
  * types) or there was no default Coder registered, then the Coder
@@ -281,7 +281,7 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
    * @throws CannotProvideCoderException if no coder can be inferred
    */
   protected Coder<?> getDefaultOutputCoder() throws CannotProvideCoderException {
-    throw new CannotProvideCoderException("PTransform.getDefaultOutputCoder called.");
+    throw new CannotProvideCoderException("PTransform.getOutputCoder called.");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index e67dbe1..edf1419 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -477,10 +477,10 @@ public class ParDo {
       Type typeArgument = typeArguments[i];
       TypeDescriptor<?> typeDescriptor = TypeDescriptor.of(typeArgument);
       try {
-        coders[i] = coderRegistry.getDefaultCoder(typeDescriptor);
+        coders[i] = coderRegistry.getCoder(typeDescriptor);
       } catch (CannotProvideCoderException e) {
         try {
-          coders[i] = coderRegistry.getDefaultCoder(
+          coders[i] = coderRegistry.getCoder(
               typeDescriptor, inputCoder.getEncodedTypeDescriptor(), inputCoder);
         } catch (CannotProvideCoderException ignored) {
           // Since not all type arguments will have a registered coder we ignore this exception.
@@ -623,7 +623,7 @@ public class ParDo {
     @SuppressWarnings("unchecked")
     protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends InputT> input)
         throws CannotProvideCoderException {
-      return input.getPipeline().getCoderRegistry().getDefaultCoder(
+      return input.getPipeline().getCoderRegistry().getCoder(
           getFn().getOutputTypeDescriptor(),
           getFn().getInputTypeDescriptor(),
           ((PCollection<InputT>) input).getCoder());
@@ -767,7 +767,7 @@ public class ParDo {
         throws CannotProvideCoderException {
       @SuppressWarnings("unchecked")
       Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
-      return input.getPipeline().getCoderRegistry().getDefaultCoder(
+      return input.getPipeline().getCoderRegistry().getCoder(
           output.getTypeDescriptor(),
           getFn().getInputTypeDescriptor(),
           inputCoder);

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index dd38006..c66d1b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -124,9 +124,9 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
       Coder<K> keyCoder;
       CoderRegistry coderRegistry = in.getPipeline().getCoderRegistry();
       if (keyClass == null) {
-        keyCoder = coderRegistry.getDefaultOutputCoder(fn, in.getCoder());
+        keyCoder = coderRegistry.getOutputCoder(fn, in.getCoder());
       } else {
-        keyCoder = coderRegistry.getDefaultCoder(TypeDescriptor.of(keyClass));
+        keyCoder = coderRegistry.getCoder(TypeDescriptor.of(keyClass));
       }
       // TODO: Remove when we can set the coder inference context.
       result.setCoder(KvCoder.of(keyCoder, in.getCoder()));

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
index 0276ba6..0bfb875 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
@@ -87,12 +87,6 @@ public class GlobalWindow extends BoundedWindow {
       return Collections.emptyList();
     }
 
-    /**
-     * Returns an empty list. The Global Window Coder has no components.
-     */
-    public static <T> List<Object> getInstanceComponents(T exampleValue) {
-      return Collections.emptyList();
-    }
     private Coder() {}
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index fd2a2d8..46ece09 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -178,13 +178,6 @@ public class IntervalWindow extends BoundedWindow
       return INSTANCE;
     }
 
-    /**
-     * Returns an empty list. {@link IntervalWindowCoder} has no components.
-     */
-    public static <T> List<Object> getInstanceComponents(T value) {
-      return Collections.emptyList();
-    }
-
     @Override
     public void encode(IntervalWindow window, OutputStream outStream, Context context)
         throws IOException, CoderException {

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 1095fb8..f210fd8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -142,7 +142,7 @@ public class PCollection<T> extends PValueBase implements PValue {
     CannotProvideCoderException inferFromTokenException = null;
     if (token != null) {
       try {
-        return new CoderOrFailure<>(registry.getDefaultCoder(token), null);
+        return new CoderOrFailure<>(registry.getCoder(token), null);
       } catch (CannotProvideCoderException exc) {
         inferFromTokenException = exc;
         // Attempt to detect when the token came from a TupleTag used for a ParDo output,

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index a9f3929..c172885 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -132,10 +132,6 @@ public class TimestampedValue<V> {
       return valueCoder;
     }
 
-    public static <T> List<Object> getInstanceComponents(TimestampedValue<T> exampleValue) {
-      return Arrays.<Object>asList(exampleValue.getValue());
-    }
-
     @Override
     public TypeDescriptor<TimestampedValue<T>> getEncodedTypeDescriptor() {
       return new TypeDescriptor<TimestampedValue<T>>() {}.where(

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderFactoriesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderFactoriesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderFactoriesTest.java
deleted file mode 100644
index 4ffc9c1..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderFactoriesTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.Collections;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link CoderFactories}.
- */
-@RunWith(JUnit4.class)
-public class CoderFactoriesTest {
-
-  /**
-   * Ensures that a few of our standard atomic coder classes
-   * can each be built into a factory that works as expected.
-   * It is presumed that testing a few, not all, suffices to
-   * exercise CoderFactoryFromStaticMethods.
-   */
-  @Test
-  public void testAtomicCoderClassFactories() {
-    checkAtomicCoderFactory(StringUtf8Coder.class, StringUtf8Coder.of());
-    checkAtomicCoderFactory(DoubleCoder.class, DoubleCoder.of());
-    checkAtomicCoderFactory(ByteArrayCoder.class, ByteArrayCoder.of());
-  }
-
-  /**
-   * Checks that {#link CoderFactories.fromStaticMethods} successfully
-   * builds a working {@link CoderFactory} from {@link KvCoder KvCoder.class}.
-   */
-  @Test
-  public void testKvCoderFactory() {
-    CoderFactory kvCoderFactory = CoderFactories.fromStaticMethods(KvCoder.class);
-    assertEquals(
-        KvCoder.of(DoubleCoder.of(), DoubleCoder.of()),
-        kvCoderFactory.create(Arrays.asList(DoubleCoder.of(), DoubleCoder.of())));
-  }
-
-  /**
-   * Checks that {#link CoderFactories.fromStaticMethods} successfully
-   * builds a working {@link CoderFactory} from {@link ListCoder ListCoder.class}.
-   */
-  @Test
-  public void testListCoderFactory() {
-    CoderFactory listCoderFactory = CoderFactories.fromStaticMethods(ListCoder.class);
-
-    assertEquals(
-        ListCoder.of(DoubleCoder.of()),
-        listCoderFactory.create(Arrays.asList(DoubleCoder.of())));
-  }
-
-  /**
-   * Checks that {#link CoderFactories.fromStaticMethods} successfully
-   * builds a working {@link CoderFactory} from {@link IterableCoder IterableCoder.class}.
-   */
-  @Test
-  public void testIterableCoderFactory() {
-    CoderFactory iterableCoderFactory = CoderFactories.fromStaticMethods(IterableCoder.class);
-
-    assertEquals(
-        IterableCoder.of(DoubleCoder.of()),
-        iterableCoderFactory.create(Arrays.asList(DoubleCoder.of())));
-  }
-
-  ///////////////////////////////////////////////////////////////////////
-
-  /**
-   * Checks that an atomic coder class can be converted into
-   * a factory that then yields a coder equal to the example
-   * provided.
-   */
-  private <T> void checkAtomicCoderFactory(
-      Class<? extends Coder<T>> coderClazz,
-      Coder<T> expectedCoder) {
-    CoderFactory factory = CoderFactories.fromStaticMethods(coderClazz);
-    @SuppressWarnings("unchecked")
-    Coder<T> actualCoder = (Coder<T>) factory.create(Collections.<Coder<?>>emptyList());
-    assertEquals(expectedCoder, actualCoder);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java
index 44be56d..2aa8351 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java
@@ -17,54 +17,78 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
 
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptor;
-import org.junit.Rule;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Tests for {@link CoderFactories}.
+ * Tests for {@link CoderProviders}.
  */
 @RunWith(JUnit4.class)
 public class CoderProvidersTest {
+  @Test
+  public void testCoderProvidersFromStaticMethodsForParameterlessTypes() throws Exception {
+    CoderProvider factory = CoderProviders.fromStaticMethods(String.class, StringUtf8Coder.class);
+    assertEquals(StringUtf8Coder.of(),
+        factory.coderFor(TypeDescriptors.strings(), Collections.<Coder<?>>emptyList()));
 
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
+    factory = CoderProviders.fromStaticMethods(Double.class, DoubleCoder.class);
+    assertEquals(DoubleCoder.of(),
+        factory.coderFor(TypeDescriptors.doubles(), Collections.<Coder<?>>emptyList()));
 
-  @Test
-  public void testAvroThenSerializableStringMap() throws Exception {
-    CoderProvider provider = CoderProviders.firstOf(AvroCoder.PROVIDER, SerializableCoder.PROVIDER);
-    Coder<Map<String, String>> coder =
-        provider.getCoder(new TypeDescriptor<Map<String, String>>(){});
-    assertThat(coder, instanceOf(AvroCoder.class));
+    factory = CoderProviders.fromStaticMethods(byte[].class, ByteArrayCoder.class);
+    assertEquals(ByteArrayCoder.of(),
+        factory.coderFor(TypeDescriptor.of(byte[].class), Collections.<Coder<?>>emptyList()));
   }
 
+  /**
+   * Checks that {#link CoderProviders.fromStaticMethods} successfully
+   * builds a working {@link CoderProvider} from {@link KvCoder KvCoder.class}.
+   */
   @Test
-  public void testThrowingThenSerializable() throws Exception {
-    CoderProvider provider =
-        CoderProviders.firstOf(new ThrowingCoderProvider(), SerializableCoder.PROVIDER);
-    Coder<Integer> coder = provider.getCoder(new TypeDescriptor<Integer>(){});
-    assertThat(coder, instanceOf(SerializableCoder.class));
+  public void testKvCoderProvider() throws Exception {
+    TypeDescriptor<KV<Double, Double>> type =
+        TypeDescriptors.kvs(TypeDescriptors.doubles(), TypeDescriptors.doubles());
+    CoderProvider kvCoderProvider = CoderProviders.fromStaticMethods(KV.class, KvCoder.class);
+    assertEquals(
+        KvCoder.of(DoubleCoder.of(), DoubleCoder.of()),
+        kvCoderProvider.coderFor(type, Arrays.asList(DoubleCoder.of(), DoubleCoder.of())));
   }
 
+  /**
+   * Checks that {#link CoderProviders.fromStaticMethods} successfully
+   * builds a working {@link CoderProvider} from {@link ListCoder ListCoder.class}.
+   */
   @Test
-  public void testNullThrows() throws Exception {
-    CoderProvider provider = CoderProviders.firstOf(new ThrowingCoderProvider());
-    thrown.expect(CannotProvideCoderException.class);
-    thrown.expectMessage("ThrowingCoderProvider");
-    provider.getCoder(new TypeDescriptor<Integer>(){});
+  public void testListCoderProvider() throws Exception {
+    TypeDescriptor<List<Double>> type = TypeDescriptors.lists(TypeDescriptors.doubles());
+    CoderProvider listCoderProvider = CoderProviders.fromStaticMethods(List.class, ListCoder.class);
+
+    assertEquals(
+        ListCoder.of(DoubleCoder.of()),
+        listCoderProvider.coderFor(type, Arrays.asList(DoubleCoder.of())));
   }
 
-  private static class ThrowingCoderProvider implements CoderProvider {
-    @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
-      throw new CannotProvideCoderException("ThrowingCoderProvider cannot ever provide a Coder");
-    }
+  /**
+   * Checks that {#link CoderProviders.fromStaticMethods} successfully
+   * builds a working {@link CoderProvider} from {@link IterableCoder IterableCoder.class}.
+   */
+  @Test
+  public void testIterableCoderProvider() throws Exception {
+    TypeDescriptor<Iterable<Double>> type = TypeDescriptors.iterables(TypeDescriptors.doubles());
+    CoderProvider iterableCoderProvider =
+        CoderProviders.fromStaticMethods(Iterable.class, IterableCoder.class);
+
+    assertEquals(
+        IterableCoder.of(DoubleCoder.of()),
+        iterableCoderProvider.coderFor(type, Arrays.asList(DoubleCoder.of())));
   }
 }


Mime
View raw message