hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1082677 [36/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...
Date Thu, 17 Mar 2011 20:21:54 GMT
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,56 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ResourceContext;
+import org.apache.hadoop.yarn.webapp.WebApp;
+
+import static org.apache.hadoop.yarn.util.StringHelper.*;
+
+/**
+ * The RM webapp: binds ResourceManager internals into the injector and
+ * wires the UI routes to {@link RmController}.
+ */
+public class RMWebApp extends WebApp {
+  // Route-parameter keys; read back by the controller via $(...).
+  static final String APP_ID = "app.id";
+  static final String QUEUE_NAME = "queue.name";
+
+  private final ResourceManager rm;
+
+  public RMWebApp(ResourceManager rm) {
+    this.rm = rm;
+  }
+
+  @Override
+  public void setup() {
+    // rm may be null (e.g. in tests); bindings are skipped in that case
+    // so the routes can still be set up without a live ResourceManager.
+    if (rm != null) {
+      bind(ResourceManager.class).toInstance(rm);
+      bind(ApplicationsManager.class).toInstance(rm.getApplicationsManager());
+      bind(ResourceContext.class).toInstance(rm.getResourceTracker());
+    }
+    route("/", RmController.class);
+    route("/nodes", RmController.class, "nodes");
+    route("/apps", RmController.class);
+    route("/cluster", RmController.class, "info");
+    // pajoin presumably appends the param placeholder to the path
+    // (see StringHelper) — confirm the exact route syntax it produces.
+    route(pajoin("/app", APP_ID), RmController.class, "app");
+    route("/scheduler", RmController.class, "scheduler");
+    route(pajoin("/queue", QUEUE_NAME), RmController.class, "queue");
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,122 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.google.inject.Inject;
+
+import java.util.Date;
+
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.yarn.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.Controller;
+import org.apache.hadoop.yarn.webapp.ResponseInfo;
+import org.apache.hadoop.yarn.ApplicationID;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp.*;
+import static org.apache.hadoop.yarn.util.StringHelper.*;
+
+// Do NOT rename/refactor this to RMView as it will wreak havoc
+// on Mac OS HFS as it's case-insensitive!
+/**
+ * Web controller for the RM UI: serves the cluster-info, application,
+ * nodes, scheduler and queue pages, plus the app list as JSON.
+ */
+public class RmController extends Controller {
+  @Inject RmController(RequestContext ctx) { super(ctx); }
+
+  /** Default page: the applications list. */
+  @Override public void index() {
+    setTitle("Applications");
+  }
+
+  /** "/cluster": general cluster information page. */
+  public void info() {
+    setTitle("About the Cluster");
+    // The RM start timestamp doubles as the cluster ID below.
+    long ts = ResourceManager.clusterTimeStamp;
+    ResourceManager rm = injector().getInstance(ResourceManager.class);
+    info("Cluster overview").
+      _("Cluster ID:", ts).
+      _("ResourceManager state:", rm.getServiceState()).
+      _("ResourceManager started on:", new Date(ts)).
+      _("ResourceManager version:", "FIXAPI: 1.0-SNAPSHOT").
+      _("Hadoop version:", VersionInfo.getBuildVersion());
+    render(InfoPage.class);
+  }
+
+  /** "/app/{app.id}": overview page for a single application. */
+  public void app() {
+    String aid = $(APP_ID);
+    if (aid.isEmpty()) {
+      setStatus(response().SC_BAD_REQUEST);
+      setTitle("Bad request: requires application ID");
+      return;
+    }
+    // NOTE(review): Apps.toAppID may reject a malformed id — confirm how
+    // parse failures surface to the user.
+    ApplicationID appID = Apps.toAppID(aid);
+    ApplicationsManager asm = injector().getInstance(ApplicationsManager.class);
+    Application app = asm.getApplication(appID);
+    if (app == null) {
+      // TODO: handle redirect to jobhistory server
+      setStatus(response().SC_NOT_FOUND);
+      setTitle("Application not found: "+ aid);
+      return;
+    }
+    setTitle(join("Application ", aid));
+    // Link to the application master's UI once one has been assigned.
+    CharSequence master = app.master();
+    String ui = master == null ? "UNASSIGNED"
+                               : join(master, ':', app.httpPort());
+
+    ResponseInfo info = info("Application Overview").
+      _("User:", app.user()).
+      _("Name:", app.name()).
+      _("State:", app.state()).
+      _("Started:", "FIXAPI!").
+      _("Elapsed:", "FIXAPI!").
+      _("Master UI:", master == null ? "#" : join("http://", ui), ui);
+    if (app.isFinished()) {
+      info._("History:", "FIXAPI!");
+    }
+    render(InfoPage.class);
+  }
+
+  /** "/nodes": the cluster nodes table. */
+  public void nodes() {
+    render(NodesPage.class);
+  }
+
+  /** "/scheduler": scheduler page keyed off the configured scheduler type. */
+  public void scheduler() {
+    ResourceManager rm = injector().getInstance(ResourceManager.class);
+    ResourceScheduler rs = rm.getResourceScheduler();
+    // NOTE(review): a null scheduler falls through to the capacity
+    // scheduler page — presumably the expected default; confirm.
+    if (rs == null || rs instanceof CapacityScheduler) {
+      setTitle("Capacity Scheduler");
+      render(CapacitySchedulerPage.class);
+      return;
+    }
+    setTitle("Default Scheduler");
+    render(DefaultSchedulerPage.class);
+  }
+
+  /** "/queue/{queue.name}": per-queue page (title only for now). */
+  public void queue() {
+    setTitle(join("Queue ", get(QUEUE_NAME, "unknown")));
+  }
+
+  /** Submission via the web UI is disabled. */
+  public void submit() {
+    setTitle("Application Submission Not Allowed");
+  }
+
+  /** Renders the applications list as JSON (used by RmView's ajax mode). */
+  public void json() {
+    renderJSON(AppsList.class);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,80 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.view.TwoColumnLayout;
+
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+
+// Do NOT rename/refactor this to RMView as it will wreak havoc
+// on Mac OS HFS as it's case-insensitive
+/**
+ * Two-column layout for the RM web UI. Chooses an apps-table rendering
+ * strategy by row count: direct HTML up to MAX_DISPLAY_ROWS, an inline
+ * js array up to MAX_INLINE_ROWS (with a progress indicator past
+ * MAX_FAST_ROWS), and ajax loading beyond that.
+ */
+public class RmView extends TwoColumnLayout {
+  static final int MAX_DISPLAY_ROWS = 100;  // direct table rendering
+  static final int MAX_FAST_ROWS = 1000;    // inline js array
+  static final int MAX_INLINE_ROWS = 2000;  // ajax load
+
+  @Override
+  protected void preHead(Page.HTML<_> html) {
+    commonPreHead(html);
+    set(DATATABLES_ID, "apps");
+    set(initID(DATATABLES, "apps"), appsTableInit());
+    setTableStyles(html, "apps");
+  }
+
+  // Shared page setup: auto-refresh every 20s, nav accordion, theme switcher.
+  protected void commonPreHead(Page.HTML<_> html) {
+    html.meta_http("refresh", "20");
+    set(ACCORDION_ID, "nav");
+    set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
+    set(THEMESWITCHER_ID, "themeswitcher");
+  }
+
+  @Override
+  protected Class<? extends SubView> nav() {
+    return NavBlock.class;
+  }
+
+  @Override
+  protected Class<? extends SubView> content() {
+    return AppsBlock.class;
+  }
+
+  // Builds the DataTables init string and records the chosen rendering
+  // mode on the AppsList instance for the content block to honor.
+  private String appsTableInit() {
+    AppsList list = injector().getInstance(AppsList.class);
+    StringBuilder init = tableInit().
+        append(", aoColumns:[{sType:'title-numeric'}, null, null, null, null,").
+        append("{sType:'title-numeric', bSearchable:false}, null]");
+    // NOTE(review): a non-numeric "rowlimit" query param will throw
+    // NumberFormatException here — confirm whether that is acceptable.
+    String rows = $("rowlimit");
+    int rowLimit = rows.isEmpty() ? MAX_DISPLAY_ROWS : Integer.parseInt(rows);
+    if (list.apps.size() < rowLimit) {
+      list.rendering = Render.HTML;
+      return init.append('}').toString();
+    }
+    if (list.apps.size() > MAX_FAST_ROWS) {
+      tableInitProgress(init, list.apps.size() * 6);
+    }
+    if (list.apps.size() > MAX_INLINE_ROWS) {
+      list.rendering = Render.JS_LOAD;
+      return init.append(", sAjaxSource:'").append(url("apps", "json")).
+          append("'}").toString();
+    }
+    list.rendering = Render.JS_ARRAY;
+    return init.append(", aaData:appsData}").toString();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,407 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerState;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
+
+@Private
+/**
+ * Test-only application simulator: submits itself to the RM, tracks
+ * per-priority resource requests and tasks, pulls allocations from the
+ * scheduler, and assigns allocated containers to pending tasks.
+ */
+public class Application {
+  private static final Log LOG = LogFactory.getLog(Application.class);
+  
+  // Source of unique, monotonically increasing task ids.
+  private AtomicInteger taskCounter = new AtomicInteger(0);
+
+  final private String user;
+  final private String queue;
+  final private ApplicationID applicationId;
+  final private ResourceManager resourceManager;
+  
+  // Per-priority resource capability, fixed once via addResourceRequestSpec.
+  final private Map<Priority, Resource> requestSpec = 
+    new TreeMap<Priority, Resource>(
+        new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
+  
+  // Outstanding requests: priority -> (resource name -> request).
+  final private Map<Priority, Map<String, ResourceRequest>> requests = 
+    new TreeMap<Priority, Map<String, ResourceRequest>>(
+        new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
+  
+  // Tasks grouped by priority.
+  final Map<Priority, Set<Task>> tasks = 
+    new TreeMap<Priority, Set<Task>>(
+        new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
+  
+  // Deltas to send on the next allocate() call to the RM.
+  final private Set<ResourceRequest> ask = 
+    new TreeSet<ResourceRequest>(
+        new org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceRequest.Comparator());
+  final private Set<Container> release = 
+    new TreeSet<Container>(
+        new org.apache.hadoop.yarn.server.resourcemanager.resource.Container.Comparator());
+
+  // host -> NodeManager stub for launching containers.
+  final private Map<String, NodeManager> nodes = 
+    new HashMap<String, NodeManager>();
+  
+  // Resources currently in use by running tasks.
+  Resource used = new Resource();
+  
+  public Application(String user, ResourceManager resourceManager)
+      throws AvroRemoteException {
+    this(user, "default", resourceManager);
+  }
+  
+  public Application(String user, String queue, ResourceManager resourceManager)
+  throws AvroRemoteException {
+    this.user = user;
+    this.queue = queue;
+    this.resourceManager = resourceManager;
+    // A fresh application id is allocated immediately on construction.
+    this.applicationId =
+      this.resourceManager.getApplicationsManager().getNewApplicationID();
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public ApplicationID getApplicationId() {
+    return applicationId;
+  }
+
+  /** All hosts resolve to the default rack in this test harness. */
+  public static String resolve(String hostName) {
+    return NetworkTopology.DEFAULT_RACK;
+  }
+  
+  public int getNextTaskId() {
+    return taskCounter.incrementAndGet();
+  }
+  
+  public Resource getUsedResources() {
+    return used;
+  }
+  
+  /** Submits this application to the RM's ApplicationsManager. */
+  public synchronized void submit() throws IOException {
+    ApplicationSubmissionContext context = new ApplicationSubmissionContext();
+    context.applicationId = applicationId;
+    context.user = this.user;
+    context.queue = this.queue;
+    resourceManager.getApplicationsManager().submitApplication(context);
+  }
+  
+  /**
+   * Registers the container capability used for tasks at this priority.
+   * Fails if a spec was already registered for the priority.
+   */
+  public synchronized void addResourceRequestSpec(
+      Priority priority, Resource capability) {
+    Resource currentSpec = requestSpec.put(priority, capability);
+    if (currentSpec != null) {
+      throw new IllegalStateException("Resource spec already exists for " +
+      		"priority " + priority.priority + " - " + currentSpec.memory);
+    }
+  }
+  
+  public synchronized void addNodeManager(String host, NodeManager nodeManager) {
+    nodes.put(host, nodeManager);
+  }
+  
+  private synchronized NodeManager getNodeManager(String host) {
+    return nodes.get(host);
+  }
+  
+  /**
+   * Adds a task and creates the corresponding data-local, rack-local and
+   * off-switch resource requests at the task's priority.
+   */
+  public synchronized void addTask(Task task) {
+    Priority priority = task.getPriority();
+    Map<String, ResourceRequest> requests = this.requests.get(priority);
+    if (requests == null) {
+      requests = new HashMap<String, ResourceRequest>();
+      this.requests.put(priority, requests);
+      LOG.info("DEBUG --- Added" +
+      		" priority=" + priority + 
+      		" application=" + applicationId);
+    }
+    
+    final Resource capability = requestSpec.get(priority);
+    
+    // Note down the task
+    Set<Task> tasks = this.tasks.get(priority);
+    if (tasks == null) {
+      tasks = new HashSet<Task>();
+      this.tasks.put(priority, tasks);
+    }
+    tasks.add(task);
+    
+    LOG.info("Added task " + task.getTaskId() + " to application " + 
+        applicationId + " at priority " + priority);
+    
+    LOG.info("DEBUG --- addTask:" +
+    		" application=" + applicationId + 
+    		" #asks=" + ask.size());
+    
+    // Create resource requests
+    for (String host : task.getHosts()) {
+      // Data-local
+      addResourceRequest(priority, requests, host, capability);
+    }
+        
+    // Rack-local
+    for (String rack : task.getRacks()) {
+      addResourceRequest(priority, requests, rack, capability);
+    }
+      
+    // Off-switch
+    addResourceRequest(priority, requests, 
+        org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager.ANY, 
+        capability);
+  }
+  
+  /**
+   * Stops a task's container and returns its resources to the pool.
+   * Throws if the task was never added at its priority.
+   */
+  public synchronized void finishTask(Task task) throws IOException {
+    Set<Task> tasks = this.tasks.get(task.getPriority());
+    if (!tasks.remove(task)) {
+      throw new IllegalStateException(
+          "Finishing unknown task " + task.getTaskId() + 
+          " from application " + applicationId);
+    }
+    
+    NodeManager nodeManager = task.getNodeManager();
+    ContainerID containerId = task.getContainerId();
+    task.stop();
+    nodeManager.stopContainer(containerId);
+    
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+        used, requestSpec.get(task.getPriority()));
+    
+    LOG.info("Finished task " + task.getTaskId() + 
+        " of application " + applicationId + 
+        " on node " + nodeManager.getHostName() + 
+        ", currently using " + used + " resources");
+  }
+  
+  // Creates or bumps the request for resourceName and records a cloned
+  // copy in 'ask' for the next RM interaction.
+  private synchronized void addResourceRequest(
+      Priority priority, Map<String, ResourceRequest> requests, 
+      String resourceName, Resource capability) {
+    ResourceRequest request = requests.get(resourceName);
+    if (request == null) {
+      request = 
+        org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceRequest.create(
+            priority, resourceName, capability, 1);
+      requests.put(resourceName, request);
+    } else {
+      ++request.numContainers;
+    }
+    
+    // Note this down for next interaction with ResourceManager
+    ask.remove(request);
+    ask.add(
+        org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceRequest.create(
+            request)); // clone to ensure the RM doesn't manipulate the same obj
+    
+    LOG.info("DEBUG --- addResourceRequest:" +
+    		" applicationId=" + applicationId.id +
+    		" priority=" + priority.priority + 
+        " resourceName=" + resourceName + 
+        " capability=" + capability +
+        " numContainers=" + request.numContainers + 
+        " #asks=" + ask.size());
+  }
+  
+  /**
+   * Sends the accumulated ask/release deltas to the scheduler and returns
+   * the non-completed containers granted; clears the deltas afterwards.
+   */
+  public synchronized List<Container> getResources() throws IOException {
+    LOG.info("DEBUG --- getResources begin:" +
+        " application=" + applicationId + 
+        " #ask=" + ask.size() +
+        " #release=" + release.size());
+    for (ResourceRequest request : ask) {
+      LOG.info("DEBUG --- getResources:" +
+          " application=" + applicationId + 
+          " ask-request=" + request);
+    }
+    for (Container c : release) {
+      LOG.info("DEBUG --- getResources:" +
+          " application=" + applicationId + 
+          " release=" + c);
+    }
+    
+    // Get resources from the ResourceManager
+    List<Container> response = 
+      resourceManager.getResourceScheduler().allocate(applicationId, 
+          new ArrayList<ResourceRequest>(ask), 
+          new ArrayList<Container>(release));
+    
+    List<Container> containers = new ArrayList<Container>(response.size());
+    for (Container container : response) {
+      if (container.state != ContainerState.COMPLETE) {
+        containers.add(
+            org.apache.hadoop.yarn.server.resourcemanager.resource.Container.create(
+                container));
+      }
+    }
+    // Clear state for next interaction with ResourceManager
+    ask.clear();
+    release.clear();
+    
+    LOG.info("DEBUG --- getResources() for " + applicationId + ":" +
+    		" ask=" + ask.size() + 
+    		" release= "+ release.size() + 
+    		" recieved=" + containers.size());
+    
+    return containers;
+  }
+  
+  /**
+   * Assigns containers to pending tasks in priority order, preferring
+   * data-local, then rack-local, then off-switch matches; any containers
+   * left unassigned are queued for release.
+   */
+  public synchronized void assign(List<Container> containers) 
+  throws IOException {
+    
+    int numContainers = containers.size();
+    // Schedule in priority order
+    for (Priority priority : requests.keySet()) {
+      assign(priority, NodeType.DATA_LOCAL, containers);
+      assign(priority, NodeType.RACK_LOCAL, containers);
+      assign(priority, NodeType.OFF_SWITCH, containers);
+
+      if (containers.isEmpty()) { 
+        break;
+      }
+    }
+    
+    int assignedContainers = numContainers - containers.size();
+    LOG.info("Application " + applicationId + " assigned " + 
+        assignedContainers + "/" + numContainers);
+    if (assignedContainers < numContainers) {
+      // Release
+      release.addAll(containers);
+    }
+  }
+  
+  /** Convenience: pull allocations from the RM and assign them. */
+  public synchronized void schedule() throws IOException {
+    assign(getResources());
+  }
+  
+  // Matches each container against pending tasks of the given priority
+  // and locality type; starts the task and launches the container on a hit.
+  private synchronized void assign(Priority priority, NodeType type, 
+      List<Container> containers) throws IOException {
+    for (Iterator<Container> i=containers.iterator(); i.hasNext();) {
+      Container container = i.next();
+      String host = container.hostName.toString();
+      
+      if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.equals(
+          requestSpec.get(priority), container.resource)) { 
+        // See which task can use this container
+        for (Iterator<Task> t=tasks.get(priority).iterator(); t.hasNext();) {
+          Task task = t.next();
+          if (task.getState() == State.PENDING && task.canSchedule(type, host)) {
+            NodeManager nodeManager = getNodeManager(host);
+            
+            task.start(nodeManager, container.id);
+            i.remove();
+            
+            // Track application resource usage
+            org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+                used, container.resource);
+            
+            LOG.info("Assigned container (" + container + ") of type " + type +
+                " to task " + task.getTaskId() + " at priority " + priority + 
+                " on node " + nodeManager.getHostName() +
+                ", currently using " + used + " resources");
+
+            // Update resource requests
+            updateResourceRequests(requests.get(priority), type, task);
+
+            // Launch the container
+            nodeManager.startContainer(createCLC(container));
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  // Decrements the outstanding request counts affected by a newly
+  // scheduled task (host requests for data-local, rack requests for
+  // data/rack-local, and always the off-switch ANY request).
+  private void updateResourceRequests(Map<String, ResourceRequest> requests, 
+      NodeType type, Task task) {
+    if (type == NodeType.DATA_LOCAL) {
+      for (String host : task.getHosts()) {
+        LOG.info("DEBUG --- updateResourceRequests:" +
+            " application=" + applicationId +
+        		" type=" + type + 
+        		" host=" + host + 
+        		" request=" + ((requests == null) ? "null" : requests.get(host)));
+        updateResourceRequest(requests.get(host));
+      }
+    }
+    
+    if (type == NodeType.DATA_LOCAL || type == NodeType.RACK_LOCAL) {
+      for (String rack : task.getRacks()) {
+        LOG.info("DEBUG --- updateResourceRequests:" +
+            " application=" + applicationId +
+            " type=" + type + 
+            " rack=" + rack + 
+            " request=" + ((requests == null) ? "null" : requests.get(rack)));
+        updateResourceRequest(requests.get(rack));
+      }
+    }
+    
+    updateResourceRequest(
+        requests.get(
+            org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager.ANY)
+            );
+    
+    LOG.info("DEBUG --- updateResourceRequests:" +
+        " application=" + applicationId +
+    		" #asks=" + ask.size());
+  }
+  
+  // NOTE(review): a null 'request' (the logs above even print "null" for
+  // that case) would NPE here — confirm callers always pass a live request.
+  private void updateResourceRequest(ResourceRequest request) {
+    --request.numContainers;
+
+    // Note this for next interaction with ResourceManager
+    ask.remove(request);
+    ask.add(
+        org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceRequest.create(
+        request)); // clone to ensure the RM doesn't manipulate the same obj
+
+    LOG.info("DEBUG --- updateResourceRequest:" +
+        " application=" + applicationId +
+    		" request=" + request);
+  }
+
+  // Builds a launch context mirroring the granted container's id/resource.
+  private ContainerLaunchContext createCLC(Container container) {
+    ContainerLaunchContext clc = new ContainerLaunchContext();
+    clc.id = container.id;
+    clc.user = this.user;
+    clc.resource = container.resource;
+    return clc;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,121 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Resource;
+
+/**
+ * Test helper to generate mock nodes. Resource usage and container
+ * counts are randomized (Math.random), so generated nodes are
+ * intentionally nondeterministic.
+ */
+public class MockNodes {
+  // Monotonically increasing id for generated nodes. Unsynchronized
+  // static state — tests should create nodes from a single thread.
+  private static int NODE_ID = 0;
+
+  /** Creates racks * nodesPerRack mock nodes, each with perNode capacity. */
+  public static List<NodeInfo> newNodes(int racks, int nodesPerRack,
+                                        Resource perNode) {
+    List<NodeInfo> list = Lists.newArrayList();
+    for (int i = 0; i < racks; ++i) {
+      for (int j = 0; j < nodesPerRack; ++j) {
+        list.add(newNodeInfo(i, perNode));
+      }
+    }
+    return list;
+  }
+
+  public static NodeID newNodeID(int id) {
+    NodeID nid = new NodeID();
+    nid.id = id;
+    return nid;
+  }
+
+  public static Resource newResource(int mem) {
+    Resource rs = new Resource();
+    rs.memory = mem;
+    return rs;
+  }
+
+  /** Random used-resource in [0, total.memory). */
+  public static Resource newUsedResource(Resource total) {
+    Resource rs = new Resource();
+    rs.memory = (int)(Math.random() * total.memory);
+    return rs;
+  }
+
+  /** Available = total - used. */
+  public static Resource newAvailResource(Resource total, Resource used) {
+    Resource rs = new Resource();
+    rs.memory = total.memory - used.memory;
+    return rs;
+  }
+
+  /**
+   * Builds an anonymous NodeInfo on rack "rack{rack}" named "host{id}"
+   * with randomized used/available resources and container count.
+   * getNode() is intentionally unsupported.
+   */
+  public static NodeInfo newNodeInfo(int rack, final Resource perNode) {
+    final String rackName = "rack"+ rack;
+    final int nid = NODE_ID++;
+    final NodeID nodeID = newNodeID(nid);
+    final String hostName = "host"+ nid;
+    final Resource used = newUsedResource(perNode);
+    final Resource avail = newAvailResource(perNode, used);
+    final int containers = (int)(Math.random() * 8);
+    return new NodeInfo() {
+      @Override
+      public NodeID getNodeID() {
+        return nodeID;
+      }
+
+      @Override
+      public String getHostName() {
+        return hostName;
+      }
+
+      @Override
+      public Resource getTotalCapability() {
+        return perNode;
+      }
+
+      @Override
+      public String getRackName() {
+        return rackName;
+      }
+
+      @Override
+      public Node getNode() {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
+
+      @Override
+      public Resource getAvailableResource() {
+        return avail;
+      }
+
+      @Override
+      public Resource getUsedResource() {
+        return used;
+      }
+
+      @Override
+      public int getNumContainers() {
+        return containers;
+      }
+    };
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,228 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerManager;
+import org.apache.hadoop.yarn.ContainerState;
+import org.apache.hadoop.yarn.ContainerStatus;
+import org.apache.hadoop.yarn.HeartbeatResponse;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.NodeStatus;
+import org.apache.hadoop.yarn.RegistrationResponse;
+import org.apache.hadoop.yarn.Resource;
+
+@Private
+/**
+ * In-process fake of a NodeManager used by ResourceManager tests. It
+ * registers itself with the {@link RMResourceTrackerImpl} on construction,
+ * tracks per-application containers locally, and mirrors the scheduler's
+ * resource accounting (available/used) so tests can assert both sides agree
+ * via {@link #checkResourceUsage()}. Not thread-safe beyond the
+ * method-level synchronization on the ContainerManager operations.
+ */
+public class NodeManager implements ContainerManager {
+  private static final Log LOG = LogFactory.getLog(NodeManager.class);
+  
+  final private String hostName;
+  final private String rackName;
+  final private NodeID nodeId;
+  final private Resource capability;
+  // Local mirror of the scheduler-side accounting; mutated on start/stop.
+  Resource available = new Resource();
+  Resource used = new Resource();
+
+  final RMResourceTrackerImpl resourceTracker;
+  final NodeInfo nodeInfo;
+  // applicationId (string) -> containers currently hosted for it.
+  final Map<CharSequence, List<Container>> containers = 
+    new HashMap<CharSequence, List<Container>>();
+  
+  /**
+   * Creates the fake node and registers it with the resource tracker.
+   *
+   * @param hostName host this node pretends to run on
+   * @param rackName rack the node reports
+   * @param memory total memory capability of the node
+   * @param resourceTracker tracker to register with
+   * @throws IOException if registration fails
+   */
+  public NodeManager(String hostName, String rackName, int memory, 
+      RMResourceTrackerImpl resourceTracker) throws IOException {
+    this.hostName = hostName;
+    this.rackName = rackName;
+    this.resourceTracker = resourceTracker;
+    this.capability = 
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
+          memory);
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+        available, capability);
+
+    RegistrationResponse response =
+        resourceTracker.registerNodeManager(hostName, capability);
+    this.nodeId = response.nodeID;
+    this.nodeInfo = resourceTracker.getNodeManager(nodeId);
+   
+    // Sanity check: a freshly registered node must report its full
+    // capability as available.
+    Assert.assertEquals(memory, 
+       nodeInfo.getAvailableResource().memory);
+  }
+  
+  public String getHostName() {
+    return hostName;
+  }
+
+  public String getRackName() {
+    return rackName;
+  }
+
+  public NodeID getNodeId() {
+    return nodeId;
+  }
+
+  public Resource getCapability() {
+    return capability;
+  }
+
+  public Resource getAvailable() {
+    return available;
+  }
+  
+  public Resource getUsed() {
+    return used;
+  }
+  
+  // Last response id echoed by the tracker; resent on the next heartbeat so
+  // the tracker can detect duplicates/reordering.
+  int responseID = 0;
+  
+  /** Sends one heartbeat carrying the current container statuses. */
+  public void heartbeat() throws AvroRemoteException {
+    NodeStatus nodeStatus = 
+      org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeStatus.createNodeStatus(
+          nodeId, containers);
+    nodeStatus.responseId = responseID;
+    HeartbeatResponse response = resourceTracker.nodeHeartbeat(nodeStatus);
+    responseID = response.responseId;
+  }
+
+  @Override
+  synchronized public Void cleanupContainer(ContainerID containerID)
+      throws AvroRemoteException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  /**
+   * Records a new container for its application and debits/credits the
+   * local available/used accounting.
+   *
+   * @throws IllegalStateException if the container is already set up here
+   */
+  @Override
+  synchronized public Void startContainer(ContainerLaunchContext containerLaunchContext)
+      throws AvroRemoteException {
+    String applicationId = String.valueOf(containerLaunchContext.id.appID.id);
+
+    List<Container> applicationContainers = containers.get(applicationId);
+    if (applicationContainers == null) {
+      applicationContainers = new ArrayList<Container>();
+      containers.put(applicationId, applicationContainers);
+    }
+    
+    // Sanity check: refuse duplicate setup of the same container id.
+    for (Container container : applicationContainers) {
+      if (container.id.compareTo(containerLaunchContext.id) == 0) {
+        throw new IllegalStateException(
+            "Container " + containerLaunchContext.id + 
+            " already setup on node " + hostName);
+      }
+    }
+
+    Container container = 
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Container.create(
+          containerLaunchContext.id, 
+          hostName, containerLaunchContext.resource);
+    applicationContainers.add(container);
+    
+    // Mirror the scheduler's bookkeeping locally.
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+        available, containerLaunchContext.resource);
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+        used, containerLaunchContext.resource);
+    
+    LOG.info("DEBUG --- startContainer:" +
+        " node=" + hostName +
+        " application=" + applicationId + 
+        " container=" + container +
+        " available=" + available +
+        " used=" + used);
+
+    return null;
+  }
+
+  /** Asserts the tracker's view of this node matches local accounting. */
+  synchronized public void checkResourceUsage() {
+    LOG.info("Checking resource usage for " + hostName);
+    Assert.assertEquals(available.memory, 
+        nodeInfo.getAvailableResource().memory);
+    Assert.assertEquals(used.memory, 
+        nodeInfo.getUsedResource().memory);
+  }
+  
+  /**
+   * Marks the container COMPLETE, heartbeats so the RM observes the
+   * completion, then removes it locally and returns its resources.
+   *
+   * @throws IllegalStateException if the container (or its application) is
+   *         unknown on this node, or is present more than once
+   */
+  @Override
+  synchronized public Void stopContainer(ContainerID containerID) throws AvroRemoteException {
+    String applicationId = String.valueOf(containerID.appID.id);
+    
+    List<Container> applicationContainers = containers.get(applicationId);
+    if (applicationContainers == null) {
+      // Fail fast with context instead of an NPE below when the application
+      // never started a container on this node.
+      throw new IllegalStateException("Unknown application " + applicationId +
+          " on node " + hostName + " while stopping " + containerID);
+    }
+    
+    // Mark the container as COMPLETE
+    for (Container c : applicationContainers) {
+      if (c.id.compareTo(containerID) == 0) {
+        c.state = ContainerState.COMPLETE;
+      }
+    }
+    
+    // Send a heartbeat so the RM sees the COMPLETE state before removal.
+    heartbeat();
+    
+    // Remove container and update status
+    int ctr = 0;
+    Container container = null;
+    for (Iterator<Container> i=applicationContainers.iterator(); i.hasNext();) {
+      container = i.next();
+      if (container.id.compareTo(containerID) == 0) {
+        i.remove();
+        ++ctr;
+      }
+    }
+    
+    if (ctr != 1) {
+      throw new IllegalStateException("Container " + containerID + 
+          " stopped " + ctr + " times!");
+    }
+    
+    // Return the container's resources to the local accounting.
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.addResource(
+        available, container.resource);
+    org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.subtractResource(
+        used, container.resource);
+
+    LOG.info("DEBUG --- stopContainer:" +
+        " node=" + hostName +
+        " application=" + applicationId + 
+        " container=" + containerID +
+        " available=" + available +
+        " used=" + used);
+
+    return null;
+  }
+
+  @Override
+  synchronized public ContainerStatus getContainerStatus(ContainerID containerID)
+      throws AvroRemoteException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,145 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+ * 
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.Priority;
+
+/**
+ * A test-only task abstraction: tracks the requested locality (hosts/racks),
+ * the assigned container, and a simple PENDING -> RUNNING -> COMPLETE
+ * lifecycle. Identity is the per-application task id alone.
+ */
+public class Task {
+  private static final Log LOG = LogFactory.getLog(Task.class);
+  
+  public enum State {PENDING, ALLOCATED, RUNNING, COMPLETE};
+  
+  final private ApplicationID applicationId;
+  final private int taskId;
+  final private Priority priority;
+  
+  final private Set<String> hosts = new HashSet<String>();
+  final private Set<String> racks = new HashSet<String>();
+  
+  // Set while RUNNING; cleared again on stop().
+  private ContainerID containerId;
+  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager nodeManager;
+  
+  private State state;
+  
+  /**
+   * Creates a PENDING task for the application at the given priority.
+   *
+   * @param hosts preferred hosts; a single-element array containing the
+   *        scheduler's ANY wildcard means "no locality preference" and
+   *        leaves both host and rack sets empty
+   */
+  public Task(Application application, Priority priority, String[] hosts) {
+    this.applicationId = application.getApplicationId();
+    this.priority = priority;
+    
+    taskId = application.getNextTaskId();
+    state = State.PENDING;
+    
+    // Special case: Don't care about locality
+    if (!(hosts.length == 1 && 
+        hosts[0].equals(
+            org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager.ANY))) {
+      for (String host : hosts) {
+        this.hosts.add(host);
+        this.racks.add(Application.resolve(host));
+      }
+    }
+    LOG.info("Task " + taskId + " added to application " + this.applicationId + 
+        " with " + this.hosts.size() + " hosts, " + racks.size() + " racks");
+  }
+  
+  public int getTaskId() {
+    return taskId;
+  }
+
+  public Priority getPriority() {
+    return priority;
+  }
+  
+  public org.apache.hadoop.yarn.server.resourcemanager.NodeManager getNodeManager() {
+    return nodeManager;
+  }
+  
+  public ContainerID getContainerId() {
+    return containerId;
+  }
+  
+  public ApplicationID getApplicationID() {
+    return applicationId;
+  }
+
+  public String[] getHosts() {
+    return hosts.toArray(new String[hosts.size()]);
+  }
+  
+  public String[] getRacks() {
+    return racks.toArray(new String[racks.size()]);
+  }
+  
+  /**
+   * Whether an allocation of the given locality type on {@code hostName}
+   * satisfies this task's constraints. OFF_SWITCH (any other type) always
+   * matches.
+   */
+  public boolean canSchedule(NodeType type, String hostName) {
+    if (type == NodeType.DATA_LOCAL) { 
+      return hosts.contains(hostName);
+    } else if (type == NodeType.RACK_LOCAL) {
+      return racks.contains(Application.resolve(hostName));
+    } 
+    
+    return true;
+  }
+  
+  /** Binds the task to its node/container and moves it to RUNNING. */
+  public void start(NodeManager nodeManager, ContainerID containerId) {
+    this.nodeManager = nodeManager;
+    this.containerId = containerId;
+    setState(State.RUNNING);
+  }
+  
+  /**
+   * Completes the task, releasing its binding.
+   *
+   * @throws IllegalStateException if the task is not RUNNING
+   */
+  public void stop() {
+    if (getState() != State.RUNNING) {
+      throw new IllegalStateException("Trying to stop a non-running task: " + 
+          getTaskId() + " of application " + getApplicationID());
+    }
+    this.nodeManager = null;
+    this.containerId = null;
+    setState(State.COMPLETE);
+  }
+  
+  public State getState() {
+    return state;
+  }
+
+  private void setState(State state) {
+    this.state = state;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    // Identity is the taskId alone; note this ignores applicationId, so
+    // tasks from different applications with the same id compare equal --
+    // acceptable for these tests, which use a single application.
+    if (obj instanceof Task) {
+      return ((Task)obj).taskId == this.taskId;
+    }
+    // Non-Task (including null) is never equal. The previous fallback to
+    // Object.equals was dead code: any reference-equal object is a Task and
+    // is handled above.
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return taskId;
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,212 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.Resource;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * End-to-end-ish ResourceManager tests driven through the in-process fake
+ * NodeManager and Application test helpers. Note: this extends the JUnit 3
+ * {@link TestCase}; the JUnit 4 annotations are redundant under a JUnit 3
+ * runner (setUp/tearDown and test* naming conventions are what fire).
+ */
+public class TestResourceManager extends TestCase {
+  private static final Log LOG = LogFactory.getLog(TestResourceManager.class);
+  
+  private ResourceManager resourceManager = null;
+  
+  @Before
+  public void setUp() throws Exception {
+    resourceManager = new ResourceManager();
+    resourceManager.init(new Configuration());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  /** Registers a fake NodeManager of the given capability with the RM. */
+  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager 
+  registerNode(String hostName, String rackName, int memory) 
+  throws IOException {
+    return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
+        hostName, rackName, memory, resourceManager.getResourceTracker());
+  }
+  
+  @Test
+  public void testResourceManagerInitialization() throws IOException {
+    LOG.info("--- START: testResourceManagerInitialization ---");
+        
+    final int memory = 16 * 1024;
+    
+    // Register node1
+    String host1 = "host1";
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = 
+      registerNode(host1, NetworkTopology.DEFAULT_RACK, memory);
+    nm1.heartbeat();
+    
+    // Register node2
+    String host2 = "host2";
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = 
+      registerNode(host2, NetworkTopology.DEFAULT_RACK, memory);
+    nm2.heartbeat();
+
+    LOG.info("--- END: testResourceManagerInitialization ---");
+  }
+
+  @Test
+  public void testApplicationSubmission() throws IOException {
+    LOG.info("--- START: testApplicationSubmission ---");
+        
+    final int memory = 16 * 1024;
+    
+    // Register node1
+    String host1 = "host1";
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = 
+      registerNode(host1, NetworkTopology.DEFAULT_RACK, memory);
+    nm1.heartbeat();
+    
+    // Register node 2 (was a copy-paste bug: previously also "host1",
+    // which registered two node-managers on the same hostname)
+    String host2 = "host2";
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = 
+      registerNode(host2, NetworkTopology.DEFAULT_RACK, memory);
+    nm2.heartbeat();
+    
+    // Submit an application
+    Application application = new Application("user1", resourceManager);
+    application.submit();
+    
+    LOG.info("--- END: testApplicationSubmission ---");
+  }
+
+  @Test
+  public void testResourceAllocation() throws IOException {
+    LOG.info("--- START: testResourceAllocation ---");
+        
+    final int memory = 4 * 1024;
+    
+    // Register node1
+    String host1 = "host1";
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = 
+      registerNode(host1, NetworkTopology.DEFAULT_RACK, memory);
+    nm1.heartbeat();
+    
+    // Register node2, deliberately at half the capacity of node1.
+    String host2 = "host2";
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = 
+      registerNode(host2, NetworkTopology.DEFAULT_RACK, memory/2);
+    nm2.heartbeat();
+
+    // Submit an application
+    Application application = new Application("user1", resourceManager);
+    application.submit();
+    
+    application.addNodeManager(host1, nm1);
+    application.addNodeManager(host2, nm2);
+    
+    // Application resource requirements
+    final int memory1 = 1024;
+    Resource capability1 = 
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
+          memory1); 
+    Priority priority1 = 
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(1);
+    application.addResourceRequestSpec(priority1, capability1);
+    
+    Task t1 = new Task(application, priority1, new String[] {host1, host2});
+    application.addTask(t1);
+        
+    final int memory2 = 2048;
+    Resource capability2 = 
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.createResource(
+          memory2); 
+    Priority priority0 = 
+      org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); // higher
+    application.addResourceRequestSpec(priority0, capability2);
+    
+    // Send resource requests to the scheduler
+    application.schedule();
+    
+    // Send a heartbeat to kick the tires on the Scheduler
+    nm1.heartbeat();
+    
+    // Get allocations from the scheduler
+    application.schedule();
+    
+    nm1.heartbeat();
+    checkResourceUsage(nm1, nm2);
+    
+    LOG.info("Adding new tasks...");
+    
+    Task t2 = new Task(application, priority1, new String[] {host1, host2});
+    application.addTask(t2);
+
+    // ANY-placement task at the higher priority.
+    Task t3 = new Task(application, priority0, new String[] {NodeManager.ANY});
+    application.addTask(t3);
+
+    // Send resource requests to the scheduler
+    application.schedule();
+    checkResourceUsage(nm1, nm2);
+    
+    // Send a heartbeat to kick the tires on the Scheduler
+    LOG.info("Sending hb from host2");
+    nm2.heartbeat();
+    
+    LOG.info("Sending hb from host1");
+    nm1.heartbeat();
+    
+    // Get allocations from the scheduler
+    LOG.info("Trying to allocate...");
+    application.schedule();
+
+    nm1.heartbeat();
+    nm2.heartbeat();
+    checkResourceUsage(nm1, nm2);
+    
+    // Complete tasks
+    LOG.info("Finishing up tasks...");
+    application.finishTask(t1);
+    application.finishTask(t2);
+    application.finishTask(t3);
+    
+    // Send heartbeat so completions propagate, then verify accounting.
+    nm1.heartbeat();
+    nm2.heartbeat();
+    checkResourceUsage(nm1, nm2);
+    
+    LOG.info("--- END: testResourceAllocation ---");
+  }
+
+  /** Asserts local and RM-side resource accounting agree for each node. */
+  private void checkResourceUsage(
+      org.apache.hadoop.yarn.server.resourcemanager.NodeManager... nodes ) {
+    for (org.apache.hadoop.yarn.server.resourcemanager.NodeManager nodeManager : nodes) {
+      nodeManager.checkResourceUsage();
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,177 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/* a test case that tests the launch failure of an AM */
+public class TestAMLaunchFailure extends TestCase {
+  private static final Log LOG = LogFactory.getLog(TestAMLaunchFailure.class);
+  ApplicationsManagerImpl asmImpl;
+  YarnScheduler scheduler = new DummyYarnScheduler();
+  ApplicationTokenSecretManager applicationTokenSecretManager = 
+    new ApplicationTokenSecretManager();
+
+  private ASMContext context;
+
+  /** Scheduler stub: always hands back the same empty container. */
+  private static class DummyYarnScheduler implements YarnScheduler {
+    private Container container = new Container();
+
+    @Override
+    public List<Container> allocate(ApplicationID applicationId,
+    List<ResourceRequest> ask, List<Container> release) throws IOException {
+      return Arrays.asList(container);
+    }
+
+    @Override
+    public void addApplication(ApplicationID applicationId, String user,
+    String queue, Priority priority) throws IOException {
+    }
+
+    @Override
+    public void removeApplication(ApplicationID applicationId)
+    throws IOException {
+    }
+  }
+
+  /** Swallows application-tracker events so the dispatcher has a sink. */
+  private class DummyApplicationTracker implements EventHandler<ASMEvent<ApplicationTrackerEventType>> {
+    public DummyApplicationTracker() {
+      context.getDispatcher().register(ApplicationTrackerEventType.class, this);
+    }
+    @Override
+    public void handle(ASMEvent<ApplicationTrackerEventType> event) {
+    }
+  }
+
+  public class ExtApplicationsManagerImpl extends ApplicationsManagerImpl {
+
+    /**
+     * Fake AM launcher: on LAUNCH it never actually launches an AM, it only
+     * reports LAUNCHED from a helper thread -- so the AM is then expected to
+     * miss its liveliness deadline and the application to transition to
+     * FAILED.
+     */
+    private  class DummyApplicationMasterLauncher implements EventHandler<ASMEvent<AMLauncherEventType>> {
+      private Object notify = new Object();
+      // Guarded by 'notify'; records that LAUNCH arrived so the helper
+      // thread cannot miss a notification delivered before it starts
+      // waiting (classic lost-notify race in the original version).
+      private boolean launched = false;
+      private AppContext app;
+      
+      public DummyApplicationMasterLauncher(ASMContext context) {
+        context.getDispatcher().register(AMLauncherEventType.class, this);
+        new TestThread().start();
+      }
+      @Override
+      public void handle(ASMEvent<AMLauncherEventType> appEvent) {
+        switch(appEvent.getType()) {
+        case LAUNCH:
+          synchronized (notify) {
+            app = appEvent.getAppContext();
+            launched = true;
+            notify.notify();
+          }
+          break;
+        }
+      }
+
+      private class TestThread extends Thread {
+        public void run() {
+          synchronized(notify) {
+            try {
+              // Wait on the flag, not the bare notification, so a LAUNCH
+              // handled before this thread reached wait() is not lost.
+              while (!launched) {
+                notify.wait();
+              }
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+            context.getDispatcher().getEventHandler().handle(
+            new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED,
+            app));
+          }
+        }
+      }
+    }
+
+    public ExtApplicationsManagerImpl(
+    ApplicationTokenSecretManager applicationTokenSecretManager,
+    YarnScheduler scheduler) {
+      super(applicationTokenSecretManager, scheduler, context);
+    }
+
+    @Override
+    protected EventHandler<ASMEvent<AMLauncherEventType>> createNewApplicationMasterLauncher(
+    ApplicationTokenSecretManager tokenSecretManager) {
+      return new DummyApplicationMasterLauncher(context);
+    }
+  }
+
+
+  @Before
+  public void setUp() {
+    context = new ResourceManager.ASMContextImpl();
+    asmImpl = new ExtApplicationsManagerImpl(applicationTokenSecretManager, scheduler);
+    Configuration conf = new Configuration();
+    new DummyApplicationTracker();
+    // Short AM expiry so the missing AM is declared dead quickly.
+    conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 3000L);
+    asmImpl.init(conf);
+    asmImpl.start();  
+  }
+
+  @After
+  public void tearDown() {
+    asmImpl.stop();
+  }
+
+  private ApplicationSubmissionContext createDummyAppContext(ApplicationID appID) {
+    ApplicationSubmissionContext context = new ApplicationSubmissionContext();
+    context.applicationId = appID;
+    return context;
+  }
+
+  @Test
+  public void testAMLaunchFailure() throws Exception {
+    ApplicationID appID = asmImpl.getNewApplicationID();
+    ApplicationSubmissionContext context = createDummyAppContext(appID);
+    asmImpl.submitApplication(context);
+    ApplicationMaster master = asmImpl.getApplicationMaster(appID);
+
+    // Poll with a bounded deadline so a regression cannot hang the whole
+    // test run (the original loop could spin forever).
+    long deadline = System.currentTimeMillis() + 30 * 1000;
+    while (master.state != ApplicationState.FAILED) {
+      if (System.currentTimeMillis() > deadline) {
+        fail("ApplicationMaster did not reach FAILED in time; state=" +
+            master.state);
+      }
+      Thread.sleep(200);
+      master = asmImpl.getApplicationMaster(appID);
+    }
+    assertTrue(master.state == ApplicationState.FAILED);
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,132 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+
+import java.io.IOException;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.AMResponse;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Priority;
+import org.apache.hadoop.yarn.ResourceRequest;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestAMRMRPCResponseId extends TestCase {
+  ApplicationMasterService amService = null;
+  ApplicationTokenSecretManager appTokenManager = new ApplicationTokenSecretManager();
+  DummyApplicationsManager applicationsManager;
+  DummyScheduler scheduler;
+
+  private ASMContext context;
+  private class DummyApplicationsManager extends ApplicationsManagerImpl {
+    public DummyApplicationsManager(
+        ApplicationTokenSecretManager applicationTokenSecretManager,
+        YarnScheduler scheduler, ASMContext asmContext) {
+      super(applicationTokenSecretManager, scheduler, asmContext);      
+    }
+    @Override
+    public void registerApplicationMaster(ApplicationMaster applicationMaster)
+    throws IOException {
+    }
+    @Override
+    public void applicationHeartbeat(ApplicationStatus status)
+    throws IOException {      
+    }
+    @Override
+    public void finishApplicationMaster(ApplicationMaster applicationMaster)
+    throws IOException {  
+    }
+  }
+  
+  
+  private class DummyScheduler implements YarnScheduler {
+    @Override
+    public List<Container> allocate(ApplicationID applicationId,
+        List<ResourceRequest> ask, List<Container> release) throws IOException {
+      return null;
+    }
+    @Override
+    public void addApplication(ApplicationID applicationId, String user,
+        String queue, Priority priority) throws IOException {
+    }
+    @Override
+    public void removeApplication(ApplicationID applicationId)
+        throws IOException {
+    }
+  }
+  
+  @Before
+  public void setUp() {
+    context = new ResourceManager.ASMContextImpl();
+    scheduler = new DummyScheduler();
+    applicationsManager = new DummyApplicationsManager(new 
+        ApplicationTokenSecretManager(), scheduler, context);
+    amService = new ApplicationMasterService(
+        appTokenManager, applicationsManager, scheduler, context);
+    Configuration conf = new Configuration();
+    applicationsManager.init(conf);
+    amService.init(conf);
+  }
+  
+  @After
+  public void tearDown() {
+    
+  }
+  
+  public void testARRMResponseId() throws Exception {
+    ApplicationID applicationID = applicationsManager.getNewApplicationID();
+    ApplicationSubmissionContext context = new ApplicationSubmissionContext();
+    context.applicationId = applicationID;
+    applicationsManager.submitApplication(context);
+    ApplicationMaster applicationMaster = new ApplicationMaster();
+    applicationMaster.applicationId = applicationID;
+    applicationMaster.status = new ApplicationStatus();
+    amService.registerApplicationMaster(applicationMaster);
+    ApplicationStatus status = new ApplicationStatus();
+    status.applicationId = applicationID;
+    AMResponse response = amService.allocate(status, null, null);
+    assertTrue(response.responseId == 1);
+    assertFalse(response.reboot);
+    status.responseID = response.responseId;
+    response = amService.allocate(status, null, null);
+    assertTrue(response.responseId == 2);
+    /* try resending */
+    response = amService.allocate(status, null, null);
+    assertTrue(response.responseId == 2);
+    
+    /** try sending old **/
+    status.responseID = 0;
+    response = amService.allocate(status, null, null);
+    assertTrue(response.reboot);
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,308 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Drives an ApplicationMasterInfo through its lifecycle events
+ * (ALLOCATE, LAUNCH, REGISTERED, STATUSUPDATE, FINISH, EXPIRE) using
+ * dummy in-process handlers in place of the AM launcher, the scheduler
+ * negotiator and the applications tracker, checking the state after
+ * each transition.
+ */
+public class TestASMStateMachine extends TestCase {
+  private static final Log LOG = LogFactory.getLog(TestASMStateMachine.class);
+  ASMContext context = new ResourceManager.ASMContextImpl();
+  EventHandler handler;
+  // Flags flipped by the dummy handlers below; asserted at the end of
+  // testStateMachine to confirm each component received its events.
+  private boolean snreceivedCleanUp = false;
+  private boolean snAllocateReceived = false;
+  private boolean launchCalled = false;
+  private boolean addedApplication = false;
+  private boolean removedApplication = false;
+  private boolean launchCleanupCalled = false;
+  // Incremented (with notify) by the responder threads once they post
+  // their asynchronous reply; waitForState() below blocks on it.
+  private AtomicInteger waitForState = new AtomicInteger();
+
+  @Before
+  public void setUp() {
+    handler = context.getDispatcher().getEventHandler();
+    // Each constructor registers its instance with the dispatcher as a
+    // side effect, so the references are deliberately not retained.
+    new DummyAMLaunchEventHandler();
+    new DummySNEventHandler();
+    new ApplicationTracker();
+    new MockAppplicationMasterInfo();
+  }
+
+  @After
+  public void tearDown() {
+    // Intentionally empty: setUp creates no resources needing shutdown.
+  }
+
+  /**
+   * Stands in for the AM launcher: records LAUNCH/CLEANUP events, and
+   * after the first LAUNCH a background Responder thread replies with a
+   * LAUNCHED event, mimicking asynchronous launch completion.
+   */
+  private class DummyAMLaunchEventHandler implements EventHandler<ASMEvent<AMLauncherEventType>> {
+    AppContext appcontext;
+    AtomicInteger amsync = new AtomicInteger(0);
+
+    public DummyAMLaunchEventHandler() {
+      context.getDispatcher().register(AMLauncherEventType.class, this);
+      new Responder().start();
+    }
+
+    // Blocks until handle() observes a LAUNCH, then posts LAUNCHED back to
+    // the dispatcher and bumps waitForState so the test thread resumes.
+    private class Responder extends Thread {
+      public void run() {
+        try {
+          synchronized (amsync) {
+            while(amsync.get() == 0) {
+              amsync.wait();
+            }
+          }
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      
+      context.getDispatcher().getEventHandler().handle(
+      new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED,
+      appcontext));
+      synchronized(waitForState) {
+        waitForState.addAndGet(1);
+        waitForState.notify();
+      }
+    }
+  }
+
+  // NOTE(review): the indentation below is misleading but the braces are
+  // balanced -- this handle() is a member of DummyAMLaunchEventHandler.
+  @Override
+  public void handle(ASMEvent<AMLauncherEventType> event) {
+    switch(event.getType()) {
+    case LAUNCH:
+      launchCalled = true;
+      appcontext = event.getAppContext();
+      synchronized(amsync) {
+        amsync.addAndGet(1);
+        amsync.notify();
+      }
+      break;
+    case CLEANUP:
+      launchCleanupCalled = true;
+      break;
+    }
+  }
+}
+
+/**
+ * Stands in for the scheduler negotiator: records SCHEDULE/CLEANUP
+ * events, and after the first SCHEDULE a Responder thread replies with
+ * an ALLOCATED event.
+ */
+private class DummySNEventHandler implements EventHandler<ASMEvent<SNEventType>> {
+  AppContext appContext;
+  AtomicInteger snsync = new AtomicInteger(0);
+
+  public DummySNEventHandler() {
+    context.getDispatcher().register(SNEventType.class, this);
+    new Responder().start();
+  }
+
+  // Blocks until handle() observes a SCHEDULE, then posts ALLOCATED back
+  // and bumps waitForState so the test thread resumes.
+  private class Responder extends Thread {
+    public void run() {
+      synchronized (snsync) {
+        try {
+          while (snsync.get() == 0) {
+            snsync.wait();
+          }
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      context.getDispatcher().getEventHandler().handle(
+      new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATED,
+      appContext));
+      synchronized(waitForState) {
+        waitForState.addAndGet(1);
+        waitForState.notify();
+      }
+    }
+  }
+
+  @Override
+  public void handle(ASMEvent<SNEventType> event) {
+    switch(event.getType()) {
+    case CLEANUP:
+      snreceivedCleanUp = true;
+      break;
+    case SCHEDULE:
+      snAllocateReceived = true;
+      appContext = event.getAppContext();
+      synchronized (snsync) {
+        snsync.addAndGet(1);
+        snsync.notify();
+      }
+      break;
+    }
+  }
+
+}
+
+/**
+ * AppContext stub used to drive a STATUSUPDATE event without a real
+ * application; only getStatus() returns a value (lastSeen = -99,
+ * presumably a sentinel for "never seen" -- verify against the
+ * heartbeat/expiry logic in the ASM).
+ */
+private static class StatusContext implements AppContext {
+  @Override
+  public ApplicationSubmissionContext getSubmissionContext() {
+    return null;
+  }
+  @Override
+  public Resource getResource() {
+    return null;
+  }
+  @Override
+  public ApplicationID getApplicationID() {
+    return null;
+  }
+  @Override
+  public ApplicationStatus getStatus() {
+    ApplicationStatus status = new ApplicationStatus();
+    status.lastSeen = -99;
+    return status;
+  }
+  @Override
+  public ApplicationMaster getMaster() {
+    return null;
+  }
+  @Override
+  public Container getMasterContainer() {
+    return null;
+  }
+  @Override
+  public String getUser() {
+    return null;
+  }
+  @Override
+  public long getLastSeen() {
+    return 0;
+  }
+  @Override
+  public String getName() {
+    return null;
+  }
+  @Override
+  public String getQueue() {
+    return null;
+  }
+}
+
+/** Records whether the applications tracker saw ADD and REMOVE events. */
+private class ApplicationTracker implements EventHandler<ASMEvent<ApplicationTrackerEventType>> {
+  public ApplicationTracker() {
+    context.getDispatcher().register(ApplicationTrackerEventType.class, this);
+  }
+
+  @Override
+  public void handle(ASMEvent<ApplicationTrackerEventType> event) {
+    switch (event.getType()) {
+    case ADD:
+      addedApplication = true;
+      break;
+    case REMOVE:
+      removedApplication = true;
+      break;
+    }
+  }
+}
+
+/** Extra ApplicationEventType listener that only logs each event it sees. */
+private class MockAppplicationMasterInfo implements EventHandler<ASMEvent<ApplicationEventType>> {
+
+  MockAppplicationMasterInfo() {
+    context.getDispatcher().register(ApplicationEventType.class, this);
+  }
+  @Override
+  public void handle(ASMEvent<ApplicationEventType> event) {
+    LOG.info("The event type is " + event.getType());
+  }
+}
+
+/**
+ * Blocks (up to 10s per wait) until a responder thread signals the
+ * waitForState counter, then asserts the expected state. The caller is
+ * responsible for decrementing the counter before the next use.
+ */
+public void waitForState(ApplicationState state, ApplicationMasterInfo info) {
+  synchronized(waitForState) {
+    try {
+      while (waitForState.get() == 0) {   
+        waitForState.wait(10000L);
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+  Assert.assertEquals(state, info.getState());
+}
+
+/*
+ * Walks one application through ALLOCATE -> LAUNCH -> REGISTERED ->
+ * STATUSUPDATE -> FINISH -> EXPIRE and checks the resulting state and
+ * the clean-up notifications delivered to every dummy component.
+ */
+@Test
+public void testStateMachine() {
+  ApplicationSubmissionContext submissioncontext = new ApplicationSubmissionContext();
+  submissioncontext.applicationId = new ApplicationID();
+  submissioncontext.applicationId.id = 1;
+  submissioncontext.applicationId.clusterTimeStamp = System.currentTimeMillis();
+
+  ApplicationMasterInfo masterInfo 
+  = new ApplicationMasterInfo(handler, "dummyuser", submissioncontext, "dummyToken");
+
+  context.getDispatcher().register(ApplicationEventType.class, masterInfo);
+  handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
+  ALLOCATE, masterInfo));
+
+  waitForState(ApplicationState.ALLOCATED, masterInfo);
+  // Reset the signal counter so the next waitForState() blocks again.
+  waitForState.getAndDecrement();
+  handler.handle(new ASMEvent<ApplicationEventType>(
+  ApplicationEventType.LAUNCH, masterInfo));
+
+  waitForState(ApplicationState.LAUNCHED, masterInfo);
+  Assert.assertTrue(snAllocateReceived);
+  Assert.assertTrue(launchCalled);
+  Assert.assertTrue(addedApplication);
+  handler.handle(new ASMEvent<ApplicationEventType>(
+  ApplicationEventType.REGISTERED, masterInfo));
+  Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
+  handler.handle(new ASMEvent<ApplicationEventType>(
+  ApplicationEventType.STATUSUPDATE, new StatusContext()));
+
+  /* check if the state is still RUNNING */
+
+  Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
+
+  handler.handle(
+  new ASMEvent<ApplicationEventType>(ApplicationEventType.FINISH, masterInfo));
+
+  Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());
+  /* check if clean up is called for everyone */
+  Assert.assertTrue(launchCleanupCalled);
+  Assert.assertTrue(snreceivedCleanUp);
+  Assert.assertTrue(removedApplication);
+
+  /* check that expiry doesn't move a completed application to FAILED */
+  handler.handle(
+  new ASMEvent<ApplicationEventType>(ApplicationEventType.EXPIRE, masterInfo));
+  Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());   
+}
+}



Mime
View raw message