avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1511456 - in /avro/trunk: ./ lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/ lang/java/mapred/src/main/java/org/apache/avro/mapreduce/ lang/java/mapred/src/test/java/org/apache/avro/mapreduce/
Date Wed, 07 Aug 2013 19:47:42 GMT
Author: cutting
Date: Wed Aug  7 19:47:42 2013
New Revision: 1511456

URL: http://svn.apache.org/r1511456
Log:
AVRO-1356. Java: Fix AvroMultipleOutputs for map-only jobs.  Contributed by Alan Paulsen.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDatumConverterFactory.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1511456&r1=1511455&r2=1511456&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Aug  7 19:47:42 2013
@@ -140,6 +140,9 @@ Trunk (not yet released)
     AVRO-1144. Java: Deadlock with FSInput and Hadoop NativeS3FileSystem.
     (scottcarey)
 
+    AVRO-1356. Java: Fix AvroMultipleOutputs for map-only jobs.
+    (Alan Paulsen via cutting)
+
 Avro 1.7.4 (22 February 2012)
 
   NEW FEATURES

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDatumConverterFactory.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDatumConverterFactory.java?rev=1511456&r1=1511455&r2=1511456&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDatumConverterFactory.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDatumConverterFactory.java Wed Aug  7 19:47:42 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * Constructs converters that turn objects (usually from the output of a MR job) into Avro
@@ -77,8 +78,18 @@ public class AvroDatumConverterFactory e
    */
   @SuppressWarnings("unchecked")
   public <IN, OUT> AvroDatumConverter<IN, OUT> create(Class<IN> inputClass) {
+    boolean isMapOnly = ((JobConf)getConf()).getNumReduceTasks() == 0;
     if (AvroKey.class.isAssignableFrom(inputClass)) {
-      Schema schema = AvroJob.getOutputKeySchema(getConf());
+      Schema schema = null;
+      if (isMapOnly) {
+        schema = AvroJob.getMapOutputKeySchema(getConf());
+        if (null == schema) {
+          schema = AvroJob.getOutputKeySchema(getConf());
+        }
+      }
+      else {
+        schema = AvroJob.getOutputKeySchema(getConf());
+      }
       if (null == schema) {
         throw new IllegalStateException(
             "Writer schema for output key was not set. Use AvroJob.setOutputKeySchema().");
@@ -86,7 +97,16 @@ public class AvroDatumConverterFactory e
       return (AvroDatumConverter<IN, OUT>) new AvroWrapperConverter(schema);
     }
     if (AvroValue.class.isAssignableFrom(inputClass)) {
-      Schema schema = AvroJob.getOutputValueSchema(getConf());
+      Schema schema = null;
+      if (isMapOnly) {
+        AvroJob.getMapOutputValueSchema(getConf());
+        if (null == schema) {
+          schema = AvroJob.getOutputValueSchema(getConf());
+        }
+      }
+      else {
+        schema = AvroJob.getOutputValueSchema(getConf());
+      }
       if (null == schema) {
         throw new IllegalStateException(
             "Writer schema for output value was not set. Use AvroJob.setOutputValueSchema().");

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java?rev=1511456&r1=1511455&r2=1511456&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java Wed Aug  7 19:47:42 2013
@@ -82,7 +82,18 @@ public class AvroKeyOutputFormat<T> exte
   public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context)
       throws IOException {
     // Get the writer schema.
-    Schema writerSchema = AvroJob.getOutputKeySchema(context.getConfiguration());
+    Schema writerSchema = null;
+    boolean isMapOnly = context.getNumReduceTasks() == 0;
+    if (isMapOnly) {
+      writerSchema = AvroJob.getMapOutputKeySchema(context.getConfiguration());
+      //If the MapOutputKeySchema is not set, try to use the OutputKeySchema
+      if (null == writerSchema) {
+        writerSchema = AvroJob.getOutputKeySchema(context.getConfiguration());
+      }
+    }
+    else {
+      writerSchema = AvroJob.getOutputKeySchema(context.getConfiguration());
+    }
     if (null == writerSchema) {
       throw new IOException(
           "AvroKeyOutputFormat requires an output schema. Use AvroJob.setOutputKeySchema().");

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java?rev=1511456&r1=1511455&r2=1511456&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java Wed Aug  7 19:47:42 2013
@@ -115,6 +115,7 @@ public class TestAvroKeyOutputFormat {
     expect(context.getTaskAttemptID())
         .andReturn(TaskAttemptID.forName("attempt_200707121733_0001_m_000000_0"))
         .anyTimes();
+    expect(context.getNumReduceTasks()).andReturn(1);
 
     // Create a mock record writer.
     @SuppressWarnings("unchecked")



Mime
View raw message