hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1197057 - in /incubator/hama/trunk: core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/sync/ core/src/main/java/org/apache/hama/util/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/testjar/ examples/...
Date Thu, 03 Nov 2011 10:31:11 GMT
Author: tjungblut
Date: Thu Nov  3 10:31:10 2011
New Revision: 1197057

URL: http://svn.apache.org/viewvc?rev=1197057&view=rev
Log:
Improved API and refactoring.

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java   (with props)
Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
    incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java
    incubator/hama/trunk/core/src/test/java/testjar/testjob.jar
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java Thu Nov  3 10:31:10 2011
@@ -25,7 +25,8 @@ import org.apache.zookeeper.KeeperExcept
 /**
  * This class provides an abstract implementation of the BSP interface.
  */
-public abstract class BSP<K1, V1, K2, V2> implements BSPInterface<K1, V1, K2, V2> {
+public abstract class BSP<K1, V1, K2, V2> implements
+    BSPInterface<K1, V1, K2, V2> {
 
   protected Configuration conf;
 
@@ -35,9 +36,8 @@ public abstract class BSP<K1, V1, K2, V2
    * 
    * @param peer Your BSPPeer instance.
    */
-  public abstract void bsp(BSPPeer peer, RecordReader<K1, V1> input,
-      OutputCollector<K2, V2> output) throws IOException, KeeperException,
-      InterruptedException;
+  public abstract void bsp(BSPPeer<K1, V1, K2, V2> peer) throws IOException,
+      KeeperException, InterruptedException;
 
   /**
    * This method is called before the BSP method. It can be used for setup
@@ -45,8 +45,10 @@ public abstract class BSP<K1, V1, K2, V2
    * 
    * @param peer Your BSPPeer instance.
    */
-  public abstract void setup(BSPPeer peer) throws IOException, KeeperException,
-      InterruptedException;
+  public void setup(BSPPeer<K1, V1, K2, V2> peer) throws IOException,
+      KeeperException, InterruptedException {
+
+  }
 
   /**
    * This method is called after the BSP method. It can be used for cleanup
@@ -55,7 +57,9 @@ public abstract class BSP<K1, V1, K2, V2
    * 
    * @param peer Your BSPPeer instance.
    */
-  public abstract void cleanup(BSPPeer peer);
+  public void cleanup(BSPPeer<K1, V1, K2, V2> peer) {
+
+  }
 
   /**
    * Returns the configuration of this BSP Job.

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java Thu Nov  3 10:31:10 2011
@@ -17,10 +17,7 @@
  */
 package org.apache.hama.bsp;
 
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configurable;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * Interface BSP defines the basic operations needed to implement the BSP
@@ -28,18 +25,4 @@ import org.apache.zookeeper.KeeperExcept
  */
 public interface BSPInterface<K1, V1, K2, V2> extends Configurable {
 
-  /**
-   * A user defined function for programming in the BSP style.
-   * 
-   * Applications can use the {@link org.apache.hama.bsp.BSPPeer} to handle the
-   * communication and synchronization between processors.
-   * 
-   * @param bspPeer
-   * @throws IOException
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  public void bsp(BSPPeer bspPeer, RecordReader<K1, V1> input,
-      OutputCollector<K2, V2> output) throws IOException, KeeperException,
-      InterruptedException;
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Thu Nov  3 10:31:10 2011
@@ -98,14 +98,14 @@ public class BSPJob extends BSPJobContex
    * @param cls
    * @throws IllegalStateException
    */
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings("rawtypes")
   public void setBspClass(Class<? extends BSP> cls)
       throws IllegalStateException {
     ensureState(JobState.DEFINE);
     conf.setClass(WORK_CLASS_ATTR, cls, BSP.class);
   }
 
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   public Class<? extends BSP> getBspClass() {
     return (Class<? extends BSP>) conf.getClass(WORK_CLASS_ATTR, BSP.class);
   }
@@ -233,13 +233,14 @@ public class BSPJob extends BSPJobContex
     return conf.getInt("bsp.peers.num", 0);
   }
 
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings({ "rawtypes" })
   public InputFormat getInputFormat() {
     return (InputFormat) ReflectionUtils.newInstance(conf.getClass(
-        "bsp.input.format.class", TextInputFormat.class, InputFormat.class), conf);
+        "bsp.input.format.class", TextInputFormat.class, InputFormat.class),
+        conf);
   }
 
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings({ "rawtypes" })
   public void setInputFormat(Class<? extends InputFormat> cls) {
     conf.setClass("bsp.input.format.class", cls, InputFormat.class);
   }
@@ -281,15 +282,34 @@ public class BSPJob extends BSPJobContex
     conf.setClass("bsp.output.value.class", theClass, Object.class);
   }
 
