Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1362334&r1=1362333&r2=1362334&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Tue Jul 17 01:51:07 2012
@@ -18,31 +18,410 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-/**
- * This class extends the application lifecycle management contained with
- * the {@link SchedulerApp} class and adds delay-scheduling information
- * specific to the FairScheduler.
- */
-public class FSSchedulerApp extends SchedulerApp {
- private static final Log LOG = LogFactory.getLog(SchedulerApp.class);
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+
+public class FSSchedulerApp extends SchedulerApplication {
+
+ private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
+
+ private final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
+ private final AppSchedulingInfo appSchedulingInfo;
+ private final Queue queue;
+
+ private final Resource currentConsumption = recordFactory
+ .newRecordInstance(Resource.class);
+ private Resource resourceLimit = recordFactory
+ .newRecordInstance(Resource.class);
+
+ private Map<ContainerId, RMContainer> liveContainers
+ = new HashMap<ContainerId, RMContainer>();
+ private List<RMContainer> newlyAllocatedContainers =
+ new ArrayList<RMContainer>();
+
+ final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
+ new HashMap<Priority, Map<NodeId, RMContainer>>();
+
+ /**
+ * Count how many times the application has been given an opportunity
+ * to schedule a task at each priority. Each time the scheduler
+ * asks the application for a task at this priority, it is incremented,
+ * and each time the application successfully schedules a task, it
+ * is reset to 0.
+ */
+ Multiset<Priority> schedulingOpportunities = HashMultiset.create();
+
+ Multiset<Priority> reReservations = HashMultiset.create();
+
+ Resource currentReservation = recordFactory
+ .newRecordInstance(Resource.class);
+
+ private final RMContext rmContext;
+ public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
+ String user, Queue queue, ActiveUsersManager activeUsersManager,
+ RMContext rmContext, ApplicationStore store) {
+ this.rmContext = rmContext;
+ this.appSchedulingInfo =
+ new AppSchedulingInfo(applicationAttemptId, user, queue,
+ activeUsersManager, store);
+ this.queue = queue;
+ }
+
+ public ApplicationId getApplicationId() {
+ return this.appSchedulingInfo.getApplicationId();
+ }
+
+ @Override
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return this.appSchedulingInfo.getApplicationAttemptId();
+ }
+
+ public String getUser() {
+ return this.appSchedulingInfo.getUser();
+ }
+
+ public synchronized void updateResourceRequests(
+ List<ResourceRequest> requests) {
+ this.appSchedulingInfo.updateResourceRequests(requests);
+ }
+
+ public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
+ return this.appSchedulingInfo.getResourceRequests(priority);
+ }
+
+ public int getNewContainerId() {
+ return this.appSchedulingInfo.getNewContainerId();
+ }
+
+ public Collection<Priority> getPriorities() {
+ return this.appSchedulingInfo.getPriorities();
+ }
+
+ public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) {
+ return this.appSchedulingInfo.getResourceRequest(priority, nodeAddress);
+ }
+
+ public synchronized int getTotalRequiredResources(Priority priority) {
+ return getResourceRequest(priority, RMNode.ANY).getNumContainers();
+ }
+
+ public Resource getResource(Priority priority) {
+ return this.appSchedulingInfo.getResource(priority);
+ }
+
+ /**
+ * Is this application pending?
+   * @return true if it is pending, false otherwise.
+ */
+ @Override
+ public boolean isPending() {
+ return this.appSchedulingInfo.isPending();
+ }
+
+ public String getQueueName() {
+ return this.appSchedulingInfo.getQueueName();
+ }
+
+ /**
+ * Get the list of live containers
+ * @return All of the live containers
+ */
+ @Override
+ public synchronized Collection<RMContainer> getLiveContainers() {
+ return new ArrayList<RMContainer>(liveContainers.values());
+ }
+
+ public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
+ // Cleanup all scheduling information
+ this.appSchedulingInfo.stop(rmAppAttemptFinalState);
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized void containerLaunchedOnNode(ContainerId containerId,
+ NodeId nodeId) {
+ // Inform the container
+ RMContainer rmContainer =
+ getRMContainer(containerId);
+ if (rmContainer == null) {
+ // Some unknown container sneaked into the system. Kill it.
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
+ return;
+ }
+
+ rmContainer.handle(new RMContainerEvent(containerId,
+ RMContainerEventType.LAUNCHED));
+ }
+
+ synchronized public void containerCompleted(RMContainer rmContainer,
+ ContainerStatus containerStatus, RMContainerEventType event) {
+
+ Container container = rmContainer.getContainer();
+ ContainerId containerId = container.getId();
+
+ // Inform the container
+ rmContainer.handle(
+ new RMContainerFinishedEvent(
+ containerId,
+ containerStatus,
+ event)
+ );
+ LOG.info("Completed container: " + rmContainer.getContainerId() +
+ " in state: " + rmContainer.getState() + " event:" + event);
+
+ // Remove from the list of containers
+ liveContainers.remove(rmContainer.getContainerId());
+
+ RMAuditLogger.logSuccess(getUser(),
+ AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
+ getApplicationId(), containerId);
+
+ // Update usage metrics
+ Resource containerResource = rmContainer.getContainer().getResource();
+ queue.getMetrics().releaseResources(getUser(), 1, containerResource);
+ Resources.subtractFrom(currentConsumption, containerResource);
+ }
+
+ synchronized public List<Container> pullNewlyAllocatedContainers() {
+ List<Container> returnContainerList = new ArrayList<Container>(
+ newlyAllocatedContainers.size());
+ for (RMContainer rmContainer : newlyAllocatedContainers) {
+ rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
+ RMContainerEventType.ACQUIRED));
+ returnContainerList.add(rmContainer.getContainer());
+ }
+ newlyAllocatedContainers.clear();
+ return returnContainerList;
+ }
+
+ public Resource getCurrentConsumption() {
+ return this.currentConsumption;
+ }
+
+ synchronized public void showRequests() {
+ if (LOG.isDebugEnabled()) {
+ for (Priority priority : getPriorities()) {
+ Map<String, ResourceRequest> requests = getResourceRequests(priority);
+ if (requests != null) {
+ LOG.debug("showRequests:" + " application=" + getApplicationId() +
+ " headRoom=" + getHeadroom() +
+ " currentConsumption=" + currentConsumption.getMemory());
+ for (ResourceRequest request : requests.values()) {
+ LOG.debug("showRequests:" + " application=" + getApplicationId()
+ + " request=" + request);
+ }
+ }
+ }
+ }
+ }
+
+ public synchronized RMContainer getRMContainer(ContainerId id) {
+ return liveContainers.get(id);
+ }
+
+ synchronized public void addSchedulingOpportunity(Priority priority) {
+ this.schedulingOpportunities.setCount(priority,
+ schedulingOpportunities.count(priority) + 1);
+ }
+
+ /**
+ * Return the number of times the application has been given an opportunity
+ * to schedule a task at the given priority since the last time it
+ * successfully did so.
+ */
+ synchronized public int getSchedulingOpportunities(Priority priority) {
+ return this.schedulingOpportunities.count(priority);
+ }
+
+ synchronized void resetReReservations(Priority priority) {
+ this.reReservations.setCount(priority, 0);
+ }
+
+ synchronized void addReReservation(Priority priority) {
+ this.reReservations.add(priority);
+ }
+
+ synchronized public int getReReservations(Priority priority) {
+ return this.reReservations.count(priority);
+ }
+
+ public synchronized int getNumReservedContainers(Priority priority) {
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ return (reservedContainers == null) ? 0 : reservedContainers.size();
+ }
+
+ /**
+ * Get total current reservations.
+ * Used only by unit tests
+ * @return total current reservations
+ */
+ @Stable
+ @Private
+ public synchronized Resource getCurrentReservation() {
+ return currentReservation;
+ }
+
+ public synchronized RMContainer reserve(FSSchedulerNode node, Priority priority,
+ RMContainer rmContainer, Container container) {
+ // Create RMContainer if necessary
+ if (rmContainer == null) {
+ rmContainer =
+ new RMContainerImpl(container, getApplicationAttemptId(),
+ node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
+ rmContext.getContainerAllocationExpirer());
+
+ Resources.addTo(currentReservation, container.getResource());
+
+ // Reset the re-reservation count
+ resetReReservations(priority);
+ } else {
+ // Note down the re-reservation
+ addReReservation(priority);
+ }
+ rmContainer.handle(new RMContainerReservedEvent(container.getId(),
+ container.getResource(), node.getNodeID(), priority));
+
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ if (reservedContainers == null) {
+ reservedContainers = new HashMap<NodeId, RMContainer>();
+ this.reservedContainers.put(priority, reservedContainers);
+ }
+ reservedContainers.put(node.getNodeID(), rmContainer);
+
+ LOG.info("Application " + getApplicationId()
+ + " reserved container " + rmContainer
+ + " on node " + node + ", currently has " + reservedContainers.size()
+ + " at priority " + priority
+ + "; currentReservation " + currentReservation.getMemory());
+
+ return rmContainer;
+ }
+
+ public synchronized void unreserve(FSSchedulerNode node, Priority priority) {
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
+ if (reservedContainers.isEmpty()) {
+ this.reservedContainers.remove(priority);
+ }
+
+ // Reset the re-reservation count
+ resetReReservations(priority);
+
+ Resource resource = reservedContainer.getContainer().getResource();
+ Resources.subtractFrom(currentReservation, resource);
+
+ LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
+ + node + ", currently has " + reservedContainers.size() + " at priority "
+ + priority + "; currentReservation " + currentReservation);
+ }
+
+ /**
+ * Has the application reserved the given <code>node</code> at the
+ * given <code>priority</code>?
+ * @param node node to be checked
+ * @param priority priority of reserved container
+   * @return true if reserved, false if not
+ */
+ public synchronized boolean isReserved(FSSchedulerNode node, Priority priority) {
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ if (reservedContainers != null) {
+ return reservedContainers.containsKey(node.getNodeID());
+ }
+ return false;
+ }
+
+ public synchronized float getLocalityWaitFactor(
+ Priority priority, int clusterNodes) {
+ // Estimate: Required unique resources (i.e. hosts + racks)
+ int requiredResources =
+ Math.max(this.getResourceRequests(priority).size() - 1, 0);
+
+ // waitFactor can't be more than '1'
+ // i.e. no point skipping more than clustersize opportunities
+ return Math.min(((float)requiredResources / clusterNodes), 1.0f);
+ }
+
+ /**
+ * Get the list of reserved containers
+ * @return All of the reserved containers.
+ */
+ @Override
+ public synchronized List<RMContainer> getReservedContainers() {
+ List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
+ for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
+ this.reservedContainers.entrySet()) {
+ reservedContainers.addAll(e.getValue().values());
+ }
+ return reservedContainers;
+ }
+
+ public synchronized void setHeadroom(Resource globalLimit) {
+ this.resourceLimit = globalLimit;
+ }
+
+ /**
+ * Get available headroom in terms of resources for the application's user.
+ * @return available resource headroom
+ */
+ public synchronized Resource getHeadroom() {
+ // Corner case to deal with applications being slightly over-limit
+ if (resourceLimit.getMemory() < 0) {
+ resourceLimit.setMemory(0);
+ }
+
+ return resourceLimit;
+ }
+
+ public Queue getQueue() {
+ return queue;
+ }
/**
* Delay scheduling: We often want to prioritize scheduling of node-local
@@ -62,13 +441,6 @@ public class FSSchedulerApp extends Sche
// Time of the last container scheduled at the current allowed level
Map<Priority, Long> lastScheduledContainer = new HashMap<Priority, Long>();
- public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
- String user, Queue queue, ActiveUsersManager activeUsersManager,
- RMContext rmContext, ApplicationStore store) {
- super(applicationAttemptId, user, queue, activeUsersManager,
- rmContext, store);
- }
-
/**
* Should be called when an application has successfully scheduled a container,
* or when the scheduling locality threshold is relaxed.
@@ -78,7 +450,7 @@ public class FSSchedulerApp extends Sche
*/
synchronized public void resetSchedulingOpportunities(Priority priority) {
this.lastScheduledContainer.put(priority, System.currentTimeMillis());
- super.resetSchedulingOpportunities(priority);
+ this.schedulingOpportunities.setCount(priority, 0);
}
/**
@@ -127,7 +499,7 @@ public class FSSchedulerApp extends Sche
}
- synchronized public RMContainer allocate(NodeType type, SchedulerNode node,
+ synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
Priority priority, ResourceRequest request,
Container container) {
// Update allowed locality level
@@ -143,7 +515,42 @@ public class FSSchedulerApp extends Sche
this.resetAllowedLocalityLevel(priority, type);
}
}
- return super.allocate(type, node, priority, request, container);
+
+ // Required sanity check - AM can call 'allocate' to update resource
+ // request without locking the scheduler, hence we need to check
+ if (getTotalRequiredResources(priority) <= 0) {
+ return null;
+ }
+
+ // Create RMContainer
+ RMContainer rmContainer = new RMContainerImpl(container, this
+ .getApplicationAttemptId(), node.getNodeID(), this.rmContext
+ .getDispatcher().getEventHandler(), this.rmContext
+ .getContainerAllocationExpirer());
+
+    // Track the container as newly allocated and live.
+ newlyAllocatedContainers.add(rmContainer);
+ liveContainers.put(container.getId(), rmContainer);
+
+ // Update consumption and track allocations
+ appSchedulingInfo.allocate(type, node, priority, request, container);
+ Resources.addTo(currentConsumption, container.getResource());
+
+ // Inform the container
+ rmContainer.handle(
+ new RMContainerEvent(container.getId(), RMContainerEventType.START));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("allocate: applicationAttemptId="
+ + container.getId().getApplicationAttemptId()
+ + " container=" + container.getId() + " host="
+ + container.getNodeId().getHost() + " type=" + type);
+ }
+ RMAuditLogger.logSuccess(getUser(),
+ AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
+ getApplicationId(), container.getId());
+
+ return rmContainer;
}
/**
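
[Editor's note] The new FSSchedulerApp above carries the delay-scheduling bookkeeping that used to live in SchedulerApp: a Guava Multiset counts, per priority, how many scheduling opportunities have gone unused, and the count is reset when a container is placed or the locality threshold is relaxed. Below is a minimal sketch of that pattern plus the getLocalityWaitFactor arithmetic; DelaySchedulingSketch and its plain int priorities are illustrative stand-ins, not part of the patch.

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;

/** Illustrative stand-in for FSSchedulerApp's delay-scheduling counters. */
class DelaySchedulingSketch {
  private final Multiset<Integer> opportunities = HashMultiset.create();

  /** Bump the counter each time the scheduler offers this priority a slot. */
  synchronized void addSchedulingOpportunity(int priority) {
    opportunities.setCount(priority, opportunities.count(priority) + 1);
  }

  /** Reset when a container is actually scheduled or locality is relaxed. */
  synchronized void resetSchedulingOpportunities(int priority) {
    opportunities.setCount(priority, 0);
  }

  synchronized int getSchedulingOpportunities(int priority) {
    return opportunities.count(priority);
  }

  /** Same shape as getLocalityWaitFactor: unique requested resources
   *  (hosts + racks, minus the ANY entry) over cluster size, capped at 1. */
  static float localityWaitFactor(int resourceRequestEntries, int clusterNodes) {
    int required = Math.max(resourceRequestEntries - 1, 0);
    return Math.min((float) required / clusterNodes, 1.0f);
  }
}

Once the unused-opportunity count at a priority passes the node- or rack-locality threshold, the scheduler can fall back to a less local allocation, which is the behaviour the "Delay scheduling" comment above describes.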
Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1362334&r1=1362333&r2=1362334&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Jul 17 01:51:07 2012
@@ -63,9 +63,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -116,12 +114,12 @@ public class FairScheduler implements Re
// This stores per-application scheduling information, indexed by
// attempt ID's for fast lookup.
- protected Map<ApplicationAttemptId, SchedulerApp> applications
- = new HashMap<ApplicationAttemptId, SchedulerApp>();
+ protected Map<ApplicationAttemptId, FSSchedulerApp> applications
+ = new HashMap<ApplicationAttemptId, FSSchedulerApp>();
// Nodes in the cluster, indexed by NodeId
- private Map<NodeId, SchedulerNode> nodes =
- new ConcurrentHashMap<NodeId, SchedulerNode>();
+ private Map<NodeId, FSSchedulerNode> nodes =
+ new ConcurrentHashMap<NodeId, FSSchedulerNode>();
// Aggregate capacity of the cluster
private Resource clusterCapacity =
@@ -158,7 +156,7 @@ public class FairScheduler implements Re
}
private RMContainer getRMContainer(ContainerId containerId) {
- SchedulerApp application =
+ FSSchedulerApp application =
applications.get(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
}
@@ -294,7 +292,8 @@ public class FairScheduler implements Re
if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none()))
return;
- Map<RMContainer, SchedulerApp> apps = new HashMap<RMContainer, SchedulerApp>();
+ Map<RMContainer, FSSchedulerApp> apps =
+ new HashMap<RMContainer, FSSchedulerApp>();
Map<RMContainer, FSQueueSchedulable> queues = new HashMap<RMContainer, FSQueueSchedulable>();
// Collect running containers from over-scheduled queues
@@ -526,7 +525,7 @@ public class FairScheduler implements Re
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
- SchedulerApp application = applications.get(applicationAttemptId);
+ FSSchedulerApp application = applications.get(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
@@ -576,7 +575,7 @@ public class FairScheduler implements Re
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
- SchedulerApp application = applications.get(applicationAttemptId);
+ FSSchedulerApp application = applications.get(applicationAttemptId);
if (application == null) {
LOG.info("Container " + container + " of" +
" unknown application " + applicationAttemptId +
@@ -585,7 +584,7 @@ public class FairScheduler implements Re
}
// Get the node on which the container was allocated
- SchedulerNode node = nodes.get(container.getNodeId());
+ FSSchedulerNode node = nodes.get(container.getNodeId());
if (rmContainer.getState() == RMContainerState.RESERVED) {
application.unreserve(node, rmContainer.getReservedPriority());
@@ -602,7 +601,7 @@ public class FairScheduler implements Re
}
private synchronized void addNode(RMNode node) {
- this.nodes.put(node.getNodeID(), new SchedulerNode(node));
+ this.nodes.put(node.getNodeID(), new FSSchedulerNode(node));
Resources.addTo(clusterCapacity, node.getTotalCapability());
LOG.info("Added node " + node.getNodeAddress() +
@@ -610,7 +609,7 @@ public class FairScheduler implements Re
}
private synchronized void removeNode(RMNode rmNode) {
- SchedulerNode node = this.nodes.get(rmNode.getNodeID());
+ FSSchedulerNode node = this.nodes.get(rmNode.getNodeID());
Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
// Remove running containers
@@ -643,7 +642,7 @@ public class FairScheduler implements Re
List<ResourceRequest> ask, List<ContainerId> release) {
// Make sure this application exists
- SchedulerApp application = applications.get(appAttemptId);
+ FSSchedulerApp application = applications.get(appAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + appAttemptId);
@@ -704,10 +703,10 @@ public class FairScheduler implements Re
* Process a container which has launched on a node, as reported by the
* node.
*/
- private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) {
+ private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
- SchedulerApp application = applications.get(applicationAttemptId);
+ FSSchedulerApp application = applications.get(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + containerId +
@@ -726,7 +725,7 @@ public class FairScheduler implements Re
List<ContainerStatus> completedContainers) {
LOG.info("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
eventLog.log("HEARTBEAT", nm.getHostName());
- SchedulerNode node = nodes.get(nm.getNodeID());
+ FSSchedulerNode node = nodes.get(nm.getNodeID());
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
@@ -749,7 +748,7 @@ public class FairScheduler implements Re
// already, we try to complete the reservation.
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
- SchedulerApp reservedApplication =
+ FSSchedulerApp reservedApplication =
applications.get(reservedContainer.getApplicationAttemptId());
// Try to fulfill the reservation
@@ -787,7 +786,7 @@ public class FairScheduler implements Re
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
- SchedulerNode node = nodes.get(nodeId);
+ FSSchedulerNode node = nodes.get(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}
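
[Editor's note] Most of the FairScheduler changes are type-narrowing from SchedulerApp/SchedulerNode to FSSchedulerApp/FSSchedulerNode, but the nodeUpdate hunk also shows the reservation-first heartbeat flow: when the node already holds a reserved container, the scheduler looks up the reserving application and tries to fulfil that reservation before assigning anything new. A rough sketch of that control flow follows, with placeholder Node/App types and an attempt-id map standing in for the real YARN classes.

import java.util.Map;

/** Hedged sketch of the reservation-first heartbeat flow; not YARN code. */
class HeartbeatSketch {
  interface Node {
    String reservingAttemptId();          // null when nothing is reserved here
  }
  interface App {
    boolean tryFulfillReservation(Node node);
  }

  private final Map<String, App> appsByAttemptId;

  HeartbeatSketch(Map<String, App> appsByAttemptId) {
    this.appsByAttemptId = appsByAttemptId;
  }

  /** On each node heartbeat, satisfy an existing reservation before giving
   *  the node's free capacity to other applications. */
  void nodeUpdate(Node node) {
    String attempt = node.reservingAttemptId();
    if (attempt != null) {
      App reserving = appsByAttemptId.get(attempt);
      if (reserving != null && reserving.tryFulfillReservation(node)) {
        return;                           // the reservation used the free space
      }
    }
    assignNewContainers(node);            // normal fair-share assignment
  }

  private void assignNewContainers(Node node) {
    // elided in this sketch
  }
}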
Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1362334&r1=1362333&r2=1362334&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Tue Jul 17 01:51:07 2012
@@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -422,7 +421,7 @@ public class QueueManager {
/**
* Remove an app
*/
- public synchronized void removeJob(SchedulerApp app) {
+ public synchronized void removeJob(FSSchedulerApp app) {
getQueue(app.getQueueName()).removeJob(app);
}
Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1362334&r1=1362333&r2=1362334&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Tue Jul 17 01:51:07 2012
@@ -23,7 +23,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
/**
* A Schedulable represents an entity that can launch tasks, such as a job
@@ -104,7 +103,7 @@ abstract class Schedulable {
* already exists on this node, and the schedulable should fulfill that
* reservation if possible.
*/
- public abstract Resource assignContainer(SchedulerNode node, boolean reserved);
+ public abstract Resource assignContainer(FSSchedulerNode node, boolean reserved);
/** Assign a fair share to this Schedulable. */
public void setFairShare(Resource fairShare) {
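
[Editor's note] Schedulable.assignContainer now receives the fair-scheduler node type directly; the reserved flag it already carried tells an implementation to complete an existing reservation on that node rather than start a fresh container. The following hypothetical mini-contract shows that two-branch shape; NodeState, MiniSchedulable and the fixed memory sizes are assumptions for illustration only.

/** Placeholder for the per-node state an implementation would consult. */
class NodeState {
  final int freeMb;
  final int reservedMb;           // 0 when nothing is reserved on this node
  NodeState(int freeMb, int reservedMb) {
    this.freeMb = freeMb;
    this.reservedMb = reservedMb;
  }
}

interface MiniSchedulable {
  /** @param reserved true if a container is already reserved on this node */
  int assignContainerMb(NodeState node, boolean reserved);
}

class MiniApp implements MiniSchedulable {
  private final int requestMb;
  MiniApp(int requestMb) { this.requestMb = requestMb; }

  @Override
  public int assignContainerMb(NodeState node, boolean reserved) {
    if (reserved) {
      // Only try to complete the reservation already sitting on this node.
      return node.freeMb >= node.reservedMb ? node.reservedMb : 0;
    }
    // Otherwise hand out a fresh container if the node has room.
    return node.freeMb >= requestMb ? requestMb : 0;
  }
}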
Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1362334&r1=1362333&r2=1362334&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Jul 17 01:51:07 2012
@@ -71,11 +71,11 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
@@ -103,14 +103,14 @@ public class FifoScheduler implements Re
private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
private RMContext rmContext;
- private Map<NodeId, SchedulerNode> nodes = new ConcurrentHashMap<NodeId, SchedulerNode>();
+ private Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
private boolean initialized;
private Resource minimumAllocation;
private Resource maximumAllocation;
- private Map<ApplicationAttemptId, SchedulerApp> applications
- = new TreeMap<ApplicationAttemptId, SchedulerApp>();
+ private Map<ApplicationAttemptId, FiCaSchedulerApp> applications
+ = new TreeMap<ApplicationAttemptId, FiCaSchedulerApp>();
private ActiveUsersManager activeUsersManager;
@@ -223,7 +223,7 @@ public class FifoScheduler implements Re
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release) {
- SchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
@@ -276,7 +276,7 @@ public class FifoScheduler implements Re
}
}
- private SchedulerApp getApplication(
+ private FiCaSchedulerApp getApplication(
ApplicationAttemptId applicationAttemptId) {
return applications.get(applicationAttemptId);
}
@@ -284,19 +284,19 @@ public class FifoScheduler implements Re
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId applicationAttemptId) {
- SchedulerApp app = getApplication(applicationAttemptId);
+ FiCaSchedulerApp app = getApplication(applicationAttemptId);
return app == null ? null : new SchedulerAppReport(app);
}
- private SchedulerNode getNode(NodeId nodeId) {
+ private FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
private synchronized void addApplication(ApplicationAttemptId appAttemptId,
String user) {
// TODO: Fix store
- SchedulerApp schedulerApp =
- new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
+ FiCaSchedulerApp schedulerApp =
+ new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
this.rmContext, null);
applications.put(appAttemptId, schedulerApp);
metrics.submitApp(user, appAttemptId.getAttemptId());
@@ -311,7 +311,7 @@ public class FifoScheduler implements Re
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState)
throws IOException {
- SchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
throw new IOException("Unknown application " + applicationAttemptId +
" has completed!");
@@ -344,15 +344,15 @@ public class FifoScheduler implements Re
*
* @param node node on which resources are available to be allocated
*/
- private void assignContainers(SchedulerNode node) {
+ private void assignContainers(FiCaSchedulerNode node) {
LOG.debug("assignContainers:" +
" node=" + node.getRMNode().getNodeAddress() +
" #applications=" + applications.size());
// Try to assign containers to applications in fifo order
- for (Map.Entry<ApplicationAttemptId, SchedulerApp> e : applications
+ for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : applications
.entrySet()) {
- SchedulerApp application = e.getValue();
+ FiCaSchedulerApp application = e.getValue();
LOG.debug("pre-assignContainers");
application.showRequests();
synchronized (application) {
@@ -383,15 +383,15 @@ public class FifoScheduler implements Re
// Update the applications' headroom to correctly take into
// account the containers assigned in this update.
- for (SchedulerApp application : applications.values()) {
+ for (FiCaSchedulerApp application : applications.values()) {
application.setHeadroom(Resources.subtract(clusterResource, usedResource));
}
}
- private int getMaxAllocatableContainers(SchedulerApp application,
- Priority priority, SchedulerNode node, NodeType type) {
+ private int getMaxAllocatableContainers(FiCaSchedulerApp application,
+ Priority priority, FiCaSchedulerNode node, NodeType type) {
ResourceRequest offSwitchRequest =
- application.getResourceRequest(priority, SchedulerNode.ANY);
+ application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
int maxContainers = offSwitchRequest.getNumContainers();
if (type == NodeType.OFF_SWITCH) {
@@ -420,8 +420,8 @@ public class FifoScheduler implements Re
}
- private int assignContainersOnNode(SchedulerNode node,
- SchedulerApp application, Priority priority
+ private int assignContainersOnNode(FiCaSchedulerNode node,
+ FiCaSchedulerApp application, Priority priority
) {
// Data-local
int nodeLocalContainers =
@@ -447,8 +447,8 @@ public class FifoScheduler implements Re
return (nodeLocalContainers + rackLocalContainers + offSwitchContainers);
}
- private int assignNodeLocalContainers(SchedulerNode node,
- SchedulerApp application, Priority priority) {
+ private int assignNodeLocalContainers(FiCaSchedulerNode node,
+ FiCaSchedulerApp application, Priority priority) {
int assignedContainers = 0;
ResourceRequest request =
application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
@@ -473,15 +473,15 @@ public class FifoScheduler implements Re
return assignedContainers;
}
- private int assignRackLocalContainers(SchedulerNode node,
- SchedulerApp application, Priority priority) {
+ private int assignRackLocalContainers(FiCaSchedulerNode node,
+ FiCaSchedulerApp application, Priority priority) {
int assignedContainers = 0;
ResourceRequest request =
application.getResourceRequest(priority, node.getRMNode().getRackName());
if (request != null) {
      // Don't allocate on this rack if the application doesn't need containers
ResourceRequest offSwitchRequest =
- application.getResourceRequest(priority, SchedulerNode.ANY);
+ application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
if (offSwitchRequest.getNumContainers() <= 0) {
return 0;
}
@@ -498,11 +498,11 @@ public class FifoScheduler implements Re
return assignedContainers;
}
- private int assignOffSwitchContainers(SchedulerNode node,
- SchedulerApp application, Priority priority) {
+ private int assignOffSwitchContainers(FiCaSchedulerNode node,
+ FiCaSchedulerApp application, Priority priority) {
int assignedContainers = 0;
ResourceRequest request =
- application.getResourceRequest(priority, SchedulerNode.ANY);
+ application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
if (request != null) {
assignedContainers =
assignContainer(node, application, priority,
@@ -511,7 +511,7 @@ public class FifoScheduler implements Re
return assignedContainers;
}
- private int assignContainer(SchedulerNode node, SchedulerApp application,
+ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, int assignableContainers,
ResourceRequest request, NodeType type) {
LOG.debug("assignContainers:" +
@@ -577,7 +577,7 @@ public class FifoScheduler implements Re
private synchronized void nodeUpdate(RMNode rmNode,
List<ContainerStatus> newlyLaunchedContainers,
List<ContainerStatus> completedContainers) {
- SchedulerNode node = getNode(rmNode.getNodeID());
+ FiCaSchedulerNode node = getNode(rmNode.getNodeID());
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
@@ -667,10 +667,10 @@ public class FifoScheduler implements Re
}
}
- private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) {
+ private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
- SchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + containerId +
@@ -696,10 +696,10 @@ public class FifoScheduler implements Re
// Get the application for the finished container
Container container = rmContainer.getContainer();
ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
- SchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
// Get the node on which the container was allocated
- SchedulerNode node = getNode(container.getNodeId());
+ FiCaSchedulerNode node = getNode(container.getNodeId());
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
@@ -729,7 +729,7 @@ public class FifoScheduler implements Re
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private synchronized void removeNode(RMNode nodeInfo) {
- SchedulerNode node = getNode(nodeInfo.getNodeID());
+ FiCaSchedulerNode node = getNode(nodeInfo.getNodeID());
if (node == null) {
return;
}
@@ -761,7 +761,7 @@ public class FifoScheduler implements Re
}
private synchronized void addNode(RMNode nodeManager) {
- this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
+ this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
}
@@ -778,12 +778,12 @@ public class FifoScheduler implements Re
@Override
public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
- SchedulerNode node = getNode(nodeId);
+ FiCaSchedulerNode node = getNode(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}
private RMContainer getRMContainer(ContainerId containerId) {
- SchedulerApp application =
+ FiCaSchedulerApp application =
getApplication(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
}
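
[Editor's note] After each assignment round the FifoScheduler hunk above recomputes every application's headroom as cluster capacity minus the resources already in use, and FSSchedulerApp.getHeadroom clamps a slightly over-limit value back to zero. A tiny sketch of that arithmetic in plain integers; HeadroomSketch is illustrative, not a YARN class.

/** Headroom = cluster capacity minus usage, floored at zero. */
final class HeadroomSketch {
  static int headroomMb(int clusterMb, int usedMb) {
    // Mirrors getHeadroom()'s corner case for over-limit applications.
    return Math.max(clusterMb - usedMb, 0);
  }

  public static void main(String[] args) {
    System.out.println(headroomMb(16 * 1024, 6 * 1024)); // prints 10240
    System.out.println(headroomMb(8 * 1024, 9 * 1024));  // prints 0
  }
}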
Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1362334&r1=1362333&r2=1362334&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Tue Jul 17 01:51:07 2012
@@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.BuilderUtils;
@Private
@@ -72,7 +72,7 @@ public class NodeManager implements Cont
Resource used = recordFactory.newRecordInstance(Resource.class);
final ResourceTrackerService resourceTrackerService;
- final SchedulerNode schedulerNode;
+ final FiCaSchedulerNode schedulerNode;
final Map<ApplicationId, List<Container>> containers =
new HashMap<ApplicationId, List<Container>>();
@@ -98,7 +98,7 @@ public class NodeManager implements Cont
request.setNodeId(this.nodeId);
resourceTrackerService.registerNodeManager(request)
.getRegistrationResponse();
- this.schedulerNode = new SchedulerNode(rmContext.getRMNodes().get(
+ this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get(
this.nodeId));
// Sanity check
Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1362334&r1=1362333&r2=1362334&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original)
+++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Tue Jul 17 01:51:07 2012
@@ -40,8 +40,8 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -111,8 +111,8 @@ public class TestApplicationLimits {
LOG.info("Setup top-level queues a and b");
}
- private SchedulerApp getMockApplication(int appId, String user) {
- SchedulerApp application = mock(SchedulerApp.class);
+ private FiCaSchedulerApp getMockApplication(int appId, String user) {
+ FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
ApplicationAttemptId applicationAttemptId =
TestUtils.getMockApplicationAttemptId(appId, 0);
doReturn(applicationAttemptId.getApplicationId()).
@@ -209,7 +209,7 @@ public class TestApplicationLimits {
int APPLICATION_ID = 0;
// Submit first application
- SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_0, user_0, A);
assertEquals(1, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
@@ -217,7 +217,7 @@ public class TestApplicationLimits {
assertEquals(0, queue.getNumPendingApplications(user_0));
// Submit second application
- SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_1, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
@@ -225,7 +225,7 @@ public class TestApplicationLimits {
assertEquals(0, queue.getNumPendingApplications(user_0));
// Submit third application, should remain pending
- SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_2, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
@@ -240,7 +240,7 @@ public class TestApplicationLimits {
assertEquals(0, queue.getNumPendingApplications(user_0));
// Submit another one for user_0
- SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_3, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
@@ -251,7 +251,7 @@ public class TestApplicationLimits {
doReturn(3).when(queue).getMaximumActiveApplications();
// Submit first app for user_1
- SchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1);
+ FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1);
queue.submitApplication(app_4, user_1, A);
assertEquals(3, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
@@ -261,7 +261,7 @@ public class TestApplicationLimits {
assertEquals(0, queue.getNumPendingApplications(user_1));
// Submit second app for user_1, should block due to queue-limit
- SchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1);
+ FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1);
queue.submitApplication(app_5, user_1, A);
assertEquals(3, queue.getNumActiveApplications());
assertEquals(2, queue.getNumPendingApplications());
@@ -290,7 +290,7 @@ public class TestApplicationLimits {
doReturn(2).when(queue).getMaximumActiveApplications();
// Submit first application
- SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_0, user_0, A);
assertEquals(1, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
@@ -299,7 +299,7 @@ public class TestApplicationLimits {
assertTrue(queue.activeApplications.contains(app_0));
// Submit second application
- SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_1, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
@@ -308,7 +308,7 @@ public class TestApplicationLimits {
assertTrue(queue.activeApplications.contains(app_1));
// Submit third application, should remain pending
- SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_2, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
@@ -317,7 +317,7 @@ public class TestApplicationLimits {
assertTrue(queue.pendingApplications.contains(app_2));
// Submit fourth application, should remain pending
- SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
queue.submitApplication(app_3, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(2, queue.getNumPendingApplications());
@@ -393,7 +393,7 @@ public class TestApplicationLimits {
String host_0 = "host_0";
String rack_0 = "rack_0";
- SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB);
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB);
final String user_0 = "user_0";
final String user_1 = "user_1";
@@ -408,8 +408,8 @@ public class TestApplicationLimits {
// and check headroom
final ApplicationAttemptId appAttemptId_0_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- SchedulerApp app_0_0 =
- spy(new SchedulerApp(appAttemptId_0_0, user_0, queue,
+ FiCaSchedulerApp app_0_0 =
+ spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue,
queue.getActiveUsersManager(), rmContext, null));
queue.submitApplication(app_0_0, user_0, A);
@@ -427,8 +427,8 @@ public class TestApplicationLimits {
// Submit second application from user_0, check headroom
final ApplicationAttemptId appAttemptId_0_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
- SchedulerApp app_0_1 =
- spy(new SchedulerApp(appAttemptId_0_1, user_0, queue,
+ FiCaSchedulerApp app_0_1 =
+ spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue,
queue.getActiveUsersManager(), rmContext, null));
queue.submitApplication(app_0_1, user_0, A);
@@ -446,8 +446,8 @@ public class TestApplicationLimits {
// Submit first application from user_1, check for new headroom
final ApplicationAttemptId appAttemptId_1_0 =
TestUtils.getMockApplicationAttemptId(2, 0);
- SchedulerApp app_1_0 =
- spy(new SchedulerApp(appAttemptId_1_0, user_1, queue,
+ FiCaSchedulerApp app_1_0 =
+ spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue,
queue.getActiveUsersManager(), rmContext, null));
queue.submitApplication(app_1_0, user_1, A);
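
[Editor's note] The capacity-scheduler tests switch their mocks and spies from SchedulerApp/SchedulerNode to FiCaSchedulerApp/FiCaSchedulerNode while keeping the same Mockito pattern: create a mock, then stub only the accessors the test touches with doReturn(...).when(...). A small generic sketch of that pattern against a hypothetical Scheduled interface (not a YARN type):

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

/** Hedged sketch of the test-mocking pattern used above. */
class MockSketch {
  interface Scheduled { String getUser(); }

  static Scheduled mockApp(String user) {
    Scheduled app = mock(Scheduled.class);
    doReturn(user).when(app).getUser();   // stub just the accessor the test needs
    return app;
  }
}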
Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1362334&r1=1362333&r2=1362334&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Tue Jul 17 01:51:07 2012
@@ -62,8 +62,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.junit.After;
import org.junit.Before;
@@ -171,14 +171,14 @@ public class TestLeafQueue {
@Override
public Container answer(InvocationOnMock invocation)
throws Throwable {
- final SchedulerApp application =
- (SchedulerApp)(invocation.getArguments()[0]);
+ final FiCaSchedulerApp application =
+ (FiCaSchedulerApp)(invocation.getArguments()[0]);
final ContainerId containerId =
TestUtils.getMockContainerId(application);
Container container = TestUtils.getMockContainer(
containerId,
- ((SchedulerNode)(invocation.getArguments()[1])).getNodeID(),
+ ((FiCaSchedulerNode)(invocation.getArguments()[1])).getNodeID(),
(Resource)(invocation.getArguments()[2]),
((Priority)invocation.getArguments()[3]));
return container;
@@ -186,8 +186,8 @@ public class TestLeafQueue {
}
).
when(queue).createContainer(
- any(SchedulerApp.class),
- any(SchedulerNode.class),
+ any(FiCaSchedulerApp.class),
+ any(FiCaSchedulerNode.class),
any(Resource.class),
any(Priority.class)
);
@@ -195,7 +195,7 @@ public class TestLeafQueue {
// 2. Stub out LeafQueue.parent.completedContainer
CSQueue parent = queue.getParent();
doNothing().when(parent).completedContainer(
- any(Resource.class), any(SchedulerApp.class), any(SchedulerNode.class),
+ any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class),
any(RMContainer.class), any(ContainerStatus.class),
any(RMContainerEventType.class));
@@ -238,22 +238,22 @@ public class TestLeafQueue {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- SchedulerApp app_0 =
- new SchedulerApp(appAttemptId_0, user_0, a,
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, B);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
- SchedulerApp app_1 =
- new SchedulerApp(appAttemptId_1, user_0, a,
+ FiCaSchedulerApp app_1 =
+ new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_0, B); // same user
// Setup some nodes
String host_0 = "host_0";
- SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 1;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@@ -284,14 +284,14 @@ public class TestLeafQueue {
// Submit applications
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 1);
- SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_d, d, null,
+ FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_d, d, null,
rmContext, null);
d.submitApplication(app_0, user_d, D);
// Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2);
- SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_d, d, null,
+ FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_d, d, null,
rmContext, null);
d.submitApplication(app_1, user_d, D); // same user
}
@@ -309,7 +309,7 @@ public class TestLeafQueue {
// Submit applications
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 1);
- SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_0, a, null,
+ FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null,
rmContext, null);
a.submitApplication(app_0, user_0, B);
@@ -324,7 +324,7 @@ public class TestLeafQueue {
// Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2);
- SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_0, a, null,
+ FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null,
rmContext, null);
a.submitApplication(app_1, user_0, B); // same user
@@ -359,22 +359,22 @@ public class TestLeafQueue {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- SchedulerApp app_0 =
- new SchedulerApp(appAttemptId_0, user_0, a,
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
- SchedulerApp app_1 =
- new SchedulerApp(appAttemptId_1, user_0, a,
+ FiCaSchedulerApp app_1 =
+ new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
// Setup some nodes
String host_0 = "host_0";
- SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 1;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@@ -483,30 +483,30 @@ public class TestLeafQueue {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- SchedulerApp app_0 =
- new SchedulerApp(appAttemptId_0, user_0, a,
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
- SchedulerApp app_1 =
- new SchedulerApp(appAttemptId_1, user_0, a,
+ FiCaSchedulerApp app_1 =
+ new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
- SchedulerApp app_2 =
- new SchedulerApp(appAttemptId_2, user_1, a,
+ FiCaSchedulerApp app_2 =
+ new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_2, user_1, A);
// Setup some nodes
String host_0 = "host_0";
- SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
String host_1 = "host_1";
- SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+ FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@@ -576,30 +576,30 @@ public class TestLeafQueue {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- SchedulerApp app_0 =
- new SchedulerApp(appAttemptId_0, user_0, a,
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
- SchedulerApp app_1 =
- new SchedulerApp(appAttemptId_1, user_0, a,
+ FiCaSchedulerApp app_1 =
+ new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
- SchedulerApp app_2 =
- new SchedulerApp(appAttemptId_2, user_1, a,
+ FiCaSchedulerApp app_2 =
+ new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_2, user_1, A);
// Setup some nodes
String host_0 = "host_0";
- SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
String host_1 = "host_1";
- SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+ FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@@ -687,35 +687,35 @@ public class TestLeafQueue {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- SchedulerApp app_0 =
- new SchedulerApp(appAttemptId_0, user_0, a,
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
- SchedulerApp app_1 =
- new SchedulerApp(appAttemptId_1, user_0, a,
+ FiCaSchedulerApp app_1 =
+ new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
- SchedulerApp app_2 =
- new SchedulerApp(appAttemptId_2, user_1, a,
+ FiCaSchedulerApp app_2 =
+ new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_2, user_1, A);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
- SchedulerApp app_3 =
- new SchedulerApp(appAttemptId_3, user_2, a,
+ FiCaSchedulerApp app_3 =
+ new FiCaSchedulerApp(appAttemptId_3, user_2, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_3, user_2, A);
// Setup some nodes
String host_0 = "host_0";
- SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 1;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@@ -862,21 +862,21 @@ public class TestLeafQueue {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- SchedulerApp app_0 =
- new SchedulerApp(appAttemptId_0, user_0, a,
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
- SchedulerApp app_1 =
- new SchedulerApp(appAttemptId_1, user_1, a,
+ FiCaSchedulerApp app_1 =
+ new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "host_0";
- SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
@@ -961,23 +961,23 @@ public class TestLeafQueue {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- SchedulerApp app_0 =
- new SchedulerApp(appAttemptId_0, user_0, a,
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
- SchedulerApp app_1 =
- new SchedulerApp(appAttemptId_1, user_1, a,
+ FiCaSchedulerApp app_1 =
+ new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "host_0";
- SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
String host_1 = "host_1";
- SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
+ FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
@@ -1060,24 +1060,24 @@ public class TestLeafQueue {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- SchedulerApp app_0 =
- new SchedulerApp(appAttemptId_0, user_0, a,
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
- SchedulerApp app_1 =
- new SchedulerApp(appAttemptId_1, user_1, a,
+ FiCaSchedulerApp app_1 =
+ new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "host_0";
- SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
String host_1 = "host_1";
- SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
+ FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
@@ -1175,23 +1175,23 @@ public class TestLeafQueue {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- SchedulerApp app_0 =
- spy(new SchedulerApp(appAttemptId_0, user_0, a,
+ FiCaSchedulerApp app_0 =
+ spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
String host_0 = "host_0";
String rack_0 = "rack_0";
- SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
String host_1 = "host_1";
String rack_1 = "rack_1";
- SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
+ FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
String host_2 = "host_2";
String rack_2 = "rack_2";
- SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
+ FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@@ -1284,7 +1284,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority));
String host_3 = "host_3"; // on rack_1
- SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
+ FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
assignment = a.assignContainers(clusterResource, node_3);
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
@@ -1305,23 +1305,23 @@ public class TestLeafQueue {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- SchedulerApp app_0 =
- spy(new SchedulerApp(appAttemptId_0, user_0, a,
+ FiCaSchedulerApp app_0 =
+ spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
String host_0 = "host_0";
String rack_0 = "rack_0";
- SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
String host_1 = "host_1";
String rack_1 = "rack_1";
- SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
+ FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
String host_2 = "host_2";
String rack_2 = "rack_2";
- SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
+ FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
@@ -1435,22 +1435,22 @@ public class TestLeafQueue {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- SchedulerApp app_0 =
- spy(new SchedulerApp(appAttemptId_0, user_0, a,
+ FiCaSchedulerApp app_0 =
+ spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
String host_0_0 = "host_0_0";
String rack_0 = "rack_0";
- SchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
+ FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
String host_0_1 = "host_0_1";
- SchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
+ FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
String host_1_0 = "host_1_0";
String rack_1 = "rack_1";
- SchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
+ FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java?rev=1362334&r1=1362333&r2=1362334&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (original)
+++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java Tue Jul 17 01:51:07 2012
@@ -44,8 +44,8 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -99,22 +99,22 @@ public class TestParentQueue {
LOG.info("Setup top-level queues a and b");
}
- private SchedulerApp getMockApplication(int appId, String user) {
- SchedulerApp application = mock(SchedulerApp.class);
+ private FiCaSchedulerApp getMockApplication(int appId, String user) {
+ FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
doReturn(user).when(application).getUser();
doReturn(Resources.createResource(0)).when(application).getHeadroom();
return application;
}
private void stubQueueAllocation(final CSQueue queue,
- final Resource clusterResource, final SchedulerNode node,
+ final Resource clusterResource, final FiCaSchedulerNode node,
final int allocation) {
stubQueueAllocation(queue, clusterResource, node, allocation,
NodeType.NODE_LOCAL);
}
private void stubQueueAllocation(final CSQueue queue,
- final Resource clusterResource, final SchedulerNode node,
+ final Resource clusterResource, final FiCaSchedulerNode node,
final int allocation, final NodeType type) {
// Simulate the queue allocation
@@ -132,7 +132,7 @@ public class TestParentQueue {
((ParentQueue)queue).allocateResource(clusterResource,
allocatedResource);
} else {
- SchedulerApp app1 = getMockApplication(0, "");
+ FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
allocatedResource);
}
@@ -198,9 +198,9 @@ public class TestParentQueue {
final int memoryPerNode = 10;
final int numNodes = 2;
- SchedulerNode node_0 =
+ FiCaSchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
- SchedulerNode node_1 =
+ FiCaSchedulerNode node_1 =
TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
final Resource clusterResource =
@@ -224,9 +224,9 @@ public class TestParentQueue {
root.assignContainers(clusterResource, node_1);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -237,9 +237,9 @@ public class TestParentQueue {
root.assignContainers(clusterResource, node_0);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@@ -250,9 +250,9 @@ public class TestParentQueue {
root.assignContainers(clusterResource, node_0);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
@@ -263,9 +263,9 @@ public class TestParentQueue {
root.assignContainers(clusterResource, node_1);
allocationOrder = inOrder(a, b);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 4*GB, clusterResource);
verifyQueueMetrics(b, 9*GB, clusterResource);
}
@@ -346,11 +346,11 @@ public class TestParentQueue {
final int memoryPerNode = 10;
final int numNodes = 3;
- SchedulerNode node_0 =
+ FiCaSchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
- SchedulerNode node_1 =
+ FiCaSchedulerNode node_1 =
TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
- SchedulerNode node_2 =
+ FiCaSchedulerNode node_2 =
TestUtils.getMockNode("host_2", DEFAULT_RACK, 0, memoryPerNode*GB);
final Resource clusterResource =
@@ -401,11 +401,11 @@ public class TestParentQueue {
root.assignContainers(clusterResource, node_0);
InOrder allocationOrder = inOrder(a, c, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
allocationOrder.verify(c).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 6*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -427,13 +427,13 @@ public class TestParentQueue {
root.assignContainers(clusterResource, node_2);
allocationOrder = inOrder(a, a2, a1, b, c);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
allocationOrder.verify(a2).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
allocationOrder.verify(c).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
verifyQueueMetrics(c, 4*GB, clusterResource);
@@ -457,9 +457,9 @@ public class TestParentQueue {
final int memoryPerNode = 10;
final int numNodes = 2;
- SchedulerNode node_0 =
+ FiCaSchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
- SchedulerNode node_1 =
+ FiCaSchedulerNode node_1 =
TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
final Resource clusterResource =
@@ -484,9 +484,9 @@ public class TestParentQueue {
root.assignContainers(clusterResource, node_1);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -498,9 +498,9 @@ public class TestParentQueue {
root.assignContainers(clusterResource, node_0);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@@ -523,9 +523,9 @@ public class TestParentQueue {
final int memoryPerNode = 10;
final int numNodes = 2;
- SchedulerNode node_0 =
+ FiCaSchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
- SchedulerNode node_1 =
+ FiCaSchedulerNode node_1 =
TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
final Resource clusterResource =
@@ -550,9 +550,9 @@ public class TestParentQueue {
root.assignContainers(clusterResource, node_1);
InOrder allocationOrder = inOrder(b2, b3);
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 2*GB, clusterResource);
@@ -564,9 +564,9 @@ public class TestParentQueue {
root.assignContainers(clusterResource, node_0);
allocationOrder = inOrder(b3, b2);
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
- any(SchedulerNode.class));
+ any(FiCaSchedulerNode.class));
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 3*GB, clusterResource);