crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: Have MemPipeline write outputs to directories to be consistent with MRPipeline
Date Thu, 21 Jun 2012 15:31:15 GMT
Updated Branches:
  refs/heads/master e74fc23e1 -> 4b45e134f


Have MemPipeline write outputs to directories to be consistent with MRPipeline


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/4b45e134
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/4b45e134
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/4b45e134

Branch: refs/heads/master
Commit: 4b45e134fc28d3d2a83bb1db7c00487f724d54dc
Parents: e74fc23
Author: Josh Wills <jwills@cloudera.com>
Authored: Thu Jun 21 08:31:01 2012 -0700
Committer: Josh Wills <jwills@cloudera.com>
Committed: Thu Jun 21 08:31:01 2012 -0700

----------------------------------------------------------------------
 .../com/cloudera/crunch/impl/mem/MemPipeline.java  |    3 +-
 .../impl/mem/MemPipelineFileWritingTest.java       |   50 +++++++++++++++
 2 files changed, 52 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4b45e134/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java b/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java
index 7242ca5..2b21efc 100644
--- a/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java
+++ b/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java
@@ -21,6 +21,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -145,7 +146,7 @@ public class MemPipeline implements Pipeline {
       Path path = ((PathTarget) target).getPath();
       try {
         FileSystem fs = FileSystem.get(conf);
-        FSDataOutputStream os = fs.create(path);
+        FSDataOutputStream os = fs.create(new Path(path, "out"));
         if (collection instanceof PTable) {
           for (Object o : collection.materialize()) {
             Pair p = (Pair) o;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4b45e134/src/test/java/com/cloudera/crunch/impl/mem/MemPipelineFileWritingTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/impl/mem/MemPipelineFileWritingTest.java b/src/test/java/com/cloudera/crunch/impl/mem/MemPipelineFileWritingTest.java
new file mode 100644
index 0000000..9051fc1
--- /dev/null
+++ b/src/test/java/com/cloudera/crunch/impl/mem/MemPipelineFileWritingTest.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.crunch.impl.mem;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+
+import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.Pipeline;
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import org.junit.Test;
+
+public class MemPipelineFileWritingTest {
+  @Test
+  public void testMemPipelineFileWriter() throws Exception {
+    File tmpDir = Files.createTempDir();
+    tmpDir.delete();
+    Pipeline p = MemPipeline.getInstance();
+    PCollection<String> lines = MemPipeline.collectionOf("hello", "world");
+    p.writeTextFile(lines, tmpDir.getAbsolutePath());
+    p.done();
+    assertTrue(tmpDir.exists());
+    File[] files = tmpDir.listFiles();
+    assertTrue(files != null && files.length > 0);
+    for (File f : files) {
+      if (!f.getName().startsWith(".")) {
+        List<String> txt = Files.readLines(f, Charsets.UTF_8);
+        assertEquals(ImmutableList.of("hello", "world"), txt);
+      }
+    }
+  }
+}


Mime
View raw message