Author: vinodkv
Date: Wed Aug 3 11:51:20 2011
New Revision: 1153453
URL: http://svn.apache.org/viewvc?rev=1153453&view=rev
Log:
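CapacityScheduler (CS) work is done: adds container reservation support (RESERVED state, RMContainerReservedEvent) and removes CSApp/CSNode.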
Added:
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
Removed:
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSApp.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSNode.java
Modified:
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerState.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Wed Aug 3 11:51:20 2011
@@ -3,6 +3,9 @@ package org.apache.hadoop.yarn.server.re
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
public interface RMContainer extends EventHandler<RMContainerEvent> {
@@ -14,5 +17,11 @@ public interface RMContainer extends Eve
RMContainerState getState();
Container getContainer();
+
+  /**
+   * @return the amount of resource reserved for this container; may be null
+   *         if the container has never been reserved
+   */
+  Resource getReservedResource();
+
+  /** @return the node on which this container is reserved; may be null */
+  NodeId getReservedNode();
+
+  /** @return the priority of this container's reservation; may be null */
+  Priority getReservedPriority();
+
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java Wed Aug 3 11:51:20 2011
@@ -8,6 +8,7 @@ public enum RMContainerEventType {
// Source: SchedulerApp
ACQUIRED,
KILL, // Also from Node on NodeRemoval
+  RESERVED, // container reserved on a node; carried by RMContainerReservedEvent
LAUNCHED,
FINISHED,
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Wed Aug 3 11:51:20 2011
@@ -11,6 +11,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
@@ -36,6 +38,19 @@ public class RMContainerImpl implements
RMContainerEventType.START, new ContainerStartedTransition())
.addTransition(RMContainerState.NEW, RMContainerState.KILLED,
RMContainerEventType.KILL)
+ .addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
+ RMContainerEventType.RESERVED, new ContainerReservedTransition())
+
+ // Transitions from RESERVED state
+ .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED,
+ RMContainerEventType.RESERVED, new ContainerReservedTransition())
+ .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED,
+ RMContainerEventType.START, new ContainerStartedTransition())
+ .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED,
+ RMContainerEventType.KILL) // nothing to do
+ .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED,
+ RMContainerEventType.RELEASED) // nothing to do
+
// Transitions from ALLOCATED state
.addTransition(RMContainerState.ALLOCATED, RMContainerState.ACQUIRED,
@@ -95,6 +110,10 @@ public class RMContainerImpl implements
private final EventHandler eventHandler;
private final ContainerAllocationExpirer containerAllocationExpirer;
+  // Reservation details, written by ContainerReservedTransition on each
+  // RESERVED event and exposed through the getReserved* accessors.
+  private Resource reservedResource;
+  private NodeId reservedNode;
+  private Priority reservedPriority;
+
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId,
EventHandler handler,
@@ -139,6 +158,21 @@ public class RMContainerImpl implements
}
@Override
+  public Resource getReservedResource() {
+    // Assigned by ContainerReservedTransition on each RESERVED event.
+    return reservedResource;
+  }
+
+  @Override
+  public NodeId getReservedNode() {
+    // Assigned by ContainerReservedTransition together with reservedResource.
+    return reservedNode;
+  }
+
+  @Override
+  public Priority getReservedPriority() {
+    // Assigned by ContainerReservedTransition together with reservedResource.
+    return reservedPriority;
+  }
+
+ @Override
public void handle(RMContainerEvent event) {
LOG.info("Processing " + event.getContainerId() + " of type " + event.getType());
try {
@@ -171,6 +205,19 @@ public class RMContainerImpl implements
}
}
+  /**
+   * Copies the reservation details (resource, node, priority) carried by a
+   * RESERVED event onto the container. Registered for both NEW -> RESERVED
+   * and RESERVED -> RESERVED, so a re-reservation overwrites earlier values.
+   */
+  private static final class ContainerReservedTransition extends
+      BaseTransition {
+
+    @Override
+    public void transition(RMContainerImpl container, RMContainerEvent event) {
+      // Assumes every RESERVED event is an RMContainerReservedEvent; a plain
+      // RMContainerEvent of that type would fail here with ClassCastException.
+      RMContainerReservedEvent e = (RMContainerReservedEvent)event;
+      container.reservedResource = e.getReservedResource();
+      container.reservedNode = e.getReservedNode();
+      container.reservedPriority = e.getReservedPriority();
+    }
+  }
+
+
private static final class ContainerStartedTransition extends
BaseTransition {
@@ -179,7 +226,7 @@ public class RMContainerImpl implements
container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
container.appAttemptId, container.container));
}
-}
+ }
private static final class AcquiredTransition extends BaseTransition {
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java?rev=1153453&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java Wed Aug 3 11:51:20 2011
@@ -0,0 +1,41 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * The event signifying that a container has been reserved.
+ *
+ * The event encapsulates the amount of resource reserved, the node on which
+ * the reservation is in effect, and the priority of the reservation.
+ */
+public class RMContainerReservedEvent extends RMContainerEvent {
+  // Reservation details; all fields are final and set once by the constructor.
+  private final Resource reservedResource;
+  private final NodeId reservedNode;
+  private final Priority reservedPriority;
+  /** Creates an event of type {@link RMContainerEventType#RESERVED}. */
+  public RMContainerReservedEvent(ContainerId containerId,
+      Resource reservedResource, NodeId reservedNode,
+      Priority reservedPriority) {
+    super(containerId, RMContainerEventType.RESERVED);
+    this.reservedResource = reservedResource;
+    this.reservedNode = reservedNode;
+    this.reservedPriority = reservedPriority;
+  }
+  /** @return the amount of resource being reserved */
+  public Resource getReservedResource() {
+    return reservedResource;
+  }
+  /** @return the node on which the reservation is in effect */
+  public NodeId getReservedNode() {
+    return reservedNode;
+  }
+  /** @return the priority associated with the reservation */
+  public Priority getReservedPriority() {
+    return reservedPriority;
+  }
+
+}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerState.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerState.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerState.java Wed Aug 3 11:51:20 2011
@@ -1,5 +1,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
public enum RMContainerState {
- NEW, ALLOCATED, ACQUIRED, RUNNING, COMPLETED, EXPIRED, RELEASED, KILLED
+  NEW,        // not yet allocated or reserved
+  RESERVED,   // holding a reservation on a node (see RMContainerReservedEvent)
+  ALLOCATED,  // e.g. RESERVED -> ALLOCATED on START
+  ACQUIRED,
+  RUNNING,
+  COMPLETED,
+  EXPIRED,
+  RELEASED,
+  KILLED
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Wed Aug 3 11:51:20 2011
@@ -18,11 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,8 +32,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
-import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -74,11 +69,7 @@ public class AppSchedulingInfo {
private final ApplicationStore store;
- /* Current consumption */
- List<Container> acquired = new ArrayList<Container>();
- List<Container> completedContainers = new ArrayList<Container>();
/* Allocated by scheduler */
- List<Container> allocated = new ArrayList<Container>();
boolean pending = true; // for app metrics
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
@@ -124,38 +115,6 @@ public class AppSchedulingInfo {
}
/**
- * the currently acquired/allocated containers by the application masters.
- *
- * @return the current containers being used by the application masters.
- */
- public synchronized List<Container> getCurrentContainers() {
- List<Container> currentContainers = new ArrayList<Container>(acquired);
- currentContainers.addAll(allocated);
- return currentContainers;
- }
-
- /**
- * The ApplicationMaster is acquiring the allocated/completed resources.
- *
- * @return allocated resources
- */
- synchronized private List<Container> acquire() {
- // Return allocated containers
- acquired.addAll(allocated);
- List<Container> heartbeatContainers = allocated;
- allocated = new ArrayList<Container>();
-
- LOG.info("acquire:" + " application=" + applicationId + " #acquired="
- + heartbeatContainers.size());
- heartbeatContainers = (heartbeatContainers == null) ? new ArrayList<Container>()
- : heartbeatContainers;
-
- heartbeatContainers.addAll(completedContainers);
- completedContainers.clear();
- return heartbeatContainers;
- }
-
- /**
* The ApplicationMaster is updating resource requirements for the
* application, by asking for more resources and releasing resources acquired
* by the application.
@@ -204,24 +163,6 @@ public class AppSchedulingInfo {
}
}
- private synchronized void releaseContainers(List<Container> release) {
- // Release containers and update consumption
- for (Container container : release) {
- LOG.debug("update: " + "application=" + applicationId + " released="
- + container);
- // TOday in all code paths, this is taken by completedContainer called by
- // the caller. So commenting this.
- // Resources.subtractFrom(currentConsumption, container.getResource());
- for (Iterator<Container> i = acquired.iterator(); i.hasNext();) {
- Container c = i.next();
- if (c.getId().equals(container.getId())) {
- i.remove();
- LOG.info("Removed acquired container: " + container.getId());
- }
- }
- }
- }
-
synchronized public Collection<Priority> getPriorities() {
return priorities;
}
@@ -242,15 +183,6 @@ public class AppSchedulingInfo {
return request.getCapability();
}
- synchronized private void completedContainer(Container container,
- Resource containerResource) {
- if (container != null) {
- LOG.info("Completed container: " + container);
- completedContainers.add(container);
- }
- queue.getMetrics().releaseResources(user, 1, containerResource);
- }
-
/**
* Resources have been allocated to this application by the resource
* scheduler. Track them.
@@ -263,17 +195,17 @@ public class AppSchedulingInfo {
* the priority of the request.
* @param request
* the request
- * @param containers
+ * @param container
* the containers allocated.
*/
synchronized public void allocate(NodeType type, SchedulerNode node,
- Priority priority, ResourceRequest request, List<Container> containers) {
+ Priority priority, ResourceRequest request, Container container) {
if (type == NodeType.DATA_LOCAL) {
- allocateNodeLocal(node, priority, request, containers);
+ allocateNodeLocal(node, priority, request, container);
} else if (type == NodeType.RACK_LOCAL) {
- allocateRackLocal(node, priority, request, containers);
+ allocateRackLocal(node, priority, request, container);
} else {
- allocateOffSwitch(node, priority, request, containers);
+ allocateOffSwitch(node, priority, request, container);
}
QueueMetrics metrics = queue.getMetrics();
if (pending) {
@@ -284,7 +216,7 @@ public class AppSchedulingInfo {
}
LOG.debug("allocate: user: " + user + ", memory: "
+ request.getCapability());
- metrics.allocateResources(user, containers.size(), request.getCapability());
+ metrics.allocateResources(user, 1, request.getCapability());
}
/**
@@ -295,21 +227,19 @@ public class AppSchedulingInfo {
* resources allocated to the application
*/
synchronized private void allocateNodeLocal(SchedulerNode node, Priority priority,
- ResourceRequest nodeLocalRequest, List<Container> containers) {
+ ResourceRequest nodeLocalRequest, Container container) {
// Update consumption and track allocations
- allocate(containers);
+ allocate(container);
// Update future requirements
- nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers()
- - containers.size());
+ nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
if (nodeLocalRequest.getNumContainers() == 0) {
this.requests.get(priority).remove(node.getNodeAddress());
}
ResourceRequest rackLocalRequest = requests.get(priority).get(
node.getRackName());
- rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers()
- - containers.size());
+ rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
if (rackLocalRequest.getNumContainers() == 0) {
this.requests.get(priority).remove(node.getRackName());
}
@@ -317,8 +247,7 @@ public class AppSchedulingInfo {
// Do not remove ANY
ResourceRequest offSwitchRequest = requests.get(priority).get(
RMNode.ANY);
- offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
- - containers.size());
+ offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
}
/**
@@ -329,14 +258,13 @@ public class AppSchedulingInfo {
* resources allocated to the application
*/
synchronized private void allocateRackLocal(SchedulerNode node, Priority priority,
- ResourceRequest rackLocalRequest, List<Container> containers) {
+ ResourceRequest rackLocalRequest, Container container) {
// Update consumption and track allocations
- allocate(containers);
+ allocate(container);
// Update future requirements
- rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers()
- - containers.size());
+ rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
if (rackLocalRequest.getNumContainers() == 0) {
this.requests.get(priority).remove(node.getRackName());
}
@@ -344,8 +272,7 @@ public class AppSchedulingInfo {
// Do not remove ANY
ResourceRequest offSwitchRequest = requests.get(priority).get(
RMNode.ANY);
- offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
- - containers.size());
+ offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
}
/**
@@ -356,33 +283,29 @@ public class AppSchedulingInfo {
* resources allocated to the application
*/
synchronized private void allocateOffSwitch(SchedulerNode node, Priority priority,
- ResourceRequest offSwitchRequest, List<Container> containers) {
+ ResourceRequest offSwitchRequest, Container container) {
// Update consumption and track allocations
- allocate(containers);
+ allocate(container);
// Update future requirements
// Do not remove ANY
- offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
- - containers.size());
+ offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
}
- synchronized private void allocate(List<Container> containers) {
+ synchronized private void allocate(Container container) {
// Update consumption and track allocations
- for (Container container : containers) {
-
- allocated.add(container);
- //TODO: fixme sharad
- /* try {
+ //TODO: fixme sharad
+ /* try {
store.storeContainer(container);
} catch (IOException ie) {
// TODO fix this. we shouldnt ignore
}*/
- LOG.debug("allocate: applicationId=" + applicationId + " container="
- + container.getId() + " host="
- + container.getNodeId().toString());
- }
+
+ LOG.debug("allocate: applicationId=" + applicationId + " container="
+ + container.getId() + " host="
+ + container.getNodeId().toString());
}
synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java Wed Aug 3 11:51:20 2011
@@ -12,17 +12,22 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class SchedulerApp {
@@ -44,7 +49,18 @@ public class SchedulerApp {
private List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
- public SchedulerApp(AppSchedulingInfo application, Queue queue) {
+  // Containers reserved by this application, grouped by priority; the inner
+  // map is presumably keyed by the id of the node holding the reservation.
+  final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
+      new HashMap<Priority, Map<NodeId, RMContainer>>();
+
+  // Per-priority scheduling-opportunity counters; maintained by
+  // add/reset/getSchedulingOpportunities.
+  Map<Priority, Integer> schedulingOpportunities = new HashMap<Priority, Integer>();
+
+  // NOTE(review): presumably the total resource currently held in
+  // reservations; its updates are not visible here — confirm.
+  final Resource currentReservation = recordFactory
+      .newRecordInstance(Resource.class);
+
+  // Used to build RMContainerImpl instances (dispatcher, allocation expirer).
+  private final RMContext rmContext;
+ public SchedulerApp(RMContext rmContext,
+ AppSchedulingInfo application, Queue queue) {
+ this.rmContext = rmContext;
this.appSchedulingInfo = application;
this.queue = queue;
application.setQueue(queue);
@@ -75,11 +91,6 @@ public class SchedulerApp {
return this.appSchedulingInfo.getNewContainerId();
}
- @Deprecated
- public List<Container> getCurrentContainers() {
- return this.appSchedulingInfo.getCurrentContainers();
- }
-
public Collection<Priority> getPriorities() {
return this.appSchedulingInfo.getPriorities();
}
@@ -88,6 +99,10 @@ public class SchedulerApp {
return this.appSchedulingInfo.getResourceRequest(priority, nodeAddress);
}
+  /**
+   * Returns the number of containers still requested at {@code priority}
+   * across all locations, i.e. the container count of the ANY request.
+   *
+   * @param priority request priority
+   * @return outstanding container count, or 0 if there is no ANY request
+   *         for this priority
+   */
+  public synchronized int getTotalRequiredResources(Priority priority) {
+    ResourceRequest anyRequest = getResourceRequest(priority, RMNode.ANY);
+    // No ANY request means nothing is outstanding; returning 0 lets callers
+    // such as allocate() bail out instead of throwing NullPointerException.
+    return (anyRequest == null) ? 0 : anyRequest.getNumContainers();
+  }
+
public Resource getResource(Priority priority) {
return this.appSchedulingInfo.getResource(priority);
}
@@ -100,10 +115,6 @@ public class SchedulerApp {
return this.appSchedulingInfo.getQueueName();
}
- public Queue getQueue() {
- return this.queue;
- }
-
public synchronized Collection<RMContainer> getLiveContainers() {
return new ArrayList<RMContainer>(liveContainers.values());
}
@@ -126,61 +137,66 @@ public class SchedulerApp {
SchedulerApp application) {
}
- synchronized public void containerCompleted(Container cont,
+ synchronized public void containerCompleted(RMContainer rmContainer,
RMContainerEventType event) {
- ContainerId containerId = cont.getId();
+
+ Container container = rmContainer.getContainer();
+ ContainerId containerId = container.getId();
+
// Inform the container
- RMContainer container = getRMContainer(containerId);
-
- if (container == null) {
- LOG.error("Invalid container completed " + cont.getId());
- return;
- }
-
if (event.equals(RMContainerEventType.FINISHED)) {
// Have to send diagnostics for finished containers.
- container.handle(new RMContainerFinishedEvent(containerId,
- cont.getContainerStatus()));
+ rmContainer.handle(new RMContainerFinishedEvent(containerId,
+ container.getContainerStatus()));
} else {
- container.handle(new RMContainerEvent(containerId, event));
+ rmContainer.handle(new RMContainerEvent(containerId, event));
}
- LOG.info("Completed container: " + container.getContainerId() +
- " in state: " + container.getState());
+ LOG.info("Completed container: " + rmContainer.getContainerId() +
+ " in state: " + rmContainer.getState());
// Remove from the list of containers
- liveContainers.remove(container.getContainerId());
+ liveContainers.remove(rmContainer.getContainerId());
// Update usage metrics
- Resource containerResource = container.getContainer().getResource();
+ Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
Resources.subtractFrom(currentConsumption, containerResource);
}
- synchronized public void allocate(NodeType type, SchedulerNode node,
+ synchronized public RMContainer allocate(NodeType type, SchedulerNode node,
Priority priority, ResourceRequest request,
- List<RMContainer> containers) {
- // Update consumption and track allocations
- List<Container> allocatedContainers =
- new ArrayList<Container>();
- for (RMContainer container : containers) {
- Container c = container.getContainer();
- // Inform the container
- container.handle(
- new RMContainerEvent(c.getId(), RMContainerEventType.START));
- allocatedContainers.add(c);
-
- Resources.addTo(currentConsumption, c.getResource());
- LOG.debug("allocate: applicationId=" + c.getId().getAppId()
- + " container=" + c.getId() + " host="
- + c.getNodeId().toString());
-
- // Add it to allContainers list.
- newlyAllocatedContainers.add(container);
- liveContainers.put(c.getId(), container);
+ Container container) {
+
+ // Required sanity check - AM can call 'allocate' to update resource
+ // request without locking the scheduler, hence we need to check
+ if (getTotalRequiredResources(priority) <= 0) {
+ return null;
}
- appSchedulingInfo.allocate(type, node, priority,
- request, allocatedContainers);
+ // Create RMContainer
+ RMContainer rmContainer = new RMContainerImpl(container, this
+ .getApplicationAttemptId(), node.getNodeID(), this.rmContext
+ .getDispatcher().getEventHandler(), this.rmContext
+ .getContainerAllocationExpirer());
+
+ // Update consumption and track allocations
+
+ // Inform the container
+ rmContainer.handle(
+ new RMContainerEvent(container.getId(), RMContainerEventType.START));
+
+ Resources.addTo(currentConsumption, container.getResource());
+ LOG.debug("allocate: applicationId=" + container.getId().getAppId()
+ + " container=" + container.getId() + " host="
+ + container.getNodeId().toString());
+
+ // Add it to allContainers list.
+ newlyAllocatedContainers.add(rmContainer);
+ liveContainers.put(container.getId(), rmContainer);
+
+ appSchedulingInfo.allocate(type, node, priority, request, container);
+
+ return rmContainer;
}
synchronized public List<Container> pullNewlyAllocatedContainers() {
@@ -214,18 +230,150 @@ public class SchedulerApp {
}
}
}
+ // TODO - Remove block
+ for (Priority priority : getPriorities()) {
+ Map<String, ResourceRequest> requests = getResourceRequests(priority);
+ if (requests != null) {
+ LOG.info("showRequests:" + " application=" + getApplicationId() +
+ " headRoom=" + getHeadroom() +
+ " currentConsumption=" + currentConsumption.getMemory());
+ for (ResourceRequest request : requests.values()) {
+ LOG.info("showRequests:" + " application=" + getApplicationId()
+ + " request=" + request);
+ }
+ }
+ }
+
}
+  /**
+   * Set the resource limit that getHeadroom() starts from before subtracting
+   * current consumption and current reservations.
+   * @param globalLimit new resource limit for this application
+   */
public synchronized void setAvailableResourceLimit(Resource globalLimit) {
this.resourceLimit = globalLimit;
}
+  /**
+   * Look up one of this application's live containers by id.
+   * @param id container id
+   * @return the matching {@link RMContainer}, or <code>null</code> if it is
+   *         not in this application's live-container map
+   */
+  public synchronized RMContainer getRMContainer(ContainerId id) {
+    RMContainer liveContainer = liveContainers.get(id);
+    return liveContainer;
+  }
+
+  /**
+   * Reset the scheduling-opportunity counter for <code>priority</code> to 0.
+   * @param priority priority whose counter is reset
+   */
+  synchronized public void resetSchedulingOpportunities(Priority priority) {
+    // The previous count is irrelevant, so there is nothing to look up first.
+    this.schedulingOpportunities.put(priority, 0);
+  }
+
+  /**
+   * Count one more scheduling opportunity at <code>priority</code>; a
+   * missing counter is treated as zero before incrementing.
+   * @param priority priority whose counter is incremented
+   */
+  synchronized public void addSchedulingOpportunity(Priority priority) {
+    Integer seen = this.schedulingOpportunities.get(priority);
+    int updated = (seen == null) ? 1 : seen + 1;
+    this.schedulingOpportunities.put(priority, updated);
+  }
+
+  /**
+   * Number of scheduling opportunities recorded at <code>priority</code>.
+   * When no counter exists yet, a zero counter is stored for the priority
+   * and 0 is returned.
+   * @param priority priority to query
+   * @return recorded scheduling opportunities
+   */
+  synchronized public int getSchedulingOpportunities(Priority priority) {
+    Integer seen = this.schedulingOpportunities.get(priority);
+    if (seen != null) {
+      return seen;
+    }
+    this.schedulingOpportunities.put(priority, 0);
+    return 0;
+  }
+
+  /**
+   * @param priority priority to query
+   * @return number of nodes on which this application holds a reserved
+   *         container at <code>priority</code>; 0 if there are none
+   */
+  public synchronized int getNumReservedContainers(Priority priority) {
+    Map<NodeId, RMContainer> atPriority = this.reservedContainers.get(priority);
+    if (atPriority == null) {
+      return 0;
+    }
+    return atPriority.size();
+  }
+
+  /**
+   * Reserve <code>container</code> on <code>node</code> at
+   * <code>priority</code> for this application.
+   *
+   * @param node node on which the reservation is made
+   * @param priority priority of the reservation
+   * @param rmContainer the existing reservation to update, or
+   *                    <code>null</code> to create a new one
+   * @param container container describing the reserved resource
+   * @return the reserved {@link RMContainer}
+   */
+  public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
+      RMContainer rmContainer, Container container) {
+    // Create RMContainer if necessary
+    if (rmContainer == null) {
+      rmContainer =
+          new RMContainerImpl(container, getApplicationAttemptId(),
+              node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
+              rmContext.getContainerAllocationExpirer());
+
+      // Count the reservation only when it is new: unreserve() subtracts it
+      // exactly once, so re-reserving must not add it again. addTo() updates
+      // currentReservation in place; add() returns a fresh object and would
+      // leave currentReservation (and therefore getHeadroom()) unchanged.
+      Resources.addTo(currentReservation, container.getResource());
+    }
+    rmContainer.handle(new RMContainerReservedEvent(container.getId(),
+        container.getResource(), node.getNodeID(), priority));
+
+    Map<NodeId, RMContainer> reservedContainers =
+        this.reservedContainers.get(priority);
+    if (reservedContainers == null) {
+      reservedContainers = new HashMap<NodeId, RMContainer>();
+      this.reservedContainers.put(priority, reservedContainers);
+    }
+    reservedContainers.put(node.getNodeID(), rmContainer);
+
+    LOG.info("Application " + getApplicationId()
+        + " reserved container " + rmContainer
+        + " on node " + node + ", currently has " + reservedContainers.size()
+        + " at priority " + priority
+        + "; currentReservation " + currentReservation);
+
+    return rmContainer;
+  }
+
+  /**
+   * Drop this application's reservation on <code>node</code> at
+   * <code>priority</code> and release the reserved resource from
+   * currentReservation.
+   *
+   * @param node node holding the reservation
+   * @param priority priority of the reservation
+   * @throws IllegalStateException if no such reservation is recorded
+   */
+  public synchronized void unreserve(SchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers =
+        this.reservedContainers.get(priority);
+    RMContainer reservedContainer = (reservedContainers == null) ?
+        null : reservedContainers.remove(node.getNodeID());
+    if (reservedContainer == null) {
+      // Fail with a descriptive message rather than an NPE further down.
+      throw new IllegalStateException("Application " + getApplicationId()
+          + " has no reservation on node " + node + " at priority " + priority);
+    }
+    if (reservedContainers.isEmpty()) {
+      this.reservedContainers.remove(priority);
+    }
+
+    // subtract() would return a new object; subtractFrom() updates in place.
+    Resource resource = reservedContainer.getContainer().getResource();
+    Resources.subtractFrom(currentReservation, resource);
+
+    LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
+        + node + ", currently has " + reservedContainers.size() + " at priority "
+        + priority + "; currentReservation " + currentReservation);
+  }
+
+  /**
+   * Check whether this application holds a reservation on
+   * <code>node</code> at <code>priority</code>.
+   * @param node node to be checked
+   * @param priority priority of the reserved container
+   * @return <code>true</code> if a reserved container is recorded for the
+   *         node at that priority, otherwise <code>false</code>
+   */
+  public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> atPriority = this.reservedContainers.get(priority);
+    return atPriority != null && atPriority.containsKey(node.getNodeID());
+  }
+
+  /**
+   * Ratio of outstanding unique resource requests at <code>priority</code>
+   * to cluster size, used as a locality wait factor.
+   *
+   * @param priority priority of the outstanding requests
+   * @param clusterNodes number of nodes in the cluster
+   * @return requiredResources / clusterNodes capped at 1; 0 when nothing is
+   *         requested at <code>priority</code>
+   */
+  public synchronized float getLocalityWaitFactor(
+      Priority priority, int clusterNodes) {
+    Map<String, ResourceRequest> requests = getResourceRequests(priority);
+    // Estimate: Required unique resources (i.e. hosts + racks)
+    int requiredResources =
+        (requests == null) ? 0 : Math.max(requests.size() - 1, 0);
+    if (requiredResources == 0) {
+      return 0.0f;
+    }
+    if (clusterNodes <= 0) {
+      // Avoid dividing by zero (Infinity/NaN); use the maximum factor.
+      return 1.0f;
+    }
+    // No point waiting for more opportunities than there are nodes.
+    return Math.min((float) requiredResources / clusterNodes, 1.0f);
+  }
+
+  /**
+   * @return a new list holding every container this application currently
+   *         has reserved, across all priorities and nodes
+   */
+  public synchronized List<RMContainer> getAllReservedContainers() {
+    List<RMContainer> result = new ArrayList<RMContainer>();
+    for (Map<NodeId, RMContainer> perNode : this.reservedContainers.values()) {
+      result.addAll(perNode.values());
+    }
+    return result;
+  }
+
/**
* Get available headroom in terms of resources for the application's user.
* @return available resource headroom
*/
public synchronized Resource getHeadroom() {
Resource limit = Resources.subtract(resourceLimit, currentConsumption);
+ Resources.subtractFrom(limit, currentReservation);
// Corner case to deal with applications being slightly over-limit
if (limit.getMemory() < 0) {
@@ -235,7 +383,8 @@ public class SchedulerApp {
return limit;
}
- public synchronized RMContainer getRMContainer(ContainerId id) {
- return liveContainers.get(id);
+  /**
+   * @return the {@link Queue} held by this application
+   */
+  public Queue getQueue() {
+    return this.queue;
}
+
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Wed Aug 3 11:51:20 2011
@@ -7,14 +7,17 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class SchedulerNode {
@@ -29,9 +32,11 @@ public class SchedulerNode {
private volatile int numContainers;
+  /**
+   * Container currently reserved on this node, or <code>null</code> when the
+   * node is not reserved; a node holds at most one reservation.
+   */
+ private RMContainer reservedContainer;
+
/* set of containers that are allocated containers */
- private final Map<ContainerId, Container> launchedContainers =
- new TreeMap<ContainerId, Container>();
+  // Keyed by container id; values are the RMContainer wrappers of the
+  // containers allocated on this node.
+ private final Map<ContainerId, RMContainer> launchedContainers =
+ new TreeMap<ContainerId, RMContainer>();
private final RMNode rmNode;
@@ -70,11 +75,12 @@ public class SchedulerNode {
* @param containers allocated containers
*/
public synchronized void allocateContainer(ApplicationId applicationId,
- Container container) {
+ RMContainer rmContainer) {
+ Container container = rmContainer.getContainer();
deductAvailableResource(container.getResource());
++numContainers;
- launchedContainers.put(container.getId(), container);
+ launchedContainers.put(container.getId(), rmContainer);
LOG.info("Allocated container " + container.getId() +
" to node " + rmNode.getNodeAddress());
@@ -115,7 +121,6 @@ public class SchedulerNode {
}
/* remove the containers from the nodemanger */
-
launchedContainers.remove(container.getId());
updateResource(container);
@@ -157,7 +162,63 @@ public class SchedulerNode {
return numContainers;
}
- public synchronized List<Container> getRunningContainers() {
- return new ArrayList<Container>(launchedContainers.values());
+  /**
+   * @return a snapshot copy of the containers launched on this node; later
+   *         changes to the node are not reflected in the returned list
+   */
+  public synchronized List<RMContainer> getRunningContainers() {
+    List<RMContainer> snapshot =
+        new ArrayList<RMContainer>(launchedContainers.values());
+    return snapshot;
+  }
+
+  /**
+   * Record <code>reservedContainer</code> as this node's single reservation.
+   * If the node is already reserved, the new container must be on this node
+   * and belong to the same application attempt as the existing reservation.
+   *
+   * @param application application making the reservation
+   * @param priority priority of the reservation (not used here)
+   * @param reservedContainer container being reserved
+   * @throws IllegalStateException if the node is already reserved and the
+   *         container is for another node or another application attempt
+   */
+  public synchronized void reserveResource(
+      SchedulerApp application, Priority priority,
+      RMContainer reservedContainer) {
+    RMContainer current = this.reservedContainer;
+    if (current == null) {
+      LOG.info("Reserved container " + reservedContainer.getContainer().getId() +
+          " on node " + this + " for application " + application);
+    } else {
+      // Sanity check: the updated reservation must be for this node.
+      NodeId requestedNode = reservedContainer.getContainer().getNodeId();
+      if (!requestedNode.equals(getNodeID())) {
+        throw new IllegalStateException("Trying to reserve" +
+            " container " + reservedContainer +
+            " on node " + reservedContainer.getReservedNode() +
+            " when currently" + " reserved resource " + current +
+            " on node " + current.getReservedNode());
+      }
+
+      // Cannot reserve more than one application on a given node!
+      ApplicationAttemptId holder =
+          current.getContainer().getId().getAppAttemptId();
+      ApplicationAttemptId requester =
+          reservedContainer.getContainer().getId().getAppAttemptId();
+      if (!holder.equals(requester)) {
+        throw new IllegalStateException("Trying to reserve" +
+            " container " + reservedContainer +
+            " for application " + application.getApplicationId() +
+            " when currently" +
+            " reserved container " + current +
+            " on node " + this);
+      }
+
+      LOG.info("Updated reserved container " +
+          reservedContainer.getContainer().getId() + " on node " +
+          this + " for application " + application);
+    }
+    this.reservedContainer = reservedContainer;
+  }
+
+  /**
+   * Clear this node's reservation on behalf of <code>application</code>.
+   * If the node holds no reservation, a warning is logged and nothing else
+   * happens.
+   *
+   * @param application application releasing the reservation
+   * @throws IllegalStateException if the node is reserved by a different
+   *         application attempt
+   */
+  public synchronized void unreserveResource(SchedulerApp application) {
+    if (reservedContainer == null) {
+      // Nothing to release; avoid dereferencing a null reservation below.
+      LOG.warn("Trying to unreserve for application " +
+          application.getApplicationId() + " on node " + this +
+          " which holds no reservation");
+      return;
+    }
+
+    // Cannot unreserve for wrong application...
+    ApplicationAttemptId reservedApplication =
+        reservedContainer.getContainer().getId().getAppAttemptId();
+    if (!reservedApplication.equals(
+        application.getApplicationAttemptId())) {
+      throw new IllegalStateException("Trying to unreserve " +
+          " for application " + application.getApplicationId() +
+          " when currently reserved " +
+          " for application " + reservedApplication.getApplicationId() +
+          " on node " + this);
+    }
+
+    reservedContainer = null;
+  }
+
+  /**
+   * @return the container currently reserved on this node, or
+   *         <code>null</code> if the node is not reserved
+   */
+  public synchronized RMContainer getReservedContainer() {
+    RMContainer current = this.reservedContainer;
+    return current;
}
+
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1153453&r1=1153452&r2=1153453&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Aug 3 11:51:20 2011
@@ -20,14 +20,10 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
@@ -38,14 +34,13 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Lock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,14 +53,18 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -97,10 +96,10 @@ implements ResourceScheduler, CapacitySc
}
};
- private final Comparator<CSApp> applicationComparator =
- new Comparator<CSApp>() {
+  /** Orders applications by ascending application id. */
+ private final Comparator<SchedulerApp> applicationComparator =
+ new Comparator<SchedulerApp>() {
@Override
- public int compare(CSApp a1, CSApp a2) {
+ public int compare(SchedulerApp a1, SchedulerApp a2) {
- return a1.getApplicationId().getId() - a2.getApplicationId().getId();
+    // Explicit comparison: subtracting ints can overflow and flip the sign.
+    int id1 = a1.getApplicationId().getId();
+    int id2 = a2.getApplicationId().getId();
+    return (id1 < id2) ? -1 : ((id1 == id2) ? 0 : 1);
}
};
@@ -111,7 +110,8 @@ implements ResourceScheduler, CapacitySc
private Map<String, Queue> queues = new ConcurrentHashMap<String, Queue>();
- private Map<NodeId, CSNode> csNodes = new ConcurrentHashMap<NodeId, CSNode>();
+  /** Registered nodes keyed by node id; maintained by addNode()/removeNode(). */
+ private Map<NodeId, SchedulerNode> nodes =
+ new ConcurrentHashMap<NodeId, SchedulerNode>();
+  /** Sum of the total capability of the registered nodes. */
private Resource clusterResource =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
@@ -120,8 +120,8 @@ implements ResourceScheduler, CapacitySc
private Resource minimumAllocation;
private Resource maximumAllocation;
- private Map<ApplicationAttemptId, CSApp> applications = Collections
- .synchronizedMap(new TreeMap<ApplicationAttemptId, CSApp>());
+  // Application attempts keyed by attempt id. getApplication() reads this
+  // map without the scheduler lock (@Lock(Lock.NoLock.class)), hence a
+  // concurrent map; its iteration order is unspecified.
+ private Map<ApplicationAttemptId, SchedulerApp> applications =
+ new ConcurrentHashMap<ApplicationAttemptId, SchedulerApp>();
private boolean initialized = false;
@@ -150,11 +150,11 @@ implements ResourceScheduler, CapacitySc
}
+  /**
+   * @param nodeId id of a node registered with the scheduler
+   * @return the node's used resource, as reported by its SchedulerNode
+   */
public synchronized Resource getUsedResource(NodeId nodeId) {
- return csNodes.get(nodeId).getUsedResource();
+    SchedulerNode node = getNode(nodeId);
+    return node.getUsedResource();
}
+  /**
+   * @param nodeId id of a node registered with the scheduler
+   * @return the node's available resource, as reported by its SchedulerNode
+   */
public synchronized Resource getAvailableResource(NodeId nodeId) {
- return csNodes.get(nodeId).getAvailableResource();
+    SchedulerNode node = getNode(nodeId);
+    return node.getAvailableResource();
}
public synchronized int getNumClusterNodes() {
@@ -316,19 +316,20 @@ implements ResourceScheduler, CapacitySc
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
applicationAttemptId, queueName, user, null);
- CSApp csApp = new CSApp(appSchedulingInfo, queue);
+ SchedulerApp SchedulerApp =
+ new SchedulerApp(this.rmContext, appSchedulingInfo, queue);
// Submit to the queue
try {
- queue.submitApplication(csApp, user, queueName);
+ queue.submitApplication(SchedulerApp, user, queueName);
} catch (AccessControlException ace) {
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRejectedEvent(applicationAttemptId, StringUtils
- .stringifyException(ace)));
+ new RMAppAttemptRejectedEvent(applicationAttemptId,
+ ace.toString()));
return;
}
- applications.put(applicationAttemptId, csApp);
+ applications.put(applicationAttemptId, SchedulerApp);
LOG.info("Application Submission: " + applicationAttemptId +
", user: " + user +
@@ -346,7 +347,7 @@ implements ResourceScheduler, CapacitySc
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
- CSApp application = getApplication(applicationAttemptId);
+ SchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
// throw new IOException("Unknown application " + applicationId +
@@ -356,10 +357,14 @@ implements ResourceScheduler, CapacitySc
}
// Release all the running containers
- processReleasedContainers(application, application.getCurrentContainers());
+ for (RMContainer rmContainer : application.getLiveContainers()) {
+ completedContainer(rmContainer, RMContainerEventType.KILL);
+ }
// Release all reserved containers
- releaseReservedContainers(application);
+ for (RMContainer rmContainer : application.getAllReservedContainers()) {
+ completedContainer(rmContainer, RMContainerEventType.KILL);
+ }
// Clean up pending requests, metrics etc.
application.stop(rmAppAttemptFinalState);
@@ -386,7 +391,7 @@ implements ResourceScheduler, CapacitySc
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
List<ResourceRequest> ask, List<Container> release) {
- CSApp application = getApplication(applicationAttemptId);
+ SchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
@@ -396,6 +401,12 @@ implements ResourceScheduler, CapacitySc
// Sanity check
normalizeRequests(ask);
+ // Release containers
+ for (Container releasedContainer : release) {
+ completedContainer(getRMContainer(releasedContainer),
+ RMContainerEventType.RELEASED);
+ }
+
synchronized (application) {
LOG.info("DEBUG --- allocate: pre-update" +
@@ -465,154 +476,82 @@ implements ResourceScheduler, CapacitySc
minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0)));
}
- @Lock(CapacityScheduler.class)
- private List<Container> getCompletedContainers(
- Map<String, List<Container>> allContainers) {
- if (allContainers == null) {
- return new ArrayList<Container>();
- }
- List<Container> completedContainers = new ArrayList<Container>();
- // Iterate through the running containers and update their status
- for (Map.Entry<String, List<Container>> e :
- allContainers.entrySet()) {
- for (Container c: e.getValue()) {
- if (c.getState() == ContainerState.COMPLETE) {
- completedContainers.add(c);
- }
- }
- }
- return completedContainers;
- }
-
+  /**
+   * Process an update from node <code>nm</code>. The method:
+   * <ul>
+   *   <li>records containers reported RUNNING as launched;</li>
+   *   <li>completes all other reported containers;</li>
+   *   <li>tries to fulfil the node's reservation, if any;</li>
+   *   <li>lets the root queue assign containers if the node is then
+   *       unreserved.</li>
+   * </ul>
+   *
+   * @param nm node that sent the update
+   * @param containers containers reported by the node, grouped per
+   *                   application (keys are application-id strings)
+   */
private synchronized void nodeUpdate(RMNode nm,
Map<String,List<Container>> containers ) {
LOG.info("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
- SchedulerNode node = this.csNodes.get(nm.getNodeID());
- //TODO node.statusUpdate(containers);
-
- // Completed containers
- processCompletedContainers(getCompletedContainers(containers));
+
+    SchedulerNode node = getNode(nm.getNodeID());
+    for (List<Container> appContainers : containers.values()) {
+      for (Container container : appContainers) {
+        if (container.getState() == ContainerState.RUNNING) {
+          containerLaunchedOnNode(container, node);
+        } else { // has to be 'COMPLETE'
+          LOG.info("DEBUG --- Container FINISHED: " + container.getId());
+          completedContainer(getRMContainer(container),
+              RMContainerEventType.FINISHED);
+        }
+      }
+    }
- // Assign new containers
+    // Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
- CSNode csNode = this.csNodes.get(nm.getNodeID());
-
- CSApp reservedApplication = csNode.getReservedApplication();
- if (reservedApplication != null) {
+    RMContainer reservedContainer = node.getReservedContainer();
+    SchedulerApp reservedApplication = (reservedContainer == null) ? null :
+        getApplication(reservedContainer.getApplicationAttemptId());
+    if (reservedContainer != null && reservedApplication == null) {
+      // The reserving attempt is not registered with the scheduler; skip
+      // fulfilment instead of dereferencing a null application below.
+      LOG.info("Node " + nm + " holds a reservation for unknown application " +
+          reservedContainer.getApplicationAttemptId());
+    }
+    if (reservedApplication != null) {
+
// Try to fulfill the reservation
LOG.info("Trying to fulfill reservation for application " +
reservedApplication.getApplicationId() + " on node: " + nm);
- LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
- Resource released = queue.assignContainers(clusterResource, csNode);
- // Is the reservation necessary? If not, release the reservation
- if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
- released, org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
- queue.completedContainer(clusterResource, null, released, reservedApplication);
- }
+      LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
+      queue.assignContainers(clusterResource, node);
}
// Try to schedule more if there are no reservations to fulfill
- if (csNode.getReservedApplication() == null) {
- root.assignContainers(clusterResource, csNode);
+    if (node.getReservedContainer() == null) {
+      root.assignContainers(clusterResource, node);
} else {
LOG.info("Skipping scheduling since node " + nm +
" is reserved by application " +
- csNode.getReservedApplication().getApplicationId());
- }
-
- }
-
- @Lock(CapacityScheduler.class)
- private void killRunningContainers(List<Container> containers) {
- for (Container container : containers) {
- container.setState(ContainerState.COMPLETE);
- LOG.info("Killing running container " + container.getId());
- CSApp application = applications.get(container.getId().getAppId());
- processReleasedContainers(application, Collections.singletonList(container));
- }
- }
-
- @Lock(Lock.NoLock.class)
- private void processCompletedContainers(
- List<Container> completedContainers) {
- for (Container container: completedContainers) {
- processSingleCompletedContainer(container);
- }
- }
-
- private void processSingleCompletedContainer(Container container) {
- CSApp application = getApplication(container.getId().getAppAttemptId());
-
- // this is possible, since an application can be removed from scheduler
- // but the nodemanger is just updating about a completed container.
- if (application != null) {
-
- // Inform the queue
- LeafQueue queue = (LeafQueue)application.getQueue();
- queue.completedContainer(clusterResource, container,
- container.getResource(), application);
- }
- }
-
- @Lock(Lock.NoLock.class)
- private synchronized void processReleasedContainers(CSApp application,
- List<Container> releasedContainers) {
-
- // Inform clusterTracker
- List<Container> unusedContainers = new ArrayList<Container>();
- for (Container container : releasedContainers) {
- if (releaseContainer(
- application.getApplicationId(),
- container)) {
- unusedContainers.add(container);
- }
+          node.getReservedContainer().getContainerId().getAppId());
}
- // Update queue capacities
- processCompletedContainers(unusedContainers);
}
- @Lock(CapacityScheduler.class)
- private void releaseReservedContainers(CSApp application) {
- LOG.info("Releasing reservations for completed application: " +
- application.getApplicationId());
- Queue queue = queues.get(application.getQueue().getQueueName());
- Map<Priority, Set<CSNode>> reservations = application.getAllReservations();
- for (Map.Entry<Priority, Set<CSNode>> e : reservations.entrySet()) {
- Priority priority = e.getKey();
- Set<CSNode> reservedNodes = new HashSet<CSNode>(e.getValue());
- for (CSNode node : reservedNodes) {
- Resource allocatedResource =
- application.getResourceRequest(priority, SchedulerNode.ANY).getCapability();
-
- application.unreserveResource(node, priority);
- node.unreserveResource(application, priority);
-
- queue.completedContainer(clusterResource, null, allocatedResource, application);
- }
+  /**
+   * Notify the owning application that <code>container</code> was launched
+   * on <code>node</code>. Containers of unknown applications are logged and
+   * otherwise ignored.
+   *
+   * @param container container reported by the node
+   * @param node node that reported the container (only used for logging)
+   */
+  private void containerLaunchedOnNode(Container container, SchedulerNode node) {
+    // Get the application that owns the launched container
+    ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
+    SchedulerApp application = getApplication(applicationAttemptId);
+    if (application == null) {
+      LOG.info("Unknown application: " + applicationAttemptId +
+          " launched container " + container.getId() +
+          " on node: " + node);
+      return;
}
- }
-
- @Lock(Lock.NoLock.class)
- private CSApp getApplication(ApplicationAttemptId applicationAttemptId) {
- return applications.get(applicationAttemptId);
+
+    application.containerLaunchedOnNode(container.getId());
}
@Override
- public synchronized void handle(SchedulerEvent event) {
+ public void handle(SchedulerEvent event) {
switch(event.getType()) {
case NODE_ADDED:
+ {
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
- break;
+ }
+ break;
case NODE_REMOVED:
+ {
NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
removeNode(nodeRemovedEvent.getRemovedRMNode());
- break;
+ }
+ break;
case NODE_UPDATE:
+ {
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
Map<ApplicationId, List<Container>> contAppMapping = nodeUpdatedEvent.getContainers();
Map<String, List<Container>> conts = new HashMap<String, List<Container>>();
@@ -620,24 +559,37 @@ implements ResourceScheduler, CapacitySc
conts.put(entry.getKey().toString(), entry.getValue());
}
nodeUpdate(nodeUpdatedEvent.getRMNode(), conts);
- break;
+ }
+ break;
case APP_ADDED:
+ {
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
.getQueue(), appAddedEvent.getUser());
- break;
+ }
+ break;
case APP_REMOVED:
+ {
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
doneApplication(appRemovedEvent.getApplicationAttemptID(),
appRemovedEvent.getFinalAttemptState());
- break;
+ }
+ break;
+ case CONTAINER_EXPIRED:
+ {
+ ContainerExpiredSchedulerEvent containerExpiredEvent =
+ (ContainerExpiredSchedulerEvent) event;
+ completedContainer(getRMContainer(containerExpiredEvent.getContainer()),
+ RMContainerEventType.EXPIRE);
+ }
+ break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
}
private synchronized void addNode(RMNode nodeManager) {
- this.csNodes.put(nodeManager.getNodeID(), new CSNode(nodeManager));
+ this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
++numNodeManagers;
LOG.info("Added node " + nodeManager.getNodeAddress() +
@@ -645,43 +597,86 @@ implements ResourceScheduler, CapacitySc
}
+  /**
+   * Unregister a node. The method:
+   * <ul>
+   *   <li>removes the node's capacity from the cluster total;</li>
+   *   <li>kills its running containers and its reservation, if any;</li>
+   *   <li>finally drops the node from the scheduler's node map.</li>
+   * </ul>
+   * Unknown nodes are logged and ignored.
+   *
+   * @param nodeInfo node being removed
+   */
private synchronized void removeNode(RMNode nodeInfo) {
- CSNode csNode = this.csNodes.remove(nodeInfo.getNodeID());
+    // Look the node up but keep it registered until its containers have been
+    // completed: completedContainer() resolves the node through getNode().
+    SchedulerNode node = getNode(nodeInfo.getNodeID());
+    if (node == null) {
+      // Unknown node: leave clusterResource and numNodeManagers untouched.
+      LOG.info("Ignoring removal of unknown node " + nodeInfo.getNodeAddress());
+      return;
+    }
Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
--numNodeManagers;
// Remove running containers
- List<Container> runningContainers = null;//TODO = nodeInfo.getRunningContainers();
- killRunningContainers(runningContainers);
+    // getRunningContainers() returns a copy, so completing containers while
+    // iterating is safe.
+    List<RMContainer> runningContainers = node.getRunningContainers();
+    for (RMContainer container : runningContainers) {
+      completedContainer(container, RMContainerEventType.KILL);
+    }
// Remove reservations, if any
- CSApp reservedApplication = csNode.getReservedApplication();
- if (reservedApplication != null) {
- LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
- Resource released = csNode.getReservedResource();
- queue.completedContainer(clusterResource, null, released, reservedApplication);
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (reservedContainer != null) {
+      completedContainer(reservedContainer, RMContainerEventType.KILL);
}
+
+    // Only now drop the node from the scheduler's view.
+    this.nodes.remove(nodeInfo.getNodeID());
LOG.info("Removed node " + nodeInfo.getNodeAddress() +
" clusterResource: " + clusterResource);
}
- private synchronized boolean releaseContainer(ApplicationId applicationId,
- Container container) {
- // Reap containers
- LOG.info("Application " + applicationId + " released container " + container);
- csNodes.get(container.getNodeId()).releaseContainer(container);
- return true;
+  /**
+   * Hand a container that ended (released, killed, expired or finished) to
+   * the owning application's leaf queue. A null container, or one whose
+   * application is unknown, is logged and ignored.
+   *
+   * @param rmContainer the container that completed; may be null
+   * @param event why the container completed
+   */
+  @Lock(CapacityScheduler.class)
+  private synchronized void completedContainer(RMContainer rmContainer,
+      RMContainerEventType event) {
+    if (rmContainer == null) {
+      LOG.info("Null container completed...");
+      return;
+    }
+
+    Container container = rmContainer.getContainer();
+    ApplicationAttemptId attemptId = container.getId().getAppAttemptId();
+    SchedulerApp application = getApplication(attemptId);
+    if (application == null) {
+      LOG.info("Container " + container + " of" +
+          " unknown application " + attemptId +
+          " completed with event " + event);
+      return;
+    }
+
+    // Null if the container's node is no longer registered.
+    SchedulerNode node = getNode(container.getNodeId());
+    ((LeafQueue) application.getQueue()).completedContainer(
+        clusterResource, application, node, rmContainer, event);
+
+    LOG.info("Application " + attemptId +
+        " released container " + container.getId() +
+        " on node: " + node +
+        " with event: " + event);
+  }
+
+  /**
+   * @param applicationAttemptId attempt to look up
+   * @return the registered application attempt, or <code>null</code>
+   */
+  @Lock(Lock.NoLock.class)
+  private SchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
+    SchedulerApp application = applications.get(applicationAttemptId);
+    return application;
+  }
+
+  /**
+   * @param nodeId node to look up
+   * @return the registered node, or <code>null</code> if it is unknown
+   */
+  @Lock(Lock.NoLock.class)
+  private SchedulerNode getNode(NodeId nodeId) {
+    SchedulerNode node = nodes.get(nodeId);
+    return node;
+  }
+
+  /**
+   * Map a reported {@link Container} back to the scheduler's RMContainer.
+   * @param container container as reported (node update, release, expiry)
+   * @return the live RMContainer, or <code>null</code> if the owning
+   *         application is unknown or does not track this container
+   */
+  private RMContainer getRMContainer(Container container) {
+    ContainerId containerId = container.getId();
+    SchedulerApp application = getApplication(containerId.getAppAttemptId());
+    if (application == null) {
+      return null;
+    }
+    return application.getRMContainer(containerId);
}
@Override
@Lock(Lock.NoLock.class)
public void recover(RMState state) throws Exception {
- // TODO: VINDOKVFIXME
+ // TODO: VINDOKVFIXME recovery
// applications.clear();
// for (Map.Entry<ApplicationId, ApplicationInfo> entry : state.getStoredApplications().entrySet()) {
// ApplicationId appId = entry.getKey();
// ApplicationInfo appInfo = entry.getValue();
-// CSApp app = applications.get(appId);
+// SchedulerApp app = applications.get(appId);
// app.allocate(appInfo.getContainers());
// for (Container c: entry.getValue().getContainers()) {
// Queue queue = queues.get(appInfo.getApplicationSubmissionContext().getQueue());
@@ -692,7 +687,9 @@ implements ResourceScheduler, CapacitySc
+  /**
+   * Build a report of used resources and container count for a node.
+   * @param nodeId node to report on
+   * @return the report, or <code>null</code> if the node is not registered
+   */
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
- // TODO Auto-generated method stub
- return null;
+    SchedulerNode node = getNode(nodeId);
+    if (node == null) {
+      // Never added or already removed: nothing to report.
+      return null;
+    }
+    return new SchedulerNodeReport(
+        node.getUsedResource(), node.getNumContainers());
}
+
}
|