avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1044111 - in /avro/trunk: ./ lang/java/src/java/org/apache/avro/file/ lang/java/src/java/org/apache/avro/mapred/ lang/java/src/java/org/apache/avro/tool/ lang/java/src/test/java/org/apache/avro/ share/test/data/
Date Thu, 09 Dec 2010 19:50:55 GMT
Author: cutting
Date: Thu Dec  9 19:50:54 2010
New Revision: 1044111

URL: http://svn.apache.org/viewvc?rev=1044111&view=rev
Log:
AVRO-692. Java: Permit Avro 1.2 format files to be read.

Added:
    avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader12.java
    avro/trunk/share/test/data/test.avro12   (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
    avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1044111&r1=1044110&r2=1044111&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Dec  9 19:50:54 2010
@@ -41,6 +41,8 @@ Avro 1.5.0 (unreleased)
     AVRO-698. Java: Add MapReduce tests and documentation for jobs
     that mix Avro and non-Avro data. (cutting)
 
+    AVRO-692. Java: Permit Avro 1.2 format files to be read. (cutting)
+
   BUG FIXES
 
     AVRO-675. C: Bytes and fixed setters don't update datum size.

Modified: avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java?rev=1044111&r1=1044110&r2=1044111&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java Thu Dec  9 19:50:54 2010
@@ -21,10 +21,12 @@ import java.io.IOException;
 import java.io.EOFException;
 import java.io.InputStream;
 import java.io.File;
+import java.util.Arrays;
 
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.DatumReader;
 import static org.apache.avro.file.DataFileConstants.SYNC_SIZE;
+import static org.apache.avro.file.DataFileConstants.MAGIC;
 
 /** Random access to files written with {@link DataFileWriter}.
  * @see DataFileWriter
@@ -34,6 +36,33 @@ public class DataFileReader<D>
   private SeekableInputStream sin;
   private long blockStart;
 
+  /** Open a reader for a file, choosing an implementation by its format. */
+  public static <D> FileReader<D> openReader(File file, DatumReader<D> reader)
+    throws IOException {
+    return openReader(new SeekableFileInput(file), reader);
+  }
+
+  /** Open a reader for an input in the current or the Avro 1.2 format. */
+  public static <D> FileReader<D> openReader(SeekableInput in,
+                                             DatumReader<D> reader)
+    throws IOException {
+    if (in.length() < MAGIC.length)
+      throw new IOException("Not an Avro data file");
+
+    byte[] magic = new byte[MAGIC.length];        // read magic header
+    in.seek(0);
+    for (int c = 0, n; c < magic.length; c += n)  // read() may return fewer bytes
+      if ((n = in.read(magic, c, magic.length-c)) < 0) throw new EOFException();
+    in.seek(0);
+
+    if (Arrays.equals(MAGIC, magic))              // current format
+      return new DataFileReader<D>(in, reader);
+    if (Arrays.equals(DataFileReader12.MAGIC, magic)) // 1.2 format
+      return new DataFileReader12<D>(in, reader);
+    
+    throw new IOException("Not an Avro data file");
+  }
+
   /** Construct a reader for a file. */
   public DataFileReader(File file, DatumReader<D> reader) throws IOException {
     this(new SeekableFileInput(file), reader);
@@ -108,7 +137,7 @@ public class DataFileReader<D>
 
   @Override public long tell() throws IOException { return sin.tell(); }
 
-  private static class SeekableInputStream extends InputStream 
+  static class SeekableInputStream extends InputStream 
   implements SeekableInput {
     private final byte[] oneByte = new byte[1];
     private SeekableInput in;

Added: avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader12.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader12.java?rev=1044111&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader12.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader12.java Thu Dec  9 19:50:54 2010
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.BinaryDecoder;
+
+/** Read files written by Avro version 1.2.
+ *
+ * <p>Such a file starts with {@link #MAGIC}, then holds data blocks that
+ * are each preceded by a sync marker, and ends with a footer: a metadata
+ * map followed by the footer's size as a 4-byte big-endian int.
+ */
+public class DataFileReader12<D> implements FileReader<D>, Closeable {
+  private static final byte VERSION = 0;
+  static final byte[] MAGIC = new byte[] {
+    (byte)'O', (byte)'b', (byte)'j', VERSION
+  };
+  private static final long FOOTER_BLOCK = -1;
+  private static final int SYNC_SIZE = 16;
+
+  private static final String SCHEMA = "schema";
+  private static final String SYNC = "sync";
+  private static final String COUNT = "count";
+  private static final String CODEC = "codec";
+  private static final String NULL_CODEC = "null";
+
+  private Schema schema;
+  private DatumReader<D> reader;
+  private DataFileReader.SeekableInputStream in;
+  private BinaryDecoder vin;
+
+  private Map<String,byte[]> meta = new HashMap<String,byte[]>();
+
+  private long count;                           // # entries in file
+  private long blockCount;                      // # entries in block
+  private long blockStart;                      // last position passed to seek()
+  private byte[] sync = new byte[SYNC_SIZE];
+  private byte[] syncBuffer = new byte[SYNC_SIZE];
+
+  /** Construct a reader for a file.
+   * @param sin input positioned at the start of an Avro 1.2 data file
+   * @param reader datum reader; its schema is set from the file's metadata
+   * @throws IOException if the input is not a 1.2 data file, its footer
+   *         size is out of range, or it names a codec other than "null"
+   */
+  public DataFileReader12(SeekableInput sin, DatumReader<D> reader)
+    throws IOException {
+    this.in = new DataFileReader.SeekableInputStream(sin);
+
+    byte[] magic = new byte[MAGIC.length];
+    if (fill(magic) < magic.length || !Arrays.equals(MAGIC, magic))
+      throw new IOException("Not a data file.");
+
+    // The last 4 bytes hold the footer size; the metadata map starts that
+    // many bytes before eof.
+    long length = in.length();
+    if (length < MAGIC.length + 4)
+      throw new IOException("Not a data file.");
+    in.seek(length-4);
+    int footerSize=(in.read()<<24)+(in.read()<<16)+(in.read()<<8)+in.read();
+    if (footerSize < 4 || footerSize > length - MAGIC.length)
+      throw new IOException("Invalid footer size: " + footerSize);
+    seek(length-footerSize);
+    long l = vin.readMapStart();
+    if (l > 0) {
+      do {
+        for (long i = 0; i < l; i++) {
+          String key = vin.readString(null).toString();
+          ByteBuffer value = vin.readBytes(null);
+          byte[] bb = new byte[value.remaining()];
+          value.get(bb);
+          meta.put(key, bb);
+        }
+      } while ((l = vin.mapNext()) != 0);
+    }
+
+    this.sync = getMeta(SYNC);
+    this.count = getMetaLong(COUNT);
+    String codec = getMetaString(CODEC);
+    if (codec != null && ! codec.equals(NULL_CODEC)) {
+      throw new IOException("Unknown codec: " + codec);
+    }
+    this.schema = Schema.parse(getMetaString(SCHEMA));
+    this.reader = reader;
+
+    reader.setSchema(schema);
+
+    seek(MAGIC.length);         // seek to start
+  }
+
+  /** Read up to {@code b.length} bytes, stopping early only at eof.
+   * @return the number of bytes actually read */
+  private int fill(byte[] b) throws IOException {
+    int off = 0;
+    while (off < b.length) {
+      int n = in.read(b, off, b.length - off);
+      if (n < 0)
+        break;
+      off += n;
+    }
+    return off;
+  }
+
+  /** Return the value of a metadata property. */
+  public synchronized byte[] getMeta(String key) {
+    return meta.get(key);
+  }
+  /** Return the value of a metadata property. */
+  public synchronized String getMetaString(String key) {
+    byte[] value = getMeta(key);
+    if (value == null) {
+      return null;
+    }
+    try {
+      return new String(value, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  /** Return the value of a metadata property. */
+  public synchronized long getMetaLong(String key) {
+    return Long.parseLong(getMetaString(key));
+  }
+
+  /** Return the schema used in this file. */
+  public Schema getSchema() { return schema; }
+
+  // Iterator and Iterable implementation
+  private D peek;
+  @Override public Iterator<D> iterator() { return this; }
+  @Override public boolean hasNext() {
+    if (peek != null || blockCount != 0)
+      return true;
+    try {
+      this.peek = next(null);                   // null at eof
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return peek != null;
+  }
+  /** Return the next datum; throws NoSuchElementException at eof. */
+  @Override public D next() {
+    if (peek != null) {
+      D result = peek;
+      peek = null;
+      return result;
+    }
+    D datum;
+    try {
+      datum = next(null);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    if (datum == null)                          // next(D) returns null at eof
+      throw new NoSuchElementException();
+    return datum;
+  }
+  @Override public void remove() { throw new UnsupportedOperationException(); }
+
+  /** Return the next datum in the file, or null at eof.
+   * @param reuse datum to reuse, or null */
+  public synchronized D next(D reuse) throws IOException {
+    while (blockCount == 0) {                     // at start of block
+
+      // NOTE(review): in.tell() is the raw stream position; if vin buffers
+      // input ahead of what it has decoded, this eof test and the footer
+      // skip below can misfire. Confirm vin reads unbuffered.
+      if (in.tell() == in.length())               // at eof
+        return null;
+
+      skipSync();                                 // skip a sync
+
+      blockCount = vin.readLong();                // read blockCount
+
+      if (blockCount == FOOTER_BLOCK) {
+        seek(vin.readLong()+in.tell());           // skip a footer
+      }
+    }
+    blockCount--;
+    return reader.read(reuse, vin);
+  }
+
+  /** Read a sync marker and fail unless it matches this file's. */
+  private void skipSync() throws IOException {
+    vin.readFixed(syncBuffer);
+    if (!Arrays.equals(syncBuffer, sync))
+      throw new IOException("Invalid sync!");
+  }
+
+  /** Move to the specified synchronization point, as returned by {@link
+   * DataFileWriter#sync()}. */
+  public synchronized void seek(long position) throws IOException {
+    in.seek(position);
+    blockCount = 0;
+    blockStart = position;
+    vin = DecoderFactory.defaultFactory().createBinaryDecoder(in, vin);
+  }
+
+  /** Move to the next synchronization point at or after a position,
+   * leaving the reader just before that sync marker, or at eof if none. */
+  public synchronized void sync(long position) throws IOException {
+    if (position+SYNC_SIZE >= in.length()) {
+      seek(in.length());
+      return;
+    }
+    // Scan the raw stream, not vin: vin was created for an earlier position
+    // (and may hold input buffered from there), while in.tell() must track
+    // exactly the bytes consumed by this scan.
+    in.seek(position);
+    fill(syncBuffer);
+    for (int i = 0; ; i++) {
+      int j = 0;
+      for (; j < sync.length; j++) {
+        if (sync[j] != syncBuffer[(i+j)%sync.length])
+          break;
+      }
+      if (j == sync.length) {                     // position before sync
+        seek(in.tell() - SYNC_SIZE);
+        return;
+      }
+      if (in.tell() >= in.length())               // no sync before eof
+        break;
+      syncBuffer[i%sync.length] = (byte)in.read();
+    }
+    seek(in.length());
+  }
+
+  /** Return true if past the next synchronization point after a position. */
+  public boolean pastSync(long position) throws IOException {
+    // NOTE(review): blockStart is only updated by seek(); next() does not
+    // update it when entering a new data block -- confirm split readers
+    // relying on this still stop at the right place.
+    return ((blockStart >= position+SYNC_SIZE)||(blockStart >= in.length()));
+  }
+
+  /** Return the current position in the input. */
+  public long tell() throws IOException { return in.tell(); }
+
+  /** Close this reader. */
+  public synchronized void close() throws IOException {
+    in.close();
+  }
+
+}

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java?rev=1044111&r1=1044110&r2=1044111&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroRecordReader.java Thu Dec  9 19:50:54 2010
@@ -39,7 +39,7 @@ public class AvroRecordReader<T>
 
   public AvroRecordReader(JobConf job, FileSplit split)
     throws IOException {
-    this(new DataFileReader<T>
+    this(DataFileReader.openReader
          (new FsInput(split.getPath(), job),
           new SpecificDatumReader<T>(AvroJob.getInputSchema(job))),
          split);

Modified: avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java?rev=1044111&r1=1044110&r2=1044111&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/tool/DataFileReadTool.java Thu Dec  9 19:50:54 2010
@@ -57,7 +57,7 @@ public class DataFileReadTool implements
 
     GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
     FileReader<Object> fileReader =
-      new DataFileReader<Object>(new File(args.get(0)), reader);
+      DataFileReader.openReader(new File(args.get(0)), reader);
     try {
       Schema schema = fileReader.getSchema();
       DatumWriter<Object> writer = new GenericDatumWriter<Object>(schema);

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java?rev=1044111&r1=1044110&r2=1044111&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFile.java Thu Dec  9 19:50:54 2010
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.FileReader;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumReader;
@@ -214,10 +215,14 @@ public class TestDataFile {
     }
   }  
 
+  @Test public void test12() throws IOException {   // read an Avro 1.2 format file
+    File avro12 = new File("../../share/test/data/test.avro12");
+    readFile(avro12, new GenericDatumReader<Object>());
+  }
+
   protected void readFile(File f, DatumReader<Object> datumReader)
     throws IOException {
-    System.out.println("Reading "+ f.getName());
-    DataFileReader<Object> reader = new DataFileReader<Object>(f, datumReader);
+    FileReader<Object> reader = DataFileReader.openReader(f, datumReader);
     for (Object datum : reader) {
       assertNotNull(datum);
     }

Added: avro/trunk/share/test/data/test.avro12
URL: http://svn.apache.org/viewvc/avro/trunk/share/test/data/test.avro12?rev=1044111&view=auto
==============================================================================
Binary file - no diff available.

Propchange: avro/trunk/share/test/data/test.avro12
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream



Mime
View raw message