crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [15/28] Rename com.cloudera.crunch -> org.apache.crunch in the Java core
Date Sat, 07 Jul 2012 21:49:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
new file mode 100644
index 0000000..5ce970f
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.util.Collection;
+
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroKeyComparator;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.PairMapFn;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PGroupedTableType;
+
+/**
+ * The Avro-based implementation of a grouped table type: the
+ * (K, Iterable&lt;V&gt;) collection produced by grouping an
+ * AvroTableType&lt;K, V&gt; by key. Configures the Hadoop shuffle to move data
+ * as Avro key/value wrappers sorted by {@link AvroKeyComparator}.
+ */
+public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
+
+  // Stateless converter shared by all instances; unwraps AvroKey/AvroValue
+  // pairs on input and re-wraps them on output.
+  private static final AvroPairConverter CONVERTER = new AvroPairConverter();
+  private final MapFn inputFn;
+  private final MapFn outputFn;
+  
+  public AvroGroupedTableType(AvroTableType<K, V> tableType) {
+    super(tableType);
+    AvroType keyType = (AvroType) tableType.getKeyType();
+    AvroType valueType = (AvroType) tableType.getValueType();
+    // Input side maps Avro datums to user-level (key, iterable-of-values)
+    // pairs; output side maps user-level (key, value) pairs back to datums.
+    this.inputFn =  new PairIterableMapFn(keyType.getInputMapFn(),
+        valueType.getInputMapFn());
+    this.outputFn = new PairMapFn(keyType.getOutputMapFn(),
+        valueType.getOutputMapFn());
+  }
+
+  @Override
+  public Class<Pair<K, Iterable<V>>> getTypeClass() {
+    // Obtain the runtime Pair class via a throwaway instance; generics are
+    // erased, so the unchecked cast is the only way to express the type.
+    return (Class<Pair<K, Iterable<V>>>) Pair.of(null, null).getClass();  
+  }
+
+  @Override
+  public Converter getGroupingConverter() {
+    return CONVERTER;
+  }
+
+  @Override
+  public MapFn getInputMapFn() {
+    return inputFn;
+  }
+  
+  @Override
+  public MapFn getOutputMapFn() {
+    return outputFn;
+  }
+  
+  @Override
+  public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
+    // Delegate deep-copying to PTables so callers can hold on to values that
+    // Avro would otherwise reuse/overwrite on the next record.
+    return PTables.getGroupedDetachedValue(this, value);
+  }
+
+  @Override
+  public void configureShuffle(Job job, GroupingOptions options) {
+    AvroTableType<K, V> att = (AvroTableType<K, V>) tableType;
+    String schemaJson = att.getSchema().toString();
+    Configuration conf = job.getConfiguration();
+    
+    // Types that are not Avro specific records go through reflect-based
+    // (de)serialization during the shuffle.
+    if (!att.isSpecific()) {
+        conf.setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true);
+    }
+    conf.set(AvroJob.MAP_OUTPUT_SCHEMA, schemaJson);
+    job.setSortComparatorClass(AvroKeyComparator.class);
+    job.setMapOutputKeyClass(AvroKey.class);
+    job.setMapOutputValueClass(AvroValue.class);
+    if (options != null) {
+      options.configure(job);
+    }
+    
+    Avros.configureReflectDataFactory(conf);
+    
+    // Append SafeAvroSerialization to Hadoop's serialization list exactly
+    // once, preserving whatever serializations were already registered.
+    Collection<String> serializations =
+        job.getConfiguration().getStringCollection("io.serializations");
+    if (!serializations.contains(SafeAvroSerialization.class.getName())) {
+      serializations.add(SafeAvroSerialization.class.getName());
+      job.getConfiguration().setStrings("io.serializations",
+          serializations.toArray(new String[0]));
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java b/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
new file mode 100644
index 0000000..da5bbb2
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/** An {@link org.apache.hadoop.mapreduce.InputFormat} for Avro data files. */
+public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {
+
+	// Restricts the input listing to files that carry the Avro container-file
+	// extension (AvroOutputFormat.EXT, ".avro"); everything else is skipped.
+	@Override
+	protected List<FileStatus> listStatus(JobContext job) throws IOException {
+	  List<FileStatus> result = new ArrayList<FileStatus>();
+      for (FileStatus file : super.listStatus(job)) {
+        if (file.getPath().getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
+          result.add(file);
+		}
+      }
+      return result;
+	}
+
+	// Builds a reader using the reader schema stored in the job configuration
+	// under AvroJob.INPUT_SCHEMA.
+	@Override
+	public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(InputSplit split,
+		TaskAttemptContext context) throws IOException, InterruptedException {
+      context.setStatus(split.toString());
+      String jsonSchema = context.getConfiguration().get(AvroJob.INPUT_SCHEMA);
+      Schema schema = new Schema.Parser().parse(jsonSchema);
+      return new AvroRecordReader<T>(schema);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java b/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
new file mode 100644
index 0000000..60aa69b
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.io.NullWritable;
+
+import org.apache.crunch.types.Converter;
+
+/**
+ * {@link Converter} for collections whose records travel entirely in the map
+ * output key: the Avro datum rides in an {@link AvroWrapper} key while the
+ * value slot is always a {@link NullWritable} placeholder.
+ */
+public class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable, K, Iterable<K>> {
+  
+  // Lazily created, reused output wrapper; transient so a deserialized
+  // converter rebuilds it on first use.
+  private transient AvroWrapper<K> wrapper = null;
+  
+  @Override
+  public K convertInput(AvroWrapper<K> key, NullWritable value) {
+    // The value carries no data; only the key datum matters.
+    return key.datum();
+  }
+
+  @Override
+  public AvroWrapper<K> outputKey(K value) {
+    // Reuse a single wrapper instance rather than allocating per record.
+    getWrapper().datum(value);
+    return wrapper;
+  }
+
+  @Override
+  public NullWritable outputValue(K value) {
+    return NullWritable.get();
+  }
+
+  @Override
+  public Class<AvroWrapper<K>> getKeyClass() {
+    return (Class<AvroWrapper<K>>) getWrapper().getClass();
+  }
+
+  @Override
+  public Class<NullWritable> getValueClass() {
+    return NullWritable.class;
+  }
+
+  private AvroWrapper<K> getWrapper() {
+    if (wrapper == null) {
+      wrapper = new AvroWrapper<K>();
+    }
+    return wrapper;
+  }
+
+  // Grouped iteration is meaningless for a key-only type, so this converter
+  // rejects it outright.
+  @Override
+  public Iterable<K> convertIterableInput(AvroWrapper<K> key,
+      Iterable<NullWritable> value) {
+    throw new UnsupportedOperationException("Should not be possible");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
new file mode 100644
index 0000000..838c21a
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/** An {@link org.apache.hadoop.mapreduce.OutputFormat} for Avro data files. */
+public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
+
+  @Override
+  public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
+      TaskAttemptContext context) throws IOException, InterruptedException {
+
+    Configuration conf = context.getConfiguration();
+    Schema schema = null;
+    // When writing to a Crunch named output, its schema is stored under a
+    // per-output key; otherwise fall back to the job-wide Avro output schema.
+    String outputName = conf.get("crunch.namedoutput");
+    if (outputName != null && !outputName.isEmpty()) {
+      schema = (new Schema.Parser()).parse(conf.get("avro.output.schema." + outputName));
+    } else {
+      schema = AvroJob.getOutputSchema(context.getConfiguration());
+    }
+    
+    // The ReflectDataFactory decides which DatumWriter flavor to use.
+    ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
+    final DataFileWriter<T> WRITER = new DataFileWriter<T>(factory.<T>getWriter());
+
+    // Create the task's work file with the standard Avro extension (".avro").
+    Path path = getDefaultWorkFile(context,
+        org.apache.avro.mapred.AvroOutputFormat.EXT);
+    WRITER.create(schema,
+        path.getFileSystem(context.getConfiguration()).create(path));
+
+    // The returned writer appends raw datums; the NullWritable value is ignored.
+    return new RecordWriter<AvroWrapper<T>, NullWritable>() {
+      @Override
+      public void write(AvroWrapper<T> wrapper, NullWritable ignore)
+        throws IOException {
+        WRITER.append(wrapper.datum());
+      }
+      @Override
+      public void close(TaskAttemptContext context) throws IOException,
+          InterruptedException {
+        WRITER.close();
+      }
+    };
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java b/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
new file mode 100644
index 0000000..6ec3972
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.util.Iterator;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.Converter;
+
+/**
+ * {@link Converter} between user-level {@link Pair}s and the
+ * {@link AvroKey}/{@link AvroValue} wrapper pairs that Avro's MapReduce
+ * integration shuffles. Also supports the grouped form, exposing the values
+ * for one key as a lazily unwrapped {@code Iterable}.
+ */
+public class AvroPairConverter<K, V> implements Converter<AvroKey<K>, AvroValue<V>, Pair<K, V>, Pair<K, Iterable<V>>> {
+  
+  // Lazily created, reused output wrappers; transient so a deserialized
+  // converter rebuilds them on first use.
+  private transient AvroKey<K> keyWrapper = null;
+  private transient AvroValue<V> valueWrapper = null;
+  
+  @Override
+  public Pair<K, V> convertInput(AvroKey<K> key, AvroValue<V> value) {
+    return Pair.of(key.datum(), value.datum());
+  }
+
+  public Pair<K, Iterable<V>> convertIterableInput(AvroKey<K> key, Iterable<AvroValue<V>> iter) {
+    // Wrap rather than copy: the values are unwrapped on demand as the
+    // caller iterates.
+    Iterable<V> it = new AvroWrappedIterable<V>(iter);
+    return Pair.of(key.datum(), it);  
+  }
+  
+  @Override
+  public AvroKey<K> outputKey(Pair<K, V> value) {
+    // Reuse a single wrapper instance rather than allocating per record.
+    getKeyWrapper().datum(value.first());
+    return keyWrapper;
+  }
+
+  @Override
+  public AvroValue<V> outputValue(Pair<K, V> value) {
+    getValueWrapper().datum(value.second());
+    return valueWrapper;
+  }
+
+  @Override
+  public Class<AvroKey<K>> getKeyClass() {
+    return (Class<AvroKey<K>>) getKeyWrapper().getClass();
+  }
+
+  @Override
+  public Class<AvroValue<V>> getValueClass() {
+    return (Class<AvroValue<V>>) getValueWrapper().getClass();
+  }
+  
+  private AvroKey<K> getKeyWrapper() {
+    if (keyWrapper == null) {
+      keyWrapper = new AvroKey<K>();
+    }
+    return keyWrapper;
+  }
+  
+  private AvroValue<V> getValueWrapper() {
+    if (valueWrapper == null) {
+      valueWrapper = new AvroValue<V>();
+    }
+    return valueWrapper;
+  }
+  
+  // View over an Iterable of AvroValue wrappers that yields the underlying
+  // datums, deferring all unwrapping to iteration time.
+  private static class AvroWrappedIterable<V> implements Iterable<V> {
+
+    private final Iterable<AvroValue<V>> iters;
+    
+    public AvroWrappedIterable(Iterable<AvroValue<V>> iters) {
+      this.iters = iters;
+    }
+    
+    @Override
+    public Iterator<V> iterator() {
+      return new Iterator<V>() {
+        private final Iterator<AvroValue<V>> it = iters.iterator();
+
+        @Override
+        public boolean hasNext() {
+          return it.hasNext();
+        }
+
+        @Override
+        public V next() {
+          return it.next().datum();
+        }
+
+        @Override
+        public void remove() {
+          it.remove();
+        }  
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java b/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
new file mode 100644
index 0000000..3bcab5c
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/** A {@link RecordReader} for Avro data files. */
+public class AvroRecordReader<T> extends RecordReader<AvroWrapper<T>, NullWritable> {
+
+	private FileReader<T> reader;
+	private long start;
+	private long end;
+	private AvroWrapper<T> key;
+	private NullWritable value;
+	private Schema schema;
+
+	public AvroRecordReader(Schema schema) {
+		this.schema = schema;
+	}
+
+	@Override
+	public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException,
+			InterruptedException {
+		FileSplit split = (FileSplit) genericSplit;
+		Configuration conf = context.getConfiguration();
+		SeekableInput in = new FsInput(split.getPath(), conf);
+		DatumReader<T> datumReader = null;
+		// Default to reflect-based reading unless the job explicitly marked the
+		// input as non-reflect (AvroJob.INPUT_IS_REFLECT defaults to true here).
+		if (context.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true)) {
+		  ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
+			datumReader = factory.getReader(schema);
+		} else {
+			datumReader = new SpecificDatumReader<T>(schema);
+		}
+		this.reader = DataFileReader.openReader(in, datumReader);
+		reader.sync(split.getStart()); // sync to start
+		this.start = reader.tell();
+		this.end = split.getStart() + split.getLength();
+	}
+
+	@Override
+	public boolean nextKeyValue() throws IOException, InterruptedException {
+		// Stop at end of file or once we have read past this split's boundary.
+		if (!reader.hasNext() || reader.pastSync(end)) {
+			key = null;
+			value = null;
+			return false;
+		}
+		if (key == null) {
+			key = new AvroWrapper<T>();
+		}
+		if (value == null) {
+			value = NullWritable.get();
+		}
+		// Pass the previous datum back in so Avro can reuse the object.
+		key.datum(reader.next(key.datum()));
+		return true;
+	}
+
+	@Override
+	public AvroWrapper<T> getCurrentKey() throws IOException, InterruptedException {
+		return key;
+	}
+
+	@Override
+	public NullWritable getCurrentValue() throws IOException, InterruptedException {
+		return value;
+	}
+
+	@Override
+	public float getProgress() throws IOException {
+		// Fraction of this split's byte range consumed so far, clamped to 1.
+		if (end == start) {
+			return 0.0f;
+		} else {
+			return Math.min(1.0f, (getPos() - start) / (float) (end - start));
+		}
+	}
+
+	public long getPos() throws IOException {
+		return reader.tell();
+	}
+
+	@Override
+	public void close() throws IOException {
+		reader.close();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
new file mode 100644
index 0000000..6d21122
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+/**
+ * The implementation of the PTableType interface for Avro-based serialization.
+ * 
+ */
+public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements
+		PTableType<K, V> {
+
+	// Maps a user-level Pair to an org.apache.avro.mapred.Pair record for
+	// serialization. Schemas are carried as JSON strings because Schema itself
+	// is not serializable; they are re-parsed on the task side.
+	private static class PairToAvroPair extends
+			MapFn<Pair, org.apache.avro.mapred.Pair> {
+		private final MapFn keyMapFn;
+		private final MapFn valueMapFn;
+		private final String firstJson;
+		private final String secondJson;
+
+		private String pairSchemaJson;
+		private transient Schema pairSchema;
+
+		public PairToAvroPair(AvroType keyType, AvroType valueType) {
+			this.keyMapFn = keyType.getOutputMapFn();
+			this.firstJson = keyType.getSchema().toString();
+			this.valueMapFn = valueType.getOutputMapFn();
+			this.secondJson = valueType.getSchema().toString();
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			keyMapFn.configure(conf);
+			valueMapFn.configure(conf);
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			keyMapFn.setConfigurationForTest(conf);
+			valueMapFn.setConfigurationForTest(conf);
+		}
+
+		@Override
+		public void initialize() {
+			keyMapFn.setContext(getContext());
+			valueMapFn.setContext(getContext());
+			// Rebuild the pair schema from the component schema JSON.
+			pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
+					new Schema.Parser().parse(firstJson),
+					new Schema.Parser().parse(secondJson)).toString();
+		}
+
+		@Override
+		public org.apache.avro.mapred.Pair map(Pair input) {
+			// Parse lazily so the transient schema survives (de)serialization
+			// of this MapFn.
+			if (pairSchema == null) {
+				pairSchema = new Schema.Parser().parse(pairSchemaJson);
+			}
+			org.apache.avro.mapred.Pair avroPair = new org.apache.avro.mapred.Pair(
+					pairSchema);
+			avroPair.key(keyMapFn.map(input.first()));
+			avroPair.value(valueMapFn.map(input.second()));
+			return avroPair;
+		}
+	}
+
+	// Maps a two-field Avro record (field 0 = key, field 1 = value) back to a
+	// user-level Pair, applying the per-component input map functions.
+	private static class IndexedRecordToPair extends MapFn<IndexedRecord, Pair> {
+
+		private final MapFn firstMapFn;
+		private final MapFn secondMapFn;
+
+		public IndexedRecordToPair(MapFn firstMapFn, MapFn secondMapFn) {
+			this.firstMapFn = firstMapFn;
+			this.secondMapFn = secondMapFn;
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			firstMapFn.configure(conf);
+			secondMapFn.configure(conf);
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			firstMapFn.setConfigurationForTest(conf);
+			secondMapFn.setConfigurationForTest(conf);
+		}
+
+		@Override
+		public void initialize() {
+			firstMapFn.setContext(getContext());
+			secondMapFn.setContext(getContext());
+		}
+
+		@Override
+		public Pair map(IndexedRecord input) {
+			return Pair.of(firstMapFn.map(input.get(0)),
+					secondMapFn.map(input.get(1)));
+		}
+	}
+
+	private final AvroType<K> keyType;
+	private final AvroType<V> valueType;
+
+	public AvroTableType(AvroType<K> keyType, AvroType<V> valueType,
+			Class<Pair<K, V>> pairClass) {
+		// The table's schema is the Avro pair schema of its key/value schemas.
+		super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(
+				keyType.getSchema(), valueType.getSchema()),
+				new IndexedRecordToPair(keyType.getInputMapFn(),
+						valueType.getInputMapFn()), new PairToAvroPair(keyType,
+						valueType), keyType, valueType);
+		this.keyType = keyType;
+		this.valueType = valueType;
+	}
+
+	@Override
+	public boolean isSpecific() {
+		// Specific if either component is a specific-record type.
+		return keyType.isSpecific() || valueType.isSpecific();
+	}
+
+	@Override
+	public boolean isGeneric() {
+		return keyType.isGeneric() || valueType.isGeneric();
+	}
+
+	@Override
+	public PType<K> getKeyType() {
+		return keyType;
+	}
+
+	@Override
+	public PType<V> getValueType() {
+		return valueType;
+	}
+
+	@Override
+	public PGroupedTableType<K, V> getGroupedTableType() {
+		return new AvroGroupedTableType<K, V>(this);
+	}
+
+  @Override
+  public Pair<K, V> getDetachedValue(Pair<K, V> value) {
+    return PTables.getDetachedValue(this, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroType.java b/src/main/java/org/apache/crunch/types/avro/AvroType.java
new file mode 100644
index 0000000..d5d22a8
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.io.avro.AvroFileSourceTarget;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * The implementation of the PType interface for Avro-based serialization.
+ * 
+ */
+public class AvroType<T> implements PType<T> {
+
+	// Shared converter: single-value Avro collections move their datum in the
+	// map output key with a NullWritable value.
+	private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
+
+	private final Class<T> typeClass;
+  // Schema is kept as a JSON string for Java serialization; the transient
+  // Schema object is re-parsed lazily on first access (see getSchema()).
+  private final String schemaString;
+  private transient Schema schema;
+	private final MapFn baseInputMapFn;
+	private final MapFn baseOutputMapFn;
+	private final List<PType> subTypes;
+  // Lazily constructed; flavor depends on specific/generic/reflect (see
+  // getDeepCopier()).
+  private AvroDeepCopier<T> deepCopier;
+
+	public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) {
+		this(typeClass, schema, IdentityFn.getInstance(), IdentityFn
+				.getInstance(), ptypes);
+	}
+
+	public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn,
+			MapFn outputMapFn, PType... ptypes) {
+		this.typeClass = typeClass;
+		this.schema = Preconditions.checkNotNull(schema);
+		this.schemaString = schema.toString();
+		this.baseInputMapFn = inputMapFn;
+		this.baseOutputMapFn = outputMapFn;
+		this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
+	}
+
+	@Override
+	public Class<T> getTypeClass() {
+		return typeClass;
+	}
+
+	@Override
+	public PTypeFamily getFamily() {
+		return AvroTypeFamily.getInstance();
+	}
+
+	@Override
+	public List<PType> getSubTypes() {
+		return subTypes;
+	}
+
+	public Schema getSchema() {
+	  // Re-parse from the JSON string after deserialization (schema is transient).
+	  if (schema == null){
+	    schema = new Schema.Parser().parse(schemaString);
+	  }
+		return schema;
+	}
+
+	/**
+	 * Determine if the wrapped type is a specific data avro type.
+	 * 
+	 * @return true if the wrapped type is a specific data type
+	 */
+	public boolean isSpecific() {
+		if (SpecificRecord.class.isAssignableFrom(typeClass)) {
+			return true;
+		}
+		// A composite type is specific if any of its components is.
+		for (PType ptype : subTypes) {
+			if (SpecificRecord.class.isAssignableFrom(ptype.getTypeClass())) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * Determine if the wrapped type is a generic data avro type.
+	 * 
+	 * @return true if the wrapped type is a generic type
+	 */
+	public boolean isGeneric() {
+		return GenericData.Record.class.equals(typeClass);
+	}
+
+	public MapFn<Object, T> getInputMapFn() {
+		return baseInputMapFn;
+	}
+
+	public MapFn<T, Object> getOutputMapFn() {
+		return baseOutputMapFn;
+	}
+
+	@Override
+	public Converter getConverter() {
+		return AVRO_CONVERTER;
+	}
+
+	@Override
+	public SourceTarget<T> getDefaultFileSource(Path path) {
+		return new AvroFileSourceTarget<T>(path, this);
+	}
+
+  // Picks the deep-copier flavor matching this type: specific record,
+  // generic record, or reflection-based for everything else.
+  private AvroDeepCopier<T> getDeepCopier() {
+    if (deepCopier == null) {
+      if (isSpecific()) {
+        deepCopier = new AvroDeepCopier.AvroSpecificDeepCopier<T>(typeClass, getSchema());
+      } else if (isGeneric()) {
+        deepCopier = (AvroDeepCopier<T>) new AvroDeepCopier.AvroGenericDeepCopier(getSchema());
+      } else {
+        deepCopier = new AvroDeepCopier.AvroReflectDeepCopier<T>(typeClass, getSchema());
+      }
+    }
+    return deepCopier;
+  }
+
+  public T getDetachedValue(T value) {
+    // Only deep-copy raw Avro objects (identity-mapped, non-primitive);
+    // mapped or primitive values are safe to hand out as-is.
+    if (this.baseInputMapFn instanceof IdentityFn && !Avros.isPrimitive(this)) {
+      return getDeepCopier().deepCopy(value);
+    }
+    return value;
+  }
+
+	@Override
+	public boolean equals(Object other) {
+		// Equality is based on the type class and sub-types only; the schema is
+		// derived from them and is not compared directly.
+		if (other == null || !(other instanceof AvroType)) {
+			return false;
+		}
+		AvroType at = (AvroType) other;
+		return (typeClass.equals(at.typeClass) && subTypes.equals(at.subTypes));
+
+	}
+
+	@Override
+	public int hashCode() {
+		// Must hash the same fields that equals() compares.
+		HashCodeBuilder hcb = new HashCodeBuilder();
+		hcb.append(typeClass).append(subTypes);
+		return hcb.toHashCode();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java b/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
new file mode 100644
index 0000000..b7d5598
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.PTypeUtils;
+
+public class AvroTypeFamily implements PTypeFamily {
+  
+  private static final AvroTypeFamily INSTANCE = new AvroTypeFamily();
+  
+  public static AvroTypeFamily getInstance() {
+    return INSTANCE;
+  }
+  
+  // There can only be one instance.
+  private AvroTypeFamily() {
+  }
+
+  
+  @Override
+  public PType<Void> nulls() {
+    return Avros.nulls();
+  }
+
+  @Override
+  public PType<String> strings() {
+    return Avros.strings();
+  }
+
+  @Override
+  public PType<Long> longs() {
+    return Avros.longs();
+  }
+
+  @Override
+  public PType<Integer> ints() {
+    return Avros.ints();
+  }
+
+  @Override
+  public PType<Float> floats() {
+    return Avros.floats();
+  }
+
+  @Override
+  public PType<Double> doubles() {
+    return Avros.doubles();
+  }
+
+  @Override
+  public PType<Boolean> booleans() {
+    return Avros.booleans();
+  }
+
+  @Override
+  public PType<ByteBuffer> bytes() {
+    return Avros.bytes();
+  }
+
+  @Override
+  public <T> PType<T> records(Class<T> clazz) {
+    return Avros.records(clazz);
+  }
+
+  public PType<GenericData.Record> generics(Schema schema) {
+	return Avros.generics(schema);
+  }
+  
+  public <T> PType<T> containers(Class<T> clazz) {
+    return Avros.containers(clazz);
+  }
+  
+  @Override
+  public <T> PType<Collection<T>> collections(PType<T> ptype) {
+    return Avros.collections(ptype);
+  }
+
+  @Override
+  public <T> PType<Map<String, T>> maps(PType<T> ptype) {
+	return Avros.maps(ptype);
+  }
+  
+  @Override
+  public <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
+    return Avros.pairs(p1, p2);
+  }
+
+  @Override
+  public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1,
+      PType<V2> p2, PType<V3> p3) {
+    return Avros.triples(p1, p2, p3);
+  }
+
+  @Override
+  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
+      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+    return Avros.quads(p1, p2, p3, p4);
+  }
+
+  @Override
+  public PType<TupleN> tuples(PType<?>... ptypes) {
+    return Avros.tuples(ptypes);
+  }
+
+  @Override
+  public <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value) {
+    return Avros.tableOf(key, value);
+  }
+
+  @Override
+  public <T> PType<T> as(PType<T> ptype) {
+    if (ptype instanceof AvroType || ptype instanceof AvroGroupedTableType) {
+      return ptype;
+    }
+    if (ptype instanceof PGroupedTableType) {
+      PTableType ptt = ((PGroupedTableType) ptype).getTableType();
+      return new AvroGroupedTableType((AvroTableType) as(ptt));
+    }
+    Class<T> typeClass = ptype.getTypeClass();
+    PType<T> prim = Avros.getPrimitiveType(typeClass);
+    if (prim != null) {
+      return prim;
+    }
+    return PTypeUtils.convert(ptype, this);
+  }
+
+  @Override
+  public <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes) {
+    return Avros.tuples(clazz, ptypes);
+  }
+
+  @Override
+  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
+      MapFn<T, S> outputFn, PType<S> base) {
+    return Avros.derived(clazz, inputFn, outputFn, base);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java b/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
new file mode 100644
index 0000000..207fe8d
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
import java.io.IOException;

import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+
+/**
+ * An {@link org.apache.hadoop.mapred.InputFormat} for text files.
+ * Each line is a {@link Utf8} key; values are null.
+ */
+public class AvroUtf8InputFormat extends FileInputFormat<AvroWrapper<Utf8>, NullWritable> {
+
+  static class Utf8LineRecordReader extends RecordReader<AvroWrapper<Utf8>, NullWritable> {
+
+    private LineRecordReader lineRecordReader;
+
+    private AvroWrapper<Utf8> currentKey = new AvroWrapper<Utf8>();
+    
+    public Utf8LineRecordReader() throws IOException {
+      this.lineRecordReader = new LineRecordReader();
+    }
+
+    public void close() throws IOException {
+      lineRecordReader.close();
+    }
+
+    public float getProgress() throws IOException {
+      return lineRecordReader.getProgress();
+    }
+
+    @Override
+    public AvroWrapper<Utf8> getCurrentKey() throws IOException,
+        InterruptedException {
+      Text txt = lineRecordReader.getCurrentValue();
+      currentKey.datum(new Utf8(txt.toString()));
+      return currentKey;
+    }
+
+    @Override
+    public NullWritable getCurrentValue() throws IOException,
+        InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      lineRecordReader.initialize(split, context);
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return lineRecordReader.nextKeyValue();
+    }
+  }
+
+  private CompressionCodecFactory compressionCodecs = null;
+
+  public void configure(Configuration conf) {
+    compressionCodecs = new CompressionCodecFactory(conf);
+  }
+
+  protected boolean isSplitable(FileSystem fs, Path file) {
+    return compressionCodecs.getCodec(file) == null;
+  }
+
+  @Override
+  public RecordReader<AvroWrapper<Utf8>, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    return new Utf8LineRecordReader();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/Avros.java b/src/main/java/org/apache/crunch/types/avro/Avros.java
new file mode 100644
index 0000000..fbc6bf3
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.fn.CompositeMapFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.TupleFactory;
+import org.apache.crunch.util.PTypes;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Defines static methods that are analogous to the methods defined in
+ * {@link AvroTypeFamily} for convenient static importing.
+ * 
+ */
+public class Avros {
+
+	/**
+	 * The instance we use for generating reflected schemas. May be modified by
+	 * clients (e.g., Scrunch.)
+	 */
+	public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
+
+	/**
+	 * The name of the configuration parameter that tracks which reflection
+	 * factory to use.
+	 */
+	public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
+
+	public static void configureReflectDataFactory(Configuration conf) {
+		conf.setClass(REFLECT_DATA_FACTORY_CLASS,
+				REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class);
+	}
+
+	public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
+		return (ReflectDataFactory) ReflectionUtils.newInstance(conf.getClass(
+				REFLECT_DATA_FACTORY_CLASS, ReflectDataFactory.class), conf);
+	}
+
+	public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
+		@Override
+		public String map(CharSequence input) {
+			return input.toString();
+		}
+	};
+
+	public static MapFn<String, Utf8> STRING_TO_UTF8 = new MapFn<String, Utf8>() {
+		@Override
+		public Utf8 map(String input) {
+			return new Utf8(input);
+		}
+	};
+
+	public static MapFn<Object, ByteBuffer> BYTES_IN = new MapFn<Object, ByteBuffer>() {
+		@Override
+		public ByteBuffer map(Object input) {
+			if (input instanceof ByteBuffer) {
+				return (ByteBuffer) input;
+			}
+			return ByteBuffer.wrap((byte[]) input);
+		}
+	};
+
+	private static final AvroType<String> strings = new AvroType<String>(
+			String.class, Schema.create(Schema.Type.STRING), UTF8_TO_STRING,
+			STRING_TO_UTF8);
+	private static final AvroType<Void> nulls = create(Void.class,
+			Schema.Type.NULL);
+	private static final AvroType<Long> longs = create(Long.class,
+			Schema.Type.LONG);
+	private static final AvroType<Integer> ints = create(Integer.class,
+			Schema.Type.INT);
+	private static final AvroType<Float> floats = create(Float.class,
+			Schema.Type.FLOAT);
+	private static final AvroType<Double> doubles = create(Double.class,
+			Schema.Type.DOUBLE);
+	private static final AvroType<Boolean> booleans = create(Boolean.class,
+			Schema.Type.BOOLEAN);
+	private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(
+			ByteBuffer.class, Schema.create(Schema.Type.BYTES), BYTES_IN,
+			IdentityFn.getInstance());
+
+	private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap
+			.<Class<?>, PType<?>> builder().put(String.class, strings)
+			.put(Long.class, longs).put(Integer.class, ints)
+			.put(Float.class, floats).put(Double.class, doubles)
+			.put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
+
+	private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps
+			.newHashMap();
+
+	public static <T> void register(Class<T> clazz, AvroType<T> ptype) {
+		EXTENSIONS.put(clazz, ptype);
+	}
+
+	public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
+		return (PType<T>) PRIMITIVES.get(clazz);
+	}
+
+  static <T> boolean isPrimitive(AvroType<T> avroType) {
+    return PRIMITIVES.containsKey(avroType.getTypeClass());
+  }
+
+	private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
+		return new AvroType<T>(clazz, Schema.create(schemaType));
+	}
+
+	public static final AvroType<Void> nulls() {
+		return nulls;
+	}
+
+	public static final AvroType<String> strings() {
+		return strings;
+	}
+
+	public static final AvroType<Long> longs() {
+		return longs;
+	}
+
+	public static final AvroType<Integer> ints() {
+		return ints;
+	}
+
+	public static final AvroType<Float> floats() {
+		return floats;
+	}
+
+	public static final AvroType<Double> doubles() {
+		return doubles;
+	}
+
+	public static final AvroType<Boolean> booleans() {
+		return booleans;
+	}
+
+	public static final AvroType<ByteBuffer> bytes() {
+		return bytes;
+	}
+
+	public static final <T> AvroType<T> records(Class<T> clazz) {
+		if (EXTENSIONS.containsKey(clazz)) {
+			return (AvroType<T>) EXTENSIONS.get(clazz);
+		}
+		return containers(clazz);
+	}
+
+	public static final AvroType<GenericData.Record> generics(Schema schema) {
+		return new AvroType<GenericData.Record>(GenericData.Record.class,
+				schema);
+	}
+
+	public static final <T> AvroType<T> containers(Class<T> clazz) {
+		return reflects(clazz);
+	}
+
+	public static final <T> AvroType<T> reflects(Class<T> clazz) {
+		return new AvroType<T>(clazz, REFLECT_DATA_FACTORY.getReflectData()
+				.getSchema(clazz));
+	}
+
+	private static class BytesToWritableMapFn<T extends Writable> extends
+			MapFn<ByteBuffer, T> {
+		private static final Log LOG = LogFactory
+				.getLog(BytesToWritableMapFn.class);
+
+		private final Class<T> writableClazz;
+
+		public BytesToWritableMapFn(Class<T> writableClazz) {
+			this.writableClazz = writableClazz;
+		}
+
+		@Override
+		public T map(ByteBuffer input) {
+			T instance = ReflectionUtils.newInstance(writableClazz,
+					getConfiguration());
+			try {
+				instance.readFields(new DataInputStream(
+						new ByteArrayInputStream(input.array(), input
+								.arrayOffset(), input.limit())));
+			} catch (IOException e) {
+				LOG.error("Exception thrown reading instance of: "
+						+ writableClazz, e);
+			}
+			return instance;
+		}
+	}
+
+	private static class WritableToBytesMapFn<T extends Writable> extends
+			MapFn<T, ByteBuffer> {
+		private static final Log LOG = LogFactory
+				.getLog(WritableToBytesMapFn.class);
+
+		@Override
+		public ByteBuffer map(T input) {
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			DataOutputStream das = new DataOutputStream(baos);
+			try {
+				input.write(das);
+			} catch (IOException e) {
+				LOG.error("Exception thrown converting Writable to bytes", e);
+			}
+			return ByteBuffer.wrap(baos.toByteArray());
+		}
+	}
+
+	public static final <T extends Writable> AvroType<T> writables(
+			Class<T> clazz) {
+		return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES),
+				new BytesToWritableMapFn<T>(clazz),
+				new WritableToBytesMapFn<T>());
+	}
+
+	private static class GenericDataArrayToCollection<T> extends
+			MapFn<Object, Collection<T>> {
+
+		private final MapFn<Object, T> mapFn;
+
+		public GenericDataArrayToCollection(MapFn<Object, T> mapFn) {
+			this.mapFn = mapFn;
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			mapFn.configure(conf);
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			mapFn.setConfigurationForTest(conf);
+		}
+
+		@Override
+		public void initialize() {
+			this.mapFn.setContext(getContext());
+		}
+
+		@Override
+		public Collection<T> map(Object input) {
+			Collection<T> ret = Lists.newArrayList();
+			if (input instanceof Collection) {
+				for (Object in : (Collection<Object>) input) {
+					ret.add(mapFn.map(in));
+				}
+			} else {
+				// Assume it is an array
+				Object[] arr = (Object[]) input;
+				for (Object in : arr) {
+					ret.add(mapFn.map(in));
+				}
+			}
+			return ret;
+		}
+	}
+
+	private static class CollectionToGenericDataArray extends
+			MapFn<Collection<?>, GenericData.Array<?>> {
+
+		private final MapFn mapFn;
+		private final String jsonSchema;
+		private transient Schema schema;
+
+		public CollectionToGenericDataArray(Schema schema, MapFn mapFn) {
+			this.mapFn = mapFn;
+			this.jsonSchema = schema.toString();
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			mapFn.configure(conf);
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			mapFn.setConfigurationForTest(conf);
+		}
+
+		@Override
+		public void initialize() {
+			this.mapFn.setContext(getContext());
+		}
+
+		@Override
+		public GenericData.Array<?> map(Collection<?> input) {
+			if (schema == null) {
+				schema = new Schema.Parser().parse(jsonSchema);
+			}
+			GenericData.Array array = new GenericData.Array(input.size(),
+					schema);
+			for (Object in : input) {
+				array.add(mapFn.map(in));
+			}
+			return array;
+		}
+	}
+
+	public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
+		AvroType<T> avroType = (AvroType<T>) ptype;
+		Schema collectionSchema = Schema.createArray(allowNulls(avroType
+				.getSchema()));
+		GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(
+				avroType.getInputMapFn());
+		CollectionToGenericDataArray output = new CollectionToGenericDataArray(
+				collectionSchema, avroType.getOutputMapFn());
+		return new AvroType(Collection.class, collectionSchema, input, output,
+				ptype);
+	}
+
+	private static class AvroMapToMap<T> extends
+			MapFn<Map<CharSequence, Object>, Map<String, T>> {
+		private final MapFn<Object, T> mapFn;
+
+		public AvroMapToMap(MapFn<Object, T> mapFn) {
+			this.mapFn = mapFn;
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			mapFn.configure(conf);
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			mapFn.setConfigurationForTest(conf);
+		}
+
+		@Override
+		public void initialize() {
+			this.mapFn.setContext(getContext());
+		}
+
+		@Override
+		public Map<String, T> map(Map<CharSequence, Object> input) {
+			Map<String, T> out = Maps.newHashMap();
+			for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
+				out.put(e.getKey().toString(), mapFn.map(e.getValue()));
+			}
+			return out;
+		}
+	}
+
+	private static class MapToAvroMap<T> extends
+			MapFn<Map<String, T>, Map<Utf8, Object>> {
+		private final MapFn<T, Object> mapFn;
+
+		public MapToAvroMap(MapFn<T, Object> mapFn) {
+			this.mapFn = mapFn;
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			mapFn.configure(conf);
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			mapFn.setConfigurationForTest(conf);
+		}
+
+		@Override
+		public void initialize() {
+			this.mapFn.setContext(getContext());
+		}
+
+		@Override
+		public Map<Utf8, Object> map(Map<String, T> input) {
+			Map<Utf8, Object> out = Maps.newHashMap();
+			for (Map.Entry<String, T> e : input.entrySet()) {
+				out.put(new Utf8(e.getKey()), mapFn.map(e.getValue()));
+			}
+			return out;
+		}
+	}
+
+	public static final <T> AvroType<Map<String, T>> maps(PType<T> ptype) {
+		AvroType<T> avroType = (AvroType<T>) ptype;
+		Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema()));
+		AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn());
+		MapToAvroMap<T> outputFn = new MapToAvroMap<T>(
+				avroType.getOutputMapFn());
+		return new AvroType(Map.class, mapSchema, inputFn, outputFn, ptype);
+	}
+
+	private static class GenericRecordToTuple extends
+			MapFn<GenericRecord, Tuple> {
+		private final TupleFactory<?> tupleFactory;
+		private final List<MapFn> fns;
+
+		private transient Object[] values;
+
+		public GenericRecordToTuple(TupleFactory<?> tupleFactory,
+				PType<?>... ptypes) {
+			this.tupleFactory = tupleFactory;
+			this.fns = Lists.newArrayList();
+			for (PType<?> ptype : ptypes) {
+				AvroType atype = (AvroType) ptype;
+				fns.add(atype.getInputMapFn());
+			}
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			for (MapFn fn : fns) {
+				fn.configure(conf);
+			}
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			for (MapFn fn : fns) {
+				fn.setConfigurationForTest(conf);
+			}
+		}
+
+		@Override
+		public void initialize() {
+			for (MapFn fn : fns) {
+				fn.setContext(getContext());
+			}
+			this.values = new Object[fns.size()];
+			tupleFactory.initialize();
+		}
+
+		@Override
+		public Tuple map(GenericRecord input) {
+			for (int i = 0; i < values.length; i++) {
+				Object v = input.get(i);
+				if (v == null) {
+					values[i] = null;
+				} else {
+					values[i] = fns.get(i).map(v);
+				}
+			}
+			return tupleFactory.makeTuple(values);
+		}
+	}
+
+	private static class TupleToGenericRecord extends
+			MapFn<Tuple, GenericRecord> {
+		private final List<MapFn> fns;
+		private final String jsonSchema;
+
+		private transient GenericRecord record;
+
+		public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
+			this.fns = Lists.newArrayList();
+			this.jsonSchema = schema.toString();
+			for (PType ptype : ptypes) {
+				AvroType atype = (AvroType) ptype;
+				fns.add(atype.getOutputMapFn());
+			}
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			for (MapFn fn : fns) {
+				fn.configure(conf);
+			}
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			for (MapFn fn : fns) {
+				fn.setConfigurationForTest(conf);
+			}
+		}
+
+		@Override
+		public void initialize() {
+			this.record = new GenericData.Record(
+					new Schema.Parser().parse(jsonSchema));
+			for (MapFn fn : fns) {
+				fn.setContext(getContext());
+			}
+		}
+
+		@Override
+		public GenericRecord map(Tuple input) {
+			for (int i = 0; i < input.size(); i++) {
+				Object v = input.get(i);
+				if (v == null) {
+					record.put(i, null);
+				} else {
+					record.put(i, fns.get(i).map(v));
+				}
+			}
+			return record;
+		}
+	}
+
+	public static final <V1, V2> AvroType<Pair<V1, V2>> pairs(PType<V1> p1,
+			PType<V2> p2) {
+		Schema schema = createTupleSchema(p1, p2);
+		GenericRecordToTuple input = new GenericRecordToTuple(
+				TupleFactory.PAIR, p1, p2);
+		TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2);
+		return new AvroType(Pair.class, schema, input, output, p1, p2);
+	}
+
+	public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(
+			PType<V1> p1, PType<V2> p2, PType<V3> p3) {
+		Schema schema = createTupleSchema(p1, p2, p3);
+		return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(
+				TupleFactory.TUPLE3, p1, p2, p3), new TupleToGenericRecord(
+				schema, p1, p2, p3), p1, p2, p3);
+	}
+
+	public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(
+			PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+		Schema schema = createTupleSchema(p1, p2, p3, p4);
+		return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(
+				TupleFactory.TUPLE4, p1, p2, p3, p4), new TupleToGenericRecord(
+				schema, p1, p2, p3, p4), p1, p2, p3, p4);
+	}
+
+	public static final AvroType<TupleN> tuples(PType... ptypes) {
+		Schema schema = createTupleSchema(ptypes);
+		return new AvroType(TupleN.class, schema, new GenericRecordToTuple(
+				TupleFactory.TUPLEN, ptypes), new TupleToGenericRecord(schema,
+				ptypes), ptypes);
+	}
+
+	public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz,
+			PType... ptypes) {
+		Schema schema = createTupleSchema(ptypes);
+		Class[] typeArgs = new Class[ptypes.length];
+		for (int i = 0; i < typeArgs.length; i++) {
+			typeArgs[i] = ptypes[i].getTypeClass();
+		}
+		TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
+		return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory,
+				ptypes), new TupleToGenericRecord(schema, ptypes), ptypes);
+	}
+
+	private static Schema createTupleSchema(PType<?>... ptypes) {
+		// Guarantee each tuple schema has a globally unique name
+		String tupleName = "tuple"
+				+ UUID.randomUUID().toString().replace('-', 'x');
+		Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
+		List<Schema.Field> fields = Lists.newArrayList();
+		for (int i = 0; i < ptypes.length; i++) {
+			AvroType atype = (AvroType) ptypes[i];
+			Schema fieldSchema = allowNulls(atype.getSchema());
+			fields.add(new Schema.Field("v" + i, fieldSchema, "", null));
+		}
+		schema.setFields(fields);
+		return schema;
+	}
+
+	public static final <S, T> AvroType<T> derived(Class<T> clazz,
+			MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
+		AvroType<S> abase = (AvroType<S>) base;
+		return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(
+				abase.getInputMapFn(), inputFn), new CompositeMapFn(outputFn,
+				abase.getOutputMapFn()), base.getSubTypes().toArray(
+				new PType[0]));
+	}
+
+	public static <T> PType<T> jsons(Class<T> clazz) {
+		return PTypes.jsonString(clazz, AvroTypeFamily.getInstance());
+	}
+
+	public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key,
+			PType<V> value) {
+	  if (key instanceof PTableType) {
+	    PTableType ptt = (PTableType) key;
+	    key = Avros.pairs(ptt.getKeyType(), ptt.getValueType());
+	  }
+	  if (value instanceof PTableType) {
+	    PTableType ptt = (PTableType) value;
+	    value = Avros.pairs(ptt.getKeyType(), ptt.getValueType());
+	  }
+		AvroType<K> avroKey = (AvroType<K>) key;
+		AvroType<V> avroValue = (AvroType<V>) value;
+		return new AvroTableType(avroKey, avroValue, Pair.class);
+	}
+
+	private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
+
+	private static Schema allowNulls(Schema base) {
+		if (NULL_SCHEMA.equals(base)) {
+			return base;
+		}
+		return Schema.createUnion(ImmutableList.of(base, NULL_SCHEMA));
+	}
+
+	private Avros() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java b/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
new file mode 100644
index 0000000..7c952ec
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+
+/**
+ * A Factory class for constructing Avro reflection-related objects.
+ */
+public class ReflectDataFactory {
+
+  public ReflectData getReflectData() { return ReflectData.AllowNull.get(); }
+  
+  public <T> ReflectDatumReader<T> getReader(Schema schema) {
+    return new ReflectDatumReader<T>(schema);
+  }
+  
+  public <T> ReflectDatumWriter<T> getWriter() {
+    return new ReflectDatumWriter<T>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
new file mode 100644
index 0000000..b5f27fe
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.Pair;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.util.ReflectionUtils;
+
/** The {@link Serialization} used by jobs configured with {@link AvroJob}.
 *
 * <p>NOTE(review): this appears to be a local copy of Avro's
 * {@code AvroSerialization} that adds a pluggable {@code ReflectDataFactory}
 * (via the {@code "crunch.reflectdatafactory"} configuration key) — confirm
 * against the upstream Avro class when updating. */
public class SafeAvroSerialization<T> extends Configured 
  implements Serialization<AvroWrapper<T>> {

  // Accept any AvroWrapper subclass (AvroKey, AvroValue, or AvroWrapper itself).
  public boolean accept(Class<?> c) {
    return AvroWrapper.class.isAssignableFrom(c);
  }
  
  /** Returns the specified map output deserializer.  Defaults to the final
   * output deserializer if no map output schema was specified. */
  public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) {
    // AvroKey carries the key half of the map-output Pair schema; anything
    // else is treated as the value half.
    boolean isKey = AvroKey.class.isAssignableFrom(c);
    Configuration conf = getConf();
    Schema schema = isKey 
        ? Pair.getKeySchema(AvroJob.getMapOutputSchema(conf))
            : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf));

    DatumReader<T> datumReader = null;
    if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {        
        // Reflect-based records: the factory class is pluggable through the
        // "crunch.reflectdatafactory" configuration key.
        ReflectDataFactory factory = (ReflectDataFactory) ReflectionUtils.newInstance(
            conf.getClass("crunch.reflectdatafactory", ReflectDataFactory.class), conf);
        datumReader = factory.getReader(schema);
    } else {
        datumReader = new SpecificDatumReader<T>(schema);
    }
    return new AvroWrapperDeserializer(datumReader, isKey);
  }
  
  private static final DecoderFactory FACTORY = DecoderFactory.get();

  /** Deserializes AvroWrapper instances from a binary-encoded stream,
   * reusing the wrapper and datum objects where the caller allows it. */
  private class AvroWrapperDeserializer
    implements Deserializer<AvroWrapper<T>> {

    private DatumReader<T> reader;
    private BinaryDecoder decoder;
    private boolean isKey;  // true => produce AvroKey, false => AvroValue
    
    public AvroWrapperDeserializer(DatumReader<T> reader, boolean isKey) {
      this.reader = reader;
      this.isKey = isKey;
    }
    
    public void open(InputStream in) {
      // Passing the previous decoder lets the factory reuse it for the new
      // stream rather than allocating a fresh one.
      this.decoder = FACTORY.directBinaryDecoder(in, decoder);
    }
    
    public AvroWrapper<T> deserialize(AvroWrapper<T> wrapper)
      throws IOException {
      // Reuse the datum held by the incoming wrapper (if any) to reduce
      // allocation; a null wrapper means we must build one from scratch.
      T datum = reader.read(wrapper == null ? null : wrapper.datum(), decoder);
      if (wrapper == null) {
        wrapper = isKey? new AvroKey<T>(datum) : new AvroValue<T>(datum);
      } else {
        wrapper.datum(datum);
      }
      return wrapper;
    }

    public void close() throws IOException {
      decoder.inputStream().close();
    }
  }
  
  /** Returns the specified output serializer. */
  public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
    // AvroWrapper used for final output, AvroKey or AvroValue for map output
    boolean isFinalOutput = c.equals(AvroWrapper.class);
    Configuration conf = getConf();
    Schema schema = isFinalOutput ? AvroJob.getOutputSchema(conf)
        : (AvroKey.class.isAssignableFrom(c)
            ? Pair.getKeySchema(AvroJob.getMapOutputSchema(conf))
                : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf)));

    // Serialization always goes through the (possibly custom) reflect writer.
    ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
    ReflectDatumWriter<T> writer = factory.getWriter();
    writer.setSchema(schema);
    return new AvroWrapperSerializer(writer);
  }

  /** Serializes AvroWrapper instances to a binary-encoded stream. */
  private class AvroWrapperSerializer implements Serializer<AvroWrapper<T>> {
    private DatumWriter<T> writer;
    private OutputStream out;
    private BinaryEncoder encoder;
    
    public AvroWrapperSerializer(DatumWriter<T> writer) {
      this.writer = writer;
    }

    public void open(OutputStream out) {
      this.out = out;
      this.encoder = new EncoderFactory().configureBlockSize(512)
          .binaryEncoder(out, null);
    }

    public void serialize(AvroWrapper<T> wrapper) throws IOException {
      writer.write(wrapper.datum(), encoder);
      // would be a lot faster if the Serializer interface had a flush()
      // method and the Hadoop framework called it when needed rather
      // than for every record.
      encoder.flush();
    }

    public void close() throws IOException {
      out.close();
    }
  }

}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java b/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
new file mode 100644
index 0000000..5c4d83f
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+
+public class GenericArrayWritable<T> implements Writable {
+  private Writable[] values;
+  private Class<? extends Writable> valueClass;
+
+  public GenericArrayWritable(Class<? extends Writable> valueClass) {
+    this.valueClass = valueClass;
+  }
+  
+  public GenericArrayWritable() {
+    // for deserialization
+  }
+  
+  public void set(Writable[] values) { 
+    this.values = values; 
+  }
+
+  public Writable[] get() {
+    return values;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    values = new Writable[WritableUtils.readVInt(in)];          // construct values
+    if (values.length > 0) {
+      int nulls = WritableUtils.readVInt(in);
+      if (nulls == values.length) {
+        return;
+      }
+      String valueType = Text.readString(in);
+      setValueType(valueType);
+      for (int i = 0; i < values.length; i++) {
+        Writable value = WritableFactories.newInstance(valueClass);
+        value.readFields(in);                       // read a value
+        values[i] = value;                          // store it in values
+      }
+    }
+  }
+  
+  protected void setValueType(String valueType) {
+    if (valueClass == null) {
+      try {
+        valueClass = Class.forName(valueType).asSubclass(Writable.class);      
+      } catch (ClassNotFoundException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    } else if (!valueType.equals(valueClass.getName()))  {
+      throw new IllegalStateException("Incoming " + valueType + " is not " + valueClass);
+    }
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, values.length);
+    int nulls = 0;
+    for (int i = 0; i < values.length; i++) {
+      if (values[i] == null) {
+        nulls++;
+      }
+    }
+    WritableUtils.writeVInt(out, nulls);
+    if (values.length - nulls > 0) {
+      if (valueClass == null) {
+        throw new IllegalStateException("Value class not set by constructor or read");
+      }
+      Text.writeString(out, valueClass.getName());
+      for (int i = 0; i < values.length; i++) {
+        if (values[i] != null) {
+          values[i].write(out);
+        }
+      }
+    }
+  }
+  
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    return hcb.append(values).toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    GenericArrayWritable other = (GenericArrayWritable) obj;
+    if (!Arrays.equals(values, other.values))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return Arrays.toString(values);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java b/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
new file mode 100644
index 0000000..303fc41
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.Maps;
+
+public class TextMapWritable<T extends Writable> implements Writable {
+
+  private Class<T> valueClazz;
+  private final Map<Text, T> instance;
+
+  public TextMapWritable() {
+    this.instance = Maps.newHashMap();
+  }
+
+  public TextMapWritable(Class<T> valueClazz) {
+    this.valueClazz = valueClazz;
+    this.instance = Maps.newHashMap();
+  }
+
+  public void put(Text txt, T value) {
+    instance.put(txt, value);
+  }
+
+  public Set<Map.Entry<Text, T>> entrySet() {
+    return instance.entrySet();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    instance.clear();
+    try {
+      this.valueClazz = (Class<T>) Class.forName(Text.readString(in));
+    } catch (ClassNotFoundException e) {
+      throw (IOException) new IOException("Failed map init").initCause(e);
+    }
+    int entries = WritableUtils.readVInt(in);
+    try {
+      for (int i = 0; i < entries; i++) {
+        Text txt = new Text();
+        txt.readFields(in);
+        T value = valueClazz.newInstance();
+        value.readFields(in);
+        instance.put(txt, value);
+      }
+    } catch (IllegalAccessException e) {
+      throw (IOException) new IOException("Failed map init").initCause(e);
+    } catch (InstantiationException e) {
+      throw (IOException) new IOException("Failed map init").initCause(e);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, valueClazz.getName());
+    WritableUtils.writeVInt(out, instance.size());
+    for (Map.Entry<Text, T> e : instance.entrySet()) {
+      e.getKey().write(out);
+      e.getValue().write(out);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
new file mode 100644
index 0000000..ee4e80b
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A straight copy of the TupleWritable implementation in the join package,
+ * added here because of its package visibility restrictions.
+ * 
+ */
+public class TupleWritable implements WritableComparable<TupleWritable> {
+
+  private long written;
+  private Writable[] values;
+
+  /**
+   * Create an empty tuple with no allocated storage for writables.
+   */
+  public TupleWritable() {
+  }
+
+  /**
+   * Initialize tuple with storage; unknown whether any of them contain
+   * &quot;written&quot; values.
+   */
+  public TupleWritable(Writable[] vals) {
+    written = 0L;
+    values = vals;
+  }
+
+  /**
+   * Return true if tuple has an element at the position provided.
+   */
+  public boolean has(int i) {
+    return 0 != ((1 << i) & written);
+  }
+
+  /**
+   * Get ith Writable from Tuple.
+   */
+  public Writable get(int i) {
+    return values[i];
+  }
+
+  /**
+   * The number of children in this Tuple.
+   */
+  public int size() {
+    return values.length;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public boolean equals(Object other) {
+    if (other instanceof TupleWritable) {
+      TupleWritable that = (TupleWritable) other;
+      if (this.size() != that.size() || this.written != that.written) {
+        return false;
+      }
+      for (int i = 0; i < values.length; ++i) {
+        if (!has(i))
+          continue;
+        if (!values[i].equals(that.get(i))) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  public int hashCode() {
+    HashCodeBuilder builder = new HashCodeBuilder();
+    builder.append(written);
+    for (Writable v : values) {
+      builder.append(v);
+    }
+    return builder.toHashCode();
+  }
+
+  /**
+   * Convert Tuple to String as in the following.
+   * <tt>[<child1>,<child2>,...,<childn>]</tt>
+   */
+  public String toString() {
+    StringBuffer buf = new StringBuffer("[");
+    for (int i = 0; i < values.length; ++i) {
+      buf.append(has(i) ? values[i].toString() : "");
+      buf.append(",");
+    }
+    if (values.length != 0)
+      buf.setCharAt(buf.length() - 1, ']');
+    else
+      buf.append(']');
+    return buf.toString();
+  }
+
+  /**
+   * Writes each Writable to <code>out</code>. TupleWritable format:
+   * {@code
+   *  <count><type1><type2>...<typen><obj1><obj2>...<objn>
+   * }
+   */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, values.length);
+    WritableUtils.writeVLong(out, written);
+    for (int i = 0; i < values.length; ++i) {
+      if (has(i)) {
+        Text.writeString(out, values[i].getClass().getName());
+      }
+    }
+    for (int i = 0; i < values.length; ++i) {
+      if (has(i)) {
+        values[i].write(out);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @SuppressWarnings("unchecked")
+  // No static typeinfo on Tuples
+  public void readFields(DataInput in) throws IOException {
+    int card = WritableUtils.readVInt(in);
+    values = new Writable[card];
+    written = WritableUtils.readVLong(in);
+    Class<? extends Writable>[] cls = new Class[card];
+    try {
+      for (int i = 0; i < card; ++i) {
+        if (has(i)) {
+          cls[i] = Class.forName(Text.readString(in))
+              .asSubclass(Writable.class);
+        }
+      }
+      for (int i = 0; i < card; ++i) {
+        if (has(i)) {
+          values[i] = cls[i].newInstance();
+          values[i].readFields(in);
+        }
+      }
+    } catch (ClassNotFoundException e) {
+      throw (IOException) new IOException("Failed tuple init").initCause(e);
+    } catch (IllegalAccessException e) {
+      throw (IOException) new IOException("Failed tuple init").initCause(e);
+    } catch (InstantiationException e) {
+      throw (IOException) new IOException("Failed tuple init").initCause(e);
+    }
+  }
+
+  /**
+   * Record that the tuple contains an element at the position provided.
+   */
+  public void setWritten(int i) {
+    written |= 1 << i;
+  }
+
+  /**
+   * Record that the tuple does not contain an element at the position provided.
+   */
+  public void clearWritten(int i) {
+    written &= -1 ^ (1 << i);
+  }
+
+  /**
+   * Clear any record of which writables have been written to, without releasing
+   * storage.
+   */
+  public void clearWritten() {
+    written = 0L;
+  }
+
+  @Override
+  public int compareTo(TupleWritable o) {
+    for (int i = 0; i < values.length; ++i) {
+      if (has(i) && !o.has(i)) {
+        return 1;
+      } else if (!has(i) && o.has(i)) {
+        return -1;
+      } else {
+        Writable v1 = values[i];
+        Writable v2 = o.values[i];
+        if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
+          if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
+            int cmp = ((WritableComparable)v1).compareTo((WritableComparable)v2);
+            if (cmp != 0) {
+              return cmp;
+            }
+          } else {
+            int cmp = v1.hashCode() - v2.hashCode();
+            if (cmp != 0) {
+              return cmp;
+            }
+          }
+        }
+      }
+    }
+    return values.length - o.values.length;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
new file mode 100644
index 0000000..1d18843
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PGroupedTableType;
+
+public class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
+
+  private final MapFn inputFn;
+  private final MapFn outputFn;
+  private final Converter converter;
+  
+  public WritableGroupedTableType(WritableTableType<K, V> tableType) {
+    super(tableType);
+    WritableType keyType = (WritableType) tableType.getKeyType();
+    WritableType valueType = (WritableType) tableType.getValueType();
+    this.inputFn =  new PairIterableMapFn(keyType.getInputMapFn(),
+        valueType.getInputMapFn());
+    this.outputFn = tableType.getOutputMapFn();
+    this.converter = new WritablePairConverter(keyType.getSerializationClass(),
+        valueType.getSerializationClass());
+  }
+  
+  @Override
+  public Class<Pair<K, Iterable<V>>> getTypeClass() {
+    return (Class<Pair<K, Iterable<V>>>) Pair.of(null, null).getClass();  
+  }
+  
+  @Override
+  public Converter getGroupingConverter() {
+    return converter;
+  }
+
+  @Override
+  public MapFn getInputMapFn() {
+    return inputFn;
+  }
+  
+  @Override
+  public MapFn getOutputMapFn() {
+    return outputFn;
+  }
+  
+  @Override
+  public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
+    return PTables.getGroupedDetachedValue(this, value);
+  }
+
+  @Override
+  public void configureShuffle(Job job, GroupingOptions options) {
+    if (options != null) {
+      options.configure(job);
+    }
+    WritableType keyType = (WritableType) tableType.getKeyType();
+    WritableType valueType = (WritableType) tableType.getValueType();
+    job.setMapOutputKeyClass(keyType.getSerializationClass());
+    job.setMapOutputValueClass(valueType.getSerializationClass());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java b/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
new file mode 100644
index 0000000..ba64b0b
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.Converter;
+
+public class WritablePairConverter<K, V> implements Converter<K, V, Pair<K, V>, Pair<K, Iterable<V>>> {
+  
+  private final Class<K> keyClass;
+  private final Class<V> valueClass;
+  
+  public WritablePairConverter(Class<K> keyClass, Class<V> valueClass) {
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+  }
+  
+  @Override
+  public Pair<K, V> convertInput(K key, V value) {
+    return Pair.of(key, value);
+  }
+
+  @Override
+  public K outputKey(Pair<K, V> value) {
+    return value.first();
+  }
+
+  @Override
+  public V outputValue(Pair<K, V> value) {
+    return value.second();
+  }
+
+  @Override
+  public Class<K> getKeyClass() {
+    return keyClass;
+  }
+
+  @Override
+  public Class<V> getValueClass() {
+    return valueClass;
+  }
+
+  @Override
+  public Pair<K, Iterable<V>> convertIterableInput(K key, Iterable<V> value) {
+    return Pair.of(key, value);
+  }
+}


Mime
View raw message