Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Mar 31 22:23:22 2011
@@ -35,12 +35,13 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.NodeID;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
@@ -77,10 +78,10 @@ implements ResourceScheduler, CapacitySc
private final Comparator<Application> applicationComparator =
new Comparator<Application>() {
- @Override
- public int compare(Application a1, Application a2) {
- return a1.getApplicationId().id - a2.getApplicationId().id;
- }
+ @Override
+ public int compare(Application a1, Application a2) {
+ return a1.getApplicationId().getId() - a2.getApplicationId().getId();
+ }
};
private CapacitySchedulerConfiguration conf;
@@ -90,9 +91,9 @@ implements ResourceScheduler, CapacitySc
private Resource minimumAllocation;
-
- private Map<ApplicationID, Application> applications =
- new TreeMap<ApplicationID, Application>(
+
+ private Map<ApplicationId, Application> applications =
+ new TreeMap<ApplicationId, Application>(
new org.apache.hadoop.yarn.server.resourcemanager.resource.ApplicationID.Comparator());
@@ -177,7 +178,7 @@ implements ResourceScheduler, CapacitySc
* @param priority the priority of the application
* @throws IOException
*/
- public void addApplication(ApplicationID applicationId,
+ public void addApplication(ApplicationId applicationId,
String user, String queueName, Priority priority)
throws IOException {
Queue queue = queues.get(queueName);
@@ -201,7 +202,7 @@ implements ResourceScheduler, CapacitySc
applications.put(applicationId, application);
- LOG.info("Application Submission: " + applicationId.id +
+ LOG.info("Application Submission: " + applicationId.getId() +
", user: " + user +
" queue: " + queue +
", currently active: " + applications.size());
@@ -213,7 +214,7 @@ implements ResourceScheduler, CapacitySc
* @param applicationId the applicationId of the application
* @throws IOException
*/
- public void removeApplication(ApplicationID applicationId)
+ public void removeApplication(ApplicationId applicationId)
throws IOException {
Application application = getApplication(applicationId);
@@ -241,7 +242,7 @@ implements ResourceScheduler, CapacitySc
}
@Override
- public List<Container> allocate(ApplicationID applicationId,
+ public List<Container> allocate(ApplicationId applicationId,
List<ResourceRequest> ask, List<Container> release)
throws IOException {
@@ -283,17 +284,16 @@ implements ResourceScheduler, CapacitySc
}
private void normalizeRequest(ResourceRequest ask) {
- int memory = ask.capability.memory;
- int minMemory = minimumAllocation.memory;
- ask.capability.memory =
- minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0));
+ int memory = ask.getCapability().getMemory();
+ int minMemory = minimumAllocation.getMemory();
+ ask.getCapability().setMemory (
+ minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0)));
}
@Override
public synchronized NodeResponse nodeUpdate(NodeInfo node,
- Map<CharSequence,List<Container>> containers ) {
-
+ Map<String,List<Container>> containers ) {
LOG.info("nodeUpdate: " + node);
NodeResponse nodeResponse = nodeUpdateInternal(node, containers);
@@ -310,7 +310,7 @@ implements ResourceScheduler, CapacitySc
private synchronized void processCompletedContainers(
List<Container> completedContainers) {
for (Container container: completedContainers) {
- Application application = getApplication(container.id.appID);
+ Application application = getApplication(container.getId().getAppId());
// this is possible, since an application can be removed from scheduler
// but the nodemanger is just updating about a completed container.
@@ -342,7 +342,7 @@ implements ResourceScheduler, CapacitySc
processCompletedContainers(unusedContainers);
}
- private synchronized Application getApplication(ApplicationID applicationId) {
+ private synchronized Application getApplication(ApplicationId applicationId) {
return applications.get(applicationId);
}
@@ -353,7 +353,7 @@ implements ResourceScheduler, CapacitySc
try {
addApplication(event.getAppContext().getApplicationID(),
event.getAppContext().getUser(), event.getAppContext().getQueue(),
- event.getAppContext().getSubmissionContext().priority);
+ event.getAppContext().getSubmissionContext().getPriority());
} catch(IOException ie) {
LOG.error("Error in adding an application to the scheduler", ie);
//TODO do proper error handling to shutdown the Resource Manager is we
@@ -373,7 +373,7 @@ implements ResourceScheduler, CapacitySc
}
private Map<String, NodeManager> nodes = new HashMap<String, NodeManager>();
- private Resource clusterResource = new Resource();
+ private Resource clusterResource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
public synchronized Resource getClusterResource() {
@@ -394,7 +394,7 @@ implements ResourceScheduler, CapacitySc
}
@Override
- public synchronized NodeInfo addNode(NodeID nodeId,
+ public synchronized NodeInfo addNode(NodeId nodeId,
String hostName, Node node, Resource capability) {
NodeManager nodeManager = new NodeManager(nodeId, hostName, node, capability);
nodes.put(nodeManager.getHostName(), nodeManager);
@@ -403,30 +403,30 @@ implements ResourceScheduler, CapacitySc
return nodeManager;
}
- public synchronized boolean releaseContainer(ApplicationID applicationId,
+ public synchronized boolean releaseContainer(ApplicationId applicationId,
Container container) {
// Reap containers
LOG.info("Application " + applicationId + " released container " + container);
- NodeManager nodeManager = nodes.get(container.hostName.toString());
+ NodeManager nodeManager = nodes.get(container.getHostName());
return nodeManager.releaseContainer(container);
}
public synchronized NodeResponse nodeUpdateInternal(NodeInfo nodeInfo,
- Map<CharSequence,List<Container>> containers) {
+ Map<String,List<Container>> containers) {
NodeManager node = nodes.get(nodeInfo.getHostName());
LOG.debug("nodeUpdate: node=" + nodeInfo.getHostName() +
- " available=" + nodeInfo.getAvailableResource().memory);
+ " available=" + nodeInfo.getAvailableResource().getMemory());
return node.statusUpdate(containers);
}
public synchronized void addAllocatedContainers(NodeInfo nodeInfo,
- ApplicationID applicationId, List<Container> containers) {
+ ApplicationId applicationId, List<Container> containers) {
NodeManager node = nodes.get(nodeInfo.getHostName());
node.allocateContainer(applicationId, containers);
}
- public synchronized void finishedApplication(ApplicationID applicationId,
+ public synchronized void finishedApplication(ApplicationId applicationId,
List<NodeInfo> nodesToNotify) {
for (NodeInfo node: nodesToNotify) {
NodeManager nodeManager = nodes.get(node.getHostName());
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Thu Mar 31 22:23:22 2011
@@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.api.records.Resource;
public class CapacitySchedulerConfiguration extends Configuration {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java Thu Mar 31 22:23:22 2011
@@ -18,8 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
-import org.apache.hadoop.yarn.Resource;
/**
* Read-only interface to {@link CapacityScheduler} context.
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Mar 31 22:23:22 2011
@@ -33,11 +33,12 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.ContainerToken;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
@@ -299,15 +300,15 @@ public class LeafQueue implements Queue
// Are we going over limits by allocating to this application?
ResourceRequest required =
application.getResourceRequest(priority, NodeManager.ANY);
- if (required != null && required.numContainers > 0) {
+ if (required != null && required.getNumContainers() > 0) {
// Maximum Capacity of the queue
- if (!assignToQueue(clusterResource, required.capability)) {
+ if (!assignToQueue(clusterResource, required.getCapability())) {
return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
}
// User limits
- if (!assignToUser(application.getUser(), clusterResource, required.capability)) {
+ if (!assignToUser(application.getUser(), clusterResource, required.getCapability())) {
return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
}
@@ -320,7 +321,7 @@ public class LeafQueue implements Queue
assigned,
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
Resource assignedResource =
- application.getResourceRequest(priority, NodeManager.ANY).capability;
+ application.getResourceRequest(priority, NodeManager.ANY).getCapability();
// Book-keeping
allocateResource(clusterResource,
@@ -345,12 +346,12 @@ public class LeafQueue implements Queue
private synchronized boolean assignToQueue(Resource clusterResource,
Resource required) {
float newUtilization =
- (float)(usedResources.memory + required.memory) /
- (clusterResource.memory * absoluteCapacity);
+ (float)(usedResources.getMemory() + required.getMemory()) /
+ (clusterResource.getMemory() * absoluteCapacity);
if (newUtilization > absoluteMaxCapacity) {
LOG.info(getQueueName() +
" current-capacity (" + getUtilization() + ") +" +
- " required (" + required.memory + ")" +
+ " required (" + required.getMemory() + ")" +
" > max-capacity (" + absoluteMaxCapacity + ")");
return false;
}
@@ -369,13 +370,13 @@ public class LeafQueue implements Queue
// Allow progress for queues with miniscule capacity
final int queueCapacity =
Math.max(
- divideAndCeil((int)(absoluteCapacity * clusterResource.memory),
- minimumAllocation.memory),
- required.memory);
+ divideAndCeil((int)(absoluteCapacity * clusterResource.getMemory()),
+ minimumAllocation.getMemory()),
+ required.getMemory());
- final int consumed = usedResources.memory;
+ final int consumed = usedResources.getMemory();
final int currentCapacity =
- (consumed < queueCapacity) ? queueCapacity : (consumed + required.memory);
+ (consumed < queueCapacity) ? queueCapacity : (consumed + required.getMemory());
// Never allow a single user to take more than the
// queue's configured capacity * user-limit-factor.
@@ -394,7 +395,7 @@ public class LeafQueue implements Queue
// Note: We aren't considering the current request since there is a fixed
// overhead of the AM, so...
- if ((user.getConsumedResources().memory) > limit) {
+ if ((user.getConsumedResources().getMemory()) > limit) {
LOG.info("User " + userName + " in queue " + getQueueName() +
" will exceed limit, required: " + required +
" consumed: " + user.getConsumedResources() + " limit: " + limit +
@@ -421,7 +422,7 @@ public class LeafQueue implements Queue
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, NodeManager.ANY);
- return (offSwitchRequest.numContainers > 0);
+ return (offSwitchRequest.getNumContainers() > 0);
}
Resource assignContainersOnNode(Resource clusterResource, NodeManager node,
@@ -498,12 +499,12 @@ public class LeafQueue implements Queue
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, NodeManager.ANY);
- if (offSwitchRequest.numContainers == 0) {
+ if (offSwitchRequest.getNumContainers() == 0) {
return false;
}
if (type == NodeType.OFF_SWITCH) {
- return offSwitchRequest.numContainers > 0;
+ return offSwitchRequest.getNumContainers() > 0;
}
if (type == NodeType.RACK_LOCAL) {
@@ -511,9 +512,9 @@ public class LeafQueue implements Queue
application.getResourceRequest(priority, node.getRackName());
if (rackLocalRequest == null) {
// No point waiting for rack-locality if we don't need this rack
- return offSwitchRequest.numContainers > 0;
+ return offSwitchRequest.getNumContainers() > 0;
} else {
- return rackLocalRequest.numContainers > 0;
+ return rackLocalRequest.getNumContainers() > 0;
}
}
@@ -521,7 +522,7 @@ public class LeafQueue implements Queue
ResourceRequest nodeLocalRequest =
application.getResourceRequest(priority, node.getHostName());
if (nodeLocalRequest != null) {
- return nodeLocalRequest.numContainers > 0;
+ return nodeLocalRequest.getNumContainers() > 0;
}
}
@@ -533,13 +534,13 @@ public class LeafQueue implements Queue
Priority priority, ResourceRequest request, NodeType type) {
LOG.info("DEBUG --- assignContainers:" +
" node=" + node.getHostName() +
- " application=" + application.getApplicationId().id +
- " priority=" + priority.priority +
+ " application=" + application.getApplicationId().getId() +
+ " priority=" + priority.getPriority() +
" request=" + request + " type=" + type);
- Resource capability = request.capability;
+ Resource capability = request.getCapability();
int availableContainers =
- node.getAvailableResource().memory / capability.memory; // TODO: A buggy
+ node.getAvailableResource().getMemory() / capability.getMemory(); // TODO: A buggy
// application
// with this
// zero would
@@ -557,18 +558,16 @@ public class LeafQueue implements Queue
// If security is enabled, send the container-tokens too.
if (UserGroupInformation.isSecurityEnabled()) {
- ContainerToken containerToken = new ContainerToken();
+ ContainerToken containerToken = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ContainerToken.class);
ContainerTokenIdentifier tokenidentifier =
- new ContainerTokenIdentifier(container.id,
- container.hostName.toString(), container.resource);
- containerToken.identifier =
- ByteBuffer.wrap(tokenidentifier.getBytes());
- containerToken.kind = ContainerTokenIdentifier.KIND.toString();
- containerToken.password =
- ByteBuffer.wrap(containerTokenSecretManager
- .createPassword(tokenidentifier));
- containerToken.service = container.hostName; // TODO: port
- container.containerToken = containerToken;
+ new ContainerTokenIdentifier(container.getId(),
+ container.getHostName(), container.getResource());
+ containerToken.setIdentifier(ByteBuffer.wrap(tokenidentifier.getBytes()));
+ containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
+ containerToken.setPassword(ByteBuffer.wrap(containerTokenSecretManager
+ .createPassword(tokenidentifier)));
+ containerToken.setService(container.getHostName()); // TODO: port
+ container.setContainerToken(containerToken);
}
containers.add(container);
@@ -585,7 +584,7 @@ public class LeafQueue implements Queue
" used=" + usedResources +
" cluster=" + clusterResource);
- return container.resource;
+ return container.getResource();
}
return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
@@ -602,7 +601,7 @@ public class LeafQueue implements Queue
// Book-keeping
releaseResource(clusterResource,
- application.getUser(), container.resource);
+ application.getUser(), container.getResource());
LOG.info("completedContainer" +
" container=" + container +
@@ -640,8 +639,8 @@ public class LeafQueue implements Queue
}
private synchronized void update(Resource clusterResource) {
- setUtilization(usedResources.memory / (clusterResource.memory * absoluteCapacity));
- setUsedCapacity(usedResources.memory / (clusterResource.memory * capacity));
+ setUtilization(usedResources.getMemory() / (clusterResource.getMemory() * absoluteCapacity));
+ setUsedCapacity(usedResources.getMemory() / (clusterResource.getMemory() * capacity));
}
static class User {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Thu Mar 31 22:23:22 2011
@@ -31,9 +31,9 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
@@ -368,7 +368,7 @@ public class ParentQueue implements Queu
// Careful! Locking order is important!
// Book keeping
synchronized (this) {
- releaseResource(clusterResource, container.resource);
+ releaseResource(clusterResource, container.getResource());
LOG.info("completedContainer" +
" queue=" + getQueueName() +
@@ -401,8 +401,8 @@ public class ParentQueue implements Queu
}
private synchronized void update(Resource clusterResource) {
- setUtilization(usedResources.memory / (clusterResource.memory * absoluteCapacity));
- setUsedCapacity(usedResources.memory / (clusterResource.memory * capacity));
+ setUtilization(usedResources.getMemory() / (clusterResource.getMemory() * absoluteCapacity));
+ setUsedCapacity(usedResources.getMemory() / (clusterResource.getMemory() * capacity));
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java Thu Mar 31 22:23:22 2011
@@ -23,10 +23,9 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Thu Mar 31 22:23:22 2011
@@ -34,14 +34,15 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.ContainerToken;
-import org.apache.hadoop.yarn.NodeID;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
@@ -71,8 +72,8 @@ public class FifoScheduler implements Re
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
MINIMUM_MEMORY);
- Map<ApplicationID, Application> applications =
- new TreeMap<ApplicationID, Application>(
+ Map<ApplicationId, Application> applications =
+ new TreeMap<ApplicationId, Application>(
new org.apache.hadoop.yarn.server.resourcemanager.resource.ApplicationID.Comparator());
private static final Queue DEFAULT_QUEUE = new Queue() {
@@ -100,7 +101,7 @@ public class FifoScheduler implements Re
}
@Override
- public synchronized List<Container> allocate(ApplicationID applicationId,
+ public synchronized List<Container> allocate(ApplicationId applicationId,
List<ResourceRequest> ask, List<Container> release)
throws IOException {
Application application = getApplication(applicationId);
@@ -147,25 +148,25 @@ public class FifoScheduler implements Re
}
private void normalizeRequest(ResourceRequest ask) {
- int memory = ask.capability.memory;
+ int memory = ask.getCapability().getMemory();
memory =
MINIMUM_MEMORY * ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY));
}
- private synchronized Application getApplication(ApplicationID applicationId) {
+ private synchronized Application getApplication(ApplicationId applicationId) {
return applications.get(applicationId);
}
- public synchronized void addApplication(ApplicationID applicationId,
+ public synchronized void addApplication(ApplicationId applicationId,
String user, String unusedQueue, Priority unusedPriority)
throws IOException {
applications.put(applicationId,
new Application(applicationId, DEFAULT_QUEUE, user));
- LOG.info("Application Submission: " + applicationId.id + " from " + user +
+ LOG.info("Application Submission: " + applicationId.getId() + " from " + user +
", currently active: " + applications.size());
}
- public synchronized void removeApplication(ApplicationID applicationId)
+ public synchronized void removeApplication(ApplicationId applicationId)
throws IOException {
Application application = getApplication(applicationId);
if (application == null) {
@@ -195,7 +196,7 @@ public class FifoScheduler implements Re
" #applications=" + applications.size());
// Try to assign containers to applications in fifo order
- for (Map.Entry<ApplicationID, Application> e : applications.entrySet()) {
+ for (Map.Entry<ApplicationId, Application> e : applications.entrySet()) {
Application application = e.getValue();
LOG.debug("pre-assignContainers");
application.showRequests();
@@ -230,7 +231,7 @@ public class FifoScheduler implements Re
Priority priority, NodeInfo node, NodeType type) {
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, NodeManager.ANY);
- int maxContainers = offSwitchRequest.numContainers;
+ int maxContainers = offSwitchRequest.getNumContainers();
if (type == NodeType.OFF_SWITCH) {
return maxContainers;
@@ -243,14 +244,14 @@ public class FifoScheduler implements Re
return maxContainers;
}
- maxContainers = Math.min(maxContainers, rackLocalRequest.numContainers);
+ maxContainers = Math.min(maxContainers, rackLocalRequest.getNumContainers());
}
if (type == NodeType.DATA_LOCAL) {
ResourceRequest nodeLocalRequest =
application.getResourceRequest(priority, node.getHostName());
if (nodeLocalRequest != null) {
- maxContainers = Math.min(maxContainers, nodeLocalRequest.numContainers);
+ maxContainers = Math.min(maxContainers, nodeLocalRequest.getNumContainers());
}
}
@@ -276,8 +277,8 @@ public class FifoScheduler implements Re
LOG.debug("assignContainersOnNode:" +
" node=" + node.getHostName() +
- " application=" + application.getApplicationId().id +
- " priority=" + priority.priority +
+ " application=" + application.getApplicationId().getId() +
+ " priority=" + priority.getPriority() +
" #assigned=" +
(nodeLocalContainers + rackLocalContainers + offSwitchContainers));
@@ -295,7 +296,7 @@ public class FifoScheduler implements Re
Math.min(
getMaxAllocatableContainers(application, priority, node,
NodeType.DATA_LOCAL),
- request.numContainers);
+ request.getNumContainers());
assignedContainers =
assignContainers(node, application, priority,
assignableContainers, request, NodeType.DATA_LOCAL);
@@ -313,7 +314,7 @@ public class FifoScheduler implements Re
Math.min(
getMaxAllocatableContainers(application, priority, node,
NodeType.RACK_LOCAL),
- request.numContainers);
+ request.getNumContainers());
assignedContainers =
assignContainers(node, application, priority,
assignableContainers, request, NodeType.RACK_LOCAL);
@@ -329,7 +330,7 @@ public class FifoScheduler implements Re
if (request != null) {
assignedContainers =
assignContainers(node, application, priority,
- request.numContainers, request, NodeType.OFF_SWITCH);
+ request.getNumContainers(), request, NodeType.OFF_SWITCH);
}
return assignedContainers;
}
@@ -339,14 +340,14 @@ public class FifoScheduler implements Re
ResourceRequest request, NodeType type) {
LOG.debug("assignContainers:" +
" node=" + node.getHostName() +
- " application=" + application.getApplicationId().id +
- " priority=" + priority.priority +
+ " application=" + application.getApplicationId().getId() +
+ " priority=" + priority.getPriority() +
" assignableContainers=" + assignableContainers +
" request=" + request + " type=" + type);
- Resource capability = request.capability;
+ Resource capability = request.getCapability();
int availableContainers =
- node.getAvailableResource().memory / capability.memory; // TODO: A buggy
+ node.getAvailableResource().getMemory() / capability.getMemory(); // TODO: A buggy
// application
// with this
// zero would
@@ -366,18 +367,18 @@ public class FifoScheduler implements Re
node.getHostName(), capability);
// If security is enabled, send the container-tokens too.
if (UserGroupInformation.isSecurityEnabled()) {
- ContainerToken containerToken = new ContainerToken();
+ ContainerToken containerToken = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ContainerToken.class);
ContainerTokenIdentifier tokenidentifier =
- new ContainerTokenIdentifier(container.id,
- container.hostName.toString(), container.resource);
- containerToken.identifier =
- ByteBuffer.wrap(tokenidentifier.getBytes());
- containerToken.kind = ContainerTokenIdentifier.KIND.toString();
- containerToken.password =
+ new ContainerTokenIdentifier(container.getId(),
+ container.getHostName(), container.getResource());
+ containerToken.setIdentifier(
+ ByteBuffer.wrap(tokenidentifier.getBytes()));
+ containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
+ containerToken.setPassword(
ByteBuffer.wrap(containerTokenSecretManager
- .createPassword(tokenidentifier));
- containerToken.service = container.hostName; // TODO: port
- container.containerToken = containerToken;
+ .createPassword(tokenidentifier)));
+ containerToken.setService(container.getHostName()); // TODO: port
+ container.setContainerToken(containerToken);
}
containers.add(container);
}
@@ -390,7 +391,7 @@ public class FifoScheduler implements Re
private synchronized void applicationCompletedContainers(
List<Container> completedContainers) {
for (Container c: completedContainers) {
- Application app = applications.get(c.id.appID);
+ Application app = applications.get(c.getId().getAppId());
/** this is possible, since an application can be removed from scheduler but
* the nodemanger is just updating about a completed container.
*/
@@ -402,7 +403,7 @@ public class FifoScheduler implements Re
@Override
public synchronized NodeResponse nodeUpdate(NodeInfo node,
- Map<CharSequence,List<Container>> containers ) {
+ Map<String,List<Container>> containers ) {
NodeResponse nodeResponse = nodeUpdateInternal(node, containers);
applicationCompletedContainers(nodeResponse.getCompletedContainers());
@@ -425,7 +426,7 @@ public class FifoScheduler implements Re
case ADD:
try {
addApplication(event.getAppContext().getApplicationID(), event.getAppContext().getUser(),
- event.getAppContext().getQueue(), event.getAppContext().getSubmissionContext().priority);
+ event.getAppContext().getQueue(), event.getAppContext().getSubmissionContext().getPriority());
} catch(IOException ie) {
LOG.error("Unable to add application " + event.getAppContext().getApplicationID(), ie);
/** this is fatal we are not able to add applications for scheduling **/
@@ -444,7 +445,7 @@ public class FifoScheduler implements Re
}
private Map<String, NodeManager> nodes = new HashMap<String, NodeManager>();
- private Resource clusterResource = new Resource();
+ private Resource clusterResource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
public synchronized Resource getClusterResource() {
return clusterResource;
@@ -464,7 +465,7 @@ public class FifoScheduler implements Re
}
@Override
- public synchronized NodeInfo addNode(NodeID nodeId,
+ public synchronized NodeInfo addNode(NodeId nodeId,
String hostName, Node node, Resource capability) {
NodeManager nodeManager = new NodeManager(nodeId, hostName, node, capability);
nodes.put(nodeManager.getHostName(), nodeManager);
@@ -473,30 +474,30 @@ public class FifoScheduler implements Re
return nodeManager;
}
- public synchronized boolean releaseContainer(ApplicationID applicationId,
+ public synchronized boolean releaseContainer(ApplicationId applicationId,
Container container) {
// Reap containers
LOG.info("Application " + applicationId + " released container " + container);
- NodeManager nodeManager = nodes.get(container.hostName.toString());
+ NodeManager nodeManager = nodes.get(container.getHostName());
return nodeManager.releaseContainer(container);
}
private synchronized NodeResponse nodeUpdateInternal(NodeInfo nodeInfo,
- Map<CharSequence,List<Container>> containers) {
+ Map<String,List<Container>> containers) {
NodeManager node = nodes.get(nodeInfo.getHostName());
LOG.debug("nodeUpdate: node=" + nodeInfo.getHostName() +
- " available=" + nodeInfo.getAvailableResource().memory);
+ " available=" + nodeInfo.getAvailableResource().getMemory());
return node.statusUpdate(containers);
}
public synchronized void addAllocatedContainers(NodeInfo nodeInfo,
- ApplicationID applicationId, List<Container> containers) {
+ ApplicationId applicationId, List<Container> containers) {
NodeManager node = nodes.get(nodeInfo.getHostName());
node.allocateContainer(applicationId, containers);
}
- public synchronized void finishedApplication(ApplicationID applicationId,
+ public synchronized void finishedApplication(ApplicationId applicationId,
List<NodeInfo> nodesToNotify) {
for (NodeInfo node: nodesToNotify) {
NodeManager nodeManager = nodes.get(node.getHostName());
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java Thu Mar 31 22:23:22 2011
@@ -56,11 +56,11 @@ class AppsBlock extends HtmlBlock {
CharSequence master = app.master();
String am = master == null ? "UNASSIGNED"
: join(master, ':', app.httpPort());
- String percent = String.format("%.1f", app.status().progress * 100);
+ String percent = String.format("%.1f", app.status().getProgress() * 100);
tbody.
tr().
td().
- br().$title(String.valueOf(app.id().id))._(). // for sorting
+ br().$title(String.valueOf(app.id().getId()))._(). // for sorting
a(url("app", appId), appId)._().
td(app.user().toString()).
td(app.name().toString()).
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsList.java Thu Mar 31 22:23:22 2011
@@ -61,12 +61,12 @@ class AppsList implements ToJSON {
String ui = master == null ? "UNASSIGNED"
: join(master, ':', app.httpPort());
out.append("[\"");
- appendSortable(out, app.id().id);
+ appendSortable(out, app.id().getId());
appendLink(out, appID, rc.prefix(), "app", appID).append(_SEP).
append(escapeHtml(app.user().toString())).append(_SEP).
append(escapeHtml(app.name().toString())).append(_SEP).
append(app.state().toString()).append(_SEP);
- appendProgressBar(out, app.status().progress).append(_SEP);
+ appendProgressBar(out, app.status().getProgress()).append(_SEP);
appendLink(out, ui, rc.prefix(), master == null ? "#" : "http://", ui).
append("\"]");
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java Thu Mar 31 22:23:22 2011
@@ -55,11 +55,11 @@ class NodesPage extends RmView {
for (NodeInfo ni : resource.getAllNodeInfo()) {
tbody.tr().
td(ni.getRackName()).
- td(String.valueOf(ni.getNodeID().id)).
+ td(String.valueOf(ni.getNodeID().getId())).
td(ni.getHostName()).
td(String.valueOf(ni.getNumContainers())).
- td(String.valueOf(ni.getUsedResource().memory)).
- td(String.valueOf(ni.getAvailableResource().memory))._();
+ td(String.valueOf(ni.getUsedResource().getMemory())).
+ td(String.valueOf(ni.getAvailableResource().getMemory()))._();
}
tbody._()._();
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java Thu Mar 31 22:23:22 2011
@@ -24,6 +24,7 @@ import java.util.Date;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.Application;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -31,7 +32,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.webapp.Controller;
import org.apache.hadoop.yarn.webapp.ResponseInfo;
-import org.apache.hadoop.yarn.ApplicationID;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp.*;
import static org.apache.hadoop.yarn.util.StringHelper.*;
@@ -65,7 +65,7 @@ public class RmController extends Contro
setTitle("Bad request: requires application ID");
return;
}
- ApplicationID appID = Apps.toAppID(aid);
+ ApplicationId appID = Apps.toAppID(aid);
ApplicationsManager asm = injector().getInstance(ApplicationsManager.class);
Application app = asm.getApplication(appID);
if (app == null) {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Thu Mar 31 22:23:22 2011
@@ -30,23 +30,26 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.avro.ipc.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerState;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.ResourceRequest;
@Private
public class Application {
@@ -56,8 +59,9 @@ public class Application {
final private String user;
final private String queue;
- final private ApplicationID applicationId;
+ final private ApplicationId applicationId;
final private ResourceManager resourceManager;
+ private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
final private Map<Priority, Resource> requestSpec =
new TreeMap<Priority, Resource>(
@@ -81,15 +85,15 @@ public class Application {
final private Map<String, NodeManager> nodes =
new HashMap<String, NodeManager>();
- Resource used = new Resource();
+ Resource used = recordFactory.newRecordInstance(Resource.class);
public Application(String user, ResourceManager resourceManager)
- throws AvroRemoteException {
+ throws YarnRemoteException {
this(user, "default", resourceManager);
}
public Application(String user, String queue, ResourceManager resourceManager)
- throws AvroRemoteException {
+ throws YarnRemoteException {
this.user = user;
this.queue = queue;
this.resourceManager = resourceManager;
@@ -105,7 +109,7 @@ public class Application {
return queue;
}
- public ApplicationID getApplicationId() {
+ public ApplicationId getApplicationId() {
return applicationId;
}
@@ -122,10 +126,10 @@ public class Application {
}
public synchronized void submit() throws IOException {
- ApplicationSubmissionContext context = new ApplicationSubmissionContext();
- context.applicationId = applicationId;
- context.user = this.user;
- context.queue = this.queue;
+ ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ context.setApplicationId(this.applicationId);
+ context.setUser(this.user);
+ context.setQueue(this.queue);
resourceManager.getApplicationsManager().submitApplication(context);
}
@@ -134,7 +138,7 @@ public class Application {
Resource currentSpec = requestSpec.put(priority, capability);
if (currentSpec != null) {
throw new IllegalStateException("Resource spec already exists for " +
- "priority " + priority.priority + " - " + currentSpec.memory);
+ "priority " + priority.getPriority() + " - " + currentSpec.getMemory());
}
}
@@ -200,9 +204,11 @@ public class Application {
}
NodeManager nodeManager = task.getNodeManager();
- ContainerID containerId = task.getContainerId();
+ ContainerId containerId = task.getContainerId();
task.stop();
- nodeManager.stopContainer(containerId);
+ StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
+ stopRequest.setContainerId(containerId);
+ nodeManager.stopContainer(stopRequest);
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
used, requestSpec.get(task.getPriority()));
@@ -223,7 +229,7 @@ public class Application {
priority, resourceName, capability, 1);
requests.put(resourceName, request);
} else {
- ++request.numContainers;
+ request.setNumContainers(request.getNumContainers() + 1);
}
// Note this down for next interaction with ResourceManager
@@ -233,11 +239,11 @@ public class Application {
request)); // clone to ensure the RM doesn't manipulate the same obj
LOG.info("DEBUG --- addResourceRequest:" +
- " applicationId=" + applicationId.id +
- " priority=" + priority.priority +
+ " applicationId=" + applicationId.getId() +
+ " priority=" + priority.getPriority() +
" resourceName=" + resourceName +
" capability=" + capability +
- " numContainers=" + request.numContainers +
+ " numContainers=" + request.getNumContainers() +
" #asks=" + ask.size());
}
@@ -265,7 +271,7 @@ public class Application {
List<Container> containers = new ArrayList<Container>(response.size());
for (Container container : response) {
- if (container.state != ContainerState.COMPLETE) {
+ if (container.getState() != ContainerState.COMPLETE) {
containers.add(
org.apache.hadoop.yarn.server.resourcemanager.resource.Container.create(
container));
@@ -315,22 +321,22 @@ public class Application {
List<Container> containers) throws IOException {
for (Iterator<Container> i=containers.iterator(); i.hasNext();) {
Container container = i.next();
- String host = container.hostName.toString();
+ String host = container.getHostName();
if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.equals(
- requestSpec.get(priority), container.resource)) {
+ requestSpec.get(priority), container.getResource())) {
// See which task can use this container
for (Iterator<Task> t=tasks.get(priority).iterator(); t.hasNext();) {
Task task = t.next();
if (task.getState() == State.PENDING && task.canSchedule(type, host)) {
NodeManager nodeManager = getNodeManager(host);
- task.start(nodeManager, container.id);
+ task.start(nodeManager, container.getId());
i.remove();
// Track application resource usage
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- used, container.resource);
+ used, container.getResource());
LOG.info("Assigned container (" + container + ") of type " + type +
" to task " + task.getTaskId() + " at priority " + priority +
@@ -341,7 +347,9 @@ public class Application {
updateResourceRequests(requests.get(priority), type, task);
// Launch the container
- nodeManager.startContainer(createCLC(container));
+ StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+ startRequest.setContainerLaunchContext(createCLC(container));
+ nodeManager.startContainer(startRequest);
break;
}
}
@@ -384,7 +392,7 @@ public class Application {
}
private void updateResourceRequest(ResourceRequest request) {
- --request.numContainers;
+ request.setNumContainers(request.getNumContainers() - 1);
// Note this for next interaction with ResourceManager
ask.remove(request);
@@ -398,10 +406,10 @@ public class Application {
}
private ContainerLaunchContext createCLC(Container container) {
- ContainerLaunchContext clc = new ContainerLaunchContext();
- clc.id = container.id;
- clc.user = this.user;
- clc.resource = container.resource;
+ ContainerLaunchContext clc = recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ clc.setContainerId(container.getId());
+ clc.setUser(this.user);
+ clc.setResource(container.getResource());
return clc;
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Thu Mar 31 22:23:22 2011
@@ -23,15 +23,18 @@ import com.google.common.collect.Lists;
import java.util.List;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.NodeID;
-import org.apache.hadoop.yarn.Resource;
/**
* Test helper to generate mock nodes
*/
public class MockNodes {
private static int NODE_ID = 0;
+ private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
public static List<NodeInfo> newNodes(int racks, int nodesPerRack,
Resource perNode) {
@@ -44,41 +47,41 @@ public class MockNodes {
return list;
}
- public static NodeID newNodeID(int id) {
- NodeID nid = new NodeID();
- nid.id = id;
+ public static NodeId newNodeID(int id) {
+ NodeId nid = recordFactory.newRecordInstance(NodeId.class);
+ nid.setId(id);
return nid;
}
public static Resource newResource(int mem) {
- Resource rs = new Resource();
- rs.memory = mem;
+ Resource rs = recordFactory.newRecordInstance(Resource.class);
+ rs.setMemory(mem);
return rs;
}
public static Resource newUsedResource(Resource total) {
- Resource rs = new Resource();
- rs.memory = (int)(Math.random() * total.memory);
+ Resource rs = recordFactory.newRecordInstance(Resource.class);
+ rs.setMemory((int)(Math.random() * total.getMemory()));
return rs;
}
public static Resource newAvailResource(Resource total, Resource used) {
- Resource rs = new Resource();
- rs.memory = total.memory - used.memory;
+ Resource rs = recordFactory.newRecordInstance(Resource.class);
+ rs.setMemory(total.getMemory() - used.getMemory());
return rs;
}
public static NodeInfo newNodeInfo(int rack, final Resource perNode) {
final String rackName = "rack"+ rack;
final int nid = NODE_ID++;
- final NodeID nodeID = newNodeID(nid);
+ final NodeId nodeID = newNodeID(nid);
final String hostName = "host"+ nid;
final Resource used = newUsedResource(perNode);
final Resource avail = newAvailResource(perNode, used);
final int containers = (int)(Math.random() * 8);
return new NodeInfo() {
@Override
- public NodeID getNodeID() {
+ public NodeId getNodeID() {
return nodeID;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Thu Mar 31 22:23:22 2011
@@ -27,39 +27,51 @@ import java.util.Map;
import junit.framework.Assert;
-import org.apache.avro.ipc.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.CleanupContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CleanupContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerManager;
-import org.apache.hadoop.yarn.ContainerState;
-import org.apache.hadoop.yarn.ContainerStatus;
-import org.apache.hadoop.yarn.HeartbeatResponse;
-import org.apache.hadoop.yarn.NodeID;
-import org.apache.hadoop.yarn.NodeStatus;
-import org.apache.hadoop.yarn.RegistrationResponse;
-import org.apache.hadoop.yarn.Resource;
@Private
public class NodeManager implements ContainerManager {
private static final Log LOG = LogFactory.getLog(NodeManager.class);
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
final private String hostName;
final private String rackName;
- final private NodeID nodeId;
+ final private NodeId nodeId;
final private Resource capability;
- Resource available = new Resource();
- Resource used = new Resource();
+ Resource available = recordFactory.newRecordInstance(Resource.class);
+ Resource used = recordFactory.newRecordInstance(Resource.class);
final RMResourceTrackerImpl resourceTracker;
final NodeInfo nodeInfo;
- final Map<CharSequence, List<Container>> containers =
- new HashMap<CharSequence, List<Container>>();
+ final Map<String, List<Container>> containers =
+ new HashMap<String, List<Container>>();
public NodeManager(String hostName, String rackName, int memory,
RMResourceTrackerImpl resourceTracker) throws IOException {
@@ -72,14 +84,17 @@ public class NodeManager implements Cont
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
available, capability);
+ RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+ request.setNode(hostName);
+ request.setResource(capability);
RegistrationResponse response =
- resourceTracker.registerNodeManager(hostName, capability);
- this.nodeId = response.nodeID;
+ resourceTracker.registerNodeManager(request).getRegistrationResponse();
+ this.nodeId = response.getNodeId();
this.nodeInfo = resourceTracker.getNodeManager(nodeId);
// Sanity check
Assert.assertEquals(memory,
- nodeInfo.getAvailableResource().memory);
+ nodeInfo.getAvailableResource().getMemory());
}
public String getHostName() {
@@ -90,7 +105,7 @@ public class NodeManager implements Cont
return rackName;
}
- public NodeID getNodeId() {
+ public NodeId getNodeId() {
return nodeId;
}
@@ -108,26 +123,29 @@ public class NodeManager implements Cont
int responseID = 0;
- public void heartbeat() throws AvroRemoteException {
+ public void heartbeat() throws YarnRemoteException {
NodeStatus nodeStatus =
org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeStatus.createNodeStatus(
nodeId, containers);
- nodeStatus.responseId = responseID;
- HeartbeatResponse response = resourceTracker.nodeHeartbeat(nodeStatus);
- responseID = response.responseId;
+ nodeStatus.setResponseId(responseID);
+ NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
+ request.setNodeStatus(nodeStatus);
+ HeartbeatResponse response = resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
+ responseID = response.getResponseId();
}
@Override
- synchronized public Void cleanupContainer(ContainerID containerID)
- throws AvroRemoteException {
- // TODO Auto-generated method stub
- return null;
+ synchronized public CleanupContainerResponse cleanupContainer(CleanupContainerRequest request) throws YarnRemoteException {
+ ContainerId containerID = request.getContainerId();
+ CleanupContainerResponse response = recordFactory.newRecordInstance(CleanupContainerResponse.class);
+ return response;
}
@Override
- synchronized public Void startContainer(ContainerLaunchContext containerLaunchContext)
- throws AvroRemoteException {
- String applicationId = String.valueOf(containerLaunchContext.id.appID.id);
+ synchronized public StartContainerResponse startContainer(StartContainerRequest request) throws YarnRemoteException {
+ ContainerLaunchContext containerLaunchContext = request.getContainerLaunchContext();
+
+ String applicationId = String.valueOf(containerLaunchContext.getContainerId().getAppId().getId());
List<Container> applicationContainers = containers.get(applicationId);
if (applicationContainers == null) {
@@ -137,23 +155,23 @@ public class NodeManager implements Cont
// Sanity check
for (Container container : applicationContainers) {
- if (container.id.compareTo(containerLaunchContext.id) == 0) {
+ if (container.getId().compareTo(containerLaunchContext.getContainerId()) == 0) {
throw new IllegalStateException(
- "Container " + containerLaunchContext.id +
+ "Container " + containerLaunchContext.getContainerId() +
" already setup on node " + hostName);
}
}
Container container =
org.apache.hadoop.yarn.server.resourcemanager.resource.Container.create(
- containerLaunchContext.id,
- hostName, containerLaunchContext.resource);
+ containerLaunchContext.getContainerId(),
+ hostName, containerLaunchContext.getResource());
applicationContainers.add(container);
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
- available, containerLaunchContext.resource);
+ available, containerLaunchContext.getResource());
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- used, containerLaunchContext.resource);
+ used, containerLaunchContext.getResource());
LOG.info("DEBUG --- startContainer:" +
" node=" + hostName +
@@ -162,26 +180,28 @@ public class NodeManager implements Cont
" available=" + available +
" used=" + used);
- return null;
+ StartContainerResponse response = recordFactory.newRecordInstance(StartContainerResponse.class);
+ return response;
}
synchronized public void checkResourceUsage() {
LOG.info("Checking resource usage for " + hostName);
- Assert.assertEquals(available.memory,
- nodeInfo.getAvailableResource().memory);
- Assert.assertEquals(used.memory,
- nodeInfo.getUsedResource().memory);
+ Assert.assertEquals(available.getMemory(),
+ nodeInfo.getAvailableResource().getMemory());
+ Assert.assertEquals(used.getMemory(),
+ nodeInfo.getUsedResource().getMemory());
}
@Override
- synchronized public Void stopContainer(ContainerID containerID) throws AvroRemoteException {
- String applicationId = String.valueOf(containerID.appID.id);
+ synchronized public StopContainerResponse stopContainer(StopContainerRequest request) throws YarnRemoteException {
+ ContainerId containerID = request.getContainerId();
+ String applicationId = String.valueOf(containerID.getAppId().getId());
// Mark the container as COMPLETE
List<Container> applicationContainers = containers.get(applicationId);
for (Container c : applicationContainers) {
- if (c.id.compareTo(containerID) == 0) {
- c.state = ContainerState.COMPLETE;
+ if (c.getId().compareTo(containerID) == 0) {
+ c.setState(ContainerState.COMPLETE);
}
}
@@ -193,7 +213,7 @@ public class NodeManager implements Cont
Container container = null;
for (Iterator<Container> i=applicationContainers.iterator(); i.hasNext();) {
container = i.next();
- if (container.id.compareTo(containerID) == 0) {
+ if (container.getId().compareTo(containerID) == 0) {
i.remove();
++ctr;
}
@@ -205,9 +225,9 @@ public class NodeManager implements Cont
}
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
- available, container.resource);
+ available, container.getResource());
org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
- used, container.resource);
+ used, container.getResource());
LOG.info("DEBUG --- stopContainer:" +
" node=" + hostName +
@@ -216,13 +236,14 @@ public class NodeManager implements Cont
" available=" + available +
" used=" + used);
- return null;
+ StopContainerResponse response = recordFactory.newRecordInstance(StopContainerResponse.class);
+ return response;
}
@Override
- synchronized public ContainerStatus getContainerStatus(ContainerID containerID)
- throws AvroRemoteException {
- // TODO Auto-generated method stub
- return null;
+ synchronized public GetContainerStatusResponse getContainerStatus(GetContainerStatusRequest request) throws YarnRemoteException {
+ ContainerId containerID = request.getContainerId();
+ GetContainerStatusResponse response = recordFactory.newRecordInstance(GetContainerStatusResponse.class);
+ return response;
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java Thu Mar 31 22:23:22 2011
@@ -26,24 +26,24 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.Priority;
public class Task {
private static final Log LOG = LogFactory.getLog(Task.class);
public enum State {PENDING, ALLOCATED, RUNNING, COMPLETE};
- final private ApplicationID applicationId;
+ final private ApplicationId applicationId;
final private int taskId;
final private Priority priority;
final private Set<String> hosts = new HashSet<String>();
final private Set<String> racks = new HashSet<String>();
- private ContainerID containerId;
+ private ContainerId containerId;
private org.apache.hadoop.yarn.server.resourcemanager.NodeManager nodeManager;
private State state;
@@ -80,11 +80,11 @@ public class Task {
return nodeManager;
}
- public ContainerID getContainerId() {
+ public ContainerId getContainerId() {
return containerId;
}
- public ApplicationID getApplicationID() {
+ public ApplicationId getApplicationID() {
return applicationId;
}
@@ -106,7 +106,7 @@ public class Task {
return true;
}
- public void start(NodeManager nodeManager, ContainerID containerId) {
+ public void start(NodeManager nodeManager, ContainerId containerId) {
this.nodeManager = nodeManager;
this.containerId = containerId;
setState(State.RUNNING);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Thu Mar 31 22:23:22 2011
@@ -27,10 +27,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.Resource;
+
import org.junit.After;
import org.junit.Before;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java Thu Mar 31 22:23:22 2011
@@ -28,15 +28,17 @@ import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationMaster;
-import org.apache.hadoop.yarn.ApplicationState;
-import org.apache.hadoop.yarn.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
@@ -52,6 +54,7 @@ import org.junit.Test;
/* a test case that tests the launch failure of a AM */
public class TestAMLaunchFailure extends TestCase {
private static final Log LOG = LogFactory.getLog(TestAMLaunchFailure.class);
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
ApplicationsManagerImpl asmImpl;
YarnScheduler scheduler = new DummyYarnScheduler();
ApplicationTokenSecretManager applicationTokenSecretManager =
@@ -60,10 +63,10 @@ public class TestAMLaunchFailure extends
private ASMContext context;
private static class DummyYarnScheduler implements YarnScheduler {
- private Container container = new Container();
+ private Container container = recordFactory.newRecordInstance(Container.class);
@Override
- public List<Container> allocate(ApplicationID applicationId,
+ public List<Container> allocate(ApplicationId applicationId,
List<ResourceRequest> ask, List<Container> release) throws IOException {
return Arrays.asList(container);
}
@@ -151,23 +154,23 @@ public class TestAMLaunchFailure extends
asmImpl.stop();
}
- private ApplicationSubmissionContext createDummyAppContext(ApplicationID appID) {
- ApplicationSubmissionContext context = new ApplicationSubmissionContext();
- context.applicationId = appID;
+ private ApplicationSubmissionContext createDummyAppContext(ApplicationId appID) {
+ ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ context.setApplicationId(appID);
return context;
}
@Test
public void testAMLaunchFailure() throws Exception {
- ApplicationID appID = asmImpl.getNewApplicationID();
+ ApplicationId appID = asmImpl.getNewApplicationID();
ApplicationSubmissionContext context = createDummyAppContext(appID);
asmImpl.submitApplication(context);
ApplicationMaster master = asmImpl.getApplicationMaster(appID);
- while (master.state != ApplicationState.FAILED) {
+ while (master.getState() != ApplicationState.FAILED) {
Thread.sleep(200);
master = asmImpl.getApplicationMaster(appID);
}
- assertTrue(master.state == ApplicationState.FAILED);
+ assertTrue(master.getState() == ApplicationState.FAILED);
}
-}
\ No newline at end of file
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java Thu Mar 31 22:23:22 2011
@@ -24,24 +24,29 @@ import java.util.List;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManagerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.AMResponse;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationMaster;
-import org.apache.hadoop.yarn.ApplicationStatus;
-import org.apache.hadoop.yarn.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.ResourceRequest;
import org.junit.After;
import org.junit.Before;
public class TestAMRMRPCResponseId extends TestCase {
+ private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
ApplicationMasterService amService = null;
ApplicationTokenSecretManager appTokenManager = new ApplicationTokenSecretManager();
DummyApplicationsManager applicationsManager;
@@ -71,7 +76,7 @@ public class TestAMRMRPCResponseId exten
private class DummyScheduler implements YarnScheduler {
@Override
- public List<Container> allocate(ApplicationID applicationId,
+ public List<Container> allocate(ApplicationId applicationId,
List<ResourceRequest> ask, List<Container> release) throws IOException {
return null;
}
@@ -96,29 +101,37 @@ public class TestAMRMRPCResponseId exten
}
public void testARRMResponseId() throws Exception {
- ApplicationID applicationID = applicationsManager.getNewApplicationID();
- ApplicationSubmissionContext context = new ApplicationSubmissionContext();
- context.applicationId = applicationID;
+ ApplicationId applicationID = applicationsManager.getNewApplicationID();
+ ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ context.setApplicationId(applicationID);
applicationsManager.submitApplication(context);
- ApplicationMaster applicationMaster = new ApplicationMaster();
- applicationMaster.applicationId = applicationID;
- applicationMaster.status = new ApplicationStatus();
- amService.registerApplicationMaster(applicationMaster);
- ApplicationStatus status = new ApplicationStatus();
- status.applicationId = applicationID;
- AMResponse response = amService.allocate(status, null, null);
- assertTrue(response.responseId == 1);
- assertFalse(response.reboot);
- status.responseID = response.responseId;
- response = amService.allocate(status, null, null);
- assertTrue(response.responseId == 2);
+ ApplicationMaster applicationMaster = recordFactory.newRecordInstance(ApplicationMaster.class);
+ applicationMaster.setApplicationId(applicationID);
+ applicationMaster.setStatus(recordFactory.newRecordInstance(ApplicationStatus.class));
+ RegisterApplicationMasterRequest request = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
+ request.setApplicationMaster(applicationMaster);
+ amService.registerApplicationMaster(request);
+ ApplicationStatus status = recordFactory.newRecordInstance(ApplicationStatus.class);
+ status.setApplicationId(applicationID);
+
+ AllocateRequest allocateRequest = recordFactory.newRecordInstance(AllocateRequest.class);
+ allocateRequest.setApplicationStatus(status);
+ AMResponse response = amService.allocate(allocateRequest).getAMResponse();
+ assertTrue(response.getResponseId() == 1);
+ assertFalse(response.getReboot());
+ status.setResponseId(response.getResponseId());
+
+ allocateRequest.setApplicationStatus(status);
+ response = amService.allocate(allocateRequest).getAMResponse();
+ assertTrue(response.getResponseId() == 2);
/* try resending */
- response = amService.allocate(status, null, null);
- assertTrue(response.responseId == 2);
+ response = amService.allocate(allocateRequest).getAMResponse();
+ assertTrue(response.getResponseId() == 2);
/** try sending old **/
- status.responseID = 0;
- response = amService.allocate(status, null, null);
- assertTrue(response.reboot);
+ status.setResponseId(0);
+ allocateRequest.setApplicationStatus(status);
+ response = amService.allocate(allocateRequest).getAMResponse();
+ assertTrue(response.getReboot());
}
-}
\ No newline at end of file
+}
|