avro-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r1556378 - in /avro/trunk: ./ lang/java/mapred/src/main/java/org/apache/avro/mapreduce/ lang/java/mapred/src/test/java/org/apache/avro/mapreduce/
Date Tue, 07 Jan 2014 22:10:35 GMT
Author: cutting
Date: Tue Jan  7 22:10:34 2014
New Revision: 1556378

URL: http://svn.apache.org/r1556378
Log:
AVRO-1418. Java: Add sync support to AvroMultipleOutputs.  Contributed by Deepak Kumar V.
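
In short: the record writers that wrap an Avro DataFileWriter now expose its sync() method, and AvroMultipleOutputs gains a sync(namedOutput, baseOutputPath) that returns a position usable with DataFileReader.seek(long), or -1 when the underlying writer is not Syncable. A minimal sketch of the intended use from inside a reducer, modeled on the new test below (the named output "myavro" must have been registered with AvroMultipleOutputs.addNamedOutput during job setup; mStats is a hypothetical AvroKey field):

    private AvroMultipleOutputs amos;

    @Override
    protected void setup(Context context) {
      amos = new AvroMultipleOutputs(context);
    }

    @Override
    protected void reduce(Text line, Iterable<IntWritable> counts, Context context)
        throws IOException, InterruptedException {
      // Mark a sync point, then write; the returned position may later be
      // passed to DataFileReader.seek(long) to start reading at this block.
      long position = amos.sync("myavro", "myavro");
      amos.write("myavro", mStats, NullWritable.get());
    }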

Added:
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/Syncable.java   (with props)
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputsSyncable.java   (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1556378&r1=1556377&r2=1556378&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Jan  7 22:10:34 2014
@@ -30,6 +30,9 @@ Trunk (not yet released)
     AVRO-1414. C++: Add support for deflate-compressed data files.
     (Daniel Russel via cutting)
 
+    AVRO-1418. Java: Add sync support to AvroMultipleOutputs.
+    (Deepak Kumar V via cutting)
+
   OPTIMIZATIONS
 
     AVRO-1348. Java: Improve UTF-8 to String conversion performance in

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java?rev=1556378&r1=1556377&r2=1556378&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java Tue Jan  7 22:10:34 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskA
  *
  * @param <T> The Java type of the Avro data to write.
  */
-public class AvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable> {
+public class AvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable> implements Syncable {
   /** A writer for the Avro container file. */
   private final DataFileWriter<T> mAvroFileWriter;
 
@@ -82,4 +82,10 @@ public class AvroKeyRecordWriter<T> exte
   public void close(TaskAttemptContext context) throws IOException {
     mAvroFileWriter.close();
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public long sync() throws IOException {
+    return mAvroFileWriter.sync();
+  }
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java?rev=1556378&r1=1556377&r2=1556378&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java Tue Jan  7 22:10:34 2014
@@ -22,13 +22,13 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 import org.apache.avro.Schema;
-import org.apache.avro.hadoop.io.AvroKeyValue;
-import org.apache.avro.hadoop.io.AvroDatumConverter;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.hadoop.io.AvroDatumConverter;
+import org.apache.avro.hadoop.io.AvroKeyValue;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -44,7 +44,7 @@ import org.apache.hadoop.mapreduce.TaskA
  * @param <K> The type of key to write.
  * @param <V> The type of value to write.
  */
-public class AvroKeyValueRecordWriter<K, V> extends RecordWriter<K, V> {
+public class AvroKeyValueRecordWriter<K, V> extends RecordWriter<K, V> implements Syncable {
   /** A writer for the Avro container file. */
   private final DataFileWriter<GenericRecord> mAvroFileWriter;
 
@@ -132,4 +132,10 @@ public class AvroKeyValueRecordWriter<K,
   public void close(TaskAttemptContext context) throws IOException {
     mAvroFileWriter.close();
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public long sync() throws IOException {
+    return mAvroFileWriter.sync();
+  }
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java?rev=1556378&r1=1556377&r2=1556378&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java Tue Jan  7 22:10:34 2014
@@ -19,26 +19,26 @@ package org.apache.avro.mapreduce;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.List;
-import java.util.Set;
-import java.util.HashMap;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
 
+import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.avro.Schema;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * The AvroMultipleOutputs class simplifies writing Avro output data 
@@ -439,7 +439,35 @@ public class AvroMultipleOutputs{
     TaskAttemptContext taskContext = createTaskAttemptContext(job.getConfiguration(), context.getTaskAttemptID());
     getRecordWriter(taskContext, baseOutputPath).write(key, value);
   }
-    
+
+  /**
+   * Gets the record writer for the given named output from the job's output
+   * format, which should be a FileOutputFormat. If the record writer
+   * implements Syncable, forces the end of its current block, emits a
+   * synchronization marker, and returns the current position as a value that
+   * may be passed to DataFileReader.seek(long); otherwise returns -1.
+   *
+   * @param namedOutput the named output
+   * @param baseOutputPath base-output path the record is written to. Note: the
+   *          framework generates a unique filename for the baseOutputPath
+   */
+  @SuppressWarnings("unchecked")
+  public long sync(String namedOutput, String baseOutputPath) throws IOException, InterruptedException {
+    checkNamedOutputName(context, namedOutput, false);
+    checkBaseOutputPath(baseOutputPath);
+    if (!namedOutputs.contains(namedOutput)) {
+      throw new IllegalArgumentException("Undefined named output '" + namedOutput + "'");
+    }
+    TaskAttemptContext taskContext = getContext(namedOutput);
+    RecordWriter recordWriter = getRecordWriter(taskContext, baseOutputPath);
+    long position = -1;
+    if (recordWriter instanceof Syncable) {
+      Syncable syncableWriter = (Syncable) recordWriter;
+      position = syncableWriter.sync();
+    }
+    return position;
+  }
+
  // by being synchronized MultipleOutputTask can be used with a
   // MultithreadedMapper.
   @SuppressWarnings("unchecked")
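
On the read side, the value returned by sync() is passed to DataFileReader.seek(long), as the new tests below do; a minimal sketch (the output path is hypothetical, and position is a value saved from an earlier sync() call):

    Configuration conf = new Configuration();
    Path avroFile = new Path("out/myavro-r-00000.avro");  // hypothetical reducer output
    DataFileReader<GenericData.Record> reader = new DataFileReader<GenericData.Record>(
        new FsInput(avroFile, conf), new GenericDatumReader<GenericData.Record>());
    reader.seek(position);  // jump to the block that starts at the sync marker
    while (reader.hasNext()) {
      GenericData.Record record = reader.next();  // records written after that sync()
    }
    reader.close();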

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/Syncable.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/Syncable.java?rev=1556378&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/Syncable.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/Syncable.java Tue Jan  7 22:10:34 2014
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+
+public interface Syncable {
+
+  /**
+   * Return the current position as a value that may be passed to DataFileReader.seek(long).
+   * Forces the end of the current block, emitting a synchronization marker.
+   *
+   * @throws IOException if an error occurred while attempting to sync.
+   */
+  long sync() throws IOException;
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/Syncable.java
------------------------------------------------------------------------------
    svn:eol-style = native
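
Any other RecordWriter backed by a DataFileWriter can opt in the same way the two writers modified above do, by delegating to DataFileWriter.sync(); a minimal sketch (the class name and constructor are hypothetical):

    public class MySyncableRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable>
        implements Syncable {
      private final DataFileWriter<T> mFileWriter;

      public MySyncableRecordWriter(DataFileWriter<T> fileWriter) {
        mFileWriter = fileWriter;
      }

      @Override
      public void write(AvroKey<T> key, NullWritable value) throws IOException {
        mFileWriter.append(key.datum());
      }

      @Override
      public void close(TaskAttemptContext context) throws IOException {
        mFileWriter.close();
      }

      /** {@inheritDoc} */
      @Override
      public long sync() throws IOException {
        // DataFileWriter.sync() ends the current block, emits a sync marker,
        // and returns a position usable with DataFileReader.seek(long).
        return mFileWriter.sync();
      }
    }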

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java?rev=1556378&r1=1556377&r2=1556378&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java Tue Jan  7 22:10:34 2014
@@ -18,25 +18,34 @@
 
 package org.apache.avro.mapreduce;
 
-import static org.easymock.EasyMock.*;
-import static org.junit.Assert.*;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.FsInput;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.easymock.EasyMock;
 import org.junit.Test;
 
 public class TestAvroKeyRecordWriter {
@@ -73,4 +82,43 @@ public class TestAvroKeyRecordWriter {
 
     dataFileReader.close();
   }
+
+  @Test
+  public void testSyncableWrite() throws IOException {
+    Schema writerSchema = Schema.create(Schema.Type.INT);
+    GenericData dataModel = new ReflectData();
+    CodecFactory compressionCodec = CodecFactory.nullCodec();
+    FileOutputStream outputStream = new FileOutputStream(new File("target/temp.avro"));
+    TaskAttemptContext context = createMock(TaskAttemptContext.class);
+
+    replay(context);
+
+    // Write an avro container file with two records: 1 and 2.
+    AvroKeyRecordWriter<Integer> recordWriter = new AvroKeyRecordWriter<Integer>(
+        writerSchema, dataModel, compressionCodec, outputStream);
+    long positionOne = recordWriter.sync();
+    recordWriter.write(new AvroKey<Integer>(1), NullWritable.get());
+    long positionTwo = recordWriter.sync();
+    recordWriter.write(new AvroKey<Integer>(2), NullWritable.get());
+    recordWriter.close(context);
+
+    verify(context);
+
+    // Verify that the file was written as expected.
+    Configuration conf = new Configuration();
+    conf.set("fs.default.name", "file:///");
+    Path avroFile = new Path("target/temp.avro");
+    DataFileReader<GenericData.Record> dataFileReader = new DataFileReader<GenericData.Record>(
+        new FsInput(avroFile, conf), new SpecificDatumReader<GenericData.Record>());
+
+    dataFileReader.seek(positionTwo);
+    assertTrue(dataFileReader.hasNext());  // Record 2.
+    assertEquals(2, dataFileReader.next());
+
+    dataFileReader.seek(positionOne);
+    assertTrue(dataFileReader.hasNext());  // Record 1.
+    assertEquals(1, dataFileReader.next());
+
+    dataFileReader.close();
+  }
 }

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java?rev=1556378&r1=1556377&r2=1556378&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java Tue Jan  7 22:10:34 2014
@@ -18,25 +18,37 @@
 
 package org.apache.avro.mapreduce;
 
-import static org.easymock.EasyMock.*;
-import static org.junit.Assert.*;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.hadoop.io.AvroDatumConverter;
 import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
 import org.apache.avro.hadoop.io.AvroKeyValue;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.FsInput;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -156,4 +168,68 @@ public class TestAvroKeyValueRecordWrite
     assertEquals("reflectionData", firstRecord.getKey().toString());
     assertEquals(record.attribute, firstRecord.getValue().attribute);
   }
+ 
+  @Test
+  public void testSyncableWriteRecords() throws IOException {
+    Job job = new Job();
+    AvroJob.setOutputValueSchema(job, TextStats.SCHEMA$);
+    TaskAttemptContext context = createMock(TaskAttemptContext.class);
+
+    replay(context);
+
+    AvroDatumConverterFactory factory = new AvroDatumConverterFactory(job.getConfiguration());
+    AvroDatumConverter<Text, ?> keyConverter = factory.create(Text.class);
+    AvroValue<TextStats> avroValue = new AvroValue<TextStats>(null);
+    @SuppressWarnings("unchecked")
+    AvroDatumConverter<AvroValue<TextStats>, ?> valueConverter
+        = factory.create((Class<AvroValue<TextStats>>) avroValue.getClass());
+    CodecFactory compressionCodec = CodecFactory.nullCodec();
+    FileOutputStream outputStream = new FileOutputStream(new File("target/temp.avro"));
+
+    // Write a marker followed by each record: <'apple', TextStats('apple')> and <'banana', TextStats('banana')>.
+    AvroKeyValueRecordWriter<Text, AvroValue<TextStats>> writer
+        = new AvroKeyValueRecordWriter<Text, AvroValue<TextStats>>(keyConverter, valueConverter,
+            new ReflectData(), compressionCodec, outputStream);
+    TextStats appleStats = new TextStats();
+    appleStats.name = "apple";
+    long pointOne = writer.sync();
+    writer.write(new Text("apple"), new AvroValue<TextStats>(appleStats));
+    TextStats bananaStats = new TextStats();
+    bananaStats.name = "banana";
+    long pointTwo = writer.sync();
+    writer.write(new Text("banana"), new AvroValue<TextStats>(bananaStats));
+    writer.close(context);
+
+    verify(context);
+
+    Configuration conf = new Configuration();
+    conf.set("fs.default.name", "file:///");
+    Path avroFile = new Path("target/temp.avro");
+    DataFileReader<GenericData.Record> avroFileReader = new DataFileReader<GenericData.Record>(
+        new FsInput(avroFile, conf), new SpecificDatumReader<GenericData.Record>());
+
+    avroFileReader.seek(pointTwo);
+    // Verify that the second record was written.
+    assertTrue(avroFileReader.hasNext());
+    AvroKeyValue<CharSequence, TextStats> secondRecord
+        = new AvroKeyValue<CharSequence, TextStats>(avroFileReader.next());
+    assertNotNull(secondRecord.get());
+    assertEquals("banana", secondRecord.getKey().toString());
+    assertEquals("banana", secondRecord.getValue().name.toString());
+
+
+    avroFileReader.seek(pointOne);
+    // Verify that the first record was written.
+    assertTrue(avroFileReader.hasNext());
+    AvroKeyValue<CharSequence, TextStats> firstRecord
+        = new AvroKeyValue<CharSequence, TextStats>(avroFileReader.next());
+    assertNotNull(firstRecord.get());
+    assertEquals("apple", firstRecord.getKey().toString());
+    assertEquals("apple", firstRecord.getValue().name.toString());
+
+    // That's all, folks.
+    avroFileReader.close();
+  }
 }

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputsSyncable.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputsSyncable.java?rev=1556378&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputsSyncable.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputsSyncable.java Tue Jan  7 22:10:34 2014
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAvroMultipleOutputsSyncable {
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  public static final Schema STATS_SCHEMA =
+      Schema.parse("{\"name\":\"stats\",\"type\":\"record\","
+          + "\"fields\":[{\"name\":\"count\",\"type\":\"int\"},"
+          + "{\"name\":\"name\",\"type\":\"string\"}]}");
+  public static final Schema STATS_SCHEMA_2 = 
+      Schema.parse("{\"name\":\"stats\",\"type\":\"record\","
+          + "\"fields\":[{\"name\":\"count1\",\"type\":\"int\"},"
+          + "{\"name\":\"name1\",\"type\":\"string\"}]}");  
+
+  private static class LineCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+    private IntWritable mOne;
+
+    @Override
+    protected void setup(Context context) {
+      mOne = new IntWritable(1);
+    }
+
+    @Override
+    protected void map(LongWritable fileByteOffset, Text line, Context context)
+        throws IOException, InterruptedException {
+      context.write(line, mOne);
+    }
+  }
+
+  private static class StatCountMapper
+      extends Mapper<AvroKey<TextStats>, NullWritable, Text, IntWritable> {
+    private IntWritable mCount;
+    private Text mText;
+
+    @Override
+    protected void setup(Context context) {
+      mCount = new IntWritable(0);
+      mText = new Text("");
+    }
+
+    @Override
+    protected void map(AvroKey<TextStats> record, NullWritable ignore, Context context)
+        throws IOException, InterruptedException {
+      mCount.set(record.datum().count);
+      mText.set(record.datum().name.toString());
+      context.write(mText, mCount);
+    }
+  }
+
+  private static class GenericStatsReducer
+      extends Reducer<Text, IntWritable, AvroKey<GenericData.Record>, NullWritable> {
+    private AvroKey<GenericData.Record> mStats;
+    private AvroMultipleOutputs amos;
+
+    @Override
+    protected void setup(Context context) {
+      mStats = new AvroKey<GenericData.Record>(null);
+      amos = new AvroMultipleOutputs(context);
+    }
+
+    @Override
+    protected void reduce(Text line, Iterable<IntWritable> counts, Context context)
+        throws IOException, InterruptedException {
+      GenericData.Record record = new GenericData.Record(STATS_SCHEMA);
+      GenericData.Record record2 = new GenericData.Record(STATS_SCHEMA_2);
+      int sum = 0;
+      for (IntWritable count : counts) {
+        sum += count.get();
+      }
+      record.put("name", new Utf8(line.toString()));
+      record.put("count", new Integer(sum));
+      mStats.datum(record);
+      context.write(mStats, NullWritable.get());
+      amos.sync("myavro","myavro");
+      amos.write("myavro",mStats,NullWritable.get());
+      record2.put("name1", new Utf8(line.toString()));
+      record2.put("count1", new Integer(sum));
+      mStats.datum(record2); 
+      amos.write(mStats, NullWritable.get(), STATS_SCHEMA_2, null, "testnewwrite2");
+      amos.sync("myavro1","myavro1");
+      amos.write("myavro1",mStats);
+      amos.write(mStats, NullWritable.get(), STATS_SCHEMA, null, "testnewwrite");
+      amos.write(mStats, NullWritable.get(), "testwritenonschema");
+    }
+   
+    @Override
+    protected void cleanup(Context context) throws IOException,InterruptedException
+    {
+      amos.close();
+    }
+  }
+
+  private static class SpecificStatsReducer
+      extends Reducer<Text, IntWritable, AvroKey<TextStats>, NullWritable> {
+    private AvroKey<TextStats> mStats;
+    private AvroMultipleOutputs amos;
+    @Override
+    protected void setup(Context context) {
+      mStats = new AvroKey<TextStats>(null);
+      amos = new AvroMultipleOutputs(context);
+    }
+
+    @Override
+    protected void reduce(Text line, Iterable<IntWritable> counts, Context context)
+        throws IOException, InterruptedException {
+      TextStats record = new TextStats();
+      record.count = 0;
+      for (IntWritable count : counts) {
+        record.count += count.get();
+      }
+      record.name = line.toString();
+      mStats.datum(record);
+      context.write(mStats, NullWritable.get());
+      amos.sync("myavro3","myavro3");
+      amos.write("myavro3",mStats,NullWritable.get());
+    }
+    @Override
+    protected void cleanup(Context context) throws IOException,InterruptedException
+    {
+      amos.close();
+    }
+  }
+
+  private static class SortMapper
+      extends Mapper<AvroKey<TextStats>, NullWritable, AvroKey<TextStats>, NullWritable> {
+    @Override
+    protected void map(AvroKey<TextStats> key, NullWritable value, Context context)
+        throws IOException, InterruptedException {
+      context.write(key, value);
+    }
+  }
+
+  private static class SortReducer
+      extends Reducer<AvroKey<TextStats>, NullWritable, AvroKey<TextStats>, NullWritable> {
+    @Override
+    protected void reduce(AvroKey<TextStats> key, Iterable<NullWritable> ignore, Context context)
+        throws IOException, InterruptedException {
+      context.write(key, NullWritable.get());
+    }
+  }
+
+  @Test
+  public void testAvroGenericOutput() throws Exception {
+    Job job = new Job();
+
+    FileInputFormat.setInputPaths(job, new Path(getClass()
+            .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.txt")
+            .toURI().toString()));
+    job.setInputFormatClass(TextInputFormat.class);
+
+    job.setMapperClass(LineCountMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    job.setReducerClass(GenericStatsReducer.class);
+    AvroJob.setOutputKeySchema(job, STATS_SCHEMA);    
+    AvroMultipleOutputs.addNamedOutput(job,"myavro",AvroKeyOutputFormat.class,STATS_SCHEMA,null);
+    AvroMultipleOutputs.addNamedOutput(job,"myavro1", AvroKeyOutputFormat.class, STATS_SCHEMA_2);

+    job.setOutputFormatClass(AvroKeyOutputFormat.class);
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path outputPath = new Path(dir + "/out");
+    outputPath.getFileSystem(job.getConfiguration()).delete(outputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    Assert.assertTrue(job.waitForCompletion(true));
+
+    // Check that the results from the MapReduce were as expected.
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    FileStatus[] outputFiles = fileSystem.globStatus(outputPath.suffix("/myavro-r-00000.avro"));
+    Assert.assertEquals(1, outputFiles.length);
+    DataFileReader<GenericData.Record> reader = new DataFileReader<GenericData.Record>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new GenericDatumReader<GenericData.Record>(STATS_SCHEMA));
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+    for (GenericData.Record record : reader) {
+      counts.put(((Utf8) record.get("name")).toString(), (Integer) record.get("count"));
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+
+    outputFiles = fileSystem.globStatus(outputPath.suffix("/myavro1-r-00000.avro"));
+    Assert.assertEquals(1, outputFiles.length);
+    reader = new DataFileReader<GenericData.Record>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new GenericDatumReader<GenericData.Record>(STATS_SCHEMA_2));
+    counts = new HashMap<String, Integer>();
+    for (GenericData.Record record : reader) {
+      counts.put(((Utf8) record.get("name1")).toString(), (Integer) record.get("count1"));
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+  
+    outputFiles = fileSystem.globStatus(outputPath.suffix("/testnewwrite-r-00000.avro"));
+    Assert.assertEquals(1, outputFiles.length);
+    reader = new DataFileReader<GenericData.Record>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+            new GenericDatumReader<GenericData.Record>(STATS_SCHEMA));
+    counts = new HashMap<String, Integer>();
+    for (GenericData.Record record : reader) {
+       counts.put(((Utf8) record.get("name")).toString(), (Integer) record.get("count"));
+    }
+    reader.close();
+    
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+        
+    outputFiles = fileSystem.globStatus(outputPath.suffix("/testnewwrite2-r-00000.avro"));
+    Assert.assertEquals(1, outputFiles.length);
+    reader = new DataFileReader<GenericData.Record>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new GenericDatumReader<GenericData.Record>(STATS_SCHEMA_2));
+    counts = new HashMap<String, Integer>();
+    for (GenericData.Record record : reader) {
+     counts.put(((Utf8) record.get("name1")).toString(), (Integer) record.get("count1"));
+    }
+    reader.close();
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+    
+    outputFiles = fileSystem.globStatus(outputPath.suffix("/testwritenonschema-r-00000.avro"));
+    Assert.assertEquals(1, outputFiles.length);
+    reader = new DataFileReader<GenericData.Record>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new GenericDatumReader<GenericData.Record>(STATS_SCHEMA));
+    counts = new HashMap<String, Integer>();
+    for (GenericData.Record record : reader) {
+      counts.put(((Utf8) record.get("name")).toString(), (Integer) record.get("count"));
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+  }
+
+  @Test
+  public void testAvroSpecificOutput() throws Exception {
+    Job job = new Job();
+
+    FileInputFormat.setInputPaths(job, new Path(getClass()
+            .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.txt")
+            .toURI().toString()));
+    job.setInputFormatClass(TextInputFormat.class);
+
+    job.setMapperClass(LineCountMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    AvroMultipleOutputs.addNamedOutput(job,"myavro3",AvroKeyOutputFormat.class,TextStats.SCHEMA$,null);
+
+    job.setReducerClass(SpecificStatsReducer.class);
+    AvroJob.setOutputKeySchema(job, TextStats.SCHEMA$);
+
+    job.setOutputFormatClass(AvroKeyOutputFormat.class);
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path outputPath = new Path(dir + "/out-specific");
+    outputPath.getFileSystem(job.getConfiguration()).delete(outputPath); 
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    Assert.assertTrue(job.waitForCompletion(true));
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    FileStatus[] outputFiles = fileSystem.globStatus(outputPath.suffix("/myavro3-*"));
+    Assert.assertEquals(1, outputFiles.length);
+    DataFileReader<TextStats> reader = new DataFileReader<TextStats>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new SpecificDatumReader<TextStats>());
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+    for (TextStats record : reader) {
+      counts.put(record.name.toString(), record.count);
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+  }
+
+  @Test
+  public void testAvroInput() throws Exception {
+    Job job = new Job();
+
+    FileInputFormat.setInputPaths(job, new Path(getClass()
+            .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.avro")
+            .toURI().toString()));
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    AvroJob.setInputKeySchema(job, TextStats.SCHEMA$);
+    AvroMultipleOutputs.addNamedOutput(job,"myavro3",AvroKeyOutputFormat.class,TextStats.SCHEMA$,null);
+
+    job.setMapperClass(StatCountMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    job.setReducerClass(SpecificStatsReducer.class);
+    AvroJob.setOutputKeySchema(job, TextStats.SCHEMA$);
+
+    job.setOutputFormatClass(AvroKeyOutputFormat.class);
+    Path outputPath = new Path(tmpFolder.getRoot().getPath() + "/out-specific-input");
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    Assert.assertTrue(job.waitForCompletion(true));
+
+    // Check that the results from the MapReduce were as expected.
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    FileStatus[] outputFiles = fileSystem.globStatus(outputPath.suffix("/myavro3-*"));
+    Assert.assertEquals(1, outputFiles.length);
+    DataFileReader<TextStats> reader = new DataFileReader<TextStats>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new SpecificDatumReader<TextStats>());
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+    for (TextStats record : reader) {
+      counts.put(record.name.toString(), record.count);
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+  }
+
+  @Test
+  public void testAvroMapOutput() throws Exception {
+    Job job = new Job();
+
+    FileInputFormat.setInputPaths(job, new Path(getClass()
+            .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.avro")
+            .toURI().toString()));
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    AvroJob.setInputKeySchema(job, TextStats.SCHEMA$);
+
+    job.setMapperClass(SortMapper.class);
+    AvroJob.setMapOutputKeySchema(job, TextStats.SCHEMA$);
+    job.setMapOutputValueClass(NullWritable.class);
+
+    job.setReducerClass(SortReducer.class);
+    AvroJob.setOutputKeySchema(job, TextStats.SCHEMA$);
+
+    job.setOutputFormatClass(AvroKeyOutputFormat.class);
+    Path outputPath = new Path(tmpFolder.getRoot().getPath() + "/out-specific-input");
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    Assert.assertTrue(job.waitForCompletion(true));
+
+    // Check that the results from the MapReduce were as expected.
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    FileStatus[] outputFiles = fileSystem.globStatus(outputPath.suffix("/part-*"));
+    Assert.assertEquals(1, outputFiles.length);
+    DataFileReader<TextStats> reader = new DataFileReader<TextStats>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new SpecificDatumReader<TextStats>());
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+    for (TextStats record : reader) {
+      counts.put(record.name.toString(), record.count);
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputsSyncable.java
------------------------------------------------------------------------------
    svn:eol-style = native


