hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1082677 [35/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...
Date Thu, 17 Mar 2011 20:21:54 GMT
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,681 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
+
+@Private
+@Unstable
+public class LeafQueue implements Queue {
+  private static final Log LOG = LogFactory.getLog(LeafQueue.class);
+  
+  private final String queueName;
+  private final Queue parent;
+  private final float capacity;
+  private final float absoluteCapacity;
+  private final float maxCapacity;
+  private final float absoluteMaxCapacity;
+  private final int userLimit;
+  private final float userLimitFactor;
+  
+  private final int maxApplications;
+  private final int maxApplicationsPerUser;
+  
+  private Resource usedResources = 
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+  private float utilization = 0.0f;
+  private float usedCapacity = 0.0f;
+  private volatile int numContainers;
+  
+  Set<Application> applications;
+  
+  public final Resource minimumAllocation;
+
+  private ContainerTokenSecretManager containerTokenSecretManager;
+
+  private Map<String, User> users = new HashMap<String, User>();
+  
+  public LeafQueue(CapacitySchedulerContext cs, 
+      String queueName, Queue parent, 
+      Comparator<Application> applicationComparator) {
+    this.queueName = queueName;
+    this.parent = parent;
+    
+    this.minimumAllocation = cs.getMinimumAllocation();
+    this.containerTokenSecretManager = cs.getContainerTokenSecretManager();
+
+    this.capacity = 
+      (float)cs.getConfiguration().getCapacity(getQueuePath()) / 100;
+    this.absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
+    
+    this.maxCapacity = cs.getConfiguration().getMaximumCapacity(getQueuePath());
+    this.absoluteMaxCapacity = 
+      (maxCapacity == CapacitySchedulerConfiguration.UNDEFINED) ? 
+          Float.MAX_VALUE : (parent.getAbsoluteCapacity() * maxCapacity) / 100;
+
+    this.userLimit = cs.getConfiguration().getUserLimit(getQueuePath());
+
+    this.userLimitFactor = 
+      cs.getConfiguration().getUserLimitFactor(getQueuePath());
+    
+    int maxSystemJobs = cs.getConfiguration().getMaximumSystemApplications();
+    this.maxApplications = (int)(maxSystemJobs * absoluteCapacity);
+    this.maxApplicationsPerUser = 
+      (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
+
+    LOG.info("DEBUG --- LeafQueue:" +
+    		" name=" + queueName + 
+    		", fullname=" + getQueuePath() + 
+        ", capacity=" + capacity + 
+    		", asboluteCapacity=" + absoluteCapacity + 
+        ", maxCapacity=" + maxCapacity +
+    		", asboluteMaxCapacity=" + absoluteMaxCapacity +
+    		", userLimit=" + userLimit + ", userLimitFactor=" + userLimitFactor + 
+    		", maxApplications=" + maxApplications + 
+    		", maxApplicationsPerUser=" + maxApplicationsPerUser);
+    
+    this.applications = new TreeSet<Application>(applicationComparator);
+  }
+  
+  @Override
+  public float getCapacity() {
+    return capacity;
+  }
+
+  @Override
+  public float getAbsoluteCapacity() {
+    return absoluteCapacity;
+  }
+
+  @Override
+  public float getMaximumCapacity() {
+    return maxCapacity;
+  }
+
+  @Override
+  public float getAbsoluteMaximumCapacity() {
+    return absoluteMaxCapacity;
+  }
+
+  @Override
+  public Queue getParent() {
+    return parent;
+  }
+
+  @Override
+  public String getQueueName() {
+    return queueName;
+  }
+
+  @Override
+  public String getQueuePath() {
+    return parent.getQueuePath() + "." + getQueueName();
+  }
+
+  @Override
+  public float getUsedCapacity() {
+    return usedCapacity;
+  }
+
+  @Override
+  public synchronized Resource getUsedResources() {
+    return usedResources;
+  }
+
+  @Override
+  public synchronized float getUtilization() {
+    return utilization;
+  }
+
+  @Override
+  public synchronized List<Application> getApplications() {
+    return new ArrayList<Application>(applications);
+  }
+
+  @Override
+  public List<Queue> getChildQueues() {
+    return null;
+  }
+
+  synchronized void setUtilization(float utilization) {
+    this.utilization = utilization;
+  }
+
+  synchronized void setUsedCapacity(float usedCapacity) {
+    this.usedCapacity = usedCapacity;
+  }
+  
+  public synchronized int getNumApplications() {
+    return applications.size();
+  }
+  
+  public int getNumContainers() {
+    return numContainers;
+  }
+
+  public String toString() {
+    return queueName + ":" + capacity + ":" + absoluteCapacity + ":" + 
+      getUsedCapacity() + ":" + getUtilization() + ":" + 
+      getNumApplications() + ":" + getNumContainers();
+  }
+
+  private synchronized User getUser(String userName) {
+    User user = users.get(userName);
+    if (user == null) {
+      user = new User();
+      users.put(userName, user);
+    }
+    return user;
+  }
+  
+  @Override
+  public void submitApplication(Application application, String userName,
+      String queue, Priority priority) 
+  throws AccessControlException {
+    // Careful! Locking order is important!
+    synchronized (this) {
+      
+      // Check submission limits for queues
+      if (getNumApplications() >= maxApplications) {
+        throw new AccessControlException("Queue " + getQueuePath() + 
+            " already has " + getNumApplications() + " applications," +
+            " cannot accept submission of application: " + 
+            application.getApplicationId());
+      }
+
+      // Check submission limits for the user on this queue
+      User user = getUser(userName);
+      if (user.getApplications() >= maxApplicationsPerUser) {
+        throw new AccessControlException("Queue " + getQueuePath() + 
+            " already has " + user.getApplications() + 
+            " applications from user " + userName + 
+            " cannot accept submission of application: " + 
+            application.getApplicationId());
+      }
+      
+      // Accept 
+      user.submitApplication();
+      applications.add(application);
+      
+      LOG.info("Application submission -" +
+          " appId: " + application.getApplicationId() +
+          " user: " + user + "," + " leaf-queue: " + getQueueName() +
+          " #user-applications: " + user.getApplications() + 
+          " #queue-applications: " + getNumApplications());
+    }
+
+    // Inform the parent queue
+    parent.submitApplication(application, userName, queue, priority);
+  }
+
+  @Override
+  public void finishApplication(Application application, String queue) 
+  throws AccessControlException {
+    // Careful! Locking order is important!
+    synchronized (this) {
+      applications.remove(application);
+      
+      User user = getUser(application.getUser());
+      user.finishApplication();
+      if (user.getApplications() == 0) {
+        users.remove(application.getUser());
+      }
+      
+      LOG.info("Application completion -" +
+          " appId: " + application.getApplicationId() + 
+          " user: " + application.getUser() + 
+          " queue: " + getQueueName() +
+          " #user-applications: " + user.getApplications() + 
+          " #queue-applications: " + getNumApplications());
+    }
+    
+    // Inform the parent queue
+    parent.finishApplication(application, queue);
+  }
+  
+  @Override
+  public synchronized Resource 
+  assignContainers(ClusterTracker cluster, NodeInfo node) {
+  
+    LOG.info("DEBUG --- assignContainers:" +
+        " node=" + node.getHostName() + 
+        " #applications=" + applications.size());
+    
+    // Try to assign containers to applications in fifo order
+    for (Application application : applications) {
+  
+      LOG.info("DEBUG --- pre-assignContainers");
+      application.showRequests();
+      
+      synchronized (application) {
+        for (Priority priority : application.getPriorities()) {
+
+          // Do we need containers at this 'priority'?
+          if (!needContainers(application, priority)) {
+            continue;
+          }
+          
+          // Are we going over limits by allocating to this application?
+          ResourceRequest required = 
+            application.getResourceRequest(priority, NodeManager.ANY);
+          if (required != null && required.numContainers > 0) {
+            
+            // Maximum Capacity of the queue
+            if (!assignToQueue(cluster, required.capability)) {
+              return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+            }
+            
+            // User limits
+            if (!assignToUser(application.getUser(), cluster, required.capability)) {
+              return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+            }
+            
+          }
+          
+          Resource assigned = 
+            assignContainersOnNode(cluster, node, application, priority);
+  
+          if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
+                assigned, 
+                org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+            Resource assignedResource = 
+              application.getResourceRequest(priority, NodeManager.ANY).capability;
+            
+            // Book-keeping
+            allocateResource(cluster.getClusterResource(), 
+                application.getUser(), assignedResource);
+            
+            // Done
+            return assignedResource; 
+          } else {
+            // Do not assign out of order w.r.t priorities
+            break;
+          }
+        }
+      }
+      
+      LOG.info("DEBUG --- post-assignContainers");
+      application.showRequests();
+    }
+  
+    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+  }
+
+  private synchronized boolean assignToQueue(ClusterTracker cluster, 
+      Resource required) {
+    float newUtilization = 
+      (float)(usedResources.memory + required.memory) / 
+        (cluster.getClusterResource().memory * absoluteCapacity);
+    if (newUtilization > absoluteMaxCapacity) {
+      LOG.info(getQueueName() + 
+          " current-capacity (" + getUtilization() + ") +" +
+          " required (" + required.memory + ")" +
+          " > max-capacity (" + absoluteMaxCapacity + ")");
+      return false;
+    }
+    return true;
+  }
+  
+  private synchronized boolean assignToUser(String userName, ClusterTracker cluster,
+      Resource required) {
+    // What is our current capacity? 
+    // * It is equal to the max(required, queue-capacity) if
+    //   we're running below capacity. The 'max' ensures that jobs in queues
+    //   with miniscule capacity (< 1 slot) make progress
+    // * If we're running over capacity, then its
+    //   (usedResources + required) (which extra resources we are allocating)
+
+    // Allow progress for queues with miniscule capacity
+    final int queueCapacity = 
+      Math.max(
+          divideAndCeil((int)(absoluteCapacity * cluster.getClusterResource().memory), 
+              minimumAllocation.memory), 
+          required.memory);
+    
+    final int consumed = usedResources.memory;
+    final int currentCapacity = 
+      (consumed < queueCapacity) ? queueCapacity : (consumed + required.memory);
+    
+    // Never allow a single user to take more than the 
+    // queue's configured capacity * user-limit-factor.
+    // Also, the queue's configured capacity should be higher than 
+    // queue-hard-limit * ulMin
+    
+    final int activeUsers = users.size();  
+    User user = getUser(userName);
+    
+    int limit = 
+      Math.min(
+          Math.max(divideAndCeil(currentCapacity, activeUsers), 
+                   divideAndCeil((int)userLimit*currentCapacity, 100)),
+          (int)(queueCapacity * userLimitFactor)
+          );
+
+    // Note: We aren't considering the current request since there is a fixed
+    // overhead of the AM, so... 
+    if ((user.getConsumedResources().memory) > limit) {
+      LOG.info("User " + userName + " in queue " + getQueueName() + 
+          " will exceed limit, required: " + required + 
+          " consumed: " + user.getConsumedResources() + " limit: " + limit +
+          " queueCapacity: " + queueCapacity + 
+          " qconsumed: " + consumed +
+          " currentCapacity: " + currentCapacity +
+          " activeUsers: " + activeUsers 
+          );
+      return false;
+    }
+
+    return true;
+  }
+  
+  private static int divideAndCeil(int a, int b) {
+    if (b == 0) {
+      LOG.info("divideAndCeil called with a=" + a + " b=" + b);
+      return 0;
+    }
+    return (a + (b - 1)) / b;
+  }
+
+  boolean needContainers(Application application, Priority priority) {
+    ResourceRequest offSwitchRequest = 
+      application.getResourceRequest(priority, NodeManager.ANY);
+
+    return (offSwitchRequest.numContainers > 0);
+  }
+
+  Resource assignContainersOnNode(ClusterTracker cluster, NodeInfo node, 
+      Application application, Priority priority) {
+
+    Resource assigned = 
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+
+    // Data-local
+    assigned = assignNodeLocalContainers(cluster, node, application, priority); 
+    if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
+          assigned, 
+          org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+      return assigned;
+    }
+
+    // Rack-local
+    assigned = assignRackLocalContainers(cluster, node, application, priority);
+    if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
+        assigned, 
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+    return assigned;
+  }
+    
+    // Off-switch
+    return assignOffSwitchContainers(cluster, node, application, priority);
+  }
+
+  Resource assignNodeLocalContainers(ClusterTracker cluster, NodeInfo node, 
+      Application application, Priority priority) {
+    ResourceRequest request = 
+      application.getResourceRequest(priority, node.getHostName());
+    if (request != null) {
+      if (canAssign(application, priority, node, NodeType.DATA_LOCAL)) {
+        return assignContainer(cluster, node, application, priority, request, 
+            NodeType.DATA_LOCAL);
+      }
+    }
+    
+    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+  }
+
+  Resource assignRackLocalContainers(ClusterTracker cluster, NodeInfo node, 
+      Application application, Priority priority) {
+    ResourceRequest request = 
+      application.getResourceRequest(priority, node.getRackName());
+    if (request != null) {
+      if (canAssign(application, priority, node, NodeType.RACK_LOCAL)) {
+        return assignContainer(cluster, node, application, priority, request, 
+            NodeType.RACK_LOCAL);
+      }
+    }
+    
+    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+  }
+
+  Resource assignOffSwitchContainers(ClusterTracker cluster, NodeInfo node, 
+      Application application, Priority priority) {
+    ResourceRequest request = 
+      application.getResourceRequest(priority, NodeManager.ANY);
+    if (request != null) {
+      if (canAssign(application, priority, node, NodeType.OFF_SWITCH)) {
+        return assignContainer(cluster, node, application, priority, request, 
+            NodeType.OFF_SWITCH);
+      }
+    }
+    
+    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+  }
+
+  boolean canAssign(Application application, Priority priority, 
+      NodeInfo node, NodeType type) {
+
+    ResourceRequest offSwitchRequest = 
+      application.getResourceRequest(priority, NodeManager.ANY);
+    
+    if (offSwitchRequest.numContainers == 0) {
+      return false;
+    }
+    
+    if (type == NodeType.OFF_SWITCH) {
+      return offSwitchRequest.numContainers > 0;
+    }
+    
+    if (type == NodeType.RACK_LOCAL) {
+      ResourceRequest rackLocalRequest = 
+        application.getResourceRequest(priority, node.getRackName());
+      if (rackLocalRequest == null) {
+        // No point waiting for rack-locality if we don't need this rack
+        return offSwitchRequest.numContainers > 0;
+      } else {
+        return rackLocalRequest.numContainers > 0;
+      }
+    }
+    
+    if (type == NodeType.DATA_LOCAL) {
+      ResourceRequest nodeLocalRequest = 
+        application.getResourceRequest(priority, node.getHostName());
+      if (nodeLocalRequest != null) {
+        return nodeLocalRequest.numContainers > 0;
+      }
+    }
+    
+    return false;
+  }
+  
+  private Resource assignContainer(ClusterTracker cluster, NodeInfo node, 
+      Application application, 
+      Priority priority, ResourceRequest request, NodeType type) {
+    LOG.info("DEBUG --- assignContainers:" +
+        " node=" + node.getHostName() + 
+        " application=" + application.getApplicationId().id + 
+        " priority=" + priority.priority + 
+        " request=" + request + " type=" + type);
+    Resource capability = request.capability;
+    
+    int availableContainers = 
+        node.getAvailableResource().memory / capability.memory; // TODO: A buggy
+                                                                // application
+                                                                // with this
+                                                                // zero would
+                                                                // crash the
+                                                                // scheduler.
+    
+    if (availableContainers > 0) {
+      List<Container> containers =
+        new ArrayList<Container>();
+      Container container =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Container
+        .create(application.getApplicationId(), 
+            application.getNewContainerId(),
+            node.getHostName(), capability);
+      
+      // If security is enabled, send the container-tokens too.
+      if (UserGroupInformation.isSecurityEnabled()) {
+        ContainerToken containerToken = new ContainerToken();
+        ContainerTokenIdentifier tokenidentifier =
+          new ContainerTokenIdentifier(container.id,
+              container.hostName.toString(), container.resource);
+        containerToken.identifier =
+          ByteBuffer.wrap(tokenidentifier.getBytes());
+        containerToken.kind = ContainerTokenIdentifier.KIND.toString();
+        containerToken.password =
+          ByteBuffer.wrap(containerTokenSecretManager
+              .createPassword(tokenidentifier));
+        containerToken.service = container.hostName; // TODO: port
+        container.containerToken = containerToken;
+      }
+      
+      containers.add(container);
+
+      // Allocate container to the application
+      application.allocate(type, node, priority, request, containers);
+      
+      // Update resource usage on the node
+      cluster.addAllocatedContainers(node, application.getApplicationId(), 
+          containers);
+      
+      LOG.info("allocatedContainer" +
+          " container=" + container + 
+          " queue=" + this.toString() + 
+          " util=" + getUtilization() + 
+          " used=" + usedResources + 
+          " cluster=" + cluster.getClusterResource());
+
+      return container.resource;
+    }
+    
+    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+  }
+
+  @Override
+  public void completedContainer(ClusterTracker cluster, 
+      Container container, Application application) {
+    if (application != null) {
+      // Careful! Locking order is important!
+      synchronized (this) {
+        // Inform the application
+        application.completedContainer(container);
+        
+        // Book-keeping
+        releaseResource(cluster.getClusterResource(), 
+            application.getUser(), container.resource);
+        
+        LOG.info("completedContainer" +
+            " container=" + container +
+        		" queue=" + this + 
+            " util=" + getUtilization() + 
+            " used=" + usedResources + 
+            " cluster=" + cluster.getClusterResource());
+      }
+      
+      // Inform the parent queue
+      parent.completedContainer(cluster, container, application);
+    }
+  }
+
+  private synchronized void allocateResource(Resource clusterResource, 
+      String userName, Resource resource) {
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
+      addResource(usedResources, resource);
+    update(clusterResource);
+    ++numContainers;
+    
+    User user = getUser(userName);
+    user.assignContainer(resource);
+  }
+
+  private synchronized void releaseResource(Resource clusterResource, 
+      String userName, Resource resource) {
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
+      subtractResource(usedResources, resource);
+    update(clusterResource);
+    --numContainers;
+    
+    User user = getUser(userName);
+    user.releaseContainer(resource);
+  }
+
+  private synchronized void update(Resource clusterResource) {
+    setUtilization(usedResources.memory / (clusterResource.memory * absoluteCapacity));
+    setUsedCapacity(usedResources.memory / (clusterResource.memory * capacity));
+  }
+  
+  static class User {
+    Resource consumed = 
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+    int applications = 0;
+    
+    public Resource getConsumedResources() {
+      return consumed;
+    }
+  
+    public int getApplications() {
+      return applications;
+    }
+  
+    public synchronized void submitApplication() {
+      ++applications;
+    }
+    
+    public synchronized void finishApplication() {
+      --applications;
+    }
+    
+    public synchronized void assignContainer(Resource resource) {
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+          consumed, resource);
+    }
+    
+    public synchronized void releaseContainer(Resource resource) {
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+          consumed, resource);
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,408 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+
+@Private
+@Evolving
+public class ParentQueue implements Queue {
+
+  private static final Log LOG = LogFactory.getLog(ParentQueue.class);
+
+  private final Queue parent;
+  private final String queueName;
+  private final float capacity;
+  private final float maximumCapacity;
+  private final float absoluteCapacity;
+  private final float absoluteMaxCapacity;
+
+  private float usedCapacity = 0.0f;
+  private float utilization = 0.0f;
+
+  private final Set<Queue> childQueues;
+  
+  private Resource usedResources = 
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+  
+  private final boolean rootQueue;
+  
+  private final Resource minimumAllocation;
+
+  private volatile int numApplications;
+  private volatile int numContainers;
+
+  public ParentQueue(CapacitySchedulerContext cs, 
+      String queueName, Comparator<Queue> comparator, Queue parent) {
+    minimumAllocation = cs.getMinimumAllocation();
+    
+    this.parent = parent;
+    this.queueName = queueName;
+    this.rootQueue = (parent == null);
+    
+    LOG.info("PQ: parent=" + parent + ", qName=" + queueName + 
+        " qPath=" + getQueuePath() + ", root=" + rootQueue);
+    this.capacity = 
+      (float)cs.getConfiguration().getCapacity(getQueuePath()) / 100;
+
+    float parentAbsoluteCapacity = 
+      (parent == null) ? 1.0f : parent.getAbsoluteCapacity();
+    this.absoluteCapacity = parentAbsoluteCapacity * capacity; 
+
+    this.maximumCapacity = 
+      cs.getConfiguration().getMaximumCapacity(getQueuePath());
+    this.absoluteMaxCapacity = 
+      (maximumCapacity == CapacitySchedulerConfiguration.UNDEFINED) ? 
+          Float.MAX_VALUE :  (parentAbsoluteCapacity * maximumCapacity) / 100;
+    
+    this.childQueues = new TreeSet<Queue>(comparator);
+    
+    LOG.info("Initialized parent-queue " + queueName + 
+        " name=" + queueName + 
+        ", fullname=" + getQueuePath() + 
+        ", capacity=" + capacity + 
+        ", asboluteCapacity=" + absoluteCapacity + 
+        ", maxCapacity=" + maximumCapacity +
+        ", asboluteMaxCapacity=" + absoluteMaxCapacity);
+  }
+
+  public void setChildQueues(Collection<Queue> childQueues) {
+    
+    // Validate
+    float childCapacities = 0;
+    for (Queue queue : childQueues) {
+      childCapacities += queue.getCapacity();
+    }
+    if (childCapacities != 1.0f) {
+      throw new IllegalArgumentException("Illegal" +
+      		" capacity of " + childCapacities + 
+      		" for children of queue " + queueName);
+    }
+    
+    this.childQueues.addAll(childQueues);
+    LOG.info("DEBUG --- setChildQueues: " + getChildQueuesToPrint());
+  }
+  
+  @Override
+  public Queue getParent() {
+    return parent;
+  }
+
+  @Override
+  public String getQueueName() {
+    return queueName;
+  }
+
+  @Override
+  public String getQueuePath() {
+    String parentPath = ((parent == null) ? "" : (parent.getQueuePath() + "."));
+    return parentPath + getQueueName();
+  }
+
+  @Override
+  public float getCapacity() {
+    return capacity;
+  }
+
+  @Override
+  public float getAbsoluteCapacity() {
+    return absoluteCapacity;
+  }
+
+  @Override
+  public float getAbsoluteMaximumCapacity() {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public float getMaximumCapacity() {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public float getUsedCapacity() {
+    return usedCapacity;
+  }
+
+  @Override
+  public synchronized Resource getUsedResources() {
+    return usedResources;
+  }
+  
+  @Override
+  public synchronized float getUtilization() {
+    return utilization;
+  }
+
+  @Override
+  public List<Application> getApplications() {
+    return null;
+  }
+
+  @Override
+  public synchronized List<Queue> getChildQueues() {
+    return new ArrayList<Queue>(childQueues);
+  }
+
+  public int getNumContainers() {
+    return numContainers;
+  }
+  
+  public int getNumApplications() {
+    return numApplications;
+  }
+
+  public String toString() {
+    return queueName + ":" + capacity + ":" + absoluteCapacity + ":" + 
+      getUsedCapacity() + ":" + getUtilization() + ":" + 
+      getNumApplications() + ":" + getNumContainers() + ":" + 
+      childQueues.size() + " child-queues";
+  }
+  
+  @Override
+  public void submitApplication(Application application, String user,
+      String queue, Priority priority) 
+  throws AccessControlException {
+    // Sanity check
+    if (queue.equals(queueName)) {
+      throw new AccessControlException("Cannot submit application " +
+          "to non-leaf queue: " + queueName);
+    }
+    
+    ++numApplications;
+   
+    LOG.info("Application submission -" +
+    		" appId: " + application.getApplicationId() + 
+        " user: " + user + 
+        " leaf-queue of parent: " + getQueueName() + 
+        " #applications: " + getNumApplications());
+
+    // Inform the parent queue
+    if (parent != null) {
+      parent.submitApplication(application, user, queue, priority);
+    }
+  }
+
+  @Override
+  public void finishApplication(Application application, String queue) 
+  throws AccessControlException {
+    // Sanity check
+    if (queue.equals(queueName)) {
+      throw new AccessControlException("Cannot finish application " +
+          "from non-leaf queue: " + queueName);
+    }
+    
+    --numApplications;
+    
+    LOG.info("Application completion -" +
+        " appId: " + application.getApplicationId() + 
+        " user: " + application.getUser() + 
+        " leaf-queue of parent: " + getQueueName() + 
+        " #applications: " + getNumApplications());
+
+    // Inform the parent queue
+    if (parent != null) {
+      parent.finishApplication(application, queue);
+    }
+  }
+
+  synchronized void setUsedCapacity(float usedCapacity) {
+    this.usedCapacity = usedCapacity;
+  }
+  
+  synchronized void setUtilization(float utilization) {
+    this.utilization = utilization;
+  }
+
+  @Override
+  public synchronized Resource assignContainers(ClusterTracker cluster, 
+      NodeInfo node) {
+    Resource assigned = 
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+
+    while (canAssign(node)) {
+      LOG.info("DEBUG --- Trying to assign containers to child-queue of " + 
+          getQueueName());
+      
+      // Are we over maximum-capacity for this queue?
+      if (!assignToQueue()) {
+        LOG.info(getQueueName() + 
+            " current-capacity (" + getUtilization() + ") > max-capacity (" + 
+            absoluteMaxCapacity + ")");
+        break;
+      }
+      
+      // Schedule
+      Resource assignedToChild = assignContainersToChildQueues(cluster, node);
+
+      // Done if no child-queue assigned anything
+      if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
+          assignedToChild, 
+          org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+        // Track resource utilization for the parent-queue
+        allocateResource(cluster.getClusterResource(), assignedToChild);
+        
+        // Track resource utilization in this pass of the scheduler
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+            assigned, assignedToChild);
+        
+        LOG.info("completedContainer" +
+            " queue=" + getQueueName() + 
+            " util=" + getUtilization() + 
+            " used=" + usedResources + 
+            " cluster=" + cluster.getClusterResource());
+
+      } else {
+        break;
+      }
+
+      LOG.info("DEBUG ---" +
+      		" parentQ=" + getQueueName() + 
+      		" assigned=" + assigned + 
+      		" utilization=" + getUtilization());
+      
+      // Do not assign more than one container if this isn't the root queue
+      if (!rootQueue) {
+        break;
+      }
+    } 
+    
+    return assigned;
+  }
+  
+  private synchronized boolean assignToQueue() {
+    return (getUtilization() < absoluteMaxCapacity);
+  }
+  
+  private boolean canAssign(NodeInfo node) {
+    return 
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThanOrEqual(
+        node.getAvailableResource(), 
+        minimumAllocation);
+  }
+  
+  synchronized Resource assignContainersToChildQueues(ClusterTracker cluster, 
+      NodeInfo node) {
+    Resource assigned = 
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+    
+    printChildQueues();
+
+    // Try to assign to most 'under-served' sub-queue
+    for (Iterator<Queue> iter=childQueues.iterator(); iter.hasNext();) {
+      Queue childQueue = iter.next();
+      LOG.info("DEBUG --- Trying to assign to" +
+      		" queue: " + childQueue.getQueuePath() + 
+      		" stats: " + childQueue);
+      assigned = childQueue.assignContainers(cluster, node);
+
+      // If we do assign, remove the queue and re-insert in-order to re-sort
+      if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
+            assigned, 
+            org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+        // Remove and re-insert to sort
+        iter.remove();
+        LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() + 
+            " stats: " + childQueue);
+        childQueues.add(childQueue);
+        printChildQueues();
+        break;
+      }
+    }
+    
+    return assigned;
+  }
+
+  String getChildQueuesToPrint() {
+    StringBuilder sb = new StringBuilder();
+    for (Queue q : childQueues) {
+      sb.append(q.getQueuePath() + "(" + q.getUtilization() + "), ");
+    }
+    return sb.toString();
+  }
+  void printChildQueues() {
+    LOG.info("DEBUG --- printChildQueues - queue: " + getQueuePath() + 
+        " child-queues: " + getChildQueuesToPrint());
+  }
+  
+  @Override
+  public void completedContainer(ClusterTracker cluster,
+      Container container, Application application) {
+    if (application != null) {
+      // Careful! Locking order is important!
+      // Book keeping
+      synchronized (this) {
+        releaseResource(cluster.getClusterResource(), container.resource);
+
+        LOG.info("completedContainer" +
+            " queue=" + getQueueName() + 
+            " util=" + getUtilization() + 
+            " used=" + usedResources + 
+            " cluster=" + cluster.getClusterResource());
+      }
+
+      // Inform the parent
+      if (parent != null) {
+        parent.completedContainer(cluster, container, application);
+      }    
+    }
+  }
+  
+  private synchronized void allocateResource(Resource clusterResource, 
+      Resource resource) {
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
+      addResource(usedResources, resource);
+    update(clusterResource);
+    ++numContainers;
+  }
+  
+  private synchronized void releaseResource(Resource clusterResource, 
+      Resource resource) {
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
+      subtractResource(usedResources, resource);
+    update(clusterResource);
+    --numContainers;
+  }
+
+  private synchronized void update(Resource clusterResource) {
+    setUtilization(usedResources.memory / (clusterResource.memory * absoluteCapacity));
+    setUsedCapacity(usedResources.memory / (clusterResource.memory * capacity));
+  }
+  
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,164 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+
+/**
+ * Queue represents a node in the tree of 
+ * hierarchical queues in the {@link CapacityScheduler}.
+ */
+@Stable
+@Private
+public interface Queue 
+extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
+  /**
+   * Get the parent <code>Queue</code>.
+   * @return the parent queue
+   */
+  public Queue getParent();
+
+  /**
+   * Get the queue name.
+   * @return the queue name
+   */
+  public String getQueueName();
+
+  /**
+   * Get the full name of the queue, including the heirarchy.
+   * @return the full name of the queue
+   */
+  public String getQueuePath();
+  
+  /**
+   * Get the configured <em>capacity</em> of the queue.
+   * @return queue capacity
+   */
+  public float getCapacity();
+
+  /**
+   * Get capacity of the parent of the queue as a function of the 
+   * cumulative capacity in the cluster.
+   * @return capacity of the parent of the queue as a function of the 
+   *         cumulative capacity in the cluster
+   */
+  public float getAbsoluteCapacity();
+  
+  /**
+   * Get the configured maximum-capacity of the queue. 
+   * @return the configured maximum-capacity of the queue
+   */
+  public float getMaximumCapacity();
+  
+  /**
+   * Get maximum-capacity of the queue as a funciton of the cumulative capacity
+   * of the cluster.
+   * @return maximum-capacity of the queue as a funciton of the cumulative capacity
+   *         of the cluster
+   */
+  public float getAbsoluteMaximumCapacity();
+  
+  /**
+   * Get the currently utilized capacity of the queue 
+   * relative to it's parent queue.
+   * @return the currently utilized capacity of the queue 
+   *         relative to it's parent queue
+   */
+  public float getUsedCapacity();
+  
+  /**
+   * Get the currently utilized resources in the cluster 
+   * by the queue and children (if any).
+   * @return used resources by the queue and it's children 
+   */
+  public Resource getUsedResources();
+  
+  /**
+   * Get the current <em>utilization</em> of the queue 
+   * and it's children (if any).
+   * Utilization is defined as the ratio of 
+   * <em>used-capacity over configured-capacity</em> of the queue.
+   * @return queue utilization
+   */
+  public float getUtilization();
+  
+  /**
+   * Get child queues
+   * @return child queues
+   */
+  public List<Queue> getChildQueues();
+  
+  /**
+   * Get applications in this queue
+   * @return applications in the queue
+   */
+  public List<Application> getApplications();
+  
+  /**
+   * Submit a new application to the queue.
+   * @param application application being submitted
+   * @param user user who submitted the application
+   * @param queue queue to which the application is submitted
+   * @param priority application priority
+   */
+  public void submitApplication(Application application, String user, 
+      String queue, Priority priority) 
+  throws AccessControlException;
+  
+  /**
+   * An application submitted to this queue has finished.
+   * @param application
+   * @param queue application queue 
+   */
+  public void finishApplication(Application application, String queue)
+  throws AccessControlException;
+  
+  /**
+   * Assign containers to applications in the queue or it's children (if any).
+   * @param cluster cluster resources
+   * @param node node on which resources are available
+   * @return
+   */
+  public Resource assignContainers(ClusterTracker cluster, NodeInfo node);
+  
+  /**
+   * A container assigned to the queue has completed.
+   * @param cluster cluster resources
+   * @param container completed container
+   * @param application application to which the container was assigned
+   */
+  public void completedContainer(ClusterTracker cluster, 
+      Container container, Application application);
+
+  /**
+   * Get the number of applications in the queue.
+   * @return number of applications
+   */
+  public int getNumApplications();
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,440 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterTracker.NodeResponse;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
+
+@LimitedPrivate("yarn")
+@Evolving
+public class FifoScheduler implements ResourceScheduler {
+  
+  private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
+  
+  Configuration conf;
+  private ContainerTokenSecretManager containerTokenSecretManager;
+  private final ClusterTracker clusterTracker;
+  
+  // TODO: The memory-block size should be site-configurable?
+  public static final int MINIMUM_MEMORY = 1024;
+  private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
+  private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
+  
+  public static final Resource MINIMUM_ALLOCATION = 
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
+        MINIMUM_MEMORY);
+    
+  Map<ApplicationID, Application> applications = 
+    new TreeMap<ApplicationID, Application>(
+        new org.apache.hadoop.yarn.server.resourcemanager.resource.ApplicationID.Comparator());
+
+  private static final Queue DEFAULT_QUEUE = new Queue() {
+    @Override
+    public String getQueueName() {
+      return "default";
+    }
+  };
+  
+  public FifoScheduler() {
+    this.clusterTracker = createClusterTracker();
+  }
+  
+  public FifoScheduler(Configuration conf,
+      ContainerTokenSecretManager containerTokenSecretManager) 
+  {
+    this();
+    reinitialize(conf, containerTokenSecretManager);
+  }
+  
+  protected ClusterTracker createClusterTracker() {
+    return new ClusterTrackerImpl(); 
+  }
+  
+  @Override
+  public void reinitialize(Configuration conf,
+      ContainerTokenSecretManager containerTokenSecretManager) 
+  {
+    this.conf = conf;
+    this.containerTokenSecretManager = containerTokenSecretManager;
+  }
+  
+  @Override
+  public synchronized List<Container> allocate(ApplicationID applicationId,
+      List<ResourceRequest> ask, List<Container> release) 
+      throws IOException {
+    Application application = getApplication(applicationId);
+    if (application == null) {
+      LOG.error("Calling allocate on removed " +
+      		"or non existant application " + applicationId);
+      return EMPTY_CONTAINER_LIST; 
+    }
+    normalizeRequests(ask);
+    
+    LOG.debug("allocate: pre-update" +
+    		" applicationId=" + applicationId + 
+        " application=" + application);
+    application.showRequests();
+    
+    // Update application requests
+    application.updateResourceRequests(ask);
+    
+    // Release containers
+    releaseContainers(application, release);
+    
+    application.showRequests();
+    
+    List<Container> allContainers = application.acquire();
+    LOG.debug("allocate:" +
+    		" applicationId=" + applicationId + 
+    		" #ask=" + ask.size() + 
+    		" #release=" + release.size() +
+    		" #allContainers=" + allContainers.size());
+    return allContainers;
+  }
+
+  private void releaseContainers(Application application, List<Container> release) {
+    application.releaseContainers(release);
+    for (Container container : release) {
+      clusterTracker.releaseContainer(application.getApplicationId(), container);
+    }
+  }
+  
+  private void normalizeRequests(List<ResourceRequest> asks) {
+    for (ResourceRequest ask : asks) {
+      normalizeRequest(ask);
+    }
+  }
+  
+  private void normalizeRequest(ResourceRequest ask) {
+    int memory = ask.capability.memory;
+    memory = 
+      MINIMUM_MEMORY * ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY)); 
+  }
+  
+  private synchronized Application getApplication(ApplicationID applicationId) {
+    return applications.get(applicationId);
+  }
+
+  @Override
+  public synchronized void addApplication(ApplicationID applicationId, 
+      String user, String unusedQueue, Priority unusedPriority) 
+  throws IOException {
+    applications.put(applicationId, 
+        new Application(applicationId, DEFAULT_QUEUE, user));
+    LOG.info("Application Submission: " + applicationId.id + " from " + user + 
+        ", currently active: " + applications.size());
+  }
+
+  @Override
+  public synchronized void removeApplication(ApplicationID applicationId)
+  throws IOException {
+    Application application = getApplication(applicationId);
+    if (application == null) {
+      throw new IOException("Unknown application " + applicationId + 
+          " has completed!");
+    }
+    
+    // Release current containers
+    releaseContainers(application, application.getCurrentContainers());
+    
+    // Let the cluster know that the applications are done
+    clusterTracker.finishedApplication(applicationId, 
+        application.getAllNodesForApplication());
+    
+    // Remove the application
+    applications.remove(applicationId);
+  }
+  
+  /**
+   * Heart of the scheduler...
+   * 
+   * @param node node on which resources are available to be allocated
+   */
+  private synchronized void assignContainers(NodeInfo node) {
+    LOG.debug("assignContainers:" +
+    		" node=" + node.getHostName() + 
+        " #applications=" + applications.size());
+    
+    // Try to assign containers to applications in fifo order
+    for (Map.Entry<ApplicationID, Application> e : applications.entrySet()) {
+      Application application = e.getValue();
+      LOG.debug("pre-assignContainers");
+      application.showRequests();
+      synchronized (application) {
+        for (Priority priority : application.getPriorities()) {
+          int maxContainers = 
+            getMaxAllocatableContainers(application, priority, node, 
+                NodeType.OFF_SWITCH); 
+          // Ensure the application needs containers of this priority
+          if (maxContainers > 0) {
+            int assignedContainers = 
+              assignContainersOnNode(node, application, priority);
+            // Do not assign out of order w.r.t priorities
+            if (assignedContainers == 0) {
+              break;
+            }
+          }
+        }
+      }
+      LOG.debug("post-assignContainers");
+      application.showRequests();
+      
+      // Done
+      if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.lessThan(
+          node.getAvailableResource(), MINIMUM_ALLOCATION)) {
+        return;
+      }
+    }
+  }
+  
+  private int getMaxAllocatableContainers(Application application,
+      Priority priority, NodeInfo node, NodeType type) {
+    ResourceRequest offSwitchRequest = 
+      application.getResourceRequest(priority, NodeManager.ANY);
+    int maxContainers = offSwitchRequest.numContainers;
+    
+    if (type == NodeType.OFF_SWITCH) {
+      return maxContainers;
+    }
+    
+    if (type == NodeType.RACK_LOCAL) {
+      ResourceRequest rackLocalRequest = 
+        application.getResourceRequest(priority, node.getRackName());
+      if (rackLocalRequest == null) {
+        return maxContainers;
+      }
+
+      maxContainers = Math.min(maxContainers, rackLocalRequest.numContainers);
+    }
+    
+    if (type == NodeType.DATA_LOCAL) {
+      ResourceRequest nodeLocalRequest = 
+        application.getResourceRequest(priority, node.getHostName());
+      if (nodeLocalRequest != null) {
+        maxContainers = Math.min(maxContainers, nodeLocalRequest.numContainers);
+      }
+    }
+    
+    return maxContainers;
+  }
+  
+
+  private int assignContainersOnNode(NodeInfo node, 
+      Application application, Priority priority 
+      ) {
+    // Data-local
+    int nodeLocalContainers = 
+      assignNodeLocalContainers(node, application, priority); 
+
+    // Rack-local
+    int rackLocalContainers = 
+      assignRackLocalContainers(node, application, priority);
+    
+    // Off-switch
+    int offSwitchContainers =
+      assignOffSwitchContainers(node, application, priority);
+    
+
+    LOG.debug("assignContainersOnNode:" +
+        " node=" + node.getHostName() + 
+        " application=" + application.getApplicationId().id +
+        " priority=" + priority.priority + 
+        " #assigned=" + 
+          (nodeLocalContainers + rackLocalContainers + offSwitchContainers));
+    
+
+    return (nodeLocalContainers + rackLocalContainers + offSwitchContainers);
+  }
+  
+  private int assignNodeLocalContainers(NodeInfo node, 
+      Application application, Priority priority) {
+    int assignedContainers = 0;
+    ResourceRequest request = 
+      application.getResourceRequest(priority, node.getHostName());
+    if (request != null) {
+      int assignableContainers = 
+        Math.min(
+            getMaxAllocatableContainers(application, priority, node, 
+                NodeType.DATA_LOCAL), 
+            request.numContainers);
+      assignedContainers = 
+        assignContainers(node, application, priority, 
+            assignableContainers, request, NodeType.DATA_LOCAL);
+    }
+    return assignedContainers;
+  }
+  
+  private int assignRackLocalContainers(NodeInfo node, 
+      Application application, Priority priority) {
+    int assignedContainers = 0;
+    ResourceRequest request = 
+      application.getResourceRequest(priority, node.getRackName());
+    if (request != null) {
+      int assignableContainers = 
+        Math.min(
+            getMaxAllocatableContainers(application, priority, node, 
+                NodeType.RACK_LOCAL), 
+            request.numContainers);
+      assignedContainers = 
+        assignContainers(node, application, priority, 
+            assignableContainers, request, NodeType.RACK_LOCAL);
+    }
+    return assignedContainers;
+  }
+
+  private int assignOffSwitchContainers(NodeInfo node, 
+      Application application, Priority priority) {
+    int assignedContainers = 0;
+    ResourceRequest request = 
+      application.getResourceRequest(priority, NodeManager.ANY);
+    if (request != null) {
+      assignedContainers = 
+        assignContainers(node, application, priority, 
+            request.numContainers, request, NodeType.OFF_SWITCH);
+    }
+    return assignedContainers;
+  }
+  
+  private int assignContainers(NodeInfo node, Application application, 
+      Priority priority, int assignableContainers, 
+      ResourceRequest request, NodeType type) {
+    LOG.debug("assignContainers:" +
+    		" node=" + node.getHostName() + 
+    		" application=" + application.getApplicationId().id + 
+        " priority=" + priority.priority + 
+        " assignableContainers=" + assignableContainers +
+        " request=" + request + " type=" + type);
+    Resource capability = request.capability;
+    
+    int availableContainers = 
+        node.getAvailableResource().memory / capability.memory; // TODO: A buggy
+                                                                // application
+                                                                // with this
+                                                                // zero would
+                                                                // crash the
+                                                                // scheduler.
+    int assignedContainers = 
+      Math.min(assignableContainers, availableContainers);
+    
+    if (assignedContainers > 0) {
+      List<Container> containers =
+          new ArrayList<Container>(assignedContainers);
+      for (int i=0; i < assignedContainers; ++i) {
+        Container container =
+            org.apache.hadoop.yarn.server.resourcemanager.resource.Container
+                .create(application.getApplicationId(), 
+                    application.getNewContainerId(),
+                    node.getHostName(), capability);
+        // If security is enabled, send the container-tokens too.
+        if (UserGroupInformation.isSecurityEnabled()) {
+          ContainerToken containerToken = new ContainerToken();
+          ContainerTokenIdentifier tokenidentifier =
+              new ContainerTokenIdentifier(container.id,
+                  container.hostName.toString(), container.resource);
+          containerToken.identifier =
+              ByteBuffer.wrap(tokenidentifier.getBytes());
+          containerToken.kind = ContainerTokenIdentifier.KIND.toString();
+          containerToken.password =
+              ByteBuffer.wrap(containerTokenSecretManager
+                  .createPassword(tokenidentifier));
+          containerToken.service = container.hostName; // TODO: port
+          container.containerToken = containerToken;
+        }
+        containers.add(container);
+      }
+      application.allocate(type, node, priority, request, containers);
+      clusterTracker.addAllocatedContainers(node, application.getApplicationId(), containers);
+    }
+    
+    return assignedContainers;
+  }
+
+  private synchronized void applicationCompletedContainers(
+     List<Container> completedContainers) {
+    for (Container c: completedContainers) {
+      Application app = applications.get(c.id.appID);
+      /** this is possible, since an application can be removed from scheduler but
+       * the nodemanger is just updating about a completed container.
+       */
+      if (app != null) {
+        app.completedContainer(c);
+      }
+    }
+  }
+  
+  @Override
+  public synchronized NodeResponse nodeUpdate(NodeInfo node, 
+      Map<CharSequence,List<Container>> containers ) {
+   
+    NodeResponse nodeResponse = clusterTracker.nodeUpdate(node, containers);
+    applicationCompletedContainers(nodeResponse.getCompletedContainers());
+    LOG.info("Node heartbeat " + node.getNodeID() + " resource = " + node.getAvailableResource());
+    if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
+        greaterThanOrEqual(node.getAvailableResource(), MINIMUM_ALLOCATION)) {
+      assignContainers(node);
+    }
+    LOG.info("Node after allocation " + node.getNodeID() + " resource = "
+      + node.getAvailableResource());
+
+    // TODO: Add the list of containers to be preempted when we support
+    // preemption.
+    return nodeResponse;
+  }  
+  
+  @Override
+  public NodeInfo addNode(NodeID nodeId,String hostName,
+      Node node, Resource capability) {
+    return clusterTracker.addNode(nodeId, hostName, node, capability);
+  }
+
+  @Override
+  public void removeNode(NodeInfo node) {
+    clusterTracker.removeNode(node);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,88 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.google.inject.Inject;
+
+import org.apache.hadoop.yarn.Application;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import static org.apache.hadoop.yarn.util.StringHelper.*;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+
+class AppsBlock extends HtmlBlock {
+  final AppsList list;
+
+  @Inject AppsBlock(AppsList list, ViewContext ctx) {
+    super(ctx);
+    this.list = list;
+  }
+
+  @Override public void render(Block html) {
+    TBODY<TABLE<Hamlet>> tbody = html.
+      table("#apps").
+        thead().
+          tr().
+            th(".id", "ID").
+            th(".user", "User").
+            th(".name", "Name").
+            th(".queue", "Queue").
+            th(".state", "State").
+            th(".progress", "Progress").
+            th(".master", "Master UI")._()._().
+        tbody();
+    int i = 0;
+    for (Application app : list.apps) {
+      String appId = Apps.toString(app.id());
+      CharSequence master = app.master();
+      String am = master == null ? "UNASSIGNED"
+                                 : join(master, ':', app.httpPort());
+      String percent = String.format("%.1f", app.status().progress * 100);
+      tbody.
+        tr().
+          td().
+            br().$title(String.valueOf(app.id().id))._(). // for sorting
+            a(url("app", appId), appId)._().
+          td(app.user().toString()).
+          td(app.name().toString()).
+          td(app.queue().toString()).
+          td(app.state().toString()).
+          td().
+            br().$title(percent)._(). // for sorting
+            div(_PROGRESSBAR).
+              $title(join(percent, '%')). // tooltip
+              div(_PROGRESSBAR_VALUE).
+                $style(join("width:", percent, '%'))._()._()._().
+          td().
+            a(master == null ? "#" : join("http://", am), am)._()._();
+      if (list.rendering != Render.HTML && ++i >= 20) break;
+    }
+    tbody._()._();
+
+    if (list.rendering == Render.JS_ARRAY) {
+      echo("<script type='text/javascript'>\n",
+           "var appsData=");
+      list.toDataTableArrays(writer());
+      echo("\n</script>\n");
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,82 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.google.inject.Inject;
+import com.google.inject.servlet.RequestScoped;
+
+import java.io.PrintWriter;
+import java.util.List;
+
+import static org.apache.commons.lang.StringEscapeUtils.*;
+import static org.apache.hadoop.yarn.util.StringHelper.*;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+import static org.apache.hadoop.yarn.webapp.view.Jsons.*;
+
+import org.apache.hadoop.yarn.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.ToJSON;
+import org.apache.hadoop.yarn.webapp.Controller.RequestContext;
+
+// So we only need to do asm.getApplications once in a request
+@RequestScoped
+class AppsList implements ToJSON {
+  final RequestContext rc;
+  final List<Application> apps;
+  Render rendering;
+
+  @Inject AppsList(RequestContext ctx, ApplicationsManager asm) {
+    rc = ctx;
+    apps = asm.getApplications();
+  }
+
+  void toDataTableArrays(PrintWriter out) {
+    out.append('[');
+    boolean first = true;
+    for (Application app : apps) {
+      if (first) {
+        first = false;
+      } else {
+        out.append(",\n");
+      }
+      String appID = Apps.toString(app.id());
+      CharSequence master = app.master();
+      String ui = master == null ? "UNASSIGNED"
+                                 : join(master, ':', app.httpPort());
+      out.append("[\"");
+      appendSortable(out, app.id().id);
+      appendLink(out, appID, rc.prefix(), "app", appID).append(_SEP).
+          append(escapeHtml(app.user().toString())).append(_SEP).
+          append(escapeHtml(app.name().toString())).append(_SEP).
+          append(app.state().toString()).append(_SEP);
+      appendProgressBar(out, app.status().progress).append(_SEP);
+      appendLink(out, ui, rc.prefix(), master == null ? "#" : "http://", ui).
+          append("\"]");
+    }
+    out.append(']');
+  }
+
+  @Override
+  public void toJSON(PrintWriter out) {
+    out.print("{\"aaData\":");
+    toDataTableArrays(out);
+    out.print("}\n");
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,179 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.google.inject.Inject;
+import com.google.inject.servlet.RequestScoped;
+
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Queue;
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import static org.apache.hadoop.yarn.util.StringHelper.*;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+
+class CapacitySchedulerPage extends RmView {
+  static final String _Q = ".ui-state-default.ui-corner-all";
+  static final float WIDTH_F = 0.8f;
+  static final String Q_END = "left:101%";
+  static final String OVER = "font-size:1px;background:rgba(255, 140, 0, 0.8)";
+  static final String UNDER = "font-size:1px;background:rgba(50, 205, 50, 0.8)";
+  static final float EPSILON = 1e-8f;
+
+  @RequestScoped
+  static class Parent {
+    Queue queue;
+  }
+
+  public static class QueueBlock extends HtmlBlock {
+    final Parent parent;
+
+    @Inject QueueBlock(Parent parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    public void render(Block html) {
+      UL<Hamlet> ul = html.ul();
+      Queue parentQueue = parent.queue;
+      for (Queue queue : parentQueue.getChildQueues()) {
+        float used = queue.getUsedCapacity();
+        float set = queue.getCapacity();
+        float delta = Math.abs(set - used) + 0.001f;
+        float max = queue.getMaximumCapacity();
+        if (max < EPSILON) max = 1f;
+        String absMaxPct = percent(queue.getAbsoluteMaximumCapacity());
+        LI<UL<Hamlet>> li = ul.
+          li().
+            a(_Q).$style(width(max * WIDTH_F)).
+              $title(join("used:", percent(used), " set:", percent(set),
+                          " max:", percent(max))).
+              //span().$style(Q_END)._(absMaxPct)._().
+              span().$style(join(width(delta/max), ';',
+                used > set ? OVER : UNDER, ';',
+                used > set ? left(set/max) : left(used/max)))._('.')._().
+              span(".q", queue.getQueuePath().substring(5))._();
+        if (queue instanceof ParentQueue) {
+          parent.queue = queue;
+          li.
+            _(QueueBlock.class);
+        }
+        li._();
+      }
+      ul._();
+    }
+  }
+
+  static class QueuesBlock extends HtmlBlock {
+    final CapacityScheduler cs;
+    final Parent parent;
+
+    @Inject QueuesBlock(ResourceManager rm, Parent parent) {
+      cs = (CapacityScheduler) rm.getResourceScheduler();
+      this.parent = parent;
+    }
+
+    @Override
+    public void render(Block html) {
+      UL<DIV<DIV<Hamlet>>> ul = html.
+        div("#cs-wrapper.ui-widget").
+          div(".ui-widget-header.ui-corner-top").
+            _("Application Queues")._().
+          div("#cs.ui-widget-content.ui-corner-bottom").
+            ul();
+      if (cs == null) {
+        ul.
+          li().
+            a(_Q).$style(width(WIDTH_F)).
+              span().$style(Q_END)._("100% ")._().
+              span(".q", "default")._()._();
+      } else {
+        Queue root = cs.getRootQueue();
+        parent.queue = root;
+        float used = root.getUsedCapacity();
+        float set = root.getCapacity();
+        float delta = Math.abs(set - used) + 0.001f;
+        ul.
+          li().
+            a(_Q).$style(width(WIDTH_F)).
+              $title(join("used:", percent(used))).
+              span().$style(Q_END)._("100%")._().
+              span().$style(join(width(delta), ';', used > set ? OVER : UNDER,
+                ';', used > set ? left(set) : left(used)))._(".")._().
+              span(".q", "root")._().
+            _(QueueBlock.class)._();
+      }
+      ul._()._().
+      script().$type("text/javascript").
+          _("$('#cs').hide();")._()._().
+      _(AppsBlock.class);
+    }
+  }
+
+  @Override protected void postHead(Page.HTML<_> html) {
+    html.
+      style().$type("text/css").
+        _("#cs { padding: 0.5em 0 1em 0; margin-bottom: 1em; position: relative }",
+          "#cs ul { list-style: none }",
+          "#cs a { font-weight: normal; margin: 2px; position: relative }",
+          "#cs a span { font-weight: normal; font-size: 80% }",
+          "#cs-wrapper .ui-widget-header { padding: 0.2em 0.5em }")._().
+      script("/static/jt/jquery.jstree.js").
+      script().$type("text/javascript").
+        _("$(function() {",
+          "  $('#cs a span').addClass('ui-corner-all').css('position', 'absolute');",
+          "  $('#cs').bind('loaded.jstree', function (e, data) {",
+          "    data.inst.open_all(); }).",
+          "    jstree({",
+          "    core: { animation: 188, html_titles: true },",
+          "    plugins: ['themeroller', 'html_data', 'ui'],",
+          "    themeroller: { item_open: 'ui-icon-minus',",
+          "      item_clsd: 'ui-icon-plus', item_leaf: 'ui-icon-gear'",
+          "    }",
+          "  });",
+          "  $('#cs').bind('select_node.jstree', function(e, data) {",
+          "    var q = $('.q', data.rslt.obj).first().text();",
+            "    if (q == 'root') q = '';",
+          "    $('#apps').dataTable().fnFilter(q, 3);",
+          "  });",
+          "  $('#cs').show();",
+          "});")._();
+  }
+
+  @Override protected Class<? extends SubView> content() {
+    return QueuesBlock.class;
+  }
+
+  static String percent(float f) {
+    return String.format("%.1f%%", f * 100);
+  }
+
+  static String width(float f) {
+    return String.format("width:%.1f%%", f * 100);
+  }
+
+  static String left(float f) {
+    return String.format("left:%.1f%%", f * 100);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,37 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+
+class DefaultSchedulerPage extends RmView {
+
+  static class QueueBlock extends HtmlBlock {
+    @Override public void render(Block html) {
+      html.h2("Under construction");
+    }
+  }
+
+  @Override protected Class<? extends SubView> content() {
+    return QueueBlock.class;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/InfoPage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/InfoPage.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/InfoPage.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/InfoPage.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.view.InfoBlock;
+
+public class InfoPage extends RmView {
+
+  @Override protected void preHead(Page.HTML<_> html) {
+    commonPreHead(html);
+  }
+
+  @Override protected Class<? extends SubView> content() {
+    return InfoBlock.class;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+public class NavBlock extends HtmlBlock {
+
+  @Override public void render(Block html) {
+    html.
+      div("#nav").
+        h3("Cluster").
+        ul().
+          li().a(url("cluster"), "About")._().
+          li().a(url("nodes"), "Nodes")._().
+          li().a(url("apps"), "Applications")._().
+          li().a(url("scheduler"), "Scheduler")._()._().
+        h3("Tools").
+        ul().
+          li().a("/conf", "Configuration")._().
+          li().a("/logs", "Local logs")._().
+          li().a("/stacks", "Server stacks")._().
+          li().a("/metrics", "Server metrics")._()._()._().
+      div("#themeswitcher")._();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,86 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.google.inject.Inject;
+
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ResourceContext;
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+
+class NodesPage extends RmView {
+
+  static class NodesBlock extends HtmlBlock {
+    final ResourceContext resource;
+
+    @Inject
+    NodesBlock(ResourceContext rc, ViewContext ctx) {
+      super(ctx);
+      resource = rc;
+    }
+
+    @Override
+    protected void render(Block html) {
+      TBODY<TABLE<Hamlet>> tbody = html.table("#nodes").
+          thead().
+          tr().
+          th(".rack", "Rack").
+          th(".nodeid", "Node ID").
+          th(".host", "Host").
+          th(".containers", "Containers").
+          th(".mem", "Mem Used (MB)").
+          th(".mem", "Mem Avail (MB)")._()._().
+          tbody();
+      for (NodeInfo ni : resource.getAllNodeInfo()) {
+        tbody.tr().
+            td(ni.getRackName()).
+            td(String.valueOf(ni.getNodeID().id)).
+            td(ni.getHostName()).
+            td(String.valueOf(ni.getNumContainers())).
+            td(String.valueOf(ni.getUsedResource().memory)).
+            td(String.valueOf(ni.getAvailableResource().memory))._();
+      }
+      tbody._()._();
+    }
+  }
+
+  @Override protected void preHead(Page.HTML<_> html) {
+    commonPreHead(html);
+    setTitle("Nodes of the cluster");
+    set(DATATABLES_ID, "nodes");
+    set(initID(DATATABLES, "nodes"), nodesTableInit());
+    setTableStyles(html, "nodes");
+  }
+
+  @Override protected Class<? extends SubView> content() {
+    return NodesBlock.class;
+  }
+
+  private String nodesTableInit() {
+    return tableInit().
+        // rack, nodeid, host, containers, memused, memavail
+        append(", aoColumns:[null, null, null, {bSearchable:false}, ").
+        append("{bSearchable:false}, {bSearchable:false}]}").toString();
+  }
+}



Mime
View raw message