crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject [3/5] git commit: Use the distributed cache for map side joins
Date Fri, 06 Jul 2012 16:43:44 GMT
Use the distributed cache for map side joins


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/f70a6df8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/f70a6df8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/f70a6df8

Branch: refs/heads/master
Commit: f70a6df8b2fe48efa14b432fddf731c3e8d94d86
Parents: b13bd4f
Author: Gabriel Reid <gabriel.reid@gmail.com>
Authored: Mon Jul 2 14:06:31 2012 +0200
Committer: Gabriel Reid <gabriel.reid@gmail.com>
Committed: Fri Jul 6 17:56:57 2012 +0200

----------------------------------------------------------------------
 .../cloudera/crunch/io/avro/AvroFileSource.java    |    3 +-
 .../com/cloudera/crunch/io/seq/SeqFileSource.java  |   12 ++--
 .../cloudera/crunch/io/seq/SeqFileTableSource.java |    2 +-
 .../cloudera/crunch/io/text/TextFileSource.java    |    4 +-
 .../com/cloudera/crunch/lib/join/MapsideJoin.java  |   45 +++++++++++----
 5 files changed, 43 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f70a6df8/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java b/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
index 1122d62..4debfeb 100644
--- a/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
+++ b/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
@@ -45,7 +45,8 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
 
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-    return CompositePathIterable.create(FileSystem.get(conf), path, new AvroFileReaderFactory<T>(
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>(
         (AvroType<T>) ptype, conf));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f70a6df8/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java b/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
index 24dec2d..462ef93 100644
--- a/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
+++ b/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
@@ -26,18 +26,16 @@ import com.cloudera.crunch.io.ReadableSource;
 import com.cloudera.crunch.io.impl.FileSourceImpl;
 import com.cloudera.crunch.types.PType;
 
-public class SeqFileSource<T> extends FileSourceImpl<T> implements
-	ReadableSource<T> {
+public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
 
   public SeqFileSource(Path path, PType<T> ptype) {
-	super(path, ptype, SequenceFileInputFormat.class);
+    super(path, ptype, SequenceFileInputFormat.class);
   }
-  
+
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-	FileSystem fs = FileSystem.get(conf);
-	return CompositePathIterable.create(fs, path, 
-	    new SeqFileReaderFactory<T>(ptype, conf));
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype, conf));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f70a6df8/src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java b/src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
index 69ca12b..4db6658 100644
--- a/src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
+++ b/src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
@@ -42,7 +42,7 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen
 
   @Override
   public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
     return CompositePathIterable.create(fs, path, 
         new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf));
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f70a6df8/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java b/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
index a876843..e0dbe68 100644
--- a/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
+++ b/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
@@ -67,7 +67,7 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements
   
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-	return CompositePathIterable.create(FileSystem.get(conf), path,
-	    new TextFileReaderFactory<T>(ptype, conf));
+    return CompositePathIterable.create(FileSystem.get(path.toUri(), conf), path,
+        new TextFileReaderFactory<T>(ptype, conf));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f70a6df8/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java b/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
index 958b010..8072e07 100644
--- a/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
+++ b/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
@@ -2,6 +2,8 @@ package com.cloudera.crunch.lib.join;
 
 import java.io.IOException;
 
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.cloudera.crunch.DoFn;
@@ -53,21 +55,25 @@ public class MapsideJoin {
     MRPipeline pipeline = (MRPipeline) right.getPipeline();
     pipeline.materialize(right);
 
-    // TODO Make this method internal to MRPipeline so that we don't run once
-    // for every separate MapsideJoin at the same level
+    // TODO Move necessary logic to MRPipeline so that we can theoretically
+    // optimize his by running the setup of multiple map-side joins concurrently
     pipeline.run();
 
-    // TODO Verify that this cast is safe -- are there any situations where this
-    // wouldn't work?
-    SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) pipeline
+    ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline
         .getMaterializeSourceTarget(right);
+    if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
+      throw new CrunchRuntimeException("Right-side contents can't be read from a path");
+    }
 
-    // TODO Put the data in the distributed cache
+    // Suppress warnings because we've just checked this cast via instanceof
+    @SuppressWarnings("unchecked")
+    SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget;
 
     Path path = sourcePathTarget.getPath();
-    PType<Pair<K, V>> pType = right.getPType();
+    DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration());
 
-    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(), pType);
+    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(),
+        right.getPType());
     PTypeFamily typeFamily = left.getTypeFamily();
     return left.parallelDo(
         "mapjoin",
@@ -79,21 +85,36 @@ public class MapsideJoin {
 
   static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
 
-    private String path;
+    private String inputPath;
     private PType<Pair<K, V>> ptype;
     private Multimap<K, V> joinMap;
 
-    public MapsideJoinDoFn(String path, PType<Pair<K, V>> ptype) {
-      this.path = path;
+    public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
+      this.inputPath = inputPath;
       this.ptype = ptype;
     }
 
+    private Path getCacheFilePath() {
+      try {
+        for (Path localPath : DistributedCache.getLocalCacheFiles(getConfiguration())) {
+          if (localPath.toString().endsWith(inputPath)) {
+            return localPath.makeQualified(FileSystem.getLocal(getConfiguration()));
+
+          }
+        }
+      } catch (IOException e) {
+        throw new CrunchRuntimeException(e);
+      }
+
+      throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
+    }
+
     @Override
     public void initialize() {
       super.initialize();
 
       ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K, V>>) ptype
-          .getDefaultFileSource(new Path(path));
+          .getDefaultFileSource(getCacheFilePath());
       Iterable<Pair<K, V>> iterable = null;
       try {
         iterable = sourceTarget.read(getConfiguration());


Mime
View raw message