hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153430 [7/9] - in /hadoop/common/branches/MR-279/mapreduce: mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/loc...
Date Wed, 03 Aug 2011 11:32:10 GMT
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Aug  3 11:31:34 2011
@@ -38,6 +38,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -46,13 +47,18 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -79,7 +85,7 @@ public class LeafQueue implements Queue 
   private float usedCapacity = 0.0f;
   private volatile int numContainers;
 
-  Set<Application> applications;
+  Set<CSApp> applications;
 
   public final Resource minimumAllocation;
 
@@ -90,8 +96,6 @@ public class LeafQueue implements Queue 
   private final QueueMetrics metrics;
 
   private QueueInfo queueInfo; 
-  private Map<ApplicationId, org.apache.hadoop.yarn.api.records.ApplicationReport> 
-  applicationInfos;
 
   private QueueState state;
 
@@ -105,7 +109,7 @@ public class LeafQueue implements Queue 
   
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, Queue parent, 
-      Comparator<Application> applicationComparator, Queue old) {
+      Comparator<CSApp> applicationComparator, Queue old) {
     this.scheduler = cs;
     this.queueName = queueName;
     this.parent = parent;
@@ -139,10 +143,6 @@ public class LeafQueue implements Queue 
     this.queueInfo.setQueueName(queueName);
     this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
 
-    this.applicationInfos = 
-      new HashMap<ApplicationId, 
-      org.apache.hadoop.yarn.api.records.ApplicationReport>();
-
     QueueState state = cs.getConfiguration().getState(getQueuePath());
 
     Map<QueueACL, AccessControlList> acls = 
@@ -158,7 +158,7 @@ public class LeafQueue implements Queue 
         " name=" + queueName + 
         ", fullname=" + getQueuePath());
 
-    this.applications = new TreeSet<Application>(applicationComparator);
+    this.applications = new TreeSet<CSApp>(applicationComparator);
   }
 
   private synchronized void setupQueueConfigs(
@@ -256,11 +256,6 @@ public class LeafQueue implements Queue 
   }
 
   @Override
-  public synchronized List<Application> getApplications() {
-    return new ArrayList<Application>(applications);
-  }
-
-  @Override
   public List<Queue> getChildQueues() {
     return null;
   }
@@ -292,19 +287,9 @@ public class LeafQueue implements Queue 
   }
 
   @Override
-  public synchronized QueueInfo getQueueInfo(boolean includeApplications, 
+  public synchronized QueueInfo getQueueInfo(
       boolean includeChildQueues, boolean recursive) {
     queueInfo.setCurrentCapacity(usedCapacity);
-
-    if (includeApplications) {
-      queueInfo.setApplications( 
-          new ArrayList<org.apache.hadoop.yarn.api.records.ApplicationReport>(
-              applicationInfos.values()));
-    } else {
-      queueInfo.setApplications(
-          new ArrayList<org.apache.hadoop.yarn.api.records.ApplicationReport>());
-    }
-
     return queueInfo;
   }
 
@@ -377,9 +362,8 @@ public class LeafQueue implements Queue 
   }
 
   @Override
