hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153430 [4/9] - in /hadoop/common/branches/MR-279/mapreduce: mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/loc...
Date Wed, 03 Aug 2011 11:32:10 GMT
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Wed Aug  3 11:31:34 2011
@@ -19,6 +19,13 @@ package org.apache.hadoop.yarn.server.re
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.crypto.SecretKey;
 
 import org.apache.avro.ipc.Server;
 import org.apache.commons.logging.Log;
@@ -26,8 +33,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -40,40 +50,74 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
 
-public class ResourceTrackerService extends AbstractService 
-implements ResourceTracker{
+public class ResourceTrackerService extends AbstractService implements
+    ResourceTracker, RMNodeRemovalListener {
 
   private static final Log LOG = LogFactory.getLog(ResourceTrackerService.class);
 
   private static final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
 
-  private final RMResourceTrackerImpl resourceTracker;
+  private final RMContext rmContext;
+  private final NodesListManager nodesListManager;
+  private final NMLivelinessMonitor nmLivelinessMonitor;
+  private final ContainerTokenSecretManager containerTokenSecretManager;
+
+  /* We don't garbage-collect nodes: a node can come back up and re-register,
+   * so there is no point in collecting it. Note that an admin can still break
+   * the RM by repeatedly bouncing nodemanagers on different ports.
+   */
+  private Map<String, NodeId> nodes = new ConcurrentHashMap<String, NodeId>();
+  private final AtomicInteger nodeCounter = new AtomicInteger(0);
 
   private Server server;
   private InetSocketAddress resourceTrackerAddress;
 
-  public ResourceTrackerService(RMResourceTrackerImpl resourceTracker) {
-    super(ResourceTrackerService.class.getName());
-    this.resourceTracker = resourceTracker;
+  private static final NodeHeartbeatResponse reboot = recordFactory
+      .newRecordInstance(NodeHeartbeatResponse.class);
+  static {
+    HeartbeatResponse rebootResp = recordFactory
+        .newRecordInstance(HeartbeatResponse.class);
+    rebootResp.setReboot(true);
+    reboot.setHeartbeatResponse(rebootResp);
   }
 
-  public RMResourceTrackerImpl getResourceTracker() {
-    return resourceTracker;
+  private final ConcurrentMap<NodeId, HeartbeatResponse> lastHeartBeats
+    = new ConcurrentHashMap<NodeId, HeartbeatResponse>();
+
+  public ResourceTrackerService(RMContext rmContext,
+      NodesListManager nodesListManager,
+      NMLivelinessMonitor nmLivelinessMonitor,
+      ContainerTokenSecretManager containerTokenSecretManager) {
+    super(ResourceTrackerService.class.getName());
+    this.rmContext = rmContext;
+    this.nodesListManager = nodesListManager;
+    this.nmLivelinessMonitor = nmLivelinessMonitor;
+    this.containerTokenSecretManager = containerTokenSecretManager;
   }
 
   @Override
   public synchronized void init(Configuration conf) {
-    super.init(conf);
     String resourceTrackerBindAddress =
       conf.get(YarnServerConfig.RESOURCETRACKER_ADDRESS,
           YarnServerConfig.DEFAULT_RESOURCETRACKER_BIND_ADDRESS);
     resourceTrackerAddress = NetUtils.createSocketAddr(resourceTrackerBindAddress);
-    resourceTracker.init(conf);
+
+    RackResolver.init(conf);
+    super.init(conf);
   }
 
   @Override
@@ -93,12 +137,10 @@ implements ResourceTracker{
               RMConfig.DEFAULT_RM_RESOURCE_TRACKER_THREADS));
     this.server.start();
 
-    resourceTracker.start();
   }
 
   @Override
   public synchronized void stop() {
-    resourceTracker.stop();
     if (this.server != null) {
       this.server.close();
     }
@@ -108,38 +150,207 @@ implements ResourceTracker{
   @Override
   public RegisterNodeManagerResponse registerNodeManager(
       RegisterNodeManagerRequest request) throws YarnRemoteException {
-    RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
-        RegisterNodeManagerResponse.class);
+
+    String host = request.getHost();
+    int cmPort = request.getContainerManagerPort();
+    int httpPort = request.getHttpPort();
+    Resource capability = request.getResource();
+
     try {
-      response.setRegistrationResponse(
-          resourceTracker.registerNodeManager(
-              request.getHost(), request.getContainerManagerPort(), 
-              request.getHttpPort(), request.getResource()));
+      // Check if this node is a 'valid' node
+      if (!this.nodesListManager.isValidNode(host)) {
+        LOG.info("Disallowed NodeManager from  " + host);
+        throw new IOException("Disallowed NodeManager from  " + host); 
+      }
+
+      String node = host + ":" + cmPort;
+      NodeId nodeId = mayBeCreateAndGetNodeId(node);
+   
+      createNewNode(nodeId, host, cmPort, httpPort, resolve(host), capability);
+
+      this.nmLivelinessMonitor.register(nodeId);
+
+      LOG.info("NodeManager from node " + host + 
+          "(cmPort: " + cmPort + " httpPort: " + httpPort + ") "
+          + "registered with capability: " + capability.getMemory()
+          + ", assigned nodeId " + nodeId.getId());
+
+      RegistrationResponse regResponse = recordFactory.newRecordInstance(
+          RegistrationResponse.class);
+      regResponse.setNodeId(nodeId);
+      SecretKey secretKey =
+        this.containerTokenSecretManager.createAndGetSecretKey(node);
+      regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
+
+      RegisterNodeManagerResponse response = recordFactory
+          .newRecordInstance(RegisterNodeManagerResponse.class);
+      response.setRegistrationResponse(regResponse);
+      return response;
     } catch (IOException ioe) {
       LOG.info("Exception in node registration from " + request.getHost(), ioe);
       throw RPCUtil.getRemoteException(ioe);
     }
-    return response;
   }
 
   @Override
   public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
       throws YarnRemoteException {
-    NodeHeartbeatResponse response = recordFactory.newRecordInstance(
-        NodeHeartbeatResponse.class);
+
+    NodeStatus remoteNodeStatus = request.getNodeStatus();
     try {
-    response.setHeartbeatResponse(
-        resourceTracker.nodeHeartbeat(request.getNodeStatus()));
+      /**
+       * Here is the node heartbeat sequence...
+       * 1. Check if it's a registered node
+       * 2. Check if it's a valid (i.e. not excluded) node
+       * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
+       * 4. Send healthStatus to RMNode
+       */
+      
+      NodeId nodeId = remoteNodeStatus.getNodeId();
+      
+      // 1. Check if it's a registered node
+      RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
+      if (rmNode == null) {
+        /* node does not exist */
+        LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
+        return reboot;
+      }
+
+      // Record the received ping with the liveliness monitor
+      this.nmLivelinessMonitor.receivedPing(nodeId);
+
+      // 2. Check if it's a valid (i.e. not excluded) node
+      if (!this.nodesListManager.isValidNode(rmNode.getNodeHostName())) {
+        LOG.info("Disallowed NodeManager nodeId: " + nodeId +  
+            " hostname: " + rmNode.getNodeAddress());
+        throw new IOException("Disallowed NodeManager nodeId: " + 
+            remoteNodeStatus.getNodeId());
+      }
+
+      NodeHeartbeatResponse nodeHeartBeatResponse = recordFactory
+          .newRecordInstance(NodeHeartbeatResponse.class);
+
+      // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
+      if (remoteNodeStatus.getResponseId() + 1 == this.lastHeartBeats.get(nodeId)
+           .getResponseId()) {
+        LOG.info("Received duplicate heartbeat from node " + 
+            rmNode.getNodeAddress());
+        nodeHeartBeatResponse.setHeartbeatResponse(this.lastHeartBeats
+            .get(nodeId));
+        return nodeHeartBeatResponse;
+      } else if (remoteNodeStatus.getResponseId() + 1 < this.lastHeartBeats
+          .get(nodeId).getResponseId()) {
+        LOG.info("Too far behind rm response id:" +
+            this.lastHeartBeats.get(nodeId).getResponseId() + " nm response id:"
+            + remoteNodeStatus.getResponseId());
+        // TODO: Just sending reboot is not enough. Think more.
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
+        return reboot;
+      }
+
+      // 4. Send status to RMNode
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
+              remoteNodeStatus.getAllContainers()));
+
+      // Heartbeat response
+      HeartbeatResponse response = recordFactory
+          .newRecordInstance(HeartbeatResponse.class);
+      response
+          .setResponseId(this.lastHeartBeats.get(nodeId).getResponseId() + 1);
+      response.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
+      response.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
+
+      // Save the response
+      this.lastHeartBeats.put(nodeId, response);
+      nodeHeartBeatResponse.setHeartbeatResponse(response);
+      return nodeHeartBeatResponse;
     } catch (IOException ioe) {
       LOG.info("Exception in heartbeat from node " + 
           request.getNodeStatus().getNodeId(), ioe);
       throw RPCUtil.getRemoteException(ioe);
     }
-    return response;
   }
 
   public void recover(RMState state) {
-    resourceTracker.recover(state);
+//
+//    List<RMNode> nodeManagers = state.getStoredNodeManagers();
+//    for (RMNode nm : nodeManagers) {
+//      createNewNode(nm.getNodeID(), nm.getNodeHostName(), nm
+//          .getCommandPort(), nm.getHttpPort(), nm.getNode(), nm
+//          .getTotalCapability());
+//    }
+//    for (Map.Entry<ApplicationId, ApplicationInfo> entry : state
+//        .getStoredApplications().entrySet()) {
+//      List<Container> containers = entry.getValue().getContainers();
+//      List<Container> containersToAdd = new ArrayList<Container>();
+//      for (Container c : containers) {
+//        RMNode containerNode = this.rmContext.getNodesCollection()
+//            .getNodeInfo(c.getNodeId());
+//        containersToAdd.add(c);
+//        containerNode.allocateContainer(entry.getKey(), containersToAdd);
+//        containersToAdd.clear();
+//      }
+//    }
+  }
+
+  private void createNewNode(NodeId nodeId, String hostName, int cmPort,
+      int httpPort, Node node, Resource capability) throws IOException {
+
+    RMNode rmNode = new RMNodeImpl(nodeId, rmContext, hostName, cmPort,
+        httpPort, node, capability, this);
+
+    if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
+      throw new IOException("Duplicate registration from the node!");
+    }
+
+    // Record the new node
+    synchronized (nodes) {
+      LOG.info("DEBUG -- Adding  " + hostName);
+      HeartbeatResponse response = recordFactory
+          .newRecordInstance(HeartbeatResponse.class);
+      response.setResponseId(0);
+      this.lastHeartBeats.put(nodeId, response);
+      nodes.put(rmNode.getNodeAddress(), nodeId);
+    }
+  }
+
+  @Override
+  public void RMNodeRemoved(NodeId nodeId) {
+    RMNode node = null;  
+    synchronized (nodes) {
+      node = this.rmContext.getRMNodes().get(nodeId);
+      if (node != null) {
+        nodes.remove(node.getNodeAddress());
+        this.lastHeartBeats.remove(nodeId);
+      } else {
+        LOG.warn("Unknown node " + nodeId + " unregistered");
+      }
+    }
+    
+    if (node != null) {
+      this.rmContext.getRMNodes().remove(nodeId);
+    }
+  }
+  
+  private  NodeId mayBeCreateAndGetNodeId(String node) {
+    NodeId nodeId;
+    nodeId = nodes.get(node);
+    if (nodeId == null) {
+      nodeId = recordFactory.newRecordInstance(NodeId.class);
+      nodeId.setId(nodeCounter.getAndIncrement());
+    }
+    return nodeId;
+  }
+
+  /**
+   * Resolves the network topology location of the given host.
+   * @param hostName the hostname of this node.
+   * @return the resolved {@link Node} for this nodemanager.
+   */
+  public static Node resolve(String hostName) {
+    return RackResolver.resolve(hostName);
   }
 
 }