-  @SuppressWarnings("unchecked")
+  /**
+   * Sets the output path for the job.
+   * 
+   * @param path where the output gets written.
+   */
+  public void setOutputPath(Path path) {
+    conf.set("bsp.output.dir", path.toString());
+  }
+  
+  /**
+   * Sets the input path for the job.
+   * 
+   * @param path where the output gets written.
+   */
+  public void setInputPath(Path path) {
+    conf.set("bsp.input.dir", path.toString());
+  }
+
+  @SuppressWarnings("rawtypes")
   public void setOutputFormat(Class<? extends OutputFormat> theClass) {
     conf.setClass("bsp.output.format.class", theClass, OutputFormat.class);
   }
-  
-  @SuppressWarnings("unchecked")
+
+  @SuppressWarnings("rawtypes")
   public OutputFormat getOutputFormat() {
     return (OutputFormat) ReflectionUtils.newInstance(conf.getClass(
         "bsp.output.format.class", TextOutputFormat.class, OutputFormat.class),
         conf);
   }
+
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu Nov  3 10:31:10 2011
@@ -210,8 +210,8 @@ public class BSPJobClient extends Config
     if (masterAdress != null && !masterAdress.equals("local")) {
       this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
           JobSubmissionProtocol.class, JobSubmissionProtocol.versionID,
-          BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(conf,
-              JobSubmissionProtocol.class));
+          BSPMaster.getAddress(conf), conf,
+          NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
     } else {
       LOG.debug("Using local BSP runner.");
       this.jobSubmitClient = new LocalBSPRunner(conf);
@@ -680,8 +680,9 @@ public class BSPJobClient extends Config
         System.out.println("Job name: " + job.getJobName());
         System.out.printf("States are:\n\tRunning : 1\tSucceded : 2"
             + "\tFailed : 3\tPrep : 4\n");
-        System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(), jobStatus
-            .getRunState(), jobStatus.getStartTime(), jobStatus.getUsername());
+        System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(),
+            jobStatus.getRunState(), jobStatus.getStartTime(),
+            jobStatus.getUsername());
 
         exitCode = 0;
       }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Thu Nov  3 10:31:10 2011
@@ -22,11 +22,13 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.Constants;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
+import org.apache.hama.util.KeyValuePair;
 
 /**
  * BSP communication interface.
  */
