hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1197668 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/sync/ core/src/main/java/org/apache/hama/zookeeper/ yarn/src/main/java/org/apache/hama/bsp/ yarn/src/main/java/org/apache...
Date Fri, 04 Nov 2011 17:03:33 GMT
Author: tjungblut
Date: Fri Nov  4 17:03:32 2011
New Revision: 1197668

URL: http://svn.apache.org/viewvc?rev=1197668&view=rev
Log:
[HAMA-454] Add Zookeeper as synchronization service


Removed:
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/zookeeper/QuorumPeer.java
    incubator/hama/trunk/pom.xml
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Fri Nov  4 17:03:32 2011
@@ -3,9 +3,10 @@ Hama Change Log
 Release 0.4 - Unreleased
 
   NEW FEATURES
-  
+   
+   HAMA-454: Add Zookeeper as synchronization service to YARN (tjungblut)
    HAMA-258: Add Input Output system (edwardyoon)
-   HAMA-456: Add Message Combiner (edwardyoon)
+   HAMA-458: Add Message Combiner (edwardyoon)
    HAMA-456: Add getPeerName(int index) and getNumPeers() (edwardyoon)
    HAMA-431: MapReduce NG integration (tjungblut)
    HAMA-449: Add tasks num of Job to web UI (edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Fri Nov  4 17:03:32 2011
@@ -294,7 +294,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
       leaveBarrier();
       currentTaskStatus.incrementSuperstepCount();
       umbilical.statusUpdate(taskId, currentTaskStatus);
-
+      
       // Clear outgoing queues.
       clearOutgoingQueues();
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServer.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServer.java Fri Nov  4 17:03:32 2011
@@ -31,7 +31,10 @@ public interface SyncServer {
   /**
    * In YARN port and hostname of the sync server is only known at runtime, so
    * this method should modify the conf to set the host:port of the syncserver
-   * that is going to start and return it.
+   * that is going to start and return it.<br/>
+   * <br/>
+   * The property key is "hama.sync.server.address" and the value is
+   * hostname:port.
    * 
    * @param conf
    * @return

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java Fri Nov  4 17:03:32 2011
@@ -33,7 +33,6 @@ public class SyncServerRunner implements
     try {
       syncServer = SyncServiceFactory.getSyncServer(conf);
     } catch (ClassNotFoundException e) {
-      // TODO Auto-generated catch block
       e.printStackTrace();
     }
   }
@@ -42,6 +41,10 @@ public class SyncServerRunner implements
     return syncServer.init(conf);
   }
 
+  public void stop() {
+    syncServer.stopServer();
+  }
+
   @Override
   public Object call() throws Exception {
     syncServer.start();

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Fri Nov  4 17:03:32 2011
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPJobID;
 import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.bsp.sync.SyncClient;
 import org.apache.hama.zookeeper.QuorumPeer;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -225,6 +224,18 @@ public class ZooKeeperSyncClientImpl imp
   @Override
   public void register(BSPJobID jobId, TaskAttemptID taskId,
       String hostAddress, long port) {
+    try {
+      if (zk.exists("/" + jobId.toString(), false) == null) {
+        LOG.info("Root node for job: " + jobId.toString()
+            + " does not exists! Creating...");
+        zk.create("/" + jobId.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
+      }
+    } catch (KeeperException e) {
+      LOG.error(e);
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    }
     registerTask(zk, jobId, hostAddress, port);
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java Fri Nov  4 17:03:32 2011
@@ -17,10 +17,14 @@
  */
 package org.apache.hama.bsp.sync;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.bsp.sync.SyncServer;
 import org.apache.hama.util.BSPNetUtils;
 import org.apache.hama.zookeeper.QuorumPeer;
+import org.apache.hama.zookeeper.QuorumPeer.ShutdownableZooKeeperServerMain;
+import org.apache.hama.zookeeper.QuorumPeer.ZookeeperTuple;
 
 /**
  * 
@@ -29,7 +33,9 @@ import org.apache.hama.zookeeper.QuorumP
  */
 public class ZooKeeperSyncServerImpl implements SyncServer {
 
+  private final Log LOG = LogFactory.getLog(ZooKeeperSyncServerImpl.class);
   private Configuration conf;
+  private ShutdownableZooKeeperServerMain zooKeeper;
 
   @Override
   public Configuration init(Configuration conf) throws Exception {
@@ -50,6 +56,15 @@ public class ZooKeeperSyncServerImpl imp
       host = hostConfigured.split(",")[0];
     }
 
+    if (conf.get("hama.zookeeper.property.dataDir") == null) {
+      conf.set("hama.zookeeper.property.dataDir", conf.get("bsp.local.dir")
+          + "zookeeper");
+    }
+
+    // always set them
+    conf.set("hama.zookeeper.quorum", host);
+    conf.set("hama.zookeeper.property.clientPort", port + "");
+
     conf.set("hama.sync.server.address", host + ":" + port);
 
     return conf;
@@ -57,12 +72,22 @@ public class ZooKeeperSyncServerImpl imp
 
   @Override
   public void start() throws Exception {
-    QuorumPeer.runZooKeeper(conf);
+    try {
+      ZookeeperTuple tuple = QuorumPeer.runShutdownableZooKeeper(conf);
+      zooKeeper = tuple.main;
+      zooKeeper.runFromConfig(tuple.conf);
+    } catch (Exception e) {
+      LOG.debug(e);
+    }
   }
 
   @Override
   public void stopServer() {
-    // TODO this could be done somehow
+    try {
+      zooKeeper.shutdownZookeeperMain();
+    } catch (Throwable e) {
+      LOG.debug(e);
+    }
   }
 
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/zookeeper/QuorumPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/zookeeper/QuorumPeer.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/zookeeper/QuorumPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/zookeeper/QuorumPeer.java Fri Nov  4 17:03:32 2011
@@ -85,13 +85,31 @@ public class QuorumPeer implements Const
    * @param baseConf Hadoop Configuration.
    */
   public static void runZooKeeper(Configuration conf) throws Exception {
-
     Properties zkProperties = makeZKProps(conf);
     writeMyID(zkProperties);
     QuorumPeerConfig zkConfig = new QuorumPeerConfig();
     zkConfig.parseProperties(zkProperties);
     runZKServer(zkConfig);
+  }
 
+  /**
+   * Runs a shutdownable Zookeeper main server. This does not work with multiple
+   * quorums!
+   * 
+   * @param conf
+   * @return
+   * @throws Exception
+   */
+  public static ZookeeperTuple runShutdownableZooKeeper(Configuration conf)
+      throws Exception {
+    Properties zkProperties = makeZKProps(conf);
+    writeMyID(zkProperties);
+    QuorumPeerConfig zkConfig = new QuorumPeerConfig();
+    zkConfig.parseProperties(zkProperties);
+    ShutdownableZooKeeperServerMain zk = new ShutdownableZooKeeperServerMain();
+    ServerConfig serverConfig = new ServerConfig();
+    serverConfig.readFrom(zkConfig);
+    return new ZookeeperTuple(zk, serverConfig);
   }
 
   private static void runZKServer(QuorumPeerConfig zkConfig)
@@ -376,4 +394,30 @@ public class QuorumPeer implements Const
   public static String getZKQuorumServersString(Configuration conf) {
     return getZKQuorumServersString(makeZKProps(conf));
   }
+
+  public static class ShutdownableZooKeeperServerMain extends
+      ZooKeeperServerMain {
+
+    public void shutdownZookeeperMain() {
+      this.shutdown();
+    }
+
+    @Override
+    protected void shutdown() {
+      super.shutdown();
+    }
+
+  }
+
+  public static class ZookeeperTuple {
+    public ShutdownableZooKeeperServerMain main;
+    public ServerConfig conf;
+
+    public ZookeeperTuple(ShutdownableZooKeeperServerMain main,
+        ServerConfig conf) {
+      super();
+      this.main = main;
+      this.conf = conf;
+    }
+  }
 }

Modified: incubator/hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/pom.xml?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/pom.xml (original)
+++ incubator/hama/trunk/pom.xml Fri Nov  4 17:03:32 2011
@@ -187,6 +187,7 @@
     <module>core</module>
     <module>graph</module>
     <module>examples</module>
+    <!--<module>yarn</module>-->
   </modules>
 
   <build>

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Fri Nov  4 17:03:32 2011
@@ -18,12 +18,10 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,7 +34,6 @@ import org.apache.hadoop.ipc.ProtocolSig
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hama.util.BSPNetUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
@@ -53,13 +50,15 @@ import org.apache.hadoop.yarn.util.Conve
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.Job.JobState;
-import org.apache.hama.bsp.sync.SyncServer;
-import org.apache.hama.bsp.sync.SyncServerImpl;
+import org.apache.hama.bsp.sync.SyncServerRunner;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
+import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.util.BSPNetUtils;
 
 /**
  * BSPApplicationMaster is an application master for Apache Hamas BSP Engine.
  */
-public class BSPApplicationMaster implements BSPClient {
+public class BSPApplicationMaster implements BSPClient, BSPPeerProtocol {
 
   private static final Log LOG = LogFactory.getLog(BSPApplicationMaster.class);
   private static final ExecutorService threadPool = Executors
@@ -80,74 +79,90 @@ public class BSPApplicationMaster implem
   private JobImpl job;
   private BSPJobID jobId;
 
-  private SyncServerImpl syncServer;
-  private Future<Long> syncServerFuture;
-
   // RPC info where the AM receive client side requests
   private String hostname;
   private int clientPort;
+  private int taskServerPort;
 
   private Server clientServer;
+  private Server taskServer;
 
-  private BSPApplicationMaster(String[] args) throws IOException {
+  private long superstep;
+  private SyncServerRunner syncServer;
+
+  private BSPApplicationMaster(String[] args) throws Exception {
     if (args.length != 1) {
       throw new IllegalArgumentException();
     }
 
-    jobFile = args[0];
-    localConf = new YarnConfiguration();
-    jobConf = getSubmitConfiguration(jobFile);
+    this.jobFile = args[0];
+    this.localConf = new YarnConfiguration();
+    this.jobConf = getSubmitConfiguration(jobFile);
 
-    applicationName = jobConf.get("bsp.job.name", "<no bsp job name defined>");
+    this.applicationName = jobConf.get("bsp.job.name",
+        "<no bsp job name defined>");
     if (applicationName.isEmpty()) {
-      applicationName = "<no bsp job name defined>";
+      this.applicationName = "<no bsp job name defined>";
     }
 
-    appAttemptId = getApplicationAttemptId();
+    this.appAttemptId = getApplicationAttemptId();
+
+    this.yarnRPC = YarnRPC.create(localConf);
+    this.clock = new SystemClock();
+    this.startTime = clock.getTime();
 
-    yarnRPC = YarnRPC.create(localConf);
-    clock = new SystemClock();
-    startTime = clock.getTime();
+    this.jobId = new BSPJobID(appAttemptId.toString(), 0);
 
-    jobId = new BSPJobID(appAttemptId.toString(), 0);
+    this.hostname = BSPNetUtils.getCanonicalHostname();
+    this.clientPort = BSPNetUtils.getFreePort(12000);
 
-    // TODO this is not localhost, is it?
-    hostname = BSPNetUtils.getCanonicalHostname();
+    // start our synchronization service
     startSyncServer();
-    clientPort = BSPNetUtils.getFreePort();
-    // TODO should have a configurable amount of RPC handlers
-    this.clientServer = RPC.getServer(this, hostname, clientPort, 10, false,
-        jobConf);
 
+    startRPCServers();
     /*
-     * Make sure that this executes after the start of the sync server, because
-     * we are readjusting the configuration.
+     * Make sure that this executes after the start the RPC servers, because we
+     * are readjusting the configuration.
      */
     rewriteSubmitConfiguration(jobFile, jobConf);
 
-    amrmRPC = getYarnRPCConnection(localConf);
+    this.amrmRPC = getYarnRPCConnection(localConf);
     registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort, null);
   }
 
   /**
+   * This method starts the needed RPC servers: client server and the task
+   * server. This method manipulates the configuration and therefore needs to be
+   * executed BEFORE the submitconfiguration gets rewritten.
+   * 
+   * @throws IOException
+   */
+  private void startRPCServers() throws IOException {
+    // start the RPC server which talks to the client
+    this.clientServer = RPC.getServer(BSPClient.class, this, hostname,
+        clientPort, jobConf);
+    this.clientServer.start();
+
+    // start the RPC server which talks to the tasks
+    this.taskServerPort = BSPNetUtils.getFreePort(10000);
+    this.taskServer = RPC.getServer(BSPPeerProtocol.class, this, hostname,
+        taskServerPort, jobConf);
+    this.taskServer.start();
+    // readjusting the configuration to let the tasks know where we are.
+    this.jobConf.set("hama.umbilical.address", hostname + ":" + taskServerPort);
+  }
+
+  /**
    * This method starts the sync server on a specific port and waits for it to
    * come up. Be aware that this method adds the "bsp.sync.server.address" that
    * is needed for a task to connect to the service.
    * 
    * @throws IOException
    */
-  private void startSyncServer() throws IOException {
-    int syncPort = BSPNetUtils.getFreePort(15000);
-    syncServer = new SyncServerImpl(jobConf.getInt("bsp.peers.num", 1),
-        hostname, syncPort);
-    syncServerFuture = threadPool.submit(syncServer);
-    // wait for the RPC to come up
-    InetSocketAddress syncAddress = NetUtils.createSocketAddr(hostname + ":"
-        + syncPort);
-    LOG.info("Waiting for the Sync Master at " + syncAddress);
-    RPC.waitForProxy(SyncServer.class, SyncServer.versionID, syncAddress,
-        jobConf);
-    jobConf.set("hama.sync.server.address", hostname + ":" + syncPort);
+  private void startSyncServer() throws Exception {
+    syncServer = SyncServiceFactory.getSyncServerRunner(jobConf);
+    jobConf = syncServer.init(jobConf);
+    threadPool.submit(syncServer);
   }
 
   /**
@@ -200,12 +215,12 @@ public class BSPApplicationMaster implem
    */
   private ApplicationAttemptId getApplicationAttemptId() throws IOException {
     Map<String, String> envs = System.getenv();
-    if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) {
+    if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
       throw new IllegalArgumentException(
           "ApplicationAttemptId not set in the environment");
     }
     return ConverterUtils.toApplicationAttemptId(envs
-        .get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV));
+        .get(ApplicationConstants.AM_CONTAINER_ID_ENV));
   }
 
   private void start() throws Exception {
@@ -214,13 +229,9 @@ public class BSPApplicationMaster implem
       job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId);
       finalState = job.startJob();
     } finally {
-      if (this.syncServer != null) {
-        this.syncServer.stopServer();
-      }
       if (finalState != null) {
         LOG.info("Job \"" + applicationName + "\"'s state after completion: "
             + finalState.toString());
-        LOG.info("Made " + (syncServerFuture.get() - 1L) + " supersteps!");
         LOG.info("Job took " + ((clock.getTime() - startTime) / 1000L)
             + "s to finish!");
       }
@@ -229,9 +240,12 @@ public class BSPApplicationMaster implem
   }
 
   private void cleanup() throws YarnRemoteException {
-    if (this.syncServer != null) {
-      this.syncServer.stopServer();
+    syncServer.stop();
+    if (threadPool != null && !threadPool.isShutdown()) {
+      threadPool.shutdownNow();
     }
+    clientServer.stop();
+    taskServer.stop();
     FinishApplicationMasterRequest finishReq = Records
         .newRecord(FinishApplicationMasterRequest.class);
     finishReq.setAppAttemptId(appAttemptId);
@@ -252,7 +266,7 @@ public class BSPApplicationMaster implem
   }
 
   public static void main(String[] args) throws YarnRemoteException {
-    // TODO we expect getting the qualified path of the job.xml as the first
+    // we expect getting the qualified path of the job.xml as the first
     // element in the arguments
     BSPApplicationMaster master = null;
     try {
@@ -300,19 +314,70 @@ public class BSPApplicationMaster implem
 
   @Override
   public long getProtocolVersion(String arg0, long arg1) throws IOException {
-    return BSPClient.VERSION;
+    return BSPClient.versionID;
   }
 
   @Override
   public LongWritable getCurrentSuperStep() {
-    return syncServer.getSuperStep();
+    return new LongWritable(superstep);
   }
 
   @Override
   public ProtocolSignature getProtocolSignature(String protocol,
       long clientVersion, int clientMethodsHash) throws IOException {
-    // TODO Auto-generated method stub
-    return new ProtocolSignature();
+    return new ProtocolSignature(BSPPeerProtocol.versionID, null);
+  }
+
+  @Override
+  public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    if (taskStatus.getSuperstepCount() > superstep) {
+      superstep = taskStatus.getSuperstepCount();
+      LOG.info("Now in superstep " + superstep);
+    }
+    return true;
+  }
+
+  /**
+   * most of the following methods are already handled over YARN and with the
+   * JobImpl.
+   */
+
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  @Override
+  public Task getTask(TaskAttemptID taskid) throws IOException {
+    return null;
+  }
+
+  @Override
+  public boolean ping(TaskAttemptID taskid) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void done(TaskAttemptID taskid, boolean shouldBePromoted)
+      throws IOException {
+
+  }
+
+  @Override
+  public void fsError(TaskAttemptID taskId, String message) throws IOException {
+
+  }
+
+  @Override
+  public void fatalError(TaskAttemptID taskId, String message)
+      throws IOException {
+
+  }
+
+  @Override
+  public int getAssignedPortNum(TaskAttemptID taskid) {
+    return 0;
   }
 
 }

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java Fri Nov  4 17:03:32 2011
@@ -22,7 +22,7 @@ import org.apache.hadoop.ipc.VersionedPr
 
 public interface BSPClient extends VersionedProtocol {
 
-  public static final int VERSION = 0;
+  public static final int versionID = 1;
   
   public LongWritable getCurrentSuperStep(); 
   

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java Fri Nov  4 17:03:32 2011
@@ -17,15 +17,19 @@
  */
 package org.apache.hama.bsp;
 
-import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.util.BSPNetUtils;
 
 public class BSPRunner {
 
@@ -33,30 +37,51 @@ public class BSPRunner {
 
   private Configuration conf;
   private TaskAttemptID id;
-  private YARNBSPPeerImpl peer;
+  private BSPPeerImpl<?, ?, ?, ?> peer;
 
+  @SuppressWarnings("rawtypes")
   Class<? extends BSP> bspClass;
 
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   public BSPRunner(String jobId, int taskAttemptId, Path confPath)
-      throws IOException, ClassNotFoundException {
+      throws Exception {
     conf = new HamaConfiguration();
     conf.addResource(confPath);
     this.id = new TaskAttemptID(jobId, 0, taskAttemptId, 0);
     this.id.id = taskAttemptId;
-    peer = new YARNBSPPeerImpl(conf, id);
+
+    // use a calculatory trick to prevent port collision on the same host
+    int port = BSPNetUtils.getFreePort(taskAttemptId * 2 + 16000);
+    conf.setInt(Constants.PEER_PORT, port);
+    conf.set(Constants.PEER_HOST, BSPNetUtils.getCanonicalHostname());
+
+    String umbilicalAddress = conf.get("hama.umbilical.address");
+    if (umbilicalAddress == null || umbilicalAddress.isEmpty()
+        || !umbilicalAddress.contains(":")) {
+      throw new IllegalArgumentException(
+          "Umbilical address must contain a colon and must be non-empty and not-null! Property
\"hama.umbilical.address\" was: "
+              + umbilicalAddress);
+    }
+    String[] hostPort = umbilicalAddress.split(":");
+    InetSocketAddress address = new InetSocketAddress(hostPort[0],
+        Integer.valueOf(hostPort[1]));
+
+    BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
+        BSPPeerProtocol.class, BSPPeerProtocol.versionID, address, conf);
+
+    BSPJob job = new BSPJob(new HamaConfiguration(conf));
+
+    peer = new BSPPeerImpl(job, conf, id, umbilical, port, umbilicalAddress,
+        null);
     // this is a checked cast because we can only set a class via the BSPJob
     // class which only allows derivates of BSP.
     bspClass = (Class<? extends BSP>) conf.getClassByName(conf
         .get("bsp.work.class"));
   }
 
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   public void startComputation() throws Exception {
     BSP bspInstance = ReflectionUtils.newInstance(bspClass, conf);
-    LOG.info("Syncing for the first time to wait for all the tasks to come up...");
-    peer.getSyncService().enterBarrier(id);
-    peer.getSyncService().leaveBarrier(id);
-    LOG.info("Initial sync was successful, now running the computation!");
     try {
       bspInstance.setup(peer);
       bspInstance.bsp(peer);
@@ -64,6 +89,7 @@ public class BSPRunner {
       throw e;
     } finally {
       bspInstance.cleanup(peer);
+      peer.close();
     }
   }
 
@@ -85,5 +111,6 @@ public class BSPRunner {
     BSPRunner bspRunner = new BSPRunner(args[0], Integer.valueOf(args[1]),
         new Path(args[2]));
     bspRunner.startComputation();
+    LOG.info("Task successfully ended!");
   }
 }

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java Fri Nov  4 17:03:32 2011
@@ -80,7 +80,6 @@ public class JobImpl implements Job {
       threadPool);
   private Map<Integer, BSPTaskLauncher> launchers = new HashMap<Integer, BSPTaskLauncher>();
   private int lastResponseID = 0;
-  private Resource availableResources;
 
   public JobImpl(ApplicationAttemptId appAttemptId,
       Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC,
@@ -110,7 +109,7 @@ public class JobImpl implements Job {
     }
   }
 
-  // TODO This really needs a testcase
+  // This really needs a testcase
   private int getMemoryFromOptString(String opts) {
     if (!opts.contains("-Xmx")) {
       LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!");
@@ -158,7 +157,7 @@ public class JobImpl implements Job {
           + amResponse.getAvailableResources().getMemory() + "mb");
       this.lastResponseID = amResponse.getResponseId();
 
-      this.availableResources = amResponse.getAvailableResources();
+//      availableResources = amResponse.getAvailableResources();
       this.allocatedContainers.addAll(amResponse.getAllocatedContainers());
       LOG.info("Waiting to allocate "
           + (numBSPTasks - allocatedContainers.size()) + " more containers...");
@@ -202,7 +201,7 @@ public class JobImpl implements Job {
         state = JobState.FAILED;
         return state;
       } else {
-        LOG.info("Task \"" + id + "\" sucessfully finished!");
+        LOG.info("Task \"" + returnedTask.getId() + "\" sucessfully finished!");
       }
       cleanupTask(returnedTask.getId());
     }

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java Fri Nov  4 17:03:32 2011
@@ -240,7 +240,7 @@ public class YARNBSPJob extends BSPJob {
       this.submit();
     }
 
-    client = (BSPClient) RPC.waitForProxy(BSPClient.class, BSPClient.VERSION,
+    client = (BSPClient) RPC.waitForProxy(BSPClient.class, BSPClient.versionID,
         NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf);
 
     GetApplicationReportRequest reportRequest = Records
@@ -256,23 +256,20 @@ public class YARNBSPJob extends BSPJob {
         && localReport.getFinalApplicationStatus() != FinalApplicationStatus.FAILED
         && localReport.getFinalApplicationStatus() != FinalApplicationStatus.KILLED
         && localReport.getFinalApplicationStatus() != FinalApplicationStatus.SUCCEEDED) {
-      LOG.info("currently in state: " + localReport.getFinalApplicationStatus());
+      LOG.debug("currently in state: " + localReport.getFinalApplicationStatus());
       if (verbose) {
         long remoteSuperStep = client.getCurrentSuperStep().get();
         if (clientSuperStep > remoteSuperStep) {
           clientSuperStep = remoteSuperStep;
           LOG.info("Current supersteps number: " + clientSuperStep);
         }
-        reportResponse = applicationsManager
-            .getApplicationReport(reportRequest);
-        localReport = reportResponse.getApplicationReport();
       }
+      reportResponse = applicationsManager.getApplicationReport(reportRequest);
+      localReport = reportResponse.getApplicationReport();
+
       Thread.sleep(3000L);
     }
 
-    reportResponse = applicationsManager.getApplicationReport(reportRequest);
-    localReport = reportResponse.getApplicationReport();
-
     if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) {
       LOG.info("Job succeeded!");
       return true;

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java Fri Nov  4 17:03:32 2011
@@ -22,19 +22,20 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.zookeeper.KeeperException;
 
 public class YarnSerializePrinting {
 
-  public static class HelloBSP extends BSP {
+  public static class HelloBSP extends BSP<NullWritable, NullWritable, NullWritable, NullWritable> {
     public static final Log LOG = LogFactory.getLog(HelloBSP.class);
     private Configuration conf;
     private final static int PRINT_INTERVAL = 1000;
     private int num;
 
     @Override
-    public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+    public void bsp(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> bspPeer) throws IOException, KeeperException,
         InterruptedException {
       num = conf.getInt("bsp.peers.num", 0);
       LOG.info(bspPeer.getAllPeerNames());
@@ -75,7 +76,6 @@ public class YarnSerializePrinting {
     job.setJobName("Serialize Printing");
     job.setMemoryUsedPerTaskInMb(50);
     job.setNumBspTask(2);
-    // TODO waitForCompletion(true) throws exceptions
     job.waitForCompletion(false);
   }
 }

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java Fri Nov  4 17:03:32 2011
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp.sync;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.util.StringArrayWritable;
-
-/**
- * Hadoop RPC based barrier synchronization service.
- * 
- */
-public interface SyncServer extends VersionedProtocol {
-
-  public static final long versionID = 0L;
-
-  public void enterBarrier(TaskAttemptID id);
-
-  public void leaveBarrier(TaskAttemptID id);
-
-  public void register(TaskAttemptID id, Text hostAddress, LongWritable port);
-
-  public LongWritable getSuperStep();
-
-  public StringArrayWritable getAllPeerNames();
-
-  public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress,
-      LongWritable port);
-
-  public void stopServer();
-
-}

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java?rev=1197668&r1=1197667&r2=1197668&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java Fri Nov  4 17:03:32 2011
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp.sync;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CyclicBarrier;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
-import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.util.StringArrayWritable;
-
-/**
- * Synchronization Deamon. <br\>
- */
-public class SyncServerImpl implements SyncServer, Callable<Long> {
-
-  private static final Log LOG = LogFactory.getLog(SyncServerImpl.class);
-
-  private Configuration conf = new Configuration();
-  private Server server;
-
-  private int parties;
-
-  private CyclicBarrier barrier;
-  private CyclicBarrier leaveBarrier;
-  private Set<Integer> partySet;
-  private Set<String> peerAddresses;
-
-  private volatile long superstep = 0L;
-
-  public SyncServerImpl(int parties, String host, int port) throws IOException {
-    this.parties = parties;
-    this.barrier = new CyclicBarrier(parties);
-    this.leaveBarrier = new CyclicBarrier(parties, new SuperStepIncrementor(
-        this));
-
-    this.partySet = Collections.synchronizedSet(new HashSet<Integer>(parties));
-    // tree set so there is ascending order for consistent returns in
-    // getAllPeerNames()
-    this.peerAddresses = Collections.synchronizedSet(new TreeSet<String>());
-    // allocate ten more rpc handler than parties for additional services to
-    // plug in or to deal with failure.
-    this.server = RPC.getServer(this, host, port, parties + 10, false, conf);
-    LOG.info("Sync Server is now up at: " + host + ":" + port + "!");
-  }
-
-  public void start() throws IOException {
-    server.start();
-  }
-
-  @Override
-  public void stopServer() {
-    server.stop();
-  }
-
-  public void join() throws InterruptedException {
-    server.join();
-  }
-
-  public static SyncServer getService(Configuration conf)
-      throws NumberFormatException, IOException {
-    String syncAddress = conf.get("hama.sync.server.address");
-    if (syncAddress == null || syncAddress.isEmpty()
-        || !syncAddress.contains(":")) {
-      throw new IllegalArgumentException(
-          "Server sync address must contain a colon and must be non-empty and not-null! Property \"hama.sync.server.address\" was: "
-              + syncAddress);
-    }
-    String[] hostPort = syncAddress.split(":");
-    return (SyncServer) RPC.waitForProxy(SyncServer.class,
-        SyncServer.versionID,
-        new InetSocketAddress(hostPort[0], Integer.valueOf(hostPort[1])), conf);
-
-  }
-
-  @Override
-  public void enterBarrier(TaskAttemptID id) {
-    LOG.info("Task: " + id.getId() + " entered Barrier!");
-    if (partySet.contains(id.getId())) {
-      try {
-        barrier.await();
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      } catch (BrokenBarrierException e) {
-        e.printStackTrace();
-      }
-    } else {
-      LOG.warn("TaskID " + id + " is no verified task!");
-    }
-  }
-
-  @Override
-  public void leaveBarrier(TaskAttemptID id) {
-    LOG.info("Task: " + id.getId() + " leaves Barrier!");
-    if (partySet.contains(id.getId())) {
-      try {
-        leaveBarrier.await();
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      } catch (BrokenBarrierException e) {
-        e.printStackTrace();
-      }
-    } else {
-      LOG.warn("TaskID " + id + " is no verified task!");
-    }
-  }
-
-  @Override
-  public synchronized void register(TaskAttemptID id, Text hostAddress,
-      LongWritable port) {
-    partySet.add(id.getId());
-    String peer = hostAddress.toString() + ":" + port.get();
-    peerAddresses.add(peer);
-    LOG.info("Registered: " + id.getId() + " for peer " + peer);
-    if (partySet.size() > parties) {
-      LOG.warn("Registered more tasks than configured!");
-    }
-  }
-
-  @Override
-  public long getProtocolVersion(String protocol, long clientVersion)
-      throws IOException {
-    return clientVersion;
-  }
-
-  private static class SuperStepIncrementor implements Runnable {
-
-    private final SyncServerImpl instance;
-
-    public SuperStepIncrementor(SyncServerImpl syncServer) {
-      this.instance = syncServer;
-    }
-
-    @Override
-    public void run() {
-      synchronized (instance) {
-        this.instance.superstep += 1L;
-        LOG.info("Entering superstep: " + this.instance.superstep);
-      }
-    }
-
-  }
-
-  public static void main(String[] args) throws IOException,
-      InterruptedException {
-    LOG.info(Arrays.toString(args));
-    if (args.length == 3) {
-      SyncServerImpl syncServer = new SyncServerImpl(Integer.valueOf(args[0]),
-          args[1], Integer.valueOf(args[2]));
-      syncServer.start();
-      syncServer.join();
-    } else {
-      throw new IllegalArgumentException(
-          "Argument count does not match 3! Given size was " + args.length
-              + " and parameters were " + Arrays.toString(args));
-    }
-  }
-
-  @Override
-  public Long call() throws Exception {
-    this.start();
-    this.join();
-    return this.superstep;
-  }
-
-  @Override
-  public synchronized LongWritable getSuperStep() {
-    return new LongWritable(superstep);
-  }
-
-  @Override
-  public synchronized StringArrayWritable getAllPeerNames() {
-    return new StringArrayWritable(
-        peerAddresses.toArray(new String[peerAddresses.size()]));
-  }
-
-  @Override
-  public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress,
-      LongWritable port) {
-    // TODO Auto-generated method stub
-    // basically has to recreate the barriers and remove from the two basic
-    // sets.
-  }
-
-  @Override
-  public ProtocolSignature getProtocolSignature(String protocol,
-      long clientVersion, int clientMethodsHash) throws IOException {
-    // TODO Auto-generated method stub
-    return new ProtocolSignature();
-  }
-
-}



Mime
View raw message