avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1096798 - in /avro/trunk: ./ lang/java/mapred/src/main/java/org/apache/avro/mapred/ lang/java/mapred/src/test/java/org/apache/avro/mapred/
Date Tue, 26 Apr 2011 16:33:43 GMT
Author: cutting
Date: Tue Apr 26 16:33:43 2011
New Revision: 1096798

URL: http://svn.apache.org/viewvc?rev=1096798&view=rev
Log:
AVRO-808. Java: Add AvroAsTextInputFormat for use with streaming.  Contributed by Tom White.

Added:
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1096798&r1=1096797&r2=1096798&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Apr 26 16:33:43 2011
@@ -20,6 +20,9 @@ Avro 1.5.1 (unreleased)
     AVRO-788. Java: Add Snappy compression for data files, including
     MapReduce API support. (cutting)
 
+    AVRO-808. Java: Add AvroAsTextInputFormat for use with streaming.
+    (Tom White via cutting)
+
   IMPROVEMENTS
 
     AVRO-785. Java: Squash a Velocity warning by upgrading to Velocity 1.7.

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java?rev=1096798&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextInputFormat.java Tue Apr 26 16:33:43 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * An {@link org.apache.hadoop.mapred.InputFormat} for Avro data files that
+ * puts each datum, as text, in the input key; the input value is always empty.
+ * Avro "bytes" datums are copied verbatim; other datums use their string form,
+ * which for generic records is <a href="http://www.json.org/">JSON</a>.
+ * Only files ending in {@link AvroOutputFormat#EXT} are read. This is useful
+ * for processing Avro data with tools like MapReduce Streaming.
+ */
+public class AvroAsTextInputFormat extends FileInputFormat<Text, Text> {
+  /** Drops input files that lack the Avro data file extension. */
+  @Override
+  protected FileStatus[] listStatus(JobConf job) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    for (FileStatus file : super.listStatus(job))
+      if (file.getPath().getName().endsWith(AvroOutputFormat.EXT))
+        result.add(file);
+    return result.toArray(new FileStatus[0]);
+  }
+
+  /** Returns a reader that turns each datum of {@code split} into a text key. */
+  @Override
+  public RecordReader<Text, Text>
+    getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+    throws IOException {
+    reporter.setStatus(split.toString());
+    return new AvroAsTextRecordReader<Object>(job, (FileSplit) split);
+  }
+}

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java?rev=1096798&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroAsTextRecordReader.java Tue Apr 26 16:33:43 2011
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A {@link RecordReader} over one split of an Avro data file that returns
+ * each datum as a {@link Text} key; the value is never written. Datums that
+ * are {@link ByteBuffer}s (Avro "bytes") are copied into the key verbatim;
+ * any other datum is converted with {@link String#valueOf(Object)}.
+ *
+ * @param <T> the in-memory type of the datums read from the file
+ */
+class AvroAsTextRecordReader<T> implements RecordReader<Text, Text> {
+
+  private final FileReader<T> reader;
+  private T datum;                       // reused across calls to next()
+  private final long start;              // reader position after the first sync
+  private final long end;                // byte offset at which the split ends
+
+  /** Opens the split's file with a {@link GenericDatumReader}. */
+  public AvroAsTextRecordReader(JobConf job, FileSplit split)
+    throws IOException {
+    this(DataFileReader.openReader
+         (new FsInput(split.getPath(), job), new GenericDatumReader<T>()), split);
+  }
+
+  /** Wraps {@code reader}, syncing it to the start of {@code split}. */
+  protected AvroAsTextRecordReader(FileReader<T> reader, FileSplit split)
+    throws IOException {
+    this.reader = reader;
+    reader.sync(split.getStart());                    // sync to start
+    this.start = reader.tell();
+    this.end = split.getStart() + split.getLength();
+  }
+
+  public Text createKey() {
+    return new Text();
+  }
+
+  public Text createValue() {
+    return new Text();
+  }
+
+  /**
+   * Reads the next datum into {@code key}; {@code ignore} is left untouched.
+   *
+   * @return false once the file is exhausted or {@code reader.pastSync(end)}
+   *         reports that the end of this split has been passed
+   */
+  public boolean next(Text key, Text ignore) throws IOException {
+    if (!reader.hasNext() || reader.pastSync(end))
+      return false;
+    datum = reader.next(datum);
+    if (datum instanceof ByteBuffer) {
+      ByteBuffer b = (ByteBuffer) datum;
+      if (b.hasArray()) {
+        // Text.set(byte[], start, len) takes a LENGTH as its third argument;
+        // an end offset would copy too much whenever arrayOffset+position > 0.
+        key.set(b.array(), b.arrayOffset() + b.position(), b.remaining());
+      } else {
+        byte[] bytes = new byte[b.remaining()];
+        b.duplicate().get(bytes);       // duplicate() keeps b's position intact
+        key.set(bytes);
+      }
+    } else {
+      // String.valueOf yields "null" for a null datum instead of throwing NPE.
+      key.set(String.valueOf(datum));
+    }
+    return true;
+  }
+
+  /** Returns the fraction of the split consumed: 0 if start == end, else capped at 1. */
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (getPos() - start) / (float)(end - start));
+    }
+  }
+
+  /** Returns the current position of the underlying reader. */
+  public long getPos() throws IOException {
+    return reader.tell();
+  }
+
+  public void close() throws IOException { reader.close(); }
+
+}

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java?rev=1096798&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java Tue Apr 26 16:33:43 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestAvroAsTextInputFormat {
+  
+  /**
+   * Runs the identity job over a "bytes" Avro file read through
+   * {@link AvroAsTextInputFormat} and checks that the text output holds the
+   * input lines in sorted order.
+   */
+  @Test
+  public void testSort() throws Exception {
+    JobConf job = new JobConf();
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path outputPath = new Path(dir + "/out");
+    
+    outputPath.getFileSystem(job).delete(outputPath, true);  // recursive
+    WordCountUtil.writeLinesBytesFile();
+    
+    job.setInputFormat(AvroAsTextInputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    
+    FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
+    FileOutputFormat.setOutputPath(job, outputPath);
+    
+    JobClient.runJob(job);
+    
+    WordCountUtil.validateSortedFile();
+  }
+}

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=1096798&r1=1096797&r2=1096798&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java Tue Apr 26 16:33:43 2011
@@ -20,13 +20,19 @@ package org.apache.avro.mapred;
 
 import static org.junit.Assert.*;
 
+import java.io.BufferedReader;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.File;
 import java.io.InputStream;
 import java.io.FileInputStream;
 import java.io.BufferedInputStream;
 import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.StringTokenizer;
 import java.util.Map;
 import java.util.TreeMap;
@@ -53,6 +59,8 @@ class WordCountUtil {
     = new File(new File(DIR, "in"), "lines.txt");
   private static final File COUNTS_FILE
     = new File(new File(DIR, "out"), "part-00000.avro");
+  private static final File SORTED_FILE
+    = new File(new File(DIR, "out"), "part-00000");
 
   public static final String[] LINES = new String[] {
     "the quick brown fox jumps over the lazy dog",
@@ -84,6 +92,17 @@ class WordCountUtil {
       out.append(new Utf8(line));
     out.close();
   }
+
+  /** Clears DIR and writes each of LINES, UTF-8 encoded, as an Avro "bytes" datum. */
+  public static void writeLinesBytesFile() throws IOException {
+    FileUtil.fullyDelete(DIR);
+    LINES_FILE.getParentFile().mkdirs();
+    DataFileWriter<ByteBuffer> out =
+      new DataFileWriter<ByteBuffer>(new GenericDatumWriter<ByteBuffer>());
+    out.create(Schema.create(Schema.Type.BYTES), LINES_FILE);
+    for (String line : LINES) out.append(ByteBuffer.wrap(line.getBytes("UTF-8")));
+    out.close();
+  }
   
   public static void writeLinesTextFile() throws IOException {
     FileUtil.fullyDelete(DIR);
@@ -110,6 +129,27 @@ class WordCountUtil {
     in.close();
     assertEquals(COUNTS.size(), numWords);
   }
+  
+  /**
+   * Asserts that SORTED_FILE contains exactly the entries of {@link #LINES},
+   * in sorted order, one per line. Each line read is trimmed before it is
+   * compared (presumably to drop the separator written after an empty value).
+   */
+  public static void validateSortedFile() throws Exception {
+    List<String> sortedLines = new ArrayList<String>(Arrays.asList(LINES));
+    Collections.sort(sortedLines);
+    BufferedReader reader = new BufferedReader(new FileReader(SORTED_FILE));
+    try {
+      for (String expectedLine : sortedLines) {
+        String actualLine = reader.readLine();
+        assertNotNull("missing line: " + expectedLine, actualLine);
+        assertEquals(expectedLine, actualLine.trim());
+      }
+      assertNull(reader.readLine());
+    } finally {
+      reader.close();                    // release the file handle even on failure
+    }
+  }
 
   // metadata tests
   private static final String STRING_KEY = "string-key";



Mime
View raw message