Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java?rev=1156037&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
(added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
Wed Aug 10 06:03:11 2011
@@ -0,0 +1,378 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestParentQueue {
+
+ private static final Log LOG = LogFactory.getLog(TestParentQueue.class);
+
+ RMContext rmContext;
+ CapacitySchedulerConfiguration csConf;
+ CapacitySchedulerContext csContext;
+
+ final static int GB = 1024;
+ final static String DEFAULT_RACK = "/default";
+
+ @Before
+ public void setUp() throws Exception {
+ rmContext = TestUtils.getMockRMContext();
+ csConf = new CapacitySchedulerConfiguration();
+
+ csContext = mock(CapacitySchedulerContext.class);
+ when(csContext.getConfiguration()).thenReturn(csConf);
+ when(csContext.getMinimumResourceCapability()).thenReturn(
+ Resources.createResource(GB));
+ when(csContext.getMaximumResourceCapability()).thenReturn(
+ Resources.createResource(16*GB));
+ }
+
+ private static final String A = "a";
+ private static final String B = "b";
+ private void setupSingleLevelQueues(CapacitySchedulerConfiguration conf) {
+
+ // Define top-level queues
+ conf.setQueues(CapacityScheduler.ROOT, new String[] {A, B});
+ conf.setCapacity(CapacityScheduler.ROOT, 100);
+
+ final String Q_A = CapacityScheduler.ROOT + "." + A;
+ conf.setCapacity(Q_A, 30);
+
+ final String Q_B = CapacityScheduler.ROOT + "." + B;
+ conf.setCapacity(Q_B, 70);
+
+ LOG.info("Setup top-level queues a and b");
+ }
+
+ private void stubQueueAllocation(final Queue queue,
+ final Resource clusterResource, final SchedulerNode node,
+ final int allocation) {
+
+ // Simulate the queue allocation
+ doAnswer(new Answer<Resource>() {
+ @Override
+ public Resource answer(InvocationOnMock invocation) throws Throwable {
+ try {
+ throw new Exception();
+ } catch (Exception e) {
+ LOG.info("FOOBAR q.assignContainers q=" + queue.getQueueName() +
+ " alloc=" + allocation + " node=" + node.getHostName());
+ }
+ final Resource allocatedResource = Resources.createResource(allocation);
+ if (queue instanceof ParentQueue) {
+ ((ParentQueue)queue).allocateResource(clusterResource,
+ allocatedResource);
+ } else {
+ ((LeafQueue)queue).allocateResource(clusterResource, "",
+ allocatedResource);
+ }
+
+ // Next call - nothing
+ if (allocation > 0) {
+ doReturn(Resources.none()).when(queue).assignContainers(
+ eq(clusterResource), eq(node));
+
+ // Mock the node's resource availability
+ Resource available = node.getAvailableResource();
+ doReturn(Resources.subtractFrom(available, allocatedResource)).
+ when(node).getAvailableResource();
+ }
+
+ return allocatedResource;
+ }
+ }).
+ when(queue).assignContainers(eq(clusterResource), eq(node));
+ }
+
+ private float computeQueueUtilization(Queue queue,
+ int expectedMemory, Resource clusterResource) {
+ return (expectedMemory /
+ (clusterResource.getMemory() * queue.getAbsoluteCapacity()));
+ }
+
+ @Test
+ public void testSingleLevelQueues() throws Exception {
+ // Setup queue configs
+ setupSingleLevelQueues(csConf);
+
+ Map<String, Queue> queues = new HashMap<String, Queue>();
+ Queue root =
+ CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacityScheduler.ROOT, queues, queues,
+ CapacityScheduler.queueComparator,
+ CapacityScheduler.applicationComparator,
+ TestUtils.spyHook);
+
+ // Setup some nodes
+ final int memoryPerNode = 10;
+ final int numNodes = 2;
+
+ SchedulerNode node_0 =
+ TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
+ SchedulerNode node_1 =
+ TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
+
+ final Resource clusterResource =
+ Resources.createResource(numNodes * (memoryPerNode*GB));
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ // Start testing
+ LeafQueue a = (LeafQueue)queues.get(A);
+ LeafQueue b = (LeafQueue)queues.get(B);
+ final float delta = 0.0001f;
+
+ // Simulate B returning a container on node_0
+ stubQueueAllocation(a, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(b, clusterResource, node_0, 1*GB);
+ root.assignContainers(clusterResource, node_0);
+ assertEquals(0.0f, a.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(b, 1*GB, clusterResource),
+ b.getUtilization(), delta);
+
+ // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
+ stubQueueAllocation(a, clusterResource, node_1, 2*GB);
+ stubQueueAllocation(b, clusterResource, node_1, 1*GB);
+ root.assignContainers(clusterResource, node_1);
+ InOrder allocationOrder = inOrder(a, b);
+ allocationOrder.verify(a).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ allocationOrder.verify(b).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ assertEquals(computeQueueUtilization(a, 2*GB, clusterResource),
+ a.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(b, 2*GB, clusterResource),
+ b.getUtilization(), delta);
+
+ // Now, B should get the scheduling opportunity
+ // since A has 2/6G while B has 2/14G
+ stubQueueAllocation(a, clusterResource, node_0, 1*GB);
+ stubQueueAllocation(b, clusterResource, node_0, 2*GB);
+ root.assignContainers(clusterResource, node_0);
+ allocationOrder = inOrder(b, a);
+ allocationOrder.verify(b).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ allocationOrder.verify(a).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ assertEquals(computeQueueUtilization(a, 3*GB, clusterResource),
+ a.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(b, 4*GB, clusterResource),
+ b.getUtilization(), delta);
+
+ // Now, B should still get the scheduling opportunity
+ // since A has 3/6G while B has 4/14G
+ stubQueueAllocation(a, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(b, clusterResource, node_0, 4*GB);
+ root.assignContainers(clusterResource, node_0);
+ allocationOrder = inOrder(b, a);
+ allocationOrder.verify(b).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ allocationOrder.verify(a).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ assertEquals(computeQueueUtilization(a, 3*GB, clusterResource),
+ a.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(b, 8*GB, clusterResource),
+ b.getUtilization(), delta);
+
+ // Now, A should get the scheduling opportunity
+ // since A has 3/6G while B has 8/14G
+ stubQueueAllocation(a, clusterResource, node_1, 1*GB);
+ stubQueueAllocation(b, clusterResource, node_1, 1*GB);
+ root.assignContainers(clusterResource, node_1);
+ allocationOrder = inOrder(a, b);
+ allocationOrder.verify(b).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ allocationOrder.verify(a).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ assertEquals(computeQueueUtilization(a, 4*GB, clusterResource),
+ a.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(b, 9*GB, clusterResource),
+ b.getUtilization(), delta);
+ }
+
+ private static final String C = "c";
+ private static final String D = "d";
+ private static final String A1 = "a1";
+ private static final String A2 = "a2";
+ private static final String B1 = "b1";
+ private static final String B2 = "b2";
+ private static final String B3 = "b3";
+
+ private void setupMultiLevelQueues(CapacitySchedulerConfiguration conf) {
+
+ // Define top-level queues
+ conf.setQueues(CapacityScheduler.ROOT, new String[] {A, B, C, D});
+ conf.setCapacity(CapacityScheduler.ROOT, 100);
+
+ final String Q_A = CapacityScheduler.ROOT + "." + A;
+ conf.setCapacity(Q_A, 10);
+
+ final String Q_B = CapacityScheduler.ROOT + "." + B;
+ conf.setCapacity(Q_B, 50);
+
+ final String Q_C = CapacityScheduler.ROOT + "." + C;
+ conf.setCapacity(Q_C, 20);
+
+ final String Q_D = CapacityScheduler.ROOT + "." + D;
+ conf.setCapacity(Q_D, 20);
+
+ // Define 2-nd level queues
+ conf.setQueues(Q_A, new String[] {A1, A2});
+ conf.setCapacity(Q_A + "." + A1, 50);
+ conf.setCapacity(Q_A + "." + A2, 50);
+
+ conf.setQueues(Q_B, new String[] {B1, B2, B3});
+ conf.setCapacity(Q_B + "." + B1, 10);
+ conf.setCapacity(Q_B + "." + B2, 20);
+ conf.setCapacity(Q_B + "." + B3, 70);
+ }
+
+
+
+ @Test
+ public void testMultiLevelQueues() throws Exception {
+ // Setup queue configs
+ setupMultiLevelQueues(csConf);
+
+ Map<String, Queue> queues = new HashMap<String, Queue>();
+ Queue root =
+ CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacityScheduler.ROOT, queues, queues,
+ CapacityScheduler.queueComparator,
+ CapacityScheduler.applicationComparator,
+ TestUtils.spyHook);
+
+ // Setup some nodes
+ final int memoryPerNode = 10;
+ final int numNodes = 3;
+
+ SchedulerNode node_0 =
+ TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
+ SchedulerNode node_1 =
+ TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
+ SchedulerNode node_2 =
+ TestUtils.getMockNode("host_2", DEFAULT_RACK, 0, memoryPerNode*GB);
+
+ final Resource clusterResource =
+ Resources.createResource(numNodes * (memoryPerNode*GB));
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ // Start testing
+ Queue a = queues.get(A);
+ Queue b = queues.get(B);
+ Queue c = queues.get(C);
+ Queue d = queues.get(D);
+
+ Queue a1 = queues.get(A1);
+ Queue a2 = queues.get(A2);
+
+ Queue b1 = queues.get(B1);
+ Queue b2 = queues.get(B2);
+ Queue b3 = queues.get(B3);
+
+ final float delta = 0.0001f;
+
+ // Simulate C returning a container on node_0
+ stubQueueAllocation(a, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(b, clusterResource, node_0, 0*GB);
+ stubQueueAllocation(c, clusterResource, node_0, 1*GB);
+ stubQueueAllocation(d, clusterResource, node_0, 0*GB);
+ root.assignContainers(clusterResource, node_0);
+ assertEquals(computeQueueUtilization(a, 0*GB, clusterResource),
+ a.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(b, 0*GB, clusterResource),
+ b.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(c, 1*GB, clusterResource),
+ c.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(d, 0*GB, clusterResource),
+ d.getUtilization(), delta);
+ reset(a); reset(b); reset(c);
+
+ // Now get B2 to allocate
+ // A = 0/3, B = 0/15, C = 1/6, D=0/6
+ stubQueueAllocation(a, clusterResource, node_1, 0*GB);
+ stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
+ stubQueueAllocation(c, clusterResource, node_1, 0*GB);
+ root.assignContainers(clusterResource, node_1);
+ assertEquals(computeQueueUtilization(a, 0*GB, clusterResource),
+ a.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(b, 4*GB, clusterResource),
+ b.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(c, 1*GB, clusterResource),
+ c.getUtilization(), delta);
+ reset(a); reset(b); reset(c);
+
+ // Now get both A1, C & B3 to allocate in right order
+ // A = 0/3, B = 4/15, C = 1/6, D=0/6
+ stubQueueAllocation(a1, clusterResource, node_0, 1*GB);
+ stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
+ stubQueueAllocation(c, clusterResource, node_0, 2*GB);
+ root.assignContainers(clusterResource, node_0);
+ InOrder allocationOrder = inOrder(a, c, b);
+ allocationOrder.verify(a).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ allocationOrder.verify(c).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ allocationOrder.verify(b).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ assertEquals(computeQueueUtilization(a, 1*GB, clusterResource),
+ a.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(b, 6*GB, clusterResource),
+ b.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(c, 3*GB, clusterResource),
+ c.getUtilization(), delta);
+ reset(a); reset(b); reset(c);
+
+ // Now verify max-capacity
+ // A = 1/3, B = 6/15, C = 3/6, D=0/6
+ // Ensure a1 won't alloc above max-cap although it should get
+ // scheduling opportunity now, right after a2
+ LOG.info("here");
+ ((ParentQueue)a).setMaxCapacity(.1f); // a should be capped at 3/30
+ stubQueueAllocation(a1, clusterResource, node_2, 1*GB); // shouldn't be
+ // allocated due
+ // to max-cap
+ stubQueueAllocation(a2, clusterResource, node_2, 2*GB);
+ stubQueueAllocation(b3, clusterResource, node_2, 1*GB);
+ stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
+ stubQueueAllocation(c, clusterResource, node_2, 1*GB);
+ root.assignContainers(clusterResource, node_2);
+ allocationOrder = inOrder(a, a2, a1, b, c);
+ allocationOrder.verify(a).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ allocationOrder.verify(a2).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ allocationOrder.verify(b).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ allocationOrder.verify(c).assignContainers(eq(clusterResource),
+ any(SchedulerNode.class));
+ assertEquals(computeQueueUtilization(a, 3*GB, clusterResource),
+ a.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(b, 8*GB, clusterResource),
+ b.getUtilization(), delta);
+ assertEquals(computeQueueUtilization(c, 4*GB, clusterResource),
+ c.getUtilization(), delta);
+ reset(a); reset(b); reset(c);
+
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java?rev=1156037&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
(added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
Wed Aug 10 06:03:11 2011
@@ -0,0 +1,146 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.mockito.Mockito.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+public class TestUtils {
+ private static final Log LOG = LogFactory.getLog(TestUtils.class);
+
+ /**
+ * Get a mock {@link RMContext} for use in test cases.
+ * @return a mock {@link RMContext} for use in test cases
+ */
+ @SuppressWarnings("rawtypes")
+ public static RMContext getMockRMContext() {
+ // Null dispatcher
+ Dispatcher nullDispatcher = new Dispatcher() {
+ private final EventHandler handler =
+ new EventHandler() {
+ @Override
+ public void handle(Event event) {
+ }
+ };
+ @Override
+ public void register(Class<? extends Enum> eventType,
+ EventHandler handler) {
+ }
+ @Override
+ public EventHandler getEventHandler() {
+ return handler;
+ }
+ };
+
+ // No op
+ ContainerAllocationExpirer cae =
+ new ContainerAllocationExpirer(nullDispatcher);
+
+ RMContext rmContext =
+ new RMContextImpl(null, nullDispatcher, cae, null);
+
+ return rmContext;
+ }
+
+ /**
+ * Hook to spy on queues.
+ */
+ static class SpyHook extends CapacityScheduler.QueueHook {
+ @Override
+ public Queue hook(Queue queue) {
+ return spy(queue);
+ }
+ }
+ public static SpyHook spyHook = new SpyHook();
+
+ private static final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ public static Priority createMockPriority( int priority) {
+// Priority p = mock(Priority.class);
+// when(p.getPriority()).thenReturn(priority);
+ Priority p = recordFactory.newRecordInstance(Priority.class);
+ p.setPriority(priority);
+ return p;
+ }
+
+ public static ResourceRequest createResourceRequest(
+ String hostName, int memory, int numContainers, Priority priority,
+ RecordFactory recordFactory) {
+ ResourceRequest request =
+ recordFactory.newRecordInstance(ResourceRequest.class);
+ Resource capability = Resources.createResource(memory);
+
+ request.setNumContainers(numContainers);
+ request.setHostName(hostName);
+ request.setCapability(capability);
+ request.setPriority(priority);
+ return request;
+ }
+
+ public static ApplicationAttemptId
+ getMockApplicationAttemptId(int appId, int attemptId) {
+ ApplicationId applicationId = mock(ApplicationId.class);
+ when(applicationId.getClusterTimestamp()).thenReturn(0L);
+ when(applicationId.getId()).thenReturn(appId);
+ ApplicationAttemptId applicationAttemptId = mock(ApplicationAttemptId.class);
+ when(applicationAttemptId.getApplicationId()).thenReturn(applicationId);
+ when(applicationAttemptId.getAttemptId()).thenReturn(attemptId);
+ return applicationAttemptId;
+ }
+
+ public static SchedulerNode getMockNode(
+ String host, String rack, int port, int capability) {
+ NodeId nodeId = mock(NodeId.class);
+ when(nodeId.getHost()).thenReturn(host);
+ when(nodeId.getPort()).thenReturn(port);
+ RMNode rmNode = mock(RMNode.class);
+ when(rmNode.getNodeID()).thenReturn(nodeId);
+ when(rmNode.getTotalCapability()).thenReturn(
+ Resources.createResource(capability));
+ when(rmNode.getNodeAddress()).thenReturn(host+":"+port);
+ when(rmNode.getHostName()).thenReturn(host);
+ when(rmNode.getRackName()).thenReturn(rack);
+
+ SchedulerNode node = spy(new SchedulerNode(rmNode));
+ LOG.info("node = " + host + " avail=" + node.getAvailableResource());
+ return node;
+ }
+
+ public static ContainerId getMockContainerId(SchedulerApp application) {
+ ContainerId containerId = mock(ContainerId.class);
+ doReturn(application.getApplicationAttemptId()).when(containerId).getAppAttemptId();
+ doReturn(application.getApplicationId()).when(containerId).getAppId();
+ doReturn(application.getNewContainerId()).when(containerId).getId();
+ return containerId;
+ }
+
+ public static Container getMockContainer(
+ ContainerId containerId, NodeId nodeId, Resource resource) {
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(containerId);
+ when(container.getNodeId()).thenReturn(nodeId);
+ when(container.getResource()).thenReturn(resource);
+ return container;
+ }
+}
|