hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153453 [1/2] - in /hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager: rmcontainer/ scheduler/ scheduler/capacity/ scheduler/fifo/
Date Wed, 03 Aug 2011 11:51:22 GMT
Author: vinodkv
Date: Wed Aug  3 11:51:20 2011
New Revision: 1153453

URL: http://svn.apache.org/viewvc?rev=1153453&view=rev
Log:
CS is done (I hope! :) )

Added:
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
Removed:
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSApp.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSNode.java
Modified:
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerState.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Wed Aug  3 11:51:20 2011
@@ -3,6 +3,9 @@ package org.apache.hadoop.yarn.server.re
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 public interface RMContainer extends EventHandler<RMContainerEvent> {
@@ -14,5 +17,11 @@ public interface RMContainer extends Eve
   RMContainerState getState();
 
   Container getContainer();
+
+  Resource getReservedResource();
+
+  NodeId getReservedNode();
   
+  Priority getReservedPriority();
+
 }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java Wed Aug  3 11:51:20 2011
@@ -8,6 +8,7 @@ public enum RMContainerEventType {
   // Source: SchedulerApp
   ACQUIRED,
   KILL, // Also from Node on NodeRemoval
+  RESERVED,
 
   LAUNCHED,
   FINISHED,

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Wed Aug  3 11:51:20 2011
@@ -11,6 +11,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
@@ -36,6 +38,19 @@ public class RMContainerImpl implements 
         RMContainerEventType.START, new ContainerStartedTransition())
     .addTransition(RMContainerState.NEW, RMContainerState.KILLED,
         RMContainerEventType.KILL)
+    .addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
+        RMContainerEventType.RESERVED, new ContainerReservedTransition())
+
+    // Transitions from RESERVED state
+    .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, 
+        RMContainerEventType.RESERVED, new ContainerReservedTransition())
+    .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, 
+        RMContainerEventType.START, new ContainerStartedTransition())
+    .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, 
+        RMContainerEventType.KILL) // nothing to do
+    .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, 
+        RMContainerEventType.RELEASED) // nothing to do
+       
 
     // Transitions from ALLOCATED state
     .addTransition(RMContainerState.ALLOCATED, RMContainerState.ACQUIRED,
@@ -95,6 +110,10 @@ public class RMContainerImpl implements 
   private final EventHandler eventHandler;
   private final ContainerAllocationExpirer containerAllocationExpirer;
 
+  private Resource reservedResource;
+  private NodeId reservedNode;
+  private Priority reservedPriority;
+
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId,
       EventHandler handler,
@@ -139,6 +158,21 @@ public class RMContainerImpl implements 
   }
 
   @Override
+  public Resource getReservedResource() {
+    return reservedResource;
+  }
+
+  @Override
+  public NodeId getReservedNode() {
+    return reservedNode;
+  }
+
+  @Override
+  public Priority getReservedPriority() {
+    return reservedPriority;
+  }
+  
+  @Override
   public void handle(RMContainerEvent event) {
     LOG.info("Processing " + event.getContainerId() + " of type " + event.getType());
     try {
@@ -171,6 +205,19 @@ public class RMContainerImpl implements 
     }
   }
 
+  /** Stores the reservation details carried by the event on the container. */
+  private static final class ContainerReservedTransition extends
+      BaseTransition {
+    @Override
+    public void transition(RMContainerImpl container, RMContainerEvent event) {
+      RMContainerReservedEvent reserved = (RMContainerReservedEvent) event;
+      container.reservedResource = reserved.getReservedResource();
+      container.reservedNode = reserved.getReservedNode();
+      container.reservedPriority = reserved.getReservedPriority();
+    }
+  }
+
+
   private static final class ContainerStartedTransition extends
       BaseTransition {
 
@@ -179,7 +226,7 @@ public class RMContainerImpl implements 
       container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
           container.appAttemptId, container.container));
     }
