Author: vinodkv
Date: Wed Aug 3 11:36:27 2011
New Revision: 1153435
URL: http://svn.apache.org/viewvc?rev=1153435&view=rev
Log:
Introduced MockAM and MockNM in RM for better testability.
Added:
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
Modified:
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1153435&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
(added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
Wed Aug 3 11:36:27 2011
@@ -0,0 +1,125 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.Records;
+
+public class MockAM {
+
+  /** Id sent with the most recent allocate call; set back to 0 on registration. */
+  private volatile int responseId = 0;
+  /** The application attempt this mock ApplicationMaster acts for. */
+  private final ApplicationAttemptId attemptId;
+  /** RM context used to look up the application and its attempt. */
+  private final RMContext context;
+  /** Endpoint that receives the register / allocate / finish calls. */
+  private final AMRMProtocol amRMProtocol;
+
+  /**
+   * Creates a mock ApplicationMaster bound to a single application attempt.
+   *
+   * @param context RM context used to resolve the attempt's state
+   * @param amRMProtocol protocol implementation the mock talks to
+   * @param attemptId the attempt this mock represents
+   */
+  MockAM(RMContext context, AMRMProtocol amRMProtocol,
+      ApplicationAttemptId attemptId) {
+    this.attemptId = attemptId;
+    this.amRMProtocol = amRMProtocol;
+    this.context = context;
+  }
+
+  /**
+   * Polls the attempt until it is in {@code finalState}, sleeping 500 ms
+   * between checks for at most 20 sleeps (about 10 seconds), then asserts
+   * that the attempt is in the expected state.
+   *
+   * @param finalState the state the attempt is expected to reach
+   * @throws Exception if the sleep between polls is interrupted
+   */
+  public void waitForState(RMAppAttemptState finalState) throws Exception {
+    RMAppAttempt attempt = context.getRMApps()
+        .get(attemptId.getApplicationId()).getRMAppAttempt(attemptId);
+    int polls = 0;
+    while (!finalState.equals(attempt.getAppAttemptState())) {
+      // Give up after 20 sleeps; the assertion below reports the mismatch.
+      if (polls++ >= 20) {
+        break;
+      }
+      System.out.println("AppAttempt State is : "
+          + attempt.getAppAttemptState()
+          + " Waiting for state : " + finalState);
+      Thread.sleep(500);
+    }
+    System.out.println("AppAttempt State is : " + attempt.getAppAttemptState());
+    Assert.assertEquals("AppAttempt state is not correct (timedout)",
+        finalState, attempt.getAppAttemptState());
+  }
+
+  /**
+   * Registers this attempt as an ApplicationMaster. Waits for the attempt to
+   * be LAUNCHED first and restarts the allocate response-id sequence at 0.
+   *
+   * @throws Exception if the attempt does not reach LAUNCHED or the call fails
+   */
+  public void registerAppAttempt() throws Exception {
+    waitForState(RMAppAttemptState.LAUNCHED);
+    responseId = 0;
+    RegisterApplicationMasterRequest registration =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    // Host, port and tracking URL are placeholder values.
+    registration.setTrackingUrl("");
+    registration.setRpcPort(1);
+    registration.setHost("");
+    registration.setApplicationAttemptId(attemptId);
+    amRMProtocol.registerApplicationMaster(registration);
+  }
+
+  /**
+   * Asks for {@code numContainers} containers of {@code memory} on
+   * {@code host} (at priority 1) and releases the containers identified by
+   * {@code releases}.
+   *
+   * @param host host name the containers are requested on
+   * @param memory memory capability requested for each container
+   * @param numContainers number of containers to ask for
+   * @param releases ids of the containers to release; must not be null
+   * @return the containers returned by the allocate call
+   * @throws Exception if the allocate call fails
+   */
+  public List<Container> allocate(
+      String host, int memory, int numContainers,
+      List<ContainerId> releases) throws Exception {
+    List<ResourceRequest> reqs = createReq(host, memory, 1, numContainers);
+    List<Container> toRelease = new ArrayList<Container>(releases.size());
+    for (ContainerId id : releases) {
+      Container cont = Records.newRecord(Container.class);
+      cont.setId(id);
+      //TODO: set all fields
+      // Queue the record; without this the release never reaches the RM.
+      toRelease.add(cont);
+    }
+    return allocate(toRelease, reqs);
+  }
+
+  /**
+   * Builds the three resource requests that make up one ask: one for the
+   * given host, one for "default-rack" and one for "*".
+   *
+   * @return a fixed-size list holding the host, rack and off-rack requests,
+   *         in that order
+   */
+  private List<ResourceRequest> createReq(String host, int memory, int priority,
+      int containers) throws Exception {
+    return Arrays.asList(
+        createResourceReq(host, memory, priority, containers),
+        createResourceReq("default-rack", memory, priority, containers),
+        createResourceReq("*", memory, priority, containers));
+  }
+  /**
+   * Builds a single resource request.
+   *
+   * @param resource host name, rack name or "*" the request applies to
+   * @param memory memory capability of each requested container
+   * @param priority priority of the request
+   * @param containers number of containers requested
+   * @return the populated request record
+   */
+  private ResourceRequest createResourceReq(String resource, int memory, int priority,
+      int containers) throws Exception {
+    ResourceRequest req = Records.newRecord(ResourceRequest.class);
+    req.setHostName(resource);
+    req.setNumContainers(containers);
+    Priority pri = Records.newRecord(Priority.class);
+    // Honour the caller's priority rather than a hard-coded 1.
+    pri.setPriority(priority);
+    req.setPriority(pri);
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(memory);
+    req.setCapability(capability);
+    return req;
+  }
+
+  /**
+   * Sends one allocate call carrying the given asks and releases, using the
+   * next response id in this attempt's sequence.
+   *
+   * @param releases containers to release
+   * @param resourceRequest resource requests to ask for
+   * @return the container list taken from the AM response
+   * @throws Exception if the allocate call fails
+   */
+  public List<Container> allocate(
+      List<Container> releases, List<ResourceRequest> resourceRequest)
+      throws Exception {
+    final int nextResponseId = responseId + 1;
+    responseId = nextResponseId;
+
+    AllocateRequest heartbeat = Records.newRecord(AllocateRequest.class);
+    heartbeat.setResponseId(nextResponseId);
+    heartbeat.setApplicationAttemptId(attemptId);
+    heartbeat.addAllAsks(resourceRequest);
+    heartbeat.addAllReleases(releases);
+
+    AllocateResponse answer = amRMProtocol.allocate(heartbeat);
+    return answer.getAMResponse().getContainerList();
+  }
+
+  /**
+   * Finishes this attempt. Waits for the attempt to be RUNNING first, then
+   * sends a finish request with empty diagnostics, final state and tracking
+   * URL.
+   *
+   * @throws Exception if the attempt does not reach RUNNING or the call fails
+   */
+  public void unregisterAppAttempt() throws Exception {
+    waitForState(RMAppAttemptState.RUNNING);
+    FinishApplicationMasterRequest finish =
+        Records.newRecord(FinishApplicationMasterRequest.class);
+    finish.setTrackingUrl("");
+    finish.setFinalState("");
+    finish.setDiagnostics("");
+    finish.setAppAttemptId(attemptId);
+    amRMProtocol.finishApplicationMaster(finish);
+  }
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1153435&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
(added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
Wed Aug 3 11:36:27 2011
@@ -0,0 +1,77 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.util.Records;
+
+public class MockNM {
+
+  /** Id sent with the most recent heartbeat; incremented before each one. */
+  private int responseId;
+  /** Set by registerNode(); null until then. */
+  private NodeId nodeId;
+  /** Node identity in "host:port" form, parsed by registerNode(). */
+  private final String nodeIdStr;
+  /** Memory reported as this node's resource when registering. */
+  private final int memory;
+  /** Service that receives the registration and heartbeats. */
+  private final ResourceTrackerService resourceTracker;
+
+  /**
+   * Creates a mock NodeManager; it is not registered until
+   * {@link #registerNode()} is called.
+   *
+   * @param nodeIdStr node identity as "host:port"
+   * @param memory memory the node reports when registering
+   * @param resourceTracker service the mock talks to
+   */
+  MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
+    this.resourceTracker = resourceTracker;
+    this.memory = memory;
+    this.nodeIdStr = nodeIdStr;
+  }
+
+  /**
+   * Sends a healthy heartbeat that reports the given container under its
+   * application id.
+   *
+   * @param container the container to report
+   * @throws Exception if the heartbeat fails
+   */
+  public void containerStatus(Container container) throws Exception {
+    Map<ApplicationId, List<Container>> conts =
+        new HashMap<ApplicationId, List<Container>>();
+    // Report the container itself, not an empty list.
+    conts.put(container.getId().getAppId(), Arrays.asList(container));
+    nodeHeartbeat(conts, true);
+  }
+
+  /**
+   * Registers this node with the resource tracker, using the host and port
+   * parsed from the "host:port" string given at construction, HTTP port 2 and
+   * the configured memory.
+   *
+   * @return the NodeId built from the node id string
+   * @throws IllegalArgumentException if the node id string has no ":port"
+   *         part or the port is not a number
+   * @throws Exception if the registration call fails
+   */
+  public NodeId registerNode() throws Exception {
+    String[] splits = nodeIdStr.split(":");
+    if (splits.length < 2) {
+      // Fail with a readable message instead of an index-out-of-bounds error.
+      throw new IllegalArgumentException(
+          "Node id must be of the form host:port but was: " + nodeIdStr);
+    }
+    nodeId = Records.newRecord(NodeId.class);
+    nodeId.setHost(splits[0]);
+    nodeId.setPort(Integer.parseInt(splits[1]));
+    RegisterNodeManagerRequest req = Records.newRecord(
+        RegisterNodeManagerRequest.class);
+    req.setNodeId(nodeId);
+    req.setHttpPort(2);
+    Resource resource = Records.newRecord(Resource.class);
+    resource.setMemory(memory);
+    req.setResource(resource);
+    resourceTracker.registerNodeManager(req);
+    // A (re-)registration restarts the heartbeat response-id sequence, just
+    // as MockAM.registerAppAttempt() restarts its allocate sequence.
+    responseId = 0;
+    return nodeId;
+  }
+
+  /**
+   * Sends a heartbeat that reports no containers.
+   *
+   * @param b whether the node reports itself as healthy
+   * @return the heartbeat response
+   * @throws Exception if the heartbeat fails
+   */
+  public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
+    Map<ApplicationId, List<Container>> noContainers =
+        new HashMap<ApplicationId, List<Container>>();
+    return nodeHeartbeat(noContainers, b);
+  }
+
+  /**
+   * Sends a heartbeat carrying the given per-application containers and
+   * health flag, using the next response id in this node's sequence.
+   *
+   * @param conts containers to report, keyed by application id
+   * @param isHealthy whether the node reports itself as healthy
+   * @return the heartbeat response
+   * @throws Exception if the heartbeat fails
+   */
+  public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
+      List<Container>> conts, boolean isHealthy) throws Exception {
+    NodeHeartbeatRequest heartbeat =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
+    nodeStatus.setNodeId(nodeId);
+    for (Map.Entry<ApplicationId, List<Container>> perApp : conts.entrySet()) {
+      nodeStatus.setContainers(perApp.getKey(), perApp.getValue());
+    }
+
+    // Health report text and timestamp are fixed placeholder values.
+    NodeHealthStatus health = Records.newRecord(NodeHealthStatus.class);
+    health.setHealthReport("");
+    health.setIsNodeHealthy(isHealthy);
+    health.setLastHealthReportTime(1);
+    nodeStatus.setNodeHealthStatus(health);
+
+    responseId += 1;
+    nodeStatus.setResponseId(responseId);
+    heartbeat.setNodeStatus(nodeStatus);
+    return resourceTracker.nodeHeartbeat(heartbeat).getHeartbeatResponse();
+  }
+
+}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1153435&r1=1153434&r2=1153435&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
Wed Aug 3 11:36:27 2011
@@ -1,50 +1,28 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.ams.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -52,10 +30,6 @@ import org.apache.log4j.Logger;
public class MockRM extends ResourceManager {
- private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- private Map<NodeId, Integer> responseIds = new HashMap<NodeId, Integer>();
- private Map<ApplicationAttemptId, Integer> AMResponseIds = new HashMap<ApplicationAttemptId,
Integer>();
-
public MockRM() {
this(new Configuration());
}
@@ -82,35 +56,18 @@ public class MockRM extends ResourceMana
finalState, app.getState());
}
- public void waitForState(ApplicationAttemptId attemptId, RMAppAttemptState finalState)
- throws Exception {
- RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
- RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
- int timeoutSecs = 0;
- while (!finalState.equals(attempt.getAppAttemptState())
- && timeoutSecs++ < 20) {
- System.out
- .println("AppAttempt State is : " + attempt.getAppAttemptState()
- + " Waiting for state : " + finalState);
- Thread.sleep(500);
- }
- System.out.println("AppAttempt State is : " + attempt.getAppAttemptState());
- Assert.assertEquals("AppAttempt state is not correct (timedout)",
- finalState, attempt.getAppAttemptState());
- }
-
//client
public RMApp submitApp(int masterMemory) throws Exception {
ClientRMProtocol client = getClientRMService();
- GetNewApplicationIdResponse resp = client.getNewApplicationId(recordFactory.newRecordInstance(GetNewApplicationIdRequest.class));
+ GetNewApplicationIdResponse resp = client.getNewApplicationId(Records.newRecord(GetNewApplicationIdRequest.class));
ApplicationId appId = resp.getApplicationId();
- SubmitApplicationRequest req = recordFactory.newRecordInstance(SubmitApplicationRequest.class);
- ApplicationSubmissionContext sub = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ SubmitApplicationRequest req = Records.newRecord(SubmitApplicationRequest.class);
+ ApplicationSubmissionContext sub = Records.newRecord(ApplicationSubmissionContext.class);
sub.setApplicationId(appId);
sub.setApplicationName("");
sub.setUser("");
- Resource capability = recordFactory.newRecordInstance(Resource.class);
+ Resource capability = Records.newRecord(Resource.class);
capability.setMemory(masterMemory);
sub.setMasterCapability(capability);
req.setApplicationSubmissionContext(sub);
@@ -120,152 +77,36 @@ public class MockRM extends ResourceMana
return getRMContext().getRMApps().get(appId);
}
+  /**
+   * Creates a mock NodeManager wired to this RM's resource tracker service
+   * and registers it.
+   *
+   * @param nodeIdStr node identity as "host:port"
+   * @param memory memory the node reports
+   * @return the registered mock NodeManager
+   * @throws Exception if registration fails
+   */
+  public MockNM registerNode(String nodeIdStr, int memory) throws Exception {
+    MockNM nodeManager =
+        new MockNM(nodeIdStr, memory, getResourceTrackerService());
+    nodeManager.registerNode();
+    return nodeManager;
+  }
+
public void killApp(ApplicationId appId) throws Exception {
ClientRMProtocol client = getClientRMService();
- FinishApplicationRequest req = recordFactory.newRecordInstance(FinishApplicationRequest.class);
+ FinishApplicationRequest req = Records.newRecord(FinishApplicationRequest.class);
req.setApplicationId(appId);
client.finishApplication(req);
}
//from AMLauncher
- public void sendAMLaunched(ApplicationAttemptId appAttemptId) throws Exception {
- waitForState(appAttemptId, RMAppAttemptState.ALLOCATED);
+ public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId) throws Exception {
+ MockAM am = new MockAM(getRMContext(), masterService, appAttemptId);
+ am.waitForState(RMAppAttemptState.ALLOCATED);
getRMContext().getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCHED));
+ return am;
}
+
public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) throws Exception {
- waitForState(appAttemptId, RMAppAttemptState.ALLOCATED);
+ MockAM am = new MockAM(getRMContext(), masterService, appAttemptId);
+ am.waitForState(RMAppAttemptState.ALLOCATED);
getRMContext().getDispatcher().getEventHandler().handle(
new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed"));
}
- //from AMS
- public void registerAppAttempt(ApplicationAttemptId attemptId) throws Exception {
- waitForState(attemptId, RMAppAttemptState.LAUNCHED);
- AMResponseIds.put(attemptId, 0);
- RegisterApplicationMasterRequest req = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
- req.setApplicationAttemptId(attemptId);
- req.setHost("");
- req.setRpcPort(1);
- req.setTrackingUrl("");
- masterService.registerApplicationMaster(req);
- }
-
- public List<Container> allocate(ApplicationAttemptId attemptId,
- String host, int memory, int numContainers,
- List<ContainerId> releases) throws Exception {
- List reqs = createReq(host, memory, 1, numContainers);
- List<Container> toRelease = new ArrayList<Container>();
- for (ContainerId id : releases) {
- Container cont = recordFactory.newRecordInstance(Container.class);
- cont.setId(id);
- //TOOD: set all fields
- }
- return allocate(attemptId, toRelease, reqs);
- }
-
- private List<ResourceRequest> createReq(String host, int memory, int priority,
- int containers) throws Exception {
- ResourceRequest hostReq = createResourceReq(host, memory, priority,
- containers);
- ResourceRequest rackReq = createResourceReq("default-rack", memory,
- priority, containers);
- ResourceRequest offRackReq = createResourceReq("*", memory, priority,
- containers);
- return Arrays.asList(new ResourceRequest[] {hostReq, rackReq, offRackReq});
-
- }
- private ResourceRequest createResourceReq(String resource, int memory, int priority,
- int containers) throws Exception {
- ResourceRequest req = recordFactory.newRecordInstance(ResourceRequest.class);
- req.setHostName(resource);
- req.setNumContainers(containers);
- Priority pri = recordFactory.newRecordInstance(Priority.class);
- pri.setPriority(1);
- req.setPriority(pri);
- Resource capability = recordFactory.newRecordInstance(Resource.class);
- capability.setMemory(memory);
- req.setCapability(capability);
- return req;
- }
-
- public List<Container> allocate(ApplicationAttemptId attemptId,
- List<Container> releases, List<ResourceRequest> resourceRequest)
- throws Exception {
- AllocateRequest req = recordFactory.newRecordInstance(AllocateRequest.class);
- int responseId = AMResponseIds.remove(attemptId) + 1;
- AMResponseIds.put(attemptId, responseId);
- req.setResponseId(responseId);
- req.setApplicationAttemptId(attemptId);
- req.addAllAsks(resourceRequest);
- req.addAllReleases(releases);
- AllocateResponse resp = masterService.allocate(req);
- return resp.getAMResponse().getContainerList();
- }
-
- public void unregisterAppAttempt(ApplicationAttemptId attemptId) throws Exception {
- AMResponseIds.remove(attemptId);
- waitForState(attemptId, RMAppAttemptState.RUNNING);
- FinishApplicationMasterRequest req = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
- req.setAppAttemptId(attemptId);
- req.setDiagnostics("");
- req.setFinalState("");
- req.setTrackingUrl("");
- masterService.finishApplicationMaster(req);
- }
-
- //from Node
- public void containerStatus(Container container, NodeId nodeId) throws Exception {
- Map<ApplicationId, List<Container>> conts = new HashMap<ApplicationId,
List<Container>>();
- conts.put(container.getId().getAppId(), Arrays.asList(new Container[]{}));
- nodeHeartbeat(nodeId, conts, true);
- }
-
- public void registerNode(String nodeIdStr, int memory) throws Exception {
- String[] splits = nodeIdStr.split(":");
- NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
- nodeId.setHost(splits[0]);
- nodeId.setPort(Integer.parseInt(splits[1]));
- RegisterNodeManagerRequest req = recordFactory
- .newRecordInstance(RegisterNodeManagerRequest.class);
- req.setNodeId(nodeId);
- req.setHttpPort(2);
- Resource resource = recordFactory.newRecordInstance(Resource.class);
- resource.setMemory(memory);
- req.setResource(resource);
- getResourceTrackerService().registerNodeManager(req);
- responseIds.put(nodeId, 0);
- }
-
- public HeartbeatResponse nodeHeartbeat(String nodeIdStr, boolean b) throws Exception {
- String[] splits = nodeIdStr.split(":");
- NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
- nodeId.setHost(splits[0]);
- nodeId.setPort(Integer.parseInt(splits[1]));
- return nodeHeartbeat(nodeId, new HashMap<ApplicationId, List<Container>>(),
b);
- }
-
- public HeartbeatResponse nodeHeartbeat(NodeId nodeId, Map<ApplicationId,
- List<Container>> conts, boolean isHealthy) throws Exception {
- NodeHeartbeatRequest req = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
- NodeStatus status = recordFactory.newRecordInstance(NodeStatus.class);
- status.setNodeId(nodeId);
- for (Map.Entry<ApplicationId, List<Container>> entry : conts.entrySet())
{
- status.setContainers(entry.getKey(), entry.getValue());
- }
- NodeHealthStatus healthStatus = recordFactory.newRecordInstance(NodeHealthStatus.class);
- healthStatus.setHealthReport("");
- healthStatus.setIsNodeHealthy(isHealthy);
- healthStatus.setLastHealthReportTime(1);
- status.setNodeHealthStatus(healthStatus);
- int responseId = responseIds.remove(nodeId) + 1;
- responseIds.put(nodeId, responseId);
- status.setResponseId(responseId);
- req.setNodeStatus(status);
- return getResourceTrackerService().nodeHeartbeat(req).getHeartbeatResponse();
- }
-
@Override
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), amLivelinessMonitor,
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1153435&r1=1153434&r2=1153435&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
Wed Aug 3 11:36:27 2011
@@ -7,7 +7,6 @@ import junit.framework.Assert;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -26,29 +25,29 @@ public class TestApplicationCleanup {
rootLogger.setLevel(Level.DEBUG);
MockRM rm = new MockRM();
rm.start();
- NodeId node1 = rm.registerNode("h1", 5000);
+ MockNM nm1 = rm.registerNode("h1:1234", 5000);
RMApp app = rm.submitApp(2000);
//kick the scheduling
- rm.nodeHeartbeat(node1, true);
+ nm1.nodeHeartbeat(true);
RMAppAttempt attempt = app.getCurrentAppAttempt();
- rm.sendAMLaunched(attempt.getAppAttemptId());
- rm.registerAppAttempt(attempt.getAppAttemptId());
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ am.registerAppAttempt();
//request for containers
int request = 2;
- rm.allocate(attempt.getAppAttemptId(), "h1" , 1000, request,
+ am.allocate("h1" , 1000, request,
new ArrayList<ContainerId>());
//kick the scheduler
- rm.nodeHeartbeat(node1, true);
- List<Container> conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+ nm1.nodeHeartbeat(true);
+ List<Container> conts = am.allocate(new ArrayList<Container>(),
new ArrayList<ResourceRequest>());
int contReceived = conts.size();
while (contReceived < request) {
- conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+ conts = am.allocate(new ArrayList<Container>(),
new ArrayList<ResourceRequest>());
contReceived += conts.size();
Log.info("Got " + contReceived + " containers. Waiting to get " + request);
@@ -56,14 +55,14 @@ public class TestApplicationCleanup {
}
Assert.assertEquals(request, conts.size());
- rm.unregisterAppAttempt(attempt.getAppAttemptId());
- rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED);
+ am.unregisterAppAttempt();
+ am.waitForState(RMAppAttemptState.FINISHED);
- int size = rm.nodeHeartbeat(node1, true).getApplicationsToCleanupList().size();
+ int size = nm1.nodeHeartbeat(true).getApplicationsToCleanupList().size();
while(size < 1) {
Thread.sleep(1000);
Log.info("Waiting to get application cleanup..");
- size = rm.nodeHeartbeat(node1, true).getApplicationsToCleanupList().size();
+ size = nm1.nodeHeartbeat(true).getApplicationsToCleanupList().size();
}
Assert.assertEquals(1, size);
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1153435&r1=1153434&r2=1153435&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
Wed Aug 3 11:36:27 2011
@@ -9,7 +9,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -29,18 +28,18 @@ public class TestRM {
rootLogger.setLevel(Level.DEBUG);
MockRM rm = new MockRM();
rm.start();
- rm.registerNode("h1:1234", 5000);
+ MockNM nm1 = rm.registerNode("h1:1234", 5000);
RMApp app = rm.submitApp(2000);
//kick the scheduling
- rm.nodeHeartbeat("h1:1234", true);
+ nm1.nodeHeartbeat(true);
RMAppAttempt attempt = app.getCurrentAppAttempt();
- rm.sendAMLaunched(attempt.getAppAttemptId());
- rm.registerAppAttempt(attempt.getAppAttemptId());
- rm.unregisterAppAttempt(attempt.getAppAttemptId());
- rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED);
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ am.registerAppAttempt();
+ am.unregisterAppAttempt();
+ am.waitForState(RMAppAttemptState.FINISHED);
rm.stop();
}
@@ -50,30 +49,29 @@ public class TestRM {
rootLogger.setLevel(Level.DEBUG);
MockRM rm = new MockRM();
rm.start();
- rm.registerNode("h1:1234", 5000);
- rm.registerNode("h2:5678", 10000);
+ MockNM nm1 = rm.registerNode("h1:1234", 5000);
+ MockNM nm2 = rm.registerNode("h2:5678", 10000);
RMApp app = rm.submitApp(2000);
//kick the scheduling
- rm.nodeHeartbeat("h1:1234", true);
+ nm1.nodeHeartbeat(true);
RMAppAttempt attempt = app.getCurrentAppAttempt();
- rm.sendAMLaunched(attempt.getAppAttemptId());
- rm.registerAppAttempt(attempt.getAppAttemptId());
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ am.registerAppAttempt();
//request for containers
int request = 13;
- rm.allocate(attempt.getAppAttemptId(), "h1" , 1000, request,
- new ArrayList<ContainerId>());
+ am.allocate("h1" , 1000, request, new ArrayList<ContainerId>());
//kick the scheduler
- rm.nodeHeartbeat("h1:1234", true);
- List<Container> conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+ nm1.nodeHeartbeat(true);
+ List<Container> conts = am.allocate(new ArrayList<Container>(),
new ArrayList<ResourceRequest>());
int contReceived = conts.size();
while (contReceived < 3) {//only 3 containers are available on node1
- conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+ conts = am.allocate(new ArrayList<Container>(),
new ArrayList<ResourceRequest>());
contReceived += conts.size();
LOG.info("Got " + contReceived + " containers. Waiting to get " + 3);
@@ -82,12 +80,12 @@ public class TestRM {
Assert.assertEquals(3, conts.size());
//send node2 heartbeat
- rm.nodeHeartbeat("h2:5678", true);
- conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+ nm2.nodeHeartbeat(true);
+ conts = am.allocate(new ArrayList<Container>(),
new ArrayList<ResourceRequest>());
contReceived = conts.size();
while (contReceived < 10) {
- conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+ conts = am.allocate(new ArrayList<Container>(),
new ArrayList<ResourceRequest>());
contReceived += conts.size();
LOG.info("Got " + contReceived + " containers. Waiting to get " + 10);
@@ -95,8 +93,8 @@ public class TestRM {
}
Assert.assertEquals(10, conts.size());
- rm.unregisterAppAttempt(attempt.getAppAttemptId());
- rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED);
+ am.unregisterAppAttempt();
+ am.waitForState(RMAppAttemptState.FINISHED);
rm.stop();
}
|