crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject [4/5] git commit: Initial implementation of map side joins
Date Fri, 06 Jul 2012 16:43:44 GMT
Initial implementation of map side joins


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/b13bd4f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/b13bd4f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/b13bd4f4

Branch: refs/heads/master
Commit: b13bd4f416dcff62fd006a8104df252a406262d0
Parents: 8b64b84
Author: Gabriel Reid <gabriel.reid@gmail.com>
Authored: Mon Jul 2 11:37:00 2012 +0200
Committer: Gabriel Reid <gabriel.reid@gmail.com>
Committed: Fri Jul 6 17:56:57 2012 +0200

----------------------------------------------------------------------
 .../com/cloudera/crunch/lib/join/MapsideJoin.java  |  122 +++++++++++++++
 .../cloudera/crunch/lib/join/MapsideJoinTest.java  |  102 ++++++++++++
 src/test/resources/customers.txt                   |    4 +
 src/test/resources/orders.txt                      |    4 +
 4 files changed, 232 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b13bd4f4/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java b/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
new file mode 100644
index 0000000..958b010
--- /dev/null
+++ b/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
@@ -0,0 +1,122 @@
+package com.cloudera.crunch.lib.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+
+import com.cloudera.crunch.DoFn;
+import com.cloudera.crunch.Emitter;
+import com.cloudera.crunch.PTable;
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
+import com.cloudera.crunch.io.ReadableSourceTarget;
+import com.cloudera.crunch.io.impl.SourcePathTargetImpl;
+import com.cloudera.crunch.types.PType;
+import com.cloudera.crunch.types.PTypeFamily;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Utility for doing map side joins on a common key between two {@link PTable}s.
+ * <p>
+ * A map side join is an optimized join which doesn't use a reducer; instead,
+ * the right side of the join is loaded into memory and the join is performed in
+ * a mapper. This style of join has the important implication that the output of
+ * the join is not sorted, which is the case with a conventional (reducer-based)
+ * join.
+ * <p>
+ * <b>Note:</b>This utility is only supported when running with a
+ * {@link MRPipeline} as the pipeline.
+ */
+public class MapsideJoin {
+
+  /**
+   * Join two tables using a map side join. The right-side table will be loaded
+   * fully in memory, so this method should only be used if the right side
+   * table's contents can fit in the memory allocated to mappers. The join
+   * performed by this method is an inner join.
+   * 
+   * @param left
+   *          The left-side table of the join
+   * @param right
+   *          The right-side table of the join, whose contents will be fully
+   *          read into memory
+   * @return A table keyed on the join key, containing pairs of joined values
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U>
left, PTable<K, V> right) {
+
+    if (!(right.getPipeline() instanceof MRPipeline)) {
+      throw new CrunchRuntimeException("Map-side join is only supported within a MapReduce
context");
+    }
+
+    MRPipeline pipeline = (MRPipeline) right.getPipeline();
+    pipeline.materialize(right);
+
+    // TODO Make this method internal to MRPipeline so that we don't run once
+    // for every separate MapsideJoin at the same level
+    pipeline.run();
+
+    // TODO Verify that this cast is safe -- are there any situations where this
+    // wouldn't work?
+    SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K,
V>>) pipeline
+        .getMaterializeSourceTarget(right);
+
+    // TODO Put the data in the distributed cache
+
+    Path path = sourcePathTarget.getPath();
+    PType<Pair<K, V>> pType = right.getPType();
+
+    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(),
pType);
+    PTypeFamily typeFamily = left.getTypeFamily();
+    return left.parallelDo(
+        "mapjoin",
+        mapJoinDoFn,
+        typeFamily.tableOf(left.getKeyType(),
+            typeFamily.pairs(left.getValueType(), right.getValueType())));
+
+  }
+
+  static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K,
Pair<U, V>>> {
+
+    private String path;
+    private PType<Pair<K, V>> ptype;
+    private Multimap<K, V> joinMap;
+
+    public MapsideJoinDoFn(String path, PType<Pair<K, V>> ptype) {
+      this.path = path;
+      this.ptype = ptype;
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+
+      ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K,
V>>) ptype
+          .getDefaultFileSource(new Path(path));
+      Iterable<Pair<K, V>> iterable = null;
+      try {
+        iterable = sourceTarget.read(getConfiguration());
+      } catch (IOException e) {
+        throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
+      }
+
+      joinMap = ArrayListMultimap.create();
+      for (Pair<K, V> joinPair : iterable) {
+        joinMap.put(joinPair.first(), joinPair.second());
+      }
+    }
+
+    @Override
+    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>>
emitter) {
+      K key = input.first();
+      U value = input.second();
+      for (V joinValue : joinMap.get(key)) {
+        Pair<U, V> valuePair = Pair.of(value, joinValue);
+        emitter.emit(Pair.of(key, valuePair));
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b13bd4f4/src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
new file mode 100644
index 0000000..97e0c63
--- /dev/null
+++ b/src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
@@ -0,0 +1,102 @@
+package com.cloudera.crunch.lib.join;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.cloudera.crunch.FilterFn;
+import com.cloudera.crunch.MapFn;
+import com.cloudera.crunch.PTable;
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.Pipeline;
+import com.cloudera.crunch.impl.mem.MemPipeline;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
+import com.cloudera.crunch.test.FileHelper;
+import com.cloudera.crunch.types.writable.Writables;
+import com.google.common.collect.Lists;
+
+public class MapsideJoinTest {
+
+  /** Parses a {@code "<int id>|<text>"} line into an {@code (id, text)} pair. */
+  private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
+
+    @Override
+    public Pair<Integer, String> map(String input) {
+      String[] fields = input.split("\\|");
+      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
+    }
+
+  }
+
+  /** Rejects every record, turning any table into an empty one. */
+  private static class NegativeFilter extends FilterFn<Pair<Integer, String>> {
+
+    @Override
+    public boolean accept(Pair<Integer, String> input) {
+      return false;
+    }
+
+  }
+
+  @Test(expected = CrunchRuntimeException.class)
+  public void testNonMapReducePipeline() {
+    runMapsideJoin(MemPipeline.getInstance());
+  }
+
+  @Test
+  public void testMapsideJoin_RightSideIsEmpty() {
+    MRPipeline pipeline = new MRPipeline(MapsideJoinTest.class);
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    PTable<Integer, String> filteredOrderTable = orderTable.parallelDo(new NegativeFilter(),
+        orderTable.getPTableType());
+
+    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable,
+        filteredOrderTable);
+
+    // Copy the results out before done() cleans up the pipeline's temporary files.
+    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined
+        .materialize());
+    pipeline.done();
+
+    assertTrue(materializedJoin.isEmpty());
+  }
+
+  @Test
+  public void testMapsideJoin() {
+    runMapsideJoin(new MRPipeline(MapsideJoinTest.class));
+  }
+
+  /**
+   * Joins the customer and order fixtures in {@code pipeline} and checks the
+   * sorted result against the expected inner join.
+   */
+  private void runMapsideJoin(Pipeline pipeline) {
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, orderTable);
+
+    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+    expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger")));
+    expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush")));
+
+    // Copy the results out before done() cleans up the pipeline's temporary files.
+    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined
+        .materialize());
+    pipeline.done();
+    // A map side join gives no ordering guarantee, so sort before comparing.
+    Collections.sort(joinedResultList);
+
+    assertEquals(expectedJoinResult, joinedResultList);
+  }
+
+  /** Reads a pipe-delimited test resource as an {@code (int, string)} table. */
+  private static PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
+    try {
+      return pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable",
+          new LineSplitter(), Writables.tableOf(Writables.ints(), Writables.strings()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b13bd4f4/src/test/resources/customers.txt
----------------------------------------------------------------------
diff --git a/src/test/resources/customers.txt b/src/test/resources/customers.txt
new file mode 100644
index 0000000..98f3f3d
--- /dev/null
+++ b/src/test/resources/customers.txt
@@ -0,0 +1,4 @@
+111|John Doe
+222|Jane Doe
+333|Someone Else
+444|Has No Orders
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b13bd4f4/src/test/resources/orders.txt
----------------------------------------------------------------------
diff --git a/src/test/resources/orders.txt b/src/test/resources/orders.txt
new file mode 100644
index 0000000..2f1383f
--- /dev/null
+++ b/src/test/resources/orders.txt
@@ -0,0 +1,4 @@
+222|Toilet plunger
+333|Toilet brush
+222|Toilet paper
+111|Corn flakes
\ No newline at end of file


Mime
View raw message