-}
+  }
 
   private static final class AcquiredTransition extends BaseTransition {
 

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java?rev=1153453&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java Wed Aug  3 11:51:20 2011
@@ -0,0 +1,41 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * The event signifying that a container has been reserved.
+ * 
+ * The event encapsulates the reserved resource, the node on which the
+ * reservation is in effect, and the reserved priority.
+ */
+public class RMContainerReservedEvent extends RMContainerEvent {
+
+  private final Resource reservedResource;
+  private final NodeId reservedNode;
+  private final Priority reservedPriority;
+  
+  public RMContainerReservedEvent(ContainerId containerId,
+      Resource reservedResource, NodeId reservedNode, 
+      Priority reservedPriority) {
+    super(containerId, RMContainerEventType.RESERVED);
+    this.reservedResource = reservedResource;
+    this.reservedNode = reservedNode;
+    this.reservedPriority = reservedPriority;
+  }
+
+  public Resource getReservedResource() {
+    return reservedResource;
+  }
+
+  public NodeId getReservedNode() {
+    return reservedNode;
+  }
+
+  public Priority getReservedPriority() {
+    return reservedPriority;
+  }
+
+}

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerState.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerState.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerState.java Wed Aug  3 11:51:20 2011
@@ -1,5 +1,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
 public enum RMContainerState {
-  NEW, ALLOCATED, ACQUIRED, RUNNING, COMPLETED, EXPIRED, RELEASED, KILLED
+  NEW, 
+  RESERVED, 
+  ALLOCATED, 
+  ACQUIRED, 
+  RUNNING, 
+  COMPLETED, 
+  EXPIRED, 
+  RELEASED, 
+  KILLED
 }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Wed Aug  3 11:51:20 2011
@@ -18,11 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,8 +32,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
-import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -74,11 +69,7 @@ public class AppSchedulingInfo {
 
   private final ApplicationStore store;
 
-  /* Current consumption */
-  List<Container> acquired = new ArrayList<Container>();
-  List<Container> completedContainers = new ArrayList<Container>();
   /* Allocated by scheduler */
-  List<Container> allocated = new ArrayList<Container>();
   boolean pending = true; // for app metrics
 
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
@@ -124,38 +115,6 @@ public class AppSchedulingInfo {
   }
 
   /**
-   * the currently acquired/allocated containers by the application masters.
-   * 
-   * @return the current containers being used by the application masters.
-   */
-  public synchronized List<Container> getCurrentContainers() {
-    List<Container> currentContainers = new ArrayList<Container>(acquired);
-    currentContainers.addAll(allocated);
-    return currentContainers;
-  }
-
-  /**
-   * The ApplicationMaster is acquiring the allocated/completed resources.
-   * 
-   * @return allocated resources
-   */
-  synchronized private List<Container> acquire() {
-    // Return allocated containers
-    acquired.addAll(allocated);
-    List<Container> heartbeatContainers = allocated;
-    allocated = new ArrayList<Container>();
-
-    LOG.info("acquire:" + " application=" + applicationId + " #acquired="
-        + heartbeatContainers.size());
-    heartbeatContainers = (heartbeatContainers == null) ? new ArrayList<Container>()
-        : heartbeatContainers;
-
-    heartbeatContainers.addAll(completedContainers);
-    completedContainers.clear();
-    return heartbeatContainers;
-  }
-
-  /**
    * The ApplicationMaster is updating resource requirements for the
    * application, by asking for more resources and releasing resources acquired
    * by the application.
@@ -204,24 +163,6 @@ public class AppSchedulingInfo {
     }
   }
 
-  private synchronized void releaseContainers(List<Container> release) {
-    // Release containers and update consumption
-    for (Container container : release) {
-      LOG.debug("update: " + "application=" + applicationId + " released="
-          + container);
-      // TOday in all code paths, this is taken by completedContainer called by
-      // the caller. So commenting this.
-      // Resources.subtractFrom(currentConsumption, container.getResource());
-      for (Iterator<Container> i = acquired.iterator(); i.hasNext();) {
-        Container c = i.next();
-        if (c.getId().equals(container.getId())) {
-          i.remove();
-          LOG.info("Removed acquired container: " + container.getId());
-        }
-      }
-    }
-  }
-
   synchronized public Collection<Priority> getPriorities() {
     return priorities;
   }
@@ -242,15 +183,6 @@ public class AppSchedulingInfo {
     return request.getCapability();
   }
 
-  synchronized private void completedContainer(Container container, 
-      Resource containerResource) {
-    if (container != null) {
-      LOG.info("Completed container: " + container);
-      completedContainers.add(container);
-    }
-    queue.getMetrics().releaseResources(user, 1, containerResource);
-  }
-
   /**
    * Resources have been allocated to this application by the resource
    * scheduler. Track them.
@@ -263,17 +195,17 @@ public class AppSchedulingInfo {
    *          the priority of the request.
    * @param request
    *          the request
-   * @param containers
+   * @param container
    *          the containers allocated.
    */
   synchronized public void allocate(NodeType type, SchedulerNode node,
-      Priority priority, ResourceRequest request, List<Container> containers) {
+      Priority priority, ResourceRequest request, Container container) {
     if (type == NodeType.DATA_LOCAL) {
-      allocateNodeLocal(node, priority, request, containers);
+      allocateNodeLocal(node, priority, request, container);
     } else if (type == NodeType.RACK_LOCAL) {
-      allocateRackLocal(node, priority, request, containers);
+      allocateRackLocal(node, priority, request, container);
     } else {
-      allocateOffSwitch(node, priority, request, containers);
+      allocateOffSwitch(node, priority, request, container);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -284,7 +216,7 @@ public class AppSchedulingInfo {
     }
     LOG.debug("allocate: user: " + user + ", memory: "
         + request.getCapability());
-    metrics.allocateResources(user, containers.size(), request.getCapability());
+    metrics.allocateResources(user, 1, request.getCapability());
   }
 
   /**
@@ -295,21 +227,19 @@ public class AppSchedulingInfo {
    *          resources allocated to the application
    */
   synchronized private void allocateNodeLocal(SchedulerNode node, Priority priority,
-      ResourceRequest nodeLocalRequest, List<Container> containers) {
+      ResourceRequest nodeLocalRequest, Container container) {
     // Update consumption and track allocations
-    allocate(containers);
+    allocate(container);
 
     // Update future requirements
-    nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers()
-        - containers.size());
+    nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
     if (nodeLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getNodeAddress());
     }
 
     ResourceRequest rackLocalRequest = requests.get(priority).get(
         node.getRackName());
-    rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers()
-        - containers.size());
+    rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
     if (rackLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getRackName());
     }
@@ -317,8 +247,7 @@ public class AppSchedulingInfo {
     // Do not remove ANY
     ResourceRequest offSwitchRequest = requests.get(priority).get(
         RMNode.ANY);
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
-        - containers.size());
+    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
   }
 
   /**
@@ -329,14 +258,13 @@ public class AppSchedulingInfo {
    *          resources allocated to the application
    */
   synchronized private void allocateRackLocal(SchedulerNode node, Priority priority,
-      ResourceRequest rackLocalRequest, List<Container> containers) {
+      ResourceRequest rackLocalRequest, Container container) {
 
     // Update consumption and track allocations
-    allocate(containers);
+    allocate(container);
 
     // Update future requirements
-    rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers()
-        - containers.size());
+    rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
     if (rackLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getRackName());
     }
