Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java?rev=1097605&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java Thu Apr 28 20:51:21 2011
@@ -0,0 +1,326 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeManagerInfo;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationMasterPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeManagerInfoPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeManagerInfoProto;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+public class ZKStore implements Store {
+ private final Configuration conf;
+ private final ZooKeeper zkClient;
+ private static final Log LOG = LogFactory.getLog(ZKStore.class);
+ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ private static final String NODES = "nodes/";
+ private static final String APPS = "apps/";
+ private static final String ZK_PATH_SEPARATOR = "/";
+ private static final String NODE_ID = "nodeid";
+ private static final String APP_MASTER = "master";
+ private static final String LAST_CONTAINER_ID = "last_containerid";
+ private final String ZK_ADDRESS;
+ private final int ZK_TIMEOUT;
+
+ /** TODO make this generic **/
+ private NodeIdPBImpl nodeId = new NodeIdPBImpl();
+
+ /**
+ * TODO fix this for later to handle all kinds of events
+ * of connection and session events.
+ *
+ */
+ private class ZKWatcher implements Watcher {
+ @Override
+ public void process(WatchedEvent arg0) {
+ }
+ }
+
+ public ZKStore(Configuration conf) throws IOException {
+ this.conf = conf;
+ this.ZK_ADDRESS = conf.get(YarnConfiguration.ZK_ADDRESS);
+ this.ZK_TIMEOUT = conf.getInt(YarnConfiguration.ZK_SESSION_TIMEOUT,
+ YarnConfiguration.DEFAULT_ZK_TIMEOUT);
+ zkClient = new ZooKeeper(this.ZK_ADDRESS,
+ this.ZK_TIMEOUT,
+ createZKWatcher()
+ );
+ this.nodeId.setId(0);
+ }
+
+ protected Watcher createZKWatcher() {
+ return new ZKWatcher();
+ }
+
+ private NodeManagerInfoPBImpl createNodeManagerInfo(NodeManager nodeInfo) {
+ NodeManagerInfo node =
+ recordFactory.newRecordInstance(NodeManagerInfo.class);
+ node.setNodeAddress(nodeInfo.getNodeAddress());
+ node.setRackName(nodeInfo.getRackName());
+ node.setCapability(nodeInfo.getTotalCapability());
+ node.setUsed(nodeInfo.getUsedResource());
+ node.setNumContainers(nodeInfo.getNumContainers());
+ return (NodeManagerInfoPBImpl)node;
+ }
+
+ @Override
+ public synchronized void storeNode(NodeManager node) throws IOException {
+ /** create a storage node and store it in zk **/
+ NodeManagerInfoPBImpl nodeManagerInfo = createNodeManagerInfo(node);
+ byte[] bytes = nodeManagerInfo.getProto().toByteArray();
+ try {
+ zkClient.create(NODES + Integer.toString(node.getNodeID().getId()), bytes, null,
+ CreateMode.PERSISTENT);
+ } catch(InterruptedException ie) {
+ LOG.info("Interrupted", ie);
+ throw new InterruptedIOException("Interrupted");
+ } catch(KeeperException ke) {
+ LOG.info("Keeper exception", ke);
+ throw convertToIOException(ke);
+ }
+ }
+
+ @Override
+ public synchronized void removeNode(NodeManager node) throws IOException {
+ /** remove a storage node **/
+ try {
+ zkClient.delete(NODES + Integer.toString(node.getNodeID().getId()), -1);
+ } catch(InterruptedException ie) {
+ LOG.info("Interrupted", ie);
+ throw new InterruptedIOException("Interrupted");
+ } catch(KeeperException ke) {
+ LOG.info("Keeper exception", ke);
+ throw convertToIOException(ke);
+ }
+
+ }
+
+ private static IOException convertToIOException(KeeperException ke) {
+ IOException io = new IOException();
+ io.setStackTrace(ke.getStackTrace());
+ return io;
+ }
+
+ @Override
+ public synchronized NodeId getNextNodeId() throws IOException {
+ int num = nodeId.getId();
+ num++;
+ nodeId.setId(num);
+ try {
+ zkClient.setData(NODES + NODE_ID, nodeId.getProto().toByteArray() , -1);
+ } catch(InterruptedException ie) {
+ LOG.info("Interrupted", ie);
+ throw new InterruptedIOException(ie.getMessage());
+ } catch(KeeperException ke) {
+ throw convertToIOException(ke);
+ }
+ return nodeId;
+ }
+
+ private String containerPathFromContainerId(ContainerId containerId) {
+ String appString = ConverterUtils.toString(containerId.getAppId());
+ return appString + "/" + containerId.getId();
+ }
+
+ @Override
+ public synchronized void storeContainer(Container container) throws IOException {
+ ContainerPBImpl containerPBImpl = (ContainerPBImpl) container;
+ try {
+ zkClient.create(APPS + containerPathFromContainerId(container.getId())
+ , containerPBImpl.getProto().toByteArray(), null, CreateMode.PERSISTENT);
+ } catch(InterruptedException ie) {
+ LOG.info("Interrupted", ie);
+ throw new InterruptedIOException(ie.getMessage());
+ } catch(KeeperException ke) {
+ LOG.info("Keeper exception", ke);
+ throw convertToIOException(ke);
+ }
+ }
+
+ @Override
+ public synchronized void removeContainer(Container container) throws IOException {
+ ContainerPBImpl containerPBImpl = (ContainerPBImpl) container;
+ try {
+ zkClient.delete(APPS + containerPathFromContainerId(container.getId()),
+ -1);
+ } catch(InterruptedException ie) {
+ throw new InterruptedIOException(ie.getMessage());
+ } catch(KeeperException ke) {
+ LOG.info("Keeper exception", ke);
+ throw convertToIOException(ke);
+ }
+ }
+
+ @Override
+ public synchronized void storeApplication(ApplicationId application, ApplicationSubmissionContext
+ context, ApplicationMaster master) throws IOException {
+ ApplicationSubmissionContextPBImpl contextPBImpl = (ApplicationSubmissionContextPBImpl) context;
+ String appString = APPS + ConverterUtils.toString(application);
+ ApplicationMasterPBImpl masterPBImpl = (ApplicationMasterPBImpl) master;
+
+ try {
+ zkClient.create(appString, contextPBImpl.getProto()
+ .toByteArray(), null, CreateMode.PERSISTENT);
+ zkClient.create(appString + ZK_PATH_SEPARATOR + APP_MASTER,
+ masterPBImpl.getProto().toByteArray(), null, CreateMode.PERSISTENT);
+ } catch(InterruptedException ie) {
+ LOG.info("Interrupted", ie);
+ throw new InterruptedIOException(ie.getMessage());
+ } catch(KeeperException ke) {
+ LOG.info("Keeper exception", ke);
+ throw convertToIOException(ke);
+ }
+ }
+
+ @Override
+ public synchronized void removeApplication(ApplicationId application) throws IOException {
+ try {
+ zkClient.delete(APPS + ConverterUtils.toString(application), -1);
+ } catch(InterruptedException ie) {
+ LOG.info("Interrupted", ie);
+ throw new InterruptedIOException(ie.getMessage());
+ } catch(KeeperException ke) {
+ LOG.info("Keeper Exception", ke);
+ throw convertToIOException(ke);
+ }
+ }
+
+ @Override
+ public synchronized RMState restore() throws IOException {
+ ZKRMState rmState = new ZKRMState();
+ rmState.load();
+ return rmState;
+ }
+
+ private class ZKRMState implements RMState {
+ List<NodeManager> nodeManagers = new ArrayList<NodeManager>();
+ List<ApplicationSubmissionContext> appSubmissionContexts = new
+ ArrayList<ApplicationSubmissionContext>();
+ List<ApplicationMaster> masters =
+ new ArrayList<ApplicationMaster>();
+ List<NodeManagerInfo> nodes = new ArrayList<NodeManagerInfo>();
+
+ public ZKRMState() {
+ LOG.info("Restoring RM state from ZK");
+ }
+
+ private synchronized List<NodeManagerInfo> listStoredNodes() throws IOException {
+ /** get the list of nodes stored in zk **/
+ //TODO PB
+ Stat stat = new Stat();
+ try {
+ List<String> children = zkClient.getChildren(NODES, false);
+ for (String child: children) {
+ byte[] data = zkClient.getData(NODES + child, false, stat);
+ NodeManagerInfoPBImpl nmImpl = new NodeManagerInfoPBImpl(
+ NodeManagerInfoProto.parseFrom(data));
+ nodes.add(nmImpl);
+ }
+ } catch (InterruptedException ie) {
+ LOG.info("Interrupted" , ie);
+ throw new InterruptedIOException("Interrupted");
+ } catch(KeeperException ke) {
+ LOG.error("Failed to list nodes", ke);
+ throw convertToIOException(ke);
+ }
+ return nodes;
+ }
+
+ @Override
+ public List<NodeManager> getStoredNodeManagers() throws IOException {
+ return nodeManagers;
+ }
+
+ @Override
+ public List<ApplicationSubmissionContext> getStoredSubmissionContexts() {
+ return appSubmissionContexts;
+ }
+
+ @Override
+ public NodeId getLastLoggedNodeId() {
+ return null;
+ }
+
+ private void readLastNodeId() throws IOException {
+ Stat stat = new Stat();
+ try {
+ byte[] data = zkClient.getData(NODES + NODE_ID, false, stat);
+ nodeId = new NodeIdPBImpl(NodeIdProto.parseFrom(data));
+ } catch(InterruptedException ie) {
+ LOG.info("Interrupted", ie);
+ throw new InterruptedIOException(ie.getMessage());
+ } catch(KeeperException ke) {
+ LOG.info("Keeper Exception", ke);
+ throw convertToIOException(ke);
+ }
+ }
+
+ private void load() throws IOException {
+ List<NodeManagerInfo> nodeInfos = listStoredNodes();
+ for (NodeManagerInfo node: nodeInfos) {
+ NodeManager nm = new NodeManagerImpl(node.getNodeId(),
+ node.getNodeAddress(), node.getHttpAddress(), RMResourceTrackerImpl
+ .resolve(node.getNodeAddress()), node.getCapability());
+ nodeManagers.add(nm);
+ }
+ readLastNodeId();
+ /* make sure we get all the containers */
+
+ }
+ @Override
+ public List<ApplicationMaster> getStoredAMs() throws IOException {
+ return masters;
+ }
+ }
+
+ @Override
+ public void updateApplicationState(ApplicationId applicationId,
+ ApplicationMaster master) throws IOException {
+
+ }
+}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java Thu Apr 28 20:51:21 2011
@@ -19,8 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
/**
* Node managers information on available resources
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeStatus.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeStatus.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeStatus.java Thu Apr 28 20:51:21 2011
@@ -22,10 +22,10 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
public class NodeStatus {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java Thu Apr 28 20:51:21 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.net.NetworkTopo
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -57,7 +58,6 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Thu Apr 28 20:51:21 2011
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationStatus;
import org.apache.hadoop.yarn.api.records.Container;
@@ -53,9 +54,6 @@ import org.apache.hadoop.yarn.server.res
@Evolving
public class Application {
private static final Log LOG = LogFactory.getLog(Application.class);
-
- private AtomicInteger containerCtr = new AtomicInteger(0);
-
final ApplicationId applicationId;
final Queue queue;
final String user;
@@ -75,11 +73,14 @@ public class Application {
/* Allocated by scheduler */
List<Container> allocated = new ArrayList<Container>();
Set<NodeInfo> applicationOnNodes = new HashSet<NodeInfo>();
+ ApplicationMaster master;
- public Application(ApplicationId applicationId, Queue queue, String user) {
+ public Application(ApplicationId applicationId, ApplicationMaster master,
+ Queue queue, String user) {
this.applicationId = applicationId;
this.queue = queue;
this.user = user;
+ this.master = master;
}
public ApplicationId getApplicationId() {
@@ -99,7 +100,9 @@ public class Application {
}
public int getNewContainerId() {
- return containerCtr.incrementAndGet();
+ int i = master.getContainerCount();
+ master.setContainerCount(++i);
+ return master.getContainerCount();
}
/**
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java Thu Apr 28 20:51:21 2011
@@ -35,15 +35,15 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
/**
- * This class is used by ClusterInfo to keep track of all the applications/containers
+ * This class is used to keep track of all the applications/containers
* running on a node.
*
*/
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java Thu Apr 28 20:51:21 2011
@@ -25,8 +25,8 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
/**
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Thu Apr 28 20:51:21 2011
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -50,12 +51,13 @@ public interface YarnScheduler {
/**
* A new application has been submitted to the ResourceManager
* @param applicationId application which has been submitted
+ * @param master the application master
* @param user application user
* @param queue queue to which the applications is being submitted
* @param priority application priority
*/
- public void addApplication(ApplicationId applicationId, String user,
- String queue, Priority priority)
+ public void addApplication(ApplicationId applicationId, ApplicationMaster master,
+ String user, String queue, Priority priority)
throws IOException;
/**
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Apr 28 20:51:21 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -213,7 +214,7 @@ implements ResourceScheduler, CapacitySc
}
@Override
- public void addApplication(ApplicationId applicationId,
+ public void addApplication(ApplicationId applicationId, ApplicationMaster master,
String user, String queueName, Priority priority)
throws IOException {
Queue queue = queues.get(queueName);
@@ -228,7 +229,7 @@ implements ResourceScheduler, CapacitySc
" submitted by user " + user + " to non-leaf queue: " + queueName);
}
- Application application = new Application(applicationId, queue, user);
+ Application application = new Application(applicationId, master, queue, user);
try {
queue.submitApplication(application, user, queueName, priority);
} catch (AccessControlException ace) {
@@ -410,7 +411,7 @@ implements ResourceScheduler, CapacitySc
switch(event.getType()) {
case ADD:
try {
- addApplication(event.getAppContext().getApplicationID(),
+ addApplication(event.getAppContext().getApplicationID(), event.getAppContext().getMaster(),
event.getAppContext().getUser(), event.getAppContext().getQueue(),
event.getAppContext().getSubmissionContext().getPriority());
} catch(IOException ie) {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Thu Apr 28 20:51:21 2011
@@ -218,13 +218,11 @@ public class ParentQueue implements Queu
@Override
public float getAbsoluteMaximumCapacity() {
- // TODO Auto-generated method stub
return 0;
}
@Override
public float getMaximumCapacity() {
- // TODO Auto-generated method stub
return 0;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Thu Apr 28 20:51:21 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -204,11 +205,11 @@ public class FifoScheduler implements Re
}
@Override
- public synchronized void addApplication(ApplicationId applicationId,
+ public synchronized void addApplication(ApplicationId applicationId, ApplicationMaster master,
String user, String unusedQueue, Priority unusedPriority)
throws IOException {
applications.put(applicationId,
- new Application(applicationId, DEFAULT_QUEUE, user));
+ new Application(applicationId, master, DEFAULT_QUEUE, user));
LOG.info("Application Submission: " + applicationId.getId() + " from " + user +
", currently active: " + applications.size());
}
@@ -473,7 +474,8 @@ public class FifoScheduler implements Re
switch(event.getType()) {
case ADD:
try {
- addApplication(event.getAppContext().getApplicationID(), event.getAppContext().getUser(),
+ addApplication(event.getAppContext().getApplicationID(), event.getAppContext().getMaster(),
+ event.getAppContext().getUser(),
event.getAppContext().getQueue(), event.getAppContext().getSubmissionContext().getPriority());
} catch(IOException ie) {
LOG.error("Unable to add application " + event.getAppContext().getApplicationID(), ie);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Thu Apr 28 20:51:21 2011
@@ -23,11 +23,11 @@ import com.google.common.collect.Lists;
import java.util.List;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
/**
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Thu Apr 28 20:51:21 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java Thu Apr 28 20:51:21 2011
@@ -43,11 +43,12 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.junit.After;
import org.junit.Before;
@@ -62,7 +63,7 @@ public class TestAMLaunchFailure extends
ApplicationTokenSecretManager applicationTokenSecretManager =
new ApplicationTokenSecretManager();
- private ASMContext context;
+ private RMContext context;
private static class DummyYarnScheduler implements YarnScheduler {
private Container container = recordFactory.newRecordInstance(Container.class);
@@ -74,11 +75,6 @@ public class TestAMLaunchFailure extends
}
@Override
- public void addApplication(ApplicationId applicationId, String user,
- String queue, Priority priority) throws IOException {
- }
-
- @Override
public void removeApplication(ApplicationId applicationId)
throws IOException {
}
@@ -94,6 +90,14 @@ public class TestAMLaunchFailure extends
public List<QueueUserACLInfo> getQueueUserAclInfo() {
return null;
}
+
+ @Override
+ public void addApplication(ApplicationId applicationId,
+ ApplicationMaster master, String user, String queue, Priority priority)
+ throws IOException {
+ // TODO Auto-generated method stub
+
+ }
}
private class DummyApplicationTracker implements EventHandler<ASMEvent<ApplicationTrackerEventType>> {
@@ -111,7 +115,7 @@ public class TestAMLaunchFailure extends
private AtomicInteger notify = new AtomicInteger();
private AppContext app;
- public DummyApplicationMasterLauncher(ASMContext context) {
+ public DummyApplicationMasterLauncher(RMContext context) {
context.getDispatcher().register(AMLauncherEventType.class, this);
new TestThread().start();
}
@@ -163,9 +167,11 @@ public class TestAMLaunchFailure extends
@Before
public void setUp() {
- context = new ResourceManager.ASMContextImpl();
- asmImpl = new ExtApplicationsManagerImpl(applicationTokenSecretManager, scheduler);
+ context = new ResourceManager.RMContextImpl(new MemStore());
Configuration conf = new Configuration();
+ context.getDispatcher().init(conf);
+ context.getDispatcher().start();
+ asmImpl = new ExtApplicationsManagerImpl(applicationTokenSecretManager, scheduler);
new DummyApplicationTracker();
conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 3000L);
conf.setInt(YarnConfiguration.AM_MAX_RETRIES, 1);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java Thu Apr 28 20:51:21 2011
@@ -41,8 +41,9 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.junit.After;
import org.junit.Before;
@@ -54,11 +55,11 @@ public class TestAMRMRPCResponseId exten
DummyApplicationsManager applicationsManager;
DummyScheduler scheduler;
- private ASMContext context;
+ private RMContext context;
private class DummyApplicationsManager extends ApplicationsManagerImpl {
public DummyApplicationsManager(
ApplicationTokenSecretManager applicationTokenSecretManager,
- YarnScheduler scheduler, ASMContext asmContext) {
+ YarnScheduler scheduler, RMContext asmContext) {
super(applicationTokenSecretManager, scheduler, asmContext);
}
@Override
@@ -84,11 +85,6 @@ public class TestAMRMRPCResponseId exten
}
@Override
- public void addApplication(ApplicationId applicationId, String user,
- String queue, Priority priority) throws IOException {
- }
-
- @Override
public void removeApplication(ApplicationId applicationId)
throws IOException {
}
@@ -104,11 +100,19 @@ public class TestAMRMRPCResponseId exten
public List<QueueUserACLInfo> getQueueUserAclInfo() {
return null;
}
+
+ @Override
+ public void addApplication(ApplicationId applicationId,
+ ApplicationMaster master, String user, String queue, Priority priority)
+ throws IOException {
+ // TODO Auto-generated method stub
+
+ }
}
@Before
public void setUp() {
- context = new ResourceManager.ASMContextImpl();
+ context = new ResourceManager.RMContextImpl(new MemStore());
scheduler = new DummyScheduler();
applicationsManager = new DummyApplicationsManager(new
ApplicationTokenSecretManager(), scheduler, context);
@@ -117,6 +121,8 @@ public class TestAMRMRPCResponseId exten
Configuration conf = new Configuration();
applicationsManager.init(conf);
amService.init(conf);
+ context.getDispatcher().init(conf);
+ context.getDispatcher().start();
}
@After
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Thu Apr 28 20:51:21 2011
@@ -12,7 +12,6 @@ import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationState;
@@ -31,13 +30,13 @@ import org.apache.hadoop.yarn.event.Even
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
@@ -55,7 +54,7 @@ import org.junit.Test;
public class TestAMRestart extends TestCase {
private static final Log LOG = LogFactory.getLog(TestAMRestart.class);
ApplicationsManagerImpl appImpl;
- ASMContext asmContext = new ResourceManager.ASMContextImpl();
+ RMContext asmContext = new ResourceManager.RMContextImpl(new MemStore());
ApplicationTokenSecretManager appTokenSecretManager =
new ApplicationTokenSecretManager();
DummyResourceScheduler scheduler;
@@ -75,7 +74,7 @@ public class TestAMRestart extends TestC
private class ExtApplicationsManagerImpl extends ApplicationsManagerImpl {
public ExtApplicationsManagerImpl(
ApplicationTokenSecretManager applicationTokenSecretManager,
- YarnScheduler scheduler, ASMContext asmContext) {
+ YarnScheduler scheduler, RMContext asmContext) {
super(applicationTokenSecretManager, scheduler, asmContext);
}
@@ -183,10 +182,7 @@ public class TestAMRestart extends TestC
public void reinitialize(Configuration conf,
ContainerTokenSecretManager secretManager) {
}
- @Override
- public void addApplication(ApplicationId applicationId, String user,
- String queue, Priority priority) throws IOException {
- }
+
@Override
public void removeApplication(ApplicationId applicationId)
throws IOException {
@@ -201,6 +197,13 @@ public class TestAMRestart extends TestC
public List<QueueUserACLInfo> getQueueUserAclInfo() {
return null;
}
+ @Override
+ public void addApplication(ApplicationId applicationId,
+ ApplicationMaster master, String user, String queue, Priority priority)
+ throws IOException {
+ // TODO Auto-generated method stub
+
+ }
}
@Before
@@ -208,10 +211,13 @@ public class TestAMRestart extends TestC
appID = recordFactory.newRecordInstance(ApplicationId.class);
appID.setClusterTimestamp(System.currentTimeMillis());
appID.setId(1);
+ Configuration conf = new Configuration();
scheduler = new DummyResourceScheduler();
+ asmContext.getDispatcher().init(conf);
+ asmContext.getDispatcher().start();
asmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
appImpl = new ExtApplicationsManagerImpl(appTokenSecretManager, scheduler, asmContext);
- Configuration conf = new Configuration();
+
conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 1000L);
conf.setInt(YarnConfiguration.AM_MAX_RETRIES, maxFailures);
appImpl.init(conf);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java Thu Apr 28 20:51:21 2011
@@ -1,20 +1,20 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
@@ -25,6 +25,7 @@ import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationState;
@@ -36,12 +37,13 @@ import org.apache.hadoop.yarn.event.Even
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -49,7 +51,7 @@ import org.junit.Test;
public class TestASMStateMachine extends TestCase {
private static final Log LOG = LogFactory.getLog(TestASMStateMachine.class);
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- ASMContext context = new ResourceManager.ASMContextImpl();
+ RMContext context = new ResourceManager.RMContextImpl(new MemStore());
EventHandler handler;
private boolean snreceivedCleanUp = false;
private boolean snAllocateReceived = false;
@@ -61,6 +63,8 @@ public class TestASMStateMachine extends
@Before
public void setUp() {
+ context.getDispatcher().init(new Configuration());
+ context.getDispatcher().start();
handler = context.getDispatcher().getEventHandler();
new DummyAMLaunchEventHandler();
new DummySNEventHandler();
@@ -79,237 +83,187 @@ public class TestASMStateMachine extends
public DummyAMLaunchEventHandler() {
context.getDispatcher().register(AMLauncherEventType.class, this);
- new Responder().start();
- }
-
- private class Responder extends Thread {
- public void run() {
- try {
- synchronized (amsync) {
- while(amsync.get() == 0) {
- amsync.wait();
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- context.getDispatcher().getEventHandler().handle(
- new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED,
- appcontext));
- synchronized(waitForState) {
- waitForState.addAndGet(1);
- waitForState.notify();
- }
}
- }
- @Override
- public void handle(ASMEvent<AMLauncherEventType> event) {
- switch(event.getType()) {
- case LAUNCH:
- launchCalled = true;
- appcontext = event.getAppContext();
- synchronized(amsync) {
- amsync.addAndGet(1);
- amsync.notify();
+ @Override
+ public void handle(ASMEvent<AMLauncherEventType> event) {
+ switch(event.getType()) {
+ case LAUNCH:
+ launchCalled = true;
+ appcontext = event.getAppContext();
+ context.getDispatcher().getEventHandler().handle(
+ new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED,
+ appcontext));
+ break;
+ case CLEANUP:
+ launchCleanupCalled = true;
+ break;
}
- break;
- case CLEANUP:
- launchCleanupCalled = true;
- break;
}
}
-}
-private class DummySNEventHandler implements EventHandler<ASMEvent<SNEventType>> {
- AppContext appContext;
- AtomicInteger snsync = new AtomicInteger(0);
-
- public DummySNEventHandler() {
- context.getDispatcher().register(SNEventType.class, this);
- new Responder().start();
- }
+ private class DummySNEventHandler implements EventHandler<ASMEvent<SNEventType>> {
+ AppContext appContext;
+ AtomicInteger snsync = new AtomicInteger(0);
- private class Responder extends Thread {
- public void run() {
- synchronized (snsync) {
- try {
- while (snsync.get() == 0) {
- snsync.wait();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- context.getDispatcher().getEventHandler().handle(
- new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATED,
- appContext));
- synchronized(waitForState) {
- waitForState.addAndGet(1);
- waitForState.notify();
- }
+ public DummySNEventHandler() {
+ context.getDispatcher().register(SNEventType.class, this);
}
- }
- @Override
- public void handle(ASMEvent<SNEventType> event) {
- switch(event.getType()) {
- case CLEANUP:
- snreceivedCleanUp = true;
- break;
- case SCHEDULE:
- snAllocateReceived = true;
- appContext = event.getAppContext();
- synchronized (snsync) {
- snsync.addAndGet(1);
- snsync.notify();
+ @Override
+ public void handle(ASMEvent<SNEventType> event) {
+ switch(event.getType()) {
+ case CLEANUP:
+ snreceivedCleanUp = true;
+ break;
+ case SCHEDULE:
+ snAllocateReceived = true;
+ appContext = event.getAppContext();
+ context.getDispatcher().getEventHandler().handle(
+ new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATED,
+ appContext));
+ break;
}
- break;
}
- }
-
-}
-private static class StatusContext implements AppContext {
- @Override
- public ApplicationSubmissionContext getSubmissionContext() {
- return null;
- }
- @Override
- public Resource getResource() {
- return null;
- }
- @Override
- public ApplicationId getApplicationID() {
- return null;
- }
- @Override
- public ApplicationStatus getStatus() {
- ApplicationStatus status = recordFactory.newRecordInstance(ApplicationStatus.class);
- status.setLastSeen(-99);
- return status;
- }
- @Override
- public ApplicationMaster getMaster() {
- return null;
- }
- @Override
- public Container getMasterContainer() {
- return null;
}
- @Override
- public String getUser() {
- return null;
- }
- @Override
- public long getLastSeen() {
- return 0;
- }
- @Override
- public String getName() {
- return null;
- }
- @Override
- public String getQueue() {
- return null;
- }
- @Override
- public int getFailedCount() {
- return 0;
- }
-}
-private class ApplicationTracker implements EventHandler<ASMEvent<ApplicationTrackerEventType>> {
- public ApplicationTracker() {
- context.getDispatcher().register(ApplicationTrackerEventType.class, this);
+ private static class StatusContext implements AppContext {
+ @Override
+ public ApplicationSubmissionContext getSubmissionContext() {
+ return null;
+ }
+ @Override
+ public Resource getResource() {
+ return null;
+ }
+ @Override
+ public ApplicationId getApplicationID() {
+ return null;
+ }
+ @Override
+ public ApplicationStatus getStatus() {
+ ApplicationStatus status = recordFactory.newRecordInstance(ApplicationStatus.class);
+ status.setLastSeen(-99);
+ return status;
+ }
+ @Override
+ public ApplicationMaster getMaster() {
+ return null;
+ }
+ @Override
+ public Container getMasterContainer() {
+ return null;
+ }
+ @Override
+ public String getUser() {
+ return null;
+ }
+ @Override
+ public long getLastSeen() {
+ return 0;
+ }
+ @Override
+ public String getName() {
+ return null;
+ }
+ @Override
+ public String getQueue() {
+ return null;
+ }
+ @Override
+ public int getFailedCount() {
+ return 0;
+ }
}
- @Override
- public void handle(ASMEvent<ApplicationTrackerEventType> event) {
- switch (event.getType()) {
- case ADD:
- addedApplication = true;
- break;
- case REMOVE:
- removedApplication = true;
- break;
+ private class ApplicationTracker implements EventHandler<ASMEvent<ApplicationTrackerEventType>> {
+ public ApplicationTracker() {
+ context.getDispatcher().register(ApplicationTrackerEventType.class, this);
+ }
+
+ @Override
+ public void handle(ASMEvent<ApplicationTrackerEventType> event) {
+ switch (event.getType()) {
+ case ADD:
+ addedApplication = true;
+ break;
+ case REMOVE:
+ removedApplication = true;
+ break;
+ }
}
}
-}
-private class MockAppplicationMasterInfo implements EventHandler<ASMEvent<ApplicationEventType>> {
+ private class MockAppplicationMasterInfo implements EventHandler<ASMEvent<ApplicationEventType>> {
- MockAppplicationMasterInfo() {
- context.getDispatcher().register(ApplicationEventType.class, this);
- }
- @Override
- public void handle(ASMEvent<ApplicationEventType> event) {
- LOG.info("The event type is " + event.getType());
+ MockAppplicationMasterInfo() {
+ context.getDispatcher().register(ApplicationEventType.class, this);
+ }
+ @Override
+ public void handle(ASMEvent<ApplicationEventType> event) {
+ LOG.info("The event type is " + event.getType());
+ }
}
-}
-public void waitForState(ApplicationState state, ApplicationMasterInfo info) {
- synchronized(waitForState) {
- try {
- while (waitForState.get() == 0) {
- waitForState.wait(10000L);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
+ private void waitForState( ApplicationState
+ finalState, ApplicationMasterInfo masterInfo) throws Exception {
+ int count = 0;
+ while(masterInfo.getState() != finalState && count < 10) {
+ Thread.sleep(500);
+ count++;
}
+ assertTrue(masterInfo.getState() == finalState);
+ }
+
+ /* Test the state machine.
+ *
+ */
+ @Test
+ public void testStateMachine() throws Exception {
+ ApplicationSubmissionContext submissioncontext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ submissioncontext.setApplicationId(recordFactory.newRecordInstance(ApplicationId.class));
+ submissioncontext.getApplicationId().setId(1);
+ submissioncontext.getApplicationId().setClusterTimestamp(System.currentTimeMillis());
+
+ ApplicationMasterInfo masterInfo
+ = new ApplicationMasterInfo(context, "dummyuser", submissioncontext, "dummyToken");
+
+ context.getDispatcher().register(ApplicationEventType.class, masterInfo);
+ handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
+ ALLOCATE, masterInfo));
+
+ waitForState(ApplicationState.ALLOCATED, masterInfo);
+ handler.handle(new ASMEvent<ApplicationEventType>(
+ ApplicationEventType.LAUNCH, masterInfo));
+
+ waitForState(ApplicationState.LAUNCHED, masterInfo);
+ Assert.assertTrue(snAllocateReceived);
+ Assert.assertTrue(launchCalled);
+ Assert.assertTrue(addedApplication);
+ handler.handle(new ASMEvent<ApplicationEventType>(
+ ApplicationEventType.REGISTERED, masterInfo));
+ waitForState(ApplicationState.RUNNING, masterInfo);
+ Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
+ handler.handle(new ASMEvent<ApplicationEventType>(
+ ApplicationEventType.STATUSUPDATE, new StatusContext()));
+
+ /* check if the state is still RUNNING */
+
+ Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
+
+ handler.handle(
+ new ASMEvent<ApplicationEventType>(ApplicationEventType.FINISH, masterInfo));
+ waitForState(ApplicationState.COMPLETED, masterInfo);
+ Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());
+ /* check if clean up is called for everyone */
+ Assert.assertTrue(launchCleanupCalled);
+ Assert.assertTrue(snreceivedCleanUp);
+ Assert.assertTrue(removedApplication);
+
+ /* check if expiry doesnt make it failed */
+ handler.handle(
+ new ASMEvent<ApplicationEventType>(ApplicationEventType.EXPIRE, masterInfo));
+ Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());
}
- Assert.assertEquals(state, info.getState());
-}
-
-/* Test the state machine.
- *
- */
-@Test
-public void testStateMachine() {
- ApplicationSubmissionContext submissioncontext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
- submissioncontext.setApplicationId(recordFactory.newRecordInstance(ApplicationId.class));
- submissioncontext.getApplicationId().setId(1);
- submissioncontext.getApplicationId().setClusterTimestamp(System.currentTimeMillis());
-
- ApplicationMasterInfo masterInfo
- = new ApplicationMasterInfo(handler, "dummyuser", submissioncontext, "dummyToken");
-
- context.getDispatcher().register(ApplicationEventType.class, masterInfo);
- handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
- ALLOCATE, masterInfo));
-
- waitForState(ApplicationState.ALLOCATED, masterInfo);
- waitForState.getAndDecrement();
- handler.handle(new ASMEvent<ApplicationEventType>(
- ApplicationEventType.LAUNCH, masterInfo));
-
- waitForState(ApplicationState.LAUNCHED, masterInfo);
- Assert.assertTrue(snAllocateReceived);
- Assert.assertTrue(launchCalled);
- Assert.assertTrue(addedApplication);
- handler.handle(new ASMEvent<ApplicationEventType>(
- ApplicationEventType.REGISTERED, masterInfo));
- Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
- handler.handle(new ASMEvent<ApplicationEventType>(
- ApplicationEventType.STATUSUPDATE, new StatusContext()));
-
- /* check if the state is still RUNNING */
-
- Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
-
- handler.handle(
- new ASMEvent<ApplicationEventType>(ApplicationEventType.FINISH, masterInfo));
-
- Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());
- /* check if clean up is called for everyone */
- Assert.assertTrue(launchCleanupCalled);
- Assert.assertTrue(snreceivedCleanUp);
- Assert.assertTrue(removedApplication);
-
- /* check if expiry doesnt make it failed */
- handler.handle(
- new ASMEvent<ApplicationEventType>(ApplicationEventType.EXPIRE, masterInfo));
- Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());
-}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java Thu Apr 28 20:51:21 2011
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -45,14 +46,14 @@ import org.apache.hadoop.yarn.event.Even
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
@@ -77,15 +78,18 @@ public class TestApplicationCleanup exte
private ExtASM asm;
private static final int memoryNeeded = 100;
- private final ASMContext context = new ResourceManager.ASMContextImpl();
+ private final RMContext context = new ResourceManager.RMContextImpl(new MemStore());
@Before
public void setUp() {
new DummyApplicationTracker();
scheduler = new FifoScheduler();
context.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
+ Configuration conf = new Configuration();
+ context.getDispatcher().init(conf);
+ context.getDispatcher().start();
asm = new ExtASM(new ApplicationTokenSecretManager(), scheduler);
- asm.init(new Configuration());
+ asm.init(conf);
}
@After
@@ -115,30 +119,8 @@ public class TestApplicationCleanup exte
private AtomicInteger notify = new AtomicInteger(0);
private AppContext appContext;
- public DummyApplicationMasterLauncher(ASMContext context) {
+ public DummyApplicationMasterLauncher(RMContext context) {
context.getDispatcher().register(AMLauncherEventType.class, this);
- new Responder().start();
- }
-
- private class Responder extends Thread {
- public void run() {
- synchronized(notify) {
- try {
- while (notify.get() == 0) {
- notify.wait();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- context.getDispatcher().getEventHandler().
- handle(new ASMEvent<ApplicationEventType>(
- ApplicationEventType.LAUNCHED, appContext));
- synchronized(waitForState) {
- waitForState.addAndGet(1);
- waitForState.notify();
- }
- }
}
@Override
@@ -152,11 +134,9 @@ public class TestApplicationCleanup exte
LOG.info("Launcher Launch called");
launcherLaunchCalled = true;
appContext = appEvent.getAppContext();
- synchronized (notify) {
- notify.addAndGet(1);
- notify.notify();
- LOG.info("Done notifying launcher ");
- }
+ context.getDispatcher().getEventHandler().
+ handle(new ASMEvent<ApplicationEventType>(
+ ApplicationEventType.LAUNCHED, appContext));
break;
default:
break;
@@ -167,27 +147,8 @@ public class TestApplicationCleanup exte
private class DummySchedulerNegotiator implements EventHandler<ASMEvent<SNEventType>> {
private AtomicInteger snnotify = new AtomicInteger(0);
AppContext acontext;
- public DummySchedulerNegotiator(ASMContext context) {
+ public DummySchedulerNegotiator(RMContext context) {
context.getDispatcher().register(SNEventType.class, this);
- new Responder().start();
- }
-
- private class Responder extends Thread {
- public void run() {
- LOG.info("Waiting for notify");
- synchronized(snnotify) {
- try {
- while(snnotify.get() == 0) {
- snnotify.wait();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- context.getDispatcher().getEventHandler().
- handle(new ASMEvent<ApplicationEventType>(
- ApplicationEventType.ALLOCATED, acontext));
- }
}
@Override
@@ -200,11 +161,9 @@ public class TestApplicationCleanup exte
case SCHEDULE:
schedulerScheduleCalled = true;
acontext = appEvent.getAppContext();
- LOG.info("Schedule received");
- synchronized(snnotify) {
- snnotify.addAndGet(1);
- snnotify.notify();
- }
+ context.getDispatcher().getEventHandler().
+ handle(new ASMEvent<ApplicationEventType>(
+ ApplicationEventType.ALLOCATED, acontext));
default:
break;
}
@@ -229,19 +188,17 @@ public class TestApplicationCleanup exte
}
}
-
- private void waitForState(ApplicationState state, ApplicationMasterInfo masterInfo) {
- synchronized(waitForState) {
- try {
- while(waitForState.get() == 0) {
- waitForState.wait(10000L);
- }
- } catch (InterruptedException e) {
- LOG.info("Interrupted thread " , e);
- }
+
+ private void waitForState(ApplicationState
+ finalState, ApplicationMasterInfo masterInfo) throws Exception {
+ int count = 0;
+ while(masterInfo.getState() != finalState && count < 10) {
+ Thread.sleep(500);
+ count++;
}
- Assert.assertEquals(state, masterInfo.getState());
+ assertTrue(masterInfo.getState() == finalState);
}
+
private ResourceRequest createNewResourceRequest(int capability, int i) {
ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
@@ -300,6 +257,9 @@ public class TestApplicationCleanup exte
(firstNodeMemory - (2*memoryNeeded)));
ApplicationMasterInfo masterInfo = asm.getApplicationMasterInfo(appID);
asm.finishApplication(appID);
+ while (asm.launcherCleanupCalled != true) {
+ Thread.sleep(500);
+ }
assertTrue(asm.launcherCleanupCalled == true);
assertTrue(asm.launcherLaunchCalled == true);
assertTrue(asm.schedulerCleanupCalled == true);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java Thu Apr 28 20:51:21 2011
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+import java.util.concurrent.atomic.AtomicInteger;
+
import junit.framework.Assert;
import junit.framework.TestCase;
@@ -25,18 +27,20 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -50,7 +54,7 @@ public class TestApplicationMasterExpiry
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
AMTracker tracker;
- private final ASMContext context = new ResourceManager.ASMContextImpl();
+ private final RMContext context = new ResourceManager.RMContextImpl(new MemStore());
@Before
public void setUp() {
@@ -60,6 +64,8 @@ public class TestApplicationMasterExpiry
new ApplicationEventTypeListener();
tracker = new AMTracker(context);
Configuration conf = new Configuration();
+ context.getDispatcher().init(conf);
+ context.getDispatcher().start();
conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 1000L);
tracker.init(conf);
tracker.start();
@@ -79,7 +85,7 @@ public class TestApplicationMasterExpiry
}
}
- private Object expiry = new Object();
+ private AtomicInteger expiry = new AtomicInteger();
private boolean expired = false;
private class ApplicationEventTypeListener implements EventHandler<ASMEvent<ApplicationEventType>> {
@@ -93,7 +99,7 @@ public class TestApplicationMasterExpiry
expired = true;
LOG.info("Received expiry from application " + event.getAppContext().getApplicationID());
synchronized(expiry) {
- expiry.notify();
+ expiry.addAndGet(1);
}
}
}
@@ -117,6 +123,16 @@ public class TestApplicationMasterExpiry
}
}
+ private void waitForState(ApplicationMasterInfo masterInfo, ApplicationState
+ finalState) throws Exception {
+ int count = 0;
+ while(masterInfo.getState() != finalState && count < 10) {
+ Thread.sleep(500);
+ count++;
+ }
+ assertTrue(masterInfo.getState() == finalState);
+ }
+
@Test
public void testAMExpiry() throws Exception {
ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
@@ -130,10 +146,13 @@ public class TestApplicationMasterExpiry
ApplicationMasterInfo masterInfo = tracker.get(context.getApplicationId());
this.context.getDispatcher().getEventHandler().handle(
new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATED, masterInfo));
+ waitForState(masterInfo, ApplicationState.LAUNCHING);
this.context.getDispatcher().getEventHandler().handle(
new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED, masterInfo));
synchronized(expiry) {
- expiry.wait(10000);
+ while (expiry.get() == 0) {
+ expiry.wait(1000);
+ }
}
Assert.assertTrue(expired);
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java Thu Apr 28 20:51:21 2011
@@ -33,10 +33,11 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -58,7 +59,7 @@ public class TestApplicationMasterLaunch
Object doneLaunching = new Object();
AtomicInteger launched = new AtomicInteger();
AtomicInteger cleanedUp = new AtomicInteger();
- private ASMContext context = new ResourceManager.ASMContextImpl();
+ private RMContext context = new ResourceManager.RMContextImpl(new MemStore());
private class DummyASM implements EventHandler<ASMEvent<ApplicationEventType>> {
@Override
@@ -126,9 +127,13 @@ public class TestApplicationMasterLaunch
public void setUp() {
asmHandle = new DummyASM();
amLauncher = new DummyApplicationMasterLauncher(applicationTokenSecretManager,
- clientToAMSecretManager, asmHandle);
- amLauncher.init(new Configuration());
+ clientToAMSecretManager, asmHandle);
+ Configuration conf = new Configuration();
+ context.getDispatcher().init(conf);
+ amLauncher.init(conf);
+ context.getDispatcher().start();
amLauncher.start();
+
}
@After
@@ -143,8 +148,7 @@ public class TestApplicationMasterLaunch
context.getApplicationId().setClusterTimestamp(System.currentTimeMillis());
context.getApplicationId().setId(1);
context.setUser("dummyuser");
- ApplicationMasterInfo masterInfo = new ApplicationMasterInfo(this.context.
- getDispatcher().getEventHandler(),
+ ApplicationMasterInfo masterInfo = new ApplicationMasterInfo(this.context,
"dummyuser", context,
"dummyclienttoken");
amLauncher.handle(new ASMEvent<AMLauncherEventType>(AMLauncherEventType.LAUNCH,
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java?rev=1097605&r1=1097604&r2=1097605&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java Thu Apr 28 20:51:21 2011
@@ -23,9 +23,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
@@ -33,17 +36,16 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
@@ -53,16 +55,13 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
public class TestSchedulerNegotiator extends TestCase {
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private SchedulerNegotiator schedulerNegotiator;
private DummyScheduler scheduler;
private final int testNum = 99999;
- private final ASMContext context = new ResourceManager.ASMContextImpl();
+ private final RMContext context = new ResourceManager.RMContextImpl(new MemStore());
ApplicationMasterInfo masterInfo;
private EventHandler handler;
@@ -97,10 +96,7 @@ public class TestSchedulerNegotiator ext
@Override
public void handle(ASMEvent<ApplicationTrackerEventType> event) {
}
- @Override
- public void addApplication(ApplicationId applicationId, String user,
- String queue, Priority priority) throws IOException {
- }
+
@Override
public void removeApplication(ApplicationId applicationId)
throws IOException {
@@ -115,15 +111,25 @@ public class TestSchedulerNegotiator ext
public List<QueueUserACLInfo> getQueueUserAclInfo() {
return null;
}
+ @Override
+ public void addApplication(ApplicationId applicationId,
+ ApplicationMaster master, String user, String queue, Priority priority)
+ throws IOException {
+ // TODO Auto-generated method stub
+
+ }
}
@Before
public void setUp() {
scheduler = new DummyScheduler();
schedulerNegotiator = new SchedulerNegotiator(context, scheduler);
- schedulerNegotiator.init(new Configuration());
+ Configuration conf = new Configuration();
+ schedulerNegotiator.init(conf);
schedulerNegotiator.start();
handler = context.getDispatcher().getEventHandler();
+ context.getDispatcher().init(conf);
+ context.getDispatcher().start();
}
@After
@@ -144,6 +150,12 @@ public class TestSchedulerNegotiator ext
Assert.assertEquals(state, info.getState());
}
+ private class DummyEventHandler implements EventHandler<ASMEvent<ApplicationTrackerEventType>> {
+ @Override
+ public void handle(ASMEvent<ApplicationTrackerEventType> event) {
+ }
+ }
+
@Test
public void testSchedulerNegotiator() throws Exception {
ApplicationSubmissionContext submissionContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
@@ -152,9 +164,10 @@ public class TestSchedulerNegotiator ext
submissionContext.getApplicationId().setId(1);
masterInfo =
- new ApplicationMasterInfo(this.context.getDispatcher().getEventHandler(),
+ new ApplicationMasterInfo(this.context,
"dummy", submissionContext, "dummyClientToken");
context.getDispatcher().register(ApplicationEventType.class, masterInfo);
+ context.getDispatcher().register(ApplicationTrackerEventType.class, masterInfo);
handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
ALLOCATE, masterInfo));
waitForState(ApplicationState.ALLOCATED, masterInfo);
|