Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Aug 3 11:51:20 2011
@@ -36,9 +36,8 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -47,20 +46,18 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -85,8 +82,10 @@ public class LeafQueue implements Queue
private float usedCapacity = 0.0f;
private volatile int numContainers;
- Set<CSApp> applications;
-
+ Set<SchedulerApp> applications;
+ Map<ApplicationAttemptId, SchedulerApp> applicationsMap =
+ new HashMap<ApplicationAttemptId, SchedulerApp>();
+
public final Resource minimumAllocation;
private ContainerTokenSecretManager containerTokenSecretManager;
@@ -109,7 +108,7 @@ public class LeafQueue implements Queue
public LeafQueue(CapacitySchedulerContext cs,
String queueName, Queue parent,
- Comparator<CSApp> applicationComparator, Queue old) {
+ Comparator<SchedulerApp> applicationComparator, Queue old) {
this.scheduler = cs;
this.queueName = queueName;
this.parent = parent;
@@ -158,7 +157,7 @@ public class LeafQueue implements Queue
" name=" + queueName +
", fullname=" + getQueuePath());
- this.applications = new TreeSet<CSApp>(applicationComparator);
+ this.applications = new TreeSet<SchedulerApp>(applicationComparator);
}
private synchronized void setupQueueConfigs(
@@ -362,7 +361,7 @@ public class LeafQueue implements Queue
}
@Override
- public void submitApplication(CSApp application, String userName,
+ public void submitApplication(SchedulerApp application, String userName,
String queue) throws AccessControlException {
// Careful! Locking order is important!
@@ -423,10 +422,11 @@ public class LeafQueue implements Queue
}
}
- private synchronized void addApplication(CSApp application, User user) {
+ private synchronized void addApplication(SchedulerApp application, User user) {
// Accept
user.submitApplication();
applications.add(application);
+ applicationsMap.put(application.getApplicationAttemptId(), application);
LOG.info("Application added -" +
" appId: " + application.getApplicationId() +
@@ -436,7 +436,7 @@ public class LeafQueue implements Queue
}
@Override
- public void finishApplication(CSApp application, String queue) {
+ public void finishApplication(SchedulerApp application, String queue) {
// Careful! Locking order is important!
synchronized (this) {
removeApplication(application, getUser(application.getUser()));
@@ -446,8 +446,9 @@ public class LeafQueue implements Queue
parent.finishApplication(application, queue);
}
- public synchronized void removeApplication(CSApp application, User user) {
+ public synchronized void removeApplication(SchedulerApp application, User user) {
applications.remove(application);
+ applicationsMap.remove(application.getApplicationAttemptId());
user.finishApplication();
if (user.getApplications() == 0) {
@@ -461,24 +462,31 @@ public class LeafQueue implements Queue
" #user-applications: " + user.getApplications() +
" #queue-applications: " + getNumApplications());
}
+ /** Looks up the application registered in applicationsMap for the given attempt id; null when there is no entry. */
+ private synchronized SchedulerApp getApplication(
+ ApplicationAttemptId applicationAttemptId) {
+ return applicationsMap.get(applicationAttemptId);
+ }
@Override
public synchronized Resource
- assignContainers(Resource clusterResource, CSNode node) {
+ assignContainers(Resource clusterResource, SchedulerNode node) {
LOG.info("DEBUG --- assignContainers:" +
" node=" + node.getNodeAddress() +
" #applications=" + applications.size());
// Check for reserved resources
- CSApp reservedApplication = node.getReservedApplication();
- if (reservedApplication != null) {
- return assignReservedContainers(reservedApplication, node,
+ RMContainer reservedContainer = node.getReservedContainer();
+ if (reservedContainer != null) {
+ SchedulerApp application =
+ getApplication(reservedContainer.getApplicationAttemptId());
+ return assignReservedContainer(application, node, reservedContainer,
clusterResource);
}
-
- // Try to assign containers to applications in fifo order
- for (CSApp application : applications) {
+
+ // Try to assign containers to applications in order
+ for (SchedulerApp application : applications) {
LOG.info("DEBUG --- pre-assignContainers for application "
+ application.getApplicationId());
@@ -497,6 +505,7 @@ public class LeafQueue implements Queue
}
// Are we going over limits by allocating to this application?
+
ResourceRequest required =
application.getResourceRequest(priority, RMNode.ANY);
@@ -520,7 +529,7 @@ public class LeafQueue implements Queue
// Try to schedule
Resource assigned =
assignContainersOnNode(clusterResource, node, application, priority,
- false);
+ null);
// Did we schedule or reserve a container?
if (Resources.greaterThan(assigned, Resources.none())) {
@@ -552,30 +561,21 @@ public class LeafQueue implements Queue
}
- private synchronized Resource assignReservedContainers(CSApp application,
- CSNode node, Resource clusterResource) {
- synchronized (application) {
- for (Priority priority : application.getPriorities()) {
-
- // Do we reserve containers at this 'priority'?
- if (application.isReserved(node, priority)) {
-
- // Do we really need this reservation still?
- ResourceRequest offSwitchRequest =
- application.getResourceRequest(priority, RMNode.ANY);
- if (offSwitchRequest.getNumContainers() == 0) {
- // Release
- unreserve(application, priority, node);
- return offSwitchRequest.getCapability();
- }
-
- // Try to assign if we have sufficient resources
- assignContainersOnNode(clusterResource, node, application, priority,
- true);
- }
- }
+ private synchronized Resource assignReservedContainer(SchedulerApp application,
+ SchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
+ // Do we still need this reservation?
+ Priority priority = rmContainer.getReservedPriority();
+ if (application.getTotalRequiredResources(priority) == 0) {
+ // Release
+ Container container = rmContainer.getContainer();
+ completedContainer(clusterResource, application, node,
+ rmContainer, RMContainerEventType.RELEASED);
+ return container.getResource();
}
+ // Try to assign if we have sufficient resources
+ assignContainersOnNode(clusterResource, node, application, priority, rmContainer);
+
// Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free*
return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
@@ -599,12 +599,12 @@ public class LeafQueue implements Queue
return true;
}
- private void setUserResourceLimit(CSApp application, Resource resourceLimit) {
+ private void setUserResourceLimit(SchedulerApp application, Resource resourceLimit) {
application.setAvailableResourceLimit(resourceLimit);
metrics.setAvailableResourcesToUser(application.getUser(), resourceLimit);
}
- private Resource computeUserLimit(CSApp application,
+ private Resource computeUserLimit(SchedulerApp application,
Resource clusterResource, Resource required) {
// What is our current capacity?
// * It is equal to the max(required, queue-capacity) if
@@ -688,80 +688,87 @@ public class LeafQueue implements Queue
return (a + (b - 1)) / b;
}
- boolean needContainers(CSApp application, Priority priority) {
- ResourceRequest offSwitchRequest =
- application.getResourceRequest(priority, RMNode.ANY);
-
- int requiredContainers = offSwitchRequest.getNumContainers();
- int reservedContainers = application.getReservedContainers(priority);
+ boolean needContainers(SchedulerApp application, Priority priority) {
+ int requiredContainers = application.getTotalRequiredResources(priority);
+ int reservedContainers = application.getNumReservedContainers(priority);
return ((requiredContainers - reservedContainers) > 0);
}
- Resource assignContainersOnNode(Resource clusterResource, CSNode node,
- CSApp application, Priority priority, boolean reserved) {
+ Resource assignContainersOnNode(Resource clusterResource, SchedulerNode node,
+ SchedulerApp application, Priority priority, RMContainer reservedContainer) {
Resource assigned = Resources.none();
// Data-local
- assigned = assignNodeLocalContainers(clusterResource, node, application, priority);
+ assigned =
+ assignNodeLocalContainers(clusterResource, node, application, priority,
+ reservedContainer);
if (Resources.greaterThan(assigned, Resources.none())) {
return assigned;
}
// Rack-local
- assigned = assignRackLocalContainers(clusterResource, node, application, priority);
+ assigned =
+ assignRackLocalContainers(clusterResource, node, application, priority,
+ reservedContainer);
if (Resources.greaterThan(assigned, Resources.none())) {
- return assigned;
+ return assigned;
}
// Off-switch
return assignOffSwitchContainers(clusterResource, node, application,
- priority, reserved);
+ priority, reservedContainer);
}
- Resource assignNodeLocalContainers(Resource clusterResource, CSNode node,
- CSApp application, Priority priority) {
+ Resource assignNodeLocalContainers(Resource clusterResource, SchedulerNode node,
+ SchedulerApp application, Priority priority,
+ RMContainer reservedContainer) {
ResourceRequest request = application.getResourceRequest(priority, node
.getNodeAddress());
if (request != null) {
- if (canAssign(application, priority, node, NodeType.DATA_LOCAL, false)) {
+ if (canAssign(application, priority, node, NodeType.DATA_LOCAL,
+ reservedContainer)) {
return assignContainer(clusterResource, node, application, priority, request,
- NodeType.DATA_LOCAL);
+ NodeType.DATA_LOCAL, reservedContainer);
}
}
return Resources.none();
}
- Resource assignRackLocalContainers(Resource clusterResource, CSNode node,
- CSApp application, Priority priority) {
+ Resource assignRackLocalContainers(Resource clusterResource,
+ SchedulerNode node, SchedulerApp application, Priority priority,
+ RMContainer reservedContainer) {
ResourceRequest request =
application.getResourceRequest(priority, node.getRackName());
if (request != null) {
- if (canAssign(application, priority, node, NodeType.RACK_LOCAL, false)) {
+ if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
+ reservedContainer)) {
return assignContainer(clusterResource, node, application, priority, request,
- NodeType.RACK_LOCAL);
+ NodeType.RACK_LOCAL, reservedContainer);
}
}
return Resources.none();
}
- Resource assignOffSwitchContainers(Resource clusterResource, CSNode node,
- CSApp application, Priority priority, boolean reserved) {
+ Resource assignOffSwitchContainers(Resource clusterResource, SchedulerNode node,
+ SchedulerApp application, Priority priority,
+ RMContainer reservedContainer) {
ResourceRequest request =
application.getResourceRequest(priority, RMNode.ANY);
if (request != null) {
- if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reserved)) {
+ if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
+ reservedContainer)) {
return assignContainer(clusterResource, node, application, priority, request,
- NodeType.OFF_SWITCH);
+ NodeType.OFF_SWITCH, reservedContainer);
}
}
return Resources.none();
}
- boolean canAssign(CSApp application, Priority priority,
- CSNode node, NodeType type, boolean reserved) {
+ boolean canAssign(SchedulerApp application, Priority priority,
+ SchedulerNode node, NodeType type, RMContainer reservedContainer) {
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, RMNode.ANY);
@@ -781,18 +788,18 @@ public class LeafQueue implements Queue
if (requiredContainers > 0) {
// No 'delay' for reserved containers
- if (reserved) {
+ if (reservedContainer != null) {
return true;
}
-// // Check if we have waited long enough
-// if (missedNodes < (requiredContainers * localityWaitFactor)) {
-// LOG.info("Application " + application.getApplicationId() +
-// " has missed " + missedNodes + " opportunities," +
-// " waitFactor= " + localityWaitFactor +
-// " for cluster of size " + scheduler.getNumClusterNodes());
-// return false;
-// }
+ // Check if we have waited long enough
+ if (missedNodes < (requiredContainers * localityWaitFactor)) {
+ LOG.info("Application " + application.getApplicationId() +
+ " has missed " + missedNodes + " opportunities," +
+ " waitFactor= " + localityWaitFactor +
+ " for cluster of size " + scheduler.getNumClusterNodes());
+ return false;
+ }
return true;
}
return false;
@@ -830,157 +837,162 @@ public class LeafQueue implements Queue
return false;
}
- private Resource assignContainer(Resource clusterResource, CSNode node,
- CSApp application,
- Priority priority, ResourceRequest request, NodeType type) {
+ /** Returns the Container held by rmContainer when one is passed in; otherwise builds a new Container on the given node. */
+ private Container getContainer(RMContainer rmContainer,
+ SchedulerApp application, SchedulerNode node, Resource capability) {
+ if (rmContainer != null) {
+ return rmContainer.getContainer();
+ }
+ // No RMContainer was supplied, so create a Container record for this application attempt on this node
+ Container container =
+ BuilderUtils.newContainer(this.recordFactory,
+ application.getApplicationAttemptId(),
+ application.getNewContainerId(),
+ node.getNodeID(),
+ node.getHttpAddress(), capability);
+
+ // If security is enabled, send the container-tokens too.
+ if (UserGroupInformation.isSecurityEnabled()) {
+ ContainerToken containerToken =
+ this.recordFactory.newRecordInstance(ContainerToken.class);
+ ContainerTokenIdentifier tokenidentifier =
+ new ContainerTokenIdentifier(container.getId(),
+ container.getNodeId().toString(), container.getResource());
+ containerToken.setIdentifier(
+ ByteBuffer.wrap(tokenidentifier.getBytes()));
+ containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
+ containerToken.setPassword(
+ ByteBuffer.wrap(
+ containerTokenSecretManager.createPassword(tokenidentifier))
+ );
+ containerToken.setService(container.getNodeId().toString());
+ container.setContainerToken(containerToken);
+ }
+
+ return container;
+ }
+
+ private Resource assignContainer(Resource clusterResource, SchedulerNode node,
+ SchedulerApp application, Priority priority,
+ ResourceRequest request, NodeType type, RMContainer rmContainer) {
LOG.info("DEBUG --- assignContainers:" +
" node=" + node.getNodeAddress() +
" application=" + application.getApplicationId().getId() +
" priority=" + priority.getPriority() +
" request=" + request + " type=" + type);
Resource capability = request.getCapability();
-
- Resource available = node.getAvailableResource();
- if (available.getMemory() > 0) {
-
- int availableContainers =
- available.getMemory() / capability.getMemory(); // TODO: A buggy
- // application
- // with this
- // zero would
- // crash the
- // scheduler.
-
- if (availableContainers > 0) {
- List<Container> containers =
- new ArrayList<Container>();
- Container container =
- BuilderUtils.newContainer(this.recordFactory,
- application.getApplicationAttemptId(),
- application.getNewContainerId(),
- node.getNodeID(),
- node.getHttpAddress(), capability);
-
- // If security is enabled, send the container-tokens too.
- if (UserGroupInformation.isSecurityEnabled()) {
- ContainerToken containerToken = this.recordFactory.newRecordInstance(ContainerToken.class);
- ContainerTokenIdentifier tokenidentifier =
- new ContainerTokenIdentifier(container.getId(),
- container.getNodeId().toString(), container.getResource());
- containerToken.setIdentifier(ByteBuffer.wrap(tokenidentifier.getBytes()));
- containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
- containerToken.setPassword(ByteBuffer.wrap(containerTokenSecretManager
- .createPassword(tokenidentifier)));
- containerToken.setService(container.getNodeId().toString());
- container.setContainerToken(containerToken);
- }
-
- containers.add(container);
-
- // Allocate
- allocate(application, type, priority, request, node, containers);
+ Resource available = node.getAvailableResource();
- // Did we previously reserve containers at this 'priority'?
- if (application.isReserved(node, priority)){
- unreserve(application, priority, node);
- }
-
- LOG.info("assignedContainer" +
- " application=" + application.getApplicationId() +
- " container=" + container +
- " queue=" + this.toString() +
- " util=" + getUtilization() +
- " used=" + usedResources +
- " cluster=" + clusterResource);
+ assert (available.getMemory() > 0);
- return container.getResource();
- } else {
- // Reserve by 'charging' in advance...
- reserve(application, priority, node, request.getCapability());
-
- LOG.info("Reserved container " +
- " application=" + application.getApplicationId() +
- " resource=" + request.getCapability() +
- " queue=" + this.toString() +
- " util=" + getUtilization() +
- " used=" + usedResources +
- " cluster=" + clusterResource);
+ // Create the container if necessary
+ Container container =
+ getContainer(rmContainer, application, node, capability);
+
+ // Can we allocate a container on this node?
+ int availableContainers =
+ available.getMemory() / capability.getMemory();
+ if (availableContainers > 0) {
+ // Allocate...
- return request.getCapability();
+ // Did we previously reserve containers at this 'priority'?
+ if (rmContainer != null){
+ unreserve(application, priority, node, rmContainer);
+ }
+ // Inform the application
+ RMContainer allocatedContainer =
+ application.allocate(type, node, priority, request, container);
+ if (allocatedContainer == null) {
+ // Did the application need this resource?
+ return Resources.none();
}
- }
- return Resources.none();
+ // Inform the node
+ node.allocateContainer(application.getApplicationId(),
+ allocatedContainer);
+
+ LOG.info("assignedContainer" +
+ " application=" + application.getApplicationId() +
+ " container=" + container +
+ " containerId=" + container.getId() +
+ " queue=" + this +
+ " util=" + getUtilization() +
+ " used=" + usedResources +
+ " cluster=" + clusterResource);
+
+ return container.getResource();
+ } else {
+ // Reserve by 'charging' in advance...
+ reserve(application, priority, node, rmContainer, container);
+
+ LOG.info("Reserved container " +
+ " application=" + application.getApplicationId() +
+ " resource=" + request.getCapability() +
+ " queue=" + this.toString() +
+ " util=" + getUtilization() +
+ " used=" + usedResources +
+ " cluster=" + clusterResource);
+
+ return request.getCapability();
+ }
}
- private void allocate(CSApp application, NodeType type,
- Priority priority, ResourceRequest request,
- CSNode node, List<Container> containers) {
- // Allocate container to the application
- // TODO: acm: refactor2 FIXME
- application.allocate(type, node, priority, request, null);
-
- for (Container container : containers) {
- // Create the container and 'start' it.
- ContainerId containerId = container.getId();
- RMContext rmContext = this.scheduler.getRMContext();
- EventHandler eventHandler = rmContext.getDispatcher().getEventHandler();
- RMContainer rmContainer = new RMContainerImpl(container, application
- .getApplicationAttemptId(), node.getNodeID(),
- eventHandler, rmContext.getContainerAllocationExpirer());
- // TODO: FIX
-// if (rmContext.getRMContainers().putIfAbsent(containerId, rmContainer) != null) {
-// LOG.error("Duplicate container addition! ContainerID : "
-// + containerId);
-// } else {
-// eventHandler.handle(new RMContainerEvent(containerId,
-// RMContainerEventType.START));
-// }
- }
-
- // Inform the NodeManager about the allocation
- // TODO: acm: refactor2 FIXME
-// node.allocateContainer(application.getApplicationId(),
-// containers);
- }
-
- private void reserve(CSApp application, Priority priority,
- CSNode node, Resource resource) {
- application.reserveResource(node, priority, resource);
- node.reserveResource(application, priority, resource);
+  private void reserve(SchedulerApp application, Priority priority,
+      SchedulerNode node, RMContainer rmContainer, Container container) {
+    // Count reserved metrics only for a first-time reservation. Test the incoming
+    // rmContainer here, before it is overwritten with application.reserve()'s result.
+    if (rmContainer == null) {
+      getMetrics().reserveResource(
+          application.getUser(), container.getResource());
+    }
+    rmContainer = application.reserve(node, priority, rmContainer, container);
+    node.reserveResource(application, priority, rmContainer);
+  }
- private void unreserve(CSApp application, Priority priority,
- CSNode node) {
+ private void unreserve(SchedulerApp application, Priority priority,
+ SchedulerNode node, RMContainer rmContainer) {
// Done with the reservation?
- if (application.isReserved(node, priority)) {
- application.unreserveResource(node, priority);
- node.unreserveResource(application, priority);
- }
+ application.unreserve(node, priority);
+ node.unreserveResource(application);
+
+ // Update reserved metrics
+ getMetrics().unreserveResource(
+ application.getUser(), rmContainer.getContainer().getResource());
}
@Override
public void completedContainer(Resource clusterResource,
- Container container, Resource containerResource, CSApp application) {
+ SchedulerApp application, SchedulerNode node, RMContainer rmContainer,
+ RMContainerEventType event) {
if (application != null) {
// Careful! Locking order is important!
synchronized (this) {
+
+ Container container = rmContainer.getContainer();
- // Inform the application - this might be an allocated container or
- // an unfulfilled reservation
- // TODO: acm: refactor2 FIXME
- //application.completedContainer(container, containerResource);
-
+ // Inform the application & the node
+ // Note: It's safe to assume that all state changes to RMContainer
+ // happen under scheduler's lock...
+ // So, this is, in effect, a transaction across application & node
+ if (rmContainer.getState() == RMContainerState.RESERVED) {
+ application.unreserve(node, rmContainer.getReservedPriority());
+ node.unreserveResource(application);
+ } else {
+ application.containerCompleted(rmContainer, event);
+ node.releaseContainer(container);
+ }
+
+
// Book-keeping
releaseResource(clusterResource,
- application.getUser(), containerResource);
+ application.getUser(), container.getResource());
LOG.info("completedContainer" +
" container=" + container +
- " resource=" + containerResource +
+ " resource=" + container.getResource() +
" queue=" + this +
" util=" + getUtilization() +
" used=" + usedResources +
@@ -988,29 +1000,41 @@ public class LeafQueue implements Queue
}
// Inform the parent queue
- parent.completedContainer(clusterResource, container,
- containerResource, application);
+ parent.completedContainer(clusterResource, application,
+ node, rmContainer, event);
}
}
private synchronized void allocateResource(Resource clusterResource,
String userName, Resource resource) {
+ // Update queue metrics
Resources.addTo(usedResources, resource);
updateResource(clusterResource);
++numContainers;
+ // Update user metrics
User user = getUser(userName);
user.assignContainer(resource);
+
+ LOG.info(getQueueName() +
+ " used=" + usedResources + " numContainers=" + numContainers +
+ " user=" + userName + " resources=" + user.getConsumedResources());
}
private synchronized void releaseResource(Resource clusterResource,
String userName, Resource resource) {
+ // Update queue metrics
Resources.subtractFrom(usedResources, resource);
updateResource(clusterResource);
--numContainers;
+ // Update user metrics
User user = getUser(userName);
user.releaseContainer(resource);
+
+ LOG.info(getQueueName() +
+ " used=" + usedResources + " numContainers=" + numContainers +
+ " user=" + userName + " resources=" + user.getConsumedResources());
}
@Override
@@ -1062,7 +1086,7 @@ public class LeafQueue implements Queue
@Override
public void recoverContainer(Resource clusterResource,
- CSApp application, Container container) {
+ SchedulerApp application, Container container) {
// Careful! Locking order is important!
synchronized (this) {
allocateResource(clusterResource, application.getUser(), container.getResource());
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Wed Aug 3 11:51:20 2011
@@ -36,7 +36,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -46,7 +45,11 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@Private
@Evolving
@@ -396,7 +399,7 @@ public class ParentQueue implements Queu
}
@Override
- public void submitApplication(CSApp application, String user,
+ public void submitApplication(SchedulerApp application, String user,
String queue) throws AccessControlException {
synchronized (this) {
@@ -428,7 +431,7 @@ public class ParentQueue implements Queu
}
}
- private synchronized void addApplication(CSApp application,
+ private synchronized void addApplication(SchedulerApp application,
String user) {
++numApplications;
@@ -441,7 +444,7 @@ public class ParentQueue implements Queu
}
@Override
- public void finishApplication(CSApp application, String queue) {
+ public void finishApplication(SchedulerApp application, String queue) {
synchronized (this) {
removeApplication(application, application.getUser());
@@ -453,7 +456,7 @@ public class ParentQueue implements Queu
}
}
- public synchronized void removeApplication(CSApp application,
+ public synchronized void removeApplication(SchedulerApp application,
String user) {
--numApplications;
@@ -475,7 +478,7 @@ public class ParentQueue implements Queu
@Override
public synchronized Resource assignContainers(
- Resource clusterResource, CSNode node) {
+ Resource clusterResource, SchedulerNode node) {
Resource assigned = Resources.createResource(0);
while (canAssign(node)) {
@@ -539,14 +542,14 @@ public class ParentQueue implements Queu
}
- private boolean canAssign(CSNode node) {
- return (node.getReservedApplication() == null) &&
+ private boolean canAssign(SchedulerNode node) {
+ return (node.getReservedContainer() == null) &&
Resources.greaterThanOrEqual(node.getAvailableResource(),
minimumAllocation);
}
synchronized Resource assignContainersToChildQueues(Resource cluster,
- CSNode node) {
+ SchedulerNode node) {
Resource assigned = Resources.createResource(0);
printChildQueues();
@@ -588,13 +591,14 @@ public class ParentQueue implements Queu
@Override
public void completedContainer(Resource clusterResource,
- Container container, Resource containerResource,
- CSApp application) {
+ SchedulerApp application, SchedulerNode node,
+ RMContainer rmContainer, RMContainerEventType event) {
if (application != null) {
// Careful! Locking order is important!
// Book keeping
synchronized (this) {
- releaseResource(clusterResource, containerResource);
+ releaseResource(clusterResource,
+ rmContainer.getContainer().getResource());
LOG.info("completedContainer" +
" queue=" + getQueueName() +
@@ -605,8 +609,8 @@ public class ParentQueue implements Queu
// Inform the parent
if (parent != null) {
- parent.completedContainer(clusterResource, container,
- containerResource, application);
+ parent.completedContainer(clusterResource, application,
+ node, rmContainer, event);
}
}
}
@@ -646,7 +650,7 @@ public class ParentQueue implements Queu
@Override
public void recoverContainer(Resource clusterResource,
- CSApp application, Container container) {
+ SchedulerApp application, Container container) {
// Careful! Locking order is important!
synchronized (this) {
allocateResource(clusterResource, container.getResource());
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Wed Aug 3 11:51:20 2011
@@ -26,12 +26,13 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
/**
* Queue represents a node in the tree of
@@ -138,7 +139,7 @@ extends org.apache.hadoop.yarn.server.re
* @param user user who submitted the application
* @param queue queue to which the application is submitted
*/
- public void submitApplication(CSApp application, String user,
+ public void submitApplication(SchedulerApp application, String user,
String queue)
throws AccessControlException;
@@ -147,7 +148,7 @@ extends org.apache.hadoop.yarn.server.re
* @param application
* @param queue application queue
*/
- public void finishApplication(CSApp application, String queue);
+ public void finishApplication(SchedulerApp application, String queue);
/**
* Assign containers to applications in the queue or its children (if any).
@@ -155,19 +156,20 @@ extends org.apache.hadoop.yarn.server.re
* @param node node on which resources are available
* @return
*/
- public Resource assignContainers(Resource clusterResource, CSNode node);
+ public Resource assignContainers(Resource clusterResource, SchedulerNode node);
/**
* A container assigned to the queue has completed.
* @param clusterResource the resource of the cluster
+ * @param application application to which the container was assigned
+ * @param node node on which the container completed
* @param container completed container,
* <code>null</code> if it was just a reservation
- * @param containerResource allocated resource
- * @param application application to which the container was assigned
+ * @param event event to be sent to the container
*/
public void completedContainer(Resource clusterResource,
- Container container, Resource containerResource,
- CSApp application);
+ SchedulerApp application, SchedulerNode node,
+ RMContainer container, RMContainerEventType event);
/**
* Get the number of applications in the queue.
@@ -196,6 +198,6 @@ extends org.apache.hadoop.yarn.server.re
* @param application the application for which the container was allocated
* @param container the container that was recovered.
*/
- public void recoverContainer(Resource clusterResource, CSApp application,
+ public void recoverContainer(Resource clusterResource, SchedulerApp application,
Container container);
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Aug 3 11:51:20 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.Lock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -62,7 +63,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -225,7 +225,8 @@ public class FifoScheduler implements Re
// Release containers
for (Container releasedContainer : release) {
- containerCompleted(releasedContainer, RMContainerEventType.RELEASED);
+ containerCompleted(getRMContainer(releasedContainer),
+ RMContainerEventType.RELEASED);
}
if (!ask.isEmpty()) {
@@ -261,8 +262,9 @@ public class FifoScheduler implements Re
private void normalizeRequest(ResourceRequest ask) {
int memory = ask.getCapability().getMemory();
// FIXME: TestApplicationCleanup is relying on unnormalized behavior.
- memory = MINIMUM_MEMORY *
- ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
+ memory =
+ MINIMUM_MEMORY *
+ ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
ask.setCapability(Resources.createResource(memory));
}
@@ -279,12 +281,12 @@ public class FifoScheduler implements Re
String queueName, String user) {
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
appAttemptId, queueName, user, null);
- SchedulerApp schedulerApp = new SchedulerApp(appSchedulingInfo,
- DEFAULT_QUEUE);
+ SchedulerApp schedulerApp =
+ new SchedulerApp(this.rmContext, appSchedulingInfo, DEFAULT_QUEUE);
applications.put(appAttemptId, schedulerApp);
metrics.submitApp(user);
- LOG.info("Application Submission: " + appAttemptId.getApplicationId() + " from " + user +
- ", currently active: " + applications.size());
+ LOG.info("Application Submission: " + appAttemptId.getApplicationId() +
+ " from " + user + ", currently active: " + applications.size());
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
RMAppAttemptEventType.APP_ACCEPTED));
@@ -302,7 +304,7 @@ public class FifoScheduler implements Re
// Kill all 'live' containers
for (RMContainer container : application.getLiveContainers()) {
- containerCompleted(container.getContainer(), RMContainerEventType.KILL);
+ containerCompleted(container, RMContainerEventType.KILL);
}
// Clean up pending requests, metrics etc.
@@ -428,7 +430,7 @@ public class FifoScheduler implements Re
NodeType.DATA_LOCAL),
request.getNumContainers());
assignedContainers =
- assignContainers(node, application, priority,
+ assignContainer(node, application, priority,
assignableContainers, request, NodeType.DATA_LOCAL);
}
return assignedContainers;
@@ -446,7 +448,7 @@ public class FifoScheduler implements Re
NodeType.RACK_LOCAL),
request.getNumContainers());
assignedContainers =
- assignContainers(node, application, priority,
+ assignContainer(node, application, priority,
assignableContainers, request, NodeType.RACK_LOCAL);
}
return assignedContainers;
@@ -459,13 +461,13 @@ public class FifoScheduler implements Re
application.getResourceRequest(priority, SchedulerNode.ANY);
if (request != null) {
assignedContainers =
- assignContainers(node, application, priority,
+ assignContainer(node, application, priority,
request.getNumContainers(), request, NodeType.OFF_SWITCH);
}
return assignedContainers;
}
- private int assignContainers(SchedulerNode node, SchedulerApp application,
+ private int assignContainer(SchedulerNode node, SchedulerApp application,
Priority priority, int assignableContainers,
ResourceRequest request, NodeType type) {
LOG.debug("assignContainers:" +
@@ -495,10 +497,6 @@ public class FifoScheduler implements Re
application.getNewContainerId(),
node.getRMNode().getNodeID(),
node.getRMNode().getHttpAddress(), capability);
- RMContainer rmContainer = new RMContainerImpl(container, application
- .getApplicationAttemptId(), node.getNodeID(), this.rmContext
- .getDispatcher().getEventHandler(), this.rmContext
- .getContainerAllocationExpirer());
// If security is enabled, send the container-tokens too.
if (UserGroupInformation.isSecurityEnabled()) {
@@ -518,10 +516,14 @@ public class FifoScheduler implements Re
}
// Allocate!
- application.allocate(type, node, priority, request,
- Collections.singletonList(rmContainer));
+
+ // Inform the application
+ RMContainer rmContainer =
+ application.allocate(type, node, priority, request, container);
+
+ // Inform the node
node.allocateContainer(application.getApplicationId(),
- container);
+ rmContainer);
}
// Update total usage
@@ -541,7 +543,8 @@ public class FifoScheduler implements Re
if (container.getState() == ContainerState.RUNNING) {
containerLaunchedOnNode(container, node);
} else { // has to COMPLETE
- containerCompleted(container, RMContainerEventType.FINISHED);
+ containerCompleted(getRMContainer(container),
+ RMContainerEventType.FINISHED);
}
}
}
@@ -607,7 +610,7 @@ public class FifoScheduler implements Re
{
ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent) event;
- containerCompleted(containerExpiredEvent.getContainer(),
+ containerCompleted(getRMContainer(containerExpiredEvent.getContainer()),
RMContainerEventType.EXPIRE);
}
break;
@@ -631,9 +634,10 @@ public class FifoScheduler implements Re
}
@Lock(FifoScheduler.class)
- private synchronized void containerCompleted(Container container,
+ private synchronized void containerCompleted(RMContainer rmContainer,
RMContainerEventType event) {
// Get the application for the finished container
+ Container container = rmContainer.getContainer();
ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
SchedulerApp application = getApplication(applicationAttemptId);
@@ -649,7 +653,7 @@ public class FifoScheduler implements Re
}
// Inform the application
- application.containerCompleted(container, event);
+ application.containerCompleted(rmContainer, event);
// Inform the node
node.releaseContainer(container);
@@ -667,7 +671,7 @@ public class FifoScheduler implements Re
private synchronized void removeNode(RMNode nodeInfo) {
SchedulerNode node = getNode(nodeInfo.getNodeID());
// Kill running containers
- for(Container container : node.getRunningContainers()) {
+ for(RMContainer container : node.getRunningContainers()) {
containerCompleted(container, RMContainerEventType.KILL);
}
@@ -696,6 +700,7 @@ public class FifoScheduler implements Re
@Override
public void recover(RMState state) {
+ // TODO fix recovery
// for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
// ApplicationId appId = entry.getKey();
// ApplicationInfo appInfo = entry.getValue();
@@ -710,4 +715,12 @@ public class FifoScheduler implements Re
return new SchedulerNodeReport(
node.getUsedResource(), node.getNumContainers());
}
+
+ /**
+ * Looks up the {@link RMContainer} that corresponds to the given container.
+ * The owning application is resolved from the application attempt id carried
+ * in the container's id, and that application is then asked for the
+ * RMContainer registered under the container id.
+ *
+ * @param container container whose RMContainer is wanted
+ * @return the value returned by the application's
+ * <code>getRMContainer(containerId)</code> for this container
+ */
+ private RMContainer getRMContainer(Container container) {
+ ContainerId containerId = container.getId();
+ // NOTE(review): the result of getApplication(...) is dereferenced below
+ // without a null check; presumably it can be null for an attempt that is
+ // no longer tracked -- confirm against getApplication and the callers.
+ SchedulerApp application =
+ getApplication(container.getId().getAppAttemptId());
+ return application.getRMContainer(containerId);
+ }
+
}
|