Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153445&r1=1153444&r2=1153445&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
Wed Aug 3 11:46:46 2011
@@ -37,10 +37,10 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.Lock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -61,10 +61,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -72,9 +72,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerFinishedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -169,11 +170,11 @@ public class FifoScheduler implements Re
};
+ /**
+ * Returns SchedulerNode#getUsedResource() for the given node.
+ * NOTE(review): getNode() returns null for an unregistered node, so this
+ * throws NullPointerException for an unknown nodeId.
+ */
public synchronized Resource getUsedResource(NodeId nodeId) {
- return nodes.get(nodeId).getUsedResource();
+ return getNode(nodeId).getUsedResource();
}
+ /**
+ * Returns SchedulerNode#getAvailableResource() for the given node.
+ * NOTE(review): throws NullPointerException for an unknown nodeId, since
+ * getNode() returns null in that case.
+ */
public synchronized Resource getAvailableResource(NodeId nodeId) {
- return nodes.get(nodeId).getAvailableResource();
+ return getNode(nodeId).getAvailableResource();
}
@Override
@@ -206,19 +207,26 @@ public class FifoScheduler implements Re
}
}
+ /**
+ * Returned by allocate() when the application attempt is unknown: no new
+ * containers and zero headroom.
+ * NOTE(review): this single instance is handed to every such caller; confirm
+ * that neither the container list nor the headroom Resource is ever mutated
+ * by consumers of an Allocation.
+ */
+ private static final Allocation EMPTY_ALLOCATION =
+ new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
+ /**
+ * Normalizes the attempt's resource requests, releases the containers it
+ * hands back, and returns the containers newly allocated to it together
+ * with its headroom.
+ *
+ * @param applicationAttemptId the calling application attempt
+ * @param ask resource requests; each is normalized in place
+ * @param release containers the attempt is giving back; containers that do
+ * not belong to applicationAttemptId are ignored
+ * @return newly allocated containers and headroom, or EMPTY_ALLOCATION if
+ * the attempt is unknown
+ */
@Override
- public synchronized void allocate(ApplicationAttemptId applicationAttemptId,
- List<ResourceRequest> ask) {
+ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
+ List<ResourceRequest> ask, List<Container> release) {
SchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
- return;
+ return EMPTY_ALLOCATION;
}
// Sanity check
normalizeRequests(ask);
-
+
+ // Release containers.
+ // completedContainer() looks the owner up from the attempt id embedded in
+ // each Container, and these records are supplied by the caller, so only
+ // honour releases of containers that actually belong to this attempt.
+ for (Container releasedContainer : release) {
+ ApplicationAttemptId owner = releasedContainer.getId().getAppAttemptId();
+ if (!applicationAttemptId.equals(owner)) {
+ LOG.error("Application " + applicationAttemptId +
+ " tried to release container " + releasedContainer.getId() +
+ " which belongs to " + owner + ". Ignoring!");
+ continue;
+ }
+ completedContainer(releasedContainer, RMContainerEventType.RELEASED);
+ }
+
synchronized (application) {
LOG.debug("allocate: pre-update" +
@@ -237,16 +245,13 @@ public class FifoScheduler implements Re
LOG.debug("allocate:" +
" applicationId=" + applicationAttemptId +
" #ask=" + ask.size());
+
+ return new Allocation(
+ application.pullNewlyAllocatedContainers(),
+ application.getHeadroom());
}
}
- @Override
- public Resource getResourceLimit(ApplicationAttemptId applicationAttemptId) {
- SchedulerApp application = getApplication(applicationAttemptId);
- // TODO: What if null?
- return application.getHeadroom();
- }
-
private void normalizeRequests(List<ResourceRequest> asks) {
for (ResourceRequest ask : asks) {
normalizeRequest(ask);
@@ -256,9 +261,9 @@ public class FifoScheduler implements Re
private void normalizeRequest(ResourceRequest ask) {
int memory = ask.getCapability().getMemory();
// FIXME: TestApplicationCleanup is relying on unnormalized behavior.
- //ask.capability.memory = MINIMUM_MEMORY *
memory = MINIMUM_MEMORY *
((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
+ ask.setCapability(Resources.createResource(memory));
}
private synchronized SchedulerApp getApplication(
@@ -266,6 +271,10 @@ public class FifoScheduler implements Re
return applications.get(applicationAttemptId);
}
+ /**
+ * Looks up the scheduler's record for a node.
+ *
+ * @param nodeId id of the node
+ * @return the SchedulerNode registered under nodeId, or null if none is
+ */
+ private synchronized SchedulerNode getNode(NodeId nodeId) {
+ SchedulerNode registered = this.nodes.get(nodeId);
+ return registered;
+ }
+
private synchronized void addApplication(ApplicationAttemptId appAttemptId,
String queueName, String user) {
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
@@ -297,7 +306,7 @@ public class FifoScheduler implements Re
// Remove the application
applications.remove(applicationAttemptId);
}
-
+
/**
* Heart of the scheduler...
*
@@ -473,15 +482,18 @@ public class FifoScheduler implements Re
Math.min(assignableContainers, availableContainers);
if (assignedContainers > 0) {
- List<Container> containers =
- new ArrayList<Container>(assignedContainers);
for (int i=0; i < assignedContainers; ++i) {
+ // Create the container
Container container =
BuilderUtils.newContainer(recordFactory,
application.getApplicationAttemptId(),
application.getNewContainerId(),
node.getRMNode().getNodeID(), node.getRMNode().getNodeAddress(),
node.getRMNode().getHttpAddress(), capability);
+ RMContainer rmContainer =
+ new RMContainerImpl(container,
+ application.getApplicationAttemptId(), node.getNodeID(),
+ null, this.rmContext.getContainerAllocationExpirer());
// If security is enabled, send the container-tokens too.
if (UserGroupInformation.isSecurityEnabled()) {
@@ -499,100 +511,84 @@ public class FifoScheduler implements Re
containerToken.setService(container.getNodeId().toString());
container.setContainerToken(containerToken);
}
- containers.add(container);
+
+ // Allocate!
+ application.allocate(type, node, priority, request,
+ Collections.singletonList(rmContainer));
+ node.allocateContainer(application.getApplicationId(),
+ container);
}
- application.allocate(type, node, priority, request, containers);
- addAllocatedContainers(node, application.getApplicationAttemptId(),
- containers);
+
+ // Update total usage
Resources.addTo(usedResource,
Resources.multiply(capability, assignedContainers));
}
+
return assignedContainers;
}
- private synchronized void killContainers(List<Container> containers) {
- applicationCompletedContainers(containers);
- }
-
- private synchronized void applicationCompletedContainers(List<Container> containers)
{
- for (Container c : containers) {
- applicationCompletedContainer(c);
- }
- }
-
- private synchronized void applicationCompletedContainer(Container c) {
- SchedulerApp app = applications.get(c.getId().getAppAttemptId());
- /** this is possible, since an application can be removed from scheduler but
- * the nodemanger is just updating about a completed container.
- */
- if (app != null) {
- app.completedContainer(c, c.getResource());
- }
- }
-
- private List<Container> getCompletedContainers(Map<String, List<Container>>
allContainers) {
- if (allContainers == null) {
- return new ArrayList<Container>();
- }
- List<Container> completedContainers = new ArrayList<Container>();
- // Iterate through the running containers and update their status
- for (Map.Entry<String, List<Container>> e :
- allContainers.entrySet()) {
- for (Container c: e.getValue()) {
- if (c.getState() == ContainerState.COMPLETE) {
- completedContainers.add(c);
+ /**
+ * Handles a node heartbeat: forwards each reported container to its
+ * application (launched if RUNNING/INITIALIZING, completed otherwise), then
+ * assigns new containers on the node if it has at least minimumAllocation
+ * available, and finally refreshes the queue's available-resource metric.
+ *
+ * @param rmNode the node that sent the heartbeat
+ * @param containers the node's containers, grouped by application id
+ */
+ private synchronized void nodeUpdate(RMNode rmNode,
+ Map<ApplicationId, List<Container>> containers) {
+ SchedulerNode node = getNode(rmNode.getNodeID());
+ if (node == null) {
+ // Not (or no longer) registered with the scheduler; everything below
+ // dereferences node.
+ LOG.info("Ignoring heartbeat from unknown node " + rmNode.getNodeID());
+ return;
+ }
+
+ // Process launched and completed containers
+ for (List<Container> appContainers : containers.values()) {
+ for (Container container : appContainers) {
+ ContainerState state = container.getContainerStatus().getState();
+ if (state == ContainerState.RUNNING
+ || state == ContainerState.INITIALIZING) {
+ launchContainer(container, node);
+ } else { // any other state is treated as COMPLETE
+ completedContainer(container, RMContainerEventType.FINISHED);
}
}
}
- return completedContainers;
- }
-
- private synchronized void nodeUpdate(RMNode rmNode,
- Map<String, List<Container>> containers) {
- SchedulerNode node = this.nodes.get(rmNode.getNodeID());
- node.statusUpdate(containers);
-
- applicationCompletedContainers(getCompletedContainers(containers));
- LOG.info("Node heartbeat " + rmNode.getNodeID() + " resource = " + node.getAvailableResource());
+ LOG.info("Node heartbeat " + rmNode.getNodeID() +
+ " available resource = " + node.getAvailableResource());
+
if (Resources.greaterThanOrEqual(node.getAvailableResource(),
minimumAllocation)) {
assignContainers(node);
}
+
metrics.setAvailableResourcesToQueue(
Resources.subtract(clusterResource, usedResource));
LOG.info("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getAvailableResource());
-
- // TODO: Add the list of containers to be preempted when we support
}
@Override
public synchronized void handle(SchedulerEvent event) {
switch(event.getType()) {
case NODE_ADDED:
+ {
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
- break;
+ }
+ break;
case NODE_REMOVED:
+ {
NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
removeNode(nodeRemovedEvent.getRemovedRMNode());
- break;
+ }
+ break;
case NODE_UPDATE:
- NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
- Map<ApplicationId, List<Container>> contAppMapping = nodeUpdatedEvent.getContainers();
- Map<String, List<Container>> conts = new HashMap<String, List<Container>>();
- for (Map.Entry<ApplicationId, List<Container>> entry : contAppMapping.entrySet())
{
- conts.put(entry.getKey().toString(), entry.getValue());
- }
- nodeUpdate(nodeUpdatedEvent.getRMNode(), conts);
- break;
+ {
+ NodeUpdateSchedulerEvent nodeUpdatedEvent =
+ (NodeUpdateSchedulerEvent)event;
+ nodeUpdate(nodeUpdatedEvent.getRMNode(),
+ nodeUpdatedEvent.getContainers());
+ }
+ break;
case APP_ADDED:
+ {
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
.getQueue(), appAddedEvent.getUser());
- break;
+ }
+ break;
case APP_REMOVED:
+ {
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
try {
doneApplication(appRemovedEvent.getApplicationAttemptID(),
@@ -601,30 +597,82 @@ public class FifoScheduler implements Re
LOG.error("Unable to remove application "
+ appRemovedEvent.getApplicationAttemptID(), ie);
}
- break;
- case CONTAINER_FINISHED:
- ContainerFinishedSchedulerEvent containerFinishedEvent = (ContainerFinishedSchedulerEvent)
event;
- Container container = containerFinishedEvent.getContainer();
- applicationCompletedContainer(container);
- this.rmContext.getRMContainers().remove(container.getId());
- releaseContainer(container.getId().getAppId(), container);
- break;
+ }
+ break;
+ case CONTAINER_EXPIRED:
+ {
+ ContainerExpiredSchedulerEvent containerExpiredEvent =
+ (ContainerExpiredSchedulerEvent) event;
+ completedContainer(containerExpiredEvent.getContainer(),
+ RMContainerEventType.EXPIRE);
+ }
+ break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
}
+ /**
+ * Tells the owning application attempt that one of its containers is now
+ * launched. Containers whose attempt is unknown are logged and ignored.
+ *
+ * @param container the launched container
+ * @param node the node that reported it (used only for logging)
+ */
+ private void launchContainer(Container container, SchedulerNode node) {
+ // Resolve the attempt that owns the launched container
+ ApplicationAttemptId attemptId = container.getId().getAppAttemptId();
+ SchedulerApp owner = getApplication(attemptId);
+ if (owner != null) {
+ owner.launchContainer(container.getId());
+ } else {
+ LOG.info("Unknown application: " + attemptId +
+ " launched container " + container.getId() +
+ " on node: " + node);
+ }
+ }
+
+ /**
+ * Retires a container: informs the owning application attempt and releases
+ * the container on the node it was allocated on.
+ *
+ * @param container the container that is done
+ * @param event why it is done (RELEASED, FINISHED, EXPIRE or KILL at the
+ * call sites in this class)
+ */
+ @Lock(FifoScheduler.class)
+ private synchronized void completedContainer(Container container, RMContainerEventType event) {
+ // Get the application for the finished container
+ ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
+ SchedulerApp application = getApplication(applicationAttemptId);
+
+ // Get the node on which the container was allocated
+ SchedulerNode node = getNode(container.getNodeId());
+
+ if (application == null) {
+ LOG.info("Unknown application: " + applicationAttemptId +
+ " released container " + container.getId() +
+ " on node: " + node +
+ " with event: " + event);
+ return;
+ }
+
+ // Inform the application
+ application.completedContainer(container, event);
+
+ // Inform the node. removeNode() drops the node from 'nodes', so a release
+ // or expiry that arrives afterwards finds no node; there is nothing left
+ // to release on it in that case.
+ if (node != null) {
+ node.releaseContainer(container);
+ }
+ // NOTE(review): usedResource is added to when containers are assigned but
+ // nothing visible here subtracts container.getResource() from it; confirm
+ // the queue metric set in nodeUpdate() does not drift.
+ LOG.info("Application " + applicationAttemptId +
+ " released container " + container.getId() +
+ " on node: " + node +
+ " with event: " + event);
+
+ }
+
private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private synchronized void removeNode(RMNode nodeInfo) {
+ SchedulerNode node = getNode(nodeInfo.getNodeID());
+ // Kill running containers
+ for(Container container : node.getRunningContainers()) {
+ completedContainer(container, RMContainerEventType.KILL);
+ }
+
+ //Remove the node
this.nodes.remove(nodeInfo.getNodeID());
+
+ // Update cluster metrics
Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
- killContainers(nodeInfo.getRunningContainers());
}
-
@Override
public QueueInfo getQueueInfo(String queueName,
boolean includeChildQueues, boolean recursive) {
@@ -641,35 +689,6 @@ public class FifoScheduler implements Re
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
}
- private synchronized void releaseContainer(ApplicationId applicationId,
- Container container) {
- // Reap containers
- LOG.info("Application " + applicationId + " released container " +
- container.getId());
- nodes.get(container.getNodeId()).releaseContainer(container);
- }
-
- private synchronized void addAllocatedContainers(SchedulerNode node,
- ApplicationAttemptId appAttemptId, List<Container> containers) {
- node.allocateContainer(appAttemptId.getApplicationId(), containers);
- for (Container container : containers) {
- // Create the container and 'start' it.
- ContainerId containerId = container.getId();
- RMContainer rmContainer = new RMContainerImpl(containerId,
- appAttemptId, node.getNodeID(), container, this.rmContext
- .getDispatcher().getEventHandler(), this.rmContext
- .getContainerAllocationExpirer());
- if (this.rmContext.getRMContainers().putIfAbsent(containerId,
- rmContainer) != null) {
- LOG.error("Duplicate container addition! ContainerID : "
- + containerId);
- } else {
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMContainerEvent(containerId, RMContainerEventType.START));
- }
- }
- }
-
@Override
public void recover(RMState state) {
// for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet())
{
@@ -679,4 +698,11 @@ public class FifoScheduler implements Re
// app.allocate(appInfo.getContainers());
// }
}
+
+ /**
+ * Reports the used resources and container count of a node.
+ *
+ * @param nodeId id of the node
+ * @return a report for the node, or null if the node is not registered
+ */
+ @Override
+ public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
+ SchedulerNode node = getNode(nodeId);
+ if (node == null) {
+ // Unknown or already-removed node: report nothing instead of an NPE.
+ return null;
+ }
+ return new SchedulerNodeReport(
+ node.getUsedResource(), node.getNumContainers());
+ }
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java?rev=1153445&r1=1153444&r2=1153445&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
Wed Aug 3 11:46:46 2011
@@ -71,7 +71,8 @@ class NodesPage extends RmView {
td(health.getIsNodeHealthy() ? "Healthy" : "Unhealthy").
td(Times.format(health.getLastHealthReportTime())).
td(String.valueOf(health.getHealthReport())).
- td(String.valueOf(ni.getNumContainers())).
+ // TODO: acm: refactor2 FIXME
+ //td(String.valueOf(ni.getNumContainers())).
// TODO: FIXME Vinodkv
// td(String.valueOf(ni.getUsedResource().getMemory())).
// td(String.valueOf(ni.getAvailableResource().getMemory())).
|