avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1511535 - in /avro/trunk: ./ lang/java/avro/src/main/java/org/apache/avro/generic/ lang/java/avro/src/main/java/org/apache/avro/reflect/ lang/java/avro/src/main/java/org/apache/avro/specific/ lang/java/mapred/src/main/java/org/apache/avro/...
Date Wed, 07 Aug 2013 23:27:07 GMT
Author: cutting
Date: Wed Aug  7 23:27:06 2013
New Revision: 1511535

URL: http://svn.apache.org/r1511535
Log:
AVRO-1353. Java: Permit specification of data model (generic, specific, reflect, or other) in mapreduce job configuration.  Contributed by Marshall Bockrath-Vandegrift.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyDeserializer.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerialization.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroValueDeserializer.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroJob.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerialization.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Aug  7 23:27:06 2013
@@ -22,6 +22,10 @@ Trunk (not yet released)
     AVRO-1341. Java: Add reflection annotations @AvroName, @AvroIgnore,
     @AvroMeta, @AvroAlias and @AvroEncode. (Vincenz Priesnitz via cutting)
 
+    AVRO-1353. Java: Permit specification of data model (generic,
+    specific, reflect, or other) in mapreduce job configuration.
+    (Marshall Bockrath-Vandegrift via cutting)
+
   IMPROVEMENTS
 
     AVRO-1260. Ruby: Improve read performance. (Martin Kleppmann via cutting)

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java Wed Aug  7 23:27:06 2013
@@ -62,6 +62,8 @@ public class GenericData {
   protected static final String STRING_PROP = "avro.java.string";
   protected static final String STRING_TYPE_STRING = "String";
 
+  private final ClassLoader classLoader;
+
   /** Set the Java type to be used when reading this schema.  Meaningful only
   * for string schemas and map schemas (for the keys). */
   public static void setStringType(Schema s, StringType stringType) {
@@ -74,8 +76,21 @@ public class GenericData {
   /** Return the singleton instance. */
   public static GenericData get() { return INSTANCE; }
 
-  protected GenericData() {}
-  
+  /** For subclasses.  Applications normally use {@link GenericData#get()}. */
+  public GenericData() {
+    this(null);
+  }
+
+  /** For subclasses.  GenericData does not use a ClassLoader. */
+  public GenericData(ClassLoader classLoader) {
+    this.classLoader = (classLoader != null)
+      ? classLoader
+      : getClass().getClassLoader();
+  }
+
+  /** Return the class loader that's used (by subclasses). */
+  public ClassLoader getClassLoader() { return classLoader; }
+
   /** Default implementation of {@link GenericRecord}. Note that this implementation
    * does not fill in default values for fields if they are not specified; use {@link
    * GenericRecordBuilder} in that case.
@@ -339,6 +354,11 @@ public class GenericData {
     return new GenericDatumReader(schema, schema, this);
   }
 
+  /** Returns a {@link DatumReader} for this kind of data. */
+  public DatumReader createDatumReader(Schema writer, Schema reader) {
+    return new GenericDatumReader(writer, reader, this);
+  }
+
   /** Returns a {@link DatumWriter} for this kind of data. */
   public DatumWriter createDatumWriter(Schema schema) {
     return new GenericDatumWriter(schema, this);

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java Wed Aug  7 23:27:06 2013
@@ -78,6 +78,7 @@ public class ReflectData extends Specifi
   
   private static final ReflectData INSTANCE = new ReflectData();
 
+  /** For subclasses.  Applications normally use {@link ReflectData#get()}. */
   public ReflectData() {}
   
   /** Construct with a particular classloader. */
@@ -101,6 +102,11 @@ public class ReflectData extends Specifi
   }
 
   @Override
+  public DatumReader createDatumReader(Schema writer, Schema reader) {
+    return new ReflectDatumReader(writer, reader, this);
+  }
+
+  @Override
   public DatumWriter createDatumWriter(Schema schema) {
     return new ReflectDatumWriter(schema, this);
   }

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java Wed Aug  7 23:27:06 2013
@@ -43,8 +43,6 @@ public class SpecificData extends Generi
 
   private static final SpecificData INSTANCE = new SpecificData();
   
-  private final ClassLoader classLoader;
-  
   private static final Class<?>[] NO_ARG = new Class[]{};
   private static final Class<?>[] SCHEMA_ARG = new Class[]{Schema.class};
   private static final Map<Class,Constructor> CTOR_CACHE =
@@ -70,22 +68,24 @@ public class SpecificData extends Generi
   }
 
   /** For subclasses.  Applications normally use {@link SpecificData#get()}. */
-  protected SpecificData() { this(SpecificData.class.getClassLoader()); }
+  public SpecificData() {}
 
   /** Construct with a specific classloader. */
   public SpecificData(ClassLoader classLoader) {
-    this.classLoader = classLoader;
+    super(classLoader);
   }
   
-  /** Return the class loader that's used. */
-  public ClassLoader getClassLoader() { return classLoader; }
-
   @Override
   public DatumReader createDatumReader(Schema schema) {
     return new SpecificDatumReader(schema, schema, this);
   }
 
   @Override
+  public DatumReader createDatumReader(Schema writer, Schema reader) {
+    return new SpecificDatumReader(writer, reader, this);
+  }
+
+  @Override
   public DatumWriter createDatumWriter(Schema schema) {
     return new SpecificDatumWriter(schema, this);
   }
@@ -128,7 +128,7 @@ public class SpecificData extends Generi
       Class c = classCache.get(name);
       if (c == null) {
         try {
-          c = classLoader.loadClass(getClassName(schema));
+          c = getClassLoader().loadClass(getClassName(schema));
         } catch (ClassNotFoundException e) {
           c = NO_CLASS;
         }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java Wed Aug  7 23:27:06 2013
@@ -73,6 +73,20 @@ public abstract class AvroDeserializer<T
   }
 
   /**
+   * Constructor.
+   *
+   * @param writerSchema The Avro writer schema for the data to deserialize.
+   * @param readerSchema The Avro reader schema for the data to deserialize (may be null).
+   * @param datumReader The Avro datum reader to use for deserialization.
+   */
+  protected AvroDeserializer(Schema writerSchema, Schema readerSchema,
+                             DatumReader<D> datumReader) {
+    mWriterSchema = writerSchema;
+    mReaderSchema = null != readerSchema ? readerSchema : writerSchema;
+    mAvroDatumReader = datumReader;
+  }
+
+  /**
    * Gets the writer schema used for deserializing.
    *
    * @return The writer schema;

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java Wed Aug  7 23:27:06 2013
@@ -19,10 +19,10 @@
 package org.apache.avro.hadoop.io;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.BinaryData;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.reflect.ReflectData;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.RawComparator;
@@ -36,6 +36,7 @@ import org.apache.hadoop.io.RawComparato
 public class AvroKeyComparator<T> extends Configured implements RawComparator<AvroKey<T>> {
   /** The schema of the Avro data in the key to compare. */
   private Schema mSchema;
+  private GenericData mDataModel;
 
   /** {@inheritDoc} */
   @Override
@@ -43,8 +44,10 @@ public class AvroKeyComparator<T> extend
     super.setConf(conf);
     if (null != conf) {
       // The MapReduce framework will be using this comparator to sort AvroKey objects
-      // output from the map phase, so use the schema defined for the map output key.
+      // output from the map phase, so use the schema defined for the map output key
+      // and the data model non-raw compare() implementation.
       mSchema = AvroJob.getMapOutputKeySchema(conf);
+      mDataModel = AvroSerialization.createDataModel(conf);
     }
   }
 
@@ -57,6 +60,6 @@ public class AvroKeyComparator<T> extend
   /** {@inheritDoc} */
   @Override
   public int compare(AvroKey<T> x, AvroKey<T> y) {
-    return ReflectData.get().compare(x.datum(), y.datum(), mSchema);
+    return mDataModel.compare(x.datum(), y.datum(), mSchema);
   }
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyDeserializer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyDeserializer.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyDeserializer.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyDeserializer.java Wed Aug  7 23:27:06 2013
@@ -19,6 +19,7 @@
 package org.apache.avro.hadoop.io;
 
 import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroWrapper;
 
@@ -42,6 +43,18 @@ public class AvroKeyDeserializer<D> exte
   }
 
   /**
+   * Constructor.
+   *
+   * @param writerSchema The Avro writer schema for the data to deserialize.
+   * @param readerSchema The Avro reader schema for the data to deserialize.
+   * @param datumReader The Avro datum reader to use for deserialization.
+   */
+  public AvroKeyDeserializer(Schema writerSchema, Schema readerSchema,
+                             DatumReader<D> datumReader) {
+    super(writerSchema, readerSchema, datumReader);
+  }
+
+  /**
    * Creates a new empty <code>AvroKey</code> instance.
    *
    * @return a new empty AvroKey.

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerialization.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerialization.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerialization.java Wed Aug  7 23:27:06 2013
@@ -18,17 +18,23 @@
 
 package org.apache.avro.hadoop.io;
 
+import java.lang.reflect.Constructor;
 import java.util.Collection;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * The {@link org.apache.hadoop.io.serializer.Serialization} used by jobs configured with
@@ -49,6 +55,9 @@ public class AvroSerialization<T> extend
   /** Conf key for the reader schema of the AvroValue datum being serialized/deserialized. */
   private static final String CONF_VALUE_READER_SCHEMA = "avro.serialization.value.reader.schema";
 
