crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [2/28] Rename com.cloudera.crunch -> org.apache.crunch in the Java core
Date Sat, 07 Jul 2012 21:49:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/PipelineResult.java b/src/main/java/com/cloudera/crunch/PipelineResult.java
deleted file mode 100644
index 271b2de..0000000
--- a/src/main/java/com/cloudera/crunch/PipelineResult.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import java.util.List;
-
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Container for the results of a call to {@code run} or {@code done} on the Pipeline interface that includes
- * details and statistics about the component stages of the data pipeline.
- */
-public class PipelineResult {
-
-  public static class StageResult {
-    
-    private final String stageName;
-    private final Counters counters;
-    
-    public StageResult(String stageName, Counters counters) {
-      this.stageName = stageName;
-      this.counters = counters;
-    }
-    
-    public String getStageName() {
-      return stageName;
-    }
-    
-    public Counters getCounters() {
-      return counters;
-    }
-    
-    public Counter findCounter(Enum<?> key) {
-      return counters.findCounter(key);
-    }
-    
-    public long getCounterValue(Enum<?> key) {
-      return findCounter(key).getValue();
-    }
-  }
-  
-  public static final PipelineResult EMPTY = new PipelineResult(ImmutableList.<StageResult>of());
-  
-  private final List<StageResult> stageResults;
-  
-  public PipelineResult(List<StageResult> stageResults) {
-    this.stageResults = ImmutableList.copyOf(stageResults);
-  }
-  
-  public boolean succeeded() {
-    return !stageResults.isEmpty();
-  }
-  
-  public List<StageResult> getStageResults() {
-    return stageResults;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/Source.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/Source.java b/src/main/java/com/cloudera/crunch/Source.java
deleted file mode 100644
index 51586cc..0000000
--- a/src/main/java/com/cloudera/crunch/Source.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.cloudera.crunch.types.PType;
-
-/**
- * A {@code Source} represents an input data set that is an input to one
- * or more MapReduce jobs.
- *
- */
-public interface Source<T> { 
-  /**
-   * Returns the {@code PType} for this source.
-   */
-  PType<T> getType();
-
-  /**
-   * Configure the given job to use this source as an input.
-   * 
-   * @param job The job to configure
-   * @param inputId For a multi-input job, an identifier for this input to the job
-   * @throws IOException
-   */
-  void configureSource(Job job, int inputId) throws IOException;
-
-  /**
-   * Returns the number of bytes in this {@code Source}.
-   */
-  long getSize(Configuration configuration);  
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/SourceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/SourceTarget.java b/src/main/java/com/cloudera/crunch/SourceTarget.java
deleted file mode 100644
index 5fb0c17..0000000
--- a/src/main/java/com/cloudera/crunch/SourceTarget.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-
-/**
- * An interface for classes that implement both the {@code Source} and
- * the {@code Target} interfaces.
- *
- */
-public interface SourceTarget<T> extends Source<T>, Target {
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/TableSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/TableSource.java b/src/main/java/com/cloudera/crunch/TableSource.java
deleted file mode 100644
index 3af5056..0000000
--- a/src/main/java/com/cloudera/crunch/TableSource.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import com.cloudera.crunch.types.PTableType;
-
-/**
- * The interface {@code Source} implementations that return a {@link PTable}.
- *
- */
-public interface TableSource<K, V> extends Source<Pair<K, V>> {
-  PTableType<K, V> getTableType();
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/Target.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/Target.java b/src/main/java/com/cloudera/crunch/Target.java
deleted file mode 100644
index 26c5d49..0000000
--- a/src/main/java/com/cloudera/crunch/Target.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import com.cloudera.crunch.io.OutputHandler;
-import com.cloudera.crunch.types.PType;
-
-/**
- * A {@code Target} represents the output destination of a Crunch job.
- *
- */
-public interface Target {
-  boolean accept(OutputHandler handler, PType<?> ptype);
-  
-  <T> SourceTarget<T> asSourceTarget(PType<T> ptype);
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/Tuple.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/Tuple.java b/src/main/java/com/cloudera/crunch/Tuple.java
deleted file mode 100644
index ca9c41d..0000000
--- a/src/main/java/com/cloudera/crunch/Tuple.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.crunch;
-
-/**
- * A fixed-size collection of Objects, used in Crunch for representing
- * joins between {@code PCollection}s.
- *
- */
-public interface Tuple {
-
-  /**
-   * Returns the Object at the given index.
-   */
-  Object get(int index);
-
-  /**
-   * Returns the number of elements in this Tuple.
-   */
-  int size();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/Tuple3.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/Tuple3.java b/src/main/java/com/cloudera/crunch/Tuple3.java
deleted file mode 100644
index 57e192b..0000000
--- a/src/main/java/com/cloudera/crunch/Tuple3.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.crunch;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A convenience class for three-element {@link Tuple}s.
- */
-public class Tuple3<V1, V2, V3> implements Tuple {
-
-  private final V1 first;
-  private final V2 second;
-  private final V3 third;
-
-  public static <A, B, C> Tuple3<A, B, C> of(A a, B b, C c) {
-    return new Tuple3<A, B, C>(a, b, c);
-  }
-  
-  public Tuple3(V1 first, V2 second, V3 third) {
-    this.first = first;
-    this.second = second;
-    this.third = third;
-  }
-
-  public V1 first() {
-    return first;
-  }
-
-  public V2 second() {
-    return second;
-  }
-
-  public V3 third() {
-    return third;
-  }
-
-  public Object get(int index) {
-    switch (index) {
-    case 0:
-      return first;
-    case 1:
-      return second;
-    case 2:
-      return third;
-    default:
-      throw new ArrayIndexOutOfBoundsException();
-    }
-  }
-
-  public int size() {
-    return 3;
-  }
-  
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(first).append(second).append(third).toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    Tuple3<?, ?, ?> other = (Tuple3<?, ?, ?>) obj;
-    return (first == other.first || (first != null && first.equals(other.first))) &&
-    	(second == other.second || (second != null && second.equals(other.second))) &&
-    	(third == other.third || (third != null && third.equals(other.third)));
-  }
-
-  @Override
-  public String toString() {
-	StringBuilder sb = new StringBuilder("Tuple3[");
-	sb.append(first).append(",").append(second).append(",").append(third);
-	return sb.append("]").toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/Tuple4.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/Tuple4.java b/src/main/java/com/cloudera/crunch/Tuple4.java
deleted file mode 100644
index d1edf9d..0000000
--- a/src/main/java/com/cloudera/crunch/Tuple4.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.crunch;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A convenience class for four-element {@link Tuple}s.
- */
-public class Tuple4<V1, V2, V3, V4> implements Tuple {
-
-  private final V1 first;
-  private final V2 second;
-  private final V3 third;
-  private final V4 fourth;
-
-  public static <A, B, C, D> Tuple4<A, B, C, D> of(A a, B b, C c, D d) {
-    return new Tuple4<A, B, C, D>(a, b, c, d);
-  }
-  
-  public Tuple4(V1 first, V2 second, V3 third, V4 fourth) {
-    this.first = first;
-    this.second = second;
-    this.third = third;
-    this.fourth = fourth;
-  }
-
-  public V1 first() {
-    return first;
-  }
-
-  public V2 second() {
-    return second;
-  }
-
-  public V3 third() {
-    return third;
-  }
-
-  public V4 fourth() {
-    return fourth;
-  }
-
-  public Object get(int index) {
-    switch (index) {
-    case 0:
-      return first;
-    case 1:
-      return second;
-    case 2:
-      return third;
-    case 3:
-      return fourth;
-    default:
-      throw new ArrayIndexOutOfBoundsException();
-    }
-  }
-
-  public int size() {
-    return 4;
-  }
-  
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(first).append(second).append(third)
-    	.append(fourth).toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    Tuple4<?, ?, ?, ?> other = (Tuple4<?, ?, ?, ?>) obj;
-    return (first == other.first || (first != null && first.equals(other.first))) &&
-    	(second == other.second || (second != null && second.equals(other.second))) &&
-    	(third == other.third || (third != null && third.equals(other.third))) &&
-    	(fourth == other.fourth || (fourth != null && fourth.equals(other.fourth)));
-  }
-
-  @Override
-  public String toString() {
-	StringBuilder sb = new StringBuilder("Tuple4[");
-	sb.append(first).append(",").append(second).append(",").append(third);
-	return sb.append(",").append(fourth).append("]").toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/TupleN.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/TupleN.java b/src/main/java/com/cloudera/crunch/TupleN.java
deleted file mode 100644
index 493ec7f..0000000
--- a/src/main/java/com/cloudera/crunch/TupleN.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.crunch;
-
-import java.util.Arrays;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A {@link Tuple} instance for an arbitrary number of values.
- */
-public class TupleN implements Tuple {
-
-  private final Object values[];
-
-  public TupleN(Object... values) {
-    this.values = new Object[values.length];
-    System.arraycopy(values, 0, this.values, 0, values.length);
-  }
-
-  public Object get(int index) {
-    return values[index];
-  }
-
-  public int size() {
-    return values.length;
-  }
-  
-  @Override
-  public int hashCode() {
-  	HashCodeBuilder hcb = new HashCodeBuilder();
-  	for (Object v : values) {
-  	  hcb.append(v);
-  	}
-  	return hcb.toHashCode();
-  }
-  
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    TupleN other = (TupleN) obj;
-    return Arrays.equals(this.values, other.values);
-  }
-
-  @Override
-  public String toString() {
-    return Arrays.toString(values);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java b/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java
deleted file mode 100644
index 9b4e6cd..0000000
--- a/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.fn;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.MapFn;
-
-public class CompositeMapFn<R, S, T> extends MapFn<R, T> {
-  
-  private final MapFn<R, S> first;
-  private final MapFn<S, T> second;
-  
-  public CompositeMapFn(MapFn<R, S> first, MapFn<S, T> second) {
-    this.first = first;
-    this.second = second;
-  }
-  
-  @Override
-  public void initialize() {
-    first.setContext(getContext());
-    second.setContext(getContext());
-  }
-  
-  public MapFn<R, S> getFirst() {
-    return first;
-  }
-  
-  public MapFn<S, T> getSecond() {
-    return second;
-  }
-  
-  @Override
-  public T map(R input) {
-    return second.map(first.map(input));
-  }
-  
-  @Override
-  public void cleanup(Emitter<T> emitter) {
-    first.cleanup(null);
-    second.cleanup(null);
-  }
-
-  @Override
-  public void configure(Configuration conf) {
-    first.configure(conf);
-    second.configure(conf);
-  }
-
-  @Override
-  public void setConfigurationForTest(Configuration conf) {
-    first.setConfigurationForTest(conf);
-    second.setConfigurationForTest(conf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java b/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java
deleted file mode 100644
index 716d4eb..0000000
--- a/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.fn;
-
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.Pair;
-
-/**
- * Wrapper function for converting a {@code MapFn} into a key-value pair that
- * is used to convert from a {@code PCollection<V>} to a {@code PTable<K, V>}.
- */
-public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> {
-  
-  private final MapFn<V, K> mapFn;
-  
-  public ExtractKeyFn(MapFn<V, K> mapFn) {
-    this.mapFn = mapFn;
-  }
-  
-  @Override
-  public void initialize() {
-    this.mapFn.setContext(getContext());
-  }
-  
-  @Override
-  public Pair<K, V> map(V input) {
-    return Pair.of(mapFn.map(input), input);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/fn/IdentityFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/fn/IdentityFn.java b/src/main/java/com/cloudera/crunch/fn/IdentityFn.java
deleted file mode 100644
index fd75215..0000000
--- a/src/main/java/com/cloudera/crunch/fn/IdentityFn.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.fn;
-
-import com.cloudera.crunch.MapFn;
-
-public class IdentityFn<T> extends MapFn<T, T> {
-  
-  private static final IdentityFn<Object> INSTANCE = new IdentityFn<Object>();
-
-  @SuppressWarnings("unchecked")
-  public static <T> IdentityFn<T> getInstance() {
-    return (IdentityFn<T>) INSTANCE;
-  }
-
-  // Non-instantiable
-  private IdentityFn() {
-  }
-
-  @Override
-  public T map(T input) {
-    return input;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java b/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java
deleted file mode 100644
index 26397d4..0000000
--- a/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.fn;
-
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.Pair;
-
-public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> {
-  
-  @Override
-  public void process(Pair<K1, V> input, Emitter<Pair<K2, V>> emitter) {
-    emitter.emit(Pair.of(map(input.first()), input.second()));
-  }
-
-  public abstract K2 map(K1 k1);
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java b/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java
deleted file mode 100644
index 881ba1c..0000000
--- a/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.fn;
-
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.Pair;
-
-public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> {
-  
-  @Override
-  public void process(Pair<K, V1> input, Emitter<Pair<K, V2>> emitter) {
-    emitter.emit(Pair.of(input.first(), map(input.second())));
-  }
-
-  public abstract V2 map(V1 v);
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/fn/PairMapFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/fn/PairMapFn.java b/src/main/java/com/cloudera/crunch/fn/PairMapFn.java
deleted file mode 100644
index f0c731f..0000000
--- a/src/main/java/com/cloudera/crunch/fn/PairMapFn.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.fn;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.Pair;
-
-public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
-  
-  private MapFn<K, S> keys;
-  private MapFn<V, T> values;
-
-  public PairMapFn(MapFn<K, S> keys, MapFn<V, T> values) {
-    this.keys = keys;
-    this.values = values;
-  }
-
-  @Override
-  public void configure(Configuration conf) {
-    keys.configure(conf);
-    values.configure(conf);
-  }
-  
-  @Override
-  public void initialize() {
-    keys.setContext(getContext());
-    values.setContext(getContext());
-  }
-
-  @Override
-  public Pair<S, T> map(Pair<K, V> input) {
-    return Pair.of(keys.map(input.first()), values.map(input.second()));
-  }
-  
-  @Override
-  public void cleanup(Emitter<Pair<S, T>> emitter) {
-    keys.cleanup(null);
-    values.cleanup(null);
-  }
-
-  @Override
-  public void setConfigurationForTest(Configuration conf) {
-    keys.setConfigurationForTest(conf);
-    values.setConfigurationForTest(conf);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java b/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java
deleted file mode 100644
index ac81e95..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mem;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.PipelineResult;
-import com.cloudera.crunch.Source;
-import com.cloudera.crunch.TableSource;
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.impl.mem.collect.MemCollection;
-import com.cloudera.crunch.impl.mem.collect.MemTable;
-import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.io.PathTarget;
-import com.cloudera.crunch.io.ReadableSource;
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PType;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-/**
- * A {@link Pipeline} whose collections are held entirely in memory. Collections are built
- * directly from Java values or read eagerly from readable sources. {@link #run()} and
- * {@link #done()} have nothing left to execute and return {@link PipelineResult#EMPTY}.
- */
-public class MemPipeline implements Pipeline {
-
-  private static final Log LOG = LogFactory.getLog(MemPipeline.class);
-
-  private static final MemPipeline INSTANCE = new MemPipeline();
-
-  /** Returns the shared in-memory pipeline instance. */
-  public static Pipeline getInstance() {
-    return INSTANCE;
-  }
-
-  /** Creates an in-memory collection without a {@link PType} holding {@code ts}. */
-  public static <T> PCollection<T> collectionOf(T...ts) {
-    return new MemCollection<T>(ImmutableList.copyOf(ts));
-  }
-
-  /** Creates an in-memory collection without a {@link PType} holding the elements of {@code collect}. */
-  public static <T> PCollection<T> collectionOf(Iterable<T> collect) {
-    return new MemCollection<T>(collect);
-  }
-
-  /** Creates an in-memory collection of type {@code ptype} holding {@code ts}. */
-  public static <T> PCollection<T> typedCollectionOf(PType<T> ptype, T... ts) {
-    return new MemCollection<T>(ImmutableList.copyOf(ts), ptype, null);
-  }
-
-  /** Creates an in-memory collection of type {@code ptype} holding the elements of {@code collect}. */
-  public static <T> PCollection<T> typedCollectionOf(PType<T> ptype, Iterable<T> collect) {
-    return new MemCollection<T>(collect, ptype, null);
-  }
-
-  /**
-   * Creates an in-memory table without a {@link PTableType} from alternating keys and values.
-   *
-   * @param s the first key
-   * @param t the first value
-   * @param more further entries as key, value, key, value, ...
-   * @throws IllegalArgumentException if {@code more} has an odd number of elements
-   */
-  public static <S, T> PTable<S, T> tableOf(S s, T t, Object... more) {
-    return new MemTable<S, T>(toPairs(s, t, more));
-  }
-
-  /**
-   * Like {@link #tableOf(Object, Object, Object...)}, but the resulting table has type
-   * {@code ptype}.
-   *
-   * @throws IllegalArgumentException if {@code more} has an odd number of elements
-   */
-  public static <S, T> PTable<S, T> typedTableOf(PTableType<S, T> ptype, S s, T t, Object... more) {
-    return new MemTable<S, T>(toPairs(s, t, more), ptype, null);
-  }
-
-  /** Creates an in-memory table without a {@link PTableType} holding {@code pairs}. */
-  public static <S, T> PTable<S, T> tableOf(Iterable<Pair<S, T>> pairs) {
-    return new MemTable<S, T>(pairs);
-  }
-
-  /** Creates an in-memory table of type {@code ptype} holding {@code pairs}. */
-  public static <S, T> PTable<S, T> typedTableOf(PTableType<S, T> ptype, Iterable<Pair<S, T>> pairs) {
-    return new MemTable<S, T>(pairs, ptype, null);
-  }
-
-  /**
-   * Builds the pair list used by the vararg table factories. The list contains
-   * {@code (s, t)} followed by each consecutive {@code (key, value)} pair in {@code more}.
-   */
-  @SuppressWarnings("unchecked")
-  private static <S, T> List<Pair<S, T>> toPairs(S s, T t, Object[] more) {
-    // Report a dangling key clearly instead of failing with ArrayIndexOutOfBoundsException.
-    if (more.length % 2 != 0) {
-      throw new IllegalArgumentException("Expected alternating keys and values, but got "
-          + more.length + " trailing arguments");
-    }
-    List<Pair<S, T>> pairs = Lists.newArrayList();
-    pairs.add(Pair.of(s, t));
-    for (int i = 0; i < more.length; i += 2) {
-      // Unchecked: callers must pass keys of type S and values of type T.
-      pairs.add(Pair.of((S) more[i], (T) more[i + 1]));
-    }
-    return pairs;
-  }
-
-  private Configuration conf = new Configuration();
-
-  private MemPipeline() {
-  }
-
-  @Override
-  public void setConfiguration(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public Configuration getConfiguration() {
-    return conf;
-  }
-
-  /**
-   * Reads {@code source} into memory immediately.
-   *
-   * @throws IllegalStateException if the source is not a {@link ReadableSource}, or if reading it
-   *     throws an {@link IOException}
-   */
-  @SuppressWarnings("unchecked")
-  @Override
-  public <T> PCollection<T> read(Source<T> source) {
-    if (source instanceof ReadableSource) {
-      try {
-        Iterable<T> iterable = ((ReadableSource<T>) source).read(conf);
-        return new MemCollection<T>(iterable, source.getType(), source.toString());
-      } catch (IOException e) {
-        LOG.error("Exception reading source: " + source.toString(), e);
-        throw new IllegalStateException(e);
-      }
-    }
-    LOG.error("Source " + source + " is not readable");
-    throw new IllegalStateException("Source " + source + " is not readable");
-  }
-
-  /**
-   * Reads {@code source} into an in-memory table immediately.
-   *
-   * @throws IllegalStateException if the source is not a {@link ReadableSource}, or if reading it
-   *     throws an {@link IOException}
-   */
-  @SuppressWarnings("unchecked")
-  @Override
-  public <K, V> PTable<K, V> read(TableSource<K, V> source) {
-    if (source instanceof ReadableSource) {
-      try {
-        Iterable<Pair<K, V>> iterable = ((ReadableSource<Pair<K, V>>) source).read(conf);
-        return new MemTable<K, V>(iterable, source.getTableType(), source.toString());
-      } catch (IOException e) {
-        LOG.error("Exception reading source: " + source.toString(), e);
-        throw new IllegalStateException(e);
-      }
-    }
-    LOG.error("Source " + source + " is not readable");
-    throw new IllegalStateException("Source " + source + " is not readable");
-  }
-
-  /**
-   * Writes {@code collection} as text to a file named {@code out} under the target's path.
-   * Each table entry is written as {@code key<TAB>value}; every line ends with {@code \r\n}.
-   * Only {@link PathTarget}s are supported. Failures are logged, not thrown.
-   */
-  @Override
-  public void write(PCollection<?> collection, Target target) {
-    if (target instanceof PathTarget) {
-      Path path = ((PathTarget) target).getPath();
-      try {
-        FileSystem fs = FileSystem.get(conf);
-        FSDataOutputStream os = fs.create(new Path(path, "out"));
-        try {
-          if (collection instanceof PTable) {
-            for (Object o : collection.materialize()) {
-              Pair<?, ?> p = (Pair<?, ?>) o;
-              os.writeBytes(p.first().toString());
-              os.writeBytes("\t");
-              os.writeBytes(p.second().toString());
-              os.writeBytes("\r\n");
-            }
-          } else {
-            for (Object o : collection.materialize()) {
-              os.writeBytes(o.toString() + "\r\n");
-            }
-          }
-        } finally {
-          // Close the stream even when a write fails, so the file handle is not leaked.
-          os.close();
-        }
-      } catch (IOException e) {
-        LOG.error("Exception writing target: " + target, e);
-      }
-    } else {
-      LOG.error("Target " + target + " is not a PathTarget instance");
-    }
-  }
-
-  @Override
-  public PCollection<String> readTextFile(String pathName) {
-    return read(At.textFile(pathName));
-  }
-
-  @Override
-  public <T> void writeTextFile(PCollection<T> collection, String pathName) {
-    write(collection, At.textFile(pathName));
-  }
-
-  @Override
-  public <T> Iterable<T> materialize(PCollection<T> pcollection) {
-    return pcollection.materialize();
-  }
-
-  /** Nothing is deferred in memory, so there is nothing to run. */
-  @Override
-  public PipelineResult run() {
-    return PipelineResult.EMPTY;
-  }
-
-  /** Nothing is deferred in memory, so there is nothing to run. */
-  @Override
-  public PipelineResult done() {
-    return PipelineResult.EMPTY;
-  }
-
-  @Override
-  public void enableDebug() {
-    LOG.info("Note: in-memory pipelines do not have debug logging");
-  }
-
-  @Override
-  public String getName() {
-    return "Memory Pipeline";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mem/collect/MemCollection.java b/src/main/java/com/cloudera/crunch/impl/mem/collect/MemCollection.java
deleted file mode 100644
index 76758d8..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mem/collect/MemCollection.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mem.collect;
-
-import java.util.Collection;
-
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.FilterFn;
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.fn.ExtractKeyFn;
-import com.cloudera.crunch.impl.mem.MemPipeline;
-import com.cloudera.crunch.lib.Aggregate;
-import com.cloudera.crunch.lib.Sample;
-import com.cloudera.crunch.lib.Sort;
-import com.cloudera.crunch.test.InMemoryEmitter;
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-
-/**
- * A {@link PCollection} backed by an immutable in-memory copy of its elements.
- * {@code parallelDo} applies its function to every element immediately and returns a new
- * in-memory collection.
- *
- * @param <S> the element type
- */
-public class MemCollection<S> implements PCollection<S> {
-
-  private final Collection<S> collect;
-  /** The element type; {@code null} for untyped collections. */
-  private final PType<S> ptype;
-  /** The collection name; may be {@code null}. */
-  private final String name;
-
-  public MemCollection(Iterable<S> collect) {
-    this(collect, null, null);
-  }
-
-  public MemCollection(Iterable<S> collect, PType<S> ptype) {
-    this(collect, ptype, null);
-  }
-
-  /**
-   * @param collect the elements; copied into an immutable list
-   * @param ptype the element type, or {@code null}
-   * @param name the collection name, or {@code null}
-   */
-  public MemCollection(Iterable<S> collect, PType<S> ptype, String name) {
-    this.collect = ImmutableList.copyOf(collect);
-    this.ptype = ptype;
-    this.name = name;
-  }
-
-  @Override
-  public Pipeline getPipeline() {
-    return MemPipeline.getInstance();
-  }
-
-  /**
-   * Returns a new collection holding the elements of {@code collections} followed by this
-   * collection's elements.
-   */
-  @Override
-  public PCollection<S> union(PCollection<S>... collections) {
-    Collection<S> output = Lists.newArrayList();
-    for (PCollection<S> pcollect : collections) {
-      for (S s : pcollect.materialize()) {
-        output.add(s);
-      }
-    }
-    output.addAll(collect);
-    // Keep the first argument's PType as before. With no arguments, fall back to this
-    // collection's own type instead of indexing an empty array.
-    PType<S> unionType = collections.length > 0 ? collections[0].getPType() : getPType();
-    return new MemCollection<S>(output, unionType);
-  }
-
-  @Override
-  public <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type) {
-    return parallelDo(null, doFn, type);
-  }
-
-  /** Runs {@code doFn} over every element now and collects what it emits. */
-  @Override
-  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type) {
-    InMemoryEmitter<T> emitter = new InMemoryEmitter<T>();
-    doFn.initialize();
-    for (S s : collect) {
-      doFn.process(s, emitter);
-    }
-    doFn.cleanup(emitter);
-    return new MemCollection<T>(emitter.getOutput(), type, name);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type) {
-    return parallelDo(null, doFn, type);
-  }
-
-  /** Runs {@code doFn} over every element now and collects the emitted pairs into a table. */
-  @Override
-  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn,
-      PTableType<K, V> type) {
-    InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>();
-    doFn.initialize();
-    for (S s : collect) {
-      doFn.process(s, emitter);
-    }
-    doFn.cleanup(emitter);
-    return new MemTable<K, V>(emitter.getOutput(), type, name);
-  }
-
-  @Override
-  public PCollection<S> write(Target target) {
-    getPipeline().write(this, target);
-    return this;
-  }
-
-  @Override
-  public Iterable<S> materialize() {
-    return collect;
-  }
-
-  /** Returns the backing immutable collection. */
-  public Collection<S> getCollection() {
-    return collect;
-  }
-
-  @Override
-  public PType<S> getPType() {
-    return ptype;
-  }
-
-  /** Returns the family of the element type, or {@code null} for untyped collections. */
-  @Override
-  public PTypeFamily getTypeFamily() {
-    if (ptype != null) {
-      return ptype.getFamily();
-    }
-    return null;
-  }
-
-  @Override
-  public long getSize() {
-    return collect.size();
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public String toString() {
-    return collect.toString();
-  }
-
-  @Override
-  public PTable<S, Long> count() {
-    return Aggregate.count(this);
-  }
-
-  @Override
-  public PCollection<S> sample(double acceptanceProbability) {
-    return Sample.sample(this, acceptanceProbability);
-  }
-
-  @Override
-  public PCollection<S> sample(double acceptanceProbability, long seed) {
-    return Sample.sample(this, seed, acceptanceProbability);
-  }
-
-  @Override
-  public PCollection<S> max() {
-    return Aggregate.max(this);
-  }
-
-  @Override
-  public PCollection<S> min() {
-    return Aggregate.min(this);
-  }
-
-  @Override
-  public PCollection<S> sort(boolean ascending) {
-    return Sort.sort(this, ascending ? Sort.Order.ASCENDING : Sort.Order.DESCENDING);
-  }
-
-  @Override
-  public PCollection<S> filter(FilterFn<S> filterFn) {
-    return parallelDo(filterFn, getPType());
-  }
-
-  @Override
-  public PCollection<S> filter(String name, FilterFn<S> filterFn) {
-    return parallelDo(name, filterFn, getPType());
-  }
-
-  /** Keys each element by {@code mapFn}. NOTE(review): requires a non-null PType (type family). */
-  @Override
-  public <K> PTable<K, S> by(MapFn<S, K> mapFn, PType<K> keyType) {
-    return parallelDo(new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
-  }
-
-  /** Keys each element by {@code mapFn}. NOTE(review): requires a non-null PType (type family). */
-  @Override
-  public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
-    return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mem/collect/MemGroupedTable.java b/src/main/java/com/cloudera/crunch/impl/mem/collect/MemGroupedTable.java
deleted file mode 100644
index 802c6c8..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mem/collect/MemGroupedTable.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mem.collect;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.cloudera.crunch.CombineFn;
-import com.cloudera.crunch.GroupingOptions;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PGroupedTable;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
-
-  private final MemTable<K, V> parent;
-  
-  private static <S, T> Map<S, Collection<T>> createMapFor(PType<S> keyType, GroupingOptions options, Pipeline pipeline) {
-    if (options != null && options.getSortComparatorClass() != null) {
-      RawComparator<S> rc = ReflectionUtils.newInstance(options.getSortComparatorClass(),
-          pipeline.getConfiguration());
-      return new TreeMap<S, Collection<T>>(rc);
-    } else if (keyType != null && Comparable.class.isAssignableFrom(keyType.getTypeClass())) {
-      return new TreeMap<S, Collection<T>>();
-    }
-    return Maps.newHashMap();
-  }
-  
-  private static <S, T> Iterable<Pair<S, Iterable<T>>> buildMap(MemTable<S, T> parent, GroupingOptions options) {
-    PType<S> keyType = parent.getKeyType();
-    Map<S, Collection<T>> map = createMapFor(keyType, options, parent.getPipeline());
-    
-    for (Pair<S, T> pair : parent.materialize()) {
-      S key = pair.first();
-      if (!map.containsKey(key)) {
-        map.put(key, Lists.<T>newArrayList());
-      }
-      map.get(key).add(pair.second());
-    }
-    
-    List<Pair<S, Iterable<T>>> values = Lists.newArrayList();
-    for (Map.Entry<S, Collection<T>> e : map.entrySet()) {
-      values.add(Pair.of(e.getKey(), (Iterable<T>) e.getValue()));
-    }
-    return values;
-  }
-  
-  public MemGroupedTable(MemTable<K, V> parent, GroupingOptions options) {
-	super(buildMap(parent, options));
-    this.parent = parent;
-  }
-
-  @Override
-  public PCollection<Pair<K, Iterable<V>>> union(
-      PCollection<Pair<K, Iterable<V>>>... collections) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public PCollection<Pair<K, Iterable<V>>> write(Target target) {
-    getPipeline().write(this.ungroup(), target);
-    return this;
-  }
-
-  @Override
-  public PType<Pair<K, Iterable<V>>> getPType() {
-    PTableType<K, V> parentType = parent.getPTableType();
-    if (parentType != null) {
-      return parentType.getGroupedTableType();
-    }
-    return null;
-  }
-
-  @Override
-  public PTypeFamily getTypeFamily() {
-    return parent.getTypeFamily();
-  }
-
-  @Override
-  public long getSize() {
-    return parent.getSize();
-  }
-
-  @Override
-  public String getName() {
-    return "MemGrouped(" + parent.getName() + ")";
-  }
-
-  @Override
-  public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
-    return parallelDo(combineFn, parent.getPTableType());
-  }
-
-  @Override
-  public PTable<K, V> ungroup() {
-    return parent;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mem/collect/MemTable.java b/src/main/java/com/cloudera/crunch/impl/mem/collect/MemTable.java
deleted file mode 100644
index 352ee9d..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mem/collect/MemTable.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mem.collect;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import com.cloudera.crunch.GroupingOptions;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PGroupedTable;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.lib.Aggregate;
-import com.cloudera.crunch.lib.Cogroup;
-import com.cloudera.crunch.lib.Join;
-import com.cloudera.crunch.lib.PTables;
-import com.cloudera.crunch.materialize.MaterializableMap;
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PType;
-import com.google.common.collect.Lists;
-
-/**
- * An in-memory {@link PTable}: a {@link MemCollection} of key/value {@link Pair}s.
- *
- * @param <K> the key type
- * @param <V> the value type
- */
-public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<K, V> {
-
-  /** The table type; {@code null} for untyped tables. */
-  private final PTableType<K, V> ptype;
-
-  public MemTable(Iterable<Pair<K, V>> collect) {
-    this(collect, null, null);
-  }
-
-  /**
-   * @param collect the pairs; copied into an immutable list
-   * @param ptype the table type, or {@code null}
-   * @param name the table name, or {@code null}
-   */
-  public MemTable(Iterable<Pair<K, V>> collect, PTableType<K, V> ptype, String name) {
-    super(collect, ptype, name);
-    this.ptype = ptype;
-  }
-
-  /** Returns a new table holding this table's pairs followed by those of {@code others}. */
-  @Override
-  public PTable<K, V> union(PTable<K, V>... others) {
-    List<Pair<K, V>> values = Lists.newArrayList();
-    values.addAll(getCollection());
-    for (PTable<K, V> ptable : others) {
-      for (Pair<K, V> p : ptable.materialize()) {
-        values.add(p);
-      }
-    }
-    // Keep the first argument's table type as before. With no arguments, fall back to this
-    // table's own type instead of indexing an empty array.
-    PTableType<K, V> unionType = others.length > 0 ? others[0].getPTableType() : ptype;
-    return new MemTable<K, V>(values, unionType, null);
-  }
-
-  @Override
-  public PGroupedTable<K, V> groupByKey() {
-    return groupByKey(null);
-  }
-
-  /** {@code numPartitions} is ignored: the in-memory grouping has no partitions. */
-  @Override
-  public PGroupedTable<K, V> groupByKey(int numPartitions) {
-    return groupByKey(null);
-  }
-
-  @Override
-  public PGroupedTable<K, V> groupByKey(GroupingOptions options) {
-    return new MemGroupedTable<K, V>(this, options);
-  }
-
-  @Override
-  public PTable<K, V> write(Target target) {
-    super.write(target);
-    return this;
-  }
-
-  @Override
-  public PTableType<K, V> getPTableType() {
-    return ptype;
-  }
-
-  /** Returns the key type, or {@code null} for untyped tables. */
-  @Override
-  public PType<K> getKeyType() {
-    if (ptype != null) {
-      return ptype.getKeyType();
-    }
-    return null;
-  }
-
-  /** Returns the value type, or {@code null} for untyped tables. */
-  @Override
-  public PType<V> getValueType() {
-    if (ptype != null) {
-      return ptype.getValueType();
-    }
-    return null;
-  }
-
-  @Override
-  public PTable<K, V> top(int count) {
-    return Aggregate.top(this, count, true);
-  }
-
-  @Override
-  public PTable<K, V> bottom(int count) {
-    return Aggregate.top(this, count, false);
-  }
-
-  @Override
-  public PTable<K, Collection<V>> collectValues() {
-    return Aggregate.collectValues(this);
-  }
-
-  @Override
-  public <U> PTable<K, Pair<V, U>> join(PTable<K, U> other) {
-    return Join.join(this, other);
-  }
-
-  @Override
-  public <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other) {
-    return Cogroup.cogroup(this, other);
-  }
-
-  @Override
-  public PCollection<K> keys() {
-    return PTables.keys(this);
-  }
-
-  @Override
-  public PCollection<V> values() {
-    return PTables.values(this);
-  }
-
-  @Override
-  public Map<K, V> materializeToMap() {
-    return new MaterializableMap<K, V>(this.materialize());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java b/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
deleted file mode 100644
index c8ba596..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- * 
- */
-package com.cloudera.crunch.impl.mr;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Appender;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.PipelineResult;
-import com.cloudera.crunch.Source;
-import com.cloudera.crunch.SourceTarget;
-import com.cloudera.crunch.TableSource;
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.fn.IdentityFn;
-import com.cloudera.crunch.impl.mr.collect.InputCollection;
-import com.cloudera.crunch.impl.mr.collect.InputTable;
-import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
-import com.cloudera.crunch.impl.mr.collect.PGroupedTableImpl;
-import com.cloudera.crunch.impl.mr.collect.UnionCollection;
-import com.cloudera.crunch.impl.mr.collect.UnionTable;
-import com.cloudera.crunch.impl.mr.plan.MSCRPlanner;
-import com.cloudera.crunch.impl.mr.run.RuntimeParameters;
-import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.io.ReadableSourceTarget;
-import com.cloudera.crunch.materialize.MaterializableIterable;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * A {@link Pipeline} that plans its registered outputs into MapReduce jobs with
- * {@link MSCRPlanner} and executes them when {@link #run()} or {@link #done()} is called.
- * Intermediate outputs are placed under a temporary directory, which {@link #done()} deletes.
- */
-public class MRPipeline implements Pipeline {
-
-  private static final Log LOG = LogFactory.getLog(MRPipeline.class);
-
-  private static final Random RANDOM = new Random();
-
-  private final Class<?> jarClass;
-  private final String name;
-  /** Targets to write for each collection on the next {@link #run()}. */
-  private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
-  /** Iterables handed out by {@link #materialize} that must be populated after the next run. */
-  private final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize;
-  private final Path tempDirectory;
-  private int tempFileIndex;
-  private int nextAnonymousStageId;
-
-  private Configuration conf;
-
-  /**
-   * Creates a pipeline named after {@code jarClass}, using a new {@link Configuration}.
-   *
-   * @throws IOException declared for source compatibility only; this constructor does not throw it
-   */
-  public MRPipeline(Class<?> jarClass) throws IOException {
-    this(jarClass, new Configuration());
-  }
-
-  /** Creates a pipeline with the given name, using a new {@link Configuration}. */
-  public MRPipeline(Class<?> jarClass, String name) {
-    this(jarClass, name, new Configuration());
-  }
-
-  /** Creates a pipeline named after {@code jarClass}. */
-  public MRPipeline(Class<?> jarClass, Configuration conf) {
-    this(jarClass, jarClass.getName(), conf);
-  }
-
-  /**
-   * @param jarClass handed to the planner when jobs are built (presumably to locate the job jar)
-   * @param name the name returned by {@link #getName()}
-   * @param conf the pipeline configuration; also used to create the temporary directory
-   * @throws RuntimeException if the temporary directory cannot be created
-   */
-  public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
-    this.jarClass = jarClass;
-    this.name = name;
-    this.outputTargets = Maps.newHashMap();
-    this.outputTargetsToMaterialize = Maps.newHashMap();
-    this.conf = conf;
-    this.tempDirectory = createTempDirectory(conf);
-    this.tempFileIndex = 0;
-    this.nextAnonymousStageId = 0;
-  }
-
-  @Override
-  public Configuration getConfiguration() {
-    return conf;
-  }
-
-  @Override
-  public void setConfiguration(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Plans and executes jobs for every registered output. Afterwards, each written collection
-   * records (via {@code materializeAt}) a source target holding its data, and the set of
-   * pending outputs is cleared.
-   *
-   * @return the execution result, or {@link PipelineResult#EMPTY} if planning or execution
-   *     threw an {@link IOException}. In that case the pending outputs are kept.
-   */
-  @Override
-  public PipelineResult run() {
-    MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
-    PipelineResult res = null;
-    try {
-      res = planner.plan(jarClass, conf).execute();
-    } catch (IOException e) {
-      // Pass the exception as the Throwable argument so its stack trace is logged.
-      LOG.error("Exception running pipeline " + name, e);
-      return PipelineResult.EMPTY;
-    }
-    for (PCollectionImpl<?> c : outputTargets.keySet()) {
-      if (outputTargetsToMaterialize.containsKey(c)) {
-        MaterializableIterable iter = outputTargetsToMaterialize.get(c);
-        iter.materialize();
-        c.materializeAt(iter.getSourceTarget());
-        outputTargetsToMaterialize.remove(c);
-      } else {
-        boolean materialized = false;
-        for (Target t : outputTargets.get(c)) {
-          // The first target that can also be read back becomes the collection's data location.
-          if (!materialized && t instanceof Source) {
-            c.materializeAt((SourceTarget) t);
-            materialized = true;
-          }
-        }
-      }
-    }
-    outputTargets.clear();
-    return res;
-  }
-
-  /**
-   * Runs any pending outputs, then deletes the temporary directory.
-   *
-   * @return the result of the final run, or {@code null} if there were no pending outputs
-   */
-  @Override
-  public PipelineResult done() {
-    PipelineResult res = null;
-    if (!outputTargets.isEmpty()) {
-      res = run();
-    }
-    cleanup();
-    return res;
-  }
-
-  @Override
-  public <S> PCollection<S> read(Source<S> source) {
-    return new InputCollection<S>(source, this);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> read(TableSource<K, V> source) {
-    return new InputTable<K, V>(source, this);
-  }
-
-  @Override
-  public PCollection<String> readTextFile(String pathName) {
-    return read(At.textFile(pathName));
-  }
-
-  /**
-   * Registers {@code target} as an output of {@code pcollection} for the next run. A grouped
-   * table is written in its ungrouped form. A union is wrapped in an identity function,
-   * because the union itself is not registered as an output.
-   */
-  @SuppressWarnings("unchecked")
-  @Override
-  public void write(PCollection<?> pcollection, Target target) {
-    if (pcollection instanceof PGroupedTableImpl) {
-      pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
-    } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
-      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
-          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
-    }
-    addOutput((PCollectionImpl<?>) pcollection, target);
-  }
-
-  /** Adds {@code target} to the set of outputs registered for {@code impl}. */
-  private void addOutput(PCollectionImpl<?> impl, Target target) {
-    if (!outputTargets.containsKey(impl)) {
-      outputTargets.put(impl, Sets.<Target> newHashSet());
-    }
-    outputTargets.get(impl).add(target);
-  }
-
-  /**
-   * Returns an iterable over the contents of {@code pcollection}. Its data is populated after
-   * the next {@link #run()}. Only the first iterable requested for a collection is registered
-   * for that population step.
-   */
-  @Override
-  public <T> Iterable<T> materialize(PCollection<T> pcollection) {
-
-    PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
-    ReadableSourceTarget<T> srcTarget = getMaterializeSourceTarget(pcollectionImpl);
-
-    MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
-    if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
-      outputTargetsToMaterialize.put(pcollectionImpl, c);
-    }
-    return c;
-  }
-
-  /**
-   * Retrieve a ReadableSourceTarget that provides access to the contents of a
-   * {@link PCollection}. This is primarily intended as a helper method to
-   * {@link #materialize(PCollection)}. The underlying data of the
-   * ReadableSourceTarget may not be actually present until the pipeline is run.
-   *
-   * <p>The lookup order is:
-   * <ol>
-   * <li>the collection's existing materialized location;</li>
-   * <li>any readable target already registered for it;</li>
-   * <li>a new intermediate file target, which is registered as an output.</li>
-   * </ol>
-   *
-   * @param pcollection
-   *          The collection for which the ReadableSourceTarget is to be
-   *          retrieved
-   * @return The ReadableSourceTarget
-   * @throws IllegalArgumentException
-   *           If no ReadableSourceTarget can be retrieved for the given
-   *           PCollection
-   */
-  @SuppressWarnings("unchecked")
-  public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
-    PCollectionImpl<T> impl = toPcollectionImpl(pcollection);
-    SourceTarget<T> matTarget = impl.getMaterializedAt();
-    if (matTarget != null && matTarget instanceof ReadableSourceTarget) {
-      return (ReadableSourceTarget<T>) matTarget;
-    }
-
-    ReadableSourceTarget<T> srcTarget = null;
-    // Look up by impl, the same key used by addOutput and by the read below. Checking
-    // pcollection here but reading impl could NPE when the two differ (e.g. a wrapped union).
-    Set<Target> registeredTargets = outputTargets.get(impl);
-    if (registeredTargets != null) {
-      for (Target target : registeredTargets) {
-        if (target instanceof ReadableSourceTarget) {
-          srcTarget = (ReadableSourceTarget<T>) target;
-          break;
-        }
-      }
-    }
-
-    if (srcTarget == null) {
-      SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
-      if (!(st instanceof ReadableSourceTarget)) {
-        throw new IllegalArgumentException("The PType for the given PCollection is not readable"
-            + " and cannot be materialized");
-      } else {
-        srcTarget = (ReadableSourceTarget<T>) st;
-        addOutput(impl, srcTarget);
-      }
-    }
-
-    return srcTarget;
-  }
-
-  /**
-   * Safely cast a PCollection into a PCollectionImpl, including handling the case of unions.
-   * A union (collection or table) is wrapped in an identity function, matching
-   * {@link #write(PCollection, Target)}.
-   *
-   * @param pcollection The PCollection to be cast/transformed
-   * @return The PCollectionImpl representation
-   */
-  @SuppressWarnings("unchecked")
-  private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) {
-    PCollectionImpl<T> pcollectionImpl = null;
-    if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
-      pcollectionImpl = (PCollectionImpl<T>) pcollection.parallelDo("UnionCollectionWrapper",
-          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
-    } else {
-      pcollectionImpl = (PCollectionImpl<T>) pcollection;
-    }
-    return pcollectionImpl;
-  }
-
-  /** Creates the default file source/target for {@code ptype} at a fresh temporary path. */
-  public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) {
-    return ptype.getDefaultFileSource(createTempPath());
-  }
-
-  /** Returns a new, not-yet-used path ({@code p1}, {@code p2}, ...) under the temp directory. */
-  public Path createTempPath() {
-    tempFileIndex++;
-    return new Path(tempDirectory, "p" + tempFileIndex);
-  }
-
-  /** Creates a randomly named directory under {@code /tmp} on the configured file system. */
-  private static Path createTempDirectory(Configuration conf) {
-    Path dir = new Path("/tmp/crunch" + RANDOM.nextInt());
-    try {
-      FileSystem.get(conf).mkdirs(dir);
-    } catch (IOException e) {
-      LOG.error("Exception creating job output directory", e);
-      throw new RuntimeException(e);
-    }
-    return dir;
-  }
-
-  /** Writes {@code pcollection} as text after converting it to the Writable type family. */
-  @Override
-  public <T> void writeTextFile(PCollection<T> pcollection, String pathName) {
-    // Ensure that this is a writable pcollection instance.
-    pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(), WritableTypeFamily
-        .getInstance().as(pcollection.getPType()));
-    write(pcollection, At.textFile(pathName));
-  }
-
-  /**
-   * Deletes the temporary directory, unless outputs are still pending. Deletion failures are
-   * logged, not thrown.
-   */
-  private void cleanup() {
-    if (!outputTargets.isEmpty()) {
-      LOG.warn("Not running cleanup while output targets remain");
-      return;
-    }
-    try {
-      FileSystem fs = FileSystem.get(conf);
-      if (fs.exists(tempDirectory)) {
-        fs.delete(tempDirectory, true);
-      }
-    } catch (IOException e) {
-      LOG.info("Exception during cleanup", e);
-    }
-  }
-
-  /** Returns a new id (0, 1, 2, ...) on each call. */
-  public int getNextAnonymousStageId() {
-    return nextAnonymousStageId++;
-  }
-
-  /**
-   * Enables the runtime DEBUG flag in the configuration and, if the "com.cloudera.crunch"
-   * logger has an appender named "A", routes Hadoop WARN logs to it.
-   */
-  @Override
-  public void enableDebug() {
-    // Turn on Crunch runtime error catching.
-    getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
-
-    // Write Hadoop's WARN logs to the console.
-    Logger crunchInfoLogger = LogManager.getLogger("com.cloudera.crunch");
-    Appender console = crunchInfoLogger.getAppender("A");
-    if (console != null) {
-      Logger hadoopLogger = LogManager.getLogger("org.apache.hadoop");
-      hadoopLogger.setLevel(Level.WARN);
-      hadoopLogger.addAppender(console);
-    } else {
-      LOG.warn("Could not find console appender named 'A' for writing Hadoop warning logs");
-    }
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImpl.java b/src/main/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImpl.java
deleted file mode 100644
index 677bdbb..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImpl.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.collect;
-
-import java.util.List;
-
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.impl.mr.plan.DoNode;
-import com.cloudera.crunch.types.PType;
-import com.google.common.collect.ImmutableList;
-
-public class DoCollectionImpl<S> extends PCollectionImpl<S> {
-
-  private final PCollectionImpl<Object> parent;
-  private final DoFn<Object, S> fn;
-  private final PType<S> ntype;
-
-  <T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn,
-      PType<S> ntype) {
-    super(name);
-    this.parent = (PCollectionImpl<Object>) parent;
-    this.fn = (DoFn<Object, S>) fn;
-    this.ntype = ntype;
-  }
-
-  @Override
-  protected long getSizeInternal() {
-    return (long) (fn.scaleFactor() * parent.getSize());
-  }
-  
-  @Override
-  public PType<S> getPType() {
-    return ntype;
-  }
-
-  @Override
-  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitDoFnCollection(this);
-  }
-
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.<PCollectionImpl<?>> of(parent);
-  }
-
-  @Override
-  public DoNode createDoNode() {
-    return DoNode.createFnNode(getName(), fn, ntype);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/collect/DoTableImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/collect/DoTableImpl.java b/src/main/java/com/cloudera/crunch/impl/mr/collect/DoTableImpl.java
deleted file mode 100644
index 367719b..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/collect/DoTableImpl.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.collect;
-
-import java.util.List;
-
-import com.cloudera.crunch.CombineFn;
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.impl.mr.plan.DoNode;
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PType;
-import com.google.common.collect.ImmutableList;
-
-/**
- * A {@link PTable} produced by applying a pair-emitting {@link DoFn} to a single parent
- * collection.
- *
- * @param <K> the key type of the resulting table
- * @param <V> the value type of the resulting table
- */
-public class DoTableImpl<K, V> extends PTableBase<K, V> implements
-    PTable<K, V> {
-
-  private final PCollectionImpl<?> parent;
-  private final DoFn<?, Pair<K, V>> fn;
-  private final PTableType<K, V> type;
-
-  <S> DoTableImpl(String name, PCollectionImpl<S> parent,
-      DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype) {
-    super(name);
-    this.parent = parent;
-    this.fn = fn;
-    this.type = ntype;
-  }
-
-  /**
-   * Estimates this table's size as the parent's size scaled by the fn's scale factor.
-   *
-   * <p>A non-empty parent never produces an estimate of 0. Truncating a small product to
-   * {@code long} would otherwise report the table as empty, and {@code materialize()}
-   * returns an empty list without running the pipeline when the size is 0.
-   */
-  @Override
-  protected long getSizeInternal() {
-    long parentSize = parent.getSize();
-    if (parentSize == 0L) {
-      return 0L;
-    }
-    return Math.max(1L, (long) (fn.scaleFactor() * parentSize));
-  }
-
-  @Override
-  public PTableType<K, V> getPTableType() {
-    return type;
-  }
-
-  @Override
-  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitDoTable(this);
-  }
-
-  /** Returns the table type, which also serves as this collection's element PType. */
-  @Override
-  public PType<Pair<K, V>> getPType() {
-    return type;
-  }
-
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.<PCollectionImpl<?>> of(parent);
-  }
-
-  @Override
-  public DoNode createDoNode() {
-    return DoNode.createFnNode(getName(), fn, type);
-  }
-
-  /** Returns true when the wrapped fn is a {@link CombineFn}. */
-  public boolean hasCombineFn() {
-    return fn instanceof CombineFn;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/collect/InputCollection.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/collect/InputCollection.java b/src/main/java/com/cloudera/crunch/impl/mr/collect/InputCollection.java
deleted file mode 100644
index b764c68..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/collect/InputCollection.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.collect;
-
-import java.util.List;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-import com.cloudera.crunch.Source;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.impl.mr.plan.DoNode;
-import com.cloudera.crunch.types.PType;
-import com.google.common.collect.ImmutableList;
-
-/**
- * A parentless {@link PCollectionImpl} whose contents are read from a {@link Source}.
- *
- * <p>Equality and hashing are based solely on the source.
- *
- * @param <S> the element type produced by the source
- */
-public class InputCollection<S> extends PCollectionImpl<S> {
-
-  private final Source<S> source;
-
-  public InputCollection(Source<S> source, MRPipeline pipeline) {
-    super(source.toString());
-    this.source = source;
-    this.pipeline = pipeline;
-  }
-
-  @Override
-  public PType<S> getPType() {
-    return source.getType();
-  }
-
-  /** Returns the source this collection reads from. */
-  public Source<S> getSource() {
-    return source;
-  }
-
-  /**
-   * Returns the size the source reports for the pipeline's configuration.
-   *
-   * @throws IllegalStateException if the source reports a negative size, which is treated
-   *     as "the input does not exist"
-   */
-  @Override
-  protected long getSizeInternal() {
-    long reported = source.getSize(pipeline.getConfiguration());
-    if (reported >= 0) {
-      return reported;
-    }
-    throw new IllegalStateException("Input source " + source + " does not exist!");
-  }
-
-  @Override
-  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitInputCollection(this);
-  }
-
-  /** Input collections have no parents. */
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.of();
-  }
-
-  @Override
-  public DoNode createDoNode() {
-    return DoNode.createInputNode(source);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    // instanceof already yields false for null.
-    if (obj instanceof InputCollection) {
-      return source.equals(((InputCollection<?>) obj).source);
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(source).toHashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/collect/InputTable.java b/src/main/java/com/cloudera/crunch/impl/mr/collect/InputTable.java
deleted file mode 100644
index 8a36c66..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/collect/InputTable.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.collect;
-
-import java.util.List;
-
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.TableSource;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.impl.mr.plan.DoNode;
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PType;
-import com.google.common.collect.ImmutableList;
-
-/**
- * A parentless {@link PTableBase} whose pairs are read from a {@link TableSource}.
- *
- * <p>Internally wraps an {@link InputCollection} over the same source, which supplies the
- * size estimate, the visitor callback, equality and hashing.
- *
- * @param <K> the key type
- * @param <V> the value type
- */
-public class InputTable<K, V> extends PTableBase<K, V> {
-
-  private final TableSource<K, V> source;
-  private final InputCollection<Pair<K, V>> asCollection;
-
-  public InputTable(TableSource<K, V> source, MRPipeline pipeline) {
-    super(source.toString());
-    this.source = source;
-    this.pipeline = pipeline;
-    this.asCollection = new InputCollection<Pair<K, V>>(source, pipeline);
-  }
-
-  @Override
-  protected long getSizeInternal() {
-    return asCollection.getSizeInternal();
-  }
-
-  @Override
-  public PTableType<K, V> getPTableType() {
-    return source.getTableType();
-  }
-
-  @Override
-  public PType<Pair<K, V>> getPType() {
-    return source.getType();
-  }
-
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.of();
-  }
-
-  /** Visits the wrapped input collection, so planners see this table as a plain input. */
-  @Override
-  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitInputCollection(asCollection);
-  }
-
-  @Override
-  public DoNode createDoNode() {
-    return DoNode.createInputNode(source);
-  }
-
-  @Override
-  public int hashCode() {
-    return asCollection.hashCode();
-  }
-
-  /**
-   * Two input tables are equal when their wrapped input collections are equal, i.e. when
-   * they read from equal sources.
-   *
-   * <p>The wrapped collections are compared directly. Passing {@code other} itself to
-   * {@code asCollection.equals} is always false for an InputTable, because an InputTable is
-   * not an InputCollection. That made a table unequal even to itself.
-   */
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof InputTable)) {
-      return false;
-    }
-    return asCollection.equals(((InputTable<?, ?>) other).asCollection);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/collect/PCollectionImpl.java b/src/main/java/com/cloudera/crunch/impl/mr/collect/PCollectionImpl.java
deleted file mode 100644
index 6ed98b1..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/collect/PCollectionImpl.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.collect;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.FilterFn;
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.SourceTarget;
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.fn.ExtractKeyFn;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.impl.mr.plan.DoNode;
-import com.cloudera.crunch.lib.Aggregate;
-import com.cloudera.crunch.lib.Sample;
-import com.cloudera.crunch.lib.Sort;
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.google.common.collect.Lists;
-
-/**
- * Base implementation of {@link PCollection} for the MapReduce pipeline.
- *
- * <p>Instances form a graph through {@link #getParents()}. Subclasses supply the element
- * type, a {@link DoNode} for planning, a visitor callback, and a size estimate.
- *
- * @param <S> the element type of the collection
- */
-public abstract class PCollectionImpl<S> implements PCollection<S> {
-
-  private static final Log LOG = LogFactory.getLog(PCollectionImpl.class);
-
-  private final String name;
-  // Assigned directly by subclasses that know their pipeline (e.g. input collections);
-  // otherwise resolved lazily from the first parent in getPipeline().
-  protected MRPipeline pipeline;
-  // When set, accept() presents this collection to visitors as an input read from here,
-  // and getSize() prefers this target's size.
-  private SourceTarget<S> materializedAt;
-
-  public PCollectionImpl(String name) {
-    this.name = name;
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public String toString() {
-    return getName();
-  }
-
-  @Override
-  public PCollection<S> union(PCollection<S>... collections) {
-    List<PCollectionImpl<S>> internal = Lists.newArrayList();
-    internal.add(this);
-    for (PCollection<S> collection : collections) {
-      // Assumes every argument comes from this MR implementation; any other PCollection
-      // fails here with a ClassCastException.
-      internal.add((PCollectionImpl<S>) collection);
-    }
-    return new UnionCollection<S>(internal);
-  }
-
-  @Override
-  public <T> PCollection<T> parallelDo(DoFn<S, T> fn, PType<T> type) {
-    return parallelDo(nextAnonymousStageName(), fn, type);
-  }
-
-  @Override
-  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
-    return new DoCollectionImpl<T>(name, this, fn, type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(nextAnonymousStageName(), fn, type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return new DoTableImpl<K, V>(name, this, fn, type);
-  }
-
-  /**
-   * Returns a generated stage name, {@code "S" + id}, drawing the id from the owning
-   * pipeline's counter. Uses a distinct local name so the {@code pipeline} field is not
-   * shadowed.
-   */
-  private String nextAnonymousStageName() {
-    MRPipeline mrPipeline = (MRPipeline) getPipeline();
-    return "S" + mrPipeline.getNextAnonymousStageId();
-  }
-
-  @Override
-  public PCollection<S> write(Target target) {
-    getPipeline().write(this, target);
-    return this;
-  }
-
-  /**
-   * Materializes this collection through the pipeline. If the (estimated) size is 0, logs
-   * a warning and returns an empty list without running anything.
-   */
-  @Override
-  public Iterable<S> materialize() {
-    if (getSize() == 0) {
-      LOG.warn("Materializing an empty PCollection: " + this.getName());
-      return Collections.emptyList();
-    }
-    return getPipeline().materialize(this);
-  }
-
-  /** Returns where this collection has been materialized, or null if it has not been. */
-  public SourceTarget<S> getMaterializedAt() {
-    return materializedAt;
-  }
-
-  /** Records that this collection's contents are available from {@code sourceTarget}. */
-  public void materializeAt(SourceTarget<S> sourceTarget) {
-    this.materializedAt = sourceTarget;
-  }
-
-  @Override
-  public PCollection<S> filter(FilterFn<S> filterFn) {
-    return parallelDo(filterFn, getPType());
-  }
-
-  @Override
-  public PCollection<S> filter(String name, FilterFn<S> filterFn) {
-    return parallelDo(name, filterFn, getPType());
-  }
-
-  @Override
-  public <K> PTable<K, S> by(MapFn<S, K> mapFn, PType<K> keyType) {
-    return parallelDo(new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
-  }
-
-  @Override
-  public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
-    return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
-  }
-
-  @Override
-  public PCollection<S> sort(boolean ascending) {
-    return Sort.sort(this, ascending ? Sort.Order.ASCENDING : Sort.Order.DESCENDING);
-  }
-
-  @Override
-  public PTable<S, Long> count() {
-    return Aggregate.count(this);
-  }
-
-  @Override
-  public PCollection<S> max() {
-    return Aggregate.max(this);
-  }
-
-  @Override
-  public PCollection<S> min() {
-    return Aggregate.min(this);
-  }
-
-  @Override
-  public PCollection<S> sample(double acceptanceProbability) {
-    return Sample.sample(this, acceptanceProbability);
-  }
-
-  @Override
-  public PCollection<S> sample(double acceptanceProbability, long seed) {
-    // Sample.sample takes the seed before the probability.
-    return Sample.sample(this, seed, acceptanceProbability);
-  }
-
-  @Override
-  public PTypeFamily getTypeFamily() {
-    return getPType().getFamily();
-  }
-
-  /** Builds the planner node for this collection. */
-  public abstract DoNode createDoNode();
-
-  /** Returns the collections this one is derived from (empty for inputs). */
-  public abstract List<PCollectionImpl<?>> getParents();
-
-  /**
-   * Returns the single parent of this collection.
-   *
-   * @throws IllegalArgumentException if there is not exactly one parent
-   */
-  public PCollectionImpl<?> getOnlyParent() {
-    List<PCollectionImpl<?>> parents = getParents();
-    if (parents.size() != 1) {
-      throw new IllegalArgumentException("Expected exactly one parent PCollection");
-    }
-    return parents.get(0);
-  }
-
-  /**
-   * Returns the owning pipeline, inheriting (and caching) the first parent's pipeline when
-   * none was assigned directly.
-   *
-   * @throws IllegalStateException if no pipeline was assigned and there is no parent to
-   *     inherit one from
-   */
-  @Override
-  public Pipeline getPipeline() {
-    if (pipeline == null) {
-      List<PCollectionImpl<?>> parents = getParents();
-      if (parents.isEmpty()) {
-        throw new IllegalStateException(
-            "No pipeline assigned and no parent to inherit one from: " + getName());
-      }
-      pipeline = (MRPipeline) parents.get(0).getPipeline();
-    }
-    return pipeline;
-  }
-
-  /** Returns 1 plus the maximum depth of any parent (so a parentless collection has depth 1). */
-  public int getDepth() {
-    int parentMax = 0;
-    for (PCollectionImpl<?> parent : getParents()) {
-      parentMax = Math.max(parent.getDepth(), parentMax);
-    }
-    return 1 + parentMax;
-  }
-
-  /** Callback interface for walking the collection graph by concrete node type. */
-  public interface Visitor {
-    void visitInputCollection(InputCollection<?> collection);
-
-    void visitUnionCollection(UnionCollection<?> collection);
-
-    void visitDoFnCollection(DoCollectionImpl<?> collection);
-
-    void visitDoTable(DoTableImpl<?, ?> collection);
-
-    void visitGroupedTable(PGroupedTableImpl<?, ?> collection);
-  }
-
-  /**
-   * Dispatches to {@code visitor}. A materialized collection is presented as an input
-   * collection over its materialized location; otherwise the subclass's
-   * {@link #acceptInternal} is used.
-   */
-  public void accept(Visitor visitor) {
-    if (materializedAt != null) {
-      visitor.visitInputCollection(new InputCollection<S>(materializedAt,
-          (MRPipeline) getPipeline()));
-    } else {
-      acceptInternal(visitor);
-    }
-  }
-
-  protected abstract void acceptInternal(Visitor visitor);
-
-  /**
-   * Returns the materialized size when it is positive. Otherwise returns the subclass's
-   * estimate from {@link #getSizeInternal()}.
-   */
-  @Override
-  public long getSize() {
-    if (materializedAt != null) {
-      long sz = materializedAt.getSize(getPipeline().getConfiguration());
-      if (sz > 0) {
-        return sz;
-      }
-    }
-    return getSizeInternal();
-  }
-
-  protected abstract long getSizeInternal();
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/collect/PGroupedTableImpl.java b/src/main/java/com/cloudera/crunch/impl/mr/collect/PGroupedTableImpl.java
deleted file mode 100644
index 2b271b9..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.collect;
-
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.cloudera.crunch.CombineFn;
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.GroupingOptions;
-import com.cloudera.crunch.PGroupedTable;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.impl.mr.plan.DoNode;
-import com.cloudera.crunch.types.PGroupedTableType;
-import com.cloudera.crunch.types.PType;
-import com.google.common.collect.ImmutableList;
-
-/**
- * The result of grouping a {@link PTableBase} by key: a collection of (key, values) pairs
- * produced by a shuffle.
- *
- * @param <K> the key type
- * @param <V> the value type
- */
-public class PGroupedTableImpl<K, V> extends
-    PCollectionImpl<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
-
-  private static final Log LOG = LogFactory.getLog(PGroupedTableImpl.class);
-
-  // Configuration key and default for the input bytes handled by each reducer when the
-  // reducer count is estimated from the table size.
-  private static final String BYTES_PER_REDUCE_TASK = "crunch.bytes.per.reduce.task";
-  private static final long DEFAULT_BYTES_PER_REDUCE_TASK = 1000L * 1000L * 1000L;
-
-  private final PTableBase<K, V> parent;
-  private final GroupingOptions groupingOptions;
-  private final PGroupedTableType<K, V> ptype;
-
-  PGroupedTableImpl(PTableBase<K, V> parent) {
-    this(parent, null);
-  }
-
-  PGroupedTableImpl(PTableBase<K, V> parent, GroupingOptions groupingOptions) {
-    super("GBK");
-    this.parent = parent;
-    this.groupingOptions = groupingOptions;
-    this.ptype = parent.getPTableType().getGroupedTableType();
-  }
-
-  /**
-   * Configures the shuffle on {@code job}. Unless the grouping options specify a positive
-   * reducer count, also sets the number of reduce tasks from this table's estimated size.
-   *
-   * <p>The estimate is {@code 1 + size / crunch.bytes.per.reduce.task} (default 10^9 bytes).
-   * A non-positive configured value falls back to the default instead of dividing by zero.
-   * The count is capped at {@link Integer#MAX_VALUE} rather than overflowing the int cast.
-   */
-  public void configureShuffle(Job job) {
-    ptype.configureShuffle(job, groupingOptions);
-    if (groupingOptions == null || groupingOptions.getNumReducers() <= 0) {
-      long bytesPerTask = job.getConfiguration().getLong(BYTES_PER_REDUCE_TASK,
-          DEFAULT_BYTES_PER_REDUCE_TASK);
-      if (bytesPerTask <= 0) {
-        LOG.warn(String.format("Ignoring non-positive %s=%d; using %d",
-            BYTES_PER_REDUCE_TASK, bytesPerTask, DEFAULT_BYTES_PER_REDUCE_TASK));
-        bytesPerTask = DEFAULT_BYTES_PER_REDUCE_TASK;
-      }
-      // Compute in long and clamp before narrowing to int.
-      long estimate = 1L + getSize() / bytesPerTask;
-      int numReduceTasks = (int) Math.min(Integer.MAX_VALUE, estimate);
-      if (numReduceTasks > 0) {
-        job.setNumReduceTasks(numReduceTasks);
-        LOG.info(String.format("Setting num reduce tasks to %d", numReduceTasks));
-      } else {
-        LOG.warn("Attempted to set a negative number of reduce tasks");
-      }
-    }
-  }
-
-  /** The grouped table is estimated to be the same size as the ungrouped parent. */
-  @Override
-  protected long getSizeInternal() {
-    return parent.getSizeInternal();
-  }
-
-  @Override
-  public PType<Pair<K, Iterable<V>>> getPType() {
-    return ptype;
-  }
-
-  /** Applies {@code combineFn} to the grouped values, yielding a table of the parent's type. */
-  public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
-    return new DoTableImpl<K, V>("combine", this, combineFn,
-        parent.getPTableType());
-  }
-
-  /** Re-emits each (key, values) group as one (key, value) pair per value. */
-  private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {
-    @Override
-    public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
-      for (V v : input.second()) {
-        emitter.emit(Pair.of(input.first(), v));
-      }
-    }
-  }
-
-  /** Flattens the groups back into a table with the parent's table type. */
-  public PTable<K, V> ungroup() {
-    return parallelDo("ungroup", new Ungroup<K, V>(), parent.getPTableType());
-  }
-
-  @Override
-  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitGroupedTable(this);
-  }
-
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.<PCollectionImpl<?>> of(parent);
-  }
-
-  @Override
-  public DoNode createDoNode() {
-    return DoNode.createFnNode(getName(),
-        ptype.getInputMapFn(), ptype);
-  }
-
-  public DoNode getGroupingNode() {
-    return DoNode.createGroupingNode("", ptype);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/collect/PTableBase.java b/src/main/java/com/cloudera/crunch/impl/mr/collect/PTableBase.java
deleted file mode 100644
index 5a8ea92..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/collect/PTableBase.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.collect;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import com.cloudera.crunch.GroupingOptions;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.lib.Aggregate;
-import com.cloudera.crunch.lib.Cogroup;
-import com.cloudera.crunch.lib.Join;
-import com.cloudera.crunch.lib.PTables;
-import com.cloudera.crunch.materialize.MaterializableMap;
-import com.cloudera.crunch.types.PType;
-import com.google.common.collect.Lists;
-
-/**
- * Base class for {@link PTable} implementations. Provides grouping and union, plus
- * convenience operations that delegate to the library helpers (Aggregate, Join, Cogroup,
- * PTables).
- *
- * @param <K> the key type
- * @param <V> the value type
- */
-public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>>
-    implements PTable<K, V> {
-
-  public PTableBase(String name) {
-    super(name);
-  }
-
-  /** Returns the key type of this table. */
-  public PType<K> getKeyType() {
-    return getPTableType().getKeyType();
-  }
-
-  /** Returns the value type of this table. */
-  public PType<V> getValueType() {
-    return getPTableType().getValueType();
-  }
-
-  /** Groups this table by key with no explicit grouping options. */
-  public PGroupedTableImpl<K, V> groupByKey() {
-    return new PGroupedTableImpl<K, V>(this);
-  }
-
-  /** Groups this table by key using {@code numReduceTasks} reducers. */
-  public PGroupedTableImpl<K, V> groupByKey(int numReduceTasks) {
-    return new PGroupedTableImpl<K, V>(this,
-        GroupingOptions.builder().numReducers(numReduceTasks).build());
-  }
-
-  /** Groups this table by key using the given grouping options. */
-  public PGroupedTableImpl<K, V> groupByKey(GroupingOptions groupingOptions) {
-    return new PGroupedTableImpl<K, V>(this, groupingOptions);
-  }
-
-  @Override
-  public PTable<K, V> union(PTable<K, V>... others) {
-    List<PTableBase<K, V>> internal = Lists.newArrayList();
-    internal.add(this);
-    for (PTable<K, V> table : others) {
-      // Assumes every argument comes from this MR implementation.
-      internal.add((PTableBase<K, V>) table);
-    }
-    return new UnionTable<K, V>(internal);
-  }
-
-  @Override
-  public PTable<K, V> write(Target target) {
-    getPipeline().write(this, target);
-    return this;
-  }
-
-  /** Delegates to {@code Aggregate.top(this, count, true)}. */
-  @Override
-  public PTable<K, V> top(int count) {
-    return Aggregate.top(this, count, true);
-  }
-
-  /** Delegates to {@code Aggregate.top(this, count, false)}. */
-  @Override
-  public PTable<K, V> bottom(int count) {
-    return Aggregate.top(this, count, false);
-  }
-
-  @Override
-  public PTable<K, Collection<V>> collectValues() {
-    return Aggregate.collectValues(this);
-  }
-
-  @Override
-  public <U> PTable<K, Pair<V, U>> join(PTable<K, U> other) {
-    return Join.join(this, other);
-  }
-
-  @Override
-  public <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other) {
-    return Cogroup.cogroup(this, other);
-  }
-
-  @Override
-  public PCollection<K> keys() {
-    return PTables.keys(this);
-  }
-
-  @Override
-  public PCollection<V> values() {
-    return PTables.values(this);
-  }
-
-  /**
-   * Returns a {@code Map<K, V>} made up of the keys and values in this PTable.
-   */
-  public Map<K, V> materializeToMap() {
-    return new MaterializableMap<K, V>(this.materialize());
-  }
-
-}


Mime
View raw message