Copied: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (from r1153017, hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?p2=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java&p1=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java&r1=1153017&r2=1153430&rev=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Wed Aug  3 11:31:34 2011
@@ -16,7 +16,7 @@
 * limitations under the License.
 */
 
-package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+package org.apache.hadoop.yarn.server.resourcemanager.amlauncher;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -42,12 +42,12 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -63,11 +63,11 @@ import org.apache.hadoop.yarn.security.A
 import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMFinishEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMLauncherEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 
 /**
  * The launch of the AM itself.
@@ -78,7 +78,7 @@ public class AMLauncher implements Runna
 
   private ContainerManager containerMgrProxy;
 
-  private final Application application;
+  private final RMAppAttempt application;
   private final Configuration conf;
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private final ApplicationTokenSecretManager applicationTokenSecretManager;
@@ -89,7 +89,7 @@ public class AMLauncher implements Runna
   private final EventHandler handler;
   
   @SuppressWarnings("unchecked")
-  public AMLauncher(RMContext asmContext, Application application,
+  public AMLauncher(RMContext rmContext, RMAppAttempt application,
       AMLauncherEventType eventType,ApplicationTokenSecretManager applicationTokenSecretManager,
       ClientToAMSecretManager clientToAMSecretManager, Configuration conf) {
     this.application = application;
@@ -100,7 +100,7 @@ public class AMLauncher implements Runna
         YarnConfiguration.YARN_SECURITY_INFO,
         ContainerManagerSecurityInfo.class, SecurityInfo.class);
     this.eventType = eventType;
-    this.handler = asmContext.getDispatcher().getEventHandler();
+    this.handler = rmContext.getDispatcher().getEventHandler();
   }
   
   private void connect() throws IOException {
@@ -116,14 +116,16 @@ public class AMLauncher implements Runna
     ApplicationSubmissionContext applicationContext =
       application.getSubmissionContext();
     LOG.info("Setting up container " + application.getMasterContainer() 
-        + " for AM " + application.getMaster());  
+        + " for AM " + application.getAppAttemptId());  
     ContainerLaunchContext launchContext =
         createAMContainerLaunchContext(applicationContext, masterContainerID);
     StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
     request.setContainerLaunchContext(launchContext);
     containerMgrProxy.startContainer(request);
     LOG.info("Done launching container " + application.getMasterContainer() 
-        + " for AM " + application.getMaster());
+        + " for AM " + application.getAppAttemptId());
+    this.handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
+        RMAppAttemptEventType.LAUNCHED));
   }
   
   private void cleanup() throws IOException {
@@ -172,7 +174,8 @@ public class AMLauncher implements Runna
     ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
     container.addAllCommands(applicationMasterContext.getCommandList());
     StringBuilder mergedCommand = new StringBuilder();
-    String failCount = Integer.toString(application.getFailedCount());
+    String failCount = Integer.toString(application.getAppAttemptId()
+        .getAttemptId());
     List<String> commandList = new ArrayList<String>();
     for (String str : container.getCommandList()) {
       // This is out-right wrong. AM FAIL count should be passed via env.
@@ -219,7 +222,7 @@ public class AMLauncher implements Runna
       }
 
       ApplicationTokenIdentifier id = new ApplicationTokenIdentifier(
-          application.getApplicationID());
+          application.getAppAttemptId().getApplicationId());
       Token<ApplicationTokenIdentifier> token =
           new Token<ApplicationTokenIdentifier>(id,
               this.applicationTokenSecretManager);
@@ -244,7 +247,7 @@ public class AMLauncher implements Runna
       asc.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
 
       ApplicationTokenIdentifier identifier = new ApplicationTokenIdentifier(
-          this.application.getApplicationID());
+          application.getAppAttemptId().getApplicationId());
       SecretKey clientSecretKey =
           this.clientToAMSecretManager.getMasterKey(identifier);
       String encoded =
@@ -259,20 +262,22 @@ public class AMLauncher implements Runna
   public void run() {
     switch (eventType) {
     case LAUNCH:
-      ApplicationEventType targetEventType = ApplicationEventType.LAUNCHED;
       try {
-        LOG.info("Launching master" + application.getMaster());
+        LOG.info("Launching master" + application.getAppAttemptId());
         launch();
+        handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
+            RMAppAttemptEventType.LAUNCHED));
       } catch(Exception ie) {
-        LOG.info("Error launching ", ie);
-        targetEventType = ApplicationEventType.LAUNCH_FAILED;
+        String message = "Error launching " + application.getAppAttemptId()
+            + ". Got exception: " + StringUtils.stringifyException(ie);
+        LOG.info(message);
+        handler.handle(new RMAppAttemptLaunchFailedEvent(application
+            .getAppAttemptId(), message));
       }
-      handler.handle(new ApplicationEvent(targetEventType, application
-          .getApplicationID()));
       break;
     case CLEANUP:
       try {
-        LOG.info("Cleaning master " + application.getMaster());
+        LOG.info("Cleaning master " + application.getAppAttemptId());
         cleanup();
       } catch(IOException ie) {
         LOG.info("Error cleaning master ", ie);

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEvent.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,19 @@
+package org.apache.hadoop.yarn.server.resourcemanager.amlauncher;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+
+public class AMLauncherEvent extends AbstractEvent<AMLauncherEventType> {
+
+  private final RMAppAttempt appAttempt;
+
+  public AMLauncherEvent(AMLauncherEventType type, RMAppAttempt appAttempt) {
+    super(type);
+    this.appAttempt = appAttempt;
+  }
+
+  public RMAppAttempt getAppAttempt() {
+    return this.appAttempt;
+  }
+
+}

Copied: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEventType.java (from r1153017, hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/AMLauncherEventType.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEventType.java?p2=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEventType.java&p1=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/AMLauncherEventType.java&r1=1153017&r2=1153430&rev=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/AMLauncherEventType.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEventType.java Wed Aug  3 11:31:34 2011
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events;
+package org.apache.hadoop.yarn.server.resourcemanager.amlauncher;
 
 public enum AMLauncherEventType {
   LAUNCH,

Copied: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java (from r1153017, hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterLauncher.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java?p2=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java&p1=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterLauncher.java&r1=1153017&r2=1153430&rev=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterLauncher.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java Wed Aug  3 11:31:34 2011
@@ -16,7 +16,7 @@
 * limitations under the License.
 */
 
-package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+package org.apache.hadoop.yarn.server.resourcemanager.amlauncher;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -27,13 +27,13 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMLauncherEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.service.AbstractService;
 
 
-public class ApplicationMasterLauncher extends AbstractService implements EventHandler<ASMEvent<AMLauncherEventType>> {
+public class ApplicationMasterLauncher extends AbstractService implements
+    EventHandler<AMLauncherEvent> {
   private static final Log LOG = LogFactory.getLog(
       ApplicationMasterLauncher.class);
   private final ThreadPoolExecutor launcherPool;
@@ -67,13 +67,13 @@ public class ApplicationMasterLauncher e
     super.start();
   }
   
-  protected Runnable createRunnableLauncher(Application application, AMLauncherEventType event) {
+  protected Runnable createRunnableLauncher(RMAppAttempt application, AMLauncherEventType event) {
     Runnable launcher = new AMLauncher(context, application, event,
         applicationTokenSecretManager, clientToAMSecretManager, getConfig());
     return launcher;
   }
   
-  private void launch(Application application) {
+  private void launch(RMAppAttempt application) {
     Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.LAUNCH);
     masterEvents.add(launcher);
   }
@@ -106,15 +106,15 @@ public class ApplicationMasterLauncher e
     }
   }    
 
-  private void cleanup(Application application) {
+  private void cleanup(RMAppAttempt application) {
     Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.CLEANUP);
     masterEvents.add(launcher);
   } 
   
   @Override
-  public synchronized void  handle(ASMEvent<AMLauncherEventType> appEvent) {
+  public synchronized void  handle(AMLauncherEvent appEvent) {
     AMLauncherEventType event = appEvent.getType();
-    Application application = appEvent.getApplication();
+    RMAppAttempt application = appEvent.getAppAttempt();
     switch (event) {
     case LAUNCH:
       launch(application);

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,295 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.ams;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+@Private
+public class ApplicationMasterService extends AbstractService implements 
+AMRMProtocol, EventHandler<ApplicationMasterServiceEvent> {
+  private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
+  private final AMLivelinessMonitor amLivelinessMonitor;
+  private YarnScheduler rScheduler;
+  private ApplicationTokenSecretManager appTokenManager;
+  private InetSocketAddress masterServiceAddress;
+  private Server server;
+  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private final ConcurrentMap<ApplicationAttemptId, AMResponse> responseMap =
+      new ConcurrentHashMap<ApplicationAttemptId, AMResponse>();
+  private final AMResponse reboot = recordFactory.newRecordInstance(AMResponse.class);
+  private final RMContext rmContext;
+  
+  public ApplicationMasterService(RMContext rmContext,
+      AMLivelinessMonitor amLivelinessMonitor,
+      ApplicationTokenSecretManager appTokenManager, YarnScheduler scheduler) {
+    super(ApplicationMasterService.class.getName()); // the service is named after this class
+    this.amLivelinessMonitor = amLivelinessMonitor;
+    this.appTokenManager = appTokenManager;
+    this.rScheduler = scheduler;
+    this.reboot.setReboot(true);
+    // The reboot-flagged response above is shared: allocate() returns it when it rejects a heartbeat.
+    this.rmContext = rmContext;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    String bindAddress =
+      conf.get(YarnConfiguration.SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS); // second argument is the fallback value
+    masterServiceAddress =  NetUtils.createSocketAddr(bindAddress); // start() binds the RPC server here
+    super.init(conf); // base init runs last, once the address is resolved
+  }
+
+  @Override
+  public void start() {
+    YarnRPC rpc = YarnRPC.create(getConfig());
+    Configuration serverConf = new Configuration(getConfig()); // copy; overrides below stay local
+    serverConf.setClass(
+        CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        SchedulerSecurityInfo.class, SecurityInfo.class);
+    this.server = // AMRMProtocol endpoint, served by this object on the address chosen in init()
+      rpc.getServer(AMRMProtocol.class, this, masterServiceAddress,
+          serverConf, this.appTokenManager,
+          serverConf.getInt(RMConfig.RM_AM_THREADS, 
+              RMConfig.DEFAULT_RM_AM_THREADS));
+    this.server.start(); // RPC server first, then the base service start
+    super.start();
+  }
+  
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnRemoteException {
+    // RPC from an AM announcing itself; only attempts already present in responseMap are accepted.
+    ApplicationAttemptId applicationAttemptId = request
+        .getApplicationAttemptId();
+    AMResponse lastResponse = responseMap.get(applicationAttemptId);
+    if (lastResponse == null) {
+      String message = "Application doesn't exist in cache "
+          + applicationAttemptId;
+      LOG.error(message);
+      throw RPCUtil.getRemoteException(message);
+    }
+
+    // Serialize registerApp calls for this attempt; the cached response object is used as the lock.
+    synchronized (lastResponse) {
+
+      LOG.info("AM registration " + applicationAttemptId);
+      this.amLivelinessMonitor.receivedPing(applicationAttemptId);
+      // Publish the AM's host, RPC port and tracking URL as a registration event.
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptRegistrationEvent(applicationAttemptId, request
+              .getHost(), request.getRpcPort(), request.getTrackingUrl()));
+
+      // Pick up min/max resource from scheduler...
+      RegisterApplicationMasterResponse response = recordFactory
+          .newRecordInstance(RegisterApplicationMasterResponse.class);
+      response.setMinimumResourceCapability(rScheduler
+          .getMinimumResourceCapability());
+      response.setMaximumResourceCapability(rScheduler
+          .getMaximumResourceCapability());
+      return response;
+    }
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnRemoteException {
+    // RPC from an AM reporting that it is done; rejected for attempts missing from responseMap.
+    ApplicationAttemptId applicationAttemptId = request
+        .getApplicationAttemptId();
+    AMResponse lastResponse = responseMap.get(applicationAttemptId);
+    if (lastResponse == null) {
+      String message = "Application doesn't exist in cache "
+          + applicationAttemptId;
+      LOG.error(message);
+      throw RPCUtil.getRemoteException(message);
+    }
+
+    // Serialize finishApp calls for this attempt; the cached response object is used as the lock.
+    synchronized (lastResponse) {
+
+      this.amLivelinessMonitor.receivedPing(applicationAttemptId);
+      // Publish the tracking URL, final state and diagnostics as an unregistration event.
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
+              .getTrackingUrl(), request.getFinalState(), request
+              .getDiagnostics()));
+      // The responseMap entry is not removed here; handle() does that on an UNREGISTER event.
+      FinishApplicationMasterResponse response = recordFactory
+          .newRecordInstance(FinishApplicationMasterResponse.class);
+      return response;
+    }
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnRemoteException {
+    // AM heartbeat: records liveness, passes asks/releases on, returns finished + new containers.
+    ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
+    ApplicationId applicationId = appAttemptId.getApplicationId();
+
+    this.amLivelinessMonitor.receivedPing(appAttemptId);
+
+    // responseMap is keyed by attempt id, so the cache lookup must use the attempt id as well.
+    AllocateResponse allocateResponse = recordFactory
+        .newRecordInstance(AllocateResponse.class);
+    AMResponse lastResponse = responseMap.get(appAttemptId);
+    if (lastResponse == null) {
+      LOG.error("Application doesnt exist in cache " + appAttemptId);
+      allocateResponse.setAMResponse(reboot);
+      return allocateResponse;
+    }
+    if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
+      // Repeat of the previous heartbeat: answer with the cached response again.
+      allocateResponse.setAMResponse(lastResponse);
+      return allocateResponse;
+    } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
+      LOG.error("Invalid responseid from application " + applicationId);
+      // TODO: sending reboot isn't enough here; the RM state is corrupted.
+      allocateResponse.setAMResponse(reboot);
+      return allocateResponse;
+    }
+
+    // Allow only one thread in AM to do heartbeat at a time.
+    synchronized (lastResponse) {
+
+      // Send the status update to the appAttempt.
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptStatusupdateEvent(appAttemptId, request
+              .getProgress()));
+
+      List<ResourceRequest> ask = request.getAskList();
+      List<Container> release = request.getReleaseList();
+
+      // Send new requests to the scheduler.
+      if (!ask.isEmpty()) {
+        this.rScheduler.allocate(appAttemptId, ask);
+      }
+
+      // Send events to the containers being released.
+      for (Container releasedContainer : release) {
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMContainerEvent(releasedContainer.getId(),
+                RMContainerEventType.RELEASED));
+      }
+      // NOTE(review): app and appAttempt are dereferenced without a null check - confirm both exist.
+      RMApp app = this.rmContext.getRMApps().get(applicationId);
+      RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
+
+      // Get the list of finished containers.
+      List<Container> finishedContainers = appAttempt
+          .pullJustFinishedContainers();
+
+      // Get the list of newly allocated containers.
+      List<Container> newlyAllocatedContainers = appAttempt
+          .pullNewlyAllocatedContainers();
+
+      // TODO: For now all containers are combined
+      List<Container> allContainers = new ArrayList<Container>(
+          finishedContainers);
+      allContainers.addAll(newlyAllocatedContainers);
+      // Cache the new response (id + 1) so a repeated heartbeat can be answered from responseMap.
+      AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
+      response.addAllContainers(allContainers);
+      response.setResponseId(lastResponse.getResponseId() + 1);
+      response.setAvailableResources(rScheduler
+          .getResourceLimit(appAttemptId));
+      responseMap.put(appAttemptId, response);
+      allocateResponse.setAMResponse(response);
+      return allocateResponse;
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (this.server != null) { // server is only assigned in start()
+      this.server.close();
+    }
+    super.stop(); // base service stop runs whether or not an RPC server existed
+  }
+
+  @Override
+  public void handle(ApplicationMasterServiceEvent amsEvent) {
+    ApplicationMasterServiceEventType eventType = amsEvent.getType();
+    ApplicationAttemptId attemptId = amsEvent.getAppAttemptId();
+    switch (eventType) {
+    case REGISTER: // start tracking the attempt: cache an initial response with id 0
+      AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
+      response.setResponseId(0);
+      responseMap.put(attemptId, response);
+      break;
+    case UNREGISTER: // stop tracking the attempt: later lookups for it return null
+      AMResponse lastResponse = responseMap.get(attemptId);
+      if (lastResponse != null) {
+        synchronized (lastResponse) { // same monitor the RPC methods hold for this attempt
+          responseMap.remove(attemptId);
+        }
+      }
+      break;
+    default: // unknown types are logged and otherwise ignored
+      LOG.error("Unknown event " + eventType + ". Ignoring..");
+    }
+  }
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEvent.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,21 @@
+package org.apache.hadoop.yarn.server.resourcemanager.ams;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class ApplicationMasterServiceEvent extends
+    AbstractEvent<ApplicationMasterServiceEventType> {
+  // Event consumed by ApplicationMasterService.handle(); carries the attempt it applies to.
+  private final ApplicationAttemptId attemptId;
+  // attemptId: the application attempt concerned; type: REGISTER or UNREGISTER.
+  public ApplicationMasterServiceEvent(ApplicationAttemptId attemptId,
+      ApplicationMasterServiceEventType type) {
+    super(type);
+    this.attemptId = attemptId;
+  }
+  // Returns the attempt id supplied at construction.
+  public ApplicationAttemptId getAppAttemptId() {
+    return this.attemptId;
+  }
+
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEventType.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEventType.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEventType.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,6 @@
+package org.apache.hadoop.yarn.server.resourcemanager.ams;
+
+public enum ApplicationMasterServiceEventType { // types handled by ApplicationMasterService.handle()
+  REGISTER, // the service caches an initial response (id 0) for the attempt
+  UNREGISTER // the service drops the attempt's cached response
+}

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java Wed Aug  3 11:31:34 2011
@@ -7,14 +7,12 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
-import org.apache.hadoop.yarn.api.records.ApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -45,10 +43,10 @@ public class MemStore implements Store {
   }
 
   @Override
-  public void storeNode(NodeManager node) throws IOException {}
+  public void storeNode(RMNode node) throws IOException {}
 
   @Override
-  public void removeNode(NodeManager node) throws IOException {}
+  public void removeNode(RMNode node) throws IOException {}
 
   private class ApplicationStoreImpl implements ApplicationStore {
     @Override
@@ -102,8 +100,8 @@ public class MemStore implements Store {
     }
 
     @Override
-    public List<NodeManager> getStoredNodeManagers()  {
-      return new ArrayList<NodeManager>();
+    public List<RMNode> getStoredNodeManagers()  {
+      return new ArrayList<RMNode>();
     }
 
     @Override

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NodeStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NodeStore.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NodeStore.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NodeStore.java Wed Aug  3 11:31:34 2011
@@ -21,12 +21,12 @@ package org.apache.hadoop.yarn.server.re
 import java.io.IOException;
 
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 
 public interface NodeStore {
-  public void storeNode(NodeManager node) throws IOException;
-  public void removeNode(NodeManager node) throws IOException;
+  public void storeNode(RMNode node) throws IOException;
+  public void removeNode(RMNode node) throws IOException;
   public NodeId getNextNodeId() throws IOException;
   public boolean isLoggable();
 }
\ No newline at end of file

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java Wed Aug  3 11:31:34 2011
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 
 public interface Store extends NodeStore, ApplicationsStore {
@@ -37,7 +37,7 @@ public interface Store extends NodeStore
     public List<Container> getContainers();
   }
   public interface RMState {
-    public List<NodeManager> getStoredNodeManagers() ;
+    public List<RMNode> getStoredNodeManagers() ;
     public Map<ApplicationId, ApplicationInfo> getStoredApplications();
     public NodeId getLastLoggedNodeId();
   }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java Wed Aug  3 11:31:34 2011
@@ -35,23 +35,23 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeManagerInfo;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationMasterPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.NodeManagerInfoPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationMasterProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.NodeManagerInfoProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
 import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -105,22 +105,23 @@ public class ZKStore implements Store {
     return new ZKWatcher();   
   }
 
-  private NodeManagerInfoPBImpl createNodeManagerInfo(NodeManager nodeInfo) {
-    NodeManagerInfo node = 
-      recordFactory.newRecordInstance(NodeManagerInfo.class);
+  private NodeReportPBImpl createNodeManagerInfo(RMNode nodeInfo) {
+    NodeReport node = 
+      recordFactory.newRecordInstance(NodeReport.class);
     node.setNodeAddress(nodeInfo.getNodeAddress());
     node.setRackName(nodeInfo.getRackName());
     node.setCapability(nodeInfo.getTotalCapability());
-    node.setUsed(nodeInfo.getUsedResource());
+    // TODO: FIXME
+//    node.setUsed(nodeInfo.getUsedResource());
     node.setNumContainers(nodeInfo.getNumContainers());
-    return (NodeManagerInfoPBImpl)node;
+    return (NodeReportPBImpl)node;
   }
 
   @Override
-  public synchronized void storeNode(NodeManager node) throws IOException {
+  public synchronized void storeNode(RMNode node) throws IOException {
     /** create a storage node and store it in zk **/
     if (!doneWithRecovery) return;
-    NodeManagerInfoPBImpl nodeManagerInfo = createNodeManagerInfo(node);
+    NodeReportPBImpl nodeManagerInfo = createNodeManagerInfo(node);
     byte[] bytes = nodeManagerInfo.getProto().toByteArray();
     try {
       zkClient.create(NODES + Integer.toString(node.getNodeID().getId()), bytes, null,
@@ -135,7 +136,7 @@ public class ZKStore implements Store {
   }
 
   @Override
-  public synchronized void removeNode(NodeManager node) throws IOException {
+  public synchronized void removeNode(RMNode node) throws IOException {
     if (!doneWithRecovery) return;
     
     /** remove a storage node **/
@@ -364,7 +365,7 @@ public class ZKStore implements Store {
   }
 
   private class ZKRMState implements RMState {
-    private List<NodeManager> nodeManagers = new ArrayList<NodeManager>();
+    private List<RMNode> nodeManagers = new ArrayList<RMNode>();
     private Map<ApplicationId, ApplicationInfo> applications = new 
     HashMap<ApplicationId, ApplicationInfo>();
 
@@ -372,17 +373,17 @@ public class ZKStore implements Store {
       LOG.info("Restoring RM state from ZK");
     }
 
-    private synchronized List<NodeManagerInfo> listStoredNodes() throws IOException {
+    private synchronized List<NodeReport> listStoredNodes() throws IOException {
       /** get the list of nodes stored in zk **/
       //TODO PB
-      List<NodeManagerInfo> nodes = new ArrayList<NodeManagerInfo>();
+      List<NodeReport> nodes = new ArrayList<NodeReport>();
       Stat stat = new Stat();
       try {
         List<String> children = zkClient.getChildren(NODES, false);
         for (String child: children) {
           byte[] data = zkClient.getData(NODES + child, false, stat);
-          NodeManagerInfoPBImpl nmImpl = new NodeManagerInfoPBImpl(
-              NodeManagerInfoProto.parseFrom(data));
+          NodeReportPBImpl nmImpl = new NodeReportPBImpl(
+              NodeReportProto.parseFrom(data));
           nodes.add(nmImpl);
         }
       } catch (InterruptedException ie) {
@@ -396,7 +397,7 @@ public class ZKStore implements Store {
     }
 
     @Override
-    public List<NodeManager> getStoredNodeManagers()  {
+    public List<RMNode> getStoredNodeManagers()  {
       return nodeManagers;
     }
 
@@ -453,10 +454,10 @@ public class ZKStore implements Store {
     }
 
     private void load() throws IOException {
-      List<NodeManagerInfo> nodeInfos = listStoredNodes();
+      List<NodeReport> nodeInfos = listStoredNodes();
       final Pattern trackerPattern = Pattern.compile(".*:.*");
       final Matcher m = trackerPattern.matcher("");
-      for (NodeManagerInfo node: nodeInfos) {
+      for (NodeReport node: nodeInfos) {
         m.reset(node.getNodeAddress());
         if (!m.find()) {
           LOG.info("Skipping node, bad node-address " + node.getNodeAddress());
@@ -470,10 +471,10 @@ public class ZKStore implements Store {
           continue;
         }
         int httpPort = Integer.valueOf(m.group(1));
-        NodeManager nm = new NodeManagerImpl(node.getNodeId(),
+        RMNode nm = new RMNodeImpl(node.getNodeId(), null,
             hostName, cmPort, httpPort,
-            RMResourceTrackerImpl.resolve(node.getNodeAddress()), 
-            node.getCapability());
+            ResourceTrackerService.resolve(node.getNodeAddress()), 
+            node.getCapability(), null);
         nodeManagers.add(nm);
       }
       readLastNodeId();

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,40 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+
+public interface RMApp extends EventHandler<RMAppEvent>{
+  // ResourceManager-side handle on one application; it is also the handler for its RMAppEvents.
+  ApplicationId getApplicationId();
+
+  RMAppState getState();
+
+  String getUser();
+
+  float getProgress();
+  // Looks up one attempt of this application by id; ApplicationMasterService.allocate() relies on it.
+  RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId);
+
+  String getQueue();
+
+  String getName();
+
+  RMAppAttempt getCurrentAppAttempt();
+
+  ApplicationReport createAndGetApplicationReport();
+
+  ApplicationStore getApplicationStore();
+
+  long getFinishTime();
+
+  long getStartTime();
+
+  String getTrackingUrl();
+  // NOTE(review): hands out a mutable StringBuilder - confirm whether callers are meant to append.
+  StringBuilder getDiagnostics();
+
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,18 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class RMAppEvent extends AbstractEvent<RMAppEventType>{
+  // Event addressed to a single application, identified by its ApplicationId.
+  private final ApplicationId appId;
+  // appId: the application the event is about; type: one of RMAppEventType.
+  public RMAppEvent(ApplicationId appId, RMAppEventType type) {
+    super(type);
+    this.appId = appId;
+  }
+  // Returns the application id supplied at construction.
+  public ApplicationId getApplicationId() {
+    return this.appId;
+  }
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,15 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+/**
+ * Types of events handled by the application state machine, grouped by the
+ * component that sends them.
+ */
+public enum RMAppEventType {
+
+  // ---- Source: ClientRMService ----
+  START,
+  KILL,
+
+  // ---- Source: RMAppAttempt ----
+  APP_REJECTED,
+  APP_ACCEPTED,
+  ATTEMPT_REGISTERED,
+  /** Will send the final state. */
+  ATTEMPT_FINISHED,
+  ATTEMPT_FAILED,
+  ATTEMPT_KILLED
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,420 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+public class RMAppImpl implements RMApp {
+
+  private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
+
+  // Immutable fields
+  private final ApplicationId applicationId;
+  private final RMContext rmContext;
+  private final Configuration conf;
+  private final String user;
+  private final String queue;
+  private final String name;
+  private final ApplicationSubmissionContext submissionContext;
+  private final String clientTokenStr;
+  private final ApplicationStore appStore;
+  private final Dispatcher dispatcher;
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+  // Insertion-ordered so attempts iterate in the order they were created.
+  private final Map<ApplicationAttemptId, RMAppAttempt> attempts
+      = new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
+
+  // Mutable fields
+  // NOTE(review): startTime/finishTime are not assigned anywhere in this
+  // class, so their getters currently return 0 -- confirm intended source.
+  private long startTime;
+  private long finishTime;
+  // Created eagerly: the transitions append to this buffer and the
+  // application report calls toString() on it, but nothing else in the class
+  // ever assigns it. Leaving it null made every such use throw an NPE.
+  private final StringBuilder diagnostics = new StringBuilder();
+  private AMLivelinessMonitor amLivelinessMonitor;
+  private YarnScheduler scheduler;
+  private int maxRetries;
+  // Most recently created attempt; null until the first attempt is started.
+  private RMAppAttempt currentAttempt;
+
+  /**
+   * Shared topology of the application state machine. One factory is built
+   * for the class; each instance obtains its own machine through
+   * {@code stateMachineFactory.make(this)} in the constructor.
+   */
+  private static final StateMachineFactory<RMAppImpl,
+                                           RMAppState,
+                                           RMAppEventType,
+                                           RMAppEvent> stateMachineFactory 
+                               = new StateMachineFactory<RMAppImpl,
+                                           RMAppState,
+                                           RMAppEventType,
+                                           RMAppEvent>(RMAppState.NEW)
+
+     // Transitions from NEW state
+    .addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
+        RMAppEventType.START, new StartAppAttemptTransition())
+    .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
+        new NewAppKilledTransition())
+
+     // Transitions from SUBMITTED state
+    .addTransition(RMAppState.SUBMITTED, RMAppState.FAILED,
+        RMAppEventType.APP_REJECTED, new AppRejectedTransition())
+    .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
+        RMAppEventType.APP_ACCEPTED)
+
+     // Transitions from ACCEPTED state
+    .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
+        RMAppEventType.ATTEMPT_REGISTERED)
+    .addTransition(RMAppState.ACCEPTED,
+        EnumSet.of(RMAppState.ACCEPTED, RMAppState.FAILED),
+        RMAppEventType.ATTEMPT_FAILED,
+        new AttemptFailedTransition(RMAppState.ACCEPTED))
+    .addTransition(RMAppState.ACCEPTED, RMAppState.KILLED,
+        RMAppEventType.KILL)
+
+     // Transitions from RUNNING state
+    .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
+        RMAppEventType.ATTEMPT_FINISHED)
+     // The state handed to AttemptFailedTransition is what it returns when
+     // it schedules a retry, so it must be one of the declared post-states.
+     // RUNNING is not in {RESTARTING, FAILED}; a retry goes to RESTARTING.
+    .addTransition(RMAppState.RUNNING,
+        EnumSet.of(RMAppState.RESTARTING, RMAppState.FAILED),
+        RMAppEventType.ATTEMPT_FAILED,
+        new AttemptFailedTransition(RMAppState.RESTARTING))
+    .addTransition(RMAppState.RUNNING, RMAppState.KILLED,
+        RMAppEventType.KILL)
+
+     // Transitions from RESTARTING state
+    .addTransition(RMAppState.RESTARTING, RMAppState.RUNNING,
+        RMAppEventType.ATTEMPT_REGISTERED)
+    .addTransition(RMAppState.RESTARTING,
+        EnumSet.of(RMAppState.RESTARTING, RMAppState.FAILED),
+        RMAppEventType.ATTEMPT_FAILED,
+        new AttemptFailedTransition(RMAppState.RESTARTING))
+    .addTransition(RMAppState.RESTARTING, RMAppState.KILLED,
+        RMAppEventType.KILL)
+
+     // Transitions from FINISHED state
+     // A KILL arriving after completion is ignored: the application stays
+     // FINISHED instead of being moved back to RUNNING.
+    .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
+        RMAppEventType.KILL)
+
+     // Transitions from FAILED state
+    .addTransition(RMAppState.FAILED, RMAppState.FAILED,
+        RMAppEventType.KILL)
+
+     // Transitions from KILLED state
+     // Late attempt notifications and repeated KILLs leave the state as is.
+    .addTransition(
+        RMAppState.KILLED,
+        RMAppState.KILLED,
+        EnumSet.of(RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED,
+            RMAppEventType.ATTEMPT_FAILED, RMAppEventType.ATTEMPT_KILLED))
+
+     .installTopology();
+
+  /** Per-application machine created from {@link #stateMachineFactory}. */
+  private final StateMachine<RMAppState, RMAppEventType, RMAppEvent>
+                                                                 stateMachine;
+
+  public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
+      Configuration config, String name, String user, String queue,
+      ApplicationSubmissionContext submissionContext, String clientTokenStr,
+      ApplicationStore appStore, AMLivelinessMonitor amLivelinessMonitor,
+      YarnScheduler scheduler) {
+
+    this.applicationId = applicationId;
+    this.name = name;
+    this.rmContext = rmContext;
+    this.dispatcher = rmContext.getDispatcher();
+    this.conf = config;
+    this.user = user;
+    this.queue = queue;
+    this.submissionContext = submissionContext;
+    this.clientTokenStr = clientTokenStr;
+    this.appStore = appStore;
+    this.amLivelinessMonitor = amLivelinessMonitor;
+    this.scheduler = scheduler;
+
+    this.maxRetries = conf.getInt(RMConfig.AM_MAX_RETRIES,
+        RMConfig.DEFAULT_AM_MAX_RETRIES);
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.readLock = lock.readLock();
+    this.writeLock = lock.writeLock();
+
+    this.stateMachine = stateMachineFactory.make(this);
+  }
+
+  /**
+   * @return the id this application was constructed with
+   */
+  @Override
+  public ApplicationId getApplicationId() {
+    // Read the field directly; calling getApplicationId() here would recurse
+    // without end and finish in a StackOverflowError.
+    return this.applicationId;
+  }
+
+  /** @return the state machine's current state, read under the read lock */
+  @Override
+  public RMAppState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /** @return the user name passed to the constructor */
+  @Override
+  public String getUser() {
+    return user;
+  }
+
+  /**
+   * @return the progress reported by the current attempt, or 0 when no
+   *         attempt has been created yet
+   */
+  @Override
+  public float getProgress() {
+    readLock.lock();
+    try {
+      final RMAppAttempt attempt = currentAttempt;
+      return attempt == null ? 0 : attempt.getProgress();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Looks up an attempt of this application under the read lock.
+   *
+   * @param appAttemptId id of the attempt to find
+   * @return the matching attempt, or {@code null} if none is recorded
+   */
+  @Override
+  public RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId) {
+    readLock.lock();
+    try {
+      return attempts.get(appAttemptId);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /** @return the queue name passed to the constructor */
+  @Override
+  public String getQueue() {
+    return queue;
+  }
+
+  /** @return the application name passed to the constructor */
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return the most recently created attempt, or {@code null} if no attempt
+   *         has been started; read under the read lock
+   */
+  @Override
+  public RMAppAttempt getCurrentAppAttempt() {
+    readLock.lock();
+    try {
+      return currentAttempt;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /** @return the application store passed to the constructor */
+  @Override
+  public ApplicationStore getApplicationStore() {
+    return appStore;
+  }
+
+  /**
+   * Maps the internal state onto the state exposed in application reports.
+   * SUBMITTED and ACCEPTED are both reported as SUBMITTED, and FINISHED is
+   * reported as SUCCEEDED.
+   *
+   * @param rmAppState internal state to translate
+   * @return the corresponding {@link ApplicationState}
+   * @throws YarnException if the state has no mapping
+   */
+  private ApplicationState createApplicationState(RMAppState rmAppState) {
+    switch (rmAppState) {
+    case NEW:
+      return ApplicationState.NEW;
+    case SUBMITTED: // fall through: reported the same as ACCEPTED
+    case ACCEPTED:
+      return ApplicationState.SUBMITTED;
+    case RESTARTING:
+      return ApplicationState.RESTARTING;
+    case RUNNING:
+      return ApplicationState.RUNNING;
+    case FINISHED:
+      return ApplicationState.SUCCEEDED;
+    case KILLED:
+      return ApplicationState.KILLED;
+    case FAILED:
+      return ApplicationState.FAILED;
+    default:
+      throw new YarnException("Unknown state passed!");
+    }
+  }
+
+  @Override
+  public ApplicationReport createAndGetApplicationReport() {
+    this.readLock.lock();
+
+    try {
+      return BuilderUtils.newApplicationReport(this.applicationId, this.user,
+          this.queue, this.name, this.currentAttempt.getHost(),
+          this.currentAttempt.getRpcPort(), this.currentAttempt
+              .getClientToken(), createApplicationState(this.stateMachine
+              .getCurrentState()), this.diagnostics.toString(),
+          this.currentAttempt.getTrackingUrl());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /** @return the value of the finish-time field, read under the read lock */
+  @Override
+  public long getFinishTime() {
+    readLock.lock();
+    try {
+      return finishTime;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /** @return the value of the start-time field, read under the read lock */
+  @Override
+  public long getStartTime() {
+    readLock.lock();
+    try {
+      return startTime;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * @return the tracking URL of the current attempt, or {@code null} when no
+   *         attempt has been created yet
+   */
+  @Override
+  public String getTrackingUrl() {
+    this.readLock.lock();
+
+    try {
+      // currentAttempt stays null until the first attempt is started, so it
+      // cannot be dereferenced unconditionally.
+      if (this.currentAttempt != null) {
+        return this.currentAttempt.getTrackingUrl();
+      }
+      return null;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * @return the diagnostics buffer itself (not a copy), fetched under the
+   *         read lock
+   */
+  @Override
+  public StringBuilder getDiagnostics() {
+    readLock.lock();
+    try {
+      return diagnostics;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Feeds an event into the state machine while holding the write lock and
+   * logs the resulting state change, if any. An event that is not valid in
+   * the current state is logged as an error and otherwise ignored.
+   *
+   * @param event the event to process
+   */
+  @Override
+  public void handle(RMAppEvent event) {
+    writeLock.lock();
+    try {
+      final ApplicationId appID = event.getApplicationId();
+      LOG.info("Processing event for " + appID + " of type "
+          + event.getType());
+
+      final RMAppState before = getState();
+      try {
+        /* keep the master in sync with the state machine */
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        /* TODO fail the application on the failed transition */
+      }
+
+      // The write lock is held, so the state cannot move between reads.
+      final RMAppState after = getState();
+      if (before != after) {
+        LOG.info(appID + " State change from " + before + " to "
+            + after);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private static class RMAppTransition implements
+      SingleArcTransition<RMAppImpl, RMAppEvent> {
+    public void transition(RMAppImpl app, RMAppEvent event) {
+    };
+
+  }
+
+  /**
+   * Creates a new attempt numbered one past the attempts recorded so far,
+   * registers it as the current attempt and dispatches a START event for it.
+   */
+  private static final class StartAppAttemptTransition extends RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      final ApplicationAttemptId attemptId =
+          Records.newRecord(ApplicationAttemptId.class);
+      attemptId.setApplicationId(app.applicationId);
+      attemptId.setAttemptId(app.attempts.size() + 1);
+
+      final RMAppAttempt newAttempt = new RMAppAttemptImpl(attemptId,
+          app.clientTokenStr, app.rmContext, app.scheduler,
+          app.submissionContext);
+      app.attempts.put(attemptId, newAttempt);
+      app.currentAttempt = newAttempt;
+
+      final RMAppAttemptEvent startEvent =
+          new RMAppAttemptEvent(attemptId, RMAppAttemptEventType.START);
+      app.dispatcher.getEventHandler().handle(startEvent);
+    }
+  }
+
+  /** Records in the diagnostics that the application was killed by the user. */
+  private static final class NewAppKilledTransition extends RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      app.diagnostics.append("Application killed by user.");
+    }
+  }
+
+  /**
+   * Appends the rejection message to the diagnostics. The event is cast to
+   * {@link RMAppRejectedEvent} to read that message.
+   */
+  private static final class AppRejectedTransition extends RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      final String reason = ((RMAppRejectedEvent) event).getMessage();
+      app.diagnostics.append(reason);
+    }
+  }
+
+  /**
+   * Handles a failed attempt: either gives up on the application once the
+   * retry budget is used up, or creates and starts a fresh attempt.
+   */
+  private static final class AttemptFailedTransition implements
+      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+
+    /** State returned when a new attempt is started. */
+    private final RMAppState initialState;
+
+    /**
+     * @param initialState state to report when the application is retried;
+     *        it has to be one of the post-states declared for the arc this
+     *        transition is installed on
+     */
+    public AttemptFailedTransition(RMAppState initialState) {
+      this.initialState = initialState;
+    }
+
+    /**
+     * @return {@link RMAppState#FAILED} when the number of attempts made has
+     *         reached the configured maximum, otherwise the state supplied
+     *         to the constructor
+     */
+    @Override
+    public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+
+      // ">=" rather than "==": with a limit that is zero, negative or
+      // already exceeded, an equality test never matches and the
+      // application would be retried without bound.
+      if (app.attempts.size() >= app.maxRetries) {
+        // Use the field, not app.getApplicationId(), to build the message.
+        app.diagnostics.append("Application " + app.applicationId
+            + " failed " + app.maxRetries
+            + " times. Failing the application.");
+        return RMAppState.FAILED;
+      }
+
+      ApplicationAttemptId appAttemptId = Records
+          .newRecord(ApplicationAttemptId.class);
+      appAttemptId.setApplicationId(app.applicationId);
+      appAttemptId.setAttemptId(app.attempts.size() + 1);
+
+      // Create a new attempt.
+      RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId,
+          app.clientTokenStr, app.rmContext, app.scheduler,
+          app.submissionContext);
+      app.attempts.put(appAttemptId, attempt);
+      app.currentAttempt = attempt;
+      app.dispatcher.getEventHandler().handle(
+          new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+      return initialState;
+    }
+
+  }
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRejectedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRejectedEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRejectedEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRejectedEvent.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,17 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * {@link RMAppEventType#APP_REJECTED} event that also carries a message
+ * explaining the rejection.
+ */
+public class RMAppRejectedEvent extends RMAppEvent {
+
+  private final String message;
+
+  /**
+   * @param appId id of the rejected application
+   * @param message text describing the rejection
+   */
+  public RMAppRejectedEvent(ApplicationId appId, String message) {
+    super(appId, RMAppEventType.APP_REJECTED);
+    this.message = message;
+  }
+
+  /** @return the message supplied at construction time */
+  public String getMessage() {
+    return message;
+  }
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,5 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+/** States an application can be in inside the application state machine. */
+public enum RMAppState {
+  NEW,
+  SUBMITTED,
+  ACCEPTED,
+  RUNNING,
+  RESTARTING,
+  FINISHED,
+  FAILED,
+  KILLED
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+
+public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {
+
+  private EventHandler dispatcher;
+  
+  public AMLivelinessMonitor(Dispatcher d) {
+    super("AMLivelinessMonitor", new SystemClock());
+    this.dispatcher = d.getEventHandler();
+  }
+
+  public void init(Configuration conf) {
+    super.init(conf);
+    setExpireInterval(conf.getInt(YarnConfiguration.AM_EXPIRY_INTERVAL,
+        RMConfig.DEFAULT_AM_EXPIRY_INTERVAL));
+    setMonitorInterval(conf.getInt(RMConfig.AMLIVELINESS_MONITORING_INTERVAL,
+        RMConfig.DEFAULT_AMLIVELINESS_MONITORING_INTERVAL));
+  }
+
+  @Override
+  protected void expire(ApplicationAttemptId id) {
+    dispatcher.handle(
+        new RMAppAttemptEvent(id, RMAppAttemptEventType.EXPIRE));
+  }
+}
\ No newline at end of file

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,35 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * A single attempt at running an application. It is itself a handler of
+ * {@link RMAppAttemptEvent}s and exposes read accessors for the attempt's
+ * identity, state, endpoint and containers.
+ */
+public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
+
+  // ---- identity and state ----
+
+  ApplicationAttemptId getAppAttemptId();
+
+  RMAppAttemptState getAppAttemptState();
+
+  // ---- endpoint details (used when building application reports) ----
+
+  String getHost();
+
+  int getRpcPort();
+
+  String getTrackingUrl();
+
+  String getClientToken();
+
+  // ---- status ----
+
+  StringBuilder getDiagnostics();
+
+  float getProgress();
+
+  // ---- containers ----
+  // NOTE(review): the "pull" names suggest each call drains the pending
+  // list -- confirm against the implementation.
+
+  List<Container> pullJustFinishedContainers();
+
+  List<Container> pullNewlyAllocatedContainers();
+
+  Container getMasterContainer();
+
+  // ---- submission ----
+
+  ApplicationSubmissionContext getSubmissionContext();
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEvent.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,19 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event addressed to a single application attempt: it pairs the attempt id
+ * with the {@link RMAppAttemptEventType} being signalled.
+ */
+public class RMAppAttemptEvent extends AbstractEvent<RMAppAttemptEventType> {
+
+  private final ApplicationAttemptId appAttemptId;
+
+  /**
+   * @param appAttemptId id of the attempt this event refers to
+   * @param type kind of event; handed to the {@link AbstractEvent} constructor
+   */
+  public RMAppAttemptEvent(ApplicationAttemptId appAttemptId,
+      RMAppAttemptEventType type) {
+    super(type);
+    this.appAttemptId = appAttemptId;
+  }
+
+  /** @return the attempt id supplied at construction time */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return appAttemptId;
+  }
+}



Mime
View raw message