-public interface BSPPeer extends HamaRPCProtocolVersion, Constants {
+public interface BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
+    HamaRPCProtocolVersion, Constants {
 
   /**
    * Send a data with a tag to another BSPSlave corresponding to hostname.
@@ -72,7 +74,7 @@ public interface BSPPeer extends HamaRPC
    * Sends all the messages in the outgoing message queues to the corresponding
    * remote peers.
    * 
-   * @throws InterruptedException 
+   * @throws InterruptedException
    */
   public void sync() throws InterruptedException;
 
@@ -91,7 +93,7 @@ public interface BSPPeer extends HamaRPC
    * @return the name of n-th peer from sorted array by name.
    */
   public String getPeerName(int index);
-  
+
   /**
    * @return the names of all the peers executing tasks from the same job
    *         (including this peer).
@@ -102,13 +104,40 @@ public interface BSPPeer extends HamaRPC
    * @return the number of peers
    */
   public int getNumPeers();
-  
+
   /**
    * Clears all queues entries.
    */
   public void clear();
 
   /**
+   * Writes a key/value pair to the output collector.
+   * 
+   * @param key your key object
+   * @param value your value object
+   * @throws IOException
+   */
+  public void write(KEYOUT key, VALUEOUT value) throws IOException;
+
+  /**
+   * Deserializes the next input key value into the given objects.
+   * 
+   * @param key
+   * @param value
+   * @return false if there are no records to read anymore
+   * @throws IOException
+   */
+  public boolean readNext(KEYIN key, VALUEIN value) throws IOException;
+
+  /**
+   * Reads the next key value pair and returns it as a pair.
+   * 
+   * @return null if there are no records left.
+   * @throws IOException
+   */
+  public KeyValuePair<KEYIN, VALUEIN> readNext() throws IOException;
+
+  /**
    * @return the jobs configuration
    */
   public Configuration getConfiguration();

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Nov  3 10:31:10 2011
@@ -28,9 +28,11 @@ import java.util.concurrent.ConcurrentLi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -38,11 +40,13 @@ import org.apache.hama.Constants;
 import org.apache.hama.bsp.sync.SyncClient;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.util.KeyValuePair;
 
 /**
  * This class represents a BSP peer.
  */
-public class BSPPeerImpl implements BSPPeer {
+public class BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements
+    BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
 
   public static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
@@ -52,7 +56,7 @@ public class BSPPeerImpl implements BSPP
 
   private volatile Server server = null;
 
-  private final Map<InetSocketAddress, BSPPeer> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeer>();
+  private final Map<InetSocketAddress, BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>();
   private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
   private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
   private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
@@ -66,8 +70,17 @@ public class BSPPeerImpl implements BSPP
 
   private String[] allPeers;
 
+  // SYNC
   private SyncClient syncClient;
 
+  // IO
+  private int partition;
+  private String splitClass;
+  private BytesWritable split;
+  private OutputCollector<KEYOUT, VALUEOUT> collector;
+  private RecordReader<KEYIN, VALUEIN> in;
+  private RecordWriter<KEYOUT, VALUEOUT> outWriter;
+
   /**
    * Protected default constructor for LocalBSPRunner.
    */
@@ -78,8 +91,9 @@ public class BSPPeerImpl implements BSPP
 
   /**
    * For unit test.
-   * @param conf is the configuration file. 
-   * @param dfs is the Hadoop FileSystem. 
+   * 
+   * @param conf is the configuration file.
+   * @param dfs is the Hadoop FileSystem.
    */
   protected BSPPeerImpl(final Configuration conf, FileSystem dfs) {
     this.conf = conf;
@@ -97,11 +111,16 @@ public class BSPPeerImpl implements BSPP
    * @throws Exception
    */
   public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
-      BSPPeerProtocol umbilical) throws Exception {
+      BSPPeerProtocol umbilical, int partition, String splitClass,
+      BytesWritable split) throws Exception {
     this.conf = conf;
     this.taskId = taskId;
     this.umbilical = umbilical;
     this.bspJob = job;
+    // IO
+    this.partition = partition;
+    this.splitClass = splitClass;
+    this.split = split;
 
     FileSystem fs = null;
     if (conf.getBoolean("bsp.checkpoint.enabled", false)) {
@@ -126,6 +145,7 @@ public class BSPPeerImpl implements BSPP
         TaskStatus.Phase.STARTING));
   }
 
+  @SuppressWarnings("unchecked")
   public void initialize() throws Exception {
     try {
       if (LOG.isDebugEnabled())
@@ -142,6 +162,40 @@ public class BSPPeerImpl implements BSPP
     syncClient = SyncServiceFactory.getSyncClient(conf);
     syncClient.init(conf, taskId.getJobID(), taskId);
 
+    InputSplit inputSplit = null;
+    // reinstantiate the split
+    try {
+      inputSplit = (InputSplit) ReflectionUtils.newInstance(getConfiguration()
+          .getClassByName(splitClass), getConfiguration());
+    } catch (ClassNotFoundException exp) {
+      IOException wrap = new IOException("Split class " + splitClass
+          + " not found");
+      wrap.initCause(exp);
+      throw wrap;
+    }
+
+    DataInputBuffer splitBuffer = new DataInputBuffer();
+    splitBuffer.reset(split.getBytes(), 0, split.getLength());
+    inputSplit.readFields(splitBuffer);
+
+    in = bspJob.getInputFormat().getRecordReader(inputSplit, bspJob);
+
+    // just output something when the user configured it
+    if (conf.get("bsp.output.dir") != null) {
+      Path outdir = new Path(conf.get("bsp.output.dir"),
+          Task.getOutputName(partition));
+
+      outWriter = bspJob.getOutputFormat().getRecordWriter(dfs, bspJob,
+          outdir.makeQualified(dfs).toString());
+      final RecordWriter<KEYOUT, VALUEOUT> finalOut = outWriter;
+
+      collector = new OutputCollector<KEYOUT, VALUEOUT>() {
+        public void collect(KEYOUT key, VALUEOUT value) throws IOException {
+          finalOut.write(key, value);
+        }
+      };
+    }
+
   }
 
   @Override
@@ -156,22 +210,22 @@ public class BSPPeerImpl implements BSPP
    */
   @Override
   public void send(String peerName, BSPMessage msg) throws IOException {
-      LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
-      InetSocketAddress targetPeerAddress = null;
-      // Get socket for target peer.
-      if (peerSocketCache.containsKey(peerName)) {
-        targetPeerAddress = peerSocketCache.get(peerName);
-      } else {
-        targetPeerAddress = getAddress(peerName);
-        peerSocketCache.put(peerName, targetPeerAddress);
-      }
-      ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues
-          .get(targetPeerAddress);
-      if (queue == null) {
-        queue = new ConcurrentLinkedQueue<BSPMessage>();
-      }
-      queue.add(msg);
-      outgoingQueues.put(targetPeerAddress, queue);
+    LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
+    InetSocketAddress targetPeerAddress = null;
+    // Get socket for target peer.
+    if (peerSocketCache.containsKey(peerName)) {
+      targetPeerAddress = peerSocketCache.get(peerName);
+    } else {
+      targetPeerAddress = getAddress(peerName);
+      peerSocketCache.put(peerName, targetPeerAddress);
+    }
+    ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues
+        .get(targetPeerAddress);
+    if (queue == null) {
+      queue = new ConcurrentLinkedQueue<BSPMessage>();
+    }
+    queue.add(msg);
+    outgoingQueues.put(targetPeerAddress, queue);
   }
 
   private String checkpointedPath() {
@@ -188,12 +242,15 @@ public class BSPPeerImpl implements BSPP
     try {
       out = this.dfs.create(new Path(checkpointedPath));
       bundle.write(out);
-    } catch(IOException ioe) {
-      LOG.warn("Fail checkpointing messages to "+checkpointedPath, ioe);
-    } finally { 
-      try { if(null != out) out.close(); } catch(IOException e) {
-        LOG.warn("Fail to close dfs output stream while checkpointing.", e); 
-      } 
+    } catch (IOException ioe) {
+      LOG.warn("Fail checkpointing messages to " + checkpointedPath, ioe);
+    } finally {
+      try {
+        if (null != out)
+          out.close();
+      } catch (IOException e) {
+        LOG.warn("Fail to close dfs output stream while checkpointing.", e);
+      }
     }
   }
 
@@ -212,13 +269,15 @@ public class BSPPeerImpl implements BSPP
         Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it
             .next();
 
-        BSPPeer peer = getBSPPeerConnection(entry.getKey());
+        BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> peer = getBSPPeerConnection(entry
+            .getKey());
         Iterable<BSPMessage> messages = entry.getValue();
         BSPMessageBundle bundle = new BSPMessageBundle();
 
-        if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(Combiner.class)) {
+        if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
+            Combiner.class)) {
           Combiner combiner = (Combiner) ReflectionUtils.newInstance(
-            conf.getClass("bsp.combiner.class", Combiner.class), conf);
+              conf.getClass("bsp.combiner.class", Combiner.class), conf);
 
           bundle = combiner.combine(messages);
         } else {
@@ -253,7 +312,6 @@ public class BSPPeerImpl implements BSPP
       LOG.fatal(
           "Caught exception during superstep "
               + currentTaskStatus.getSuperstepCount() + "!", e);
-      // throw new RuntimeException(e);
     }
   }
 
