hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153430 [6/9] - in /hadoop/common/branches/MR-279/mapreduce: mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/loc...
Date Wed, 03 Aug 2011 11:32:10 GMT
Copied: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (from r1153017, hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?p2=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java&p1=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java&r1=1153017&r2=1153430&rev=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Wed Aug  3 11:31:34 2011
@@ -21,23 +21,22 @@ package org.apache.hadoop.yarn.server.re
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
-import org.apache.hadoop.yarn.api.records.ApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -46,7 +45,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 /**
  * This class keeps track of all the consumption of an application. This also
@@ -54,13 +53,16 @@ import org.apache.hadoop.yarn.server.res
  */
 @Private
 @Unstable
-public class Application {
+public class AppSchedulingInfo {
   
-  private static final Log LOG = LogFactory.getLog(Application.class);
+  private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
+  private final ApplicationAttemptId applicationAttemptId;
   final ApplicationId applicationId;
-  final Queue queue;
+  private final String queueName;
+  Queue queue;
   final String user;
-  
+  private final AtomicInteger containerIdCounter = new AtomicInteger(0);
+
   private final RecordFactory recordFactory = RecordFactoryProvider
       .getRecordFactory(null);
 
@@ -68,15 +70,6 @@ public class Application {
       new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
   final Map<Priority, Map<String, ResourceRequest>> requests = 
     new HashMap<Priority, Map<String, ResourceRequest>>();
-  final Resource currentConsumption = recordFactory
-      .newRecordInstance(Resource.class);
-  final Resource currentReservation = recordFactory
-  .newRecordInstance(Resource.class);
-  final Resource overallConsumption = recordFactory
-      .newRecordInstance(Resource.class);
-  Resource resourceLimit = recordFactory.newRecordInstance(Resource.class);
-  
-  Map<Priority, Integer> schedulingOpportunities = new HashMap<Priority, Integer>();
 
   private final ApplicationStore store;
 
@@ -85,23 +78,15 @@ public class Application {
   List<Container> completedContainers = new ArrayList<Container>();
   /* Allocated by scheduler */
   List<Container> allocated = new ArrayList<Container>();
-  Set<NodeInfo> applicationOnNodes = new HashSet<NodeInfo>();
   ApplicationMaster master;
   boolean pending = true; // for app metrics
 
-  /* Reserved containers */
-  private final Comparator<NodeInfo> nodeComparator = new Comparator<NodeInfo>() {
-    @Override
-    public int compare(NodeInfo o1, NodeInfo o2) {
-      return o1.getNodeID().getId() - o2.getNodeID().getId();
-    }
-  };
-  final Map<Priority, Set<NodeInfo>> reservedContainers = new HashMap<Priority, Set<NodeInfo>>();
-
-  public Application(ApplicationId applicationId, ApplicationMaster master,
-      Queue queue, String user, ApplicationStore store) {
-    this.applicationId = applicationId;
-    this.queue = queue;
+  public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
+      ApplicationMaster master, String queueName, String user,
+      ApplicationStore store) {
+    this.applicationAttemptId = appAttemptId;
+    this.applicationId = appAttemptId.getApplicationId();
+    this.queueName = queueName;
     this.user = user;
     this.master = master;
     this.store = store;
@@ -111,8 +96,12 @@ public class Application {
     return applicationId;
   }
 
-  public Queue getQueue() {
-    return queue;
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  public String getQueueName() {
+    return queueName;
   }
 
   public String getUser() {
@@ -126,34 +115,20 @@ public class Application {
   public synchronized boolean isPending() {
     return pending;
   }
-
-  public synchronized boolean isSchedulable() {
-    ApplicationState state = getState();
-    return 
-      (state == ApplicationState.ALLOCATED || state == ApplicationState.ALLOCATING ||
-       state == ApplicationState.LAUNCHED || state == ApplicationState.LAUNCHING || 
-       state == ApplicationState.PENDING || state == ApplicationState.RUNNING);
-  }
-  
-  public synchronized Map<Priority, Map<String, ResourceRequest>> getRequests() {
-    return requests;
-  }
-
-  public int getNewContainerId() {
-    int i = master.getContainerCount();
-    master.setContainerCount(++i);
-    return master.getContainerCount();
-  }
   
   /**
    * Clear any pending requests from this application.
    */
-  public synchronized void clearRequests() {
+  private synchronized void clearRequests() {
     priorities.clear();
     requests.clear();
     LOG.info("Application " + applicationId + " requests cleared");
   }
 
+  public int getNewContainerId() {
+    return this.containerIdCounter.incrementAndGet();
+  }
+
   /**
    * the currently acquired/allocated containers by the application masters.
    * 
@@ -170,16 +145,12 @@ public class Application {
    * 
    * @return allocated resources
    */
-  synchronized public List<Container> acquire() {
+  synchronized private List<Container> acquire() {
     // Return allocated containers
     acquired.addAll(allocated);
     List<Container> heartbeatContainers = allocated;
     allocated = new ArrayList<Container>();
 
-    for (Container container : heartbeatContainers) {
-      Resources.addTo(overallConsumption, container.getResource());
-    }
-
     LOG.info("acquire:" + " application=" + applicationId + " #acquired="
         + heartbeatContainers.size());
     heartbeatContainers = (heartbeatContainers == null) ? new ArrayList<Container>()
@@ -207,7 +178,7 @@ public class Application {
       boolean updatePendingResources = false;
       ResourceRequest lastRequest = null;
 
-      if (hostName.equals(NodeManager.ANY)) {
+      if (hostName.equals(RMNode.ANY)) {
         LOG.debug("update:" + " application=" + applicationId + " request="
             + request);
         updatePendingResources = true;
@@ -239,7 +210,7 @@ public class Application {
     }
   }
 
-  public synchronized void releaseContainers(List<Container> release) {
+  private synchronized void releaseContainers(List<Container> release) {
     // Release containers and update consumption
     for (Container container : release) {
       LOG.debug("update: " + "application=" + applicationId + " released="
@@ -273,18 +244,17 @@ public class Application {
   }
 
   public synchronized Resource getResource(Priority priority) {
-    ResourceRequest request = getResourceRequest(priority, NodeManager.ANY);
+    ResourceRequest request = getResourceRequest(priority, RMNode.ANY);
     return request.getCapability();
   }
 
-  synchronized public void completedContainer(Container container, 
+  synchronized private void completedContainer(Container container, 
       Resource containerResource) {
     if (container != null) {
       LOG.info("Completed container: " + container);
       completedContainers.add(container);
     }
     queue.getMetrics().releaseResources(user, 1, containerResource);
-    Resources.subtractFrom(currentConsumption, containerResource);
   }
 
   /**
@@ -302,9 +272,8 @@ public class Application {
    * @param containers
    *          the containers allocated.
    */
-  synchronized public void allocate(NodeType type, NodeInfo node,
+  synchronized public void allocate(NodeType type, SchedulerNode node,
       Priority priority, ResourceRequest request, List<Container> containers) {
-    applicationOnNodes.add(node);
     if (type == NodeType.DATA_LOCAL) {
       allocateNodeLocal(node, priority, request, containers);
     } else if (type == NodeType.RACK_LOCAL) {
@@ -331,7 +300,7 @@ public class Application {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateNodeLocal(NodeInfo node, Priority priority,
+  synchronized private void allocateNodeLocal(SchedulerNode node, Priority priority,
       ResourceRequest nodeLocalRequest, List<Container> containers) {
     // Update consumption and track allocations
     allocate(containers);
@@ -353,7 +322,7 @@ public class Application {
 
     // Do not remove ANY
     ResourceRequest offSwitchRequest = requests.get(priority).get(
-        NodeManager.ANY);
+        RMNode.ANY);
     offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
         - containers.size());
   }
@@ -365,7 +334,7 @@ public class Application {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateRackLocal(NodeInfo node, Priority priority,
+  synchronized private void allocateRackLocal(SchedulerNode node, Priority priority,
       ResourceRequest rackLocalRequest, List<Container> containers) {
 
     // Update consumption and track allocations
@@ -380,7 +349,7 @@ public class Application {
 
     // Do not remove ANY
     ResourceRequest offSwitchRequest = requests.get(priority).get(
-        NodeManager.ANY);
+        RMNode.ANY);
     offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
         - containers.size());
   }
@@ -392,7 +361,7 @@ public class Application {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateOffSwitch(NodeInfo node, Priority priority,
+  synchronized private void allocateOffSwitch(SchedulerNode node, Priority priority,
       ResourceRequest offSwitchRequest, List<Container> containers) {
 
     // Update consumption and track allocations
@@ -405,10 +374,9 @@ public class Application {
         - containers.size());
   }
 
-  synchronized public void allocate(List<Container> containers) {
+  synchronized private void allocate(List<Container> containers) {
     // Update consumption and track allocations
     for (Container container : containers) {
-      Resources.addTo(currentConsumption, container.getResource());
 
       allocated.add(container);
       try {
@@ -422,127 +390,11 @@ public class Application {
     }
   }
 
-  synchronized public void resetSchedulingOpportunities(Priority priority) {
-    Integer schedulingOpportunities = this.schedulingOpportunities
-        .get(priority);
-    schedulingOpportunities = 0;
-    this.schedulingOpportunities.put(priority, schedulingOpportunities);
-  }
-
-  synchronized public void addSchedulingOpportunity(Priority priority) {
-    Integer schedulingOpportunities = this.schedulingOpportunities
-        .get(priority);
-    if (schedulingOpportunities == null) {
-      schedulingOpportunities = 0;
-    }
-    ++schedulingOpportunities;
-    this.schedulingOpportunities.put(priority, schedulingOpportunities);
-  }
-
-  synchronized public int getSchedulingOpportunities(Priority priority) {
-    Integer schedulingOpportunities = this.schedulingOpportunities
-        .get(priority);
-    if (schedulingOpportunities == null) {
-      schedulingOpportunities = 0;
-      this.schedulingOpportunities.put(priority, schedulingOpportunities);
-    }
-    return schedulingOpportunities;
-  }
-
-  synchronized public void showRequests() {
-    if (LOG.isDebugEnabled()) {
-      for (Priority priority : getPriorities()) {
-        Map<String, ResourceRequest> requests = getResourceRequests(priority);
-        if (requests != null) {
-          LOG.debug("showRequests:" + " application=" + applicationId + 
-              " available=" + getHeadroom() + 
-              " current=" + currentConsumption + " state=" + getState());
-          for (ResourceRequest request : requests.values()) {
-            LOG.debug("showRequests:" + " application=" + applicationId
-                + " request=" + request);
-          }
-        }
-      }
-    }
-  }
-
-  synchronized public List<NodeInfo> getAllNodesForApplication() {
-    return new ArrayList<NodeInfo>(applicationOnNodes);
-  }
-
-  synchronized public org.apache.hadoop.yarn.api.records.ApplicationReport getApplicationInfo() {
-    org.apache.hadoop.yarn.api.records.ApplicationReport application = recordFactory
-        .newRecordInstance(org.apache.hadoop.yarn.api.records.ApplicationReport.class);
-    application.setApplicationId(applicationId);
-    application.setName("");
-    application.setQueue(queue.getQueueName());
-    application.setState(org.apache.hadoop.yarn.api.records.ApplicationState.RUNNING);
-    application.setUser(user);
-
-    ApplicationStatus status = recordFactory
-        .newRecordInstance(ApplicationStatus.class);
-    status.setApplicationId(applicationId);
-    application.setStatus(status);
-
-    return application;
-  }
-
-  public synchronized int getReservedContainers(Priority priority) {
-    Set<NodeInfo> reservedNodes = this.reservedContainers.get(priority);
-    return (reservedNodes == null) ? 0 : reservedNodes.size();
-  }
-
-  public synchronized void reserveResource(NodeInfo node, Priority priority,
-      Resource resource) {
-    Set<NodeInfo> reservedNodes = this.reservedContainers.get(priority);
-    if (reservedNodes == null) {
-      reservedNodes = new TreeSet<NodeInfo>(nodeComparator);
-      reservedContainers.put(priority, reservedNodes);
-    }
-    reservedNodes.add(node);
-    Resources.add(currentReservation, resource);
-    LOG.info("Application " + applicationId + " reserved " + resource
-        + " on node " + node + ", currently has " + reservedNodes.size()
-        + " at priority " + priority 
-        + "; currentReservation " + currentReservation);
-    queue.getMetrics().reserveResource(user, resource);
-  }
-
-  public synchronized void unreserveResource(NodeInfo node, Priority priority) {
-    Set<NodeInfo> reservedNodes = reservedContainers.get(priority);
-    reservedNodes.remove(node);
-    if (reservedNodes.isEmpty()) {
-      this.reservedContainers.remove(priority);
-    }
-    
-    Resource resource = getResource(priority);
-    Resources.subtract(currentReservation, resource);
-
-    LOG.info("Application " + applicationId + " unreserved " + " on node "
-        + node + ", currently has " + reservedNodes.size() + " at priority "
-        + priority + "; currentReservation " + currentReservation);
-    queue.getMetrics().unreserveResource(user, node.getReservedResource());
-  }
-
-  public synchronized boolean isReserved(NodeInfo node, Priority priority) {
-    Set<NodeInfo> reservedNodes = reservedContainers.get(priority);
-    if (reservedNodes != null) {
-      return reservedNodes.contains(node);
-    }
-    return false;
-  }
-
-  public float getLocalityWaitFactor(Priority priority, int clusterNodes) {
-    // Estimate: Required unique resources (i.e. hosts + racks)
-    int requiredResources = Math.max(this.requests.get(priority).size() - 1, 1);
-    return ((float) requiredResources / clusterNodes);
-  }
-
   synchronized public void stop() {
     // clear pending resources metrics for the application
     QueueMetrics metrics = queue.getMetrics();
     for (Map<String, ResourceRequest> asks : requests.values()) {
-      ResourceRequest request = asks.get(NodeManager.ANY);
+      ResourceRequest request = asks.get(RMNode.ANY);
       if (request != null) {
         metrics.decrPendingResources(user, request.getNumContainers(),
             Resources.multiply(request.getCapability(), request
@@ -555,28 +407,7 @@ public class Application {
     clearRequests();
   }
 
-  public Map<Priority, Set<NodeInfo>> getAllReservations() {
-    return new HashMap<Priority, Set<NodeInfo>>(reservedContainers);
-  }
-
-  public synchronized void setAvailableResourceLimit(Resource globalLimit) {
-    this.resourceLimit = globalLimit; 
-  }
-
-  /**
-   * Get available headroom in terms of resources for the application's user.
-   * @return available resource headroom
-   */
-  public synchronized Resource getHeadroom() {
-    Resource limit = 
-      Resources.subtract(Resources.subtract(resourceLimit, currentConsumption), 
-          currentReservation);
-
-    // Corner case to deal with applications being slightly over-limit
-    if (limit.getMemory() < 0) {
-      limit.setMemory(0);
-    }
-    
-    return limit;
+  public void setQueue(Queue queue) {
+    this.queue = queue;
   }
 }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Wed Aug  3 11:31:34 2011
@@ -52,13 +52,11 @@ public interface Queue {
   
   /**
    * Get queue information
-   * @param includeApplications include applications?
    * @param includeChildQueues include child queues?
    * @param recursive recursively get child queue information?
    * @return queue information
    */
-  QueueInfo getQueueInfo(boolean includeApplications, 
-      boolean includeChildQueues, boolean recursive);
+  QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive);
   
   /**
    * Get queue ACLs for given <code>user</code>.

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Wed Aug  3 11:31:34 2011
@@ -133,7 +133,7 @@ public class QueueMetrics {
     }
   }
 
-  public void finishApp(Application app) {
+  public void finishApp(AppSchedulingInfo app) {
     ApplicationState state = app.getState();
     switch (state) {
       case KILLED: appsKilled.incr(); break;

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Wed Aug  3 11:31:34 2011
@@ -23,11 +23,8 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 
 /**
@@ -37,8 +34,7 @@ import org.apache.hadoop.yarn.server.sec
  */
 @LimitedPrivate("yarn")
 @Evolving
-public interface ResourceScheduler extends ResourceListener, YarnScheduler, 
-  EventHandler<ASMEvent<ApplicationTrackerEventType>>, Recoverable {
+public interface ResourceScheduler extends YarnScheduler, Recoverable {
   /**
    * Re-initialize the <code>ResourceScheduler</code>.
    * @param conf configuration
@@ -46,5 +42,5 @@ public interface ResourceScheduler exten
    * @throws IOException
    */
   void reinitialize(Configuration conf, 
-      ContainerTokenSecretManager secretManager, ClusterTracker clusterTracker) throws IOException;    
+      ContainerTokenSecretManager secretManager, RMContext rmContext) throws IOException;    
 }

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,164 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+public class SchedulerApp {
+
+  private static final Log LOG = LogFactory.getLog(SchedulerApp.class);
+
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  private final AppSchedulingInfo appSchedulingInfo;
+  private final Queue queue;
+
+  private final Resource currentConsumption = recordFactory
+      .newRecordInstance(Resource.class);
+  private Resource resourceLimit = recordFactory
+      .newRecordInstance(Resource.class);
+
+  public SchedulerApp(AppSchedulingInfo application, Queue queue) {
+    this.appSchedulingInfo = application;
+    this.queue = queue;
+    application.setQueue(queue);
+  }
+
+  public ApplicationId getApplicationId() {
+    return this.appSchedulingInfo.getApplicationId();
+  }
+
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return this.appSchedulingInfo.getApplicationAttemptId();
+  }
+
+  public String getUser() {
+    return this.appSchedulingInfo.getUser();
+  }
+
+  public void updateResourceRequests(List<ResourceRequest> requests) {
+    this.appSchedulingInfo.updateResourceRequests(requests);
+  }
+
+  public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
+    return this.appSchedulingInfo.getResourceRequests(priority);
+  }
+
+  public int getNewContainerId() {
+    return this.appSchedulingInfo.getNewContainerId();
+  }
+  
+  public List<Container> getCurrentContainers() {
+    return this.appSchedulingInfo.getCurrentContainers();
+  }
+
+  public Collection<Priority> getPriorities() {
+    return this.appSchedulingInfo.getPriorities();
+  }
+
+  public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) {
+    return this.appSchedulingInfo.getResourceRequest(priority, nodeAddress);
+  }
+
+  public Resource getResource(Priority priority) {
+    return this.appSchedulingInfo.getResource(priority);
+  }
+
+  public void allocate(NodeType type, SchedulerNode node, Priority priority,
+      ResourceRequest request, List<Container> containers) {
+    this.appSchedulingInfo
+        .allocate(type, node, priority, request, containers);
+  }
+
+  public boolean isPending() {
+    return this.appSchedulingInfo.isPending();
+  }
+
+  public String getQueueName() {
+    return this.appSchedulingInfo.getQueueName();
+  }
+
+  public Queue getQueue() {
+    return this.queue;
+  }
+
+  public void stop() {
+    this.appSchedulingInfo.stop();
+  }
+
+  synchronized public void completedContainer(Container container, 
+      Resource containerResource) {
+    if (container != null) {
+      LOG.info("Completed container: " + container);
+    }
+    queue.getMetrics().releaseResources(getUser(), 1,
+        containerResource);
+    Resources.subtractFrom(currentConsumption, containerResource);
+  }
+
+  synchronized public void allocate(List<Container> containers) {
+    // Update consumption and log each allocated container
+    for (Container container : containers) {
+      Resources.addTo(currentConsumption, container.getResource());
+      LOG.debug("allocate: applicationId=" + container.getId().getAppId()
+          + " container=" + container.getId() + " host="
+          + container.getContainerManagerAddress());
+    }
+  }
+
+  public Resource getCurrentConsumption() {
+    return this.currentConsumption;
+  }
+
+  synchronized public void showRequests() {
+    if (LOG.isDebugEnabled()) {
+      for (Priority priority : getPriorities()) {
+        Map<String, ResourceRequest> requests = getResourceRequests(priority);
+        if (requests != null) {
+          LOG.debug("showRequests:" + " application=" + getApplicationId() + 
+              " available=" + getHeadroom() + 
+              " current=" + currentConsumption);
+          for (ResourceRequest request : requests.values()) {
+            LOG.debug("showRequests:" + " application=" + getApplicationId()
+                + " request=" + request);
+          }
+        }
+      }
+    }
+  }
+
+  public synchronized void setAvailableResourceLimit(Resource globalLimit) {
+    this.resourceLimit = globalLimit; 
+  }
+
+  /**
+   * Get available headroom in terms of resources for the application's user.
+   * @return available resource headroom
+   */
+  public synchronized Resource getHeadroom() {
+    Resource limit = Resources.subtract(resourceLimit, currentConsumption);
+
+    // Corner case to deal with applications being slightly over-limit
+    if (limit.getMemory() < 0) {
+      limit.setMemory(0);
+    }
+    
+    return limit;
+  }
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,248 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+/**
+ * Scheduler-side view of one node: tracks the containers the scheduler has
+ * placed on it together with the memory still available and already used.
+ * Methods that read or change this bookkeeping synchronize on the instance.
+ */
+public class SchedulerNode {
+
+  private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
+
+  private static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  private final Resource availableResource = recordFactory.newRecordInstance(Resource.class);
+  private final Resource usedResource = recordFactory.newRecordInstance(Resource.class);
+
+  private volatile int numContainers;
+
+  /* containers currently allocated on this node, keyed by container id */
+  private final Map<ContainerId, Container> runningContainers =
+    new TreeMap<ContainerId, Container>();
+
+  private final RMNode rmNode;
+
+  public static final String ANY = "*";
+
+  /**
+   * @param node the node to track; the memory of its total capability is
+   *             used as the initially available memory
+   */
+  public SchedulerNode(RMNode node) {
+    this.rmNode = node;
+    this.availableResource.setMemory(node.getTotalCapability().getMemory());
+  }
+
+  public RMNode getRMNode() {
+    return this.rmNode;
+  }
+
+  public NodeId getNodeID() {
+    return this.rmNode.getNodeID();
+  }
+
+  public String getHttpAddress() {
+    return this.rmNode.getHttpAddress();
+  }
+
+  public String getNodeAddress() {
+    return this.rmNode.getNodeAddress();
+  }
+
+  public String getRackName() {
+    return this.rmNode.getRackName();
+  }
+
+  /**
+   * The Scheduler has allocated containers on this node to the
+   * given application.
+   *
+   * @param applicationId application (only used in the error log)
+   * @param containers allocated containers; <code>null</code> is logged
+   *                   and ignored
+   */
+  public synchronized void allocateContainer(ApplicationId applicationId,
+      List<Container> containers) {
+    if (containers == null) {
+      LOG.error("Adding null containers for application " + applicationId);
+      return;
+    }
+    for (Container container : containers) {
+      allocateContainer(container);
+    }
+
+    LOG.info("addContainers:" +
+        " node=" + rmNode.getNodeAddress() +
+        " #containers=" + containers.size() +
+        " available=" + getAvailableResource().getMemory() +
+        " used=" + getUsedResource().getMemory());
+  }
+
+  /**
+   * Status update from the NodeManager. All reported container lists are
+   * merged and every container in state COMPLETE that this node still
+   * tracks is removed and its resources returned.
+   *
+   * @param allContainers reported containers grouped under String keys;
+   *                      <code>null</code> is ignored
+   */
+  public synchronized void
+    statusUpdate(Map<String,List<Container>> allContainers) {
+
+    if (allContainers == null) {
+      return;
+    }
+
+    // Flatten the per-key lists; the grouping does not matter here
+    List<Container> listContainers = new ArrayList<Container>();
+    for (List<Container> reported : allContainers.values()) {
+      listContainers.addAll(reported);
+    }
+    update(listContainers);
+  }
+
+  /**
+   * Process reported containers: a COMPLETE container that is still tracked
+   * is dropped from the running set and its resources are given back.
+   *
+   * @param containers reported containers; states other than COMPLETE are
+   *                   skipped
+   */
+  private synchronized void update(List<Container> containers) {
+    for (Container container : containers) {
+      if (container.getState() != ContainerState.COMPLETE) {
+        continue;
+      }
+
+      if (runningContainers.remove(container.getId()) != null) {
+        updateResource(container);
+        LOG.info("Completed container " + container);
+      }
+      // Logged for every COMPLETE report, tracked on this node or not
+      LOG.info("Removed completed container " + container.getId() + " on node " +
+          rmNode.getNodeAddress());
+    }
+  }
+
+  /**
+   * Record a single container as running on this node and charge its
+   * resource against the available amount.
+   *
+   * @param container container to add
+   */
+  private synchronized void allocateContainer(Container container) {
+    deductAvailableResource(container.getResource());
+    ++numContainers;
+
+    runningContainers.put(container.getId(), container);
+    LOG.info("Allocated container " + container.getId() +
+        " to node " + rmNode.getNodeAddress());
+
+    LOG.info("Assigned container " + container.getId() +
+        " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
+        ", which currently has " + numContainers + " containers, " +
+        getUsedResource() + " used and " +
+        getAvailableResource() + " available");
+  }
+
+  /** @return the live (not copied) available-resource record of this node */
+  public synchronized Resource getAvailableResource() {
+    return this.availableResource;
+  }
+
+  /** @return the live (not copied) used-resource record of this node */
+  public synchronized Resource getUsedResource() {
+    return this.usedResource;
+  }
+
+  /** @return <code>true</code> iff the container is tracked as running here */
+  private synchronized boolean isValidContainer(Container c) {
+    return runningContainers.containsKey(c.getId());
+  }
+
+  /** Give a finished container's resource back and decrement the count. */
+  private synchronized void updateResource(Container container) {
+    addAvailableResource(container.getResource());
+    --numContainers;
+  }
+
+  /**
+   * Release an allocated container on this node.
+   * @param container container to be released
+   * @return <code>true</code> iff the container was running on this node
+   *         and has been released, <code>false</code> otherwise
+   */
+  public synchronized boolean releaseContainer(Container container) {
+    if (!isValidContainer(container)) {
+      LOG.error("Invalid container released " + container);
+      return false;
+    }
+
+    /* remove the container from this node's bookkeeping */
+    runningContainers.remove(container.getId());
+    updateResource(container);
+
+    LOG.info("Released container " + container.getId() +
+        " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
+        ", which currently has " + numContainers + " containers, " +
+        getUsedResource() + " used and " + getAvailableResource()
+        + " available" + ", release resources=" + true);
+    return true;
+  }
+
+  /**
+   * Move <code>resource</code> from used back to available.
+   * A <code>null</code> resource is logged and ignored.
+   */
+  private synchronized void addAvailableResource(Resource resource) {
+    if (resource == null) {
+      LOG.error("Invalid resource addition of null resource for "
+          + rmNode.getNodeAddress());
+      return;
+    }
+    Resources.addTo(availableResource, resource);
+    Resources.subtractFrom(usedResource, resource);
+  }
+
+  /**
+   * Move <code>resource</code> from available to used.
+   * A <code>null</code> resource is logged and ignored.
+   */
+  private synchronized void deductAvailableResource(Resource resource) {
+    if (resource == null) {
+      LOG.error("Invalid deduction of null resource for "
+          + rmNode.getNodeAddress());
+      // Bail out like addAvailableResource does; without this return the
+      // null would be handed on to the Resources arithmetic below.
+      return;
+    }
+    Resources.subtractFrom(availableResource, resource);
+    Resources.addTo(usedResource, resource);
+  }
+
+  @Override
+  public String toString() {
+    return "host: " + rmNode.getNodeAddress() + " #containers=" + rmNode.getNumContainers() +
+      " available=" + getAvailableResource().getMemory() +
+      " used=" + getUsedResource().getMemory();
+  }
+}

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Wed Aug  3 11:31:34 2011
@@ -21,70 +21,32 @@ package org.apache.hadoop.yarn.server.re
 import java.io.IOException;
 import java.util.List;
 
-
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 
 /**
  * This interface is used by the components to talk to the
  * scheduler for allocating of resources, cleaning up resources.
  *
  */
-public interface YarnScheduler {
-  /**
-   * Allocates and returns resources.
-   * @param applicationId
-   * @param ask
-   * @param release
-   * @return the scheduler's {@link Allocation} response 
-   * @throws IOException
-   */
-  Allocation allocate(ApplicationId applicationId,
-      List<ResourceRequest> ask, List<Container> release)
-  throws IOException;
-  
-  /**
-   * A new application has been submitted to the ResourceManager
-   * @param applicationId application which has been submitted
-   * @param master the application master
-   * @param user application user
-   * @param queue queue to which the applications is being submitte
-   * @param priority application priority
-   * @param appStore the storage for the application.
-   */
-  public void addApplication(ApplicationId applicationId, ApplicationMaster master,
-      String user, String queue, Priority priority, ApplicationStore appStore) 
-  throws IOException;
-  
-  /**
-   * A submitted application has completed.
-   * @param applicationId completed application
-   * @param finishApplication true if the application is completed and the
-   * scheduler needs to notify other components of application completion.
-   */
-  public void doneApplication(ApplicationId applicationId, boolean finishApplication)
-  throws IOException;
-
+public interface YarnScheduler extends EventHandler<SchedulerEvent> {
 
   /**
    * Get queue information
    * @param queueName queue name
-   * @param includeApplications include applications?
    * @param includeChildQueues include child queues?
    * @param recursive get children queues?
    * @return queue information
    * @throws IOException
    */
-  public QueueInfo getQueueInfo(String queueName, boolean includeApplications, 
-      boolean includeChildQueues, boolean recursive) 
-  throws IOException;
+  public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
+      boolean recursive) throws IOException;
 
   /**
    * Get acls for queues for current user.
@@ -105,4 +67,8 @@ public interface YarnScheduler {
    */
   public Resource getMaximumResourceCapability();
 
+  public Resource getResourceLimit(ApplicationAttemptId appAttemptId);
+
+  void allocate(ApplicationAttemptId appAttemptId, List<ResourceRequest> ask);
+
 }

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSApp.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSApp.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSApp.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,189 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+
+/**
+ * Capacity-scheduler view of an application: on top of {@link SchedulerApp}
+ * it keeps per-priority scheduling-opportunity counters and the nodes on
+ * which the application holds a reservation.
+ */
+public class CSApp extends SchedulerApp {
+
+  private static final Log LOG = LogFactory.getLog(CSApp.class);
+
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  /* Reserved containers: nodes holding a reservation, per priority */
+  final Map<Priority, Set<CSNode>> reservedContainers = new HashMap<Priority, Set<CSNode>>();
+  Map<Priority, Integer> schedulingOpportunities = new HashMap<Priority, Integer>();
+
+  /* Sum of the reservations currently held; subtracted in getHeadroom() */
+  final Resource currentReservation = recordFactory
+      .newRecordInstance(Resource.class);
+
+  /* Orders the reserved nodes of a priority by node id */
+  private final Comparator<CSNode> nodeComparator = new Comparator<CSNode>() {
+    @Override
+    public int compare(CSNode o1, CSNode o2) {
+      // Compare rather than subtract: the difference of two ints can
+      // overflow and then reports the wrong order.
+      int id1 = o1.getNodeID().getId();
+      int id2 = o2.getNodeID().getId();
+      return (id1 < id2) ? -1 : ((id1 == id2) ? 0 : 1);
+    }
+  };
+
+  public CSApp(AppSchedulingInfo appSchedulingInfo, Queue queue) {
+    super(appSchedulingInfo, queue);
+  }
+
+  /** Set the scheduling-opportunity counter of the priority to zero. */
+  synchronized public void resetSchedulingOpportunities(Priority priority) {
+    this.schedulingOpportunities.put(priority, 0);
+  }
+
+  /** Add one to the scheduling-opportunity counter of the priority. */
+  synchronized public void addSchedulingOpportunity(Priority priority) {
+    Integer seen = this.schedulingOpportunities.get(priority);
+    int updated = (seen == null) ? 1 : seen + 1;
+    this.schedulingOpportunities.put(priority, updated);
+  }
+
+  /**
+   * @return the scheduling-opportunity counter of the priority; a missing
+   *         counter is stored as zero first
+   */
+  synchronized public int getSchedulingOpportunities(Priority priority) {
+    Integer seen = this.schedulingOpportunities.get(priority);
+    if (seen == null) {
+      seen = 0;
+      this.schedulingOpportunities.put(priority, seen);
+    }
+    return seen;
+  }
+
+  /** @return number of nodes reserved at the priority, 0 if none */
+  public synchronized int getReservedContainers(Priority priority) {
+    Set<CSNode> reservedNodes = this.reservedContainers.get(priority);
+    return (reservedNodes == null) ? 0 : reservedNodes.size();
+  }
+
+  /**
+   * Record that this application reserved a resource on a node.
+   *
+   * @param node node the reservation is held on
+   * @param priority priority the reservation is made at
+   * @param resource amount being reserved
+   */
+  public synchronized void reserveResource(CSNode node, Priority priority,
+      Resource resource) {
+    Set<CSNode> reservedNodes = this.reservedContainers.get(priority);
+    if (reservedNodes == null) {
+      reservedNodes = new TreeSet<CSNode>(nodeComparator);
+      reservedContainers.put(priority, reservedNodes);
+    }
+    // addTo is the in-place form; the result of the value-returning
+    // Resources.add used to be dropped here, so the total never changed.
+    // Count a node only once: unreserveResource gives the amount back once
+    // per node, so reserving the same node again must not add it twice.
+    if (reservedNodes.add(node)) {
+      Resources.addTo(currentReservation, resource);
+    }
+    LOG.info("Application " + getApplicationId() + " reserved " + resource
+        + " on node " + node + ", currently has " + reservedNodes.size()
+        + " at priority " + priority
+        + "; currentReservation " + currentReservation);
+    getQueue().getMetrics().reserveResource(
+        getUser(), resource);
+  }
+
+  /**
+   * Drop this application's reservation on a node.
+   *
+   * @param node node whose reservation is given up
+   * @param priority priority the reservation was made at
+   * @throws IllegalStateException if nothing is reserved at the priority
+   */
+  public synchronized void unreserveResource(CSNode node, Priority priority) {
+    Set<CSNode> reservedNodes = reservedContainers.get(priority);
+    if (reservedNodes == null) {
+      throw new IllegalStateException("Trying to unreserve on node " + node
+          + " for application " + getApplicationId()
+          + " which has no reservation at priority " + priority);
+    }
+    boolean wasReserved = reservedNodes.remove(node);
+    if (reservedNodes.isEmpty()) {
+      this.reservedContainers.remove(priority);
+    }
+
+    // subtractFrom updates the total in place (Resources.subtract returns
+    // its result, which was being dropped); mirror reserveResource and
+    // give back only for a node that was actually reserved.
+    if (wasReserved) {
+      Resources.subtractFrom(currentReservation, getResource(priority));
+    }
+
+    LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
+        + node + ", currently has " + reservedNodes.size() + " at priority "
+        + priority + "; currentReservation " + currentReservation);
+    getQueue().getMetrics().unreserveResource(
+        getUser(), node.getReservedResource());
+  }
+
+  /** @return true iff the node is reserved at the priority */
+  public synchronized boolean isReserved(CSNode node, Priority priority) {
+    Set<CSNode> reservedNodes = reservedContainers.get(priority);
+    return reservedNodes != null && reservedNodes.contains(node);
+  }
+
+  /**
+   * @return ratio of the estimated number of distinct locations requested
+   *         at the priority (at least one) to the number of cluster nodes
+   */
+  public float getLocalityWaitFactor(Priority priority, int clusterNodes) {
+    // Estimate: Required unique resources (i.e. hosts + racks)
+    int requiredResources = Math.max(this.getResourceRequests(priority).size() - 1, 1);
+    return ((float) requiredResources / clusterNodes);
+  }
+
+  /**
+   * @return a copy of the priority-to-nodes map; the node sets themselves
+   *         are shared with this object, not copied
+   */
+  public synchronized Map<Priority, Set<CSNode>> getAllReservations() {
+    // Synchronized like every other access to reservedContainers
+    return new HashMap<Priority, Set<CSNode>>(reservedContainers);
+  }
+
+  /**
+   * @return the headroom of the superclass reduced by the current
+   *         reservation, with memory clamped at zero
+   */
+  @Override
+  public synchronized Resource getHeadroom() {
+    Resource limit = Resources.subtract(super.getHeadroom(),
+        currentReservation);
+
+    // Corner case to deal with applications being slightly over-limit
+    if (limit.getMemory() < 0) {
+      limit.setMemory(0);
+    }
+
+    return limit;
+  }
+}

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSNode.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSNode.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSNode.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,99 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+/**
+ * A {@link SchedulerNode} that can also hold a single reservation: a
+ * resource set aside on this node for one application.
+ */
+public class CSNode extends SchedulerNode {
+
+  private static final Log LOG = LogFactory.getLog(CSNode.class);
+
+  private CSApp reservedApplication = null;
+  private Resource reservedResource = null;
+
+  public CSNode(RMNode node) {
+    super(node);
+  }
+
+  /**
+   * Reserve a resource on this node for an application, or replace the
+   * resource of the reservation that application already holds here.
+   *
+   * @param application application the reservation is for
+   * @param priority not used by this method
+   * @param resource resource recorded as reserved
+   * @throws IllegalStateException if the node is already reserved for a
+   *         different application
+   */
+  public synchronized void reserveResource(
+      CSApp application, Priority priority, Resource resource) {
+    if (reservedApplication == null) {
+      this.reservedApplication = application;
+      LOG.info("Reserved resource " + resource + " on node " + this +
+          " for application " + application);
+    } else {
+      // Cannot reserve more than one application on a given node!
+      if (!reservedApplication.getApplicationId().equals(
+          application.getApplicationId())) {
+        throw new IllegalStateException("Trying to reserve resource " + resource +
+            " for application " + application.getApplicationId() +
+            " when currently reserved resource " + reservedResource +
+            " for application " + reservedApplication.getApplicationId() +
+            " on node " + this);
+      }
+
+      LOG.info("Updated reserved resource " + resource + " on node " +
+          this + " for application " + application);
+    }
+    reservedResource = resource;
+  }
+
+  /**
+   * Drop the reservation held on this node.
+   *
+   * @param application application giving up its reservation
+   * @param priority not used by this method
+   * @throws IllegalStateException if nothing is reserved here, or the
+   *         reservation belongs to a different application
+   */
+  public synchronized void unreserveResource(CSApp application,
+      Priority priority) {
+    // Fail with a descriptive error instead of a NullPointerException
+    if (reservedApplication == null) {
+      throw new IllegalStateException("Trying to unreserve" +
+          " for application " + application.getApplicationId() +
+          " when nothing is reserved on node " + this);
+    }
+
+    // Cannot unreserve for wrong application...
+    if (!reservedApplication.getApplicationId().equals(
+        application.getApplicationId())) {
+      throw new IllegalStateException("Trying to unreserve " +
+          " for application " + application.getApplicationId() +
+          " when currently reserved " +
+          " for application " + reservedApplication.getApplicationId() +
+          " on node " + this);
+    }
+
+    reservedApplication = null;
+    reservedResource = null;
+  }
+
+  /** @return application holding the reservation, null when none is held */
+  public synchronized CSApp getReservedApplication() {
+    return reservedApplication;
+  }
+
+  /** @return the recorded reserved resource, null when none is held */
+  public synchronized Resource getReservedResource() {
+    return reservedResource;
+  }
+
+}

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Aug  3 11:31:34 2011
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
@@ -39,29 +38,38 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Lock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationTrackerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerFinishedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
@@ -90,20 +98,22 @@ implements ResourceScheduler, CapacitySc
     }
   };
 
-  private final Comparator<Application> applicationComparator = 
-    new Comparator<Application>() {
+  private final Comparator<CSApp> applicationComparator = 
+    new Comparator<CSApp>() {
     @Override
-    public int compare(Application a1, Application a2) {
+    public int compare(CSApp a1, CSApp a2) {
       return a1.getApplicationId().getId() - a2.getApplicationId().getId();
     }
   };
 
   private CapacitySchedulerConfiguration conf;
   private ContainerTokenSecretManager containerTokenSecretManager;
-  private ClusterTracker clusterTracker;
+  private RMContext rmContext;
 
   private Map<String, Queue> queues = new ConcurrentHashMap<String, Queue>();
 
+  private Map<NodeId, CSNode> csNodes = new ConcurrentHashMap<NodeId, CSNode>();
+
   private Resource clusterResource = 
     RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
   private int numNodeManagers = 0;
@@ -111,10 +121,8 @@ implements ResourceScheduler, CapacitySc
   private Resource minimumAllocation;
   private Resource maximumAllocation;
 
-  private Map<ApplicationId, Application> applications =
-    Collections.synchronizedMap(
-    new TreeMap<ApplicationId, Application>(
-        new BuilderUtils.ApplicationIdComparator()));
+  private Map<ApplicationAttemptId, CSApp> applications = Collections
+      .synchronizedMap(new HashMap<ApplicationAttemptId, CSApp>());
 
   private boolean initialized = false;
 
@@ -145,18 +153,22 @@ implements ResourceScheduler, CapacitySc
   public synchronized int getNumClusterNodes() {
     return numNodeManagers;
   }
-  
+
+  @Override
+  public RMContext getRMContext() {
+    return this.rmContext;
+  }
+
   @Override
   public synchronized void reinitialize(Configuration conf,
-      ContainerTokenSecretManager containerTokenSecretManager, ClusterTracker clusterTracker) 
+      ContainerTokenSecretManager containerTokenSecretManager, RMContext rmContext) 
   throws IOException {
     if (!initialized) {
       this.conf = new CapacitySchedulerConfiguration(conf);
       this.minimumAllocation = this.conf.getMinimumAllocation();
       this.maximumAllocation = this.conf.getMaximumAllocation();
       this.containerTokenSecretManager = containerTokenSecretManager;
-      this.clusterTracker = clusterTracker;
-      if (clusterTracker != null) clusterTracker.addListener(this);
+      this.rmContext = rmContext;
       initializeQueues(this.conf);
       initialized = true;
     } else {
@@ -274,54 +286,64 @@ implements ResourceScheduler, CapacitySc
     return queue;
   }
 
-  @Override
-  public synchronized void addApplication(
-      ApplicationId applicationId, ApplicationMaster master,
-      String user, String queueName, Priority priority, ApplicationStore appStore)
-  throws IOException {
+  private synchronized void
+      addApplication(ApplicationAttemptId applicationAttemptId,
+          String queueName, String user) {
+
     // Sanity checks
     Queue queue = queues.get(queueName);
     if (queue == null) {
-      throw new IOException("Application " + applicationId + 
-          " submitted by user " + user + " to unknown queue: " + queueName);
+      String message = "Application " + applicationAttemptId + 
+      " submitted by user " + user + " to unknown queue: " + queueName;
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+      return;
     }
     if (!(queue instanceof LeafQueue)) {
-      throw new IOException("Application " + applicationId + 
-          " submitted by user " + user + " to non-leaf queue: " + queueName);
+      String message = "Application " + applicationAttemptId + 
+          " submitted by user " + user + " to non-leaf queue: " + queueName;
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+      return;
     }
 
-    // Create the application
-    Application application = 
-      new Application(applicationId, master, queue, user, appStore);
-    
+    AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
+        applicationAttemptId, null, queueName, user, null);
+    CSApp csApp = new CSApp(appSchedulingInfo, queue);
+
     // Submit to the queue
     try {
-      queue.submitApplication(application, user, queueName, priority);
+      queue.submitApplication(csApp, user, queueName);
     } catch (AccessControlException ace) {
-      throw new IOException(ace);
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptRejectedEvent(applicationAttemptId, StringUtils
+              .stringifyException(ace)));
+      return;
     }
 
-    applications.put(applicationId, application);
+    applications.put(applicationAttemptId, csApp);
 
-    LOG.info("Application Submission: " + applicationId.getId() + 
+    LOG.info("Application Submission: " + applicationAttemptId + 
         ", user: " + user +
         " queue: " + queue +
         ", currently active: " + applications.size());
+
+    rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+            RMAppAttemptEventType.APP_ACCEPTED));
   }
 
-  @Override
-  public synchronized void doneApplication(
-      ApplicationId applicationId, boolean finishApplication)
-  throws IOException {
-    LOG.info("Application " + applicationId + " is done." +
+  private synchronized void doneApplication(
+      ApplicationAttemptId applicationAttemptId, boolean finishApplication) {
+    LOG.info("Application " + applicationAttemptId + " is done." +
     		" finish=" + finishApplication);
     
-    Application application = getApplication(applicationId);
+    CSApp application = getApplication(applicationAttemptId);
 
     if (application == null) {
       //      throw new IOException("Unknown application " + applicationId + 
       //          " has completed!");
-      LOG.info("Unknown application " + applicationId + " has completed!");
+      LOG.info("Unknown application " + applicationAttemptId + " has completed!");
       return;
     }
     
@@ -339,69 +361,55 @@ implements ResourceScheduler, CapacitySc
      */
     if (finishApplication) {
       // Inform the queue
-      Queue queue = queues.get(application.getQueue().getQueueName());
-      queue.finishApplication(application, queue.getQueueName());
-      
-      // Inform the resource-tracker
-      clusterTracker.finishedApplication(applicationId, 
-          application.getAllNodesForApplication());
+      String queueName = application.getQueue().getQueueName();
+      Queue queue = queues.get(queueName);
+      if (!(queue instanceof LeafQueue)) {
+        LOG.error("Cannot finish application " + "from non-leaf queue: "
+            + queueName);
+      } else {
+        queue.finishApplication(application, queue.getQueueName());
+      }
       
       // Remove from our data-structure
-      applications.remove(applicationId);
+      applications.remove(applicationAttemptId);
     }
   }
 
   @Override
   @Lock(Lock.NoLock.class)
-  public Allocation allocate(ApplicationId applicationId,
-      List<ResourceRequest> ask, List<Container> release)
-      throws IOException {
+  public void allocate(ApplicationAttemptId applicationAttemptId,
+      List<ResourceRequest> ask) {
 
-    Application application = getApplication(applicationId);
+    CSApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.info("Calling allocate on removed " +
-          "or non existant application " + applicationId);
-      return new Allocation(EMPTY_CONTAINER_LIST, Resources.none()); 
+          "or non existant application " + applicationAttemptId);
+      return;
     }
     
     // Sanity check
     normalizeRequests(ask);
 
     LOG.info("DEBUG --- allocate: pre-update" +
-        " applicationId=" + applicationId + 
+        " applicationId=" + applicationAttemptId + 
         " application=" + application);
     application.showRequests();
 
     // Update application requests
     application.updateResourceRequests(ask);
 
-    // Release ununsed containers and update queue capacities
-    processReleasedContainers(application, release);
-
     LOG.info("DEBUG --- allocate: post-update");
     application.showRequests();
-
-    // Acquire containers
-    List<Container> allocatedContainers = application.acquire();
-
-    // Resource limit
-    Resource limit = application.getHeadroom();
     
     LOG.info("DEBUG --- allocate:" +
-        " applicationId=" + applicationId + 
-        " #ask=" + ask.size() + 
-        " #release=" + release.size() +
-        " #allocatedContainers=" + allocatedContainers.size() +
-        " limit=" + limit);
-
-      
-      return new Allocation(allocatedContainers, limit);
-  }
+        " applicationId=" + applicationAttemptId + 
+        " #ask=" + ask.size());
+   }
 
   @Override
   @Lock(Lock.NoLock.class)
   public QueueInfo getQueueInfo(String queueName, 
-      boolean includeApplications, boolean includeChildQueues, boolean recursive) 
+      boolean includeChildQueues, boolean recursive) 
   throws IOException {
     Queue queue = null;
 
@@ -412,7 +420,7 @@ implements ResourceScheduler, CapacitySc
     if (queue == null) {
       throw new IOException("Unknown queue: " + queueName);
     }
-    return queue.getQueueInfo(includeApplications, includeChildQueues, recursive);
+    return queue.getQueueInfo(includeChildQueues, recursive);
   }
 
   @Override
@@ -463,11 +471,11 @@ implements ResourceScheduler, CapacitySc
     return completedContainers;
   }
 
-  @Override
-  public synchronized void nodeUpdate(NodeInfo nm, 
+  private synchronized void nodeUpdate(RMNode nm, 
       Map<String,List<Container>> containers ) {
     LOG.info("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
-
+    SchedulerNode node = this.csNodes.get(nm.getNodeID());
+    node.statusUpdate(containers);
 
     // Completed containers
     processCompletedContainers(getCompletedContainers(containers));
@@ -476,13 +484,15 @@ implements ResourceScheduler, CapacitySc
     // 1. Check for reserved applications
     // 2. Schedule if there are no reservations
 
-    Application reservedApplication = nm.getReservedApplication();
+    CSNode csNode = this.csNodes.get(nm.getNodeID());
+
+    CSApp reservedApplication = csNode.getReservedApplication();
     if (reservedApplication != null) {
       // Try to fulfill the reservation
       LOG.info("Trying to fulfill reservation for application " + 
           reservedApplication.getApplicationId() + " on node: " + nm);
       LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      Resource released = queue.assignContainers(clusterResource, nm);
+      Resource released = queue.assignContainers(clusterResource, csNode);
       
       // Is the reservation necessary? If not, release the reservation
       if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
@@ -492,12 +502,12 @@ implements ResourceScheduler, CapacitySc
     }
 
     // Try to schedule more if there are no reservations to fulfill
-    if (nm.getReservedApplication() == null) {
-      root.assignContainers(clusterResource, nm);
+    if (csNode.getReservedApplication() == null) {
+      root.assignContainers(clusterResource, csNode);
     } else {
       LOG.info("Skipping scheduling since node " + nm + 
           " is reserved by application " + 
-          nm.getReservedApplication().getApplicationId());
+          csNode.getReservedApplication().getApplicationId());
     }
 
   }
@@ -507,7 +517,7 @@ implements ResourceScheduler, CapacitySc
     for (Container container : containers) {
       container.setState(ContainerState.COMPLETE);
       LOG.info("Killing running container " + container.getId());
-      Application application = applications.get(container.getId().getAppId());
+      CSApp application = applications.get(container.getId().getAppId());
       processReleasedContainers(application, Collections.singletonList(container));
     }
   }
@@ -516,25 +526,28 @@ implements ResourceScheduler, CapacitySc
   private void processCompletedContainers(
       List<Container> completedContainers) {
     for (Container container: completedContainers) {
-      Application application = getApplication(container.getId().getAppId());
+      processSingleCompletedContainer(container);
+    }
+  }
 
-      // this is possible, since an application can be removed from scheduler 
-      // but the nodemanger is just updating about a completed container.
-      if (application != null) {
-
-        // Inform the queue
-        LeafQueue queue = (LeafQueue)application.getQueue();
-        queue.completedContainer(clusterResource, container, 
-            container.getResource(), application);
-      }
+  private void processSingleCompletedContainer(Container container) {
+    CSApp application = getApplication(this.rmContext.getRMContainers().get(
+        container.getId()).getApplicationAttemptId());
+
+    // This is possible, since an application can be removed from the scheduler
+    // while the nodemanager is still reporting a completed container.
+    if (application != null) {
+
+      // Inform the queue
+      LeafQueue queue = (LeafQueue)application.getQueue();
+      queue.completedContainer(clusterResource, container, 
+          container.getResource(), application);
     }
   }
 
   @Lock(Lock.NoLock.class)
-  private synchronized void processReleasedContainers(Application application,
+  private synchronized void processReleasedContainers(CSApp application,
       List<Container> releasedContainers) {
-    // Inform the application
-    application.releaseContainers(releasedContainers);
 
     // Inform clusterTracker
     List<Container> unusedContainers = new ArrayList<Container>();
@@ -551,17 +564,17 @@ implements ResourceScheduler, CapacitySc
   }
 
   @Lock(CapacityScheduler.class)
-  private void releaseReservedContainers(Application application) {
+  private void releaseReservedContainers(CSApp application) {
     LOG.info("Releasing reservations for completed application: " + 
         application.getApplicationId());
     Queue queue = queues.get(application.getQueue().getQueueName());
-    Map<Priority, Set<NodeInfo>> reservations = application.getAllReservations();
-    for (Map.Entry<Priority, Set<NodeInfo>> e : reservations.entrySet()) {
+    Map<Priority, Set<CSNode>> reservations = application.getAllReservations();
+    for (Map.Entry<Priority, Set<CSNode>> e : reservations.entrySet()) {
       Priority priority = e.getKey();
-      Set<NodeInfo> reservedNodes = new HashSet<NodeInfo>(e.getValue());
-      for (NodeInfo node : reservedNodes) {
+      Set<CSNode> reservedNodes = new HashSet<CSNode>(e.getValue());
+      for (CSNode node : reservedNodes) {
         Resource allocatedResource = 
-          application.getResourceRequest(priority, NodeManagerImpl.ANY).getCapability();
+          application.getResourceRequest(priority, SchedulerNode.ANY).getCapability();
     
         application.unreserveResource(node, priority);
         node.unreserveResource(application, priority);
@@ -572,56 +585,66 @@ implements ResourceScheduler, CapacitySc
   }
   
   @Lock(Lock.NoLock.class)
-  private Application getApplication(ApplicationId applicationId) {
-    return applications.get(applicationId);
+  private CSApp getApplication(ApplicationAttemptId applicationAttemptId) {
+    return applications.get(applicationAttemptId);
+  }
+
+  @Override
+  public Resource getResourceLimit(ApplicationAttemptId applicationAttemptId) {
+    return applications.get(applicationAttemptId).getHeadroom();
   }
 
   @Override
-  public synchronized void handle(ASMEvent<ApplicationTrackerEventType> event) {
+  public synchronized void handle(SchedulerEvent event) {
     switch(event.getType()) {
-    case ADD:
-      /** ignore add since its called sychronously from the applications manager 
-       * 
-       */
+    case NODE_ADDED:
+      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
+      addNode(nodeAddedEvent.getAddedRMNode());
       break;
-    case REMOVE:
-      try {
-        doneApplication(event.getApplication().getApplicationID(), true);
-      } catch(IOException ie) {
-        LOG.error("Error in removing 'done' application", ie);
-        //TODO have to be shutdown the RM in case of this.
-        // do a graceful shutdown.
-      }
+    case NODE_REMOVED:
+      NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
+      removeNode(nodeRemovedEvent.getRemovedRMNode());
       break;
-    case EXPIRE:
-      try {
-        /** do not remove the application. Just do everything else exception 
-         * removing the application
-         */
-        doneApplication(event.getApplication().getApplicationID(), false);
-      } catch(IOException ie) {
-        LOG.error("Error in removing 'expired' application", ie);
-        //TODO have to be shutdown the RM in case of this.
-        // do a graceful shutdown.
+    case NODE_UPDATE:
+      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
+      Map<ApplicationId, List<Container>> contAppMapping = nodeUpdatedEvent.getContainers();
+      Map<String, List<Container>> conts = new HashMap<String, List<Container>>();
+      for (Map.Entry<ApplicationId, List<Container>> entry : contAppMapping.entrySet()) {
+        conts.put(entry.getKey().toString(), entry.getValue());
       }
+      nodeUpdate(nodeUpdatedEvent.getRMNode(), conts);
+      break;
+    case APP_ADDED:
+      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
+      addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
+          .getQueue(), appAddedEvent.getUser());
+      break;
+    case APP_REMOVED:
+      AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
+      doneApplication(appRemovedEvent.getApplicationAttemptID(), true);
+      break;
+    case CONTAINER_FINISHED:
+      ContainerFinishedSchedulerEvent containerFinishedEvent = (ContainerFinishedSchedulerEvent) event;
+      Container container = containerFinishedEvent.getContainer();
+      this.rmContext.getRMContainers().remove(container.getId());
+      processSingleCompletedContainer(container);
+      releaseContainer(container.getId().getAppId(), container);
       break;
+    default:
+      LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
     }
   }
 
-  public synchronized Resource getClusterResource() {
-    return clusterResource;
-  }
-
-  @Override
-  public synchronized void addNode(NodeInfo nodeManager) {
+  private synchronized void addNode(RMNode nodeManager) {
+    this.csNodes.put(nodeManager.getNodeID(), new CSNode(nodeManager));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     ++numNodeManagers;
     LOG.info("Added node " + nodeManager.getNodeAddress() + 
         " clusterResource: " + clusterResource);
   }
 
-  @Override
-  public synchronized void removeNode(NodeInfo nodeInfo) {
+  private synchronized void removeNode(RMNode nodeInfo) {
+    CSNode csNode = this.csNodes.remove(nodeInfo.getNodeID());
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
     --numNodeManagers;
 
@@ -630,10 +653,10 @@ implements ResourceScheduler, CapacitySc
     killRunningContainers(runningContainers);
     
     // Remove reservations, if any
-    Application reservedApplication = nodeInfo.getReservedApplication();
+    CSApp reservedApplication = csNode.getReservedApplication();
     if (reservedApplication != null) {
       LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      Resource released = nodeInfo.getReservedResource();
+      Resource released = csNode.getReservedResource();
       queue.completedContainer(clusterResource, null, released, reservedApplication);
     }
     
@@ -641,15 +664,15 @@ implements ResourceScheduler, CapacitySc
         " clusterResource: " + clusterResource);
   }
   
-  @Lock(CapacityScheduler.class)
-  private boolean releaseContainer(ApplicationId applicationId, 
+  private synchronized boolean releaseContainer(ApplicationId applicationId, 
       Container container) {
     // Reap containers
     LOG.info("Application " + applicationId + " released container " + container);
-    return clusterTracker.releaseContainer(container);
+    // TODO:FIXMEVINODKV
+//    node.releaseContainer(container);
+    return true;
   }
 
-
   @Override
   @Lock(Lock.NoLock.class)
   public void recover(RMState state) throws Exception {
@@ -657,7 +680,7 @@ implements ResourceScheduler, CapacitySc
     for (Map.Entry<ApplicationId, ApplicationInfo> entry : state.getStoredApplications().entrySet()) {
       ApplicationId appId = entry.getKey();
       ApplicationInfo appInfo = entry.getValue();
-      Application app = applications.get(appId);
+      CSApp app = applications.get(appId);
       app.allocate(appInfo.getContainers());
       for (Container c: entry.getValue().getContainers()) {
         Queue queue = queues.get(appInfo.getApplicationSubmissionContext().getQueue());

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java Wed Aug  3 11:31:34 2011
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 
 /**
@@ -34,4 +36,6 @@ public interface CapacitySchedulerContex
   ContainerTokenSecretManager getContainerTokenSecretManager();
   
   int getNumClusterNodes();
+
+  RMContext getRMContext();
 }



Mime
View raw message