Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Wed Aug 3 11:31:34 2011
@@ -19,6 +19,13 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.crypto.SecretKey;
import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log;
@@ -26,8 +33,11 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -40,40 +50,74 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
-public class ResourceTrackerService extends AbstractService
-implements ResourceTracker{
+public class ResourceTrackerService extends AbstractService implements
+ ResourceTracker, RMNodeRemovalListener {
private static final Log LOG = LogFactory.getLog(ResourceTrackerService.class);
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
- private final RMResourceTrackerImpl resourceTracker;
+ private final RMContext rmContext;
+ private final NodesListManager nodesListManager;
+ private final NMLivelinessMonitor nmLivelinessMonitor;
+ private final ContainerTokenSecretManager containerTokenSecretManager;
+
+ /* Node address -> NodeId of registered nodes; registration looks ids up by
+ * "host:cmPort". createNewNode() adds entries and RMNodeRemoved() drops them,
+ * both while synchronized on this map, so the field must stay final.
+ */
+ private final Map<String, NodeId> nodes = new ConcurrentHashMap<String, NodeId>();
+ private final AtomicInteger nodeCounter = new AtomicInteger(0);
private Server server;
private InetSocketAddress resourceTrackerAddress;
- public ResourceTrackerService(RMResourceTrackerImpl resourceTracker) {
- super(ResourceTrackerService.class.getName());
- this.resourceTracker = resourceTracker;
+ private static final NodeHeartbeatResponse reboot = recordFactory
+ .newRecordInstance(NodeHeartbeatResponse.class);
+ static {
+ HeartbeatResponse rebootResp = recordFactory
+ .newRecordInstance(HeartbeatResponse.class);
+ rebootResp.setReboot(true);
+ reboot.setHeartbeatResponse(rebootResp);
}
- public RMResourceTrackerImpl getResourceTracker() {
- return resourceTracker;
+ private final ConcurrentMap<NodeId, HeartbeatResponse> lastHeartBeats
+ = new ConcurrentHashMap<NodeId, HeartbeatResponse>();
+
+ public ResourceTrackerService(RMContext rmContext,
+ NodesListManager nodesListManager,
+ NMLivelinessMonitor nmLivelinessMonitor,
+ ContainerTokenSecretManager containerTokenSecretManager) {
+ super(ResourceTrackerService.class.getName());
+ this.rmContext = rmContext;
+ this.nodesListManager = nodesListManager;
+ this.nmLivelinessMonitor = nmLivelinessMonitor;
+ this.containerTokenSecretManager = containerTokenSecretManager;
}
@Override
public synchronized void init(Configuration conf) {
- super.init(conf);
String resourceTrackerBindAddress =
conf.get(YarnServerConfig.RESOURCETRACKER_ADDRESS,
YarnServerConfig.DEFAULT_RESOURCETRACKER_BIND_ADDRESS);
resourceTrackerAddress = NetUtils.createSocketAddr(resourceTrackerBindAddress);
- resourceTracker.init(conf);
+
+ RackResolver.init(conf);
+ super.init(conf);
}
@Override
@@ -93,12 +137,10 @@ implements ResourceTracker{
RMConfig.DEFAULT_RM_RESOURCE_TRACKER_THREADS));
this.server.start();
- resourceTracker.start();
}
@Override
public synchronized void stop() {
- resourceTracker.stop();
if (this.server != null) {
this.server.close();
}
@@ -108,38 +150,226 @@ implements ResourceTracker{
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException {
- RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
- RegisterNodeManagerResponse.class);
+
+ String host = request.getHost();
+ int cmPort = request.getContainerManagerPort();
+ int httpPort = request.getHttpPort();
+ Resource capability = request.getResource();
+
try {
- response.setRegistrationResponse(
- resourceTracker.registerNodeManager(
- request.getHost(), request.getContainerManagerPort(),
- request.getHttpPort(), request.getResource()));
+ // Check if this node is a 'valid' node
+ if (!this.nodesListManager.isValidNode(host)) {
+ LOG.info("Disallowed NodeManager from " + host);
+ throw new IOException("Disallowed NodeManager from " + host);
+ }
+
+ String node = host + ":" + cmPort;
+ NodeId nodeId = mayBeCreateAndGetNodeId(node);
+
+ createNewNode(nodeId, host, cmPort, httpPort, resolve(host), capability);
+
+ this.nmLivelinessMonitor.register(nodeId);
+
+ LOG.info("NodeManager from node " + host +
+ "(cmPort: " + cmPort + " httpPort: " + httpPort + ") "
+ + "registered with capability: " + capability.getMemory()
+ + ", assigned nodeId " + nodeId.getId());
+
+ RegistrationResponse regResponse = recordFactory.newRecordInstance(
+ RegistrationResponse.class);
+ regResponse.setNodeId(nodeId);
+ SecretKey secretKey =
+ this.containerTokenSecretManager.createAndGetSecretKey(node);
+ regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
+
+ RegisterNodeManagerResponse response = recordFactory
+ .newRecordInstance(RegisterNodeManagerResponse.class);
+ response.setRegistrationResponse(regResponse);
+ return response;
} catch (IOException ioe) {
LOG.info("Exception in node registration from " + request.getHost(), ioe);
throw RPCUtil.getRemoteException(ioe);
}
- return response;
}
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException {
- NodeHeartbeatResponse response = recordFactory.newRecordInstance(
- NodeHeartbeatResponse.class);
+
+ NodeStatus remoteNodeStatus = request.getNodeStatus();
try {
- response.setHeartbeatResponse(
- resourceTracker.nodeHeartbeat(request.getNodeStatus()));
+ /**
+ * Here is the node heartbeat sequence...
+ * 1. Check if it's a registered node
+ * 2. Check if it's a valid (i.e. not excluded) node
+ * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
+ * 4. Send healthStatus to RMNode
+ */
+
+ NodeId nodeId = remoteNodeStatus.getNodeId();
+
+ // 1. Check if it's a registered node. Both lookups are needed:
+ // createNewNode() publishes the RMNode before seeding lastHeartBeats, and
+ // RMNodeRemoved() clears lastHeartBeats before dropping the RMNode, so a
+ // heartbeat racing with (un)registration can find one without the other.
+ // Snapshot the last response once and reuse it for steps 3 and 4.
+ RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
+ HeartbeatResponse lastHeartBeat = this.lastHeartBeats.get(nodeId);
+ if (rmNode == null || lastHeartBeat == null) {
+ /* node does not exist */
+ LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
+ return reboot;
+ }
+
+ // Send ping
+ this.nmLivelinessMonitor.receivedPing(nodeId);
+
+ // 2. Check if it's a valid (i.e. not excluded) node
+ if (!this.nodesListManager.isValidNode(rmNode.getNodeHostName())) {
+ LOG.info("Disallowed NodeManager nodeId: " + nodeId +
+ " hostname: " + rmNode.getNodeAddress());
+ throw new IOException("Disallowed NodeManager nodeId: " +
+ remoteNodeStatus.getNodeId());
+ }
+
+ NodeHeartbeatResponse nodeHeartBeatResponse = recordFactory
+ .newRecordInstance(NodeHeartbeatResponse.class);
+
+ // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
+ int lastResponseId = lastHeartBeat.getResponseId();
+ if (remoteNodeStatus.getResponseId() + 1 == lastResponseId) {
+ LOG.info("Received duplicate heartbeat from node " +
+ rmNode.getNodeAddress());
+ nodeHeartBeatResponse.setHeartbeatResponse(lastHeartBeat);
+ return nodeHeartBeatResponse;
+ } else if (remoteNodeStatus.getResponseId() + 1 < lastResponseId) {
+ LOG.info("Too far behind rm response id:" +
+ lastResponseId + " nm response id:"
+ + remoteNodeStatus.getResponseId());
+ // TODO: Just sending reboot is not enough. Think more.
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
+ return reboot;
+ }
+
+ // 4. Send status to RMNode
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
+ remoteNodeStatus.getAllContainers()));
+
+ // Heartbeat response
+ HeartbeatResponse response = recordFactory
+ .newRecordInstance(HeartbeatResponse.class);
+ response.setResponseId(lastResponseId + 1);
+ response.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
+ response.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
+
+ // Save the response
+ this.lastHeartBeats.put(nodeId, response);
+ nodeHeartBeatResponse.setHeartbeatResponse(response);
+ return nodeHeartBeatResponse;
} catch (IOException ioe) {
LOG.info("Exception in heartbeat from node " +
request.getNodeStatus().getNodeId(), ioe);
throw RPCUtil.getRemoteException(ioe);
}
- return response;
}
public void recover(RMState state) {
- resourceTracker.recover(state);
+ // TODO: recovery is not implemented yet; this method is currently a no-op.
+//
+// List<RMNode> nodeManagers = state.getStoredNodeManagers();
+// for (RMNode nm : nodeManagers) {
+// createNewNode(nm.getNodeID(), nm.getNodeHostName(), nm
+// .getCommandPort(), nm.getHttpPort(), nm.getNode(), nm
+// .getTotalCapability());
+// }
+// for (Map.Entry<ApplicationId, ApplicationInfo> entry : state
+// .getStoredApplications().entrySet()) {
+// List<Container> containers = entry.getValue().getContainers();
+// List<Container> containersToAdd = new ArrayList<Container>();
+// for (Container c : containers) {
+// RMNode containerNode = this.rmContext.getNodesCollection()
+// .getNodeInfo(c.getNodeId());
+// containersToAdd.add(c);
+// containerNode.allocateContainer(entry.getKey(), containersToAdd);
+// containersToAdd.clear();
+// }
+// }
+ }
+
+ /**
+ * Creates the RMNode for a registering NodeManager and publishes it in the
+ * RM context, then seeds its last heartbeat response (response id 0) and
+ * records its address-to-NodeId mapping.
+ *
+ * @throws IOException if an RMNode with this nodeId is already registered
+ */
+ private void createNewNode(NodeId nodeId, String hostName, int cmPort,
+ int httpPort, Node node, Resource capability) throws IOException {
+
+ RMNode rmNode = new RMNodeImpl(nodeId, rmContext, hostName, cmPort,
+ httpPort, node, capability, this);
+
+ if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
+ throw new IOException("Duplicate registration from the node!");
+ }
+
+ // Record the new node
+ synchronized (nodes) {
+ LOG.info("DEBUG -- Adding " + hostName);
+ HeartbeatResponse response = recordFactory
+ .newRecordInstance(HeartbeatResponse.class);
+ response.setResponseId(0);
+ this.lastHeartBeats.put(nodeId, response);
+ nodes.put(rmNode.getNodeAddress(), nodeId);
+ }
+ }
+
+ /**
+ * Forgets a node: drops its address mapping and last heartbeat response,
+ * then removes it from the RM context. Unknown nodes are only logged.
+ */
+ @Override
+ public void RMNodeRemoved(NodeId nodeId) {
+ RMNode node = null;
+ synchronized (nodes) {
+ node = this.rmContext.getRMNodes().get(nodeId);
+ if (node != null) {
+ nodes.remove(node.getNodeAddress());
+ this.lastHeartBeats.remove(nodeId);
+ } else {
+ LOG.warn("Unknown node " + nodeId + " unregistered");
+ }
+ }
+
+ if (node != null) {
+ this.rmContext.getRMNodes().remove(nodeId);
+ }
+ }
+
+ /**
+ * Returns the NodeId already recorded for {@code node} ("host:cmPort"), or
+ * a freshly numbered one. A new id is not recorded here; createNewNode()
+ * records it after the node is published.
+ */
+ private NodeId mayBeCreateAndGetNodeId(String node) {
+ NodeId nodeId;
+ nodeId = nodes.get(node);
+ if (nodeId == null) {
+ nodeId = recordFactory.newRecordInstance(NodeId.class);
+ nodeId.setId(nodeCounter.getAndIncrement());
+ }
+ return nodeId;
+ }
+
+ /**
+ * Resolves the network-topology location of a host.
+ * @param hostName the hostname of this node.
+ * @return the resolved {@link Node} for this nodemanager.
+ */
+ public static Node resolve(String hostName) {
+ return RackResolver.resolve(hostName);
}
}
Copied: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (from r1153017, hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?p2=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java&p1=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java&r1=1153017&r2=1153430&rev=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Wed Aug 3 11:31:34 2011
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+package org.apache.hadoop.yarn.server.resourcemanager.amlauncher;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -42,12 +42,12 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -63,11 +63,11 @@ import org.apache.hadoop.yarn.security.A
import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMFinishEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMLauncherEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
/**
* The launch of the AM itself.
@@ -78,7 +78,7 @@ public class AMLauncher implements Runna
private ContainerManager containerMgrProxy;
- private final Application application;
+ private final RMAppAttempt application;
private final Configuration conf;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private final ApplicationTokenSecretManager applicationTokenSecretManager;
@@ -89,7 +89,7 @@ public class AMLauncher implements Runna
private final EventHandler handler;
@SuppressWarnings("unchecked")
- public AMLauncher(RMContext asmContext, Application application,
+ public AMLauncher(RMContext rmContext, RMAppAttempt application,
AMLauncherEventType eventType,ApplicationTokenSecretManager applicationTokenSecretManager,
ClientToAMSecretManager clientToAMSecretManager, Configuration conf) {
this.application = application;
@@ -100,7 +100,7 @@ public class AMLauncher implements Runna
YarnConfiguration.YARN_SECURITY_INFO,
ContainerManagerSecurityInfo.class, SecurityInfo.class);
this.eventType = eventType;
- this.handler = asmContext.getDispatcher().getEventHandler();
+ this.handler = rmContext.getDispatcher().getEventHandler();
}
private void connect() throws IOException {
@@ -116,14 +116,14 @@ public class AMLauncher implements Runna
ApplicationSubmissionContext applicationContext =
application.getSubmissionContext();
LOG.info("Setting up container " + application.getMasterContainer()
- + " for AM " + application.getMaster());
+ + " for AM " + application.getAppAttemptId());
ContainerLaunchContext launchContext =
createAMContainerLaunchContext(applicationContext, masterContainerID);
StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
request.setContainerLaunchContext(launchContext);
containerMgrProxy.startContainer(request);
LOG.info("Done launching container " + application.getMasterContainer()
- + " for AM " + application.getMaster());
+ + " for AM " + application.getAppAttemptId());
}
private void cleanup() throws IOException {
@@ -172,7 +172,8 @@ public class AMLauncher implements Runna
ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
container.addAllCommands(applicationMasterContext.getCommandList());
StringBuilder mergedCommand = new StringBuilder();
- String failCount = Integer.toString(application.getFailedCount());
+ String failCount = Integer.toString(application.getAppAttemptId()
+ .getAttemptId());
List<String> commandList = new ArrayList<String>();
for (String str : container.getCommandList()) {
// This is out-right wrong. AM FAIL count should be passed via env.
@@ -219,7 +220,7 @@ public class AMLauncher implements Runna
}
ApplicationTokenIdentifier id = new ApplicationTokenIdentifier(
- application.getApplicationID());
+ application.getAppAttemptId().getApplicationId());
Token<ApplicationTokenIdentifier> token =
new Token<ApplicationTokenIdentifier>(id,
this.applicationTokenSecretManager);
@@ -244,7 +245,7 @@ public class AMLauncher implements Runna
asc.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
ApplicationTokenIdentifier identifier = new ApplicationTokenIdentifier(
- this.application.getApplicationID());
+ application.getAppAttemptId().getApplicationId());
SecretKey clientSecretKey =
this.clientToAMSecretManager.getMasterKey(identifier);
String encoded =
@@ -259,20 +260,24 @@ public class AMLauncher implements Runna
public void run() {
switch (eventType) {
case LAUNCH:
- ApplicationEventType targetEventType = ApplicationEventType.LAUNCHED;
try {
- LOG.info("Launching master" + application.getMaster());
+ LOG.info("Launching master " + application.getAppAttemptId());
launch();
+ // Report LAUNCHED only here, after launch() returned normally; launch()
+ // itself must not send it as well or the attempt would receive it twice.
+ handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
+ RMAppAttemptEventType.LAUNCHED));
} catch(Exception ie) {
- LOG.info("Error launching ", ie);
- targetEventType = ApplicationEventType.LAUNCH_FAILED;
+ String message = "Error launching " + application.getAppAttemptId()
+ + ". Got exception: " + StringUtils.stringifyException(ie);
+ LOG.info(message);
+ handler.handle(new RMAppAttemptLaunchFailedEvent(application
+ .getAppAttemptId(), message));
}
- handler.handle(new ApplicationEvent(targetEventType, application
- .getApplicationID()));
break;
case CLEANUP:
try {
- LOG.info("Cleaning master " + application.getMaster());
+ LOG.info("Cleaning master " + application.getAppAttemptId());
cleanup();
} catch(IOException ie) {
LOG.info("Error cleaning master ", ie);
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEvent.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.amlauncher;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+
+/**
+ * Event asking the AM launcher to act on an application attempt; its
+ * {@link AMLauncherEventType} selects the action (for example LAUNCH or
+ * CLEANUP).
+ */
+public class AMLauncherEvent extends AbstractEvent<AMLauncherEventType> {
+
+ private final RMAppAttempt appAttempt;
+
+ public AMLauncherEvent(AMLauncherEventType type, RMAppAttempt appAttempt) {
+ super(type);
+ this.appAttempt = appAttempt;
+ }
+
+ /** @return the application attempt this event refers to. */
+ public RMAppAttempt getAppAttempt() {
+ return this.appAttempt;
+ }
+
+}
Copied: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEventType.java (from r1153017, hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/AMLauncherEventType.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEventType.java?p2=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEventType.java&p1=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/AMLauncherEventType.java&r1=1153017&r2=1153430&rev=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/AMLauncherEventType.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEventType.java Wed Aug 3 11:31:34 2011
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events;
+package org.apache.hadoop.yarn.server.resourcemanager.amlauncher;
public enum AMLauncherEventType {
LAUNCH,
Copied: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java (from r1153017, hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterLauncher.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java?p2=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java&p1=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterLauncher.java&r1=1153017&r2=1153430&rev=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterLauncher.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java Wed Aug 3 11:31:34 2011
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+package org.apache.hadoop.yarn.server.resourcemanager.amlauncher;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -27,13 +27,13 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMLauncherEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.service.AbstractService;
-public class ApplicationMasterLauncher extends AbstractService implements EventHandler<ASMEvent<AMLauncherEventType>> {
+/**
+ * Service that turns {@link AMLauncherEvent}s into {@link AMLauncher} tasks
+ * (built by createRunnableLauncher) and queues them on masterEvents.
+ */
+public class ApplicationMasterLauncher extends AbstractService implements
+ EventHandler<AMLauncherEvent> {
private static final Log LOG = LogFactory.getLog(
ApplicationMasterLauncher.class);
private final ThreadPoolExecutor launcherPool;
@@ -67,13 +67,13 @@ public class ApplicationMasterLauncher e
super.start();
}
- protected Runnable createRunnableLauncher(Application application, AMLauncherEventType event) {
+ /**
+ * Builds the {@link AMLauncher} task that performs {@code event} for the
+ * given attempt. Protected, so subclasses can substitute their own Runnable.
+ */
+ protected Runnable createRunnableLauncher(RMAppAttempt application, AMLauncherEventType event) {
Runnable launcher = new AMLauncher(context, application, event,
applicationTokenSecretManager, clientToAMSecretManager, getConfig());
return launcher;
}
- private void launch(Application application) {
+ /** Queues a LAUNCH task for the attempt on masterEvents. */
+ private void launch(RMAppAttempt application) {
Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.LAUNCH);
masterEvents.add(launcher);
}
@@ -106,15 +106,15 @@ public class ApplicationMasterLauncher e
}
}
- private void cleanup(Application application) {
+ /** Queues a CLEANUP task for the attempt on masterEvents. */
+ private void cleanup(RMAppAttempt application) {
Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.CLEANUP);
masterEvents.add(launcher);
}
+ /**
+ * Dispatches on the event's type using the attempt it carries; a LAUNCH
+ * event queues a launch task via launch().
+ */
@Override
- public synchronized void handle(ASMEvent<AMLauncherEventType> appEvent) {
+ public synchronized void handle(AMLauncherEvent appEvent) {
AMLauncherEventType event = appEvent.getType();
- Application application = appEvent.getApplication();
+ RMAppAttempt application = appEvent.getAppAttempt();
switch (event) {
case LAUNCH:
launch(application);
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,295 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.ams;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * RM endpoint implementing {@link AMRMProtocol}: ApplicationMasters call it to
+ * register, to heartbeat/request containers ({@link #allocate}) and to
+ * unregister.
+ *
+ * <p>For every registered application attempt the service keeps the last
+ * {@link AMResponse} it returned, keyed by {@link ApplicationAttemptId}. That
+ * object serves two purposes:
+ * <ul>
+ * <li>its response id lets {@link #allocate} recognise a repeated heartbeat
+ * (resend the previous answer) or an out-of-sync one (ask the AM to reboot);</li>
+ * <li>it is used as a monitor so only one call per attempt is processed at a
+ * time.</li>
+ * </ul>
+ * Entries are created on {@link ApplicationMasterServiceEventType#REGISTER}
+ * and dropped on {@link ApplicationMasterServiceEventType#UNREGISTER}.
+ */
+@Private
+public class ApplicationMasterService extends AbstractService implements
+    AMRMProtocol, EventHandler<ApplicationMasterServiceEvent> {
+  private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
+  private final AMLivelinessMonitor amLivelinessMonitor;
+  private final YarnScheduler rScheduler;
+  private final ApplicationTokenSecretManager appTokenManager;
+  private InetSocketAddress masterServiceAddress;
+  private Server server;
+  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  /** Last response sent per attempt; each value doubles as that attempt's lock. */
+  private final ConcurrentMap<ApplicationAttemptId, AMResponse> responseMap =
+      new ConcurrentHashMap<ApplicationAttemptId, AMResponse>();
+  /** Shared canned response that tells an AM to reboot. */
+  private final AMResponse reboot = recordFactory.newRecordInstance(AMResponse.class);
+  private final RMContext rmContext;
+
+  public ApplicationMasterService(RMContext rmContext,
+      AMLivelinessMonitor amLivelinessMonitor,
+      ApplicationTokenSecretManager appTokenManager, YarnScheduler scheduler) {
+    super(ApplicationMasterService.class.getName());
+    this.amLivelinessMonitor = amLivelinessMonitor;
+    this.appTokenManager = appTokenManager;
+    this.rScheduler = scheduler;
+    this.reboot.setReboot(true);
+    this.rmContext = rmContext;
+  }
+
+  /** Resolves the RPC bind address from {@link YarnConfiguration#SCHEDULER_ADDRESS}. */
+  @Override
+  public void init(Configuration conf) {
+    String bindAddress =
+        conf.get(YarnConfiguration.SCHEDULER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
+    masterServiceAddress = NetUtils.createSocketAddr(bindAddress);
+    super.init(conf);
+  }
+
+  /**
+   * Starts the AMRMProtocol RPC server. It uses the application-token secret
+   * manager and {@link SchedulerSecurityInfo}.
+   */
+  @Override
+  public void start() {
+    YarnRPC rpc = YarnRPC.create(getConfig());
+    Configuration serverConf = new Configuration(getConfig());
+    serverConf.setClass(
+        CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        SchedulerSecurityInfo.class, SecurityInfo.class);
+    this.server =
+        rpc.getServer(AMRMProtocol.class, this, masterServiceAddress,
+            serverConf, this.appTokenManager,
+            serverConf.getInt(RMConfig.RM_AM_THREADS,
+                RMConfig.DEFAULT_RM_AM_THREADS));
+    this.server.start();
+    super.start();
+  }
+
+  /**
+   * Returns the last response recorded for the attempt.
+   *
+   * @throws YarnRemoteException if the attempt is not (or no longer) tracked
+   *         by this service
+   */
+  private AMResponse lastResponseOrFail(ApplicationAttemptId applicationAttemptId)
+      throws YarnRemoteException {
+    AMResponse lastResponse = responseMap.get(applicationAttemptId);
+    if (lastResponse == null) {
+      String message = "Application doesn't exist in cache "
+          + applicationAttemptId;
+      LOG.error(message);
+      throw RPCUtil.getRemoteException(message);
+    }
+    return lastResponse;
+  }
+
+  /**
+   * Registers an AM. The method:
+   * <ul>
+   * <li>records a liveliness ping;</li>
+   * <li>dispatches an {@link RMAppAttemptRegistrationEvent} with the AM's
+   * host, RPC port and tracking URL;</li>
+   * <li>returns the scheduler's minimum and maximum resource capability.</li>
+   * </ul>
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnRemoteException {
+
+    ApplicationAttemptId applicationAttemptId = request
+        .getApplicationAttemptId();
+    AMResponse lastResponse = lastResponseOrFail(applicationAttemptId);
+
+    // Allow only one thread in AM to do registerApp at a time.
+    synchronized (lastResponse) {
+
+      LOG.info("AM registration " + applicationAttemptId);
+      this.amLivelinessMonitor.receivedPing(applicationAttemptId);
+
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptRegistrationEvent(applicationAttemptId, request
+              .getHost(), request.getRpcPort(), request.getTrackingUrl()));
+
+      // Pick up min/max resource from scheduler...
+      RegisterApplicationMasterResponse response = recordFactory
+          .newRecordInstance(RegisterApplicationMasterResponse.class);
+      response.setMinimumResourceCapability(rScheduler
+          .getMinimumResourceCapability());
+      response.setMaximumResourceCapability(rScheduler
+          .getMaximumResourceCapability());
+      return response;
+    }
+  }
+
+  /**
+   * Unregisters an AM. The method records a liveliness ping and dispatches an
+   * {@link RMAppAttemptUnregistrationEvent} carrying the tracking URL, final
+   * state and diagnostics.
+   */
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnRemoteException {
+
+    ApplicationAttemptId applicationAttemptId = request
+        .getApplicationAttemptId();
+    AMResponse lastResponse = lastResponseOrFail(applicationAttemptId);
+
+    // Allow only one thread in AM to do finishApp at a time.
+    synchronized (lastResponse) {
+
+      this.amLivelinessMonitor.receivedPing(applicationAttemptId);
+
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
+              .getTrackingUrl(), request.getFinalState(), request
+              .getDiagnostics()));
+
+      FinishApplicationMasterResponse response = recordFactory
+          .newRecordInstance(FinishApplicationMasterResponse.class);
+      return response;
+    }
+  }
+
+  /**
+   * AM heartbeat.
+   *
+   * <p>The method:
+   * <ol>
+   * <li>forwards progress to the attempt;</li>
+   * <li>passes new resource asks to the scheduler;</li>
+   * <li>emits RELEASED events for released containers;</li>
+   * <li>returns the attempt's newly finished and newly allocated containers.</li>
+   * </ol>
+   *
+   * <p>If the request's response id shows it repeats the previous heartbeat,
+   * the previous response is resent. The AM is told to reboot in these cases:
+   * <ul>
+   * <li>the attempt is unknown;</li>
+   * <li>the response id is stale;</li>
+   * <li>the attempt is no longer in the RM context;</li>
+   * <li>the attempt was unregistered while this call waited for the lock.</li>
+   * </ul>
+   */
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnRemoteException {
+
+    ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
+    ApplicationId applicationId = appAttemptId.getApplicationId();
+
+    this.amLivelinessMonitor.receivedPing(appAttemptId);
+
+    AllocateResponse allocateResponse = recordFactory
+        .newRecordInstance(AllocateResponse.class);
+
+    // responseMap is keyed by the attempt id; a lookup by ApplicationId
+    // never matches and would reboot every AM on every heartbeat.
+    AMResponse lastResponse = responseMap.get(appAttemptId);
+    if (lastResponse == null) {
+      LOG.error("Application doesnt exist in cache " + appAttemptId);
+      allocateResponse.setAMResponse(reboot);
+      return allocateResponse;
+    }
+    if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
+      /* old heartbeat: resend the previous answer */
+      allocateResponse.setAMResponse(lastResponse);
+      return allocateResponse;
+    } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
+      LOG.error("Invalid responseid from application " + appAttemptId);
+      // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
+      allocateResponse.setAMResponse(reboot);
+      return allocateResponse;
+    }
+
+    // Allow only one thread in AM to do heartbeat at a time.
+    synchronized (lastResponse) {
+
+      // Resolve the attempt before dispatching anything. If the app or
+      // attempt is gone, answer with a reboot instead of an NPE that would
+      // occur only after the events below had already been sent.
+      RMApp app = this.rmContext.getRMApps().get(applicationId);
+      RMAppAttempt appAttempt =
+          (app == null) ? null : app.getRMAppAttempt(appAttemptId);
+      if (appAttempt == null) {
+        LOG.error("Application attempt not found in RM context "
+            + appAttemptId);
+        allocateResponse.setAMResponse(reboot);
+        return allocateResponse;
+      }
+
+      // Send the status update to the appAttempt.
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptStatusupdateEvent(appAttemptId, request
+              .getProgress()));
+
+      List<ResourceRequest> ask = request.getAskList();
+      List<Container> release = request.getReleaseList();
+
+      // Send new requests to appAttempt.
+      if (!ask.isEmpty()) {
+        this.rScheduler.allocate(appAttemptId, ask);
+      }
+
+      // Send events to the containers being released.
+      for (Container releasedContainer : release) {
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMContainerEvent(releasedContainer.getId(),
+                RMContainerEventType.RELEASED));
+      }
+
+      // Get the list of finished containers.
+      List<Container> finishedContainers = appAttempt
+          .pullJustFinishedContainers();
+
+      // Get the list of newly allocated containers.
+      List<Container> newlyAllocatedContainers = appAttempt
+          .pullNewlyAllocatedContainers();
+
+      // TODO: For now all containers are combined
+      List<Container> allContainers = new ArrayList<Container>(
+          finishedContainers);
+      allContainers.addAll(newlyAllocatedContainers);
+
+      AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
+      response.addAllContainers(allContainers);
+      response.setResponseId(lastResponse.getResponseId() + 1);
+      response.setAvailableResources(rScheduler
+          .getResourceLimit(appAttemptId));
+
+      // UNREGISTER removes the entry while holding this same lock. If it ran
+      // while we waited for the lock, a plain put() would resurrect the entry
+      // for a dead attempt. replace() only succeeds if the entry still exists.
+      if (responseMap.replace(appAttemptId, response) == null) {
+        LOG.error("App Attempt removed from the cache during allocate "
+            + appAttemptId);
+        allocateResponse.setAMResponse(reboot);
+        return allocateResponse;
+      }
+      allocateResponse.setAMResponse(response);
+      return allocateResponse;
+    }
+  }
+
+  /** Closes the RPC server, if it was started, then stops the service. */
+  @Override
+  public void stop() {
+    if (this.server != null) {
+      this.server.close();
+    }
+    super.stop();
+  }
+
+  /**
+   * REGISTER starts tracking an attempt with a fresh response whose id is 0.
+   * UNREGISTER stops tracking it. The removal happens while holding that
+   * attempt's lock, so it cannot interleave with an AM call holding the same
+   * lock.
+   */
+  @Override
+  public void handle(ApplicationMasterServiceEvent amsEvent) {
+    ApplicationMasterServiceEventType eventType = amsEvent.getType();
+    ApplicationAttemptId attemptId = amsEvent.getAppAttemptId();
+    switch (eventType) {
+    case REGISTER:
+      AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
+      response.setResponseId(0);
+      responseMap.put(attemptId, response);
+      break;
+    case UNREGISTER:
+      AMResponse lastResponse = responseMap.get(attemptId);
+      if (lastResponse != null) {
+        synchronized (lastResponse) {
+          responseMap.remove(attemptId);
+        }
+      }
+      break;
+    default:
+      LOG.error("Unknown event " + eventType + ". Ignoring..");
+    }
+  }
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEvent.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,21 @@
+package org.apache.hadoop.yarn.server.resourcemanager.ams;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event naming the application attempt to which an
+ * {@link ApplicationMasterServiceEventType} applies.
+ */
+public class ApplicationMasterServiceEvent
+    extends AbstractEvent<ApplicationMasterServiceEventType> {
+
+  /** Attempt this event refers to. */
+  private final ApplicationAttemptId appAttemptId;
+
+  /**
+   * @param attemptId the application attempt the event is about
+   * @param type      the kind of event
+   */
+  public ApplicationMasterServiceEvent(ApplicationAttemptId attemptId,
+      ApplicationMasterServiceEventType type) {
+    super(type);
+    appAttemptId = attemptId;
+  }
+
+  /** @return the application attempt this event refers to */
+  public ApplicationAttemptId getAppAttemptId() {
+    return appAttemptId;
+  }
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEventType.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEventType.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterServiceEventType.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,6 @@
+package org.apache.hadoop.yarn.server.resourcemanager.ams;
+
+/** Kinds of {@link ApplicationMasterServiceEvent}. */
+public enum ApplicationMasterServiceEventType {
+  /** An application attempt should start being tracked. */
+  REGISTER,
+  /** An application attempt should stop being tracked. */
+  UNREGISTER,
+}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java Wed Aug 3 11:31:34 2011
@@ -7,14 +7,12 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
-import org.apache.hadoop.yarn.api.records.ApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
/**
* Licensed to the Apache Software Foundation (ASF) under one
@@ -45,10 +43,10 @@ public class MemStore implements Store {
}
+ /** Does nothing: this in-memory store does not record nodes. */
@Override
- public void storeNode(NodeManager node) throws IOException {}
+ public void storeNode(RMNode node) throws IOException {}
+ /** Does nothing: this in-memory store does not record nodes. */
@Override
- public void removeNode(NodeManager node) throws IOException {}
+ public void removeNode(RMNode node) throws IOException {}
private class ApplicationStoreImpl implements ApplicationStore {
@Override
@@ -102,8 +100,8 @@ public class MemStore implements Store {
}
+ /** Returns a new, empty list on every call; no nodes are kept in memory. */
@Override
- public List<NodeManager> getStoredNodeManagers() {
- return new ArrayList<NodeManager>();
+ public List<RMNode> getStoredNodeManagers() {
+ return new ArrayList<RMNode>();
}
@Override
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NodeStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NodeStore.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NodeStore.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NodeStore.java Wed Aug 3 11:31:34 2011
@@ -21,12 +21,12 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+/**
+ * Storage operations for ResourceManager node records.
+ * NOTE(review): presumably used to persist nodes for RM recovery — confirm
+ * against the Store implementations.
+ */
public interface NodeStore {
- public void storeNode(NodeManager node) throws IOException;
- public void removeNode(NodeManager node) throws IOException;
+ public void storeNode(RMNode node) throws IOException;
+ public void removeNode(RMNode node) throws IOException;
+ /** NOTE(review): presumably hands out the id for the next node — confirm. */
public NodeId getNextNodeId() throws IOException;
+ /** NOTE(review): semantics of "loggable" not visible here — confirm. */
public boolean isLoggable();
}
\ No newline at end of file
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java Wed Aug 3 11:31:34 2011
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public interface Store extends NodeStore, ApplicationsStore {
@@ -37,7 +37,7 @@ public interface Store extends NodeStore
public List<Container> getContainers();
}
+ /**
+  * State read back from the store: stored nodes, stored applications and the
+  * last logged node id. NOTE(review): presumably consumed during RM recovery —
+  * confirm.
+  */
public interface RMState {
- public List<NodeManager> getStoredNodeManagers() ;
+ public List<RMNode> getStoredNodeManagers() ;
public Map<ApplicationId, ApplicationInfo> getStoredApplications();
public NodeId getLastLoggedNodeId();
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java Wed Aug 3 11:31:34 2011
@@ -35,23 +35,23 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeManagerInfo;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationMasterPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.NodeManagerInfoPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationMasterProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.NodeManagerInfoProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -105,22 +105,23 @@ public class ZKStore implements Store {
return new ZKWatcher();
}
+ /**
+  * Converts an RMNode into a protobuf-backed NodeReport whose bytes storeNode
+  * writes to ZooKeeper. It copies the node address, rack, total capability and
+  * container count. The method name predates the NodeManagerInfo ->
+  * NodeReport rename.
+  */
- private NodeManagerInfoPBImpl createNodeManagerInfo(NodeManager nodeInfo) {
- NodeManagerInfo node =
- recordFactory.newRecordInstance(NodeManagerInfo.class);
+ private NodeReportPBImpl createNodeManagerInfo(RMNode nodeInfo) {
+ NodeReport node =
+ recordFactory.newRecordInstance(NodeReport.class);
node.setNodeAddress(nodeInfo.getNodeAddress());
node.setRackName(nodeInfo.getRackName());
node.setCapability(nodeInfo.getTotalCapability());
- node.setUsed(nodeInfo.getUsedResource());
+ // TODO: FIXME
+// node.setUsed(nodeInfo.getUsedResource());
+ // NOTE(review): with setUsed disabled, stored reports no longer carry the
+ // node's used resources — confirm this is acceptable until RMNode exposes it.
node.setNumContainers(nodeInfo.getNumContainers());
- return (NodeManagerInfoPBImpl)node;
+ return (NodeReportPBImpl)node;
}
@Override
- public synchronized void storeNode(NodeManager node) throws IOException {
+ public synchronized void storeNode(RMNode node) throws IOException {
/** create a storage node and store it in zk **/
if (!doneWithRecovery) return;
- NodeManagerInfoPBImpl nodeManagerInfo = createNodeManagerInfo(node);
+ NodeReportPBImpl nodeManagerInfo = createNodeManagerInfo(node);
byte[] bytes = nodeManagerInfo.getProto().toByteArray();
try {
zkClient.create(NODES + Integer.toString(node.getNodeID().getId()), bytes, null,
@@ -135,7 +136,7 @@ public class ZKStore implements Store {
}
@Override
- public synchronized void removeNode(NodeManager node) throws IOException {
+ public synchronized void removeNode(RMNode node) throws IOException {
if (!doneWithRecovery) return;
/** remove a storage node **/
@@ -364,7 +365,7 @@ public class ZKStore implements Store {
}
private class ZKRMState implements RMState {
- private List<NodeManager> nodeManagers = new ArrayList<NodeManager>();
+ private List<RMNode> nodeManagers = new ArrayList<RMNode>();
private Map<ApplicationId, ApplicationInfo> applications = new
HashMap<ApplicationId, ApplicationInfo>();
@@ -372,17 +373,17 @@ public class ZKStore implements Store {
LOG.info("Restoring RM state from ZK");
}
- private synchronized List<NodeManagerInfo> listStoredNodes() throws IOException {
+ private synchronized List<NodeReport> listStoredNodes() throws IOException {
/** get the list of nodes stored in zk **/
//TODO PB
- List<NodeManagerInfo> nodes = new ArrayList<NodeManagerInfo>();
+ List<NodeReport> nodes = new ArrayList<NodeReport>();
Stat stat = new Stat();
try {
List<String> children = zkClient.getChildren(NODES, false);
for (String child: children) {
byte[] data = zkClient.getData(NODES + child, false, stat);
- NodeManagerInfoPBImpl nmImpl = new NodeManagerInfoPBImpl(
- NodeManagerInfoProto.parseFrom(data));
+ NodeReportPBImpl nmImpl = new NodeReportPBImpl(
+ NodeReportProto.parseFrom(data));
nodes.add(nmImpl);
}
} catch (InterruptedException ie) {
@@ -396,7 +397,7 @@ public class ZKStore implements Store {
}
+ /** Returns the internal list of nodes rebuilt by load() (not a copy). */
@Override
- public List<NodeManager> getStoredNodeManagers() {
+ public List<RMNode> getStoredNodeManagers() {
return nodeManagers;
}
@@ -453,10 +454,10 @@ public class ZKStore implements Store {
}
private void load() throws IOException {
- List<NodeManagerInfo> nodeInfos = listStoredNodes();
+ List<NodeReport> nodeInfos = listStoredNodes();
final Pattern trackerPattern = Pattern.compile(".*:.*");
final Matcher m = trackerPattern.matcher("");
- for (NodeManagerInfo node: nodeInfos) {
+ for (NodeReport node: nodeInfos) {
m.reset(node.getNodeAddress());
if (!m.find()) {
LOG.info("Skipping node, bad node-address " + node.getNodeAddress());
@@ -470,10 +471,10 @@ public class ZKStore implements Store {
continue;
}
int httpPort = Integer.valueOf(m.group(1));
- NodeManager nm = new NodeManagerImpl(node.getNodeId(),
+ RMNode nm = new RMNodeImpl(node.getNodeId(), null,
hostName, cmPort, httpPort,
- RMResourceTrackerImpl.resolve(node.getNodeAddress()),
- node.getCapability());
+ ResourceTrackerService.resolve(node.getNodeAddress()),
+ node.getCapability(), null);
nodeManagers.add(nm);
}
readLastNodeId();
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,40 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+
+/**
+ * ResourceManager-side view of a submitted application. Implementations also
+ * handle {@link RMAppEvent}s.
+ */
+public interface RMApp extends EventHandler<RMAppEvent>{
+
+ /** @return the id of this application. */
+ ApplicationId getApplicationId();
+
+ /** @return the current state of this application. */
+ RMAppState getState();
+
+ /** @return the submitting user. NOTE(review): presumed meaning — confirm. */
+ String getUser();
+
+ /** NOTE(review): range/scale of progress not visible here — confirm (0..1?). */
+ float getProgress();
+
+ /**
+  * Looks up one of this application's attempts by id.
+  * NOTE(review): behaviour for an unknown id (null?) is not visible here.
+  */
+ RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId);
+
+ String getQueue();
+
+ String getName();
+
+ /** @return the attempt currently considered active for this application. */
+ RMAppAttempt getCurrentAppAttempt();
+
+ /** Builds an {@link ApplicationReport} snapshot of this application. */
+ ApplicationReport createAndGetApplicationReport();
+
+ ApplicationStore getApplicationStore();
+
+ long getFinishTime();
+
+ long getStartTime();
+
+ String getTrackingUrl();
+
+ /**
+  * NOTE(review): returns a mutable StringBuilder; if it is the internal
+  * buffer, callers can modify it — confirm this is intended.
+  */
+ StringBuilder getDiagnostics();
+
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,18 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event of an {@link RMAppEventType} addressed to the application identified
+ * by {@link #getApplicationId()}.
+ */
+public class RMAppEvent extends AbstractEvent<RMAppEventType> {
+
+  /** Application this event targets. */
+  private final ApplicationId applicationId;
+
+  /**
+   * @param appId the application the event targets
+   * @param type  the kind of event
+   */
+  public RMAppEvent(ApplicationId appId, RMAppEventType type) {
+    super(type);
+    applicationId = appId;
+  }
+
+  /** @return the id of the application this event targets */
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,15 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+/** Kinds of {@link RMAppEvent}, grouped by the component that sends them. */
+public enum RMAppEventType {
+  /* ---- sent by ClientRMService ---- */
+  START,
+  KILL,
+
+  /* ---- sent by RMAppAttempt ---- */
+  APP_REJECTED,
+  APP_ACCEPTED,
+  ATTEMPT_REGISTERED,
+  /** Sent together with the attempt's final state. */
+  ATTEMPT_FINISHED,
+  ATTEMPT_FAILED,
+  ATTEMPT_KILLED,
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,420 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * ResourceManager-side representation of a submitted application.
+ *
+ * <p>State changes are driven by {@link RMAppEvent}s through a
+ * {@link StateMachine}. {@link #handle(RMAppEvent)} applies each event while
+ * holding the write lock, and the getters read under the read lock. A START
+ * event, and every ATTEMPT_FAILED that is still within the retry budget,
+ * creates a new {@link RMAppAttempt} and dispatches a START event for it.
+ */
+public class RMAppImpl implements RMApp {
+
+  private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
+
+  /**
+   * Placeholder reported for the AM host, client token and tracking URL while
+   * no attempt has been created yet (for example, while the app is NEW).
+   */
+  private static final String NOT_AVAILABLE = "N/A";
+
+  // Immutable fields
+  private final ApplicationId applicationId;
+  private final RMContext rmContext;
+  private final Configuration conf;
+  private final String user;
+  private final String queue;
+  private final String name;
+  private final ApplicationSubmissionContext submissionContext;
+  private final String clientTokenStr;
+  private final ApplicationStore appStore;
+  private final Dispatcher dispatcher;
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+  private final Map<ApplicationAttemptId, RMAppAttempt> attempts
+      = new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
+  // Accumulated diagnostic messages. This is initialized eagerly because the
+  // transitions append to it and createAndGetApplicationReport() reads it
+  // unconditionally.
+  private final StringBuilder diagnostics = new StringBuilder();
+
+  // Mutable fields
+  private long startTime;
+  private long finishTime;
+  private AMLivelinessMonitor amLivelinessMonitor;
+  private YarnScheduler scheduler;
+  private int maxRetries;
+  private RMAppAttempt currentAttempt;
+
+  private static final StateMachineFactory<RMAppImpl,
+                                           RMAppState,
+                                           RMAppEventType,
+                                           RMAppEvent> stateMachineFactory
+      = new StateMachineFactory<RMAppImpl,
+                                RMAppState,
+                                RMAppEventType,
+                                RMAppEvent>(RMAppState.NEW)
+
+      // Transitions from NEW state
+      .addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
+          RMAppEventType.START, new StartAppAttemptTransition())
+      .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
+          new NewAppKilledTransition())
+
+      // Transitions from SUBMITTED state
+      .addTransition(RMAppState.SUBMITTED, RMAppState.FAILED,
+          RMAppEventType.APP_REJECTED, new AppRejectedTransition())
+      .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
+          RMAppEventType.APP_ACCEPTED)
+
+      // Transitions from ACCEPTED state
+      // NOTE(review): the KILL arcs below (ACCEPTED/RUNNING/RESTARTING) do not
+      // notify the current attempt; confirm whether it should also be killed.
+      .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
+          RMAppEventType.ATTEMPT_REGISTERED)
+      .addTransition(RMAppState.ACCEPTED,
+          EnumSet.of(RMAppState.ACCEPTED, RMAppState.FAILED),
+          RMAppEventType.ATTEMPT_FAILED,
+          new AttemptFailedTransition(RMAppState.ACCEPTED))
+      .addTransition(RMAppState.ACCEPTED, RMAppState.KILLED,
+          RMAppEventType.KILL)
+
+      // Transitions from RUNNING state
+      .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
+          RMAppEventType.ATTEMPT_FINISHED)
+      .addTransition(RMAppState.RUNNING,
+          EnumSet.of(RMAppState.RESTARTING, RMAppState.FAILED),
+          RMAppEventType.ATTEMPT_FAILED,
+          new AttemptFailedTransition(RMAppState.RUNNING))
+      .addTransition(RMAppState.RUNNING, RMAppState.KILLED,
+          RMAppEventType.KILL)
+
+      // Transitions from RESTARTING state
+      .addTransition(RMAppState.RESTARTING, RMAppState.RUNNING,
+          RMAppEventType.ATTEMPT_REGISTERED)
+      .addTransition(RMAppState.RESTARTING,
+          EnumSet.of(RMAppState.RESTARTING, RMAppState.FAILED),
+          RMAppEventType.ATTEMPT_FAILED,
+          new AttemptFailedTransition(RMAppState.RESTARTING))
+      .addTransition(RMAppState.RESTARTING, RMAppState.KILLED,
+          RMAppEventType.KILL)
+
+      // Transitions from FINISHED state
+      // A late KILL is ignored; a finished application must never go back to
+      // RUNNING.
+      .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
+          RMAppEventType.KILL)
+
+      // Transitions from FAILED state
+      .addTransition(RMAppState.FAILED, RMAppState.FAILED,
+          RMAppEventType.KILL)
+
+      // Transitions from KILLED state
+      .addTransition(
+          RMAppState.KILLED,
+          RMAppState.KILLED,
+          EnumSet.of(RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED,
+              RMAppEventType.ATTEMPT_FAILED, RMAppEventType.ATTEMPT_KILLED))
+
+      .installTopology();
+
+  private final StateMachine<RMAppState, RMAppEventType, RMAppEvent>
+      stateMachine;
+
+  /**
+   * Creates the application in state NEW and records its start time.
+   *
+   * @param applicationId id of this application
+   * @param rmContext RM context; its dispatcher is used to send attempt events
+   * @param config configuration from which the maximum number of AM attempts
+   *          ({@code RMConfig.AM_MAX_RETRIES}) is read
+   * @param name application name
+   * @param user submitting user
+   * @param queue target queue
+   * @param submissionContext submission context handed to every attempt
+   * @param clientTokenStr client token string handed to every attempt
+   * @param appStore store returned by {@link #getApplicationStore()}
+   * @param amLivelinessMonitor AM liveliness monitor (currently only stored)
+   * @param scheduler scheduler handed to every attempt
+   */
+  public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
+      Configuration config, String name, String user, String queue,
+      ApplicationSubmissionContext submissionContext, String clientTokenStr,
+      ApplicationStore appStore, AMLivelinessMonitor amLivelinessMonitor,
+      YarnScheduler scheduler) {
+
+    this.applicationId = applicationId;
+    this.name = name;
+    this.rmContext = rmContext;
+    this.dispatcher = rmContext.getDispatcher();
+    this.conf = config;
+    this.user = user;
+    this.queue = queue;
+    this.submissionContext = submissionContext;
+    this.clientTokenStr = clientTokenStr;
+    this.appStore = appStore;
+    this.amLivelinessMonitor = amLivelinessMonitor;
+    this.scheduler = scheduler;
+    this.startTime = System.currentTimeMillis();
+
+    this.maxRetries = conf.getInt(RMConfig.AM_MAX_RETRIES,
+        RMConfig.DEFAULT_AM_MAX_RETRIES);
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.readLock = lock.readLock();
+    this.writeLock = lock.writeLock();
+
+    this.stateMachine = stateMachineFactory.make(this);
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    // Return the field. Calling getApplicationId() here would recurse forever.
+    return this.applicationId;
+  }
+
+  @Override
+  public RMAppState getState() {
+    this.readLock.lock();
+
+    try {
+      return this.stateMachine.getCurrentState();
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  public String getUser() {
+    return this.user;
+  }
+
+  /** Returns the current attempt's progress, or 0 if no attempt exists yet. */
+  @Override
+  public float getProgress() {
+    this.readLock.lock();
+
+    try {
+      if (this.currentAttempt != null) {
+        return this.currentAttempt.getProgress();
+      }
+      return 0;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  public RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId) {
+    this.readLock.lock();
+
+    try {
+      return this.attempts.get(appAttemptId);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  public String getQueue() {
+    return this.queue;
+  }
+
+  @Override
+  public String getName() {
+    return this.name;
+  }
+
+  /** Returns the most recently created attempt, or null before the first. */
+  @Override
+  public RMAppAttempt getCurrentAppAttempt() {
+    this.readLock.lock();
+
+    try {
+      return this.currentAttempt;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  public ApplicationStore getApplicationStore() {
+    return this.appStore;
+  }
+
+  /**
+   * Maps the internal state to the client-visible {@link ApplicationState}.
+   * SUBMITTED and ACCEPTED are both reported as SUBMITTED.
+   *
+   * @throws YarnException if the state has no mapping
+   */
+  private ApplicationState createApplicationState(RMAppState rmAppState) {
+    switch(rmAppState) {
+    case NEW:
+      return ApplicationState.NEW;
+    case SUBMITTED:
+    case ACCEPTED:
+      return ApplicationState.SUBMITTED;
+    case RESTARTING:
+      return ApplicationState.RESTARTING;
+    case RUNNING:
+      return ApplicationState.RUNNING;
+    case FINISHED:
+      return ApplicationState.SUCCEEDED;
+    case KILLED:
+      return ApplicationState.KILLED;
+    case FAILED:
+      return ApplicationState.FAILED;
+    }
+    throw new YarnException("Unknown state passed!");
+  }
+
+  /**
+   * Builds a report from the current state. Before any attempt exists, the
+   * AM host, client token and tracking URL are reported as "N/A" and the RPC
+   * port as -1.
+   */
+  @Override
+  public ApplicationReport createAndGetApplicationReport() {
+    this.readLock.lock();
+
+    try {
+      String host = NOT_AVAILABLE;
+      int rpcPort = -1;
+      String clientToken = NOT_AVAILABLE;
+      String trackingUrl = NOT_AVAILABLE;
+      if (this.currentAttempt != null) {
+        host = this.currentAttempt.getHost();
+        rpcPort = this.currentAttempt.getRpcPort();
+        clientToken = this.currentAttempt.getClientToken();
+        trackingUrl = this.currentAttempt.getTrackingUrl();
+      }
+      return BuilderUtils.newApplicationReport(this.applicationId, this.user,
+          this.queue, this.name, host, rpcPort, clientToken,
+          createApplicationState(this.stateMachine.getCurrentState()),
+          this.diagnostics.toString(), trackingUrl);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the time at which the application entered FINISHED, FAILED or
+   * KILLED, or 0 while it is still in progress.
+   */
+  @Override
+  public long getFinishTime() {
+    this.readLock.lock();
+
+    try {
+      return this.finishTime;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /** Returns the time at which this object was constructed. */
+  @Override
+  public long getStartTime() {
+    this.readLock.lock();
+
+    try {
+      return this.startTime;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /** Returns the current attempt's tracking URL, or null if there is none. */
+  @Override
+  public String getTrackingUrl() {
+    this.readLock.lock();
+
+    try {
+      if (this.currentAttempt == null) {
+        return null;
+      }
+      return this.currentAttempt.getTrackingUrl();
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /** Returns the live (mutable) diagnostics buffer. */
+  @Override
+  public StringBuilder getDiagnostics() {
+    this.readLock.lock();
+
+    try {
+      return this.diagnostics;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * Feeds {@code event} into the state machine while holding the write lock.
+   * Invalid transitions are logged and otherwise ignored. When the
+   * application first enters a final state, its finish time is recorded.
+   */
+  @Override
+  public void handle(RMAppEvent event) {
+
+    this.writeLock.lock();
+
+    try {
+      ApplicationId appID = event.getApplicationId();
+      LOG.info("Processing event for " + appID + " of type "
+          + event.getType());
+      final RMAppState oldState = this.stateMachine.getCurrentState();
+      try {
+        /* keep the master in sync with the state machine */
+        this.stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        /* TODO fail the application on the failed transition */
+      }
+
+      final RMAppState newState = this.stateMachine.getCurrentState();
+      if (oldState != newState) {
+        LOG.info(appID + " State change from " + oldState + " to "
+            + newState);
+        if (isFinalState(newState)) {
+          this.finishTime = System.currentTimeMillis();
+        }
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  /** Whether {@code state} has no outgoing transition to a different state. */
+  private static boolean isFinalState(RMAppState state) {
+    return state == RMAppState.FINISHED || state == RMAppState.FAILED
+        || state == RMAppState.KILLED;
+  }
+
+  /**
+   * Creates the next attempt (attempt ids are 1-based and sequential), makes
+   * it the current attempt and dispatches a START event for it. This is only
+   * called from transitions, i.e. while {@link #handle} holds the write lock.
+   */
+  private void createNewAttempt() {
+    ApplicationAttemptId appAttemptId = Records
+        .newRecord(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(this.applicationId);
+    appAttemptId.setAttemptId(this.attempts.size() + 1);
+
+    RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId,
+        this.clientTokenStr, this.rmContext, this.scheduler,
+        this.submissionContext);
+    this.attempts.put(appAttemptId, attempt);
+    this.currentAttempt = attempt;
+    this.dispatcher.getEventHandler().handle(
+        new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+  }
+
+  /** No-op base class for single-arc transitions. */
+  private static class RMAppTransition implements
+      SingleArcTransition<RMAppImpl, RMAppEvent> {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+    }
+  }
+
+  /** NEW -> SUBMITTED: starts the first attempt. */
+  private static final class StartAppAttemptTransition extends RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      app.createNewAttempt();
+    }
+  }
+
+  /** NEW -> KILLED: records the reason in the diagnostics. */
+  private static final class NewAppKilledTransition extends RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      app.diagnostics.append("Application killed by user.");
+    }
+  }
+
+  /** SUBMITTED -> FAILED: records the rejection message in the diagnostics. */
+  private static final class AppRejectedTransition extends
+      RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent) event;
+      app.diagnostics.append(rejectedEvent.getMessage());
+    }
+  }
+
+  /**
+   * Handles a failed attempt. The application fails once the attempt budget
+   * ({@code maxRetries}) is exhausted. Otherwise a new attempt is started and
+   * the application moves to the state supplied at construction.
+   */
+  private static final class AttemptFailedTransition implements
+      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+
+    // State to move to when another attempt is started.
+    private final RMAppState initialState;
+
+    public AttemptFailedTransition(RMAppState initialState) {
+      this.initialState = initialState;
+    }
+
+    @Override
+    public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+
+      // Use >= rather than == so that a non-positive maxRetries still
+      // terminates instead of retrying forever.
+      if (app.attempts.size() >= app.maxRetries) {
+        app.diagnostics.append("Application " + app.applicationId
+            + " failed " + app.maxRetries
+            + " times. Failing the application.");
+        return RMAppState.FAILED;
+      }
+
+      app.createNewAttempt();
+      return initialState;
+    }
+  }
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRejectedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRejectedEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRejectedEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRejectedEvent.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,17 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * {@link RMAppEvent} of type {@link RMAppEventType#APP_REJECTED} that carries
+ * a human-readable reason for the rejection.
+ */
+public class RMAppRejectedEvent extends RMAppEvent {
+
+  /** Reason supplied by the component that rejected the application. */
+  private final String rejectionMessage;
+
+  /**
+   * @param appId id of the rejected application
+   * @param message reason for the rejection
+   */
+  public RMAppRejectedEvent(ApplicationId appId, String message) {
+    super(appId, RMAppEventType.APP_REJECTED);
+    rejectionMessage = message;
+  }
+
+  /** @return the rejection reason exactly as passed to the constructor */
+  public String getMessage() {
+    return rejectionMessage;
+  }
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,5 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+/**
+ * States of an application as tracked by the ResourceManager. The
+ * declaration order fixes each constant's ordinal, so new constants should be
+ * appended rather than inserted.
+ */
+public enum RMAppState {
+  NEW,
+  SUBMITTED,
+  ACCEPTED,
+  RUNNING,
+  RESTARTING,
+  FINISHED,
+  FAILED,
+  KILLED
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+
+/**
+ * Liveliness monitor keyed by {@link ApplicationAttemptId}. When the base
+ * monitor expires an attempt, an {@link RMAppAttemptEventType#EXPIRE} event
+ * for that attempt is sent to the dispatcher's event handler.
+ */
+public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {
+
+  // Receives the EXPIRE events. Kept as a raw type because it is obtained
+  // from Dispatcher.getEventHandler().
+  private final EventHandler eventHandler;
+
+  /**
+   * @param d dispatcher whose event handler will receive expiry events
+   */
+  public AMLivelinessMonitor(Dispatcher d) {
+    super("AMLivelinessMonitor", new SystemClock());
+    this.eventHandler = d.getEventHandler();
+  }
+
+  /**
+   * Initializes the base monitor, then sets the expiry interval
+   * ({@code YarnConfiguration.AM_EXPIRY_INTERVAL}) and the monitoring interval
+   * ({@code RMConfig.AMLIVELINESS_MONITORING_INTERVAL}) from {@code conf},
+   * falling back to the RMConfig defaults.
+   */
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    setExpireInterval(conf.getInt(YarnConfiguration.AM_EXPIRY_INTERVAL,
+        RMConfig.DEFAULT_AM_EXPIRY_INTERVAL));
+    setMonitorInterval(conf.getInt(RMConfig.AMLIVELINESS_MONITORING_INTERVAL,
+        RMConfig.DEFAULT_AMLIVELINESS_MONITORING_INTERVAL));
+  }
+
+  /** Reports the expired attempt by sending it an EXPIRE event. */
+  @Override
+  protected void expire(ApplicationAttemptId id) {
+    eventHandler.handle(
+        new RMAppAttemptEvent(id, RMAppAttemptEventType.EXPIRE));
+  }
+}
\ No newline at end of file
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,35 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * A single attempt to run an application. It exposes read accessors and
+ * consumes {@link RMAppAttemptEvent}s as an {@link EventHandler}.
+ */
+public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
+
+  /** @return the id of this attempt */
+  ApplicationAttemptId getAppAttemptId();
+
+  /** @return the attempt's current state */
+  RMAppAttemptState getAppAttemptState();
+
+  // ---- ApplicationMaster details (host, port, tracking URL, token) ----
+
+  String getHost();
+
+  int getRpcPort();
+
+  String getTrackingUrl();
+
+  String getClientToken();
+
+  /** @return diagnostics buffer for this attempt */
+  StringBuilder getDiagnostics();
+
+  /** @return attempt progress (range presumably [0, 1] -- TODO confirm) */
+  float getProgress();
+
+  // ---- Container bookkeeping ----
+
+  /**
+   * NOTE(review): the "pull" prefix suggests the returned containers are
+   * drained from the attempt. Confirm against the implementation.
+   */
+  List<Container> pullJustFinishedContainers();
+
+  /** NOTE(review): presumably drains newly allocated containers -- confirm. */
+  List<Container> pullNewlyAllocatedContainers();
+
+  Container getMasterContainer();
+
+  ApplicationSubmissionContext getSubmissionContext();
+}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEvent.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEvent.java Wed Aug 3 11:31:34 2011
@@ -0,0 +1,19 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event addressed to one specific application attempt.
+ */
+public class RMAppAttemptEvent extends AbstractEvent<RMAppAttemptEventType> {
+
+  /** The attempt this event targets. */
+  private final ApplicationAttemptId targetAttemptId;
+
+  /**
+   * @param appAttemptId the attempt the event is addressed to
+   * @param type the event type
+   */
+  public RMAppAttemptEvent(ApplicationAttemptId appAttemptId,
+      RMAppAttemptEventType type) {
+    super(type);
+    targetAttemptId = appAttemptId;
+  }
+
+  /** @return the attempt id supplied at construction */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return targetAttemptId;
+  }
+}
|