+  /** Conf key for the data model implementation class. */
+  private static final String CONF_DATA_MODEL = "avro.serialization.data.model";
+
   /** {@inheritDoc} */
   @Override
   public boolean accept(Class<?> c) {
@@ -64,14 +73,21 @@ public class AvroSerialization<T> extend
   @Override
   public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) {
     Configuration conf = getConf();
+    GenericData dataModel = createDataModel(conf);
     if (AvroKey.class.isAssignableFrom(c)) {
-      return new AvroKeyDeserializer<T>(getKeyWriterSchema(conf),
-                                        getKeyReaderSchema(conf),
-                                        conf.getClassLoader());
+      Schema writerSchema = getKeyWriterSchema(conf);
+      Schema readerSchema = getKeyReaderSchema(conf);
+      DatumReader<T> datumReader = (readerSchema != null)
+        ? dataModel.createDatumReader(writerSchema, readerSchema)
+        : dataModel.createDatumReader(writerSchema);
+      return new AvroKeyDeserializer<T>(writerSchema, readerSchema, datumReader);
     } else if (AvroValue.class.isAssignableFrom(c)) {
-      return new AvroValueDeserializer<T>(getValueWriterSchema(conf),
-                                          getValueReaderSchema(conf),
-                                          conf.getClassLoader());
+      Schema writerSchema = getValueWriterSchema(conf);
+      Schema readerSchema = getValueReaderSchema(conf);
+      DatumReader<T> datumReader = (readerSchema != null)
+        ? dataModel.createDatumReader(writerSchema, readerSchema)
+        : dataModel.createDatumReader(writerSchema);
+      return new AvroValueDeserializer<T>(writerSchema, readerSchema, datumReader);
     } else {
       throw new IllegalStateException("Only AvroKey and AvroValue are supported.");
     }
@@ -85,15 +101,18 @@ public class AvroSerialization<T> extend
    */
   @Override
   public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
+    Configuration conf = getConf();
     Schema schema;
     if (AvroKey.class.isAssignableFrom(c)) {
-      schema = getKeyWriterSchema(getConf());
+      schema = getKeyWriterSchema(conf);
     } else if (AvroValue.class.isAssignableFrom(c)) {
-      schema = getValueWriterSchema(getConf());
+      schema = getValueWriterSchema(conf);
     } else {
       throw new IllegalStateException("Only AvroKey and AvroValue are supported.");
     }
-    return new AvroSerializer<T>(schema);
+    GenericData dataModel = createDataModel(conf);
+    DatumWriter<T> datumWriter = dataModel.createDatumWriter(schema);
+    return new AvroSerializer<T>(schema, datumWriter);
   }
 
   /**
@@ -158,6 +177,16 @@ public class AvroSerialization<T> extend
   }
 
   /**
+   * Sets the data model class for de/serialization.
+   *
+   * @param conf The configuration.
+   * @param modelClass The data model class.
+   */
+  public static void setDataModelClass(Configuration conf, Class<? extends GenericData> modelClass) {
+    conf.setClass(CONF_DATA_MODEL, modelClass, GenericData.class);
+  }
+
+  /**
    * Gets the writer schema of the AvroKey datum that is being serialized/deserialized.
    *
    * @param conf The configuration.
@@ -200,4 +229,38 @@ public class AvroSerialization<T> extend
     String json = conf.get(CONF_VALUE_READER_SCHEMA);
     return null == json ? null : Schema.parse(json);
   }
+
+  /**
+   * Gets the data model class for de/serialization.
+   *
+   * @param conf The configuration.
+   */
+  public static Class<? extends GenericData> getDataModelClass(Configuration conf) {
+    return conf.getClass(CONF_DATA_MODEL, ReflectData.class, GenericData.class);
+  }
+
+  private static GenericData newDataModelInstance(Class<? extends GenericData> modelClass, Configuration conf) {
+    GenericData dataModel;
+    try {
+      Constructor<? extends GenericData> ctor = modelClass.getDeclaredConstructor(ClassLoader.class);
+      ctor.setAccessible(true);
+      dataModel = ctor.newInstance(conf.getClassLoader());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    ReflectionUtils.setConf(dataModel, conf);
+    return dataModel;
+  }
+
+  /**
+   * Gets an instance of the data model implementation, defaulting to
+   * {@link ReflectData} if not explicitly specified.
+   *
+   * @param conf The job configuration.
+   * @return Instance of the job data model implementation.
+   */
+  public static GenericData createDataModel(Configuration conf) {
+    Class<? extends GenericData> modelClass = getDataModelClass(conf);
+    return newDataModelInstance(modelClass, conf);
+  }
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java Wed Aug  7 23:27:06 2013
@@ -83,6 +83,20 @@ public class AvroSerializer<T> implement
   }
 
   /**
+   * Constructor.
+   *
+   * @param writerSchema The writer schema for the Avro data being serialized.
+   * @param datumWriter The datum writer to use for serialization.
+   */
+  public AvroSerializer(Schema writerSchema, DatumWriter<T> datumWriter) {
+    if (null == writerSchema) {
+      throw new IllegalArgumentException("Writer schema may not be null");
+    }
+    mWriterSchema = writerSchema;
+    mAvroDatumWriter = datumWriter;
+  }
+
+  /**
    * Gets the writer schema being used for serialization.
    *
    * @return The writer schema.

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroValueDeserializer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroValueDeserializer.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroValueDeserializer.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroValueDeserializer.java Wed Aug  7 23:27:06 2013
@@ -19,6 +19,7 @@
 package org.apache.avro.hadoop.io;
 
 import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapred.AvroWrapper;
 
@@ -42,6 +43,18 @@ public class AvroValueDeserializer<D> ex
   }
 
   /**
+   * Constructor.
+   *
+   * @param writerSchema The Avro writer schema for the data to deserialize.
+   * @param readerSchema The Avro reader schema for the data to deserialize.
+   * @param datumReader The Avro datum reader to use for deserialization.
+   */
+  public AvroValueDeserializer(Schema writerSchema, Schema readerSchema,
+                               DatumReader<D> datumReader) {
+    super(writerSchema, readerSchema, datumReader);
+  }
+
+  /**
    * Creates a new empty <code>AvroValue</code> instance.
    *
    * @return a new empty AvroValue.

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java Wed Aug  7 23:27:06 2013
@@ -19,6 +19,7 @@
 package org.apache.avro.mapred;
 
 import java.util.Collection;
+import java.lang.reflect.Constructor;
 import java.net.URLEncoder;
 import java.io.UnsupportedEncodingException;
 
@@ -26,7 +27,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
 
 /** Setters to configure jobs for Avro data. */
 public class AvroJob {
@@ -53,6 +58,8 @@ public class AvroJob {
   public static final String INPUT_IS_REFLECT = "avro.input.is.reflect";
   /** The configuration key for reflection-based map output representation. */
   public static final String MAP_OUTPUT_IS_REFLECT = "avro.map.output.is.reflect";
+  /** The configuration key for the data model implementation class. */
+  private static final String CONF_DATA_MODEL = "avro.serialization.data.model";
 
   /** Configure a job's map input schema. */
   public static void setInputSchema(JobConf job, Schema s) {
@@ -62,7 +69,8 @@ public class AvroJob {
 
   /** Return a job's map input schema. */
   public static Schema getInputSchema(Configuration job) {
-    return Schema.parse(job.get(INPUT_SCHEMA));
+    String schemaString = job.get(INPUT_SCHEMA);
+    return schemaString != null ? Schema.parse(schemaString) : null;
   }
 
   /** Configure a job's map output schema.  The map output schema defaults to
@@ -190,6 +198,57 @@ public class AvroJob {
     job.set(REDUCER, c.getName());
   }
 
+  /** Configure a job's data model implementation class. */
+  public static void setDataModelClass(JobConf job, Class<? extends GenericData> modelClass) {
+    job.setClass(CONF_DATA_MODEL, modelClass, GenericData.class);
+  }
   
+  /** Return the job's data model implementation class. */
+  public static Class<? extends GenericData> getDataModelClass(Configuration conf) {
+    return (Class<? extends GenericData>) conf.getClass(
+        CONF_DATA_MODEL, ReflectData.class, GenericData.class);
+  }
+
+  private static GenericData newDataModelInstance(Class<? extends GenericData> modelClass, Configuration conf) {
+    GenericData dataModel;
+    try {
+      Constructor<? extends GenericData> ctor = modelClass.getDeclaredConstructor(ClassLoader.class);
+      ctor.setAccessible(true);
+      dataModel = ctor.newInstance(conf.getClassLoader());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    ReflectionUtils.setConf(dataModel, conf);
+    return dataModel;
+  }
 
+  public static GenericData createDataModel(Configuration conf) {
+    return newDataModelInstance(getDataModelClass(conf), conf);
+  }
+
+  public static GenericData createInputDataModel(Configuration conf) {
+    String className = conf.get(CONF_DATA_MODEL, null);
+    Class<? extends GenericData> modelClass;
+    if (className != null) {
+      modelClass = getDataModelClass(conf);
+    } else if (conf.getBoolean(INPUT_IS_REFLECT, false)) {
+      modelClass = ReflectData.class;
+    } else {
+      modelClass = SpecificData.class;
+    }
+    return newDataModelInstance(modelClass, conf);
+  }
+
+  public static GenericData createMapOutputDataModel(Configuration conf) {
+    String className = conf.get(CONF_DATA_MODEL, null);
+    Class<? extends GenericData> modelClass;
+    if (className != null) {
+      modelClass = getDataModelClass(conf);
+    } else if (conf.getBoolean(MAP_OUTPUT_IS_REFLECT, false)) {
+      modelClass = ReflectData.class;
+    } else {
+      modelClass = SpecificData.class;
+    }
+    return newDataModelInstance(modelClass, conf);
+  }
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java Wed Aug  7 23:27:06 2013
@@ -33,9 +33,9 @@ import org.apache.hadoop.mapred.RecordWr
 import org.apache.hadoop.util.Progressable;
 
 import org.apache.avro.Schema;
-import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.CodecFactory;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.hadoop.file.HadoopCodecFactory;
 
 import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
@@ -145,9 +145,10 @@ public class AvroOutputFormat <T>
     Schema schema = isMapOnly
       ? AvroJob.getMapOutputSchema(job)
       : AvroJob.getOutputSchema(job);
+    GenericData dataModel = AvroJob.createDataModel(job);
 
     final DataFileWriter<T> writer =
-      new DataFileWriter<T>(new ReflectDatumWriter<T>());
+      new DataFileWriter<T>(dataModel.createDatumWriter(null));
     
     configureDataFileWriter(writer, job);
 

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java Wed Aug  7 23:27:06 2013
@@ -27,8 +27,6 @@ import org.apache.hadoop.mapred.RecordRe
 
 import org.apache.avro.file.FileReader;
 import org.apache.avro.file.DataFileReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
 
 /** An {@link RecordReader} for Avro data files. */
 public class AvroRecordReader<T>
@@ -42,9 +40,8 @@ public class AvroRecordReader<T>
     throws IOException {
     this(DataFileReader.openReader
          (new FsInput(split.getPath(), job),
-          job.getBoolean(AvroJob.INPUT_IS_REFLECT, false)
-          ? new ReflectDatumReader<T>(AvroJob.getInputSchema(job))
-          : new SpecificDatumReader<T>(AvroJob.getInputSchema(job))),
+          AvroJob.createInputDataModel(job)
+          .createDatumReader(AvroJob.getInputSchema(job))),
          split);
   }
 

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java Wed Aug  7 23:27:06 2013
@@ -25,18 +25,17 @@ import java.io.OutputStream;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
 
 /** The {@link Serialization} used by jobs configured with {@link AvroJob}. */
 public class AvroSerialization<T> extends Configured 
@@ -49,14 +48,13 @@ public class AvroSerialization<T> extend
   /** Returns the specified map output deserializer.  Defaults to the final
    * output deserializer if no map output schema was specified. */
   public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) {
+    Configuration conf = getConf();
     boolean isKey = AvroKey.class.isAssignableFrom(c);
     Schema schema = isKey
-      ? Pair.getKeySchema(AvroJob.getMapOutputSchema(getConf()))
-      : Pair.getValueSchema(AvroJob.getMapOutputSchema(getConf()));
-    DatumReader<T> datumReader =
-      getConf().getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)
-      ? new ReflectDatumReader<T>(schema)
-      : new SpecificDatumReader<T>(schema);
+      ? Pair.getKeySchema(AvroJob.getMapOutputSchema(conf))
+      : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf));
+    GenericData dataModel = AvroJob.createMapOutputDataModel(conf);
+    DatumReader<T> datumReader = dataModel.createDatumReader(schema);
     return new AvroWrapperDeserializer(datumReader, isKey);
   }
   