-  public void submitApplication(Application application, String userName,
-      String queue, Priority priority) 
-  throws AccessControlException {
+  public void submitApplication(CSApp application, String userName,
+      String queue)  throws AccessControlException {
     // Careful! Locking order is important!
 
     // Check queue ACLs
@@ -430,7 +414,7 @@ public class LeafQueue implements Queue 
 
     // Inform the parent queue
     try {
-      parent.submitApplication(application, userName, queue, priority);
+      parent.submitApplication(application, userName, queue);
     } catch (AccessControlException ace) {
       LOG.info("Failed to submit application to parent-queue: " + 
           parent.getQueuePath(), ace);
@@ -439,12 +423,10 @@ public class LeafQueue implements Queue 
     }
   }
 
-  private synchronized void addApplication(Application application, User user) {
+  private synchronized void addApplication(CSApp application, User user) {
     // Accept 
     user.submitApplication();
     applications.add(application);
-    applicationInfos.put(application.getApplicationId(), 
-        application.getApplicationInfo());
 
     LOG.info("Application added -" +
         " appId: " + application.getApplicationId() +
@@ -454,8 +436,7 @@ public class LeafQueue implements Queue 
   }
 
   @Override
-  public void finishApplication(Application application, String queue) 
-  throws AccessControlException {
+  public void finishApplication(CSApp application, String queue) {
     // Careful! Locking order is important!
     synchronized (this) {
       removeApplication(application, getUser(application.getUser()));
@@ -465,7 +446,7 @@ public class LeafQueue implements Queue 
     parent.finishApplication(application, queue);
   }
 
-  public synchronized void removeApplication(Application application, User user) {
+  public synchronized void removeApplication(CSApp application, User user) {
     applications.remove(application);
 
     user.finishApplication();
@@ -473,8 +454,6 @@ public class LeafQueue implements Queue 
       users.remove(application.getUser());
     }
 
-    applicationInfos.remove(application.getApplicationId());
-
     LOG.info("Application removed -" +
         " appId: " + application.getApplicationId() + 
         " user: " + application.getUser() + 
@@ -485,27 +464,21 @@ public class LeafQueue implements Queue 
 
   @Override
   public synchronized Resource 
-  assignContainers(Resource clusterResource, NodeInfo node) {
+  assignContainers(Resource clusterResource, CSNode node) {
 
     LOG.info("DEBUG --- assignContainers:" +
         " node=" + node.getNodeAddress() + 
         " #applications=" + applications.size());
     
     // Check for reserved resources
-    Application reservedApplication = node.getReservedApplication();
+    CSApp reservedApplication = node.getReservedApplication();
     if (reservedApplication != null) {
       return assignReservedContainers(reservedApplication, node, 
           clusterResource);
     }
 
     // Try to assign containers to applications in fifo order
-    for (Application application : applications) {
-
-      if (!application.isSchedulable()) {
-        LOG.info("Application " + application.getApplicationId() + 
-            " not schedulable. State = " + application.getState());
-        continue;
-      }
+    for (CSApp application : applications) {
       
       LOG.info("DEBUG --- pre-assignContainers for application "
           + application.getApplicationId());
@@ -525,7 +498,7 @@ public class LeafQueue implements Queue 
 
           // Are we going over limits by allocating to this application?
           ResourceRequest required = 
-            application.getResourceRequest(priority, NodeManager.ANY);
+            application.getResourceRequest(priority, RMNode.ANY);
           
 
           // Maximum Capacity of the queue
@@ -552,7 +525,7 @@ public class LeafQueue implements Queue 
           // Did we schedule or reserve a container?
           if (Resources.greaterThan(assigned, Resources.none())) {
             Resource assignedResource = 
-              application.getResourceRequest(priority, NodeManager.ANY).getCapability();
+              application.getResourceRequest(priority, RMNode.ANY).getCapability();
 
             // Book-keeping
             allocateResource(clusterResource, 
@@ -579,8 +552,8 @@ public class LeafQueue implements Queue 
 
   }
 
-  private synchronized Resource assignReservedContainers(Application application, 
-      NodeInfo node, Resource clusterResource) {
+  private synchronized Resource assignReservedContainers(CSApp application, 
+      CSNode node, Resource clusterResource) {
     synchronized (application) {
       for (Priority priority : application.getPriorities()) {
 
@@ -589,7 +562,7 @@ public class LeafQueue implements Queue 
           
           // Do we really need this reservation still?
           ResourceRequest offSwitchRequest = 
-            application.getResourceRequest(priority, NodeManager.ANY);
+            application.getResourceRequest(priority, RMNode.ANY);
           if (offSwitchRequest.getNumContainers() == 0) {
             // Release
             unreserve(application, priority, node);
@@ -626,12 +599,12 @@ public class LeafQueue implements Queue 
     return true;
   }
 
-  private void setUserResourceLimit(Application application, Resource resourceLimit) {
+  private void setUserResourceLimit(CSApp application, Resource resourceLimit) {
     application.setAvailableResourceLimit(resourceLimit);
     metrics.setAvailableResourcesToUser(application.getUser(), resourceLimit);
   }
   
-  private Resource computeUserLimit(Application application, 
+  private Resource computeUserLimit(CSApp application, 
       Resource clusterResource, Resource required) {
     // What is our current capacity? 
     // * It is equal to the max(required, queue-capacity) if
@@ -715,17 +688,17 @@ public class LeafQueue implements Queue 
     return (a + (b - 1)) / b;
   }
 
-  boolean needContainers(Application application, Priority priority) {
+  boolean needContainers(CSApp application, Priority priority) {
     ResourceRequest offSwitchRequest = 
-      application.getResourceRequest(priority, NodeManager.ANY);
+      application.getResourceRequest(priority, RMNode.ANY);
 
     int requiredContainers = offSwitchRequest.getNumContainers();
     int reservedContainers = application.getReservedContainers(priority);
     return ((requiredContainers - reservedContainers) > 0);
   }
 
-  Resource assignContainersOnNode(Resource clusterResource, NodeInfo node, 
-      Application application, Priority priority, boolean reserved) {
+  Resource assignContainersOnNode(Resource clusterResource, CSNode node, 
+      CSApp application, Priority priority, boolean reserved) {
 
     Resource assigned = Resources.none();
 
@@ -746,10 +719,10 @@ public class LeafQueue implements Queue 
         priority, reserved);
   }
 
-  Resource assignNodeLocalContainers(Resource clusterResource, NodeInfo node, 
-      Application application, Priority priority) {
-    ResourceRequest request = 
-      application.getResourceRequest(priority, node.getNodeAddress());
+  Resource assignNodeLocalContainers(Resource clusterResource, CSNode node, 
+      CSApp application, Priority priority) {
+    ResourceRequest request = application.getResourceRequest(priority, node
+        .getNodeAddress());
     if (request != null) {
       if (canAssign(application, priority, node, NodeType.DATA_LOCAL, false)) {
         return assignContainer(clusterResource, node, application, priority, request, 
@@ -760,8 +733,8 @@ public class LeafQueue implements Queue 
     return Resources.none();
   }
 
-  Resource assignRackLocalContainers(Resource clusterResource, NodeInfo node, 
-      Application application, Priority priority) {
+  Resource assignRackLocalContainers(Resource clusterResource, CSNode node, 
+      CSApp application, Priority priority) {
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getRackName());
     if (request != null) {
@@ -773,10 +746,10 @@ public class LeafQueue implements Queue 
     return Resources.none();
   }
 
-  Resource assignOffSwitchContainers(Resource clusterResource, NodeInfo node, 
-      Application application, Priority priority, boolean reserved) {
+  Resource assignOffSwitchContainers(Resource clusterResource, CSNode node, 
+      CSApp application, Priority priority, boolean reserved) {
     ResourceRequest request = 
-      application.getResourceRequest(priority, NodeManager.ANY);
+      application.getResourceRequest(priority, RMNode.ANY);
     if (request != null) {
       if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reserved)) {
         return assignContainer(clusterResource, node, application, priority, request, 
@@ -787,11 +760,11 @@ public class LeafQueue implements Queue 
     return Resources.none();
   }
 
-  boolean canAssign(Application application, Priority priority, 
-      NodeInfo node, NodeType type, boolean reserved) {
+  boolean canAssign(CSApp application, Priority priority, 
+      CSNode node, NodeType type, boolean reserved) {
 
     ResourceRequest offSwitchRequest = 
-      application.getResourceRequest(priority, NodeManager.ANY);
+      application.getResourceRequest(priority, RMNode.ANY);
 
     if (offSwitchRequest.getNumContainers() == 0) {
       return false;
@@ -857,8 +830,8 @@ public class LeafQueue implements Queue 
 
     return false;
   }
-  private Resource assignContainer(Resource clusterResource, NodeInfo node, 
-      Application application, 
+  private Resource assignContainer(Resource clusterResource, CSNode node, 
+      CSApp application, 
       Priority priority, ResourceRequest request, NodeType type) {
     LOG.info("DEBUG --- assignContainers:" +
         " node=" + node.getNodeAddress() + 
@@ -883,10 +856,11 @@ public class LeafQueue implements Queue 
       List<Container> containers =
         new ArrayList<Container>();
       Container container =
-          BuilderUtils.newContainer(this.recordFactory,
-              application.getApplicationId(),
-              application.getNewContainerId(), node.getNodeAddress(),
-              node.getHttpAddress(), capability);
+         BuilderUtils.newContainer(this.recordFactory,
+                    application.getApplicationId(),
+                    application.getNewContainerId(),
+                    node.getNodeID(), node.getNodeAddress(),
+                    node.getHttpAddress(), capability);
       
       // If security is enabled, send the container-tokens too.
       if (UserGroupInformation.isSecurityEnabled()) {
@@ -941,24 +915,42 @@ public class LeafQueue implements Queue 
     return Resources.none();
   }
 
-  private void allocate(Application application, NodeType type, 
+  private void allocate(CSApp application, NodeType type, 
       Priority priority, ResourceRequest request, 
-      NodeInfo node, List<Container> containers) {
+      CSNode node, List<Container> containers) {
     // Allocate container to the application
     application.allocate(type, node, priority, request, containers);
 
+    for (Container container : containers) {
+      // Create the container and 'start' it.
+      ContainerId containerId = container.getId();
+      RMContext rmContext = this.scheduler.getRMContext();
+      EventHandler eventHandler = rmContext.getDispatcher().getEventHandler();
+      RMContainer rmContainer = new RMContainerImpl(containerId, application
+          .getApplicationAttemptId(), node.getNodeID(), container,
+          eventHandler, rmContext.getContainerAllocationExpirer());
+      if (rmContext.getRMContainers().putIfAbsent(containerId, rmContainer) != null) {
+        LOG.error("Duplicate container addition! ContainerID :  "
+            + containerId);
+      } else {
+        eventHandler.handle(new RMContainerEvent(containerId,
+            RMContainerEventType.START));
+      }
+    }
+
     // Inform the NodeManager about the allocation
-    node.allocateContainer(application.getApplicationId(), containers);
+    node.allocateContainer(application.getApplicationId(),
+        containers);
   }
 
-  private void reserve(Application application, Priority priority, 
-      NodeInfo node, Resource resource) {
+  private void reserve(CSApp application, Priority priority, 
+      CSNode node, Resource resource) {
     application.reserveResource(node, priority, resource);
     node.reserveResource(application, priority, resource);
   }
 
-  private void unreserve(Application application, Priority priority, 
-      NodeInfo node) {
+  private void unreserve(CSApp application, Priority priority, 
+      CSNode node) {
     // Done with the reservation?
     if (application.isReserved(node, priority)) {
       application.unreserveResource(node, priority);
@@ -969,7 +961,7 @@ public class LeafQueue implements Queue 
 
   @Override
   public void completedContainer(Resource clusterResource, 
-      Container container, Resource containerResource, Application application) {
+      Container container, Resource containerResource, CSApp application) {
     if (application != null) {
       // Careful! Locking order is important!
       synchronized (this) {
@@ -1066,7 +1058,7 @@ public class LeafQueue implements Queue 
 
   @Override
   public void recoverContainer(Resource clusterResource,
-      Application application, Container container) {
+      CSApp application, Container container) {
     // Careful! Locking order is important! 
     synchronized (this) {
       allocateResource(clusterResource, application.getUser(), container.getResource());

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Wed Aug  3 11:31:34 2011
@@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
@@ -47,8 +46,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 
 @Private
@@ -86,8 +83,6 @@ public class ParentQueue implements Queu
   private final QueueMetrics metrics;
 
   private QueueInfo queueInfo; 
-  private Map<ApplicationId, org.apache.hadoop.yarn.api.records.ApplicationReport> 
-  applicationInfos;
 
   private Map<QueueACL, AccessControlList> acls = 
     new HashMap<QueueACL, AccessControlList>();
@@ -144,11 +139,6 @@ public class ParentQueue implements Queu
     this.queueComparator = comparator;
     this.childQueues = new TreeSet<Queue>(comparator);
 
-    this.applicationInfos = 
-      new HashMap<ApplicationId, 
-      org.apache.hadoop.yarn.api.records.ApplicationReport>();
-
-
     LOG.info("Initialized parent-queue " + queueName + 
         " name=" + queueName + 
         ", fullname=" + getQueuePath()); 
@@ -258,11 +248,6 @@ public class ParentQueue implements Queu
   }
 
   @Override
-  public List<Application> getApplications() {
-    return null;
-  }
-
-  @Override
   public synchronized List<Queue> getChildQueues() {
     return new ArrayList<Queue>(childQueues);
   }
@@ -286,25 +271,16 @@ public class ParentQueue implements Queu
   }
 
   @Override
-  public synchronized QueueInfo getQueueInfo(boolean includeApplications, 
+  public synchronized QueueInfo getQueueInfo( 
       boolean includeChildQueues, boolean recursive) {
     queueInfo.setCurrentCapacity(usedCapacity);
 
-    if (includeApplications) {
-      queueInfo.setApplications( 
-        new ArrayList<org.apache.hadoop.yarn.api.records.ApplicationReport>(
-            applicationInfos.values()));
-    } else {
-      queueInfo.setApplications(
-          new ArrayList<org.apache.hadoop.yarn.api.records.ApplicationReport>());
-    }
-
     List<QueueInfo> childQueuesInfo = new ArrayList<QueueInfo>();
     if (includeChildQueues) {
       for (Queue child : childQueues) {
         // Get queue information recursively?
         childQueuesInfo.add(
-            child.getQueueInfo(includeApplications, recursive, recursive));
+            child.getQueueInfo(recursive, recursive));
       }
     }
     queueInfo.setChildQueues(childQueuesInfo);
@@ -420,9 +396,8 @@ public class ParentQueue implements Queu
   }
 
   @Override
-  public void submitApplication(Application application, String user,
-      String queue, Priority priority) 
-  throws AccessControlException {
+  public void submitApplication(CSApp application, String user,
+      String queue) throws AccessControlException {
     
     synchronized (this) {
       // Sanity check
@@ -443,7 +418,7 @@ public class ParentQueue implements Queu
     // Inform the parent queue
     if (parent != null) {
       try {
-        parent.submitApplication(application, user, queue, priority);
+        parent.submitApplication(application, user, queue);
       } catch (AccessControlException ace) {
         LOG.info("Failed to submit application to parent-queue: " + 
             parent.getQueuePath(), ace);
@@ -453,14 +428,11 @@ public class ParentQueue implements Queu
     }
   }
 
-  private synchronized void addApplication(Application application, 
+  private synchronized void addApplication(CSApp application, 
       String user) {
   
     ++numApplications;
 
-    applicationInfos.put(application.getApplicationId(), 
-        application.getApplicationInfo());
-
     LOG.info("Application added -" +
         " appId: " + application.getApplicationId() + 
         " user: " + user + 
@@ -469,16 +441,9 @@ public class ParentQueue implements Queu
   }
   
   @Override
-  public void finishApplication(Application application, String queue) 
-  throws AccessControlException {
+  public void finishApplication(CSApp application, String queue) {
     
     synchronized (this) {
-      // Sanity check
-      if (queue.equals(queueName)) {
-        throw new AccessControlException("Cannot finish application " +
-            "from non-leaf queue: " + queueName);
-      }
-
       removeApplication(application, application.getUser());
     }
     
@@ -488,11 +453,10 @@ public class ParentQueue implements Queu
     }
   }
 
-  public synchronized void removeApplication(Application application, 
+  public synchronized void removeApplication(CSApp application, 
       String user) {
     
     --numApplications;
-    applicationInfos.remove(application.getApplicationId());
 
     LOG.info("Application removed -" +
         " appId: " + application.getApplicationId() + 
@@ -511,7 +475,7 @@ public class ParentQueue implements Queu
 
   @Override
   public synchronized Resource assignContainers(
-      Resource clusterResource, NodeInfo node) {
+      Resource clusterResource, CSNode node) {
     Resource assigned = Resources.createResource(0);
 
     while (canAssign(node)) {
@@ -575,14 +539,14 @@ public class ParentQueue implements Queu
 
   }
   
-  private boolean canAssign(NodeInfo node) {
+  private boolean canAssign(CSNode node) {
     return (node.getReservedApplication() == null) && 
         Resources.greaterThanOrEqual(node.getAvailableResource(), 
                                      minimumAllocation);
   }
   
   synchronized Resource assignContainersToChildQueues(Resource cluster, 
-      NodeInfo node) {
+      CSNode node) {
     Resource assigned = Resources.createResource(0);
     
     printChildQueues();
@@ -625,7 +589,7 @@ public class ParentQueue implements Queu
   @Override
   public void completedContainer(Resource clusterResource,
       Container container, Resource containerResource, 
-      Application application) {
+      CSApp application) {
     if (application != null) {
       // Careful! Locking order is important!
       // Book keeping
@@ -682,7 +646,7 @@ public class ParentQueue implements Queu
   
   @Override
   public void recoverContainer(Resource clusterResource,
-      Application application, Container container) {
+      CSApp application, Container container) {
     // Careful! Locking order is important! 
     synchronized (this) {
       allocateResource(clusterResource, container.getResource());

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Wed Aug  3 11:31:34 2011
@@ -30,8 +30,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 
 /**
  * Queue represents a node in the tree of 
@@ -124,12 +124,6 @@ extends org.apache.hadoop.yarn.server.re
   public List<Queue> getChildQueues();
   
   /**
-   * Get applications in this queue
-   * @return applications in the queue
-   */
-  public List<Application> getApplications();
-  
-  /**
    * Check if the <code>user</code> has permission to perform the operation
    * @param acl ACL
    * @param user user
@@ -143,10 +137,9 @@ extends org.apache.hadoop.yarn.server.re
    * @param application application being submitted
    * @param user user who submitted the application
    * @param queue queue to which the application is submitted
-   * @param priority application priority
    */
-  public void submitApplication(Application application, String user, 
-      String queue, Priority priority) 
+  public void submitApplication(CSApp application, String user, 
+      String queue) 
   throws AccessControlException;
   
   /**
@@ -154,8 +147,7 @@ extends org.apache.hadoop.yarn.server.re
    * @param application
    * @param queue application queue 
    */
-  public void finishApplication(Application application, String queue)
-  throws AccessControlException;
+  public void finishApplication(CSApp application, String queue);
   
   /**
    * Assign containers to applications in the queue or it's children (if any).
@@ -163,7 +155,7 @@ extends org.apache.hadoop.yarn.server.re
    * @param node node on which resources are available
    * @return
    */
-  public Resource assignContainers(Resource clusterResource, NodeInfo node);
+  public Resource assignContainers(Resource clusterResource, CSNode node);
   
   /**
    * A container assigned to the queue has completed.
@@ -175,7 +167,7 @@ extends org.apache.hadoop.yarn.server.re
    */
   public void completedContainer(Resource clusterResource,
       Container container, Resource containerResource, 
-      Application application);
+      CSApp application);
 
   /**
    * Get the number of applications in the queue.
@@ -204,6 +196,6 @@ extends org.apache.hadoop.yarn.server.re
    * @param application the application for which the container was allocated
    * @param container the container that was recovered.
    */
-  public void recoverContainer(Resource clusterResource, Application application, 
+  public void recoverContainer(Resource clusterResource, CSApp application, 
       Container container);
 }

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,31 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+
+public class AppAddedSchedulerEvent extends SchedulerEvent {
+  // Scheduler event of type SchedulerEventType.APP_ADDED; all fields are final.
+  private final ApplicationAttemptId applicationAttemptId; // attempt id supplied at construction
+  private final String queue; // presumably the target queue name -- confirm with the creator
+  private final String user; // presumably the submitting user -- confirm with the creator
+  /** Builds an APP_ADDED event holding the given attempt id, queue and user. */
+  public AppAddedSchedulerEvent(ApplicationAttemptId applicationAttemptId,
+      String queue, String user) {
+    super(SchedulerEventType.APP_ADDED);
+    this.applicationAttemptId = applicationAttemptId;
+    this.queue = queue;
+    this.user = user;
+  }
+  /** @return the attempt id given to the constructor */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+  /** @return the queue string given to the constructor */
+  public String getQueue() {
+    return queue;
+  }
+  /** @return the user string given to the constructor */
+  public String getUser() {
+    return user;
+  }
+
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,19 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class AppRemovedSchedulerEvent extends SchedulerEvent {
+  // Scheduler event of type SchedulerEventType.APP_REMOVED. NOTE(review): the ApplicationId import is unused here.
+  private final ApplicationAttemptId applicationAttemptId; // attempt id supplied at construction
+  /** Builds an APP_REMOVED event holding the given attempt id. */
+  public AppRemovedSchedulerEvent(ApplicationAttemptId applicationAttemptId) {
+    super(SchedulerEventType.APP_REMOVED);
+    this.applicationAttemptId = applicationAttemptId;
+  }
+  /** @return the attempt id given to the constructor. NOTE(review): spelled ...ID here, ...Id in AppAddedSchedulerEvent. */
+  public ApplicationAttemptId getApplicationAttemptID() {
+    return this.applicationAttemptId;
+  }
+
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerFinishedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerFinishedSchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerFinishedSchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerFinishedSchedulerEvent.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,18 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+public class ContainerFinishedSchedulerEvent extends SchedulerEvent {
+  // Scheduler event of type SchedulerEventType.CONTAINER_FINISHED.
+  private final Container container; // container supplied at construction
+  /** Builds a CONTAINER_FINISHED event wrapping the given container. */
+  public ContainerFinishedSchedulerEvent(Container container) {
+    super(SchedulerEventType.CONTAINER_FINISHED);
+    this.container = container;
+  }
+  /** @return the container given to the constructor (same reference, not a copy) */
+  public Container getContainer() {
+    return container;
+  }
+
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,18 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+public class NodeAddedSchedulerEvent extends SchedulerEvent {
+  // Scheduler event of type SchedulerEventType.NODE_ADDED.
+  private final RMNode rmNode; // node supplied at construction
+  /** Builds a NODE_ADDED event wrapping the given node. */
+  public NodeAddedSchedulerEvent(RMNode rmNode) {
+    super(SchedulerEventType.NODE_ADDED);
+    this.rmNode = rmNode;
+  }
+  /** @return the node given to the constructor */
+  public RMNode getAddedRMNode() {
+    return rmNode;
+  }
+
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,18 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+public class NodeRemovedSchedulerEvent extends SchedulerEvent {
+  // Scheduler event of type SchedulerEventType.NODE_REMOVED.
+  private final RMNode rmNode; // node supplied at construction
+  /** Builds a NODE_REMOVED event wrapping the given node. */
+  public NodeRemovedSchedulerEvent(RMNode rmNode) {
+    super(SchedulerEventType.NODE_REMOVED);
+    this.rmNode = rmNode;
+  }
+  /** @return the node given to the constructor */
+  public RMNode getRemovedRMNode() {
+    return rmNode;
+  }
+
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,30 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+public class NodeUpdateSchedulerEvent extends SchedulerEvent {
+  // Scheduler event of type SchedulerEventType.NODE_UPDATE: a node plus per-application container lists.
+  private final RMNode rmNode; // node supplied at construction
+  private final Map<ApplicationId, List<Container>> containers; // stored by reference, not copied
+  /** Builds a NODE_UPDATE event holding the given node and container map. */
+  public NodeUpdateSchedulerEvent(RMNode rmNode,
+      Map<ApplicationId, List<Container>> containers) {
+    super(SchedulerEventType.NODE_UPDATE);
+    this.rmNode = rmNode;
+    this.containers = containers;
+  }
+  /** @return the node given to the constructor */
+  public RMNode getRMNode() {
+    return rmNode;
+  }
+  /** @return the same map instance given to the constructor (no defensive copy is made) */
+  public Map<ApplicationId, List<Container>> getContainers() {
+    return containers;
+  }
+
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEvent.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,9 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class SchedulerEvent extends AbstractEvent<SchedulerEventType> { // base of the *SchedulerEvent classes
+  public SchedulerEvent(SchedulerEventType type) { // type: the SchedulerEventType tag for this event
+    super(type); // hands the type to AbstractEvent; nothing else is stored here
+  }
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,17 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+public enum SchedulerEventType {
+  // Type tags passed to the SchedulerEvent constructor, grouped below by event source.
+  // Source: Node
+  NODE_ADDED, // set by NodeAddedSchedulerEvent
+  NODE_REMOVED, // set by NodeRemovedSchedulerEvent
+  NODE_UPDATE, // set by NodeUpdateSchedulerEvent
+
+  // Source: Container
+  CONTAINER_FINISHED, // set by ContainerFinishedSchedulerEvent
+  
+  // Source: App
+  APP_ADDED, // set by AppAddedSchedulerEvent
+  APP_REMOVED, // set by AppRemovedSchedulerEvent
+
+}

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Aug  3 11:31:34 2011
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,11 +37,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -51,22 +54,31 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationTrackerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerFinishedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
@@ -84,7 +96,9 @@ public class FifoScheduler implements Re
 
   private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
   private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
-  private ClusterTracker clusterTracker;
+  private RMContext rmContext;
+
+  private Map<NodeId, SchedulerNode> nodes = new ConcurrentHashMap<NodeId, SchedulerNode>();
 
   private static final int MINIMUM_MEMORY = 1024;
 
@@ -104,9 +118,8 @@ public class FifoScheduler implements Re
   private Resource minimumAllocation;
   private Resource maximumAllocation;
 
-  Map<ApplicationId, Application> applications =
-      new TreeMap<ApplicationId, Application>(
-          new BuilderUtils.ApplicationIdComparator());
+  private Map<ApplicationAttemptId, SchedulerApp> applications
+      = new HashMap<ApplicationAttemptId, SchedulerApp>();
 
   private static final String DEFAULT_QUEUE_NAME = "default";
   private final QueueMetrics metrics =
@@ -124,20 +137,13 @@ public class FifoScheduler implements Re
     }
 
     @Override
-    public QueueInfo getQueueInfo(boolean includeApplications, 
+    public QueueInfo getQueueInfo( 
         boolean includeChildQueues, boolean recursive) {
       QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
       queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName());
       queueInfo.setCapacity(100.0f);
       queueInfo.setMaximumCapacity(100.0f);
       queueInfo.setChildQueues(new ArrayList<QueueInfo>());
-
-      if (includeApplications) {
-        queueInfo.setApplications(getApplications());
-      } else {
-        queueInfo.setApplications(
-            new ArrayList<org.apache.hadoop.yarn.api.records.ApplicationReport>());
-      }
       return queueInfo;
     }
 
@@ -161,16 +167,6 @@ public class FifoScheduler implements Re
       return Collections.singletonList(queueUserAclInfo);
     }
   };
-
-  public FifoScheduler() {
-  }
-
-  public FifoScheduler(ClusterTracker clusterTracker) {
-    this.clusterTracker = clusterTracker;
-    if (clusterTracker != null) {
-      this.clusterTracker.addListener(this);
-    }
-  }
   
   @Override
   public Resource getMinimumResourceCapability() {
@@ -185,14 +181,13 @@ public class FifoScheduler implements Re
   @Override
   public synchronized void reinitialize(Configuration conf,
       ContainerTokenSecretManager containerTokenSecretManager, 
-      ClusterTracker clusterTracker) 
+      RMContext rmContext) 
   throws IOException 
   {
     if (!this.initialized) {
       this.conf = conf;
       this.containerTokenSecretManager = containerTokenSecretManager;
-      this.clusterTracker = clusterTracker;
-      if (clusterTracker != null) this.clusterTracker.addListener(this);
+      this.rmContext = rmContext;
       this.minimumAllocation = 
         Resources.createResource(conf.getInt(MINIMUM_ALLOCATION, MINIMUM_MEMORY));
       this.maximumAllocation = 
@@ -204,59 +199,47 @@ public class FifoScheduler implements Re
   }
 
   @Override
-  public synchronized Allocation allocate(ApplicationId applicationId,
-      List<ResourceRequest> ask, List<Container> release) 
-      throws IOException {
-    Application application = getApplication(applicationId);
+  public synchronized void allocate(ApplicationAttemptId applicationAttemptId,
+      List<ResourceRequest> ask) {
+    SchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.error("Calling allocate on removed " +
-          "or non existant application " + applicationId);
-      return new Allocation(EMPTY_CONTAINER_LIST, Resources.none()); 
+          "or non existant application " + applicationAttemptId);
+      return;
     }
 
     // Sanity check
     normalizeRequests(ask);
     
-    List<Container> allocatedContainers = null;
     Resource limit = null;
     synchronized (application) {
 
       LOG.debug("allocate: pre-update" +
-          " applicationId=" + applicationId + 
+          " applicationId=" + applicationAttemptId + 
           " application=" + application);
       application.showRequests();
 
       // Update application requests
       application.updateResourceRequests(ask);
 
-      // Release containers
-      releaseContainers(application, release);
-
       LOG.debug("allocate: post-update" +
-          " applicationId=" + applicationId + 
+          " applicationId=" + applicationAttemptId + 
           " application=" + application);
       application.showRequests();
 
-      allocatedContainers = application.acquire();
-      limit = application.getHeadroom();
       LOG.debug("allocate:" +
-          " applicationId=" + applicationId + 
-          " #ask=" + ask.size() + 
-          " #release=" + release.size() +
-          " #allocatedContainers=" + allocatedContainers.size() + 
-          " limit=" + limit);
+          " applicationId=" + applicationAttemptId + 
+          " #ask=" + ask.size());
     }
-    
-    return new Allocation(allocatedContainers, limit);
   }
 
-  private void releaseContainers(Application application, List<Container> release) {
-    application.releaseContainers(release);
-    for (Container container : release) {
-      releaseContainer(application.getApplicationId(), container);
-    }
+  @Override
+  public Resource getResourceLimit(ApplicationAttemptId applicationAttemptId) {
+    SchedulerApp application = getApplication(applicationAttemptId);
+    // Returns the attempt's headroom. TODO: What if null? An unknown attempt yields null here -> NPE below.
+    return application.getHeadroom();
   }
-
+  
   private void normalizeRequests(List<ResourceRequest> asks) {
     for (ResourceRequest ask : asks) {
       normalizeRequest(ask);
@@ -271,42 +254,41 @@ public class FifoScheduler implements Re
     ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
   }
 
-  private synchronized Application getApplication(ApplicationId applicationId) {
-    return applications.get(applicationId);
+  private synchronized SchedulerApp getApplication(
+      ApplicationAttemptId applicationAttemptId) {
+    return applications.get(applicationAttemptId);
   }
 
-  @Override
-  public synchronized void addApplication(ApplicationId applicationId, ApplicationMaster master,
-      String user, String unusedQueue, Priority unusedPriority, ApplicationStore appStore) 
-  throws IOException {
-    applications.put(applicationId, 
-        new Application(applicationId, master, DEFAULT_QUEUE, user, appStore));
+  private synchronized void addApplication(ApplicationAttemptId appAttemptId,
+      String queueName, String user) {
+    AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
+        appAttemptId, null, queueName, user, null);
+    SchedulerApp schedulerApp = new SchedulerApp(appSchedulingInfo,
+        DEFAULT_QUEUE);
+    applications.put(appAttemptId, schedulerApp);
     metrics.submitApp(user);
-    LOG.info("Application Submission: " + applicationId.getId() + " from " + user + 
+    LOG.info("Application Submission: " + appAttemptId.getApplicationId() + " from " + user + 
         ", currently active: " + applications.size());
+    rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(appAttemptId,
+            RMAppAttemptEventType.APP_ACCEPTED));
   }
 
-  @Override
-  public synchronized void doneApplication(ApplicationId applicationId, boolean finishApplication)
-  throws IOException {
-    Application application = getApplication(applicationId);
+  private synchronized void doneApplication(
+      ApplicationAttemptId applicationAttemptId, boolean finishApplication)
+      throws IOException {
+    SchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
-      throw new IOException("Unknown application " + applicationId + 
+      throw new IOException("Unknown application " + applicationAttemptId + 
       " has completed!");
     }
 
-    // Release current containers
-    releaseContainers(application, application.getCurrentContainers());
-    
     // Clean up pending requests, metrics etc.
     application.stop();
     
     if (finishApplication) {
-      // Let the cluster know that the applications are done
-      finishedApplication(applicationId, 
-          application.getAllNodesForApplication());
       // Remove the application
-      applications.remove(applicationId);
+      applications.remove(applicationAttemptId);
     }
   }
 
@@ -315,14 +297,15 @@ public class FifoScheduler implements Re
    * 
    * @param node node on which resources are available to be allocated
    */
-  private synchronized void assignContainers(NodeInfo node) {
+  private synchronized void assignContainers(SchedulerNode node) {
     LOG.debug("assignContainers:" +
-        " node=" + node.getNodeAddress() + 
+        " node=" + node.getRMNode().getNodeAddress() + 
         " #applications=" + applications.size());
 
     // Try to assign containers to applications in fifo order
-    for (Map.Entry<ApplicationId, Application> e : applications.entrySet()) {
-      Application application = e.getValue();
+    for (Map.Entry<ApplicationAttemptId, SchedulerApp> e : applications
+        .entrySet()) {
+      SchedulerApp application = e.getValue();
       LOG.debug("pre-assignContainers");
       application.showRequests();
       synchronized (application) {
@@ -354,10 +337,10 @@ public class FifoScheduler implements Re
     }
   }
 
-  private int getMaxAllocatableContainers(Application application,
-      Priority priority, NodeInfo node, NodeType type) {
+  private int getMaxAllocatableContainers(SchedulerApp application,
+      Priority priority, SchedulerNode node, NodeType type) {
     ResourceRequest offSwitchRequest = 
-      application.getResourceRequest(priority, NodeManager.ANY);
+      application.getResourceRequest(priority, SchedulerNode.ANY);
     int maxContainers = offSwitchRequest.getNumContainers();
 
     if (type == NodeType.OFF_SWITCH) {
@@ -366,7 +349,7 @@ public class FifoScheduler implements Re
 
     if (type == NodeType.RACK_LOCAL) {
       ResourceRequest rackLocalRequest = 
-        application.getResourceRequest(priority, node.getRackName());
+        application.getResourceRequest(priority, node.getRMNode().getRackName());
       if (rackLocalRequest == null) {
         return maxContainers;
       }
@@ -376,7 +359,7 @@ public class FifoScheduler implements Re
 
     if (type == NodeType.DATA_LOCAL) {
       ResourceRequest nodeLocalRequest = 
-        application.getResourceRequest(priority, node.getNodeAddress());
+        application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
       if (nodeLocalRequest != null) {
         maxContainers = Math.min(maxContainers, nodeLocalRequest.getNumContainers());
       }
@@ -386,8 +369,8 @@ public class FifoScheduler implements Re
   }
 
 
-  private int assignContainersOnNode(NodeInfo node, 
-      Application application, Priority priority 
+  private int assignContainersOnNode(SchedulerNode node, 
+      SchedulerApp application, Priority priority 
   ) {
     // Data-local
     int nodeLocalContainers = 
@@ -403,7 +386,7 @@ public class FifoScheduler implements Re
 
 
     LOG.debug("assignContainersOnNode:" +
-        " node=" + node.getNodeAddress() + 
+        " node=" + node.getRMNode().getNodeAddress() + 
         " application=" + application.getApplicationId().getId() +
         " priority=" + priority.getPriority() + 
         " #assigned=" + 
@@ -413,11 +396,11 @@ public class FifoScheduler implements Re
     return (nodeLocalContainers + rackLocalContainers + offSwitchContainers);
   }
 
-  private int assignNodeLocalContainers(NodeInfo node, 
-      Application application, Priority priority) {
+  private int assignNodeLocalContainers(SchedulerNode node, 
+      SchedulerApp application, Priority priority) {
     int assignedContainers = 0;
     ResourceRequest request = 
-      application.getResourceRequest(priority, node.getNodeAddress());
+      application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
     if (request != null) {
       int assignableContainers = 
         Math.min(
@@ -431,11 +414,11 @@ public class FifoScheduler implements Re
     return assignedContainers;
   }
 
-  private int assignRackLocalContainers(NodeInfo node, 
-      Application application, Priority priority) {
+  private int assignRackLocalContainers(SchedulerNode node, 
+      SchedulerApp application, Priority priority) {
     int assignedContainers = 0;
     ResourceRequest request = 
-      application.getResourceRequest(priority, node.getRackName());
+      application.getResourceRequest(priority, node.getRMNode().getRackName());
     if (request != null) {
       int assignableContainers = 
         Math.min(
@@ -449,11 +432,11 @@ public class FifoScheduler implements Re
     return assignedContainers;
   }
 
-  private int assignOffSwitchContainers(NodeInfo node, 
-      Application application, Priority priority) {
+  private int assignOffSwitchContainers(SchedulerNode node, 
+      SchedulerApp application, Priority priority) {
     int assignedContainers = 0;
     ResourceRequest request = 
-      application.getResourceRequest(priority, NodeManager.ANY);
+      application.getResourceRequest(priority, SchedulerNode.ANY);
     if (request != null) {
       assignedContainers = 
         assignContainers(node, application, priority, 
@@ -462,11 +445,11 @@ public class FifoScheduler implements Re
     return assignedContainers;
   }
 
-  private int assignContainers(NodeInfo node, Application application, 
+  private int assignContainers(SchedulerNode node, SchedulerApp application, 
       Priority priority, int assignableContainers, 
       ResourceRequest request, NodeType type) {
     LOG.debug("assignContainers:" +
-        " node=" + node.getNodeAddress() + 
+        " node=" + node.getRMNode().getNodeAddress() + 
         " application=" + application.getApplicationId().getId() + 
         " priority=" + priority.getPriority() + 
         " assignableContainers=" + assignableContainers +
@@ -475,11 +458,11 @@ public class FifoScheduler implements Re
 
     int availableContainers = 
       node.getAvailableResource().getMemory() / capability.getMemory(); // TODO: A buggy
-    // application
-    // with this
-    // zero would
-    // crash the
-    // scheduler.
+                                                                        // application
+                                                                        // with this
+                                                                        // zero would
+                                                                        // crash the
+                                                                        // scheduler.
     int assignedContainers = 
       Math.min(assignableContainers, availableContainers);
 
@@ -490,8 +473,9 @@ public class FifoScheduler implements Re
         Container container =
             BuilderUtils.newContainer(recordFactory,
                 application.getApplicationId(),
-                application.getNewContainerId(), node.getNodeAddress(),
-                node.getHttpAddress(), capability);
+                application.getNewContainerId(),
+                node.getRMNode().getNodeID(), node.getRMNode().getNodeAddress(),
+                node.getRMNode().getHttpAddress(), capability);
         // If security is enabled, send the container-tokens too.
         if (UserGroupInformation.isSecurityEnabled()) {
           ContainerToken containerToken =
@@ -510,8 +494,9 @@ public class FifoScheduler implements Re
         }
         containers.add(container);
       }
-      application.allocate(type, node, priority, request, containers);
-      addAllocatedContainers(node, application.getApplicationId(), containers);
+      application.allocate(containers);
+      addAllocatedContainers(node, application.getApplicationAttemptId(),
+          containers);
       Resources.addTo(usedResource,
           Resources.multiply(capability, assignedContainers));
     }
@@ -519,23 +504,23 @@ public class FifoScheduler implements Re
   }
 
   private synchronized void killContainers(List<Container> containers) {
-    for (Container container : containers) {
-      container.setState(ContainerState.COMPLETE);
-    }
     applicationCompletedContainers(containers);
   }
-  
-  private synchronized void applicationCompletedContainers(
-      List<Container> completedContainers) {
-    for (Container c: completedContainers) {
-      Application app = applications.get(c.getId().getAppId());
+
+  private synchronized void applicationCompletedContainers(List<Container> containers) {
+    for (Container c : containers) {
+      applicationCompletedContainer(c);
+    }
+  }
+
+  private synchronized void applicationCompletedContainer(Container c) {
+      SchedulerApp app = applications.get(c.getId().getAppId());
       /** this is possible, since an application can be removed from scheduler but
        * the nodemanger is just updating about a completed container.
        */
       if (app != null) {
         app.completedContainer(c, c.getResource());
       }
-    }
   }
 
   private List<Container> getCompletedContainers(Map<String, List<Container>> allContainers) {
@@ -555,46 +540,71 @@ public class FifoScheduler implements Re
     return completedContainers;
   }
 
-  @Override
-  public synchronized void nodeUpdate(NodeInfo node, 
-      Map<String,List<Container>> containers ) {
+  private synchronized void nodeUpdate(RMNode rmNode,
+      Map<String, List<Container>> containers) {
+    SchedulerNode node = this.nodes.get(rmNode.getNodeID());
+    node.statusUpdate(containers);
 
     applicationCompletedContainers(getCompletedContainers(containers));
-    LOG.info("Node heartbeat " + node.getNodeID() + " resource = " + node.getAvailableResource());
+
+    LOG.info("Node heartbeat " + rmNode.getNodeID() + " resource = " + node.getAvailableResource());
+    LOG.info("Node heartbeat " + rmNode.getNodeID() + " resource = " + node.getAvailableResource().getMemory());
+    LOG.info("=========Node heartbeat " + rmNode.getNodeID() + " resourcemimi = " + minimumAllocation.getMemory());
     if (Resources.greaterThanOrEqual(node.getAvailableResource(),
         minimumAllocation)) {
       assignContainers(node);
     }
     metrics.setAvailableResourcesToQueue(
         Resources.subtract(clusterResource, usedResource));
-    LOG.info("Node after allocation " + node.getNodeID() + " resource = "
+    LOG.info("Node after allocation " + rmNode.getNodeID() + " resource = "
         + node.getAvailableResource());
 
     // TODO: Add the list of containers to be preempted when we support
   }  
 
   @Override
-  public synchronized void handle(ASMEvent<ApplicationTrackerEventType> event) {
+  public synchronized void handle(SchedulerEvent event) {
     switch(event.getType()) {
-    case ADD:
-      /**
-       * ignore add since its called syncronously from applications manager.
-       */
+    case NODE_ADDED:
+      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
+      addNode(nodeAddedEvent.getAddedRMNode());
       break;
-    case REMOVE:
-      try {
-        doneApplication(event.getApplication().getApplicationID(), true);
-      } catch(IOException ie) {
-        LOG.error("Unable to remove application " + event.getApplication().getApplicationID(), ie);
+    case NODE_REMOVED:
+      NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
+      removeNode(nodeRemovedEvent.getRemovedRMNode());
+      break;
+    case NODE_UPDATE:
+      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
+      Map<ApplicationId, List<Container>> contAppMapping = nodeUpdatedEvent.getContainers();
+      Map<String, List<Container>> conts = new HashMap<String, List<Container>>();
+      for (Map.Entry<ApplicationId, List<Container>> entry : contAppMapping.entrySet()) {
+        conts.put(entry.getKey().toString(), entry.getValue());
       }
-      break;  
-    case EXPIRE:
+      nodeUpdate(nodeUpdatedEvent.getRMNode(), conts);
+      break;
+    case APP_ADDED:
+      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+      addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
+          .getQueue(), appAddedEvent.getUser());
+      break;
+    case APP_REMOVED:
+      AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
       try {
-        doneApplication(event.getApplication().getApplicationID(), false);
+        doneApplication(appRemovedEvent.getApplicationAttemptID(), true);
       } catch(IOException ie) {
-        LOG.error("Unable to remove application " + event.getApplication().getApplicationID(), ie);
+        LOG.error("Unable to remove application "
+            + appRemovedEvent.getApplicationAttemptID(), ie);
       }
       break;
+    case CONTAINER_FINISHED:
+      ContainerFinishedSchedulerEvent containerFinishedEvent = (ContainerFinishedSchedulerEvent) event;
+      Container container = containerFinishedEvent.getContainer();
+      applicationCompletedContainer(container);
+      this.rmContext.getRMContainers().remove(container.getId());
+      releaseContainer(container.getId().getAppId(), container);
+      break;
+    default:
+      LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
     }
   }
 
@@ -602,21 +612,17 @@ public class FifoScheduler implements Re
   private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
 
-  public synchronized Resource getClusterResource() {
-    return clusterResource;
-  }
-
-  @Override
-  public synchronized void removeNode(NodeInfo nodeInfo) {
+  private synchronized void removeNode(RMNode nodeInfo) {
+    this.nodes.remove(nodeInfo.getNodeID());
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
     killContainers(nodeInfo.getRunningContainers());
   }
 
 
   @Override
-  public QueueInfo getQueueInfo(String queueName, boolean includeApplications,
+  public QueueInfo getQueueInfo(String queueName,
       boolean includeChildQueues, boolean recursive) {
-    return DEFAULT_QUEUE.getQueueInfo(includeApplications, false, false);
+    return DEFAULT_QUEUE.getQueueInfo(false, false);
   }
 
   @Override
@@ -624,37 +630,39 @@ public class FifoScheduler implements Re
     return DEFAULT_QUEUE.getQueueUserAclInfo(null); 
   }
 
-  private synchronized List<org.apache.hadoop.yarn.api.records.ApplicationReport> 
-  getApplications() {
-    List<org.apache.hadoop.yarn.api.records.ApplicationReport> applications = 
-      new ArrayList<org.apache.hadoop.yarn.api.records.ApplicationReport>();
-    for (Application application : this.applications.values()) {
-      applications.add(application.getApplicationInfo());
-    }
-    return applications;
-  }
-
-  @Override
-  public synchronized void addNode(NodeInfo nodeManager) {
+  private synchronized void addNode(RMNode nodeManager) {
+    this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
   }
 
-  public synchronized void releaseContainer(ApplicationId applicationId, 
+  private synchronized void releaseContainer(ApplicationId applicationId, 
       Container container) {
     // Reap containers
     LOG.info("Application " + applicationId + " released container " +
         container.getId());
-    clusterTracker.releaseContainer(container);
+    // TODO:FIXMEVINODKV
+//    node.releaseContainer(container);
   }
 
-  public synchronized void addAllocatedContainers(NodeInfo nodeInfo, 
-      ApplicationId applicationId, List<Container> containers) {
-    nodeInfo.allocateContainer(applicationId, containers);
-  }
-
-  public synchronized void finishedApplication(ApplicationId applicationId,
-      List<NodeInfo> nodesToNotify) {
-    clusterTracker.finishedApplication(applicationId, nodesToNotify);
+  private synchronized void addAllocatedContainers(SchedulerNode node,
+      ApplicationAttemptId appAttemptId, List<Container> containers) {
+    node.allocateContainer(appAttemptId.getApplicationId(), containers);
+    for (Container container : containers) {
+      // Create the container and 'start' it.
+      ContainerId containerId = container.getId();
+      RMContainer rmContainer = new RMContainerImpl(containerId,
+          appAttemptId, node.getNodeID(), container, this.rmContext
+              .getDispatcher().getEventHandler(), this.rmContext
+              .getContainerAllocationExpirer());
+      if (this.rmContext.getRMContainers().putIfAbsent(containerId,
+          rmContainer) != null) {
+        LOG.error("Duplicate container addition! ContainerID :  "
+            + containerId);
+      } else {
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMContainerEvent(containerId, RMContainerEventType.START));
+      }
+    }
   }
 
   @Override
@@ -662,7 +670,7 @@ public class FifoScheduler implements Re
     for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
       ApplicationId appId = entry.getKey();
       ApplicationInfo appInfo = entry.getValue();
-      Application app = applications.get(appId);
+      SchedulerApp app = applications.get(appId);
       app.allocate(appInfo.getContainers());
     }
   }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java Wed Aug  3 11:31:34 2011
@@ -18,16 +18,19 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.webapp;
 
-import com.google.inject.Inject;
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE;
 
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+import org.apache.hadoop.yarn.webapp.view.JQueryUI.Render;
 
-import static org.apache.hadoop.yarn.util.StringHelper.*;
-import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+import com.google.inject.Inject;
 
 class AppsBlock extends HtmlBlock {
   final AppsList list;
@@ -52,21 +55,21 @@ class AppsBlock extends HtmlBlock {
             th(".note", "Note")._()._().
         tbody();
     int i = 0;
-    for (Application app : list.apps.values()) {
-      String appId = Apps.toString(app.getApplicationID());
-      String trackingUrl = app.getMaster().getTrackingUrl();
+    for (RMApp app : list.apps.values()) {
+      String appId = Apps.toString(app.getApplicationId());
+      String trackingUrl = app.getTrackingUrl();
       String ui = trackingUrl == null || trackingUrl.isEmpty() ? "UNASSIGNED" :
           (app.getFinishTime() == 0 ? "ApplicationMaster" : "JobHistory");
-      String percent = String.format("%.1f", app.getStatus().getProgress() * 100);
+      String percent = String.format("%.1f", app.getProgress() * 100);
       tbody.
         tr().
           td().
-            br().$title(String.valueOf(app.getApplicationID().getId()))._(). // for sorting
+            br().$title(String.valueOf(app.getApplicationId().getId()))._(). // for sorting
             a(url("app", appId), appId)._().
           td(app.getUser().toString()).
           td(app.getName().toString()).
           td(app.getQueue().toString()).
-          td(app.getMaster().getState().toString()).
+          td(app.getState().toString()).
           td().
             br().$title(percent)._(). // for sorting
             div(_PROGRESSBAR).
@@ -75,7 +78,7 @@ class AppsBlock extends HtmlBlock {
                 $style(join("width:", percent, '%'))._()._()._().
           td().
             a(trackingUrl == null ? "#" : join("http://", trackingUrl), ui)._().
-          td(app.getMaster().getDiagnostics())._();
+          td(app.getDiagnostics().toString())._();
       if (list.rendering != Render.HTML && ++i >= 20) break;
     }
     tbody._()._();

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java Wed Aug  3 11:31:34 2011
@@ -29,8 +29,8 @@ import java.io.PrintWriter;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.webapp.Controller.RequestContext;
 import org.apache.hadoop.yarn.webapp.ToJSON;
@@ -43,39 +43,39 @@ import com.google.inject.servlet.Request
 @RequestScoped
 class AppsList implements ToJSON {
   final RequestContext rc;
-  final ConcurrentMap<ApplicationId, Application> apps;
+  final ConcurrentMap<ApplicationId, RMApp> apps;
   Render rendering;
 
   @Inject AppsList(RequestContext ctx, RMContext rmContext) {
     rc = ctx;
-    apps = rmContext.getApplications();
+    apps = rmContext.getRMApps();
   }
 
   void toDataTableArrays(PrintWriter out) {
     out.append('[');
     boolean first = true;
-    for (Application app : apps.values()) {
+    for (RMApp app : apps.values()) {
       if (first) {
         first = false;
       } else {
         out.append(",\n");
       }
-      String appID = Apps.toString(app.getApplicationID());
-      String trackingUrl = app.getMaster().getTrackingUrl();
+      String appID = Apps.toString(app.getApplicationId());
+      String trackingUrl = app.getTrackingUrl();
       String ui = trackingUrl == null ? "UNASSIGNED" :
           (app.getFinishTime() == 0 ? "ApplicationMaster" : "JobHistory");
       out.append("[\"");
-      appendSortable(out, app.getApplicationID().getId());
+      appendSortable(out, app.getApplicationId().getId());
       appendLink(out, appID, rc.prefix(), "app", appID).append(_SEP).
           append(escapeHtml(app.getUser().toString())).append(_SEP).
           append(escapeHtml(app.getName().toString())).append(_SEP).
           append(escapeHtml(app.getQueue())).append(_SEP).
-          append(app.getMaster().getState().toString()).append(_SEP);
-      appendProgressBar(out, app.getStatus().getProgress()).append(_SEP);
+          append(app.getState().toString()).append(_SEP);
+      appendProgressBar(out, app.getProgress()).append(_SEP);
       appendLink(out, ui, rc.prefix(),
                  trackingUrl == null ? "#" : "http://", trackingUrl).
           append(_SEP).append(escapeJavaScript(escapeHtml(
-                              app.getMaster().getDiagnostics()))).
+                              app.getDiagnostics().toString()))).
           append("\"]");
     }
     out.append(']');

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java Wed Aug  3 11:31:34 2011
@@ -18,30 +18,32 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.webapp;
 
-import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
-import java.util.Date;
-
-import com.google.inject.Inject;
-
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
+
+import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.webapp.SubView;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
 
-import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+import com.google.inject.Inject;
 
 class NodesPage extends RmView {
 
   static class NodesBlock extends HtmlBlock {
-    final ClusterTracker resource;
+    final RMContext rmContext;
 
     @Inject
-    NodesBlock(ClusterTracker rc, ViewContext ctx) {
+    NodesBlock(RMContext context, ViewContext ctx) {
       super(ctx);
-      resource = rc;
+      this.rmContext = context;
     }
 
     @Override
@@ -56,10 +58,11 @@ class NodesPage extends RmView {
           th(".lastHealthUpdate", "Last health-update").
           th(".healthReport", "Health-report").
           th(".containers", "Containers").
-          th(".mem", "Mem Used (MB)").
-          th(".mem", "Mem Avail (MB)")._()._().
+//          th(".mem", "Mem Used (MB)").
+//          th(".mem", "Mem Avail (MB)").
+          _()._().
           tbody();
-      for (NodeInfo ni : resource.getAllNodeInfo()) {
+      for (RMNode ni : this.rmContext.getRMNodes().values()) {
         NodeHealthStatus health = ni.getNodeHealthStatus();
         tbody.tr().
             td(ni.getRackName()).
@@ -69,8 +72,9 @@ class NodesPage extends RmView {
             td(Times.format(health.getLastHealthReportTime())).
             td(String.valueOf(health.getHealthReport())).
             td(String.valueOf(ni.getNumContainers())).
-            td(String.valueOf(ni.getUsedResource().getMemory())).
-            td(String.valueOf(ni.getAvailableResource().getMemory()))._();
+//            td(String.valueOf(ni.getUsedResource().getMemory())).
+//            td(String.valueOf(ni.getAvailableResource().getMemory())).
+            _();
       }
       tbody._()._();
     }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java Wed Aug  3 11:31:34 2011
@@ -18,14 +18,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.webapp;
 
-import org.apache.hadoop.yarn.server.resourcemanager.ApplicationsManager;
+import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
+
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.webapp.WebApp;
 
-import static org.apache.hadoop.yarn.util.StringHelper.*;
-
 /**
  * The RM webapp
  */
@@ -44,8 +42,6 @@ public class RMWebApp extends WebApp {
     if (rm != null) {
       bind(ResourceManager.class).toInstance(rm);
       bind(RMContext.class).toInstance(rm.getRMContext());
-      bind(ApplicationsManager.class).toInstance(rm.getApplicationsManager());
-      bind(ClusterTracker.class).toInstance(rm.getResourceTracker());
     }
     route("/", RmController.class);
     route("/nodes", RmController.class, "nodes");



Mime
View raw message