@@ -344,8 +272,7 @@ public class AppSchedulingInfo {
     // Do not remove ANY
     ResourceRequest offSwitchRequest = requests.get(priority).get(
         RMNode.ANY);
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
-        - containers.size());
+    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
   }
 
   /**
@@ -356,33 +283,29 @@ public class AppSchedulingInfo {
    *          resources allocated to the application
    */
   synchronized private void allocateOffSwitch(SchedulerNode node, Priority priority,
-      ResourceRequest offSwitchRequest, List<Container> containers) {
+      ResourceRequest offSwitchRequest, Container container) {
 
     // Update consumption and track allocations
-    allocate(containers);
+    allocate(container);
 
     // Update future requirements
 
     // Do not remove ANY
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
-        - containers.size());
+    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
   }
 
-  synchronized private void allocate(List<Container> containers) {
+  synchronized private void allocate(Container container) {
     // Update consumption and track allocations
-    for (Container container : containers) {
-
-      allocated.add(container);
-      //TODO: fixme sharad
-     /* try {
+    //TODO: fixme sharad
+    /* try {
         store.storeContainer(container);
       } catch (IOException ie) {
         // TODO fix this. we shouldnt ignore
       }*/
-      LOG.debug("allocate: applicationId=" + applicationId + " container="
-          + container.getId() + " host="
-          + container.getNodeId().toString());
-    }
+    
+    LOG.debug("allocate: applicationId=" + applicationId + " container="
+        + container.getId() + " host="
+        + container.getNodeId().toString());
   }
 
   synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java Wed Aug  3 11:51:20 2011
@@ -12,17 +12,22 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 public class SchedulerApp {
 
@@ -44,7 +49,18 @@ public class SchedulerApp {
   private List<RMContainer> newlyAllocatedContainers = 
       new ArrayList<RMContainer>();
 
-  public SchedulerApp(AppSchedulingInfo application, Queue queue) {
+  final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
+      new HashMap<Priority, Map<NodeId, RMContainer>>();
+  
+  Map<Priority, Integer> schedulingOpportunities = new HashMap<Priority, Integer>();
+
+  final Resource currentReservation = recordFactory
+      .newRecordInstance(Resource.class);
+
+  private final RMContext rmContext;
+  public SchedulerApp(RMContext rmContext, 
+      AppSchedulingInfo application, Queue queue) {
+    this.rmContext = rmContext;
     this.appSchedulingInfo = application;
     this.queue = queue;
     application.setQueue(queue);
@@ -75,11 +91,6 @@ public class SchedulerApp {
     return this.appSchedulingInfo.getNewContainerId();
   }
   
-  @Deprecated
-  public List<Container> getCurrentContainers() {
-    return this.appSchedulingInfo.getCurrentContainers();
-  }
-  
   public Collection<Priority> getPriorities() {
     return this.appSchedulingInfo.getPriorities();
   }
@@ -88,6 +99,10 @@ public class SchedulerApp {
     return this.appSchedulingInfo.getResourceRequest(priority, nodeAddress);
   }
 
+  public synchronized int getTotalRequiredResources(Priority priority) {
+    return getResourceRequest(priority, RMNode.ANY).getNumContainers();
+  }
+  
   public Resource getResource(Priority priority) {
     return this.appSchedulingInfo.getResource(priority);
   }
@@ -100,10 +115,6 @@ public class SchedulerApp {
     return this.appSchedulingInfo.getQueueName();
   }
 
-  public Queue getQueue() {
-    return this.queue;
-  }
-
   public synchronized Collection<RMContainer> getLiveContainers() {
     return new ArrayList<RMContainer>(liveContainers.values());
   }
@@ -126,61 +137,66 @@ public class SchedulerApp {
       SchedulerApp application) {
   }
 
-  synchronized public void containerCompleted(Container cont,
+  synchronized public void containerCompleted(RMContainer rmContainer,
       RMContainerEventType event) {
-    ContainerId containerId = cont.getId();
+    
+    Container container = rmContainer.getContainer();
+    ContainerId containerId = container.getId();
+    
     // Inform the container
-    RMContainer container = getRMContainer(containerId);
-
-    if (container == null) {
-      LOG.error("Invalid container completed " + cont.getId());
-      return;
-    }
-
     if (event.equals(RMContainerEventType.FINISHED)) {
       // Have to send diagnostics for finished containers.
-      container.handle(new RMContainerFinishedEvent(containerId,
-          cont.getContainerStatus()));
+      rmContainer.handle(new RMContainerFinishedEvent(containerId,
+          container.getContainerStatus()));
     } else {
-      container.handle(new RMContainerEvent(containerId, event));
+      rmContainer.handle(new RMContainerEvent(containerId, event));
     }
-    LOG.info("Completed container: " + container.getContainerId() + 
-        " in state: " + container.getState());
+    LOG.info("Completed container: " + rmContainer.getContainerId() + 
+        " in state: " + rmContainer.getState());
     
     // Remove from the list of containers
-    liveContainers.remove(container.getContainerId());
+    liveContainers.remove(rmContainer.getContainerId());
     
     // Update usage metrics 
-    Resource containerResource = container.getContainer().getResource();
+    Resource containerResource = rmContainer.getContainer().getResource();
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
     Resources.subtractFrom(currentConsumption, containerResource);
   }
 
-  synchronized public void allocate(NodeType type, SchedulerNode node,
+  synchronized public RMContainer allocate(NodeType type, SchedulerNode node,
       Priority priority, ResourceRequest request, 
-      List<RMContainer> containers) {
-    // Update consumption and track allocations
-    List<Container> allocatedContainers = 
-        new ArrayList<Container>();
-    for (RMContainer container : containers) {
-      Container c = container.getContainer();
-      // Inform the container
-      container.handle(
-          new RMContainerEvent(c.getId(), RMContainerEventType.START));
-      allocatedContainers.add(c);
-      
-      Resources.addTo(currentConsumption, c.getResource());
-      LOG.debug("allocate: applicationId=" + c.getId().getAppId()
-          + " container=" + c.getId() + " host="
-          + c.getNodeId().toString());
-      
-      // Add it to allContainers list.
-      newlyAllocatedContainers.add(container);
-      liveContainers.put(c.getId(), container);
+      Container container) {
+    
+    // Required sanity check - AM can call 'allocate' to update resource 
+    // request without locking the scheduler, hence we need to check
+    if (getTotalRequiredResources(priority) <= 0) {
+      return null;
     }
     
-    appSchedulingInfo.allocate(type, node, priority, 
-        request, allocatedContainers);
+    // Create RMContainer
+    RMContainer rmContainer = new RMContainerImpl(container, this
+        .getApplicationAttemptId(), node.getNodeID(), this.rmContext
+        .getDispatcher().getEventHandler(), this.rmContext
+        .getContainerAllocationExpirer());
+
+    // Update consumption and track allocations
+    
+    // Inform the container
+    rmContainer.handle(
+        new RMContainerEvent(container.getId(), RMContainerEventType.START));
+
+    Resources.addTo(currentConsumption, container.getResource());
+    LOG.debug("allocate: applicationId=" + container.getId().getAppId()
+        + " container=" + container.getId() + " host="
+        + container.getNodeId().toString());
+
+    // Add it to allContainers list.
+    newlyAllocatedContainers.add(rmContainer);
+    liveContainers.put(container.getId(), rmContainer);
+    
+    appSchedulingInfo.allocate(type, node, priority, request, container);
+    
+    return rmContainer;
   }
   
   synchronized public List<Container> pullNewlyAllocatedContainers() {
@@ -214,18 +230,150 @@ public class SchedulerApp {
         }
       }
     }
+    // TODO - Remove block
+    for (Priority priority : getPriorities()) {
+      Map<String, ResourceRequest> requests = getResourceRequests(priority);
+      if (requests != null) {
+        LOG.info("showRequests:" + " application=" + getApplicationId() + 
+            " headRoom=" + getHeadroom() + 
+            " currentConsumption=" + currentConsumption.getMemory());
+        for (ResourceRequest request : requests.values()) {
+          LOG.info("showRequests:" + " application=" + getApplicationId()
+              + " request=" + request);
+        }
+      }
+    }
+
   }
 
   public synchronized void setAvailableResourceLimit(Resource globalLimit) {
     this.resourceLimit = globalLimit; 
   }
 
+  public synchronized RMContainer getRMContainer(ContainerId id) {
+    return liveContainers.get(id);
+  }
+
+  // Restarts the scheduling-opportunity count for the given priority at zero.
+  synchronized public void resetSchedulingOpportunities(Priority priority) {
+    // The previous count is irrelevant to a reset, so it is overwritten
+    // directly instead of being looked up first.
+    this.schedulingOpportunities.put(priority, 0);
+  }
+
+  synchronized public void addSchedulingOpportunity(Priority priority) {
+    Integer schedulingOpportunities = this.schedulingOpportunities
+        .get(priority);
+    if (schedulingOpportunities == null) {
+      schedulingOpportunities = 0;
+    }
+    ++schedulingOpportunities;
+    this.schedulingOpportunities.put(priority, schedulingOpportunities);
+  }
+
+  synchronized public int getSchedulingOpportunities(Priority priority) {
+    Integer schedulingOpportunities = this.schedulingOpportunities
+        .get(priority);
+    if (schedulingOpportunities == null) {
+      schedulingOpportunities = 0;
+      this.schedulingOpportunities.put(priority, schedulingOpportunities);
+    }
+    return schedulingOpportunities;
+  }
+
+  public synchronized int getNumReservedContainers(Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    return (reservedContainers == null) ? 0 : reservedContainers.size();
+  }
+
+  // Reserves 'container' on 'node' for this application at 'priority'.
+  public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
+      RMContainer rmContainer, Container container) {
+    // Create RMContainer if necessary
+    if (rmContainer == null) {
+        rmContainer = 
+            new RMContainerImpl(container, getApplicationAttemptId(), 
+                node.getNodeID(), rmContext.getDispatcher().getEventHandler(), 
+                rmContext.getContainerAllocationExpirer());
+    }
+    rmContainer.handle(new RMContainerReservedEvent(container.getId(), 
+        container.getResource(), node.getNodeID(), priority));
+    // Track the reservation per priority, keyed by the reserved node
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    if (reservedContainers == null) {
+      reservedContainers = new HashMap<NodeId, RMContainer>();
+      this.reservedContainers.put(priority, reservedContainers);
+    }
+    reservedContainers.put(node.getNodeID(), rmContainer);
+    // Use the in-place addTo() (as for currentConsumption) so the sum is kept
+    Resources.addTo(currentReservation, container.getResource());
+    LOG.info("Application " + getApplicationId() 
+        + " reserved container " + rmContainer
+        + " on node " + node + ", currently has " + reservedContainers.size()
+        + " at priority " + priority 
+        + "; currentReservation " + currentReservation);
+    
+    return rmContainer;
+  }
+
+  // Drops this application's reservation on 'node' at 'priority'.
+  public synchronized void unreserve(SchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
+    if (reservedContainers.isEmpty()) {
+      this.reservedContainers.remove(priority);
+    }
+    Resource resource = reservedContainer.getContainer().getResource();
+    // In-place subtractFrom(); subtract() would return a new Resource
+    Resources.subtractFrom(currentReservation, resource);
+    LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
+        + node + ", currently has " + reservedContainers.size() + " at priority "
+        + priority + "; currentReservation " + currentReservation);
+  }
+
+  /**
+   * Has the application reserved the given <code>node</code> at the
+   * given <code>priority</code>?
+   * @param node node to be checked
+   * @param priority priority of reserved container
+   * @return
+   */
+  public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    if (reservedContainers != null) {
+      return reservedContainers.containsKey(node.getNodeID());
+    }
+    return false;
+  }
+
+  public synchronized float getLocalityWaitFactor(
+      Priority priority, int clusterNodes) {
+    // Estimate: Required unique resources (i.e. hosts + racks)
+    int requiredResources = 
+        Math.max(this.getResourceRequests(priority).size() - 1, 0);
+    return ((float) requiredResources / clusterNodes);
+  }
+
+  public synchronized List<RMContainer> getAllReservedContainers() {
+    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
+    for (Map.Entry<Priority, Map<NodeId, RMContainer>> e : 
+      this.reservedContainers.entrySet()) {
+      reservedContainers.addAll(e.getValue().values());
+    }
+    return reservedContainers;
+  }
+  
   /**
    * Get available headroom in terms of resources for the application's user.
    * @return available resource headroom
    */
   public synchronized Resource getHeadroom() {
     Resource limit = Resources.subtract(resourceLimit, currentConsumption);
+    Resources.subtractFrom(limit, currentReservation);
 
     // Corner case to deal with applications being slightly over-limit
     if (limit.getMemory() < 0) {
@@ -235,7 +383,8 @@ public class SchedulerApp {
     return limit;
   }
 
-  public synchronized RMContainer getRMContainer(ContainerId id) {
-    return liveContainers.get(id);
+  public Queue getQueue() {
+    return queue;
   }
+
 }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Wed Aug  3 11:51:20 2011
@@ -7,14 +7,17 @@ import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 public class SchedulerNode {
@@ -29,9 +32,11 @@ public class SchedulerNode {
 
   private volatile int numContainers;
 
+  private RMContainer reservedContainer;
+  
   /* set of containers that are allocated containers */
-  private final Map<ContainerId, Container> launchedContainers = 
-    new TreeMap<ContainerId, Container>();
+  private final Map<ContainerId, RMContainer> launchedContainers = 
+    new TreeMap<ContainerId, RMContainer>();
   
   private final RMNode rmNode;
 
@@ -70,11 +75,12 @@ public class SchedulerNode {
    * @param containers allocated containers
    */
   public synchronized void allocateContainer(ApplicationId applicationId, 
-      Container container) {
+      RMContainer rmContainer) {
+    Container container = rmContainer.getContainer();
     deductAvailableResource(container.getResource());
     ++numContainers;
     
-    launchedContainers.put(container.getId(), container);
+    launchedContainers.put(container.getId(), rmContainer);
     LOG.info("Allocated container " + container.getId() + 
         " to node " + rmNode.getNodeAddress());
     
@@ -115,7 +121,6 @@ public class SchedulerNode {
     }
 
     /* remove the containers from the nodemanger */
-    
     launchedContainers.remove(container.getId());
     updateResource(container);
 
@@ -157,7 +162,63 @@ public class SchedulerNode {
     return numContainers;
   }
 
-  public synchronized List<Container> getRunningContainers() {
-    return new ArrayList<Container>(launchedContainers.values());
+  public synchronized List<RMContainer> getRunningContainers() {
+    return new ArrayList<RMContainer>(launchedContainers.values());
+  }
+
+  public synchronized void reserveResource(
+      SchedulerApp application, Priority priority, 
+      RMContainer reservedContainer) {
+    // Check if it's already reserved
+    if (this.reservedContainer != null) {
+      // Sanity check
+      if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) {
+        throw new IllegalStateException("Trying to reserve" +
+            " container " + reservedContainer +
+            " on node " + reservedContainer.getReservedNode() + 
+            " when currently" + " reserved resource " + this.reservedContainer +
+            " on node " + this.reservedContainer.getReservedNode());
+      }
+      
+      // Cannot reserve more than one application on a given node!
+      if (!this.reservedContainer.getContainer().getId().getAppAttemptId().equals(
+          reservedContainer.getContainer().getId().getAppAttemptId())) {
+        throw new IllegalStateException("Trying to reserve" +
+        		" container " + reservedContainer + 
+            " for application " + application.getApplicationId() + 
+            " when currently" +
+            " reserved container " + this.reservedContainer +
+            " on node " + this);
+      }
+
+      LOG.info("Updated reserved container " + 
+          reservedContainer.getContainer().getId() + " on node " + 
+          this + " for application " + application);
+    } else {
+      LOG.info("Reserved container " + reservedContainer.getContainer().getId() + 
+          " on node " + this + " for application " + application);
+    }
+    this.reservedContainer = reservedContainer;
+  }
+
+  public synchronized void unreserveResource(SchedulerApp application) {
+    // Cannot unreserve for wrong application...
+    ApplicationAttemptId reservedApplication = 
+        reservedContainer.getContainer().getId().getAppAttemptId(); 
+    if (!reservedApplication.equals(
+        application.getApplicationAttemptId())) {
+      throw new IllegalStateException("Trying to unreserve " +  
+          " for application " + application.getApplicationId() + 
+          " when currently reserved " + 
+          " for application " + reservedApplication.getApplicationId() + 
+          " on node " + this);
+    }
+    
+    reservedContainer = null;
+  }
+
+  public synchronized RMContainer getReservedContainer() {
+    return reservedContainer;
   }
+
 }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Aug  3 11:51:20 2011
@@ -20,14 +20,10 @@ package org.apache.hadoop.yarn.server.re
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
@@ -38,14 +34,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Lock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,14 +53,18 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -97,10 +96,10 @@ implements ResourceScheduler, CapacitySc
     }
   };
 
-  private final Comparator<CSApp> applicationComparator = 
-    new Comparator<CSApp>() {
+  private final Comparator<SchedulerApp> applicationComparator = 
+    new Comparator<SchedulerApp>() {
     @Override
-    public int compare(CSApp a1, CSApp a2) {
+    public int compare(SchedulerApp a1, SchedulerApp a2) {
       return a1.getApplicationId().getId() - a2.getApplicationId().getId();
     }
   };
@@ -111,7 +110,8 @@ implements ResourceScheduler, CapacitySc
 
   private Map<String, Queue> queues = new ConcurrentHashMap<String, Queue>();
 
-  private Map<NodeId, CSNode> csNodes = new ConcurrentHashMap<NodeId, CSNode>();
+  private Map<NodeId, SchedulerNode> nodes = 
+      new ConcurrentHashMap<NodeId, SchedulerNode>();
 
   private Resource clusterResource = 
     RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
@@ -120,8 +120,8 @@ implements ResourceScheduler, CapacitySc
   private Resource minimumAllocation;
   private Resource maximumAllocation;
 
-  private Map<ApplicationAttemptId, CSApp> applications = Collections
-      .synchronizedMap(new TreeMap<ApplicationAttemptId, CSApp>());
+  private Map<ApplicationAttemptId, SchedulerApp> applications = 
+      new ConcurrentHashMap<ApplicationAttemptId, SchedulerApp>();
 
   private boolean initialized = false;
 
@@ -150,11 +150,11 @@ implements ResourceScheduler, CapacitySc
   }
 
   public synchronized Resource getUsedResource(NodeId nodeId) {
-    return csNodes.get(nodeId).getUsedResource();
+    return nodes.get(nodeId).getUsedResource();
   }
 
   public synchronized Resource getAvailableResource(NodeId nodeId) {
-    return csNodes.get(nodeId).getAvailableResource();
+    return nodes.get(nodeId).getAvailableResource();
   }
 
   public synchronized int getNumClusterNodes() {
@@ -316,19 +316,20 @@ implements ResourceScheduler, CapacitySc
 
     AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
         applicationAttemptId, queueName, user, null);
-    CSApp csApp = new CSApp(appSchedulingInfo, queue);
+    SchedulerApp SchedulerApp = 
+        new SchedulerApp(this.rmContext, appSchedulingInfo, queue);
 
     // Submit to the queue
     try {
-      queue.submitApplication(csApp, user, queueName);
+      queue.submitApplication(SchedulerApp, user, queueName);
     } catch (AccessControlException ace) {
       this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptRejectedEvent(applicationAttemptId, StringUtils
-              .stringifyException(ace)));
+          new RMAppAttemptRejectedEvent(applicationAttemptId, 
+              ace.toString()));
       return;
     }
 
-    applications.put(applicationAttemptId, csApp);
+    applications.put(applicationAttemptId, SchedulerApp);
 
     LOG.info("Application Submission: " + applicationAttemptId + 
         ", user: " + user +
@@ -346,7 +347,7 @@ implements ResourceScheduler, CapacitySc
     LOG.info("Application " + applicationAttemptId + " is done." +
     		" finalState=" + rmAppAttemptFinalState);
     
-    CSApp application = getApplication(applicationAttemptId);
+    SchedulerApp application = getApplication(applicationAttemptId);
 
     if (application == null) {
       //      throw new IOException("Unknown application " + applicationId + 
@@ -356,10 +357,14 @@ implements ResourceScheduler, CapacitySc
     }
     
     // Release all the running containers 
-    processReleasedContainers(application, application.getCurrentContainers());
+    for (RMContainer rmContainer : application.getLiveContainers()) {
+      completedContainer(rmContainer, RMContainerEventType.KILL);
+    }
     
      // Release all reserved containers
-    releaseReservedContainers(application);
+    for (RMContainer rmContainer : application.getAllReservedContainers()) {
+      completedContainer(rmContainer, RMContainerEventType.KILL);
+    }
     
     // Clean up pending requests, metrics etc.
     application.stop(rmAppAttemptFinalState);
@@ -386,7 +391,7 @@ implements ResourceScheduler, CapacitySc
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
       List<ResourceRequest> ask, List<Container> release) {
 
-    CSApp application = getApplication(applicationAttemptId);
+    SchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.info("Calling allocate on removed " +
           "or non existant application " + applicationAttemptId);
@@ -396,6 +401,12 @@ implements ResourceScheduler, CapacitySc
     // Sanity check
     normalizeRequests(ask);
 
+    // Release containers
+    for (Container releasedContainer : release) {
+      completedContainer(getRMContainer(releasedContainer), 
+          RMContainerEventType.RELEASED);
+    }
+
     synchronized (application) {
 
       LOG.info("DEBUG --- allocate: pre-update" +
@@ -465,154 +476,82 @@ implements ResourceScheduler, CapacitySc
         minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0)));
   }
 
-  @Lock(CapacityScheduler.class)
-  private List<Container> getCompletedContainers(
-      Map<String, List<Container>> allContainers) {
-    if (allContainers == null) {
-      return new ArrayList<Container>();
-    }
-    List<Container> completedContainers = new ArrayList<Container>();
-    // Iterate through the running containers and update their status
-    for (Map.Entry<String, List<Container>> e : 
-      allContainers.entrySet()) {
-      for (Container c: e.getValue()) {
-        if (c.getState() == ContainerState.COMPLETE) {
-          completedContainers.add(c);
-        }
-      }
-    }
-    return completedContainers;
-  }
-
   private synchronized void nodeUpdate(RMNode nm, 
       Map<String,List<Container>> containers ) {
     LOG.info("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
-    SchedulerNode node = this.csNodes.get(nm.getNodeID());
-    //TODO node.statusUpdate(containers);
-
-    // Completed containers
-    processCompletedContainers(getCompletedContainers(containers));
+    
+    SchedulerNode node = getNode(nm.getNodeID());
+    for (List<Container> appContainers : containers.values()) {
+      for (Container container : appContainers) {
+        if (container.getState() == ContainerState.RUNNING) {
+          containerLaunchedOnNode(container, node);
+        } else { // has to be 'COMPLETE'
+          LOG.info("DEBUG --- Container FINISHED: " + container.getId());
+          completedContainer(getRMContainer(container), 
+              RMContainerEventType.FINISHED);
+        }
+      }
+    }
 
-    // Assign new containers
+    // Assign new containers...
     // 1. Check for reserved applications
     // 2. Schedule if there are no reservations
 
-    CSNode csNode = this.csNodes.get(nm.getNodeID());
-
-    CSApp reservedApplication = csNode.getReservedApplication();
-    if (reservedApplication != null) {
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (reservedContainer != null) {
+      SchedulerApp reservedApplication = 
+          getApplication(reservedContainer.getApplicationAttemptId());
+      
       // Try to fulfill the reservation
       LOG.info("Trying to fulfill reservation for application " + 
           reservedApplication.getApplicationId() + " on node: " + nm);
-      LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      Resource released = queue.assignContainers(clusterResource, csNode);
       
-      // Is the reservation necessary? If not, release the reservation
-      if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
-          released, org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
-        queue.completedContainer(clusterResource, null, released, reservedApplication);
-      }
+      LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
+      queue.assignContainers(clusterResource, node);
     }
 
     // Try to schedule more if there are no reservations to fulfill
-    if (csNode.getReservedApplication() == null) {
-      root.assignContainers(clusterResource, csNode);
+    if (node.getReservedContainer() == null) {
+      root.assignContainers(clusterResource, node);
     } else {
       LOG.info("Skipping scheduling since node " + nm + 
           " is reserved by application " + 
-          csNode.getReservedApplication().getApplicationId());
-    }
-
-  }
-
-  @Lock(CapacityScheduler.class)
-  private void killRunningContainers(List<Container> containers) {
-    for (Container container : containers) {
-      container.setState(ContainerState.COMPLETE);
-      LOG.info("Killing running container " + container.getId());
-      CSApp application = applications.get(container.getId().getAppId());
-      processReleasedContainers(application, Collections.singletonList(container));
-    }
-  }
-  
-  @Lock(Lock.NoLock.class)
-  private void processCompletedContainers(
-      List<Container> completedContainers) {
-    for (Container container: completedContainers) {
-      processSingleCompletedContainer(container);
-    }
-  }
-
-  private void processSingleCompletedContainer(Container container) {
-    CSApp application = getApplication(container.getId().getAppAttemptId());
-
-    // this is possible, since an application can be removed from scheduler 
-    // but the nodemanger is just updating about a completed container.
-    if (application != null) {
-
-      // Inform the queue
-      LeafQueue queue = (LeafQueue)application.getQueue();
-      queue.completedContainer(clusterResource, container, 
-          container.getResource(), application);
-    }
-  }
-
-  @Lock(Lock.NoLock.class)
-  private synchronized void processReleasedContainers(CSApp application,
-      List<Container> releasedContainers) {
-
-    // Inform clusterTracker
-    List<Container> unusedContainers = new ArrayList<Container>();
-    for (Container container : releasedContainers) {
-      if (releaseContainer(
-          application.getApplicationId(), 
-          container)) {
-        unusedContainers.add(container);
-      }
+          node.getReservedContainer().getContainerId().getAppId());
     }
 
-    // Update queue capacities
-    processCompletedContainers(unusedContainers);
   }
 
-  @Lock(CapacityScheduler.class)
-  private void releaseReservedContainers(CSApp application) {
-    LOG.info("Releasing reservations for completed application: " + 
-        application.getApplicationId());
-    Queue queue = queues.get(application.getQueue().getQueueName());
-    Map<Priority, Set<CSNode>> reservations = application.getAllReservations();
-    for (Map.Entry<Priority, Set<CSNode>> e : reservations.entrySet()) {
-      Priority priority = e.getKey();
-      Set<CSNode> reservedNodes = new HashSet<CSNode>(e.getValue());
-      for (CSNode node : reservedNodes) {
-        Resource allocatedResource = 
-          application.getResourceRequest(priority, SchedulerNode.ANY).getCapability();
-    
-        application.unreserveResource(node, priority);
-        node.unreserveResource(application, priority);
-        
-        queue.completedContainer(clusterResource, null, allocatedResource, application);
-      }
+  private void containerLaunchedOnNode(Container container, SchedulerNode node) {
+    // Get the application for the finished container
+    ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
+    SchedulerApp application = getApplication(applicationAttemptId);
+    if (application == null) {
+      LOG.info("Unknown application: " + applicationAttemptId + 
+          " launched container " + container.getId() +
+          " on node: " + node);
+      return;
     }
-  }
-  
-  @Lock(Lock.NoLock.class)
-  private CSApp getApplication(ApplicationAttemptId applicationAttemptId) {
-    return applications.get(applicationAttemptId);
+    
+    application.containerLaunchedOnNode(container.getId());
   }
 
   @Override
-  public synchronized void handle(SchedulerEvent event) {
+  public void handle(SchedulerEvent event) {
     switch(event.getType()) {
     case NODE_ADDED:
+    {
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
       addNode(nodeAddedEvent.getAddedRMNode());
-      break;
+    }
+    break;
     case NODE_REMOVED:
+    {
       NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
       removeNode(nodeRemovedEvent.getRemovedRMNode());
-      break;
+    }
+    break;
     case NODE_UPDATE:
+    {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
       Map<ApplicationId, List<Container>> contAppMapping = nodeUpdatedEvent.getContainers();
       Map<String, List<Container>> conts = new HashMap<String, List<Container>>();
@@ -620,24 +559,37 @@ implements ResourceScheduler, CapacitySc
         conts.put(entry.getKey().toString(), entry.getValue());
       }
       nodeUpdate(nodeUpdatedEvent.getRMNode(), conts);
-      break;
+    }
+    break;
     case APP_ADDED:
+    {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
       addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
           .getQueue(), appAddedEvent.getUser());
-      break;
+    }
+    break;
     case APP_REMOVED:
+    {
       AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
       doneApplication(appRemovedEvent.getApplicationAttemptID(),
           appRemovedEvent.getFinalAttemptState());
-      break;
+    }
+    break;
+    case CONTAINER_EXPIRED:
+    {
+      ContainerExpiredSchedulerEvent containerExpiredEvent = 
+          (ContainerExpiredSchedulerEvent) event;
+      completedContainer(getRMContainer(containerExpiredEvent.getContainer()), 
+          RMContainerEventType.EXPIRE);
+    }
+    break;
     default:
       LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
     }
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    this.csNodes.put(nodeManager.getNodeID(), new CSNode(nodeManager));
+    this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     ++numNodeManagers;
     LOG.info("Added node " + nodeManager.getNodeAddress() + 
@@ -645,43 +597,86 @@ implements ResourceScheduler, CapacitySc
   }
 
   private synchronized void removeNode(RMNode nodeInfo) {
-    CSNode csNode = this.csNodes.remove(nodeInfo.getNodeID());
+    SchedulerNode node = this.nodes.remove(nodeInfo.getNodeID());
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
     --numNodeManagers;
 
     // Remove running containers
-    List<Container> runningContainers = null;//TODO = nodeInfo.getRunningContainers();
-    killRunningContainers(runningContainers);
+    List<RMContainer> runningContainers = node.getRunningContainers();
+    for (RMContainer container : runningContainers) {
+      completedContainer(container, RMContainerEventType.KILL);
+    }
     
     // Remove reservations, if any
-    CSApp reservedApplication = csNode.getReservedApplication();
-    if (reservedApplication != null) {
-      LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      Resource released = csNode.getReservedResource();
-      queue.completedContainer(clusterResource, null, released, reservedApplication);
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (reservedContainer != null) {
+      completedContainer(reservedContainer, RMContainerEventType.KILL);
     }
     
     LOG.info("Removed node " + nodeInfo.getNodeAddress() + 
         " clusterResource: " + clusterResource);
   }
   
-  private synchronized boolean releaseContainer(ApplicationId applicationId, 
-      Container container) {
-    // Reap containers
-    LOG.info("Application " + applicationId + " released container " + container);
-    csNodes.get(container.getNodeId()).releaseContainer(container);
-    return true;
+  @Lock(CapacityScheduler.class)
+  private synchronized void completedContainer(RMContainer rmContainer, 
+      RMContainerEventType event) {
+    if (rmContainer == null) {
+      LOG.info("Null container completed...");
+      return;
+    }
+    
+    Container container = rmContainer.getContainer();
+    
+    // Get the application for the finished container
+    ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
+    SchedulerApp application = getApplication(applicationAttemptId);
+    if (application == null) {
+      LOG.info("Container " + container + " of" +
+      		" unknown application " + applicationAttemptId + 
+          " completed with event " + event);
+      return;
+    }
+    
+    // Get the node on which the container was allocated
+    SchedulerNode node = getNode(container.getNodeId());
+    
+    // Inform the queue
+    LeafQueue queue = (LeafQueue)application.getQueue();
+    queue.completedContainer(clusterResource, application, node, 
+        rmContainer, event);
+
+    LOG.info("Application " + applicationAttemptId + 
+        " released container " + container.getId() +
+        " on node: " + node + 
+        " with event: " + event);
+  }
+
+  @Lock(Lock.NoLock.class)
+  private SchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
+    return applications.get(applicationAttemptId);
+  }
+
+  @Lock(Lock.NoLock.class)
+  private SchedulerNode getNode(NodeId nodeId) {
+    return nodes.get(nodeId);
+  }
+
+  private RMContainer getRMContainer(Container container) {
+    ContainerId containerId = container.getId();
+    SchedulerApp application = 
+        getApplication(container.getId().getAppAttemptId());
+    return (application == null) ? null : application.getRMContainer(containerId);
   }
 
   @Override
   @Lock(Lock.NoLock.class)
   public void recover(RMState state) throws Exception {
-    // TODO: VINDOKVFIXME
+    // TODO: VINDOKVFIXME recovery
 //    applications.clear();
 //    for (Map.Entry<ApplicationId, ApplicationInfo> entry : state.getStoredApplications().entrySet()) {
 //      ApplicationId appId = entry.getKey();
 //      ApplicationInfo appInfo = entry.getValue();
-//      CSApp app = applications.get(appId);
+//      SchedulerApp app = applications.get(appId);
 //      app.allocate(appInfo.getContainers());
 //      for (Container c: entry.getValue().getContainers()) {
 //        Queue queue = queues.get(appInfo.getApplicationSubmissionContext().getQueue());
@@ -692,7 +687,9 @@ implements ResourceScheduler, CapacitySc
 
   @Override
   public SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    // TODO Auto-generated method stub
-    return null;
+    SchedulerNode node = getNode(nodeId);
+    return new SchedulerNodeReport(
+        node.getUsedResource(), node.getNumContainers());
   }
+  
 }



Mime
View raw message