hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1197153 - in /incubator/hama/trunk/core/src: main/java/org/apache/hama/bsp/ test/java/org/apache/hama/bsp/ test/java/testjar/
Date Thu, 03 Nov 2011 14:37:45 GMT
Author: tjungblut
Date: Thu Nov  3 14:37:44 2011
New Revision: 1197153

URL: http://svn.apache.org/viewvc?rev=1197153&view=rev
Log:
Added a testcase and sequencefile outputs.

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileOutputFormat.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java   (with props)
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/IOSerializePrinting.java   (with props)
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestIOJob.java   (with props)
Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java
    incubator/hama/trunk/core/src/test/java/testjar/testjob.jar

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java?rev=1197153&r1=1197152&r2=1197153&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java Thu Nov  3 14:37:44 2011
@@ -26,13 +26,13 @@ public interface OutputFormat<K, V> {
   /**
    * Get the {@link RecordWriter} for the given job.
    * 
-   * @param ignored
+   * @param fs
    * @param job configuration for the job whose output is being written.
    * @param name the unique name for this part of the output.
    * @return a {@link RecordWriter} to write the output for the job.
    * @throws IOException
    */
-  RecordWriter<K, V> getRecordWriter(FileSystem ignored, BSPJob job, String name)
+  RecordWriter<K, V> getRecordWriter(FileSystem fs, BSPJob job, String name)
       throws IOException;
 
   /**

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileOutputFormat.java?rev=1197153&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileOutputFormat.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileOutputFormat.java Thu Nov  3 14:37:44 2011
@@ -0,0 +1,40 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Writable;
+
+public class SequenceFileOutputFormat<K extends Writable, V extends Writable>
+    extends FileOutputFormat<K, V> {
+
+  @Override
+  public RecordWriter<K, V> getRecordWriter(FileSystem fs, BSPJob job,
+      String name) throws IOException {
+    try {
+      return new SequenceFileRecordWriter<K, V>(fs, job, name);
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
+    return null;
+  }
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java?rev=1197153&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java Thu Nov  3 14:37:44 2011
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Writable;
+
+public class SequenceFileRecordWriter<K extends Writable, V extends Writable>
+    implements RecordWriter<K, V> {
+
+  private Writer writer;
+
+  public SequenceFileRecordWriter(FileSystem fs, BSPJob job, String name)
+      throws IOException, ClassNotFoundException {
+    Configuration conf = job.getConf();
+    writer = new SequenceFile.Writer(fs, conf, new Path(
+        conf.get("bsp.output.dir"), name), conf.getClassByName(conf
+        .get("bsp.output.key.class")), conf.getClassByName(conf
+        .get("bsp.output.value.class")));
+  }
+
+  @Override 
+  public void write(K key, V value) throws IOException {
+    writer.append(key, value);
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/IOSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/IOSerializePrinting.java?rev=1197153&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/IOSerializePrinting.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/IOSerializePrinting.java Thu Nov  3 14:37:44 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+public class IOSerializePrinting extends
+    BSP<NullWritable, NullWritable, LongWritable, Text> {
+
+  public static final Log LOG = LogFactory.getLog(IOSerializePrinting.class);
+  private final static int PRINT_INTERVAL = 1000;
+  private int num;
+
+  public void bsp(BSPPeer<NullWritable, NullWritable, LongWritable, Text> peer)
+      throws IOException, KeeperException, InterruptedException {
+
+    int i = 0;
+    for (String otherPeer : peer.getAllPeerNames()) {
+      String peerName = peer.getPeerName();
+      if (peerName.equals(otherPeer)) {
+        peer.write(new LongWritable(System.currentTimeMillis()), new Text(
+            "Hello BSP from " + (i + 1) + " of " + num + ": " + peerName));
+      }
+
+      Thread.sleep(PRINT_INTERVAL);
+      peer.sync();
+      i++;
+    }
+  }
+
+  public void setup(
+      org.apache.hama.bsp.BSPPeer<NullWritable, NullWritable, LongWritable, Text> peer)
+      throws IOException, KeeperException, InterruptedException {
+    num = Integer.parseInt(conf.get("bsp.peers.num"));
+
+  }
+}

Propchange: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/IOSerializePrinting.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1197153&r1=1197152&r2=1197153&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Thu Nov  3 14:37:44 2011
@@ -91,6 +91,7 @@ public class TestBSPMasterGroomServer ex
           .indexOf("Hello BSP from") >= 0);
       reader.close();
     }
+    fileSys.delete(new Path(TMP_OUTPUT), true);
   }
 
   public void tearDown() throws Exception {

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1197153&r1=1197152&r2=1197153&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Thu Nov  3 14:37:44 2011
@@ -17,16 +17,13 @@
  */
 package org.apache.hama.bsp;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hama.HamaConfiguration;
 
@@ -39,6 +36,7 @@ public class TestCheckpoint extends Test
   public void testCheckpoint() throws Exception {
     Configuration config = new HamaConfiguration();
     FileSystem dfs = FileSystem.get(config);
+    @SuppressWarnings("rawtypes")
     BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs);
     assertNotNull("BSPPeerImpl should not be null.", bspTask);
     if(dfs.mkdirs(new Path("checkpoint"))) {

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestIOJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestIOJob.java?rev=1197153&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestIOJob.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestIOJob.java Thu Nov  3 14:37:44 2011
@@ -0,0 +1,83 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+
+public class TestIOJob extends HamaCluster {
+
+  private static Log LOG = LogFactory.getLog(TestIOJob.class);
+  private static String TMP_OUTPUT = "/tmp/test-example/";
+  private HamaConfiguration configuration;
+
+  public TestIOJob() {
+    configuration = new HamaConfiguration();
+    configuration.set("bsp.master.address", "localhost");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.set("bsp.local.dir", "/tmp/hama-test");
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+    configuration.set("hama.sync.client.class",
+        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+            .getCanonicalName());
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  public void testOutputJob() throws Exception {
+    BSPJob bsp = new BSPJob(configuration);
+    bsp.setJobName("Test Serialize Printing with Output");
+    bsp.setBspClass(IOSerializePrinting.class);
+
+    // Set the task size as a number of GroomServer
+    BSPJobClient jobClient = new BSPJobClient(configuration);
+    configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
+    ClusterStatus cluster = jobClient.getClusterStatus(false);
+    assertEquals(this.numOfGroom, cluster.getGroomServers());
+    bsp.setNumBspTask(2);
+    bsp.setInputFormat(NullInputFormat.class);
+    bsp.setOutputFormat(SequenceFileOutputFormat.class);
+    bsp.setOutputKeyClass(LongWritable.class);
+    bsp.setOutputValueClass(Text.class);
+    bsp.setOutputPath(new Path(TMP_OUTPUT));
+
+    FileSystem fileSys = FileSystem.get(conf);
+
+    if (bsp.waitForCompletion(true)) {
+      TestBSPMasterGroomServer.checkOutput(fileSys, conf, 2);
+    }
+    LOG.info("Client finishes execution job.");
+  }
+
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+}

Propchange: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestIOJob.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java?rev=1197153&r1=1197152&r2=1197153&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java (original)
+++ incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java Thu Nov  3 14:37:44 2011
@@ -62,7 +62,7 @@ public class ClassSerializePrinting {
 
     private void writeLogToFile(String string, int i) throws IOException {
       SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
-          new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+          new Path(TMP_OUTPUT,i+""), LongWritable.class, Text.class,
           CompressionType.NONE);
       writer.append(new LongWritable(System.currentTimeMillis()), new Text(
           "Hello BSP from " + (i + 1) + " of " + num + ": " + string));

Modified: incubator/hama/trunk/core/src/test/java/testjar/testjob.jar
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/testjar/testjob.jar?rev=1197153&r1=1197152&r2=1197153&view=diff
==============================================================================
Binary files - no diff available.



Mime
View raw message