@@ -273,10 +331,13 @@ public class BSPPeerImpl implements BSPP
   }
 
   public void close() throws Exception {
+    in.close();
+    outWriter.close();
     this.clear();
     syncClient.close();
-    if (server != null)
+    if (server != null) {
       server.stop();
+    }
   }
 
   @Override
@@ -296,14 +357,15 @@ public class BSPPeerImpl implements BSPP
     return BSPPeer.versionID;
   }
 
-  protected BSPPeer getBSPPeerConnection(InetSocketAddress addr)
-      throws NullPointerException, IOException {
-    BSPPeer peer;
+  @SuppressWarnings("unchecked")
+  protected BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> getBSPPeerConnection(
+      InetSocketAddress addr) throws NullPointerException, IOException {
+    BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> peer;
     peer = peers.get(addr);
     if (peer == null) {
       synchronized (this.peers) {
-        peer = (BSPPeer) RPC.getProxy(BSPPeer.class, BSPPeer.versionID, addr,
-            this.conf);
+        peer = (BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>) RPC.getProxy(
+            BSPPeer.class, BSPPeer.versionID, addr, this.conf);
         this.peers.put(addr, peer);
       }
     }
@@ -411,4 +473,30 @@ public class BSPPeerImpl implements BSPP
   public void clearOutgoingQueues() {
     this.outgoingQueues.clear();
   }
+
+  /*
+   * IO STUFF
+   */
+
+  @Override
+  public void write(KEYOUT key, VALUEOUT value) throws IOException {
+    collector.collect(key, value);
+  }
+
+  @Override
+  public boolean readNext(KEYIN key, VALUEIN value) throws IOException {
+    return in.next(key, value);
+  }
+
+  @Override
+  public KeyValuePair<KEYIN, VALUEIN> readNext() throws IOException {
+    KEYIN k = in.createKey();
+    VALUEIN v = in.createValue();
+    if (in.next(k, v)) {
+      return new KeyValuePair<KEYIN, VALUEIN>(k, v);
+    } else {
+      return null;
+    }
+  }
+
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Thu Nov  3 10:31:10 2011
@@ -23,9 +23,7 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.ipc.BSPPeerProtocol;
@@ -39,8 +37,8 @@ public class BSPTask extends Task {
   public static final Log LOG = LogFactory.getLog(BSPTask.class);
 
   private BSPJob conf;
-  private BytesWritable split = new BytesWritable();
-  private String splitClass;
+  BytesWritable split = new BytesWritable();
+  String splitClass;
 
   public BSPTask() {
   }
@@ -62,65 +60,33 @@ public class BSPTask extends Task {
   }
 
   @Override
-  public void run(BSPJob job, BSPPeerImpl bspPeer, BSPPeerProtocol umbilical)
-      throws IOException {
+  public void run(BSPJob job, BSPPeerImpl<?, ?, ?, ?> bspPeer,
+      BSPPeerProtocol umbilical) throws IOException {
 
     try {
       runBSP(job, bspPeer, split, umbilical);
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Exception during BSP execution!", e);
     } catch (ClassNotFoundException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Exception during instantiation of BSP class!", e);
     }
 
     done(umbilical);
   }
 
   @SuppressWarnings("unchecked")
-  private <INK, INV, OUTK, OUTV> void runBSP(final BSPJob job, BSPPeerImpl bspPeer,
+  private <KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runBSP(final BSPJob job,
+      BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> bspPeer,
       final BytesWritable rawSplit, final BSPPeerProtocol umbilical)
       throws IOException, InterruptedException, ClassNotFoundException {
-    InputSplit inputSplit = null;
-    // reinstantiate the split
-    try {
-      inputSplit = (InputSplit) ReflectionUtils.newInstance(job.getConf()
-          .getClassByName(splitClass), job.getConf());
-    } catch (ClassNotFoundException exp) {
-      IOException wrap = new IOException("Split class " + splitClass
-          + " not found");
-      wrap.initCause(exp);
-      throw wrap;
-    }
 
-    DataInputBuffer splitBuffer = new DataInputBuffer();
-    splitBuffer.reset(split.getBytes(), 0, split.getLength());
-    inputSplit.readFields(splitBuffer);
-
-    RecordReader<INK, INV> in = job.getInputFormat().getRecordReader(
-        inputSplit, job);
-    
-    FileSystem fs = FileSystem.get(job.getConf());
-    String finalName = getOutputName(getPartition());
-    
-    final RecordWriter<OUTK, OUTV> out = 
-      job.getOutputFormat().getRecordWriter(fs, job, finalName);
-    
-    OutputCollector<OUTK,OUTV> collector = 
-      new OutputCollector<OUTK,OUTV>() {
-        public void collect(OUTK key, OUTV value)
-          throws IOException {
-          out.write(key, value);
-        }
-      };
-      
-    BSP bsp = (BSP) ReflectionUtils.newInstance(job.getConf().getClass(
-        "bsp.work.class", BSP.class), job.getConf());
+    BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT>) ReflectionUtils
+        .newInstance(job.getConf().getClass("bsp.work.class", BSP.class),
+            job.getConf());
 
     try {
       bsp.setup(bspPeer);
-      bsp.bsp(bspPeer, in, collector);
+      bsp.bsp(bspPeer);
     } catch (IOException e) {
       LOG.error("Exception during BSP execution!", e);
     } catch (KeeperException e) {
@@ -129,7 +95,11 @@ public class BSPTask extends Task {
       LOG.error("Exception during BSP execution!", e);
     } finally {
       bsp.cleanup(bspPeer);
-      out.close();
+      try {
+        bspPeer.close();
+      } catch (Exception e) {
+        LOG.fatal("Exception during BSP closing!", e);
+      }
     }
 
   }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.IOException;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.IOException;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.DataInput;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Thu Nov  3 10:31:10 2011
@@ -143,7 +143,7 @@ public class GroomServer implements Runn
 
       if (actions != null) {
         LOG.info("Launch " + actions.length + " tasks.");
-        
+
         assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
         int i = 0;
 
@@ -410,7 +410,8 @@ public class GroomServer implements Runn
             if (!tip.runner.isAlive()) {
               if (taskStatus.getRunState() != TaskStatus.State.FAILED) {
                 taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-                LOG.info("Task '" + taskStatus.getTaskId().toString() + "' has completed.");
+                LOG.info("Task '" + taskStatus.getTaskId().toString()
+                    + "' has completed.");
               }
               taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
             }
@@ -418,7 +419,7 @@ public class GroomServer implements Runn
 
           taskStatuses.add(taskStatus);
         }
-        
+
         doReport(taskStatuses);
         Thread.sleep(REPORT_INTERVAL);
       } catch (InterruptedException ie) {
@@ -461,7 +462,7 @@ public class GroomServer implements Runn
     } catch (IOException e1) {
       LOG.error(e1);
     }
-    
+
     TaskInProgress tip = new TaskInProgress(t, jobConf, this.groomServerName);
 
     synchronized (this) {
@@ -501,7 +502,7 @@ public class GroomServer implements Runn
   public List<TaskStatus> updateTaskStatuses(List<TaskStatus> taskStatuses) {
     List<TaskStatus> tlist = new ArrayList<TaskStatus>();
 
-    for(TaskStatus taskStatus : taskStatuses) {
+    for (TaskStatus taskStatus : taskStatuses) {
       if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED
           || taskStatus.getRunState() == TaskStatus.State.FAILED) {
         synchronized (finishedTasks) {
@@ -536,7 +537,13 @@ public class GroomServer implements Runn
         conf.addResource(localJobFile);
         jobConf = new BSPJob(conf, task.getJobID().toString());
 
-        Path jarFile = new Path(jobConf.getJar());
+        Path jarFile = null;
+        if (jobConf.getJar() != null) {
+          jarFile = new Path(jobConf.getJar());
+        } else {
+          LOG.warn("No jar file for job " + task.getJobID()
+              + " has been defined!");
+        }
         jobConf.setJar(localJarFile.toString());
 
         if (jarFile != null) {
@@ -627,7 +634,7 @@ public class GroomServer implements Runn
       TaskStatus status = tip.getStatus();
       result.add((TaskStatus) status.clone());
     }
-    
+
     return result;
   }
 
@@ -877,7 +884,7 @@ public class GroomServer implements Runn
           BSPPeerProtocol.class, BSPPeerProtocol.versionID, address,
           defaultConf);
 
-      Task task = umbilical.getTask(taskid);
+      BSPTask task = (BSPTask) umbilical.getTask(taskid);
       int peerPort = umbilical.getAssignedPortNum(taskid);
 
       defaultConf.addResource(new Path(task.getJobFile()));
@@ -889,14 +896,16 @@ public class GroomServer implements Runn
       }
       defaultConf.setInt(Constants.PEER_PORT, peerPort);
 
-      // instantiate and init our peer
-      BSPPeerImpl bspPeer = new BSPPeerImpl(job, defaultConf, taskid, umbilical);
-
       try {
         // use job-specified working directory
         FileSystem.get(job.getConf()).setWorkingDirectory(
             job.getWorkingDirectory());
 
+        // instantiate and init our peer
+        @SuppressWarnings("rawtypes")
+        BSPPeerImpl<?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, defaultConf,
+            taskid, umbilical, task.partition, task.splitClass, task.split);
+
         task.run(job, bspPeer, umbilical); // run the task
 
       } catch (FSError e) {
@@ -908,8 +917,6 @@ public class GroomServer implements Runn
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         throwable.printStackTrace(new PrintStream(baos));
       } finally {
-        bspPeer.close(); // close peer.
-
         RPC.stopProxy(umbilical);
         // Shutting down log4j of the child-vm...
         // This assumes that on return from Task.run()

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.IOException;
@@ -6,6 +23,7 @@ public interface InputFormat<K, V> {
 
   InputSplit[] getSplits(BSPJob job, int numBspTask) throws IOException;
 
-  RecordReader<K, V> getRecordReader(InputSplit split, BSPJob job) throws IOException;
+  RecordReader<K, V> getRecordReader(InputSplit split, BSPJob job)
+      throws IOException;
 
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.IOException;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.IOException;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu Nov  3 10:31:10 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMaster.State;
 import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.hama.util.KeyValuePair;
 
 /**
  * A multithreaded local BSP runner that can be used for debugging and local
@@ -210,7 +211,7 @@ public class LocalBSPRunner implements J
       try {
         bsp.setup(groom);
         // TODO 
-        bsp.bsp(groom, null, null);
+//        bsp.bsp(groom, null, null);
       } catch (Exception e) {
         LOG.error("Exception during BSP execution!", e);
       }
@@ -379,5 +380,23 @@ public class LocalBSPRunner implements J
       return allPeers.length;
     }
 
+    @Override
+    public void write(Object key, Object value) throws IOException {
+      // TODO Auto-generated method stub
+      
+    }
+
+    @Override
+    public boolean readNext(Object key, Object value) throws IOException {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    public KeyValuePair readNext() throws IOException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
   }
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.DataInput;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import org.apache.hadoop.fs.FileSystem;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.IOException;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.IOException;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.IOException;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.IOException;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java Thu Nov  3 10:31:10 2011
@@ -138,7 +138,7 @@ public abstract class Task implements Wr
    * @param bspPeer for communications
    * @param umbilical for communications with GroomServer
    */
-  public abstract void run(BSPJob job, BSPPeerImpl bspPeer, BSPPeerProtocol umbilical)
+  public abstract void run(BSPJob job, BSPPeerImpl<?,?,?,?> bspPeer, BSPPeerProtocol umbilical)
       throws IOException;
 
   public abstract BSPTaskRunner createRunner(GroomServer groom);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.IOException;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java Thu Nov  3 10:31:10 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.DataOutputStream;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java Thu Nov  3 10:31:10 2011
@@ -34,7 +34,8 @@ public class SyncServiceFactory {
       throws ClassNotFoundException {
     return (SyncClient) ReflectionUtils.newInstance(conf.getClassByName(conf
         .get(SYNC_CLIENT_CLASS,
-            "org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl")), conf);
+            org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+                .getCanonicalName())), conf);
   }
 
   /**
@@ -47,7 +48,7 @@ public class SyncServiceFactory {
       throws ClassNotFoundException {
     return (SyncServer) ReflectionUtils.newInstance(conf.getClassByName(conf
         .get(SYNC_SERVER_CLASS,
-            "org.apache.hama.bsp.sync.ZooKeeperSyncServerImpl")), conf);
+            org.apache.hama.bsp.sync.ZooKeeperSyncServerImpl.class.getCanonicalName())), conf);
   }
 
   /**

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java?rev=1197057&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java Thu Nov  3 10:31:10 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+/**
+ * Immutable class for key values.
+ * 
+ * @param <K>
+ * @param <V>
+ */
+public class KeyValuePair<K, V> {
+
+  private final K key;
+  private final V value;
+
+  public KeyValuePair(K key, V value) {
+    super();
+    this.key = key;
+    this.value = value;
+  }
+
+  public K getKey() {
+    return key;
+  }
+
+  public V getValue() {
+    return value;
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Thu Nov  3 10:31:10 2011
@@ -47,7 +47,8 @@ public class TestBSPMasterGroomServer ex
     configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
     configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
     configuration.set("hama.sync.client.class",
-        "org.apache.hama.bsp.sync.zookeeper.ZooKeeperSyncClientImpl");
+        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+        .getCanonicalName());
   }
 
   public void setUp() throws Exception {

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java Thu Nov  3 10:31:10 2011
@@ -36,7 +36,7 @@ public class TestLocalRunner extends Tes
 
     // FIXME
     // assertTrue(bsp.waitForCompletion(true));
-    TestBSPMasterGroomServer.checkOutput(fileSys, configuration, 20);
+    //TestBSPMasterGroomServer.checkOutput(fileSys, configuration, 20);
   }
 
 }

Modified: incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java (original)
+++ incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java Thu Nov  3 10:31:10 2011
@@ -27,12 +27,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.OutputCollector;
-import org.apache.hama.bsp.RecordReader;
 import org.apache.zookeeper.KeeperException;
 
 public class ClassSerializePrinting {
@@ -46,9 +44,7 @@ public class ClassSerializePrinting {
     private FileSystem fileSys;
     private int num;
 
-    public void bsp(BSPPeer bspPeer,
-        RecordReader<NullWritable, NullWritable> input,
-        OutputCollector<NullWritable, NullWritable> out) throws IOException,
+    public void bsp(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> bspPeer) throws IOException,
         KeeperException, InterruptedException {
 
       int i = 0;
@@ -86,18 +82,5 @@ public class ClassSerializePrinting {
         e.printStackTrace();
       }
     }
-
-    @Override
-    public void cleanup(BSPPeer peer) {
-      // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void setup(BSPPeer peer) throws IOException, KeeperException,
-        InterruptedException {
-      // TODO Auto-generated method stub
-
-    }
   }
 }

Modified: incubator/hama/trunk/core/src/test/java/testjar/testjob.jar
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/testjar/testjob.jar?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
Binary files - no diff available.

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java Thu Nov  3 10:31:10 2011
@@ -33,8 +33,6 @@ import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.IntegerMessage;
 import org.apache.hama.bsp.NullInputFormat;
 import org.apache.hama.bsp.NullOutputFormat;
-import org.apache.hama.bsp.OutputCollector;
-import org.apache.hama.bsp.RecordReader;
 import org.apache.zookeeper.KeeperException;
 
 public class CombineExample {
@@ -44,9 +42,7 @@ public class CombineExample {
     public static final Log LOG = LogFactory.getLog(MyBSP.class);
 
     @Override
-    public void bsp(BSPPeer peer,
-        RecordReader<NullWritable, NullWritable> input,
-        OutputCollector<NullWritable, NullWritable> output) throws IOException,
+    public void bsp(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer) throws IOException,
         KeeperException, InterruptedException {
       for (String peerName : peer.getAllPeerNames()) {
         peer.send(peerName, new IntegerMessage(peer.getPeerName(), 1));
@@ -61,19 +57,6 @@ public class CombineExample {
       }
     }
 
-    @Override
-    public void cleanup(BSPPeer peer) {
-      // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void setup(BSPPeer peer) throws IOException, KeeperException,
-        InterruptedException {
-      // TODO Auto-generated method stub
-
-    }
-
   }
 
   public static class SumCombiner extends Combiner {

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Thu Nov  3 10:31:10 2011
@@ -30,7 +30,7 @@ public class ExampleDriver {
       pgd.addClass("bench", RandBench.class, "Random Communication Benchmark");
       pgd.addClass("test", SerializePrinting.class, "Serialize Printing Test");
       pgd.addClass("combine", CombineExample.class, "Combiner Example");
-      
+
       pgd.driver(args);
     } catch (Throwable e) {
       e.printStackTrace();

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Thu Nov  3 10:31:10 2011
@@ -38,8 +38,6 @@ import org.apache.hama.bsp.ClusterStatus
 import org.apache.hama.bsp.DoubleMessage;
 import org.apache.hama.bsp.FileOutputFormat;
 import org.apache.hama.bsp.NullInputFormat;
-import org.apache.hama.bsp.OutputCollector;
-import org.apache.hama.bsp.RecordReader;
 import org.apache.hama.bsp.TextOutputFormat;
 import org.apache.zookeeper.KeeperException;
 
@@ -53,10 +51,9 @@ public class PiEstimator {
     private static final int iterations = 10000;
 
     @Override
-    public void bsp(BSPPeer peer,
-        RecordReader<NullWritable, NullWritable> input,
-        OutputCollector<Text, DoubleWritable> output) throws IOException,
-        KeeperException, InterruptedException {
+    public void bsp(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+        throws IOException, KeeperException, InterruptedException {
 
       int in = 0, out = 0;
       for (int i = 0; i < iterations; i++) {
@@ -83,21 +80,17 @@ public class PiEstimator {
         }
 
         pi = pi / numPeers;
-        output.collect(new Text("Estimated value of PI is"),
-            new DoubleWritable(pi));
+        peer.write(new Text("Estimated value of PI is"), new DoubleWritable(pi));
       }
     }
 
     @Override
-    public void setup(BSPPeer peer) {
+    public void setup(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer) {
       // Choose one as a master
       this.masterTask = peer.getPeerName(0);
     }
 
-    @Override
-    public void cleanup(BSPPeer peer) {
-    }
-
   }
 
   private static void printOutput(HamaConfiguration conf) throws IOException {

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java Thu Nov  3 10:31:10 2011
@@ -33,8 +33,6 @@ import org.apache.hama.bsp.ByteMessage;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.NullInputFormat;
 import org.apache.hama.bsp.NullOutputFormat;
-import org.apache.hama.bsp.OutputCollector;
-import org.apache.hama.bsp.RecordReader;
 import org.apache.hama.util.Bytes;
 import org.apache.zookeeper.KeeperException;
 
@@ -52,10 +50,9 @@ public class RandBench {
     private int nSupersteps;
 
     @Override
-    public void bsp(BSPPeer peer,
-        RecordReader<NullWritable, NullWritable> input,
-        OutputCollector<NullWritable, NullWritable> output) throws IOException,
-        KeeperException, InterruptedException {
+    public void bsp(
+        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer)
+        throws IOException, KeeperException, InterruptedException {
       byte[] dummyData = new byte[sizeOfMsg];
       BSPMessage msg = null;
       String[] peers = peer.getAllPeerNames();
@@ -82,17 +79,12 @@ public class RandBench {
     }
 
     @Override
-    public void setup(BSPPeer peer) {
+    public void setup(
+        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer) {
       this.sizeOfMsg = conf.getInt(SIZEOFMSG, 1);
       this.nCommunications = conf.getInt(N_COMMUNICATIONS, 1);
       this.nSupersteps = conf.getInt(N_SUPERSTEPS, 1);
     }
-
-    @Override
-    public void cleanup(BSPPeer peer) {
-      // TODO Auto-generated method stub
-
-    }
   }
 
   public static void main(String[] args) throws Exception {

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java Thu Nov  3 10:31:10 2011
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
@@ -37,8 +37,6 @@ import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.NullInputFormat;
 import org.apache.hama.bsp.NullOutputFormat;
-import org.apache.hama.bsp.OutputCollector;
-import org.apache.hama.bsp.RecordReader;
 import org.apache.zookeeper.KeeperException;
 
 public class SerializePrinting {
@@ -52,10 +50,9 @@ public class SerializePrinting {
     private int num;
 
     @Override
-    public void bsp(BSPPeer peer,
-        RecordReader<NullWritable, NullWritable> input,
-        OutputCollector<NullWritable, NullWritable> output) throws IOException,
-        KeeperException, InterruptedException {
+    public void bsp(
+        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer)
+        throws IOException, KeeperException, InterruptedException {
 
       LOG.info(peer.getAllPeerNames());
       int i = 0;
@@ -81,7 +78,8 @@ public class SerializePrinting {
     }
 
     @Override
-    public void setup(BSPPeer peer) {
+    public void setup(
+        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer) {
       num = Integer.parseInt(conf.get("bsp.peers.num"));
       try {
         fileSys = FileSystem.get(conf);
@@ -89,12 +87,6 @@ public class SerializePrinting {
         throw new Error("Filesystem could not be initialized! ", e);
       }
     }
-
-    @Override
-    public void cleanup(BSPPeer peer) {
-      // TODO Auto-generated method stub
-
-    }
   }
 
   private static void printOutput(FileSystem fileSys, ClusterStatus cluster,
@@ -128,7 +120,7 @@ public class SerializePrinting {
     bsp.setBspClass(HelloBSP.class);
     bsp.setInputFormat(NullInputFormat.class);
     bsp.setOutputFormat(NullOutputFormat.class);
-    
+
     // Set the task size as a number of GroomServer
     BSPJobClient jobClient = new BSPJobClient(conf);
     ClusterStatus cluster = jobClient.getClusterStatus(false);

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java Thu Nov  3 10:31:10 2011
@@ -60,7 +60,7 @@ public class PageRank extends PageRankBa
   }
 
   @Override
-  public void bsp(BSPPeer peer, RecordReader input, OutputCollector output) throws IOException, KeeperException,
+  public void bsp(BSPPeer peer) throws IOException, KeeperException,
       InterruptedException {
     String master = peer.getConfiguration().get(MASTER_TASK);
     // setup the datasets

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java?rev=1197057&r1=1197056&r2=1197057&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java Thu Nov  3 10:31:10 2011
@@ -51,7 +51,7 @@ public class ShortestPaths extends Short
   private String[] peerNames;
 
   @Override
-  public void bsp(BSPPeer peer, RecordReader input, OutputCollector output)
+  public void bsp(BSPPeer peer)
       throws IOException, KeeperException, InterruptedException {
     // map our input into ram
     mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList,



Mime
View raw message