@@ -99,12 +97,14 @@ public class AvroSerialization<T> extend
   public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
     // AvroWrapper used for final output, AvroKey or AvroValue for map output
     boolean isFinalOutput = c.equals(AvroWrapper.class);
+    Configuration conf = getConf();
     Schema schema = isFinalOutput
-      ? AvroJob.getOutputSchema(getConf())
+      ? AvroJob.getOutputSchema(conf)
       : (AvroKey.class.isAssignableFrom(c)
-         ? Pair.getKeySchema(AvroJob.getMapOutputSchema(getConf()))
-         : Pair.getValueSchema(AvroJob.getMapOutputSchema(getConf())));
-    return new AvroWrapperSerializer(new ReflectDatumWriter<T>(schema));
+         ? Pair.getKeySchema(AvroJob.getMapOutputSchema(conf))
+         : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf)));
+    GenericData dataModel = AvroJob.createDataModel(conf);
+    return new AvroWrapperSerializer(dataModel.createDatumWriter(schema));
   }
 
   private class AvroWrapperSerializer implements Serializer<AvroWrapper<T>> {

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroJob.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroJob.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroJob.java Wed Aug  7 23:27:06 2013
@@ -19,6 +19,7 @@
 package org.apache.avro.mapreduce;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.hadoop.io.AvroKeyComparator;
 import org.apache.avro.hadoop.io.AvroSerialization;
 import org.apache.avro.mapred.AvroKey;
@@ -132,6 +133,16 @@ public final class AvroJob {
   }
 
   /**
+   * Sets the job data model class.
+   *
+   * @param job The job to configure.
+   * @param modelClass The job data model class.
+   */
+  public static void setDataModelClass(Job job, Class<? extends GenericData> modelClass) {
+    AvroSerialization.setDataModelClass(job.getConfiguration(), modelClass);
+  }
+
+  /**
    * Gets the job input key schema.
    *
    * @param conf The job configuration.

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java Wed Aug  7 23:27:06 2013
@@ -23,7 +23,10 @@ import java.io.OutputStream;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.hadoop.io.AvroSerialization;
 import org.apache.avro.mapred.AvroKey;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -70,9 +73,9 @@ public class AvroKeyOutputFormat<T> exte
      * @param outputStream The target output stream for the records.
      */
     protected RecordWriter<AvroKey<T>, NullWritable> create(
-        Schema writerSchema, CodecFactory compressionCodec, OutputStream outputStream)
-        throws IOException {
-      return new AvroKeyRecordWriter<T>(writerSchema, compressionCodec, outputStream);
+        Schema writerSchema, GenericData dataModel, CodecFactory compressionCodec,
+        OutputStream outputStream) throws IOException {
+      return new AvroKeyRecordWriter<T>(writerSchema, dataModel, compressionCodec, outputStream);
     }
   }
 
