Author: acmurthy
Date: Sat Apr 30 07:24:20 2011
New Revision: 1098051
URL: http://svn.apache.org/viewvc?rev=1098051&view=rev
Log:
MAPREDUCE-2434. Metrics for ResourceManager. Contributed by Luke Lu.
Modified:
hadoop/mapreduce/branches/MR-279/.gitignore
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
Modified: hadoop/mapreduce/branches/MR-279/.gitignore
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/.gitignore?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/.gitignore (original)
+++ hadoop/mapreduce/branches/MR-279/.gitignore Sat Apr 30 07:24:20 2011
@@ -46,3 +46,4 @@ src/docs/cn/uming.conf
target
SecurityAuth.audit
conf/yarn-site.xml
+.eclipse.templates/
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Sat Apr 30 07:24:20 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
MAPREDUCE-279
+ MAPREDUCE-2434. Metrics for ResourceManager. (Luke Lu via acmurthy)
+
Fix container launch w/ inconsistent credential file naming. (cdouglas)
Disable ContainerMonitoring for non-linux systems. (vinodkv)
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Sat Apr 30 07:24:20 2011
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
/**
@@ -75,6 +75,7 @@ public class Application {
List<Container> allocated = new ArrayList<Container>();
Set<NodeInfo> applicationOnNodes = new HashSet<NodeInfo>();
ApplicationMaster master;
+ boolean pending = true; // for app metrics
/* Reserved containers */
private final Comparator<NodeManager> nodeComparator =
@@ -107,6 +108,14 @@ public class Application {
return user;
}
+ public ApplicationState getState() {
+ return master.getState();
+ }
+
+ public boolean isPending() {
+ return pending;
+ }
+
public synchronized Map<Priority, Map<String, ResourceRequest>> getRequests() {
return requests;
}
@@ -137,10 +146,8 @@ public class Application {
List<Container> heartbeatContainers = allocated;
allocated = new ArrayList<Container>();
- // Metrics
for (Container container : heartbeatContainers) {
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- overallConsumption, container.getResource());
+ Resources.addTo(overallConsumption, container.getResource());
}
LOG.debug("acquire:" +
@@ -159,13 +166,22 @@ public class Application {
* application, by asking for more resources and releasing resources
* acquired by the application.
* @param requests resources to be acquired
- * @param release resources being released
*/
synchronized public void updateResourceRequests(List<ResourceRequest> requests) {
+ QueueMetrics metrics = queue.getMetrics();
// Update resource requests
for (ResourceRequest request : requests) {
Priority priority = request.getPriority();
String hostName = request.getHostName();
+ boolean updatePendingResources = false;
+ ResourceRequest lastRequest = null;
+
+ if (hostName.equals(NodeManager.ANY)) {
+ LOG.debug("update:" +
+ " application=" + applicationId +
+ " request=" + request);
+ updatePendingResources = true;
+ }
Map<String, ResourceRequest> asks = this.requests.get(priority);
@@ -173,14 +189,24 @@ public class Application {
asks = new HashMap<String, ResourceRequest>();
this.requests.put(priority, asks);
this.priorities.add(priority);
+ } else if (updatePendingResources) {
+ lastRequest = asks.get(hostName);
}
asks.put(hostName, request);
- if (hostName.equals(NodeManager.ANY)) {
- LOG.debug("update:" +
- " application=" + applicationId +
- " request=" + request);
+ if (updatePendingResources) {
+ int lastRequestContainers = lastRequest != null ?
+ lastRequest.getNumContainers() : 0;
+ Resource lastRequestCapability = lastRequest != null ?
+ lastRequest.getCapability() : Resources.none();
+ metrics.incrPendingResources(user,
+ request.getNumContainers() - lastRequestContainers,
+ Resources.subtractFrom( // save a clone
+ Resources.multiply(request.getCapability(),
+ request.getNumContainers()),
+ Resources.multiply(lastRequestCapability,
+ lastRequestContainers)));
}
}
}
@@ -190,8 +216,7 @@ public class Application {
for (Container container : release) {
LOG.debug("update: " +
"application=" + applicationId + " released=" + container);
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
- currentConsumption, container.getResource());
+ Resources.subtractFrom(currentConsumption, container.getResource());
for (Iterator<Container> i=acquired.iterator(); i.hasNext();) {
Container c = i.next();
if (c.getId().equals(container.getId())) {
@@ -220,6 +245,7 @@ public class Application {
synchronized public void completedContainer(Container container) {
LOG.info("Completed container: " + container);
completedContainers.add(container);
+ queue.getMetrics().releaseResources(user, 1, container.getResource());
}
synchronized public void completedContainers(List<Container> containers) {
@@ -245,6 +271,15 @@ public class Application {
} else {
allocateOffSwitch(node, priority, request, containers);
}
+ QueueMetrics metrics = queue.getMetrics();
+ if (pending) {
+ // once an allocation is done we assume the application is
+ // running from scheduler's POV.
+ pending = false;
+ metrics.incrAppsRunning(user);
+ }
+ LOG.debug("allocate: user: "+ user +", memory: "+ request.getCapability());
+ metrics.allocateResources(user, containers.size(), request.getCapability());
}
/**
@@ -306,8 +341,7 @@ public class Application {
synchronized private void allocate(List<Container> containers) {
// Update consumption and track allocations
for (Container container : containers) {
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- currentConsumption, container.getResource());
+ Resources.addTo(currentConsumption, container.getResource());
allocated.add(container);
@@ -391,4 +425,17 @@ public class Application {
}
return false;
}
+
+ synchronized public void finish() {
+ // GC pending resources metrics
+ QueueMetrics metrics = queue.getMetrics();
+ for (Map<String, ResourceRequest> asks : requests.values()) {
+ ResourceRequest request = asks.get(NodeManager.ANY);
+ if (request != null) {
+ metrics.decrPendingResources(user, request.getNumContainers(),
+ Resources.multiply(request.getCapability(),
+ request.getNumContainers()));
+ }
+ }
+ }
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java Sat Apr 30 07:24:20 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
/**
@@ -93,8 +94,7 @@ public class NodeManagerImpl implements
this.totalCapability = capability;
this.nodeAddress = nodeAddress;
this.httpAddress = httpAddress;
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- availableResource, capability);
+ Resources.addTo(availableResource, capability);
this.node = node;
}
@@ -296,10 +296,8 @@ public class NodeManagerImpl implements
+ this.nodeAddress);
return;
}
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- availableResource, resource);
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
- usedResource, resource);
+ Resources.addTo(availableResource, resource);
+ Resources.subtractFrom(usedResource, resource);
}
public synchronized void deductAvailableResource(Resource resource) {
@@ -307,10 +305,8 @@ public class NodeManagerImpl implements
LOG.error("Invalid deduction of null resource for "
+ this.nodeAddress);
}
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
- availableResource, resource);
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- usedResource, resource);
+ Resources.subtractFrom(availableResource, resource);
+ Resources.addTo(usedResource, resource);
}
public synchronized void notifyFinishedApplication(ApplicationId applicationId) {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Sat Apr 30 07:24:20 2011
@@ -39,6 +39,12 @@ public interface Queue {
String getQueueName();
/**
+ * Get the queue metrics
+ * @return the queue metrics
+ */
+ QueueMetrics getMetrics();
+
+ /**
* Get ACLs for the queue.
* @return ACLs for the queue
*/
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Sat Apr 30 07:24:20 2011
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
@@ -153,7 +154,7 @@ implements ResourceScheduler, CapacitySc
CapacitySchedulerConfiguration.PREFIX + ROOT;
private void initializeQueues(CapacitySchedulerConfiguration conf) {
- root = parseQueue(conf, null, ROOT, queues);
+ root = parseQueue(conf, null, ROOT, queues, queues);
LOG.info("Initialized root queue " + root);
}
@@ -161,7 +162,7 @@ implements ResourceScheduler, CapacitySc
throws IOException {
// Parse new queues
Map<String, Queue> newQueues = new HashMap<String, Queue>();
- Queue newRoot = parseQueue(conf, null, ROOT, newQueues);
+ Queue newRoot = parseQueue(conf, null, ROOT, newQueues, queues);
// Ensure all existing queues are still present
validateExistingQueues(queues, newQueues);
@@ -186,20 +187,23 @@ implements ResourceScheduler, CapacitySc
}
private Queue parseQueue(CapacitySchedulerConfiguration conf,
- Queue parent, String queueName, Map<String, Queue> queues) {
+ Queue parent, String queueName, Map<String, Queue> queues,
+ Map<String, Queue> oldQueues) {
Queue queue;
String[] childQueueNames =
conf.getQueues((parent == null) ?
queueName : (parent.getQueuePath()+"."+queueName));
if (childQueueNames == null || childQueueNames.length == 0) {
- queue = new LeafQueue(this, queueName, parent, applicationComparator);
+ queue = new LeafQueue(this, queueName, parent, applicationComparator,
+ oldQueues.get(queueName));
} else {
ParentQueue parentQueue =
- new ParentQueue(this, queueName, queueComparator, parent);
+ new ParentQueue(this, queueName, queueComparator, parent,
+ oldQueues.get(queueName));
List<Queue> childQueues = new ArrayList<Queue>();
for (String childQueueName : childQueueNames) {
Queue childQueue =
- parseQueue(conf, parentQueue, childQueueName, queues);
+ parseQueue(conf, parentQueue, childQueueName, queues, oldQueues);
childQueues.add(childQueue);
}
parentQueue.setChildQueues(childQueues);
@@ -462,8 +466,7 @@ implements ResourceScheduler, CapacitySc
@Override
public synchronized void removeNode(NodeInfo nodeInfo) {
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
- clusterResource, nodeInfo.getTotalCapability());
+ Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
//TODO inform the applications that the containers are completed/failed
nodes.remove(nodeInfo.getNodeAddress());
}
@@ -476,8 +479,7 @@ implements ResourceScheduler, CapacitySc
@Override
public synchronized void addNode(NodeManager nodeManager) {
nodes.put(nodeManager.getNodeAddress(), nodeManager);
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- clusterResource, nodeManager.getTotalCapability());
+ Resources.addTo(clusterResource, nodeManager.getTotalCapability());
}
public synchronized boolean releaseContainer(ApplicationId applicationId,
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Sat Apr 30 07:24:20 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
public class CapacitySchedulerConfiguration extends Configuration {
@@ -92,7 +93,11 @@ public class CapacitySchedulerConfigurat
@Private
public static String DEFAULT_ACL = "*";
-
+
+ @Private public static final String ENABLE_USER_METRICS =
+ PREFIX +"user-metrics.enable";
+ @Private public static final boolean DEFAULT_ENABLE_USER_METRICS = false;
+
public CapacitySchedulerConfiguration() {
this(new Configuration());
}
@@ -191,8 +196,10 @@ public class CapacitySchedulerConfigurat
public Resource getMinimumAllocation() {
int minimumMemory = getInt(MINIMUM_ALLOCATION, MINIMUM_MEMORY);
- return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
- createResource(minimumMemory);
+ return Resources.createResource(minimumMemory);
+ }
+
+ public boolean getEnableUserMetrics() {
+ return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
}
-
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Sat Apr 30 07:24:20 2011
@@ -49,10 +49,12 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@Private
@@ -72,8 +74,7 @@ public class LeafQueue implements Queue
private int maxApplications;
private int maxApplicationsPerUser;
- private Resource usedResources =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+ private Resource usedResources = Resources.createResource(0);
private float utilization = 0.0f;
private float usedCapacity = 0.0f;
private volatile int numContainers;
@@ -86,6 +87,8 @@ public class LeafQueue implements Queue
private Map<String, User> users = new HashMap<String, User>();
+ private final QueueMetrics metrics;
+
private QueueInfo queueInfo;
private Map<ApplicationId, org.apache.hadoop.yarn.api.records.Application>
applicationInfos;
@@ -100,9 +103,13 @@ public class LeafQueue implements Queue
public LeafQueue(CapacitySchedulerContext cs,
String queueName, Queue parent,
- Comparator<Application> applicationComparator) {
+ Comparator<Application> applicationComparator, Queue old) {
this.queueName = queueName;
this.parent = parent;
+ // must be after parent and queueName are initialized
+ this.metrics = old != null ? old.getMetrics() :
+ QueueMetrics.forQueue(getQueuePath(), parent,
+ cs.getConfiguration().getEnableUserMetrics());
this.minimumAllocation = cs.getMinimumAllocation();
this.containerTokenSecretManager = cs.getContainerTokenSecretManager();
@@ -350,7 +357,7 @@ public class LeafQueue implements Queue
leafQueue.maxApplications, leafQueue.maxApplicationsPerUser,
leafQueue.state, leafQueue.acls);
- update(clusterResource);
+ updateResource(clusterResource);
}
@Override
@@ -416,6 +423,8 @@ public class LeafQueue implements Queue
addApplication(application, user);
}
+ metrics.submitApp(userName);
+
// Inform the parent queue
try {
parent.submitApplication(application, userName, queue, priority);
@@ -449,6 +458,10 @@ public class LeafQueue implements Queue
synchronized (this) {
removeApplication(application, getUser(application.getUser()));
}
+
+ // Update metrics
+ metrics.finishApp(application);
+ application.finish();
// Inform the parent queue
parent.finishApplication(application, queue);
@@ -508,12 +521,12 @@ public class LeafQueue implements Queue
// Maximum Capacity of the queue
if (!assignToQueue(clusterResource, required.getCapability())) {
- return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ return Resources.none();
}
// User limits
if (!assignToUser(application.getUser(), clusterResource, required.getCapability())) {
- return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ return Resources.none();
}
}
@@ -521,9 +534,7 @@ public class LeafQueue implements Queue
Resource assigned =
assignContainersOnNode(clusterResource, node, application, priority);
- if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
- assigned,
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+ if (Resources.greaterThan(assigned, Resources.none())) {
Resource assignedResource =
application.getResourceRequest(priority, NodeManager.ANY).getCapability();
@@ -544,7 +555,7 @@ public class LeafQueue implements Queue
application.showRequests();
}
- return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ return Resources.none();
}
private synchronized Resource assignReservedContainers(Application application,
@@ -561,7 +572,7 @@ public class LeafQueue implements Queue
// Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free*
- return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ return Resources.none();
}
private synchronized boolean assignToQueue(Resource clusterResource,
@@ -616,6 +627,9 @@ public class LeafQueue implements Queue
(int)(queueCapacity * userLimitFactor)
);
+ metrics.setAvailableUserMemory(userName,
+ limit - user.getConsumedResources().getMemory());
+
// Note: We aren't considering the current request since there is a fixed
// overhead of the AM, so...
if ((user.getConsumedResources().getMemory()) > limit) {
@@ -654,22 +668,17 @@ public class LeafQueue implements Queue
Resource assignContainersOnNode(Resource clusterResource, NodeManager node,
Application application, Priority priority) {
- Resource assigned =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ Resource assigned = Resources.none();
// Data-local
assigned = assignNodeLocalContainers(clusterResource, node, application, priority);
- if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
- assigned,
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+ if (Resources.greaterThan(assigned, Resources.none())) {
return assigned;
}
// Rack-local
assigned = assignRackLocalContainers(clusterResource, node, application, priority);
- if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
- assigned,
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+ if (Resources.greaterThan(assigned, Resources.none())) {
return assigned;
}
@@ -688,7 +697,7 @@ public class LeafQueue implements Queue
}
}
- return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ return Resources.none();
}
Resource assignRackLocalContainers(Resource clusterResource, NodeManager node,
@@ -702,7 +711,7 @@ public class LeafQueue implements Queue
}
}
- return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ return Resources.none();
}
Resource assignOffSwitchContainers(Resource clusterResource, NodeManager node,
@@ -716,7 +725,7 @@ public class LeafQueue implements Queue
}
}
- return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+ return Resources.none();
}
boolean canAssign(Application application, Priority priority,
@@ -837,7 +846,8 @@ public class LeafQueue implements Queue
}
}
- return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+
+ return Resources.none();
}
private void allocate(Application application, NodeType type,
@@ -893,9 +903,8 @@ public class LeafQueue implements Queue
private synchronized void allocateResource(Resource clusterResource,
String userName, Resource resource) {
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
- addResource(usedResources, resource);
- update(clusterResource);
+ Resources.addTo(usedResources, resource);
+ updateResource(clusterResource);
++numContainers;
User user = getUser(userName);
@@ -904,23 +913,29 @@ public class LeafQueue implements Queue
private synchronized void releaseResource(Resource clusterResource,
String userName, Resource resource) {
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
- subtractResource(usedResources, resource);
- update(clusterResource);
+ Resources.subtractFrom(usedResources, resource);
+ updateResource(clusterResource);
--numContainers;
User user = getUser(userName);
user.releaseContainer(resource);
}
- private synchronized void update(Resource clusterResource) {
- setUtilization(usedResources.getMemory() / (clusterResource.getMemory() * absoluteCapacity));
+ @Override
+ public synchronized void updateResource(Resource clusterResource) {
+ float memLimit = clusterResource.getMemory() * absoluteCapacity;
+ setUtilization(usedResources.getMemory() / memLimit);
setUsedCapacity(usedResources.getMemory() / (clusterResource.getMemory() * capacity));
+ metrics.setAvailableQueueMemory((int) memLimit - usedResources.getMemory());
}
-
+
+ @Override
+ public QueueMetrics getMetrics() {
+ return metrics;
+ }
+
static class User {
- Resource consumed =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+ Resource consumed = Resources.createResource(0);
int applications = 0;
public Resource getConsumedResources() {
@@ -940,13 +955,11 @@ public class LeafQueue implements Queue
}
public synchronized void assignContainer(Resource resource) {
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- consumed, resource);
+ Resources.addTo(consumed, resource);
}
public synchronized void releaseContainer(Resource resource) {
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
- consumed, resource);
+ Resources.subtractFrom(consumed, resource);
}
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Sat Apr 30 07:24:20 2011
@@ -46,9 +46,11 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@Private
@Evolving
@@ -71,7 +73,7 @@ public class ParentQueue implements Queu
private final Comparator<Queue> queueComparator;
private Resource usedResources =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+ Resources.createResource(0);
private final boolean rootQueue;
@@ -81,7 +83,9 @@ public class ParentQueue implements Queu
private volatile int numContainers;
private QueueState state;
-
+
+ private final QueueMetrics metrics;
+
private QueueInfo queueInfo;
private Map<ApplicationId, org.apache.hadoop.yarn.api.records.Application>
applicationInfos;
@@ -93,13 +97,18 @@ public class ParentQueue implements Queu
RecordFactoryProvider.getRecordFactory(null);
public ParentQueue(CapacitySchedulerContext cs,
- String queueName, Comparator<Queue> comparator, Queue parent) {
+ String queueName, Comparator<Queue> comparator, Queue parent, Queue old) {
minimumAllocation = cs.getMinimumAllocation();
this.parent = parent;
this.queueName = queueName;
this.rootQueue = (parent == null);
-
+
+ // must be called after parent and queueName is set
+ this.metrics = old != null ? old.getMetrics() :
+ QueueMetrics.forQueue(getQueuePath(), parent,
+ cs.getConfiguration().getEnableUserMetrics());
+
float capacity =
(float)cs.getConfiguration().getCapacity(getQueuePath()) / 100;
@@ -377,7 +386,7 @@ public class ParentQueue implements Queu
parentQueue.state, parentQueue.acls);
// Update
- update(clusterResource);
+ updateResource(clusterResource);
}
Map<String, Queue> getQueues(Set<Queue> queues) {
@@ -496,8 +505,7 @@ public class ParentQueue implements Queu
@Override
public synchronized Resource assignContainers(
Resource clusterResource, NodeManager node) {
- Resource assigned =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+ Resource assigned = Resources.createResource(0);
while (canAssign(node)) {
LOG.info("DEBUG --- Trying to assign containers to child-queue of " +
@@ -515,15 +523,12 @@ public class ParentQueue implements Queu
Resource assignedToChild = assignContainersToChildQueues(clusterResource, node);
// Done if no child-queue assigned anything
- if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
- assignedToChild,
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+ if (Resources.greaterThan(assignedToChild, Resources.none())) {
// Track resource utilization for the parent-queue
allocateResource(clusterResource, assignedToChild);
// Track resource utilization in this pass of the scheduler
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- assigned, assignedToChild);
+ Resources.addTo(assigned, assignedToChild);
LOG.info("assignedContainer" +
" queue=" + getQueueName() +
@@ -555,15 +560,13 @@ public class ParentQueue implements Queu
private boolean canAssign(NodeManager node) {
return (node.getReservedApplication() == null) &&
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThanOrEqual(
- node.getAvailableResource(),
- minimumAllocation);
+ Resources.greaterThanOrEqual(node.getAvailableResource(),
+ minimumAllocation);
}
synchronized Resource assignContainersToChildQueues(Resource cluster,
NodeManager node) {
- Resource assigned =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(0);
+ Resource assigned = Resources.createResource(0);
printChildQueues();
@@ -576,9 +579,7 @@ public class ParentQueue implements Queu
assigned = childQueue.assignContainers(cluster, node);
// If we do assign, remove the queue and re-insert in-order to re-sort
- if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
- assigned,
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
+ if (Resources.greaterThan(assigned, Resources.none())) {
// Remove and re-insert to sort
iter.remove();
LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() +
@@ -629,23 +630,28 @@ public class ParentQueue implements Queu
private synchronized void allocateResource(Resource clusterResource,
Resource resource) {
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
- addResource(usedResources, resource);
- update(clusterResource);
+ Resources.addTo(usedResources, resource);
+ updateResource(clusterResource);
++numContainers;
}
private synchronized void releaseResource(Resource clusterResource,
Resource resource) {
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
- subtractResource(usedResources, resource);
- update(clusterResource);
+ Resources.subtractFrom(usedResources, resource);
+ updateResource(clusterResource);
--numContainers;
}
- private synchronized void update(Resource clusterResource) {
- setUtilization(usedResources.getMemory() / (clusterResource.getMemory() * absoluteCapacity));
+ @Override
+ public synchronized void updateResource(Resource clusterResource) {
+ float memLimit = clusterResource.getMemory() * absoluteCapacity;
+ setUtilization(usedResources.getMemory() / memLimit);
setUsedCapacity(usedResources.getMemory() / (clusterResource.getMemory() * capacity));
+ metrics.setAvailableQueueMemory((int) memLimit - usedResources.getMemory());
+ }
+
+ @Override
+ public QueueMetrics getMetrics() {
+ return metrics;
}
-
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Sat Apr 30 07:24:20 2011
@@ -188,4 +188,10 @@ extends org.apache.hadoop.yarn.server.re
*/
public void reinitialize(Queue queue, Resource clusterResource)
throws IOException;
+
+ /**
+ * Update the cluster resource for queues as we add/remove nodes
+ * @param clusterResource the current cluster resource
+ */
+ public void updateResource(Resource clusterResource);
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Sat Apr 30 07:24:20 2011
@@ -51,12 +51,14 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -77,24 +79,29 @@ public class FifoScheduler implements Re
private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
- public static final Resource MINIMUM_ALLOCATION =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
- MINIMUM_MEMORY);
+ public static final Resource MINIMUM_ALLOCATION =
+ Resources.createResource(MINIMUM_MEMORY);
Map<ApplicationId, Application> applications =
new TreeMap<ApplicationId, Application>(
new org.apache.hadoop.yarn.util.BuilderUtils.ApplicationIdComparator());
+ private static final String DEFAULT_QUEUE_NAME = "default";
+ private final QueueMetrics metrics =
+ QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false);
+
private final Queue DEFAULT_QUEUE = new Queue() {
-
- private static final String DEFAULT_QUEUE_NAME = "default";
-
@Override
public String getQueueName() {
return DEFAULT_QUEUE_NAME;
}
@Override
+ public QueueMetrics getMetrics() {
+ return metrics;
+ }
+
+ @Override
public QueueInfo getQueueInfo(boolean includeApplications,
boolean includeChildQueues, boolean recursive) {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
@@ -196,8 +203,10 @@ public class FifoScheduler implements Re
private void normalizeRequest(ResourceRequest ask) {
int memory = ask.getCapability().getMemory();
- memory =
- MINIMUM_MEMORY * ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY));
+ // FIXME: TestApplicationCleanup is relying on unnormalized behavior.
+ //ask.capability.memory = MINIMUM_MEMORY *
+ memory = MINIMUM_MEMORY *
+ ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
}
private synchronized Application getApplication(ApplicationId applicationId) {
@@ -210,6 +219,7 @@ public class FifoScheduler implements Re
throws IOException {
applications.put(applicationId,
new Application(applicationId, master, DEFAULT_QUEUE, user));
+ metrics.submitApp(user);
LOG.info("Application Submission: " + applicationId.getId() + " from " + user +
", currently active: " + applications.size());
}
@@ -225,7 +235,11 @@ public class FifoScheduler implements Re
// Release current containers
releaseContainers(application, application.getCurrentContainers());
-
+
+ // Update metrics
+ metrics.finishApp(application);
+ application.finish();
+
// Let the cluster know that the applications are done
finishedApplication(applicationId,
application.getAllNodesForApplication());
@@ -269,8 +283,7 @@ public class FifoScheduler implements Re
application.showRequests();
// Done
- if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.lessThan(
- node.getAvailableResource(), MINIMUM_ALLOCATION)) {
+ if (Resources.lessThan(node.getAvailableResource(), MINIMUM_ALLOCATION)) {
return;
}
}
@@ -416,7 +429,7 @@ public class FifoScheduler implements Re
node.getHttpAddress(), capability);
// If security is enabled, send the container-tokens too.
if (UserGroupInformation.isSecurityEnabled()) {
- ContainerToken containerToken = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ContainerToken.class);
+ ContainerToken containerToken = recordFactory.newRecordInstance(ContainerToken.class);
ContainerTokenIdentifier tokenidentifier =
new ContainerTokenIdentifier(container.getId(),
container.getContainerManagerAddress(), container.getResource());
@@ -433,6 +446,8 @@ public class FifoScheduler implements Re
}
application.allocate(type, node, priority, request, containers);
addAllocatedContainers(node, application.getApplicationId(), containers);
+ Resources.addTo(usedResource,
+ Resources.multiply(capability, assignedContainers));
}
return assignedContainers;
}
@@ -457,10 +472,12 @@ public class FifoScheduler implements Re
NodeResponse nodeResponse = nodeUpdateInternal(node, containers);
applicationCompletedContainers(nodeResponse.getCompletedContainers());
LOG.info("Node heartbeat " + node.getNodeID() + " resource = " + node.getAvailableResource());
- if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
- greaterThanOrEqual(node.getAvailableResource(), MINIMUM_ALLOCATION)) {
+ if (Resources.greaterThanOrEqual(node.getAvailableResource(),
+ MINIMUM_ALLOCATION)) {
assignContainers(node);
}
+ metrics.setAvailableQueueMemory(
+ clusterResource.getMemory() - usedResource.getMemory());
LOG.info("Node after allocation " + node.getNodeID() + " resource = "
+ node.getAvailableResource());
@@ -495,7 +512,8 @@ public class FifoScheduler implements Re
}
private Map<String, NodeManager> nodes = new HashMap<String, NodeManager>();
- private Resource clusterResource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
+ private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
+ private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
public synchronized Resource getClusterResource() {
return clusterResource;
@@ -503,8 +521,7 @@ public class FifoScheduler implements Re
@Override
public synchronized void removeNode(NodeInfo nodeInfo) {
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
- clusterResource, nodeInfo.getTotalCapability());
+ Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
//TODO inform the the applications that the containers are completed/failed
nodes.remove(nodeInfo.getNodeAddress());
}
@@ -539,8 +556,7 @@ public class FifoScheduler implements Re
@Override
public synchronized void addNode(NodeManager nodeManager) {
nodes.put(nodeManager.getNodeAddress(), nodeManager);
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- clusterResource, nodeManager.getTotalCapability());
+ Resources.addTo(clusterResource, nodeManager.getTotalCapability());
}
public synchronized boolean releaseContainer(ApplicationId applicationId,
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Sat Apr 30 07:24:20 2011
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@Private
@@ -211,8 +212,7 @@ public class Application {
stopRequest.setContainerId(containerId);
nodeManager.stopContainer(stopRequest);
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
- used, requestSpec.get(task.getPriority()));
+ Resources.subtractFrom(used, requestSpec.get(task.getPriority()));
LOG.info("Finished task " + task.getTaskId() +
" of application " + applicationId +
@@ -324,8 +324,7 @@ public class Application {
Container container = i.next();
String host = container.getContainerManagerAddress();
- if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.equals(
- requestSpec.get(priority), container.getResource())) {
+ if (Resources.equals(requestSpec.get(priority), container.getResource())) {
// See which task can use this container
for (Iterator<Task> t=tasks.get(priority).iterator(); t.hasNext();) {
Task task = t.next();
@@ -336,8 +335,7 @@ public class Application {
i.remove();
// Track application resource usage
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- used, container.getResource());
+ Resources.addTo(used, container.getResource());
LOG.info("Assigned container (" + container + ") of type " + type +
" to task " + task.getTaskId() + " at priority " + priority +
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Sat Apr 30 07:24:20 2011
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
@@ -81,11 +82,8 @@ public class NodeManager implements Cont
this.nodeHttpAddress = hostName + ":" + httpPort;
this.rackName = rackName;
this.resourceTracker = resourceTracker;
- this.capability =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
- memory);
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- available, capability);
+ this.capability = Resources.createResource(memory);
+ Resources.addTo(available, capability);
RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
request.setHost(hostName);
@@ -174,10 +172,8 @@ public class NodeManager implements Cont
containerLaunchContext.getResource());
applicationContainers.add(container);
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
- available, containerLaunchContext.getResource());
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- used, containerLaunchContext.getResource());
+ Resources.subtractFrom(available, containerLaunchContext.getResource());
+ Resources.addTo(used, containerLaunchContext.getResource());
LOG.info("DEBUG --- startContainer:" +
" node=" + containerManagerAddress +
@@ -230,10 +226,8 @@ public class NodeManager implements Cont
" stopped " + ctr + " times!");
}
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- available, container.getResource());
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
- used, container.getResource());
+ Resources.addTo(available, container.getResource());
+ Resources.subtractFrom(used, container.getResource());
LOG.info("DEBUG --- stopContainer:" +
" node=" + containerManagerAddress +
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Sat Apr 30 07:24:20 2011
@@ -18,26 +18,22 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-
import java.io.IOException;
-import junit.framework.TestCase;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-public class TestResourceManager extends TestCase {
+public class TestResourceManager {
private static final Log LOG = LogFactory.getLog(TestResourceManager.class);
private ResourceManager resourceManager = null;
@@ -133,9 +129,7 @@ public class TestResourceManager extends
// Application resource requirements
final int memory1 = 1024;
- Resource capability1 =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
- memory1);
+ Resource capability1 = Resources.createResource(memory1);
Priority priority1 =
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(1);
application.addResourceRequestSpec(priority1, capability1);
@@ -144,9 +138,7 @@ public class TestResourceManager extends
application.addTask(t1);
final int memory2 = 2048;
- Resource capability2 =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
- memory2);
+ Resource capability2 = Resources.createResource(memory2);
Priority priority0 =
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); // higher
application.addResourceRequestSpec(priority0, capability2);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Sat Apr 30 07:24:20 2011
@@ -40,6 +40,7 @@ import org.junit.Test;
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
public class TestCapacityScheduler extends TestCase {
private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
@@ -103,14 +104,10 @@ public class TestCapacityScheduler exten
application_0.addNodeManager(host_0, 1234, nm_0);
application_0.addNodeManager(host_1, 1234, nm_1);
- Resource capability_0_0 =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
- 1 * GB);
+ Resource capability_0_0 = Resources.createResource(1 * GB);
application_0.addResourceRequestSpec(priority_1, capability_0_0);
- Resource capability_0_1 =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
- 2 * GB);
+ Resource capability_0_1 = Resources.createResource(2 * GB);
application_0.addResourceRequestSpec(priority_0, capability_0_1);
Task task_0_0 = new Task(application_0, priority_1,
@@ -124,14 +121,10 @@ public class TestCapacityScheduler exten
application_1.addNodeManager(host_0, 1234, nm_0);
application_1.addNodeManager(host_1, 1234, nm_1);
- Resource capability_1_0 =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
- 3 * GB);
+ Resource capability_1_0 = Resources.createResource(3 * GB);
application_1.addResourceRequestSpec(priority_1, capability_1_0);
- Resource capability_1_1 =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
- 2 * GB);
+ Resource capability_1_1 = Resources.createResource(2 * GB);
application_1.addResourceRequestSpec(priority_0, capability_1_1);
Task task_1_0 = new Task(application_1, priority_1,
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1098051&r1=1098050&r2=1098051&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Sat Apr 30 07:24:20 2011
@@ -36,6 +36,7 @@ import org.junit.Test;
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
public class TestFifoScheduler extends TestCase {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -94,14 +95,10 @@ public class TestFifoScheduler extends T
application_0.addNodeManager(host_0, 1234, nm_0);
application_0.addNodeManager(host_1, 1234, nm_1);
- Resource capability_0_0 =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
- GB);
+ Resource capability_0_0 = Resources.createResource(GB);
application_0.addResourceRequestSpec(priority_1, capability_0_0);
- Resource capability_0_1 =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
- 2 * GB);
+ Resource capability_0_1 = Resources.createResource(2 * GB);
application_0.addResourceRequestSpec(priority_0, capability_0_1);
Task task_0_0 = new Task(application_0, priority_1,
@@ -115,14 +112,10 @@ public class TestFifoScheduler extends T
application_1.addNodeManager(host_0, 1234, nm_0);
application_1.addNodeManager(host_1, 1234, nm_1);
- Resource capability_1_0 =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
- 3 * GB);
+ Resource capability_1_0 = Resources.createResource(3 * GB);
application_1.addResourceRequestSpec(priority_1, capability_1_0);
- Resource capability_1_1 =
- org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
- 4 * GB);
+ Resource capability_1_1 = Resources.createResource(4 * GB);
application_1.addResourceRequestSpec(priority_0, capability_1_1);
Task task_1_0 = new Task(application_1, priority_1,
|