hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1145840 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/examples/org/apache/hama/examples/graph/ src/java/org/apache/hama/bsp/ src/test/org/apache/hama/ src/test/org/apache/hama/bsp/ src/test/testjar/
Date Wed, 13 Jul 2011 02:30:40 GMT
Author: edwardyoon
Date: Wed Jul 13 02:30:39 2011
New Revision: 1145840

URL: http://svn.apache.org/viewvc?rev=1145840&view=rev
Log:
Move BSPPeer constructor into child process.

Added:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/PeerNames.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRank.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankBase.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPaths.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsBase.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerProtocol.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskRunner.java
    incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Jul 13 02:30:39 2011
@@ -1,5 +1,15 @@
 Hama Change Log
 
+Release 0.4 - Unreleased
+
+  NEW FEATURES
+
+  BUG FIXES
+
+  IMPROVEMENTS
+  
+    HAMA-414: Move BSPPeer constructor into child process (edwardyoon)
+    
 Release 0.3 - Unreleased
 
   NEW FEATURES

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Wed Jul 13 02:30:39 2011
@@ -31,7 +31,7 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
-import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.DoubleMessage;
 import org.apache.zookeeper.KeeperException;
@@ -46,7 +46,7 @@ public class PiEstimator {
     private String masterTask;
     private static final int iterations = 10000;
 
-    public void bsp(BSPPeerProtocol bspPeer) throws IOException,
+    public void bsp(BSPPeer bspPeer) throws IOException,
         KeeperException, InterruptedException {
       int in = 0, out = 0;
       for (int i = 0; i < iterations; i++) {
@@ -133,7 +133,7 @@ public class PiEstimator {
       bsp.setNumBspTask(Integer.parseInt(args[0]));
     } else {
       // Set to maximum
-      bsp.setNumBspTask(cluster.getGroomServers());
+      bsp.setNumBspTask(cluster.getMaxTasks());
     }
 
     // Choose one as a master

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java Wed Jul 13 02:30:39 2011
@@ -28,7 +28,7 @@ import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.BSPMessage;
-import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.ByteMessage;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.util.Bytes;
@@ -48,7 +48,7 @@ public class RandBench {
     private int nSupersteps;
 
     @Override
-    public void bsp(BSPPeerProtocol bspPeer) throws IOException,
+    public void bsp(BSPPeer bspPeer) throws IOException,
         KeeperException, InterruptedException {
       byte[] dummyData = new byte[sizeOfMsg];
       BSPMessage msg = null;
@@ -111,7 +111,7 @@ public class RandBench {
     // Set the task size as a number of GroomServer
     BSPJobClient jobClient = new BSPJobClient(conf);
     ClusterStatus cluster = jobClient.getClusterStatus(false);
-    bsp.setNumBspTask(cluster.getGroomServers());
+    bsp.setNumBspTask(cluster.getMaxTasks());
 
     long startTime = System.currentTimeMillis();
     bsp.waitForCompletion(true);

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java Wed Jul 13 02:30:39 2011
@@ -27,13 +27,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
-import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.zookeeper.KeeperException;
 
@@ -47,7 +47,7 @@ public class SerializePrinting {
     private FileSystem fileSys;
     private int num;
 
-    public void bsp(BSPPeerProtocol bspPeer) throws IOException,
+    public void bsp(BSPPeer bspPeer) throws IOException,
         KeeperException, InterruptedException {
 
       int i = 0;

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRank.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRank.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRank.java Wed Jul 13 02:30:39 2011
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
-import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.DoubleMessage;
 import org.apache.zookeeper.KeeperException;
@@ -48,7 +48,7 @@ public class PageRank extends PageRankBa
   private String[] peerNames;
 
   @Override
-  public void bsp(BSPPeerProtocol peer) throws IOException, KeeperException,
+  public void bsp(BSPPeer peer) throws IOException, KeeperException,
       InterruptedException {
     String master = conf.get(MASTER_TASK);
     // setup the datasets
@@ -109,7 +109,7 @@ public class PageRank extends PageRankBa
     LOG.info("Finished with iteration " + iteration + "!");
   }
 
-  private double broadcastError(BSPPeerProtocol peer, String master,
+  private double broadcastError(BSPPeer peer, String master,
       double error) throws IOException, KeeperException, InterruptedException {
     peer.send(master, new DoubleMessage("", error));
     peer.sync();
@@ -148,7 +148,7 @@ public class PageRank extends PageRankBa
     }
   }
 
-  private void sendMessageToNeighbors(BSPPeerProtocol peer, PageRankVertex v)
+  private void sendMessageToNeighbors(BSPPeer peer, PageRankVertex v)
       throws IOException {
     List<PageRankVertex> outgoingEdges = adjacencyList.get(v);
     for (PageRankVertex adjacent : outgoingEdges) {

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankBase.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankBase.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankBase.java Wed Jul 13 02:30:39 2011
@@ -39,7 +39,7 @@ import org.apache.hadoop.io.SequenceFile
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
-import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.BSPPeer;
 
 public abstract class PageRankBase extends BSP {
   public static final Log LOG = LogFactory.getLog(PageRankBase.class);
@@ -52,7 +52,7 @@ public abstract class PageRankBase exten
   protected static double EPSILON = 0.001;
 
   static HashMap<PageRankVertex, List<PageRankVertex>> mapAdjacencyList(
-      Configuration conf, BSPPeerProtocol peer) throws FileNotFoundException,
+      Configuration conf, BSPPeer peer) throws FileNotFoundException,
       IOException {
     FileSystem fs = FileSystem.get(conf);
     HashMap<PageRankVertex, List<PageRankVertex>> adjacencyList = new HashMap<PageRankVertex, List<PageRankVertex>>();
@@ -199,7 +199,7 @@ public abstract class PageRankBase exten
     return conf;
   }
 
-  static void savePageRankMap(BSPPeerProtocol peer, Configuration conf,
+  static void savePageRankMap(BSPPeer peer, Configuration conf,
       Map<PageRankVertex, Double> tentativePagerank) throws IOException {
     FileSystem fs = FileSystem.get(conf);
     Path outPath = new Path(conf.get("out.path") + Path.SEPARATOR + "temp"

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPaths.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPaths.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPaths.java Wed Jul 13 02:30:39 2011
@@ -32,7 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
-import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BooleanMessage;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.IntegerMessage;
@@ -48,7 +48,7 @@ public class ShortestPaths extends Short
   private String[] peerNames;
 
   @Override
-  public void bsp(BSPPeerProtocol peer) throws IOException, KeeperException,
+  public void bsp(BSPPeer peer) throws IOException, KeeperException,
       InterruptedException {
     LOG.info("Mapping graph into ram...");
     // map our input into ram
@@ -119,7 +119,7 @@ public class ShortestPaths extends Short
    * @throws KeeperException
    * @throws InterruptedException
    */
-  private boolean broadcastUpdatesMade(BSPPeerProtocol peer, String master,
+  private boolean broadcastUpdatesMade(BSPPeer peer, String master,
       int updates) throws IOException, KeeperException, InterruptedException {
     peer.send(master, new IntegerMessage(peer.getPeerName(), updates));
     peer.sync();
@@ -154,7 +154,7 @@ public class ShortestPaths extends Short
    * @param id The vertex to all adjacent vertices the new cost has to be send.
    * @throws IOException
    */
-  private void sendMessageToNeighbors(BSPPeerProtocol peer,
+  private void sendMessageToNeighbors(BSPPeer peer,
       ShortestPathVertex id) throws IOException {
     List<ShortestPathVertex> outgoingEdges = adjacencyList.get(id);
     for (ShortestPathVertex adjacent : outgoingEdges) {

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsBase.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsBase.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsBase.java Wed Jul 13 02:30:39 2011
@@ -34,7 +34,7 @@ import org.apache.hadoop.io.SequenceFile
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
-import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.BSPPeer;
 
 public abstract class ShortestPathsBase extends BSP {
   
@@ -54,7 +54,7 @@ public abstract class ShortestPathsBase 
    * @param adjacencyList
    * @throws IOException
    */
-  static void saveVertexMap(Configuration conf, BSPPeerProtocol peer,
+  static void saveVertexMap(Configuration conf, BSPPeer peer,
       Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList)
       throws IOException {
     Path outPath = new Path(conf.get(OUT_PATH) + Path.SEPARATOR
@@ -106,7 +106,7 @@ public abstract class ShortestPathsBase 
    * @param adjacencyList
    * @param vertexLookupMap
    */
-  static void mapAdjacencyList(Configuration conf, BSPPeerProtocol peer,
+  static void mapAdjacencyList(Configuration conf, BSPPeer peer,
       Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList,
       Map<String, ShortestPathVertex> vertexLookupMap)
       throws FileNotFoundException, IOException {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java Wed Jul 13 02:30:39 2011
@@ -39,7 +39,6 @@ public interface BSPInterface extends Co
    * @throws KeeperException
    * @throws InterruptedException
    */
-  public void bsp(BSPPeerProtocol bspPeer) throws IOException, KeeperException,
+  public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
       InterruptedException;
-
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Wed Jul 13 02:30:39 2011
@@ -412,13 +412,9 @@ public class BSPJobClient extends Config
       IOException {
     BSPJobClient jc = new BSPJobClient(job.getConf());
 
-    // TODO this code must be removed
-    // when GroomServer supports the multiple tasks.
     if (job.getNumBspTask() == 0
-        || job.getNumBspTask() > jc.getClusterStatus(false).getGroomServers()) {
-      // If the number of tasks is greater than the number of GroomServer,
-      // reset the number of tasks as number of GroomServer.
-      job.setNumBspTask(jc.getClusterStatus(false).getGroomServers());
+        || job.getNumBspTask() > jc.getClusterStatus(false).getMaxTasks()) {
+      job.setNumBspTask(jc.getClusterStatus(false).getMaxTasks());
     }
 
     RunningJob running = jc.submitJobInternal(job);

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Wed Jul 13 02:30:39 2011
@@ -62,7 +62,7 @@ public class BSPMaster implements JobSub
   private HamaConfiguration conf;
 
   /**
-   *  Constants for BSPMaster's status.
+   * Constants for BSPMaster's status.
    */
   public static enum State {
     INITIALIZING, RUNNING

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Wed Jul 13 02:30:39 2011
@@ -72,11 +72,29 @@ public class BSPPeer implements Watcher,
   private InetSocketAddress peerAddress;
   private TaskStatus currentTaskStatus;
 
+  private TaskAttemptID taskid;
+  private BSPPeerProtocol umbilical;
+
+  /**
+   * Protected default constructor for LocalBSPRunner.
+   */
+  protected BSPPeer() {
+    bspRoot = null;
+    quorumServers = null;
+  }
+
   /**
    * Constructor
+   * 
+   * @param umbilical
+   * @param taskid
    */
-  public BSPPeer(Configuration conf) throws IOException {
+  public BSPPeer(Configuration conf, TaskAttemptID taskid,
+      BSPPeerProtocol umbilical) throws IOException {
     this.conf = conf;
+    this.taskid = taskid;
+    this.umbilical = umbilical;
+
     String bindAddress = conf.get(Constants.PEER_HOST,
         Constants.DEFAULT_PEER_HOST);
     int bindPort = conf
@@ -89,15 +107,14 @@ public class BSPPeer implements Watcher,
     // TODO: may require to dynamic reflect the underlying
     // network e.g. ip address, port.
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
-
     reinitialize();
   }
 
   public void reinitialize() {
     try {
       LOG.debug("reinitialize(): " + getPeerName());
-      server = RPC.getServer(this, peerAddress.getHostName(), peerAddress
-          .getPort(), conf);
+      server = RPC.getServer(this, peerAddress.getHostName(),
+          peerAddress.getPort(), conf);
       server.start();
       LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
           + peerAddress.getPort());
@@ -202,7 +219,8 @@ public class BSPPeer implements Watcher,
 
     leaveBarrier();
     currentTaskStatus.incrementSuperstepCount();
-
+    umbilical.incrementSuperstepCount(taskid);
+    
     startTime = System.currentTimeMillis();
     // Clear outgoing queues.
     clearOutgoingQueues();
@@ -219,8 +237,8 @@ public class BSPPeer implements Watcher,
     // TODO: This is a quite temporary solution of HAMA-387.
     // If zk.getChildren() response is slower than 200 milliseconds,
     // BSP system will be hanged.
-    
-    // We have to consider another way to avoid this problem. 
+
+    // We have to consider another way to avoid this problem.
     if ((System.currentTimeMillis() - startTime) < 200) {
       Thread.sleep(200); // at least wait
     }
@@ -229,8 +247,9 @@ public class BSPPeer implements Watcher,
   protected boolean enterBarrier() throws KeeperException, InterruptedException {
     LOG.debug("[" + getPeerName() + "] enter the enterbarrier: "
         + this.getSuperstepCount());
-    zk.create(bspRoot + "/" + getPeerName(), Bytes.toBytes(this
-        .getSuperstepCount()), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+    zk.create(bspRoot + "/" + getPeerName(),
+        Bytes.toBytes(this.getSuperstepCount()), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.EPHEMERAL);
 
     while (true) {
       synchronized (mutex) {
@@ -322,8 +341,8 @@ public class BSPPeer implements Watcher,
 
   private InetSocketAddress getAddress(String peerName) {
     String[] peerAddrParts = peerName.split(":");
-    return new InetSocketAddress(peerAddrParts[0], Integer
-        .parseInt(peerAddrParts[1]));
+    return new InetSocketAddress(peerAddrParts[0],
+        Integer.parseInt(peerAddrParts[1]));
   }
 
   @Override

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerProtocol.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerProtocol.java Wed Jul 13 02:30:39 2011
@@ -17,12 +17,16 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hama.Constants;
+
 /**
  * Protocol that task child process uses to contact its parent process.
  */
-public interface BSPPeerProtocol extends BSPPeerInterface {
+public interface BSPPeerProtocol extends BSPRPCProtocolVersion, Closeable,
+    Constants {
 
   /** Called when a child task process starts, to get its task. */
   Task getTask(TaskAttemptID taskid) throws IOException;
@@ -46,4 +50,11 @@ public interface BSPPeerProtocol extends
   /** Report that the task encounted a local filesystem error. */
   void fsError(TaskAttemptID taskId, String message) throws IOException;
 
+  void incrementSuperstepCount(TaskAttemptID taskid) throws IOException;
+
+  /**
+   * @return the all BSPPeer names.
+   */
+  PeerNames getAllPeerNames();
+
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Jul 13 02:30:39 2011
@@ -49,14 +49,14 @@ public class BSPTask extends Task {
   }
 
   @Override
-  public void run(BSPJob job, BSPPeerProtocol umbilical)
+  public void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical)
       throws IOException {
     
     BSP bsp = (BSP) ReflectionUtils.newInstance(job.getConf().getClass(
         "bsp.work.class", BSP.class), job.getConf());
 
     try {
-      bsp.bsp(umbilical);
+      bsp.bsp(bspPeer);
     } catch (IOException e) {
       LOG.error("Exception during BSP execution!", e);
     } catch (KeeperException e) {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Jul 13 02:30:39 2011
@@ -49,15 +49,14 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.ipc.MasterProtocol;
 import org.apache.hama.ipc.WorkerProtocol;
 import org.apache.log4j.LogManager;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * A Groom Server (shortly referred to as groom) is a process that performs bsp
@@ -68,9 +67,7 @@ import org.apache.zookeeper.KeeperExcept
  * physical node.
  */
 public class GroomServer implements Runnable, WorkerProtocol, BSPPeerProtocol {
-
   public static final Log LOG = LogFactory.getLog(GroomServer.class);
-  private BSPPeer bspPeer;
   static final String SUBDIR = "groomServer";
 
   private volatile static int REPORT_INTERVAL = 1 * 1000;
@@ -92,6 +89,7 @@ public class GroomServer implements Runn
   // Attributes
   String groomServerName;
   String localHostname;
+  String groomHostName;
   InetSocketAddress bspMasterAddr;
   private Instructor instructor;
 
@@ -121,21 +119,25 @@ public class GroomServer implements Runn
   InetSocketAddress taskReportAddress;
   Server taskReportServer = null;
 
-//  private BlockingQueue<GroomServerAction> tasksToCleanup = new LinkedBlockingQueue<GroomServerAction>();
+  private PeerNames allPeerNames = null;
+
+  // private BlockingQueue<GroomServerAction> tasksToCleanup = new
+  // LinkedBlockingQueue<GroomServerAction>();
 
   private class DispatchTasksHandler implements DirectiveHandler {
 
     public void handle(Directive directive) throws DirectiveException {
       GroomServerAction[] actions = ((DispatchTasksDirective) directive)
           .getActions();
-      synchronized (bspPeer) {
-        bspPeer.setAllPeerNames(((DispatchTasksDirective) directive)
-            .getGroomServerPeers().values());
-      }
+
+      allPeerNames = new PeerNames(((DispatchTasksDirective) directive)
+          .getGroomServerPeers().values());
+
       if (LOG.isDebugEnabled()) {
         LOG.debug("Got Response from BSPMaster with "
             + ((actions != null) ? actions.length : 0) + " actions");
       }
+
       if (actions != null) {
         for (GroomServerAction action : actions) {
           if (action instanceof LaunchTaskAction) {
@@ -216,8 +218,9 @@ public class GroomServer implements Runn
     }
 
     if (localHostname == null) {
-      this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
-          "default"), conf.get("bsp.dns.nameserver", "default"));
+      this.localHostname = DNS.getDefaultHost(
+          conf.get("bsp.dns.interface", "default"),
+          conf.get("bsp.dns.nameserver", "default"));
     }
     // check local disk
     checkLocalDirs(conf.getStrings("bsp.local.dir"));
@@ -230,7 +233,6 @@ public class GroomServer implements Runn
     this.finishedTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
     this.conf.set(Constants.PEER_HOST, localHostname);
     this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
-    bspPeer = new BSPPeer(conf);
 
     int rpcPort = -1;
     String rpcAddr = null;
@@ -270,8 +272,9 @@ public class GroomServer implements Runn
         + ":" + taskReportAddress.getPort());
     LOG.info("GroomServer up at: " + this.taskReportAddress);
 
-    this.groomServerName = "groomd_" + bspPeer.getPeerName().replace(':', '_');
-    LOG.info("Starting groom: " + this.groomServerName);
+    this.groomHostName = rpcAddr;
+    this.groomServerName = "groomd_" + this.rpcServer.replace(':', '_');
+    LOG.info("Starting groom: " + this.rpcServer);
 
     DistributedCache.purgeCache(this.conf);
 
@@ -284,7 +287,7 @@ public class GroomServer implements Runn
       throw new IllegalArgumentException("Error rpc address " + rpcAddr
           + " port" + rpcPort);
     if (!this.masterClient.register(new GroomServerStatus(groomServerName,
-        bspPeer.getPeerName(), cloneAndResetRunningTaskStatuses(), failures,
+        getBspPeerName(), cloneAndResetRunningTaskStatuses(), failures,
         maxCurrentTasks, this.rpcServer))) {
       LOG.error("There is a problem in establishing communication"
           + " link with BSPMaster");
@@ -376,14 +379,16 @@ public class GroomServer implements Runn
           Thread.sleep(REPORT_INTERVAL);
           TaskInProgress tip = e.getValue();
           TaskStatus taskStatus = tip.getStatus();
-          taskStatus.setProgress(bspPeer.getSuperstepCount());
 
-          if (bspPeer.getLocalQueueSize() == 0
-              && bspPeer.getOutgoingQueueSize() == 0 && !tip.runner.isAlive()) {
-            if (taskStatus.getRunState() != TaskStatus.State.FAILED) {
-              taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+          if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
+            taskStatus.setProgress(taskStatus.getSuperstepCount());
+
+            if (!tip.runner.isAlive()) {
+              if (taskStatus.getRunState() != TaskStatus.State.FAILED) {
+                taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+              }
+              taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
             }
-            taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
           }
 
           doReport(taskStatus);
@@ -451,7 +456,7 @@ public class GroomServer implements Runn
    */
   public void doReport(TaskStatus taskStatus) {
     GroomServerStatus groomStatus = new GroomServerStatus(groomServerName,
-        bspPeer.getPeerName(), updateTaskStatus(taskStatus), failures,
+        getBspPeerName(), updateTaskStatus(taskStatus), failures,
         maxCurrentTasks, rpcServer);
     try {
       boolean ret = masterClient.report(new ReportGroomStatusDirective(
@@ -647,7 +652,6 @@ public class GroomServer implements Runn
   public synchronized void close() throws IOException {
     this.running = false;
     this.initialized = false;
-    bspPeer.close();
     cleanupStorage();
     this.workerServer.stop();
     RPC.stopProxy(masterClient);
@@ -728,8 +732,6 @@ public class GroomServer implements Runn
     public void launchTask() throws IOException {
       localizeTask(task);
       taskStatus.setRunState(TaskStatus.State.RUNNING);
-      bspPeer.setJobConf(jobConf);
-      bspPeer.setCurrentTaskStatus(taskStatus);
       this.runner = task.createRunner(GroomServer.this);
       this.runner.start();
     }
@@ -806,14 +808,11 @@ public class GroomServer implements Runn
   }
 
   /**
-   * GroomServer address information.
-   * 
    * @return bsp peer information in the form of "address:port".
    */
   public String getBspPeerName() {
-    if (null != bspPeer)
-      return bspPeer.getPeerName();
-    return null;
+    // TODO Later, peers list should be returned.
+    return this.groomHostName + ":" + Constants.DEFAULT_PEER_PORT;
   }
 
   /**
@@ -841,12 +840,26 @@ public class GroomServer implements Runn
       defaultConf.addResource(new Path(task.getJobFile()));
       BSPJob job = new BSPJob(task.getJobID(), task.getJobFile());
 
+      defaultConf.set(Constants.PEER_HOST, args[3]);
+      defaultConf.setInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+
+      BSPPeer bspPeer = new BSPPeer(defaultConf, taskid, umbilical);
+      bspPeer.setJobConf(job);
+      bspPeer.setAllPeerNames(umbilical.getAllPeerNames().getAllPeerNames());
+
+      TaskStatus taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(),
+          0, TaskStatus.State.RUNNING, "running", host,
+          TaskStatus.Phase.STARTING);
+
+      bspPeer.setCurrentTaskStatus(taskStatus);
+
       try {
         // use job-specified working directory
         FileSystem.get(job.getConf()).setWorkingDirectory(
             job.getWorkingDirectory());
 
-        task.run(job, umbilical); // run the task
+        task.run(job, bspPeer, umbilical); // run the task
+
       } catch (FSError e) {
         LOG.fatal("FSError from child", e);
         umbilical.fsError(taskid, e.getMessage());
@@ -856,6 +869,8 @@ public class GroomServer implements Runn
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         throwable.printStackTrace(new PrintStream(baos));
       } finally {
+        bspPeer.close(); // close peer.
+
         RPC.stopProxy(umbilical);
         // Shutting down log4j of the child-vm...
         // This assumes that on return from Task.run()
@@ -875,6 +890,11 @@ public class GroomServer implements Runn
     }
   }
 
+  public void incrementSuperstepCount(TaskAttemptID taskid) throws IOException {
+    TaskInProgress tip = tasks.get(taskid);
+    tip.getStatus().incrementSuperstepCount();
+  }
+
   @Override
   public boolean ping(TaskAttemptID taskid) throws IOException {
     // TODO Auto-generated method stub
@@ -885,62 +905,15 @@ public class GroomServer implements Runn
   public void done(TaskAttemptID taskid, boolean shouldBePromoted)
       throws IOException {
     // TODO Auto-generated method stub
-
   }
 
   @Override
   public void fsError(TaskAttemptID taskId, String message) throws IOException {
     // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void send(String peerName, BSPMessage msg) throws IOException {
-    bspPeer.send(peerName, msg);
-  }
-
-  @Override
-  public void put(BSPMessage msg) throws IOException {
-    bspPeer.put(msg);
-  }
-
-  @Override
-  public void put(BSPMessageBundle messages) throws IOException {
-    bspPeer.put(messages);
-  }
-
-  @Override
-  public BSPMessage getCurrentMessage() throws IOException {
-    return bspPeer.getCurrentMessage();
-  }
-
-  @Override
-  public int getNumCurrentMessages() {
-    return bspPeer.getNumCurrentMessages();
-  }
-
-  @Override
-  public void sync() throws IOException, KeeperException, InterruptedException {
-    bspPeer.sync();
-  }
-
-  @Override
-  public long getSuperstepCount() {
-    return bspPeer.getSuperstepCount();
-  }
-
-  @Override
-  public String getPeerName() {
-    return bspPeer.getPeerName();
-  }
-
-  @Override
-  public String[] getAllPeerNames() {
-    return bspPeer.getAllPeerNames();
   }
 
   @Override
-  public void clear() {
-    bspPeer.clear();
+  public PeerNames getAllPeerNames() {
+    return allPeerNames;
   }
-}
+}
\ No newline at end of file

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java Wed Jul 13 02:30:39 2011
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.WritableFact
  */
 public class JobProfile implements Writable {
 
-  static { // register a ctor
+  static { // register actor
     WritableFactories.setFactory(JobProfile.class, new WritableFactory() {
       public Writable newInstance() {
         return new JobProfile();

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java Wed Jul 13 02:30:39 2011
@@ -48,7 +48,6 @@ import org.apache.zookeeper.KeeperExcept
  * 
  */
 public class LocalBSPRunner implements JobSubmissionProtocol {
-
   public static final Log LOG = LogFactory.getLog(LocalBSPRunner.class);
 
   private static final String IDENTIFIER = "localrunner";
@@ -65,7 +64,7 @@ public class LocalBSPRunner implements J
         .newFixedThreadPool(threadPoolSize);
   }
 
-  protected HashMap<String, BSPPeerProtocol> localGrooms = new HashMap<String, BSPPeerProtocol>();
+  protected HashMap<String, LocalGroom> localGrooms = new HashMap<String, LocalGroom>();
   protected String jobFile;
   protected String jobName;
 
@@ -115,7 +114,7 @@ public class LocalBSPRunner implements J
         JobStatus.RUNNING);
     for (int i = 0; i < threadPoolSize; i++) {
       String name = IDENTIFIER + " " + i;
-      BSPPeerProtocol localGroom = new LocalGroom(name);
+      LocalGroom localGroom = new LocalGroom(name);
       localGrooms.put(name, localGroom);
       futureList.add(threadPool.submit(new BSPRunner(conf, job, ReflectionUtils
           .newInstance(job.getBspClass(), conf), localGroom)));
@@ -127,7 +126,7 @@ public class LocalBSPRunner implements J
   @Override
   public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
     Map<String, String> map = new HashMap<String, String>();
-    for (Entry<String, BSPPeerProtocol> entry : localGrooms.entrySet()) {
+    for (Entry<String, LocalGroom> entry : localGrooms.entrySet()) {
       map.put(entry.getKey(), entry.getValue().getPeerName());
     }
     return new ClusterStatus(map, threadPoolSize, threadPoolSize, State.RUNNING);
@@ -185,10 +184,9 @@ public class LocalBSPRunner implements J
     Configuration conf;
     BSPJob job;
     BSP bsp;
-    BSPPeerProtocol groom;
+    LocalGroom groom;
 
-    public BSPRunner(Configuration conf, BSPJob job, BSP bsp,
-        BSPPeerProtocol groom) {
+    public BSPRunner(Configuration conf, BSPJob job, BSP bsp, LocalGroom groom) {
       super();
       this.conf = conf;
       this.job = job;
@@ -199,7 +197,7 @@ public class LocalBSPRunner implements J
     public void run() {
       bsp.setConf(conf);
       try {
-        bsp.bsp(groom);
+         bsp.bsp(groom);
       } catch (Exception e) {
         LOG.error("Exception during BSP execution!", e);
       }
@@ -247,15 +245,14 @@ public class LocalBSPRunner implements J
 
   }
 
-  class LocalGroom implements BSPPeerProtocol {
+  class LocalGroom extends BSPPeer {
     private long superStepCount = 0;
     private final ConcurrentLinkedQueue<BSPMessage> localMessageQueue = new ConcurrentLinkedQueue<BSPMessage>();
     // outgoing queue
     private final Map<String, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<BSPMessage>>();
     private final String peerName;
 
-    public LocalGroom(String peerName) {
-      super();
+    public LocalGroom(String peerName) throws IOException {
       this.peerName = peerName;
     }
 
@@ -352,33 +349,8 @@ public class LocalBSPRunner implements J
     }
 
     @Override
-    public Task getTask(TaskAttemptID taskid) throws IOException {
-      return null;
-    }
-
-    @Override
-    public boolean ping(TaskAttemptID taskid) throws IOException {
-      return true;
-    }
-
-    @Override
-    public void done(TaskAttemptID taskid, boolean shouldBePromoted)
-        throws IOException {
-
-    }
-
-    @Override
-    public void fsError(TaskAttemptID taskId, String message)
-        throws IOException {
-
-    }
-
-    @Override
     public void put(BSPMessageBundle messages) throws IOException {
-      throw new UnsupportedOperationException(
-          "Messagebundle is not supported by local testing");
     }
 
   }
-
 }

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/PeerNames.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/PeerNames.java?rev=1145840&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/PeerNames.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/PeerNames.java Wed Jul 13 02:30:39 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * 
+ */
+public class PeerNames implements Writable {
+  Collection<String> allPeers;
+  
+  public PeerNames() {
+    this.allPeers = new ArrayList<String>();
+  }
+  
+  public PeerNames(Collection<String> allPeers) {
+    this.allPeers = allPeers;
+  }
+  
+  public Collection<String> getAllPeerNames() {
+    return allPeers;
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(allPeers.size());
+    for (String peerName : allPeers) {
+      Text.writeString(out, peerName);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int peersNum = in.readInt();
+    for (int i = 0; i < peersNum; i++) {
+      allPeers.add(Text.readString(in));
+    }
+  }
+
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Wed Jul 13 02:30:39 2011
@@ -130,6 +130,7 @@ class SimpleTaskScheduler extends TaskSc
     public void run() {
       // obtain tasks
       Task t = jip.obtainNewTask(this.stus, groomNum);
+      
       // assembly into actions
       // List<Task> tasks = new ArrayList<Task>();
       if (jip.getStatus().getRunState() == JobStatus.RUNNING) {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Jul 13 02:30:39 2011
@@ -121,9 +121,10 @@ public abstract class Task implements Wr
    * Run this task as a part of the named job. This method is executed in the
    * child process.
    * 
-   * @param umbilical for progress reports
+   * @param bspPeer for communications
+   * @param umbilical for communications with GroomServer
    */
-  public abstract void run(BSPJob job, BSPPeerProtocol umbilical)
+  public abstract void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical)
       throws IOException;
 
   public abstract BSPTaskRunner createRunner(GroomServer groom);

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskRunner.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskRunner.java Wed Jul 13 02:30:39 2011
@@ -116,6 +116,7 @@ public class TaskRunner extends Thread {
       vargs.add(addr.getHostName());
       vargs.add(Integer.toString(addr.getPort()));
       vargs.add(task.getTaskID().toString());
+      vargs.add(groomServer.groomHostName);
 
       // Run java
       runChild((String[]) vargs.toArray(new String[0]), workDir);

Modified: incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java Wed Jul 13 02:30:39 2011
@@ -64,7 +64,6 @@ public abstract class HamaTestCase exten
     conf = new HamaConfiguration();
     conf.setStrings("bsp.local.dir", "/tmp/hama-test");
     conf.set("bsp.master.address", "localhost");
-    conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
     conf.set("bsp.groom.report.address", "127.0.0.1:0");
   }
 
@@ -113,7 +112,7 @@ public abstract class HamaTestCase exten
 
   protected Path getUnitTestdir(String testName) {
     return new Path(
-        conf.get(TEST_DIRECTORY_KEY, "test/build/data"), testName);
+        conf.get(TEST_DIRECTORY_KEY, "/tmp/hama-test/build/data"), testName);
   }
 
   /**

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java Wed Jul 13 02:30:39 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
 
@@ -42,6 +43,8 @@ public class TestBSPMasterGroomServer ex
     assertEquals("Make sure master addr is set to localhost:", "localhost",
         configuration.get("bsp.master.address"));
     configuration.setStrings("bsp.local.dir", "/tmp/hama-test");
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
   }
 
   public void setUp() throws Exception {
@@ -56,9 +59,13 @@ public class TestBSPMasterGroomServer ex
 
     // Set the task size as a number of GroomServer
     BSPJobClient jobClient = new BSPJobClient(configuration);
+    configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
     ClusterStatus cluster = jobClient.getClusterStatus(false);
     assertEquals(this.numOfGroom, cluster.getMaxTasks());
-    bsp.setNumBspTask(cluster.getGroomServers());
+    
+    // TODO test the multi-tasks 
+    bsp.setNumBspTask(1);
+    
     FileSystem fileSys = FileSystem.get(conf);
 
     if (bsp.waitForCompletion(true)) {
@@ -69,7 +76,7 @@ public class TestBSPMasterGroomServer ex
 
   private static void checkOutput(FileSystem fileSys, ClusterStatus cluster,
       HamaConfiguration conf) throws Exception {
-    for (int i = 0; i < cluster.getGroomServers(); i++) {
+    for (int i = 0; i < 1; i++) { // TODO test the multi-tasks
       SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
           TMP_OUTPUT + i), conf);
       LongWritable timestamp = new LongWritable();

Modified: incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java?rev=1145840&r1=1145839&r2=1145840&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java (original)
+++ incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java Wed Jul 13 02:30:39 2011
@@ -26,10 +26,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
 import org.apache.hama.bsp.BSP;
-import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.BSPPeer;
 import org.apache.zookeeper.KeeperException;
 
 public class ClassSerializePrinting {
@@ -42,7 +42,7 @@ public class ClassSerializePrinting {
     private FileSystem fileSys;
     private int num;
 
-    public void bsp(BSPPeerProtocol bspPeer) throws IOException,
+    public void bsp(BSPPeer bspPeer) throws IOException,
         KeeperException, InterruptedException {
 
       int i = 0;



Mime
View raw message