@@ -81,25 +84,25 @@ public class AvroKeyOutputFormat<T> exte
   @SuppressWarnings("unchecked")
   public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context)
       throws IOException {
+    Configuration conf = context.getConfiguration();
     // Get the writer schema.
-    Schema writerSchema = null;
+    Schema writerSchema = AvroJob.getOutputKeySchema(conf);
     boolean isMapOnly = context.getNumReduceTasks() == 0;
     if (isMapOnly) {
-      writerSchema = AvroJob.getMapOutputKeySchema(context.getConfiguration());
-      //If the MapOutputKeySchema is not set, try to use the OutputKeySchema
-      if (null == writerSchema) {
-        writerSchema = AvroJob.getOutputKeySchema(context.getConfiguration());
+      Schema mapOutputSchema = AvroJob.getMapOutputKeySchema(conf);
+      if (mapOutputSchema != null) {
+        writerSchema = mapOutputSchema;
       }
     }
-    else {
-      writerSchema = AvroJob.getOutputKeySchema(context.getConfiguration());
-    }
     if (null == writerSchema) {
       throw new IOException(
           "AvroKeyOutputFormat requires an output schema. Use AvroJob.setOutputKeySchema().");
     }
 
-    return mRecordWriterFactory.create(
-        writerSchema, getCompressionCodec(context), getAvroFileOutputStream(context));
+    GenericData dataModel = AvroSerialization.createDataModel(conf);
+
+    return mRecordWriterFactory.create
+      (writerSchema, dataModel, getCompressionCodec(context),
+       getAvroFileOutputStream(context));
   }
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java Wed Aug  7 23:27:06 2013
@@ -24,8 +24,8 @@ import java.io.OutputStream;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -47,10 +47,10 @@ public class AvroKeyRecordWriter<T> exte
    * @param outputStream The output stream to write the Avro container file to.
    * @throws IOException If the record writer cannot be opened.
    */
-  public AvroKeyRecordWriter(Schema writerSchema, CodecFactory compressionCodec,
-      OutputStream outputStream) throws IOException {
+  public AvroKeyRecordWriter(Schema writerSchema, GenericData dataModel,
+      CodecFactory compressionCodec, OutputStream outputStream) throws IOException {
     // Create an Avro container file and a writer to it.
-    mAvroFileWriter = new DataFileWriter<T>(new ReflectDatumWriter<T>(writerSchema));
+    mAvroFileWriter = new DataFileWriter<T>(dataModel.createDatumWriter(writerSchema));
     mAvroFileWriter.setCodec(compressionCodec);
     mAvroFileWriter.create(writerSchema, outputStream);
   }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java Wed Aug  7 23:27:06 2013
@@ -20,8 +20,11 @@ package org.apache.avro.mapreduce;
 
 import java.io.IOException;
 
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.hadoop.io.AvroDatumConverter;
 import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
+import org.apache.avro.hadoop.io.AvroSerialization;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -45,15 +48,18 @@ public class AvroKeyValueOutputFormat<K,
   @Override
   @SuppressWarnings("unchecked")
   public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException {
-    AvroDatumConverterFactory converterFactory = new AvroDatumConverterFactory(
-        context.getConfiguration());
+    Configuration conf = context.getConfiguration();
+
+    AvroDatumConverterFactory converterFactory = new AvroDatumConverterFactory(conf);
 
     AvroDatumConverter<K, ?> keyConverter = converterFactory.create(
         (Class<K>) context.getOutputKeyClass());
     AvroDatumConverter<V, ?> valueConverter = converterFactory.create(
         (Class<V>) context.getOutputValueClass());
 
+    GenericData dataModel = AvroSerialization.createDataModel(conf);
+
     return new AvroKeyValueRecordWriter<K, V>(keyConverter, valueConverter,
-        getCompressionCodec(context), getAvroFileOutputStream(context));
+        dataModel, getCompressionCodec(context), getAvroFileOutputStream(context));
   }
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java Wed Aug  7 23:27:06 2013
@@ -28,7 +28,6 @@ import org.apache.avro.file.CodecFactory
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -61,15 +60,15 @@ public class AvroKeyValueRecordWriter<K,
   private final AvroDatumConverter<V, ?> mValueConverter;
 
   public AvroKeyValueRecordWriter(AvroDatumConverter<K, ?> keyConverter,
-      AvroDatumConverter<V, ?> valueConverter, CodecFactory compressionCodec,
-      OutputStream outputStream) throws IOException {
+      AvroDatumConverter<V, ?> valueConverter, GenericData dataModel,
+      CodecFactory compressionCodec, OutputStream outputStream) throws IOException {
     // Create the generic record schema for the key/value pair.
     mKeyValuePairSchema = AvroKeyValue.getSchema(
         keyConverter.getWriterSchema(), valueConverter.getWriterSchema());
 
     // Create an Avro container file and a writer to it.
     mAvroFileWriter = new DataFileWriter<GenericRecord>(
-        new ReflectDatumWriter<GenericRecord>(mKeyValuePairSchema));
+        dataModel.createDatumWriter(mKeyValuePairSchema));
     mAvroFileWriter.setCodec(compressionCodec);
     mAvroFileWriter.create(mKeyValuePairSchema, outputStream);
 

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java Wed Aug  7 23:27:06 2013
@@ -23,9 +23,10 @@ import java.io.IOException;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.hadoop.io.AvroSerialization;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
-import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -87,8 +88,10 @@ public abstract class AvroRecordReaderBa
         = createSeekableInput(context.getConfiguration(), fileSplit.getPath());
 
     // Wrap the seekable input stream in an Avro DataFileReader.
-    mAvroFileReader = createAvroFileReader(seekableFileInput,
-        new ReflectDatumReader<T>(mReaderSchema));
+    Configuration conf = context.getConfiguration();
+    GenericData dataModel = AvroSerialization.createDataModel(conf);
+    DatumReader<T> datumReader = dataModel.createDatumReader(mReaderSchema);
+    mAvroFileReader = createAvroFileReader(seekableFileInput, datumReader);
 
     // Initialize the start and end offsets into the file based on the boundaries of the
     // input split we're responsible for.  We will read the first block that begins

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerialization.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerialization.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerialization.java Wed Aug  7 23:27:06 2013
@@ -19,11 +19,21 @@ package org.apache.avro.hadoop.io;
 
 import static org.junit.Assert.*;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.avro.Schema;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapred.AvroWrapper;
@@ -125,8 +135,7 @@ public class TestAvroSerialization {
 
   @Test public void testClassPath() throws Exception {
     Configuration conf = new Configuration();
-    ClassLoader loader = new ClassLoader() {};
-    conf.setClassLoader(loader);
+    ClassLoader loader = conf.getClass().getClassLoader();
     AvroSerialization serialization = new AvroSerialization();
     serialization.setConf(conf);
     AvroDeserializer des =
@@ -135,4 +144,35 @@ public class TestAvroSerialization {
       (ReflectData)((ReflectDatumReader)des.mAvroDatumReader).getData();
     Assert.assertEquals(loader, data.getClassLoader());
   }
+
+  private <T, O> O roundTrip(Schema schema, T data, Class<? extends GenericData> modelClass) throws IOException {
+    Job job = new Job();
+    AvroJob.setMapOutputKeySchema(job, schema);
+    if (modelClass != null)
+      AvroJob.setDataModelClass(job, modelClass);
+    AvroSerialization serialization =
+      ReflectionUtils.newInstance(AvroSerialization.class, job.getConfiguration());
+    Serializer<AvroKey<T>> serializer = serialization.getSerializer(AvroKey.class);
+    Deserializer<AvroKey<O>> deserializer = serialization.getDeserializer(AvroKey.class);
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    serializer.open(baos);
+    serializer.serialize(new AvroKey<T>(data));
+    serializer.close();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    deserializer.open(bais);
+    AvroKey<O> result = null;
+    result = deserializer.deserialize(result);
+    deserializer.close();
+
+    return result.datum();
+  }
+
+  @Test
+  public void testRoundTrip() throws Exception {
+    Schema schema = Schema.create(Schema.Type.STRING);
+    assertTrue(roundTrip(schema, "record", null) instanceof String);
+    assertTrue(roundTrip(schema, "record", GenericData.class) instanceof Utf8);
+  }
 }

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java Wed Aug  7 23:27:06 2013
@@ -27,6 +27,7 @@ import java.io.OutputStream;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroOutputFormat;
 import org.apache.hadoop.conf.Configuration;
@@ -66,7 +67,7 @@ public class TestAvroKeyOutputFormat {
     conf.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC);
     testGetRecordWriter(conf, CodecFactory.snappyCodec());
   }
-  
+
   @Test
   public void testWithBZip2Code() throws IOException {
     Configuration conf = new Configuration();
@@ -74,7 +75,7 @@ public class TestAvroKeyOutputFormat {
     conf.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.BZIP2_CODEC);
     testGetRecordWriter(conf, CodecFactory.bzip2Codec());
   }
-  
+
   @Test
   public void testWithDeflateCodeWithHadoopConfig() throws IOException {
     Configuration conf = new Configuration();
@@ -82,7 +83,7 @@ public class TestAvroKeyOutputFormat {
     conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.DeflateCodec");
     conf.setInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY, -1);
   }
-  
+
   @Test
   public void testWithSnappyCodeWithHadoopConfig() throws IOException {
     Configuration conf = new Configuration();
@@ -90,7 +91,7 @@ public class TestAvroKeyOutputFormat {
     conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec");
     testGetRecordWriter(conf, CodecFactory.snappyCodec());
   }
-  
+
   @Test
   public void testWithBZip2CodeWithHadoopConfig() throws IOException {
     Configuration conf = new Configuration();
@@ -127,6 +128,7 @@ public class TestAvroKeyOutputFormat {
     // Expect the record writer factory to be called with appropriate parameters.
     Capture<CodecFactory> capturedCodecFactory = new Capture<CodecFactory>();
     expect(recordWriterFactory.create(eq(writerSchema),
+        anyObject(GenericData.class),
         capture(capturedCodecFactory),  // Capture for comparison later.
         anyObject(OutputStream.class))).andReturn(expectedRecordWriter);
 

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java Wed Aug  7 23:27:06 2013
@@ -29,8 +29,10 @@ import java.io.InputStream;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -41,6 +43,7 @@ public class TestAvroKeyRecordWriter {
   @Test
   public void testWrite() throws IOException {
     Schema writerSchema = Schema.create(Schema.Type.INT);
+    GenericData dataModel = new ReflectData();
     CodecFactory compressionCodec = CodecFactory.nullCodec();
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
     TaskAttemptContext context = createMock(TaskAttemptContext.class);
@@ -49,7 +52,7 @@ public class TestAvroKeyRecordWriter {
 
     // Write an avro container file with two records: 1 and 2.
     AvroKeyRecordWriter<Integer> recordWriter = new AvroKeyRecordWriter<Integer>(
-        writerSchema, compressionCodec, outputStream);
+        writerSchema, dataModel, compressionCodec, outputStream);
     recordWriter.write(new AvroKey<Integer>(1), NullWritable.get());
     recordWriter.write(new AvroKey<Integer>(2), NullWritable.get());
     recordWriter.close(context);

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java?rev=1511535&r1=1511534&r2=1511535&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java Wed Aug  7 23:27:06 2013
@@ -64,7 +64,7 @@ public class TestAvroKeyValueRecordWrite
     // Write two records: <'apple', TextStats('apple')> and <'banana', TextStats('banana')>.
     AvroKeyValueRecordWriter<Text, AvroValue<TextStats>> writer
         = new AvroKeyValueRecordWriter<Text, AvroValue<TextStats>>(keyConverter, valueConverter,
-            compressionCodec, outputStream);
+            new ReflectData(), compressionCodec, outputStream);
     TextStats appleStats = new TextStats();
     appleStats.name = "apple";
     writer.write(new Text("apple"), new AvroValue<TextStats>(appleStats));
@@ -129,8 +129,9 @@ public class TestAvroKeyValueRecordWrite
       factory.create((Class<AvroValue<R1>>) avroValue.getClass());
 
     AvroKeyValueRecordWriter<Text, AvroValue<R1>> writer =
-      new AvroKeyValueRecordWriter<Text, AvroValue<R1>>(keyConverter,
-        valueConverter, CodecFactory.nullCodec(), outputStream);
+      new AvroKeyValueRecordWriter<Text, AvroValue<R1>>(
+        keyConverter, valueConverter, new ReflectData(),
+        CodecFactory.nullCodec(), outputStream);
 
     writer.write(new Text("reflectionData"), avroValue);
     writer.close(context);



Mime
View raw message