Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Aug 3 11:31:34 2011
@@ -38,6 +38,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -46,13 +47,18 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -79,7 +85,7 @@ public class LeafQueue implements Queue
private float usedCapacity = 0.0f;
private volatile int numContainers;
- Set<Application> applications;
+ Set<CSApp> applications;
public final Resource minimumAllocation;
@@ -90,8 +96,6 @@ public class LeafQueue implements Queue
private final QueueMetrics metrics;
private QueueInfo queueInfo;
- private Map<ApplicationId, org.apache.hadoop.yarn.api.records.ApplicationReport>
- applicationInfos;
private QueueState state;
@@ -105,7 +109,7 @@ public class LeafQueue implements Queue
public LeafQueue(CapacitySchedulerContext cs,
String queueName, Queue parent,
- Comparator<Application> applicationComparator, Queue old) {
+ Comparator<CSApp> applicationComparator, Queue old) {
this.scheduler = cs;
this.queueName = queueName;
this.parent = parent;
@@ -139,10 +143,6 @@ public class LeafQueue implements Queue
this.queueInfo.setQueueName(queueName);
this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
- this.applicationInfos =
- new HashMap<ApplicationId,
- org.apache.hadoop.yarn.api.records.ApplicationReport>();
-
QueueState state = cs.getConfiguration().getState(getQueuePath());
Map<QueueACL, AccessControlList> acls =
@@ -158,7 +158,7 @@ public class LeafQueue implements Queue
" name=" + queueName +
", fullname=" + getQueuePath());
- this.applications = new TreeSet<Application>(applicationComparator);
+ this.applications = new TreeSet<CSApp>(applicationComparator);
}
private synchronized void setupQueueConfigs(
@@ -256,11 +256,6 @@ public class LeafQueue implements Queue
}
@Override
- public synchronized List<Application> getApplications() {
- return new ArrayList<Application>(applications);
- }
-
- @Override
public List<Queue> getChildQueues() {
return null;
}
@@ -292,19 +287,9 @@ public class LeafQueue implements Queue
}
@Override
- public synchronized QueueInfo getQueueInfo(boolean includeApplications,
+ public synchronized QueueInfo getQueueInfo(
boolean includeChildQueues, boolean recursive) {
queueInfo.setCurrentCapacity(usedCapacity);
-
- if (includeApplications) {
- queueInfo.setApplications(
- new ArrayList<org.apache.hadoop.yarn.api.records.ApplicationReport>(
- applicationInfos.values()));
- } else {
- queueInfo.setApplications(
- new ArrayList<org.apache.hadoop.yarn.api.records.ApplicationReport>());
- }
-
return queueInfo;
}
@@ -377,9 +362,8 @@ public class LeafQueue implements Queue
}
@Override
- public void submitApplication(Application application, String userName,
- String queue, Priority priority)
- throws AccessControlException {
+ public void submitApplication(CSApp application, String userName,
+ String queue) throws AccessControlException {
// Careful! Locking order is important!
// Check queue ACLs
@@ -430,7 +414,7 @@ public class LeafQueue implements Queue
// Inform the parent queue
try {
- parent.submitApplication(application, userName, queue, priority);
+ parent.submitApplication(application, userName, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
parent.getQueuePath(), ace);
@@ -439,12 +423,10 @@ public class LeafQueue implements Queue
}
}
- private synchronized void addApplication(Application application, User user) {
+ private synchronized void addApplication(CSApp application, User user) {
// Accept
user.submitApplication();
applications.add(application);
- applicationInfos.put(application.getApplicationId(),
- application.getApplicationInfo());
LOG.info("Application added -" +
" appId: " + application.getApplicationId() +
@@ -454,8 +436,7 @@ public class LeafQueue implements Queue
}
@Override
- public void finishApplication(Application application, String queue)
- throws AccessControlException {
+ public void finishApplication(CSApp application, String queue) {
// Careful! Locking order is important!
synchronized (this) {
removeApplication(application, getUser(application.getUser()));
@@ -465,7 +446,7 @@ public class LeafQueue implements Queue
parent.finishApplication(application, queue);
}
- public synchronized void removeApplication(Application application, User user) {
+ public synchronized void removeApplication(CSApp application, User user) {
applications.remove(application);
user.finishApplication();
@@ -473,8 +454,6 @@ public class LeafQueue implements Queue
users.remove(application.getUser());
}
- applicationInfos.remove(application.getApplicationId());
-
LOG.info("Application removed -" +
" appId: " + application.getApplicationId() +
" user: " + application.getUser() +
@@ -485,27 +464,21 @@ public class LeafQueue implements Queue
@Override
public synchronized Resource
- assignContainers(Resource clusterResource, NodeInfo node) {
+ assignContainers(Resource clusterResource, CSNode node) {
LOG.info("DEBUG --- assignContainers:" +
" node=" + node.getNodeAddress() +
" #applications=" + applications.size());
// Check for reserved resources
- Application reservedApplication = node.getReservedApplication();
+ CSApp reservedApplication = node.getReservedApplication();
if (reservedApplication != null) {
return assignReservedContainers(reservedApplication, node,
clusterResource);
}
// Try to assign containers to applications in fifo order
- for (Application application : applications) {
-
- if (!application.isSchedulable()) {
- LOG.info("Application " + application.getApplicationId() +
- " not schedulable. State = " + application.getState());
- continue;
- }
+ for (CSApp application : applications) {
LOG.info("DEBUG --- pre-assignContainers for application "
+ application.getApplicationId());
@@ -525,7 +498,7 @@ public class LeafQueue implements Queue
// Are we going over limits by allocating to this application?
ResourceRequest required =
- application.getResourceRequest(priority, NodeManager.ANY);
+ application.getResourceRequest(priority, RMNode.ANY);
// Maximum Capacity of the queue
@@ -552,7 +525,7 @@ public class LeafQueue implements Queue
// Did we schedule or reserve a container?
if (Resources.greaterThan(assigned, Resources.none())) {
Resource assignedResource =
- application.getResourceRequest(priority, NodeManager.ANY).getCapability();
+ application.getResourceRequest(priority, RMNode.ANY).getCapability();
// Book-keeping
allocateResource(clusterResource,
@@ -579,8 +552,8 @@ public class LeafQueue implements Queue
}
- private synchronized Resource assignReservedContainers(Application application,
- NodeInfo node, Resource clusterResource) {
+ private synchronized Resource assignReservedContainers(CSApp application,
+ CSNode node, Resource clusterResource) {
synchronized (application) {
for (Priority priority : application.getPriorities()) {
@@ -589,7 +562,7 @@ public class LeafQueue implements Queue
// Do we really need this reservation still?
ResourceRequest offSwitchRequest =
- application.getResourceRequest(priority, NodeManager.ANY);
+ application.getResourceRequest(priority, RMNode.ANY);
if (offSwitchRequest.getNumContainers() == 0) {
// Release
unreserve(application, priority, node);
@@ -626,12 +599,12 @@ public class LeafQueue implements Queue
return true;
}
- private void setUserResourceLimit(Application application, Resource resourceLimit) {
+ private void setUserResourceLimit(CSApp application, Resource resourceLimit) {
application.setAvailableResourceLimit(resourceLimit);
metrics.setAvailableResourcesToUser(application.getUser(), resourceLimit);
}
- private Resource computeUserLimit(Application application,
+ private Resource computeUserLimit(CSApp application,
Resource clusterResource, Resource required) {
// What is our current capacity?
// * It is equal to the max(required, queue-capacity) if
@@ -715,17 +688,17 @@ public class LeafQueue implements Queue
return (a + (b - 1)) / b;
}
- boolean needContainers(Application application, Priority priority) {
+ boolean needContainers(CSApp application, Priority priority) {
ResourceRequest offSwitchRequest =
- application.getResourceRequest(priority, NodeManager.ANY);
+ application.getResourceRequest(priority, RMNode.ANY);
int requiredContainers = offSwitchRequest.getNumContainers();
int reservedContainers = application.getReservedContainers(priority);
return ((requiredContainers - reservedContainers) > 0);
}
- Resource assignContainersOnNode(Resource clusterResource, NodeInfo node,
- Application application, Priority priority, boolean reserved) {
+ Resource assignContainersOnNode(Resource clusterResource, CSNode node,
+ CSApp application, Priority priority, boolean reserved) {
Resource assigned = Resources.none();
@@ -746,10 +719,10 @@ public class LeafQueue implements Queue
priority, reserved);
}
- Resource assignNodeLocalContainers(Resource clusterResource, NodeInfo node,
- Application application, Priority priority) {
- ResourceRequest request =
- application.getResourceRequest(priority, node.getNodeAddress());
+ Resource assignNodeLocalContainers(Resource clusterResource, CSNode node,
+ CSApp application, Priority priority) {
+ ResourceRequest request = application.getResourceRequest(priority, node
+ .getNodeAddress());
if (request != null) {
if (canAssign(application, priority, node, NodeType.DATA_LOCAL, false)) {
return assignContainer(clusterResource, node, application, priority, request,
@@ -760,8 +733,8 @@ public class LeafQueue implements Queue
return Resources.none();
}
- Resource assignRackLocalContainers(Resource clusterResource, NodeInfo node,
- Application application, Priority priority) {
+ Resource assignRackLocalContainers(Resource clusterResource, CSNode node,
+ CSApp application, Priority priority) {
ResourceRequest request =
application.getResourceRequest(priority, node.getRackName());
if (request != null) {
@@ -773,10 +746,10 @@ public class LeafQueue implements Queue
return Resources.none();
}
- Resource assignOffSwitchContainers(Resource clusterResource, NodeInfo node,
- Application application, Priority priority, boolean reserved) {
+ Resource assignOffSwitchContainers(Resource clusterResource, CSNode node,
+ CSApp application, Priority priority, boolean reserved) {
ResourceRequest request =
- application.getResourceRequest(priority, NodeManager.ANY);
+ application.getResourceRequest(priority, RMNode.ANY);
if (request != null) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reserved)) {
return assignContainer(clusterResource, node, application, priority, request,
@@ -787,11 +760,11 @@ public class LeafQueue implements Queue
return Resources.none();
}
- boolean canAssign(Application application, Priority priority,
- NodeInfo node, NodeType type, boolean reserved) {
+ boolean canAssign(CSApp application, Priority priority,
+ CSNode node, NodeType type, boolean reserved) {
ResourceRequest offSwitchRequest =
- application.getResourceRequest(priority, NodeManager.ANY);
+ application.getResourceRequest(priority, RMNode.ANY);
if (offSwitchRequest.getNumContainers() == 0) {
return false;
@@ -857,8 +830,8 @@ public class LeafQueue implements Queue
return false;
}
- private Resource assignContainer(Resource clusterResource, NodeInfo node,
- Application application,
+ private Resource assignContainer(Resource clusterResource, CSNode node,
+ CSApp application,
Priority priority, ResourceRequest request, NodeType type) {
LOG.info("DEBUG --- assignContainers:" +
" node=" + node.getNodeAddress() +
@@ -883,10 +856,11 @@ public class LeafQueue implements Queue
List<Container> containers =
new ArrayList<Container>();
Container container =
- BuilderUtils.newContainer(this.recordFactory,
- application.getApplicationId(),
- application.getNewContainerId(), node.getNodeAddress(),
- node.getHttpAddress(), capability);
+ BuilderUtils.newContainer(this.recordFactory,
+ application.getApplicationId(),
+ application.getNewContainerId(),
+ node.getNodeID(), node.getNodeAddress(),
+ node.getHttpAddress(), capability);
// If security is enabled, send the container-tokens too.
if (UserGroupInformation.isSecurityEnabled()) {
@@ -941,24 +915,42 @@ public class LeafQueue implements Queue
return Resources.none();
}
- private void allocate(Application application, NodeType type,
+ private void allocate(CSApp application, NodeType type,
Priority priority, ResourceRequest request,
- NodeInfo node, List<Container> containers) {
+ CSNode node, List<Container> containers) {
// Allocate container to the application
application.allocate(type, node, priority, request, containers);
+ for (Container container : containers) {
+ // Create the container and 'start' it.
+ ContainerId containerId = container.getId();
+ RMContext rmContext = this.scheduler.getRMContext();
+ EventHandler eventHandler = rmContext.getDispatcher().getEventHandler();
+ RMContainer rmContainer = new RMContainerImpl(containerId, application
+ .getApplicationAttemptId(), node.getNodeID(), container,
+ eventHandler, rmContext.getContainerAllocationExpirer());
+ if (rmContext.getRMContainers().putIfAbsent(containerId, rmContainer) != null) {
+ LOG.error("Duplicate container addition! ContainerID : "
+ + containerId);
+ } else {
+ eventHandler.handle(new RMContainerEvent(containerId,
+ RMContainerEventType.START));
+ }
+ }
+
// Inform the NodeManager about the allocation
- node.allocateContainer(application.getApplicationId(), containers);
+ node.allocateContainer(application.getApplicationId(),
+ containers);
}
- private void reserve(Application application, Priority priority,
- NodeInfo node, Resource resource) {
+ private void reserve(CSApp application, Priority priority,
+ CSNode node, Resource resource) {
application.reserveResource(node, priority, resource);
node.reserveResource(application, priority, resource);
}
- private void unreserve(Application application, Priority priority,
- NodeInfo node) {
+ private void unreserve(CSApp application, Priority priority,
+ CSNode node) {
// Done with the reservation?
if (application.isReserved(node, priority)) {
application.unreserveResource(node, priority);
@@ -969,7 +961,7 @@ public class LeafQueue implements Queue
@Override
public void completedContainer(Resource clusterResource,
- Container container, Resource containerResource, Application application) {
+ Container container, Resource containerResource, CSApp application) {
if (application != null) {
// Careful! Locking order is important!
synchronized (this) {
@@ -1066,7 +1058,7 @@ public class LeafQueue implements Queue
@Override
public void recoverContainer(Resource clusterResource,
- Application application, Container container) {
+ CSApp application, Container container) {
// Careful! Locking order is important!
synchronized (this) {
allocateResource(clusterResource, application.getUser(), container.getResource());
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Wed Aug 3 11:31:34 2011
@@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
@@ -47,8 +46,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@Private
@@ -86,8 +83,6 @@ public class ParentQueue implements Queu
private final QueueMetrics metrics;
private QueueInfo queueInfo;
- private Map<ApplicationId, org.apache.hadoop.yarn.api.records.ApplicationReport>
- applicationInfos;
private Map<QueueACL, AccessControlList> acls =
new HashMap<QueueACL, AccessControlList>();
@@ -144,11 +139,6 @@ public class ParentQueue implements Queu
this.queueComparator = comparator;
this.childQueues = new TreeSet<Queue>(comparator);
- this.applicationInfos =
- new HashMap<ApplicationId,
- org.apache.hadoop.yarn.api.records.ApplicationReport>();
-
-
LOG.info("Initialized parent-queue " + queueName +
" name=" + queueName +
", fullname=" + getQueuePath());
@@ -258,11 +248,6 @@ public class ParentQueue implements Queu
}
@Override
- public List<Application> getApplications() {
- return null;
- }
-
- @Override
public synchronized List<Queue> getChildQueues() {
return new ArrayList<Queue>(childQueues);
}
@@ -286,25 +271,16 @@ public class ParentQueue implements Queu
}
@Override
- public synchronized QueueInfo getQueueInfo(boolean includeApplications,
+ public synchronized QueueInfo getQueueInfo(
boolean includeChildQueues, boolean recursive) {
queueInfo.setCurrentCapacity(usedCapacity);
- if (includeApplications) {
- queueInfo.setApplications(
- new ArrayList<org.apache.hadoop.yarn.api.records.ApplicationReport>(
- applicationInfos.values()));
- } else {
- queueInfo.setApplications(
- new ArrayList<org.apache.hadoop.yarn.api.records.ApplicationReport>());
- }
-
List<QueueInfo> childQueuesInfo = new ArrayList<QueueInfo>();
if (includeChildQueues) {
for (Queue child : childQueues) {
// Get queue information recursively?
childQueuesInfo.add(
- child.getQueueInfo(includeApplications, recursive, recursive));
+ child.getQueueInfo(recursive, recursive));
}
}
queueInfo.setChildQueues(childQueuesInfo);
@@ -420,9 +396,8 @@ public class ParentQueue implements Queu
}
@Override
- public void submitApplication(Application application, String user,
- String queue, Priority priority)
- throws AccessControlException {
+ public void submitApplication(CSApp application, String user,
+ String queue) throws AccessControlException {
synchronized (this) {
// Sanity check
@@ -443,7 +418,7 @@ public class ParentQueue implements Queu
// Inform the parent queue
if (parent != null) {
try {
- parent.submitApplication(application, user, queue, priority);
+ parent.submitApplication(application, user, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
parent.getQueuePath(), ace);
@@ -453,14 +428,11 @@ public class ParentQueue implements Queu
}
}
- private synchronized void addApplication(Application application,
+ private synchronized void addApplication(CSApp application,
String user) {
++numApplications;
- applicationInfos.put(application.getApplicationId(),
- application.getApplicationInfo());
-
LOG.info("Application added -" +
" appId: " + application.getApplicationId() +
" user: " + user +
@@ -469,16 +441,9 @@ public class ParentQueue implements Queu
}
@Override
- public void finishApplication(Application application, String queue)
- throws AccessControlException {
+ public void finishApplication(CSApp application, String queue) {
synchronized (this) {
- // Sanity check
- if (queue.equals(queueName)) {
- throw new AccessControlException("Cannot finish application " +
- "from non-leaf queue: " + queueName);
- }
-
removeApplication(application, application.getUser());
}
@@ -488,11 +453,10 @@ public class ParentQueue implements Queu
}
}
- public synchronized void removeApplication(Application application,
+ public synchronized void removeApplication(CSApp application,
String user) {
--numApplications;
- applicationInfos.remove(application.getApplicationId());
LOG.info("Application removed -" +
" appId: " + application.getApplicationId() +
@@ -511,7 +475,7 @@ public class ParentQueue implements Queu
@Override
public synchronized Resource assignContainers(
- Resource clusterResource, NodeInfo node) {
+ Resource clusterResource, CSNode node) {
Resource assigned = Resources.createResource(0);
while (canAssign(node)) {
@@ -575,14 +539,14 @@ public class ParentQueue implements Queu
}
- private boolean canAssign(NodeInfo node) {
+ private boolean canAssign(CSNode node) {
return (node.getReservedApplication() == null) &&
Resources.greaterThanOrEqual(node.getAvailableResource(),
minimumAllocation);
}
synchronized Resource assignContainersToChildQueues(Resource cluster,
- NodeInfo node) {
+ CSNode node) {
Resource assigned = Resources.createResource(0);
printChildQueues();
@@ -625,7 +589,7 @@ public class ParentQueue implements Queu
@Override
public void completedContainer(Resource clusterResource,
Container container, Resource containerResource,
- Application application) {
+ CSApp application) {
if (application != null) {
// Careful! Locking order is important!
// Book keeping
@@ -682,7 +646,7 @@ public class ParentQueue implements Queu
@Override
public void recoverContainer(Resource clusterResource,
- Application application, Container container) {
+ CSApp application, Container container) {
// Careful! Locking order is important!
synchronized (this) {
allocateResource(clusterResource, container.getResource());
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Wed Aug 3 11:31:34 2011
@@ -30,8 +30,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
/**
* Queue represents a node in the tree of
@@ -124,12 +124,6 @@ extends org.apache.hadoop.yarn.server.re
public List<Queue> getChildQueues();
/**
- * Get applications in this queue
- * @return applications in the queue
- */
- public List<Application> getApplications();
-
- /**
* Check if the <code>user</code> has permission to perform the operation
* @param acl ACL
* @param user user
@@ -143,10 +137,9 @@ extends org.apache.hadoop.yarn.server.re
* @param application application being submitted
* @param user user who submitted the application
* @param queue queue to which the application is submitted
- * @param priority application priority
*/
- public void submitApplication(Application application, String user,
- String queue, Priority priority)
+ public void submitApplication(CSApp application, String user,
+ String queue)
throws AccessControlException;
/**
@@ -154,8 +147,7 @@ extends org.apache.hadoop.yarn.server.re
* @param application
* @param queue application queue
*/
- public void finishApplication(Application application, String queue)
- throws AccessControlException;
+ public void finishApplication(CSApp application, String queue);
/**
* Assign containers to applications in the queue or it's children (if any).
@@ -163,7 +155,7 @@ extends org.apache.hadoop.yarn.server.re
* @param node node on which resources are available
* @return
*/
- public Resource assignContainers(Resource clusterResource, NodeInfo node);
+ public Resource assignContainers(Resource clusterResource, CSNode node);
/**
* A container assigned to the queue has completed.
@@ -175,7 +167,7 @@ extends org.apache.hadoop.yarn.server.re
*/
public void completedContainer(Resource clusterResource,
Container container, Resource containerResource,
- Application application);
+ CSApp application);
/**
* Get the number of applications in the queue.
@@ -204,6 +196,6 @@ extends org.apache.hadoop.yarn.server.re
* @param application the application for which the container was allocated
* @param container the container that was recovered.
*/
- public void recoverContainer(Resource clusterResource, Application application,
+ public void recoverContainer(Resource clusterResource, CSApp application,
Container container);
}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,31 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+/** Immutable {@link SchedulerEvent} of type {@code APP_ADDED} holding an application attempt id, a queue name and a user name. */
+public class AppAddedSchedulerEvent extends SchedulerEvent {
+
+ private final ApplicationAttemptId applicationAttemptId;
+ private final String queue;
+ private final String user;
+ /** Tags the event as {@code SchedulerEventType.APP_ADDED} and stores the three arguments as given (no validation or copying). */
+ public AppAddedSchedulerEvent(ApplicationAttemptId applicationAttemptId,
+ String queue, String user) {
+ super(SchedulerEventType.APP_ADDED);
+ this.applicationAttemptId = applicationAttemptId;
+ this.queue = queue;
+ this.user = user;
+ }
+ /** @return the application attempt id passed to the constructor */
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return applicationAttemptId;
+ }
+ /** @return the queue name passed to the constructor */
+ public String getQueue() {
+ return queue;
+ }
+ /** @return the user name passed to the constructor */
+ public String getUser() {
+ return user;
+ }
+
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,19 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+/** Immutable {@link SchedulerEvent} of type {@code APP_REMOVED} holding an application attempt id. NOTE(review): the ApplicationId import is not referenced in this class. */
+public class AppRemovedSchedulerEvent extends SchedulerEvent {
+
+ private final ApplicationAttemptId applicationAttemptId;
+ /** Tags the event as {@code SchedulerEventType.APP_REMOVED} and stores the given attempt id as-is (no validation). */
+ public AppRemovedSchedulerEvent(ApplicationAttemptId applicationAttemptId) {
+ super(SchedulerEventType.APP_REMOVED);
+ this.applicationAttemptId = applicationAttemptId;
+ }
+ /** @return the attempt id passed to the constructor; note the {@code ID} spelling, unlike {@code AppAddedSchedulerEvent#getApplicationAttemptId()} */
+ public ApplicationAttemptId getApplicationAttemptID() {
+ return this.applicationAttemptId;
+ }
+
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerFinishedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerFinishedSchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerFinishedSchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerFinishedSchedulerEvent.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,18 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.api.records.Container;
+/** Scheduler event of type CONTAINER_FINISHED that carries the finished {@link Container}. */
+public class ContainerFinishedSchedulerEvent extends SchedulerEvent {
+ /** Container this event reports as finished; assigned once in the constructor. */
+ private final Container container;
+ /** @param container the finished container; stored as given, without a null check or copy. */
+ public ContainerFinishedSchedulerEvent(Container container) {
+ super(SchedulerEventType.CONTAINER_FINISHED);
+ this.container = container;
+ }
+ /** @return the container passed to the constructor. */
+ public Container getContainer() {
+ return container;
+ }
+
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,18 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+/** Scheduler event of type NODE_ADDED that carries the {@link RMNode} the event is about. */
+public class NodeAddedSchedulerEvent extends SchedulerEvent {
+ /** Node this event refers to; assigned once in the constructor. */
+ private final RMNode rmNode;
+ /** @param rmNode the added node; stored as given, without a null check. */
+ public NodeAddedSchedulerEvent(RMNode rmNode) {
+ super(SchedulerEventType.NODE_ADDED);
+ this.rmNode = rmNode;
+ }
+ /** @return the node passed to the constructor. */
+ public RMNode getAddedRMNode() {
+ return rmNode;
+ }
+
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeRemovedSchedulerEvent.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,18 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+/** Scheduler event of type NODE_REMOVED that carries the {@link RMNode} the event is about. */
+public class NodeRemovedSchedulerEvent extends SchedulerEvent {
+ /** Node this event refers to; assigned once in the constructor. */
+ private final RMNode rmNode;
+ /** @param rmNode the removed node; stored as given, without a null check. */
+ public NodeRemovedSchedulerEvent(RMNode rmNode) {
+ super(SchedulerEventType.NODE_REMOVED);
+ this.rmNode = rmNode;
+ }
+ /** @return the node passed to the constructor. */
+ public RMNode getRemovedRMNode() {
+ return rmNode;
+ }
+
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,30 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+/** Scheduler event of type NODE_UPDATE carrying a node together with a per-application map of containers. */
+public class NodeUpdateSchedulerEvent extends SchedulerEvent {
+ /** Node this update refers to; assigned once in the constructor. */
+ private final RMNode rmNode;
+ private final Map<ApplicationId, List<Container>> containers; // presumably the containers the node reported, grouped by application - confirm with the event's producer
+ /** Both arguments are stored as given; the map is neither copied nor wrapped, so later changes by the caller are visible here. */
+ public NodeUpdateSchedulerEvent(RMNode rmNode,
+ Map<ApplicationId, List<Container>> containers) {
+ super(SchedulerEventType.NODE_UPDATE);
+ this.rmNode = rmNode;
+ this.containers = containers;
+ }
+ /** @return the node passed to the constructor. */
+ public RMNode getRMNode() {
+ return rmNode;
+ }
+ /** @return the very map instance passed to the constructor (no defensive copy). */
+ public Map<ApplicationId, List<Container>> getContainers() {
+ return containers;
+ }
+
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEvent.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,9 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+/** Base class of the scheduler events in this package: an {@link AbstractEvent} typed by {@link SchedulerEventType}. */
+public class SchedulerEvent extends AbstractEvent<SchedulerEventType> {
+ public SchedulerEvent(SchedulerEventType type) { // subclasses in this package pass their fixed event type here
+ super(type); // the type is simply forwarded to AbstractEvent
+ }
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,17 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+/** Event types carried by {@link SchedulerEvent}; a scheduler's handle(SchedulerEvent) switches on them. */
+public enum SchedulerEventType {
+
+ // Source: Node (NodeAddedSchedulerEvent, NodeRemovedSchedulerEvent, NodeUpdateSchedulerEvent)
+ NODE_ADDED,
+ NODE_REMOVED,
+ NODE_UPDATE,
+
+ // Source: Container (ContainerFinishedSchedulerEvent)
+ CONTAINER_FINISHED,
+
+ // Source: App (AppAddedSchedulerEvent, AppRemovedSchedulerEvent)
+ APP_ADDED,
+ APP_REMOVED,
+
+}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Aug 3 11:31:34 2011
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,11 +37,13 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -51,22 +54,31 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationTrackerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerFinishedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -84,7 +96,9 @@ public class FifoScheduler implements Re
private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
- private ClusterTracker clusterTracker;
+ private RMContext rmContext;
+
+ private Map<NodeId, SchedulerNode> nodes = new ConcurrentHashMap<NodeId, SchedulerNode>();
private static final int MINIMUM_MEMORY = 1024;
@@ -104,9 +118,8 @@ public class FifoScheduler implements Re
private Resource minimumAllocation;
private Resource maximumAllocation;
- Map<ApplicationId, Application> applications =
- new TreeMap<ApplicationId, Application>(
- new BuilderUtils.ApplicationIdComparator());
+ private Map<ApplicationAttemptId, SchedulerApp> applications
+ = new HashMap<ApplicationAttemptId, SchedulerApp>();
private static final String DEFAULT_QUEUE_NAME = "default";
private final QueueMetrics metrics =
@@ -124,20 +137,13 @@ public class FifoScheduler implements Re
}
@Override
- public QueueInfo getQueueInfo(boolean includeApplications,
+ public QueueInfo getQueueInfo(
boolean includeChildQueues, boolean recursive) {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName());
queueInfo.setCapacity(100.0f);
queueInfo.setMaximumCapacity(100.0f);
queueInfo.setChildQueues(new ArrayList<QueueInfo>());
-
- if (includeApplications) {
- queueInfo.setApplications(getApplications());
- } else {
- queueInfo.setApplications(
- new ArrayList<org.apache.hadoop.yarn.api.records.ApplicationReport>());
- }
return queueInfo;
}
@@ -161,16 +167,6 @@ public class FifoScheduler implements Re
return Collections.singletonList(queueUserAclInfo);
}
};
-
- public FifoScheduler() {
- }
-
- public FifoScheduler(ClusterTracker clusterTracker) {
- this.clusterTracker = clusterTracker;
- if (clusterTracker != null) {
- this.clusterTracker.addListener(this);
- }
- }
@Override
public Resource getMinimumResourceCapability() {
@@ -185,14 +181,13 @@ public class FifoScheduler implements Re
@Override
public synchronized void reinitialize(Configuration conf,
ContainerTokenSecretManager containerTokenSecretManager,
- ClusterTracker clusterTracker)
+ RMContext rmContext)
throws IOException
{
if (!this.initialized) {
this.conf = conf;
this.containerTokenSecretManager = containerTokenSecretManager;
- this.clusterTracker = clusterTracker;
- if (clusterTracker != null) this.clusterTracker.addListener(this);
+ this.rmContext = rmContext;
this.minimumAllocation =
Resources.createResource(conf.getInt(MINIMUM_ALLOCATION, MINIMUM_MEMORY));
this.maximumAllocation =
@@ -204,59 +199,47 @@ public class FifoScheduler implements Re
}
@Override
- public synchronized Allocation allocate(ApplicationId applicationId,
- List<ResourceRequest> ask, List<Container> release)
- throws IOException {
- Application application = getApplication(applicationId);
+ public synchronized void allocate(ApplicationAttemptId applicationAttemptId,
+ List<ResourceRequest> ask) {
+ SchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed " +
- "or non existant application " + applicationId);
- return new Allocation(EMPTY_CONTAINER_LIST, Resources.none());
+ "or non existant application " + applicationAttemptId);
+ return;
}
// Sanity check
normalizeRequests(ask);
- List<Container> allocatedContainers = null;
Resource limit = null;
synchronized (application) {
LOG.debug("allocate: pre-update" +
- " applicationId=" + applicationId +
+ " applicationId=" + applicationAttemptId +
" application=" + application);
application.showRequests();
// Update application requests
application.updateResourceRequests(ask);
- // Release containers
- releaseContainers(application, release);
-
LOG.debug("allocate: post-update" +
- " applicationId=" + applicationId +
+ " applicationId=" + applicationAttemptId +
" application=" + application);
application.showRequests();
- allocatedContainers = application.acquire();
- limit = application.getHeadroom();
LOG.debug("allocate:" +
- " applicationId=" + applicationId +
- " #ask=" + ask.size() +
- " #release=" + release.size() +
- " #allocatedContainers=" + allocatedContainers.size() +
- " limit=" + limit);
+ " applicationId=" + applicationAttemptId +
+ " #ask=" + ask.size());
}
-
- return new Allocation(allocatedContainers, limit);
}
- private void releaseContainers(Application application, List<Container> release) {
- application.releaseContainers(release);
- for (Container container : release) {
- releaseContainer(application.getApplicationId(), container);
- }
+ @Override // Reports the headroom of the given attempt as tracked by its SchedulerApp.
+ public Resource getResourceLimit(ApplicationAttemptId applicationAttemptId) {
+ SchedulerApp application = getApplication(applicationAttemptId);
+ // Unknown or already-removed attempt: report no headroom instead of failing with a NullPointerException.
+ return (application == null) ? Resources.none() : application.getHeadroom();
}
-
+
private void normalizeRequests(List<ResourceRequest> asks) {
for (ResourceRequest ask : asks) {
normalizeRequest(ask);
@@ -271,42 +254,41 @@ public class FifoScheduler implements Re
((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
}
- private synchronized Application getApplication(ApplicationId applicationId) {
- return applications.get(applicationId);
+ private synchronized SchedulerApp getApplication(
+ ApplicationAttemptId applicationAttemptId) {
+ return applications.get(applicationAttemptId);
}
- @Override
- public synchronized void addApplication(ApplicationId applicationId, ApplicationMaster master,
- String user, String unusedQueue, Priority unusedPriority, ApplicationStore appStore)
- throws IOException {
- applications.put(applicationId,
- new Application(applicationId, master, DEFAULT_QUEUE, user, appStore));
+ private synchronized void addApplication(ApplicationAttemptId appAttemptId,
+ String queueName, String user) { // registers the attempt, updates metrics, then raises APP_ACCEPTED for it
+ AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
+ appAttemptId, null, queueName, user, null); // NOTE(review): 2nd and 5th arguments are null - confirm AppSchedulingInfo tolerates that
+ SchedulerApp schedulerApp = new SchedulerApp(appSchedulingInfo,
+ DEFAULT_QUEUE); // always bound to DEFAULT_QUEUE; queueName only reaches AppSchedulingInfo
+ applications.put(appAttemptId, schedulerApp);
metrics.submitApp(user);
- LOG.info("Application Submission: " + applicationId.getId() + " from " + user +
+ LOG.info("Application Submission: " + appAttemptId.getApplicationId() + " from " + user +
", currently active: " + applications.size());
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptEvent(appAttemptId,
+ RMAppAttemptEventType.APP_ACCEPTED)); // handed to the dispatcher while this scheduler's monitor is held
}
- @Override
- public synchronized void doneApplication(ApplicationId applicationId, boolean finishApplication)
- throws IOException {
- Application application = getApplication(applicationId);
+ private synchronized void doneApplication(
+ ApplicationAttemptId applicationAttemptId, boolean finishApplication)
+ throws IOException {
+ SchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
- throw new IOException("Unknown application " + applicationId +
+ throw new IOException("Unknown application " + applicationAttemptId +
" has completed!");
}
- // Release current containers
- releaseContainers(application, application.getCurrentContainers());
-
// Clean up pending requests, metrics etc.
application.stop();
if (finishApplication) {
- // Let the cluster know that the applications are done
- finishedApplication(applicationId,
- application.getAllNodesForApplication());
// Remove the application
- applications.remove(applicationId);
+ applications.remove(applicationAttemptId);
}
}
@@ -315,14 +297,15 @@ public class FifoScheduler implements Re
*
* @param node node on which resources are available to be allocated
*/
- private synchronized void assignContainers(NodeInfo node) {
+ private synchronized void assignContainers(SchedulerNode node) {
LOG.debug("assignContainers:" +
- " node=" + node.getNodeAddress() +
+ " node=" + node.getRMNode().getNodeAddress() +
" #applications=" + applications.size());
// Try to assign containers to applications in fifo order
- for (Map.Entry<ApplicationId, Application> e : applications.entrySet()) {
- Application application = e.getValue();
+ for (Map.Entry<ApplicationAttemptId, SchedulerApp> e : applications
+ .entrySet()) {
+ SchedulerApp application = e.getValue();
LOG.debug("pre-assignContainers");
application.showRequests();
synchronized (application) {
@@ -354,10 +337,10 @@ public class FifoScheduler implements Re
}
}
- private int getMaxAllocatableContainers(Application application,
- Priority priority, NodeInfo node, NodeType type) {
+ private int getMaxAllocatableContainers(SchedulerApp application,
+ Priority priority, SchedulerNode node, NodeType type) {
ResourceRequest offSwitchRequest =
- application.getResourceRequest(priority, NodeManager.ANY);
+ application.getResourceRequest(priority, SchedulerNode.ANY);
int maxContainers = offSwitchRequest.getNumContainers();
if (type == NodeType.OFF_SWITCH) {
@@ -366,7 +349,7 @@ public class FifoScheduler implements Re
if (type == NodeType.RACK_LOCAL) {
ResourceRequest rackLocalRequest =
- application.getResourceRequest(priority, node.getRackName());
+ application.getResourceRequest(priority, node.getRMNode().getRackName());
if (rackLocalRequest == null) {
return maxContainers;
}
@@ -376,7 +359,7 @@ public class FifoScheduler implements Re
if (type == NodeType.DATA_LOCAL) {
ResourceRequest nodeLocalRequest =
- application.getResourceRequest(priority, node.getNodeAddress());
+ application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
if (nodeLocalRequest != null) {
maxContainers = Math.min(maxContainers, nodeLocalRequest.getNumContainers());
}
@@ -386,8 +369,8 @@ public class FifoScheduler implements Re
}
- private int assignContainersOnNode(NodeInfo node,
- Application application, Priority priority
+ private int assignContainersOnNode(SchedulerNode node,
+ SchedulerApp application, Priority priority
) {
// Data-local
int nodeLocalContainers =
@@ -403,7 +386,7 @@ public class FifoScheduler implements Re
LOG.debug("assignContainersOnNode:" +
- " node=" + node.getNodeAddress() +
+ " node=" + node.getRMNode().getNodeAddress() +
" application=" + application.getApplicationId().getId() +
" priority=" + priority.getPriority() +
" #assigned=" +
@@ -413,11 +396,11 @@ public class FifoScheduler implements Re
return (nodeLocalContainers + rackLocalContainers + offSwitchContainers);
}
- private int assignNodeLocalContainers(NodeInfo node,
- Application application, Priority priority) {
+ private int assignNodeLocalContainers(SchedulerNode node,
+ SchedulerApp application, Priority priority) {
int assignedContainers = 0;
ResourceRequest request =
- application.getResourceRequest(priority, node.getNodeAddress());
+ application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
if (request != null) {
int assignableContainers =
Math.min(
@@ -431,11 +414,11 @@ public class FifoScheduler implements Re
return assignedContainers;
}
- private int assignRackLocalContainers(NodeInfo node,
- Application application, Priority priority) {
+ private int assignRackLocalContainers(SchedulerNode node,
+ SchedulerApp application, Priority priority) {
int assignedContainers = 0;
ResourceRequest request =
- application.getResourceRequest(priority, node.getRackName());
+ application.getResourceRequest(priority, node.getRMNode().getRackName());
if (request != null) {
int assignableContainers =
Math.min(
@@ -449,11 +432,11 @@ public class FifoScheduler implements Re
return assignedContainers;
}
- private int assignOffSwitchContainers(NodeInfo node,
- Application application, Priority priority) {
+ private int assignOffSwitchContainers(SchedulerNode node,
+ SchedulerApp application, Priority priority) {
int assignedContainers = 0;
ResourceRequest request =
- application.getResourceRequest(priority, NodeManager.ANY);
+ application.getResourceRequest(priority, SchedulerNode.ANY);
if (request != null) {
assignedContainers =
assignContainers(node, application, priority,
@@ -462,11 +445,11 @@ public class FifoScheduler implements Re
return assignedContainers;
}
- private int assignContainers(NodeInfo node, Application application,
+ private int assignContainers(SchedulerNode node, SchedulerApp application,
Priority priority, int assignableContainers,
ResourceRequest request, NodeType type) {
LOG.debug("assignContainers:" +
- " node=" + node.getNodeAddress() +
+ " node=" + node.getRMNode().getNodeAddress() +
" application=" + application.getApplicationId().getId() +
" priority=" + priority.getPriority() +
" assignableContainers=" + assignableContainers +
@@ -475,11 +458,11 @@ public class FifoScheduler implements Re
int availableContainers =
node.getAvailableResource().getMemory() / capability.getMemory(); // TODO: A buggy
- // application
- // with this
- // zero would
- // crash the
- // scheduler.
+ // application
+ // with this
+ // zero would
+ // crash the
+ // scheduler.
int assignedContainers =
Math.min(assignableContainers, availableContainers);
@@ -490,8 +473,9 @@ public class FifoScheduler implements Re
Container container =
BuilderUtils.newContainer(recordFactory,
application.getApplicationId(),
- application.getNewContainerId(), node.getNodeAddress(),
- node.getHttpAddress(), capability);
+ application.getNewContainerId(),
+ node.getRMNode().getNodeID(), node.getRMNode().getNodeAddress(),
+ node.getRMNode().getHttpAddress(), capability);
// If security is enabled, send the container-tokens too.
if (UserGroupInformation.isSecurityEnabled()) {
ContainerToken containerToken =
@@ -510,8 +494,9 @@ public class FifoScheduler implements Re
}
containers.add(container);
}
- application.allocate(type, node, priority, request, containers);
- addAllocatedContainers(node, application.getApplicationId(), containers);
+ application.allocate(containers);
+ addAllocatedContainers(node, application.getApplicationAttemptId(),
+ containers);
Resources.addTo(usedResource,
Resources.multiply(capability, assignedContainers));
}
@@ -519,23 +504,23 @@ public class FifoScheduler implements Re
}
private synchronized void killContainers(List<Container> containers) {
- for (Container container : containers) {
- container.setState(ContainerState.COMPLETE);
- }
applicationCompletedContainers(containers);
}
-
- private synchronized void applicationCompletedContainers(
- List<Container> completedContainers) {
- for (Container c: completedContainers) {
- Application app = applications.get(c.getId().getAppId());
+
+ private synchronized void applicationCompletedContainers(List<Container> containers) {
+ for (Container c : containers) {
+ applicationCompletedContainer(c);
+ }
+ }
+
+ private synchronized void applicationCompletedContainer(Container c) { // forwards one completed container to its application, if still registered
+ SchedulerApp app = applications.get(c.getId().getAppId()); // NOTE(review): applications is keyed by ApplicationAttemptId; if getAppId() still returns an ApplicationId (as the removed code implies) this lookup never matches - confirm and look up by attempt id
/** this is possible, since an application can be removed from scheduler but
* the nodemanger is just updating about a completed container.
*/
if (app != null) {
app.completedContainer(c, c.getResource());
}
- }
}
private List<Container> getCompletedContainers(Map<String, List<Container>> allContainers) {
@@ -555,46 +540,71 @@ public class FifoScheduler implements Re
return completedContainers;
}
- @Override
- public synchronized void nodeUpdate(NodeInfo node,
- Map<String,List<Container>> containers ) {
+ private synchronized void nodeUpdate(RMNode rmNode,
+ Map<String, List<Container>> containers) { // processes one node status update: completed containers first, then new assignments
+ SchedulerNode node = this.nodes.get(rmNode.getNodeID()); // NOTE(review): null when the node is not (or no longer) in this.nodes; statusUpdate below would then throw
+ node.statusUpdate(containers);
applicationCompletedContainers(getCompletedContainers(containers));
- LOG.info("Node heartbeat " + node.getNodeID() + " resource = " + node.getAvailableResource());
+ // One INFO line per update; the extra diagnostic detail below is logged at debug level only.
+ LOG.info("Node heartbeat " + rmNode.getNodeID() + " resource = " + node.getAvailableResource());
+ LOG.debug("Node heartbeat " + rmNode.getNodeID() + " resource = " + node.getAvailableResource().getMemory());
+ LOG.debug("=========Node heartbeat " + rmNode.getNodeID() + " resourcemimi = " + minimumAllocation.getMemory());
if (Resources.greaterThanOrEqual(node.getAvailableResource(),
minimumAllocation)) {
assignContainers(node);
}
metrics.setAvailableResourcesToQueue(
Resources.subtract(clusterResource, usedResource));
- LOG.info("Node after allocation " + node.getNodeID() + " resource = "
+ LOG.info("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getAvailableResource());
// TODO: Add the list of containers to be preempted when we support
}
@Override
- public synchronized void handle(ASMEvent<ApplicationTrackerEventType> event) {
+ public synchronized void handle(SchedulerEvent event) { // single entry point: casts by event type and delegates to the private helpers
switch(event.getType()) {
- case ADD:
- /**
- * ignore add since its called syncronously from applications manager.
- */
+ case NODE_ADDED:
+ NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
+ addNode(nodeAddedEvent.getAddedRMNode());
break;
- case REMOVE:
- try {
- doneApplication(event.getApplication().getApplicationID(), true);
- } catch(IOException ie) {
- LOG.error("Unable to remove application " + event.getApplication().getApplicationID(), ie);
+ case NODE_REMOVED:
+ NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
+ removeNode(nodeRemovedEvent.getRemovedRMNode());
+ break;
+ case NODE_UPDATE:
+ NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
+ Map<ApplicationId, List<Container>> contAppMapping = nodeUpdatedEvent.getContainers();
+ Map<String, List<Container>> conts = new HashMap<String, List<Container>>();
+ for (Map.Entry<ApplicationId, List<Container>> entry : contAppMapping.entrySet()) {
+ conts.put(entry.getKey().toString(), entry.getValue()); // re-keyed by the id's toString() because nodeUpdate takes a Map<String, List<Container>>
}
- break;
- case EXPIRE:
+ nodeUpdate(nodeUpdatedEvent.getRMNode(), conts);
+ break;
+ case APP_ADDED:
+ AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+ addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
+ .getQueue(), appAddedEvent.getUser());
+ break;
+ case APP_REMOVED:
+ AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
try {
- doneApplication(event.getApplication().getApplicationID(), false);
+ doneApplication(appRemovedEvent.getApplicationAttemptID(), true); // true: the attempt is also dropped from the applications map
} catch(IOException ie) {
- LOG.error("Unable to remove application " + event.getApplication().getApplicationID(), ie);
+ LOG.error("Unable to remove application "
+ + appRemovedEvent.getApplicationAttemptID(), ie);
}
break;
+ case CONTAINER_FINISHED:
+ ContainerFinishedSchedulerEvent containerFinishedEvent = (ContainerFinishedSchedulerEvent) event;
+ Container container = containerFinishedEvent.getContainer();
+ applicationCompletedContainer(container);
+ this.rmContext.getRMContainers().remove(container.getId());
+ releaseContainer(container.getId().getAppId(), container); // as written in this revision, releaseContainer only logs the release
+ break;
+ default:
+ LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); // unrecognised types are logged and otherwise ignored
}
}
@@ -602,21 +612,17 @@ public class FifoScheduler implements Re
private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
- public synchronized Resource getClusterResource() {
- return clusterResource;
- }
-
- @Override
- public synchronized void removeNode(NodeInfo nodeInfo) {
+ private synchronized void removeNode(RMNode nodeInfo) {
+ this.nodes.remove(nodeInfo.getNodeID());
Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
killContainers(nodeInfo.getRunningContainers());
}
@Override
- public QueueInfo getQueueInfo(String queueName, boolean includeApplications,
+ public QueueInfo getQueueInfo(String queueName,
boolean includeChildQueues, boolean recursive) {
- return DEFAULT_QUEUE.getQueueInfo(includeApplications, false, false);
+ return DEFAULT_QUEUE.getQueueInfo(false, false);
}
@Override
@@ -624,37 +630,39 @@ public class FifoScheduler implements Re
return DEFAULT_QUEUE.getQueueUserAclInfo(null);
}
- private synchronized List<org.apache.hadoop.yarn.api.records.ApplicationReport>
- getApplications() {
- List<org.apache.hadoop.yarn.api.records.ApplicationReport> applications =
- new ArrayList<org.apache.hadoop.yarn.api.records.ApplicationReport>();
- for (Application application : this.applications.values()) {
- applications.add(application.getApplicationInfo());
- }
- return applications;
- }
-
- @Override
- public synchronized void addNode(NodeInfo nodeManager) {
+ private synchronized void addNode(RMNode nodeManager) {
+ this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
}
- public synchronized void releaseContainer(ApplicationId applicationId,
+ private synchronized void releaseContainer(ApplicationId applicationId,
Container container) {
// Reap containers
LOG.info("Application " + applicationId + " released container " +
container.getId());
- clusterTracker.releaseContainer(container);
+ // TODO:FIXMEVINODKV
+// node.releaseContainer(container);
}
- public synchronized void addAllocatedContainers(NodeInfo nodeInfo,
- ApplicationId applicationId, List<Container> containers) {
- nodeInfo.allocateContainer(applicationId, containers);
- }
-
- public synchronized void finishedApplication(ApplicationId applicationId,
- List<NodeInfo> nodesToNotify) {
- clusterTracker.finishedApplication(applicationId, nodesToNotify);
+ private synchronized void addAllocatedContainers(SchedulerNode node,
+ ApplicationAttemptId appAttemptId, List<Container> containers) {
+ node.allocateContainer(appAttemptId.getApplicationId(), containers);
+ for (Container container : containers) {
+ // Create the container and 'start' it.
+ ContainerId containerId = container.getId();
+ RMContainer rmContainer = new RMContainerImpl(containerId,
+ appAttemptId, node.getNodeID(), container, this.rmContext
+ .getDispatcher().getEventHandler(), this.rmContext
+ .getContainerAllocationExpirer());
+ if (this.rmContext.getRMContainers().putIfAbsent(containerId,
+ rmContainer) != null) {
+ LOG.error("Duplicate container addition! ContainerID : "
+ + containerId);
+ } else {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMContainerEvent(containerId, RMContainerEventType.START));
+ }
+ }
}
@Override
@@ -662,7 +670,7 @@ public class FifoScheduler implements Re
for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
ApplicationId appId = entry.getKey();
ApplicationInfo appInfo = entry.getValue();
- Application app = applications.get(appId);
+ SchedulerApp app = applications.get(appId);
app.allocate(appInfo.getContainers());
}
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java Wed Aug 3 11:31:34 2011
@@ -18,16 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
-import com.google.inject.Inject;
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+import org.apache.hadoop.yarn.webapp.view.JQueryUI.Render;
-import static org.apache.hadoop.yarn.util.StringHelper.*;
-import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+import com.google.inject.Inject;
class AppsBlock extends HtmlBlock {
final AppsList list;
@@ -52,21 +55,21 @@ class AppsBlock extends HtmlBlock {
th(".note", "Note")._()._().
tbody();
int i = 0;
- for (Application app : list.apps.values()) {
- String appId = Apps.toString(app.getApplicationID());
- String trackingUrl = app.getMaster().getTrackingUrl();
+ for (RMApp app : list.apps.values()) {
+ String appId = Apps.toString(app.getApplicationId());
+ String trackingUrl = app.getTrackingUrl();
String ui = trackingUrl == null || trackingUrl.isEmpty() ? "UNASSIGNED" :
(app.getFinishTime() == 0 ? "ApplicationMaster" : "JobHistory");
- String percent = String.format("%.1f", app.getStatus().getProgress() * 100);
+ String percent = String.format("%.1f", app.getProgress() * 100);
tbody.
tr().
td().
- br().$title(String.valueOf(app.getApplicationID().getId()))._(). // for sorting
+ br().$title(String.valueOf(app.getApplicationId().getId()))._(). // for sorting
a(url("app", appId), appId)._().
td(app.getUser().toString()).
td(app.getName().toString()).
td(app.getQueue().toString()).
- td(app.getMaster().getState().toString()).
+ td(app.getState().toString()).
td().
br().$title(percent)._(). // for sorting
div(_PROGRESSBAR).
@@ -75,7 +78,7 @@ class AppsBlock extends HtmlBlock {
$style(join("width:", percent, '%'))._()._()._().
td().
a(trackingUrl == null ? "#" : join("http://", trackingUrl), ui)._().
- td(app.getMaster().getDiagnostics())._();
+ td(app.getDiagnostics().toString())._();
if (list.rendering != Render.HTML && ++i >= 20) break;
}
tbody._()._();
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java Wed Aug 3 11:31:34 2011
@@ -29,8 +29,8 @@ import java.io.PrintWriter;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.webapp.Controller.RequestContext;
import org.apache.hadoop.yarn.webapp.ToJSON;
@@ -43,39 +43,39 @@ import com.google.inject.servlet.Request
@RequestScoped
class AppsList implements ToJSON {
final RequestContext rc;
- final ConcurrentMap<ApplicationId, Application> apps;
+ final ConcurrentMap<ApplicationId, RMApp> apps;
Render rendering;
@Inject AppsList(RequestContext ctx, RMContext rmContext) {
rc = ctx;
- apps = rmContext.getApplications();
+ apps = rmContext.getRMApps();
}
void toDataTableArrays(PrintWriter out) {
out.append('[');
boolean first = true;
- for (Application app : apps.values()) {
+ for (RMApp app : apps.values()) {
if (first) {
first = false;
} else {
out.append(",\n");
}
- String appID = Apps.toString(app.getApplicationID());
- String trackingUrl = app.getMaster().getTrackingUrl();
+ String appID = Apps.toString(app.getApplicationId());
+ String trackingUrl = app.getTrackingUrl();
String ui = trackingUrl == null ? "UNASSIGNED" :
(app.getFinishTime() == 0 ? "ApplicationMaster" : "JobHistory");
out.append("[\"");
- appendSortable(out, app.getApplicationID().getId());
+ appendSortable(out, app.getApplicationId().getId());
appendLink(out, appID, rc.prefix(), "app", appID).append(_SEP).
append(escapeHtml(app.getUser().toString())).append(_SEP).
append(escapeHtml(app.getName().toString())).append(_SEP).
append(escapeHtml(app.getQueue())).append(_SEP).
- append(app.getMaster().getState().toString()).append(_SEP);
- appendProgressBar(out, app.getStatus().getProgress()).append(_SEP);
+ append(app.getState().toString()).append(_SEP);
+ appendProgressBar(out, app.getProgress()).append(_SEP);
appendLink(out, ui, rc.prefix(),
trackingUrl == null ? "#" : "http://", trackingUrl).
append(_SEP).append(escapeJavaScript(escapeHtml(
- app.getMaster().getDiagnostics()))).
+ app.getDiagnostics().toString()))).
append("\"]");
}
out.append(']');
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java Wed Aug 3 11:31:34 2011
@@ -18,30 +18,32 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
-import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
-import java.util.Date;
-
-import com.google.inject.Inject;
-
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
+
+import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
-import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+import com.google.inject.Inject;
class NodesPage extends RmView {
static class NodesBlock extends HtmlBlock {
- final ClusterTracker resource;
+ final RMContext rmContext;
@Inject
- NodesBlock(ClusterTracker rc, ViewContext ctx) {
+ NodesBlock(RMContext context, ViewContext ctx) {
super(ctx);
- resource = rc;
+ this.rmContext = context;
}
@Override
@@ -56,10 +58,11 @@ class NodesPage extends RmView {
th(".lastHealthUpdate", "Last health-update").
th(".healthReport", "Health-report").
th(".containers", "Containers").
- th(".mem", "Mem Used (MB)").
- th(".mem", "Mem Avail (MB)")._()._().
+// th(".mem", "Mem Used (MB)").
+// th(".mem", "Mem Avail (MB)").
+ _()._().
tbody();
- for (NodeInfo ni : resource.getAllNodeInfo()) {
+ for (RMNode ni : this.rmContext.getRMNodes().values()) {
NodeHealthStatus health = ni.getNodeHealthStatus();
tbody.tr().
td(ni.getRackName()).
@@ -69,8 +72,9 @@ class NodesPage extends RmView {
td(Times.format(health.getLastHealthReportTime())).
td(String.valueOf(health.getHealthReport())).
td(String.valueOf(ni.getNumContainers())).
- td(String.valueOf(ni.getUsedResource().getMemory())).
- td(String.valueOf(ni.getAvailableResource().getMemory()))._();
+// td(String.valueOf(ni.getUsedResource().getMemory())).
+// td(String.valueOf(ni.getAvailableResource().getMemory())).
+ _();
}
tbody._()._();
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java Wed Aug 3 11:31:34 2011
@@ -18,14 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
-import org.apache.hadoop.yarn.server.resourcemanager.ApplicationsManager;
+import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
+
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
import org.apache.hadoop.yarn.webapp.WebApp;
-import static org.apache.hadoop.yarn.util.StringHelper.*;
-
/**
* The RM webapp
*/
@@ -44,8 +42,6 @@ public class RMWebApp extends WebApp {
if (rm != null) {
bind(ResourceManager.class).toInstance(rm);
bind(RMContext.class).toInstance(rm.getRMContext());
- bind(ApplicationsManager.class).toInstance(rm.getApplicationsManager());
- bind(ClusterTracker.class).toInstance(rm.getResourceTracker());
}
route("/", RmController.class);
route("/nodes", RmController.class, "nodes");
|