avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1151660 - in /avro/trunk: CHANGES.txt lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java lang/java/mapred/src/test/java/org/apache/avro/mapred/TestGenericJob.java
Date Wed, 27 Jul 2011 22:52:47 GMT
Author: cutting
Date: Wed Jul 27 22:52:46 2011
New Revision: 1151660

URL: http://svn.apache.org/viewvc?rev=1151660&view=rev
Log:
AVRO-864. Java: Fix reflect to be able to write unions containing generic and/or specific
records.  Contributed by Isabel Drost.

Added:
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestGenericJob.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1151660&r1=1151659&r2=1151660&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Jul 27 22:52:46 2011
@@ -45,6 +45,9 @@ Avro 1.6.0 (unreleased)
     AVRO-824. Java: Fix usage message of BinaryFragmentToJsonTool.
     (Jakob Homan via cutting)
 
+    AVRO-864. Java: Fix reflect to be able to write unions containing
+    generic and/or specific records.  (Isabel Drost & cutting)
+
 Avro 1.5.2 (unreleased)
 
   NEW FEATURES

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java?rev=1151660&r1=1151659&r2=1151660&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java Wed Jul 27 22:52:46 2011
@@ -41,6 +41,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.Protocol.Message;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.FixedSize;
 import org.apache.avro.io.BinaryData;
@@ -101,6 +102,7 @@ public class ReflectData extends Specifi
   @Override
   protected boolean isRecord(Object datum) {
     if (datum == null) return false;
+    if (super.isRecord(datum)) return true; // let the superclass recognize generic/specific records first (AVRO-864)
     return getSchema(datum.getClass()).getType() == Schema.Type.RECORD;
   }
 
@@ -120,6 +122,8 @@ public class ReflectData extends Specifi
 
   @Override
   protected Schema getRecordSchema(Object record) {
+    if (record instanceof GenericContainer) // e.g. generic/specific data: don't derive the schema from the class (AVRO-864)
+      return super.getRecordSchema(record);
     return getSchema(record.getClass());
   }
 

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestGenericJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestGenericJob.java?rev=1151660&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestGenericJob.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestGenericJob.java Wed Jul 27 22:52:46 2011
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroOutputFormat;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.Pair;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+@SuppressWarnings("deprecation")
+public class TestGenericJob {
+  /** Scratch directory for this test, rooted at the test.dir property (default "."). */
+  private static final String dir =
+    System.getProperty("test.dir", ".") + "/target/testGenericJob";
+
+  /** Returns record "Container" whose only field, "Optional", is an array of a union. */
+  private static Schema createSchema() {
+    List<Field> fields = new ArrayList<Schema.Field>();
+    fields.add(new Field("Optional", createArraySchema(), "",
+                         JsonNodeFactory.instance.arrayNode()));
+    Schema recordSchema =
+      Schema.createRecord("Container", "", "org.apache.avro.mapred", false);
+    recordSchema.setFields(fields);
+    return recordSchema;
+  }
+
+  /** Returns an array schema whose items are a union of five distinct records. */
+  private static Schema createArraySchema() {
+    List<Schema> schemas = new ArrayList<Schema>();
+    for (int i = 0; i < 5; i++) {
+      schemas.add(createInnerSchema("optional_field_" + i));
+    }
+    return Schema.createArray(Schema.createUnion(schemas));
+  }
+
+  /** Returns record {@code name} holding a single long field, also called {@code name}. */
+  private static Schema createInnerSchema(String name) {
+    Schema innerRecord = Schema.createRecord(name, "", "", false);
+    innerRecord.setFields
+      (Arrays.asList(new Field(name, Schema.create(Type.LONG), "",
+                               JsonNodeFactory.instance.numberNode(0L))));
+    return innerRecord;
+  }
+
+  @Before
+  public void setup() throws IOException {
+    // needed to satisfy the framework only - input ignored in mapper
+    File indir = new File(dir);
+    indir.mkdirs();
+    File infile = new File(dir + "/in");
+    RandomAccessFile file = new RandomAccessFile(infile, "rw");
+    try {
+      // add some data (plain ASCII lines) so framework actually calls our mapper
+      file.writeBytes("aa bb cc\ndd ee ff\n");
+    } finally {
+      file.close();
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtil.fullyDelete(new File(dir));
+  }
+
+  /** Emits one (key, Container) pair per input record; the Text value is ignored. */
+  static class AvroTestConverter
+    extends MapReduceBase
+    implements Mapper<LongWritable, Text,
+               AvroWrapper<Pair<Long, GenericData.Record>>, NullWritable> {
+    public void map(LongWritable key, Text value,
+                    OutputCollector<AvroWrapper<Pair<Long,GenericData.Record>>,NullWritable> out,
+                    Reporter reporter) throws IOException {
+      GenericData.Record optionalEntry =
+        new GenericData.Record(createInnerSchema("optional_field_1"));
+      optionalEntry.put("optional_field_1", 0L);
+      GenericData.Array<GenericData.Record> array =
+        new GenericData.Array<GenericData.Record>(1, createArraySchema());
+      array.add(optionalEntry);
+
+      GenericData.Record container = new GenericData.Record(createSchema());
+      container.put("Optional", array);
+
+      out.collect(new AvroWrapper<Pair<Long,GenericData.Record>>
+                  (new Pair<Long,GenericData.Record>(key.get(), container)),
+                  NullWritable.get());
+    }
+  }
+
+  /** Map-only job writing generic records through a union schema (AVRO-864). */
+  @Test
+  public void testJob() throws Exception {
+    JobConf job = new JobConf();
+    Path outputPath = new Path(dir + "/out");
+    outputPath.getFileSystem(job).delete(outputPath, true);
+
+    job.setInputFormat(TextInputFormat.class);
+    FileInputFormat.setInputPaths(job, dir + "/in");
+
+    job.setMapperClass(AvroTestConverter.class);
+    job.setNumReduceTasks(0);
+
+    FileOutputFormat.setOutputPath(job, outputPath);
+    AvroJob.setOutputSchema(job,
+                            Pair.getPairSchema(Schema.create(Schema.Type.LONG),
+                                               createSchema()));
+    job.setOutputFormat(AvroOutputFormat.class);
+    JobClient.runJob(job);
+  }
+}
+
+



Mime
View raw message