hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153430 [3/9] - in /hadoop/common/branches/MR-279/mapreduce: mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/loc...
Date Wed, 03 Aug 2011 11:32:10 GMT
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java Wed Aug  3 11:31:34 2011
@@ -6,21 +6,27 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeHealthStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ContainerListProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeHealthStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ApplicationIdContainerListMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ContainerListProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
-import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.StringContainerListMapProto;
-import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.mortbay.util.ajax.JSON.Convertor;
     
 public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements NodeStatus {
   NodeStatusProto proto = NodeStatusProto.getDefaultInstance();
@@ -28,7 +34,7 @@ public class NodeStatusPBImpl extends Pr
   boolean viaProto = false;
   
   private NodeId nodeId = null;
-  private Map<String, List<Container>> containers = null;
+  private Map<ApplicationIdProto, List<Container>> containers = null;
   private NodeHealthStatus nodeHealthStatus = null;
   
   public NodeStatusPBImpl() {
@@ -110,18 +116,24 @@ public class NodeStatusPBImpl extends Pr
   }
   
   @Override
-  public Map<String, List<Container>> getAllContainers() {
+  public Map<ApplicationId, List<Container>> getAllContainers() {
     initContainers();
-    return this.containers;
+    HashMap<ApplicationId, List<Container>> returnMap = new HashMap<ApplicationId, List<Container>>(
+        this.containers.size());
+    for (Entry<ApplicationIdProto, List<Container>> entry : this.containers.entrySet()) {
+      returnMap.put(convertFromProtoFormat(entry.getKey()), entry.getValue());
+    }
+    return returnMap;
   }
 
   @Override
-  public List<Container> getContainers(String key) {
+  public List<Container> getContainers(ApplicationId applicationId) {
     initContainers();
-    if (this.containers.get(key) == null) {
-      this.containers.put(key, new ArrayList<Container>());
+    ApplicationIdProto applicationIdProto = convertToProtoFormat(applicationId);
+    if (this.containers.get(applicationIdProto) == null) {
+      this.containers.put(applicationIdProto, new ArrayList<Container>());
     }
-    return this.containers.get(key);
+    return this.containers.get(applicationIdProto);
   }
 
   private void initContainers() {
@@ -129,43 +141,45 @@ public class NodeStatusPBImpl extends Pr
       return;
     }
     NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
-    List<StringContainerListMapProto> list = p.getContainersList();
-    this.containers = new HashMap<String, List<Container>>();
+    List<ApplicationIdContainerListMapProto> list = p.getContainersList();
+    this.containers = new HashMap<ApplicationIdProto, List<Container>>();
 
-    for (StringContainerListMapProto c : list) {
-      this.containers.put(c.getKey(), convertFromProtoFormat(c.getValue()));
+    for (ApplicationIdContainerListMapProto c : list) {
+      this.containers.put(c.getApplicationId(), convertFromProtoFormat(c.getValue()));
     }
     
   }
   
   @Override
-  public void addAllContainers(final Map<String, List<Container>> containers) {
+  public void addAllContainers(final Map<ApplicationId, List<Container>> containers) {
     if (containers == null)
       return;
     initContainers();
-    this.containers.putAll(containers);
+    for (Entry<ApplicationId, List<Container>> entry : containers.entrySet()) {
+      this.containers.put(convertToProtoFormat(entry.getKey()), entry.getValue());
+    }
   }
   
   private void addContainersToProto() {
     maybeInitBuilder();
     builder.clearContainers();
     viaProto = false;
-    Iterable<StringContainerListMapProto> iterable = new Iterable<StringContainerListMapProto>() {
+    Iterable<ApplicationIdContainerListMapProto> iterable = new Iterable<ApplicationIdContainerListMapProto>() {
 
       @Override
-      public Iterator<StringContainerListMapProto> iterator() {
-        return new Iterator<StringContainerListMapProto>() {
+      public Iterator<ApplicationIdContainerListMapProto> iterator() {
+        return new Iterator<ApplicationIdContainerListMapProto>() {
 
-          Iterator<String> keyIter = containers.keySet().iterator();
+          Iterator<ApplicationIdProto> keyIter = containers.keySet().iterator();
           @Override
           public boolean hasNext() {
             return keyIter.hasNext();
           }
 
           @Override
-          public StringContainerListMapProto next() {
-            String key = keyIter.next();
-            return StringContainerListMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(containers.get(key))).build();
+          public ApplicationIdContainerListMapProto next() {
+            ApplicationIdProto applicationIdProto = keyIter.next();
+            return ApplicationIdContainerListMapProto.newBuilder().setApplicationId(applicationIdProto).setValue(convertToProtoFormat(containers.get(applicationIdProto))).build();
           }
 
           @Override
@@ -245,15 +259,15 @@ public class NodeStatusPBImpl extends Pr
   }
   
   @Override
-  public void setContainers(String key, List<Container> containers) {
+  public void setContainers(ApplicationId applicationId, List<Container> containers) {
     initContainers();
-    this.containers.put(key, containers);
+    this.containers.put(convertToProtoFormat(applicationId), containers);
   }
 
   @Override
-  public void removeContainers(String key) {
+  public void removeContainers(ApplicationId applicationId) {
     initContainers();
-    this.containers.remove(key);
+    this.containers.remove(convertToProtoFormat(applicationId)); // map is keyed by ApplicationIdProto
   }
   
   @Override
@@ -270,6 +284,14 @@ public class NodeStatusPBImpl extends Pr
     return new NodeIdPBImpl(proto);
   }
 
+  private ApplicationIdProto convertToProtoFormat(ApplicationId applicationId) {
+    return ((ApplicationIdPBImpl)applicationId).getProto();
+  }
+  
+  private ApplicationId convertFromProtoFormat(ApplicationIdProto proto) {
+    return new ApplicationIdPBImpl(proto);
+  }
+
   private NodeHealthStatusProto convertToProtoFormat(
       NodeHealthStatus healthStatus) {
     return ((NodeHealthStatusPBImpl) healthStatus).getProto();

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/main/proto/yarn_server_common_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/main/proto/yarn_server_common_protos.proto Wed Aug  3 11:31:34 2011
@@ -5,16 +5,10 @@ option java_generate_equals_and_hash = t
 
 import "yarn_protos.proto";
 
-message NodeHealthStatusProto {
-  optional bool is_node_healthy = 1;
-  optional string health_report = 2;
-  optional int64 last_health_report_time = 3;
-}
-
 message NodeStatusProto {
   optional NodeIdProto node_id = 1;
   optional int32 response_id = 2;
-  repeated StringContainerListMapProto containers = 3;
+  repeated ApplicationIdContainerListMapProto containers = 3;
   optional NodeHealthStatusProto nodeHealthStatus = 4;
 }
 
@@ -26,7 +20,7 @@ message RegistrationResponseProto {
 message HeartbeatResponseProto {
   optional int32 response_id = 1;
   optional bool reboot = 2;
-  repeated ContainerProto containers_to_cleanup = 3;
+  repeated ContainerIdProto containers_to_cleanup = 3;
   repeated ApplicationIdProto applications_to_cleanup = 4;
 }
 
@@ -34,8 +28,8 @@ message ContainerListProto {
   repeated ContainerProto container = 1;
 }
 
-message StringContainerListMapProto {
-  optional string key = 1;
+message ApplicationIdContainerListMapProto {
+  optional ApplicationIdProto application_id = 1;
   optional ContainerListProto value = 2;
 }
 

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java Wed Aug  3 11:31:34 2011
@@ -29,9 +29,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java Wed Aug  3 11:31:34 2011
@@ -20,19 +20,18 @@ package org.apache.hadoop.yarn.server.no
 
 import java.util.List;
 
-import org.apache.hadoop.yarn.api.records.Container;
-
+import org.apache.hadoop.yarn.api.records.ContainerId;
 
 public class CMgrCompletedContainersEvent extends ContainerManagerEvent {
 
-  private List<Container> containerToCleanup;
+  private List<ContainerId> containerToCleanup;
 
-  public CMgrCompletedContainersEvent(List<Container> containersToCleanup) {
+  public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup) {
     super(ContainerManagerEventType.FINISH_CONTAINERS);
     this.containerToCleanup = containersToCleanup;
   }
 
-  public List<Container> getContainersToCleanup() {
+  public List<ContainerId> getContainersToCleanup() {
     return this.containerToCleanup;
   }
 }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java Wed Aug  3 11:31:34 2011
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentMa
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Wed Aug  3 11:31:34 2011
@@ -35,12 +35,12 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.YarnServerConfig;
-import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Aug  3 11:31:34 2011
@@ -38,6 +38,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -52,7 +53,6 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -201,17 +201,18 @@ public class NodeStatusUpdaterImpl exten
         Entry<ContainerId, Container> e = i.next();
         ContainerId containerId = e.getKey();
         Container container = e.getValue();
-        String applicationId = String.valueOf(containerId.getAppId().getId()); // TODO: ID? Really?
 
         List<org.apache.hadoop.yarn.api.records.Container> applicationContainers = nodeStatus
-            .getContainers(applicationId);
+            .getContainers(container.getContainerID().getAppId());
         if (applicationContainers == null) {
           applicationContainers = new ArrayList<org.apache.hadoop.yarn.api.records.Container>();
-          nodeStatus.setContainers(applicationId, applicationContainers);
+          nodeStatus.setContainers(container.getContainerID().getAppId(),
+              applicationContainers);
         }
 
         // Clone the container to send it to the RM
         org.apache.hadoop.yarn.api.records.Container c = container.cloneAndGetContainer();
+        c.setNodeId(this.nodeId);
         c.setContainerManagerAddress(this.containerManagerBindAddress);
         c.setNodeHttpAddress(this.nodeHttpAddress); // TODO: don't set everytime.
         applicationContainers.add(c);
@@ -268,8 +269,8 @@ public class NodeStatusUpdaterImpl exten
             HeartbeatResponse response =
               resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
             lastHeartBeatID = response.getResponseId();
-            List<org.apache.hadoop.yarn.api.records.Container> containersToCleanup =
-                response.getContainersToCleanupList();
+            List<ContainerId> containersToCleanup = response
+                .getContainersToCleanupList();
             if (containersToCleanup.size() != 0) {
               dispatcher.getEventHandler().handle(
                   new CMgrCompletedContainersEvent(containersToCleanup));

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Wed Aug  3 11:31:34 2011
@@ -376,10 +376,10 @@ public class ContainerManagerImpl extend
     case FINISH_CONTAINERS:
       CMgrCompletedContainersEvent containersFinishedEvent =
           (CMgrCompletedContainersEvent) event;
-      for (org.apache.hadoop.yarn.api.records.Container container :
-            containersFinishedEvent.getContainersToCleanup()) {
+      for (ContainerId container : containersFinishedEvent
+          .getContainersToCleanup()) {
         this.dispatcher.getEventHandler().handle(
-            new ContainerKillEvent(container.getId(),
+            new ContainerKillEvent(container,
                 "Container Killed by ResourceManager"));
       }
       break;

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java Wed Aug  3 11:31:34 2011
@@ -33,6 +33,12 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocol;
 import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsResponse;
@@ -61,7 +67,8 @@ public class AdminService extends Abstra
 
   private final Configuration conf;
   private final ResourceScheduler scheduler;
-  private final NodeTracker nodesTracker;
+  private final RMContext rmContext;
+  private final NodesListManager nodesListManager;
 
   private Server server;
   private InetSocketAddress masterServiceAddress;
@@ -71,11 +78,12 @@ public class AdminService extends Abstra
     RecordFactoryProvider.getRecordFactory(null);
 
   public AdminService(Configuration conf, ResourceScheduler scheduler, 
-      NodeTracker nodesTracker) {
+      RMContext rmContext, NodesListManager nodesListManager) {
     super(AdminService.class.getName());
     this.conf = conf;
     this.scheduler = scheduler;
-    this.nodesTracker = nodesTracker;
+    this.rmContext = rmContext;
+    this.nodesListManager = nodesListManager;
   }
 
   @Override
@@ -156,7 +164,7 @@ public class AdminService extends Abstra
       throws YarnRemoteException {
     checkAcls("refreshNodes");
     try {
-      nodesTracker.refreshNodes();
+      this.nodesListManager.refreshNodes();
       return recordFactory.newRecordInstance(RefreshNodesResponse.class);
     } catch (IOException ioe) {
       LOG.info("Exception refreshing nodes ", ioe);
@@ -199,4 +207,4 @@ public class AdminService extends Abstra
 
     return recordFactory.newRecordInstance(RefreshAdminAclsResponse.class);
   }
-}
\ No newline at end of file
+}

Copied: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationACL.java (from r1153017, hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/application/ApplicationACL.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationACL.java?p2=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationACL.java&p1=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/application/ApplicationACL.java&r1=1153017&r2=1153430&rev=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/application/ApplicationACL.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationACL.java Wed Aug  3 11:31:34 2011
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.application;
+package org.apache.hadoop.yarn.server.resourcemanager;
 
 import org.apache.hadoop.classification.*;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;

Copied: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationACLsManager.java (from r1153017, hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/application/ApplicationACLsManager.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationACLsManager.java?p2=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationACLsManager.java&p1=hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/application/ApplicationACLsManager.java&r1=1153017&r2=1153430&rev=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/application/ApplicationACLsManager.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationACLsManager.java Wed Aug  3 11:31:34 2011
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.application;
+package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
 
 @InterfaceAudience.Private
 public class ApplicationACLsManager {

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Wed Aug  3 11:31:34 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.AccessControlException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -56,12 +57,13 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.NodeManagerInfo;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -71,18 +73,13 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.AMLivelinessMonitor;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.application.ApplicationACL;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.application.ApplicationACLsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.service.AbstractService;
 
@@ -93,11 +90,11 @@ import org.apache.hadoop.yarn.service.Ab
  */
 public class ClientRMService extends AbstractService implements
     ClientRMProtocol {
+  private static final ArrayList<ApplicationReport> EMPTY_APPS_REPORT = new ArrayList<ApplicationReport>();
+
   private static final Log LOG = LogFactory.getLog(ClientRMService.class);
 
   final private AtomicInteger applicationCounter = new AtomicInteger(0);
-
-  final private ClusterTracker clusterInfo;
   final private YarnScheduler scheduler;
   final private RMContext rmContext;
   private final ClientToAMSecretManager clientToAMSecretManager;
@@ -114,9 +111,8 @@ public class ClientRMService extends Abs
   public ClientRMService(RMContext rmContext,
       AMLivelinessMonitor amLivelinessMonitor,
       ClientToAMSecretManager clientToAMSecretManager,
-      ClusterTracker clusterInfo, YarnScheduler scheduler) {
+      YarnScheduler scheduler) {
     super(ClientRMService.class.getName());
-    this.clusterInfo = clusterInfo;
     this.scheduler = scheduler;
     this.rmContext = rmContext;
     this.amLivelinessMonitor = amLivelinessMonitor;
@@ -156,29 +152,9 @@ public class ClientRMService extends Abs
     super.start();
   }
 
-  private ApplicationReport createApplicationReport(Application application,
-      String user, String queue, String name, Container masterContainer) {
-    ApplicationMaster am = application.getMaster();
-    ApplicationReport applicationReport = 
-      recordFactory.newRecordInstance(ApplicationReport.class);
-    applicationReport.setApplicationId(am.getApplicationId());
-    applicationReport.setMasterContainer(masterContainer);
-    applicationReport.setHost(am.getHost());
-    applicationReport.setRpcPort(am.getRpcPort());
-    applicationReport.setClientToken(am.getClientToken());
-    applicationReport.setTrackingUrl(am.getTrackingUrl());
-    applicationReport.setDiagnostics(am.getDiagnostics());
-    applicationReport.setName(name);
-    applicationReport.setQueue(queue);
-    applicationReport.setState(am.getState());
-    applicationReport.setStatus(am.getStatus());
-    applicationReport.setUser(user);
-    return applicationReport;
-  }
-
   /**
    * check if the calling user has the access to application information.
-   * @param applicationId
+   * @param appAttemptId
    * @param callerUGI
    * @param owner
    * @param appACL
@@ -213,11 +189,9 @@ public class ClientRMService extends Abs
   public GetApplicationReportResponse getApplicationReport(
       GetApplicationReportRequest request) throws YarnRemoteException {
     ApplicationId applicationId = request.getApplicationId();
-    Application application = rmContext.getApplications().get(applicationId);
-    ApplicationReport report = (application == null) ? null
-        : createApplicationReport(application, application.getUser(),
-            application.getQueue(), application.getName(), application
-                .getMasterContainer());
+    RMApp application = rmContext.getRMApps().get(applicationId);
+    ApplicationReport report = (application == null) ? null : application
+        .createAndGetApplicationReport();
 
     GetApplicationReportResponse response = recordFactory
         .newRecordInstance(GetApplicationReportResponse.class);
@@ -225,6 +199,7 @@ public class ClientRMService extends Abs
     return response;
   }
 
+  @Override
   public SubmitApplicationResponse submitApplication(
       SubmitApplicationRequest request) throws YarnRemoteException {
     ApplicationSubmissionContext submissionContext = request
@@ -251,37 +226,18 @@ public class ClientRMService extends Abs
       ApplicationStore appStore = rmContext.getApplicationsStore()
           .createApplicationStore(submissionContext.getApplicationId(),
               submissionContext);
-      Application application = new ApplicationImpl(rmContext, getConfig(),
-          user, submissionContext, clientTokenStr, appStore,
-          this.amLivelinessMonitor);
-      if (rmContext.getApplications().putIfAbsent(
-          application.getApplicationID(), application) != null) {
-        throw new IOException("Application with id "
-            + application.getApplicationID()
+      RMApp application = new RMAppImpl(applicationId, rmContext,
+          getConfig(), submissionContext.getApplicationName(), user,
+          submissionContext.getQueue(), submissionContext, clientTokenStr,
+          appStore, this.amLivelinessMonitor, this.scheduler);
+      if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) {
+        throw new IOException("Application with id " + applicationId
             + " is already present! Cannot add a duplicate!");
       }
 
-      /**
-       * this can throw so we need to call it synchronously to let the client
-       * know as soon as it submits. For backwards compatibility we cannot make
-       * it asynchronous
-       */
-      try {
-        scheduler.addApplication(applicationId, application.getMaster(),
-            user, application.getQueue(), submissionContext.getPriority(),
-            application.getStore());
-      } catch (IOException io) {
-        LOG.info("Failed to submit application " + applicationId, io);
-        rmContext.getDispatcher().getSyncHandler().handle(
-            new ApplicationEvent(ApplicationEventType.FAILED, applicationId));
-        throw io;
-      }
-
-      rmContext.getDispatcher().getSyncHandler().handle(
-          new ApplicationEvent(ApplicationEventType.ALLOCATE, applicationId));
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppEvent(applicationId, RMAppEventType.START));
 
-      // TODO this should happen via dispatcher. should move it out to scheudler
-      // negotiator.
       LOG.info("Application with id " + applicationId.getId()
           + " submitted by user " + user + " with " + submissionContext);
     } catch (IOException ie) {
@@ -308,7 +264,7 @@ public class ClientRMService extends Abs
       throw RPCUtil.getRemoteException(ie);
     }
 
-    Application application = rmContext.getApplications().get(applicationId);
+    RMApp application = this.rmContext.getRMApps().get(applicationId);
     // TODO: What if null
     if (!checkAccess(callerUGI, application.getUser(),
         ApplicationACL.MODIFY_APP)) {
@@ -317,8 +273,8 @@ public class ClientRMService extends Abs
           + ApplicationACL.MODIFY_APP.name() + " on " + applicationId));
     }
 
-    rmContext.getDispatcher().getEventHandler().handle(
-        new ApplicationEvent(ApplicationEventType.KILL, applicationId));
+    this.rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppEvent(applicationId, RMAppEventType.KILL));
 
     FinishApplicationResponse response = recordFactory
         .newRecordInstance(FinishApplicationResponse.class);
@@ -326,9 +282,14 @@ public class ClientRMService extends Abs
   }
 
   @Override
-  public GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest request) throws YarnRemoteException {
-    GetClusterMetricsResponse response = recordFactory.newRecordInstance(GetClusterMetricsResponse.class);
-    response.setClusterMetrics(clusterInfo.getClusterMetrics());
+  public GetClusterMetricsResponse getClusterMetrics(
+      GetClusterMetricsRequest request) throws YarnRemoteException {
+    GetClusterMetricsResponse response = recordFactory
+        .newRecordInstance(GetClusterMetricsResponse.class);
+    YarnClusterMetrics ymetrics = recordFactory
+        .newRecordInstance(YarnClusterMetrics.class);
+    ymetrics.setNumNodeManagers(this.rmContext.getRMNodes().size());
+    response.setClusterMetrics(ymetrics);
     return response;
   }
   
@@ -337,10 +298,8 @@ public class ClientRMService extends Abs
       GetAllApplicationsRequest request) throws YarnRemoteException {
 
     List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
-    for (Application application : rmContext.getApplications().values()) {
-      reports.add(createApplicationReport(application, application.getUser(),
-          application.getQueue(), application.getName(), application
-              .getMasterContainer()));
+    for (RMApp application : this.rmContext.getRMApps().values()) {
+      reports.add(application.createAndGetApplicationReport());
     }
 
     GetAllApplicationsResponse response = 
@@ -354,13 +313,12 @@ public class ClientRMService extends Abs
       throws YarnRemoteException {
     GetClusterNodesResponse response = 
       recordFactory.newRecordInstance(GetClusterNodesResponse.class);
-    List<NodeInfo> nodeInfos = clusterInfo.getAllNodeInfo();
-    List<NodeManagerInfo> nodes = 
-      new ArrayList<NodeManagerInfo>(nodeInfos.size());
-    for (NodeInfo nodeInfo : nodeInfos) {
-      nodes.add(createNodeManagerInfo(nodeInfo));
+    Collection<RMNode> nodes = this.rmContext.getRMNodes().values();
+    List<NodeReport> nodeReports = new ArrayList<NodeReport>(nodes.size());
+    for (RMNode nodeInfo : nodes) {
+      nodeReports.add(createNodeReports(nodeInfo));
     }
-    response.setNodeManagerList(nodes);
+    response.setNodeReports(nodeReports);
     return response;
   }
 
@@ -371,10 +329,19 @@ public class ClientRMService extends Abs
       recordFactory.newRecordInstance(GetQueueInfoResponse.class);
     try {
       QueueInfo queueInfo = 
-        scheduler.getQueueInfo(request.getQueueName(), 
-            request.getIncludeApplications(), 
+        scheduler.getQueueInfo(request.getQueueName(),  
             request.getIncludeChildQueues(), 
             request.getRecursive());
+      List<ApplicationReport> appReports = EMPTY_APPS_REPORT;
+      if (request.getIncludeApplications()) {
+        Collection<RMApp> apps = this.rmContext.getRMApps().values();
+        appReports = new ArrayList<ApplicationReport>(
+            apps.size());
+        for (RMApp app : apps) {
+          appReports.add(app.createAndGetApplicationReport());
+        }
+      }
+      queueInfo.setApplications(appReports);
       response.setQueueInfo(queueInfo);
     } catch (IOException ioe) {
       LOG.info("Failed to getQueueInfo for " + request.getQueueName(), ioe);
@@ -384,13 +351,21 @@ public class ClientRMService extends Abs
     return response;
   }
 
-  private NodeManagerInfo createNodeManagerInfo(NodeInfo nodeInfo) {
-    NodeManagerInfo node = 
-      recordFactory.newRecordInstance(NodeManagerInfo.class);
+  private NodeReport createNodeReports(RMNode nodeInfo) {
+    NodeReport node = 
+      recordFactory.newRecordInstance(NodeReport.class);
     node.setNodeAddress(nodeInfo.getNodeAddress());
     node.setRackName(nodeInfo.getRackName());
     node.setCapability(nodeInfo.getTotalCapability());
-    node.setUsed(nodeInfo.getUsedResource());
+    node.setNodeHealthStatus(nodeInfo.getNodeHealthStatus());
+    List<Container> containers = nodeInfo.getRunningContainers();
+    int userdResource = 0;
+    for (Container c : containers) {
+      userdResource += c.getResource().getMemory();
+    }
+    Resource usedRsrc = recordFactory.newRecordInstance(Resource.class);
+    usedRsrc.setMemory(userdResource);
+    node.setUsed(usedRsrc);
     node.setNumContainers(nodeInfo.getNumContainers());
     return node;
   }

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,50 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+
+/**
+ * Liveliness monitor keyed by {@link NodeId}. Its {@link #expire(NodeId)}
+ * callback posts an {@link RMNodeEventType#EXPIRE} event for the node to the
+ * event handler of the dispatcher supplied at construction.
+ */
+public class NMLivelinessMonitor extends AbstractLivelinessMonitor<NodeId> {
+
+  // Event handler taken from the dispatcher passed to the constructor.
+  private final EventHandler dispatcher;
+
+  /**
+   * @param d dispatcher whose event handler receives the expiry events
+   */
+  public NMLivelinessMonitor(Dispatcher d) {
+    super("NMLivelinessMonitor", new SystemClock());
+    this.dispatcher = d.getEventHandler();
+  }
+
+  /**
+   * Initializes the base monitor, then sets the expire and monitor intervals
+   * from {@code conf}, using the {@link RMConfig} defaults when unset.
+   */
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    setExpireInterval(conf.getInt(RMConfig.NM_EXPIRY_INTERVAL,
+        RMConfig.DEFAULT_NM_EXPIRY_INTERVAL));
+    setMonitorInterval(conf.getInt(
+        RMConfig.NMLIVELINESS_MONITORING_INTERVAL,
+        RMConfig.DEFAULT_NMLIVELINESS_MONITORING_INTERVAL));
+  }
+
+  /** Posts an {@code EXPIRE} {@link RMNodeEvent} for the given node id. */
+  @Override
+  protected void expire(NodeId id) {
+    dispatcher.handle(
+        new RMNodeEvent(id, RMNodeEventType.EXPIRE));
+  }
+}
\ No newline at end of file

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,101 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * Service that loads include/exclude host lists through a
+ * {@link HostsFileReader} and answers whether a host name is acceptable.
+ */
+public class NodesListManager extends AbstractService{
+
+  private static final Log LOG = LogFactory.getLog(NodesListManager.class);
+
+  private HostsFileReader hostsReader;
+  private Configuration conf;
+
+  public NodesListManager() {
+    super(NodesListManager.class.getName());
+  }
+
+  /**
+   * Creates the hosts reader from the configured include/exclude files. If
+   * that fails with an {@link IOException}, a reader over the default file
+   * names is tried instead; a second failure is rethrown as
+   * {@link YarnException}.
+   */
+  @Override
+  public void init(Configuration conf) {
+    this.conf = conf;
+
+    String includeFile = conf.get(RMConfig.RM_NODES_INCLUDE_FILE,
+        RMConfig.DEFAULT_RM_NODES_INCLUDE_FILE);
+    String excludeFile = conf.get(RMConfig.RM_NODES_EXCLUDE_FILE,
+        RMConfig.DEFAULT_RM_NODES_EXCLUDE_FILE);
+
+    // Read the hosts/exclude files to restrict access to the RM
+    try {
+      this.hostsReader = new HostsFileReader(includeFile, excludeFile);
+      printConfiguredHosts();
+    } catch (IOException ioe) {
+      LOG.warn("Failed to init hostsReader, disabling", ioe);
+      try {
+        this.hostsReader = new HostsFileReader(
+            RMConfig.DEFAULT_RM_NODES_INCLUDE_FILE,
+            RMConfig.DEFAULT_RM_NODES_EXCLUDE_FILE);
+      } catch (IOException ioe2) {
+        // Should *never* happen
+        this.hostsReader = null;
+        throw new YarnException(ioe2);
+      }
+    }
+    super.init(conf);
+  }
+
+  /** Logs the configured file names and both host lists at debug level. */
+  private void printConfiguredHosts() {
+    if (LOG.isDebugEnabled()) {
+      String includeFile = conf.get(RMConfig.RM_NODES_INCLUDE_FILE,
+          RMConfig.DEFAULT_RM_NODES_INCLUDE_FILE);
+      String excludeFile = conf.get(RMConfig.RM_NODES_EXCLUDE_FILE,
+          RMConfig.DEFAULT_RM_NODES_EXCLUDE_FILE);
+      LOG.debug("hostsReader: in=" + includeFile + " out=" + excludeFile);
+      for (String includedHost : hostsReader.getHosts()) {
+        LOG.debug("include: " + includedHost);
+      }
+      for (String excludedHost : hostsReader.getExcludedHosts()) {
+        LOG.debug("exclude: " + excludedHost);
+      }
+    }
+  }
+
+  /** Refreshes the hosts reader and logs the resulting lists at debug level. */
+  public void refreshNodes() throws IOException {
+    synchronized (hostsReader) {
+      hostsReader.refresh();
+      printConfiguredHosts();
+    }
+  }
+
+  /**
+   * @return true when the include list is empty or contains
+   *         {@code hostName}, and the exclude list does not contain it
+   */
+  public boolean isValidNode(String hostName) {
+    synchronized (hostsReader) {
+      Set<String> included = hostsReader.getHosts();
+      Set<String> excluded = hostsReader.getExcludedHosts();
+      if (!included.isEmpty() && !included.contains(hostName)) {
+        return false;
+      }
+      return !excluded.contains(hostName);
+    }
+  }
+}

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java Wed Aug  3 11:31:34 2011
@@ -33,10 +33,10 @@ public class RMConfig {
       + "application.max.retries";
   public static final int DEFAULT_ZK_TIMEOUT = 60000;
   public static final int DEFAULT_AM_MAX_RETRIES = 3;
-  public static final long DEFAULT_AM_EXPIRY_INTERVAL = 60000L;
+  public static final int DEFAULT_AM_EXPIRY_INTERVAL = 60000;
   public static final String NM_EXPIRY_INTERVAL = YarnConfiguration.RM_PREFIX
       + "nodemanager.expiry.interval";
-  public static final long DEFAULT_NM_EXPIRY_INTERVAL = 600000L;
+  public static final int DEFAULT_NM_EXPIRY_INTERVAL = 600000;
   public static final String DEFAULT_ADMIN_BIND_ADDRESS = "0.0.0.0:8141";
   public static final String RESOURCE_SCHEDULER = YarnConfiguration.RM_PREFIX
       + "scheduler";
@@ -44,11 +44,15 @@ public class RMConfig {
   public static final String AMLIVELINESS_MONITORING_INTERVAL =
       YarnConfiguration.RM_PREFIX
           + "amliveliness-monitor.monitoring-interval";
-  public static final long DEFAULT_AMLIVELINESS_MONITORING_INTERVAL = 1000;
+  public static final int DEFAULT_AMLIVELINESS_MONITORING_INTERVAL = 1000;
+  public static final String CONTAINER_LIVELINESS_MONITORING_INTERVAL
+    = YarnConfiguration.RM_PREFIX
+      + "containerliveliness-monitor.monitoring-interval"; // own key, distinct from the AM liveliness monitor's key
+  public static final int DEFAULT_CONTAINER_LIVELINESS_MONITORING_INTERVAL = 1000;
   public static final String NMLIVELINESS_MONITORING_INTERVAL =
       YarnConfiguration.RM_PREFIX
           + "nmliveliness-monitor.monitoring-interval";
-  public static final long DEFAULT_NMLIVELINESS_MONITORING_INTERVAL = 1000;
+  public static final int DEFAULT_NMLIVELINESS_MONITORING_INTERVAL = 1000;
   
   public static final String RM_RESOURCE_TRACKER_THREADS =
     YarnConfiguration.RM_PREFIX + "resource.tracker.threads";

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,45 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NodeStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+/**
+ * Accessors for the objects shared through the resource manager context:
+ * the dispatcher, the stores, the application/container/node maps and the
+ * liveliness and allocation-expiry monitors.
+ */
+public interface RMContext {
+
+  // Event dispatching and stores.
+
+  Dispatcher getDispatcher();
+
+  ApplicationsStore getApplicationsStore();
+
+  NodeStore getNodeStore();
+
+  // Concurrent maps keyed by the respective record id.
+
+  ConcurrentMap<ApplicationId, RMApp> getRMApps();
+
+  ConcurrentMap<NodeId, RMNode> getRMNodes();
+
+  ConcurrentMap<ContainerId, RMContainer> getRMContainers();
+
+  // Monitors.
+
+  AMLivelinessMonitor getAMLivelinessMonitor();
+
+  ContainerAllocationExpirer getContainerAllocationExpirer();
+}
\ No newline at end of file

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,96 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NodeStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+/**
+ * {@link RMContext} implementation that holds the collaborators handed to its
+ * constructor and owns the application, container and node maps.
+ */
+public class RMContextImpl implements RMContext {
+
+  private final Dispatcher rmDispatcher;
+  // Single store returned as both the NodeStore and the ApplicationsStore.
+  private final Store store;
+
+  private final ConcurrentMap<ApplicationId, RMApp> applications
+    = new ConcurrentHashMap<ApplicationId, RMApp>();
+
+  private final ConcurrentMap<ContainerId, RMContainer> containers
+    = new ConcurrentHashMap<ContainerId, RMContainer>();
+
+  private final ConcurrentMap<NodeId, RMNode> nodes
+    = new ConcurrentHashMap<NodeId, RMNode>();
+
+  // Assigned once in the constructor, hence final like the other fields.
+  private final AMLivelinessMonitor amLivelinessMonitor;
+  private final ContainerAllocationExpirer containerAllocationExpirer;
+
+  /**
+   * @param store store exposed through both store getters
+   * @param rmDispatcher dispatcher returned by {@link #getDispatcher()}
+   * @param containerAllocationExpirer expirer returned by its getter
+   * @param amLivelinessMonitor monitor returned by its getter
+   */
+  public RMContextImpl(Store store, Dispatcher rmDispatcher,
+      ContainerAllocationExpirer containerAllocationExpirer,
+      AMLivelinessMonitor amLivelinessMonitor) {
+    this.store = store;
+    this.rmDispatcher = rmDispatcher;
+    this.containerAllocationExpirer = containerAllocationExpirer;
+    this.amLivelinessMonitor = amLivelinessMonitor;
+  }
+
+  @Override
+  public Dispatcher getDispatcher() {
+    return this.rmDispatcher;
+  }
+
+  @Override
+  public NodeStore getNodeStore() {
+    return this.store;
+  }
+
+  @Override
+  public ApplicationsStore getApplicationsStore() {
+    return this.store;
+  }
+
+  @Override
+  public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
+    return this.applications;
+  }
+
+  @Override
+  public ConcurrentMap<ContainerId, RMContainer> getRMContainers() {
+    return this.containers;
+  }
+
+  @Override
+  public ConcurrentMap<NodeId, RMNode> getRMNodes() {
+    return this.nodes;
+  }
+
+  @Override
+  public ContainerAllocationExpirer getContainerAllocationExpirer() {
+    return this.containerAllocationExpirer;
+  }
+
+  @Override
+  public AMLivelinessMonitor getAMLivelinessMonitor() {
+    return this.amLivelinessMonitor;
+  }
+}
\ No newline at end of file

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMNodeRemovalListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMNodeRemovalListener.java?rev=1153430&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMNodeRemovalListener.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMNodeRemovalListener.java Wed Aug  3 11:31:34 2011
@@ -0,0 +1,7 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public interface RMNodeRemovalListener { // callback interface; its single method receives a NodeId
+  void RMNodeRemoved(NodeId nodeId); // presumably invoked once the node with this id has been removed -- TODO confirm with callers
+}

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1153430&r1=1153429&r2=1153430&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Aug  3 11:31:34 2011
@@ -20,11 +20,8 @@ package org.apache.hadoop.yarn.server.re
 
 
 import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.avro.AvroRuntimeException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -32,29 +29,44 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManagerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationTrackerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.NodeStore;
+import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.ams.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeTracker;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 
@@ -66,126 +78,147 @@ public class ResourceManager extends Com
   private static final Log LOG = LogFactory.getLog(ResourceManager.class);
   public static final long clusterTimeStamp = System.currentTimeMillis();
   private YarnConfiguration conf;
+
+  protected ClientToAMSecretManager clientToAMSecretManager =
+      new ClientToAMSecretManager();
   
-  private ApplicationsManager applicationsManager;
-  
-  private ContainerTokenSecretManager containerTokenSecretManager =
+  protected ContainerTokenSecretManager containerTokenSecretManager =
       new ContainerTokenSecretManager();
 
-  private ApplicationTokenSecretManager appTokenSecretManager =
+  protected ApplicationTokenSecretManager appTokenSecretManager =
       new ApplicationTokenSecretManager();
 
-  private ResourceScheduler scheduler;
-  private ResourceTrackerService resourceTracker;
+  private Dispatcher rmDispatcher;
+
+  protected ResourceScheduler scheduler;
   private ClientRMService clientRM;
-  private ApplicationMasterService masterService;
+  protected ApplicationMasterService masterService;
+  private ApplicationMasterLauncher applicationMasterLauncher;
   private AdminService adminService;
+  private ContainerAllocationExpirer containerAllocationExpirer;
+  protected NMLivelinessMonitor nmLivelinessMonitor;
+  protected AMLivelinessMonitor amLivelinessMonitor;
+  protected NodesListManager nodesListManager;
+
   private final AtomicBoolean shutdown = new AtomicBoolean(false);
   private WebApp webApp;
   private RMContext rmContext;
   private final Store store;
+  protected ResourceTrackerService resourceTracker;
   
   public ResourceManager(Store store) {
     super("ResourceManager");
     this.store = store;
+    this.nodesListManager = new NodesListManager();
   }
   
   public RMContext getRMContext() {
     return this.rmContext;
   }
   
-  public interface RMContext {
-    public RMDispatcherImpl getDispatcher();
-    public NodeStore getNodeStore();
-    public ApplicationsStore getApplicationsStore();
-    public ConcurrentMap<ApplicationId, Application> getApplications();
-  }
-  
-  public static class RMContextImpl implements RMContext {
-    private final RMDispatcherImpl asmEventDispatcher;
-    private final Store store;
-    private final ConcurrentMap<ApplicationId, Application> applications = 
-      new ConcurrentHashMap<ApplicationId, Application>();
-
-    public RMContextImpl(Store store) {
-      this.asmEventDispatcher = new RMDispatcherImpl();
-      this.store = store;
-    }
-    
-    @Override
-    public RMDispatcherImpl getDispatcher() {
-      return this.asmEventDispatcher;
-    }
+  @Override
+  public synchronized void init(Configuration conf) {
 
-    @Override
-    public NodeStore getNodeStore() {
-     return store;
-    }
+    this.rmDispatcher = new AsyncDispatcher();
+    addIfService(this.rmDispatcher);
 
-    @Override
-    public ApplicationsStore getApplicationsStore() {
-      return store;
-    }
+    this.containerAllocationExpirer = new ContainerAllocationExpirer(
+        this.rmDispatcher);
+    addService(this.containerAllocationExpirer);
+
+    this.amLivelinessMonitor = createAMLivelinessMonitor();
+    addService(amLivelinessMonitor);
+
+    this.rmContext = new RMContextImpl(this.store, this.rmDispatcher,
+        this.containerAllocationExpirer, this.amLivelinessMonitor);
+
+    addService(nodesListManager);
 
-    @Override
-    public ConcurrentMap<ApplicationId, Application> getApplications() {
-      return this.applications;
-    }
-  }
-  
-  
-  @Override
-  public synchronized void init(Configuration conf) {
-    
-    this.rmContext = new RMContextImpl(this.store);
-    addService(rmContext.getDispatcher());
     // Initialize the config
     this.conf = new YarnConfiguration(conf);
     // Initialize the scheduler
-    this.scheduler = 
-      ReflectionUtils.newInstance(
-          conf.getClass(RMConfig.RESOURCE_SCHEDULER, 
-              FifoScheduler.class, ResourceScheduler.class), 
-          this.conf);
+    this.scheduler = createScheduler();
+    this.rmDispatcher.register(SchedulerEventType.class, scheduler);
 
-    // Register event handler for ApplicationEvents.
-    this.rmContext.getDispatcher().register(ApplicationEventType.class,
+    // Register event handler for RMAppEvents
+    this.rmDispatcher.register(RMAppEventType.class,
         new ApplicationEventDispatcher(this.rmContext));
 
-    this.rmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
+    // Register event handler for RMAppAttemptEvents
+    this.rmDispatcher.register(RMAppAttemptEventType.class,
+        new ApplicationAttemptEventDispatcher(this.rmContext));
+
+    // Register event handler for RMNodeEvents
+    this.rmDispatcher.register(RMNodeEventType.class,
+        new NodeEventDispatcher(this.rmContext));
+
+    // Register event handler for RMContainerEvents
+    this.rmDispatcher.register(RMContainerEventType.class,
+        new ContainerEventDispatcher(this.rmContext));
+
     //TODO change this to be random
     this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
         .createSecretKey("Dummy".getBytes()));
 
-    applicationsManager = createApplicationsManager();
-    addService(applicationsManager);
-    
-    resourceTracker = createResourceTrackerService();
+    this.nmLivelinessMonitor = createNMLivelinessMonitor();
+    addService(this.nmLivelinessMonitor);
+
+    this.resourceTracker = createResourceTrackerService();
     addService(resourceTracker);
   
     try {
-      this.scheduler.reinitialize(this.conf, 
-          this.containerTokenSecretManager, 
-          resourceTracker.getResourceTracker());
+      this.scheduler.reinitialize(this.conf,
+          this.containerTokenSecretManager, this.rmContext);
     } catch (IOException ioe) {
       throw new RuntimeException("Failed to initialize scheduler", ioe);
     }
-    
+
     clientRM = createClientRMService();
     addService(clientRM);
     
     masterService = createApplicationMasterService();
     addService(masterService) ;
     
-    adminService = 
-      createAdminService(conf, scheduler, resourceTracker.getResourceTracker());
+    adminService = createAdminService();
     addService(adminService);
 
+    this.applicationMasterLauncher = createAMLauncher();
+    addService(applicationMasterLauncher);
+
     super.init(conf);
   }
 
-  public static final class ApplicationEventDispatcher implements
-      EventHandler<ApplicationEvent> {
+  protected void addIfService(Object object) {
+    if (object instanceof Service) {
+      addService((Service) object);
+    }
+  }
+
+  protected ResourceScheduler createScheduler() {
+    return 
+    ReflectionUtils.newInstance(
+        conf.getClass(RMConfig.RESOURCE_SCHEDULER, 
+            FifoScheduler.class, ResourceScheduler.class), 
+        this.conf);
+  }
+
+  protected ApplicationMasterLauncher createAMLauncher() {
+    return new ApplicationMasterLauncher(
+        this.appTokenSecretManager, this.clientToAMSecretManager,
+        this.rmContext);
+  }
+
+  private NMLivelinessMonitor createNMLivelinessMonitor() {
+    return new NMLivelinessMonitor(this.rmContext
+        .getDispatcher());
+  }
+
+  protected AMLivelinessMonitor createAMLivelinessMonitor() {
+    return new AMLivelinessMonitor(this.rmDispatcher);
+  }
+
+  private static final class ApplicationEventDispatcher implements
+      EventHandler<RMAppEvent> {
 
     private final RMContext rmContext;
 
@@ -194,37 +227,118 @@ public class ResourceManager extends Com
     }
 
     @Override
-    public void handle(ApplicationEvent event) {
+    public void handle(RMAppEvent event) {
       ApplicationId appID = event.getApplicationId();
-      ApplicationImpl application = (ApplicationImpl) this.rmContext
-          .getApplications().get(appID);
-      try {
-        application.handle(event);
-      } catch (Throwable t) {
-        LOG.error("Error in handling event type " + event.getType()
-            + " for application " + event.getApplicationId(), t);
+      RMApp rmApp = this.rmContext.getRMApps().get(appID);
+      if (rmApp != null) {
+        try {
+          rmApp.handle(event);
+        } catch (Throwable t) {
+          LOG.error("Error in handling event type " + event.getType()
+              + " for application " + appID, t);
+        }
       }
     }
   }
 
+  private static final class ApplicationAttemptEventDispatcher implements
+      EventHandler<RMAppAttemptEvent> {
+
+    private final RMContext rmContext;
+
+    public ApplicationAttemptEventDispatcher(RMContext rmContext) {
+      this.rmContext = rmContext;
+    }
+
+    @Override
+    public void handle(RMAppAttemptEvent event) {
+      ApplicationAttemptId appAttemptID = event.getApplicationAttemptId();
+      ApplicationId appAttemptId = appAttemptID.getApplicationId();
+      RMApp rmApp = this.rmContext.getRMApps().get(appAttemptId);
+      if (rmApp != null) {
+        RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptID);
+        if (rmAppAttempt != null) {
+          try {
+            rmAppAttempt.handle(event);
+          } catch (Throwable t) {
+            LOG.error("Error in handling event type " + event.getType()
+                + " for applicationAttempt " + appAttemptId, t);
+          }
+        }
+      }
+    }
+  }
+
+  private static final class NodeEventDispatcher implements
+      EventHandler<RMNodeEvent> {
+
+    private final RMContext rmContext;
+
+    public NodeEventDispatcher(RMContext rmContext) {
+      this.rmContext = rmContext;
+    }
+
+    @Override
+    public void handle(RMNodeEvent event) {
+      NodeId nodeId = event.getNodeId();
+      RMNode node = this.rmContext.getRMNodes().get(nodeId);
+      if (node != null) {
+        try {
+          ((EventHandler<RMNodeEvent>) node).handle(event);
+        } catch (Throwable t) {
+          LOG.error("Error in handling event type " + event.getType()
+              + " for node " + nodeId, t);
+        }
+      }
+    }
+  }
+
+  private static final class ContainerEventDispatcher implements
+      EventHandler<RMContainerEvent> {
+
+    private final RMContext rmContext;
+
+    public ContainerEventDispatcher(RMContext rmContext) {
+      this.rmContext = rmContext;
+    }
+
+    @Override
+    public void handle(RMContainerEvent event) {
+      ContainerId containerId = event.getContainerId();
+      RMContainer container = this.rmContext.getRMContainers().get(containerId);
+      if (container != null) {
+        try {
+          ((EventHandler<RMContainerEvent>) container).handle(event);
+        } catch (Throwable t) {
+          LOG.error("Error in handling event type " + event.getType()
+              + " for container " + containerId, t);
+        }
+      }
+    }
+  }
+
+  protected void startWepApp() {
+    webApp = WebApps.$for("yarn", masterService).at(
+        conf.get(YarnConfiguration.RM_WEBAPP_BIND_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_WEBAPP_BIND_ADDRESS)).
+      start(new RMWebApp(this));
+
+  }
+
   @Override
   public void start() {
     try {
       doSecureLogin();
     } catch(IOException ie) {
-      throw new AvroRuntimeException("Failed to login", ie);
+      throw new YarnException("Failed to login", ie);
     }
 
-    webApp = WebApps.$for("yarn", masterService).at(
-      conf.get(YarnConfiguration.RM_WEBAPP_BIND_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_WEBAPP_BIND_ADDRESS)).
-    start(new RMWebApp(this));
-
+    startWepApp();
     DefaultMetricsSystem.initialize("ResourceManager");
 
     super.start();
 
-    synchronized(shutdown) {
+    /*synchronized(shutdown) {
       try {
         while(!shutdown.get()) {
           shutdown.wait();
@@ -232,7 +346,7 @@ public class ResourceManager extends Com
       } catch(InterruptedException ie) {
         LOG.info("Interrupted while waiting", ie);
       }
-    }
+    }*/
   }
   
   protected void doSecureLogin() throws IOException {
@@ -245,11 +359,11 @@ public class ResourceManager extends Com
     if (webApp != null) {
       webApp.stop();
     }
-  
-    synchronized(shutdown) {
+
+    /*synchronized(shutdown) {
       shutdown.set(true);
       shutdown.notifyAll();
-    }
+    }*/
 
     DefaultMetricsSystem.shutdown();
 
@@ -257,47 +371,29 @@ public class ResourceManager extends Com
   }
   
   protected ResourceTrackerService createResourceTrackerService() {
-    return new ResourceTrackerService(
-        new RMResourceTrackerImpl(this.containerTokenSecretManager, 
-            this.rmContext));
-  }
-  
-  protected ApplicationsManager createApplicationsManager() {
-    return new ApplicationsManagerImpl(
-        this.appTokenSecretManager, this.scheduler, this.rmContext);
+    return new ResourceTrackerService(this.rmContext, this.nodesListManager,
+        this.nmLivelinessMonitor, this.containerTokenSecretManager);
   }
 
   protected ClientRMService createClientRMService() {
-    return new ClientRMService(this.rmContext, this.applicationsManager
-        .getAmLivelinessMonitor(), this.applicationsManager
-        .getClientToAMSecretManager(), resourceTracker.getResourceTracker(),
-        scheduler);
+    return new ClientRMService(this.rmContext, this.amLivelinessMonitor,
+        this.clientToAMSecretManager, scheduler);
   }
 
   protected ApplicationMasterService createApplicationMasterService() {
-    return new ApplicationMasterService(this.appTokenSecretManager,
-        scheduler, this.rmContext);
+    return new ApplicationMasterService(this.rmContext,
+        this.amLivelinessMonitor, this.appTokenSecretManager, scheduler);
   }
   
 
-  protected AdminService createAdminService(Configuration conf, 
-      ResourceScheduler scheduler, NodeTracker nodesTracker) {
-    return new AdminService(conf, scheduler, nodesTracker);
+  protected AdminService createAdminService() {
+    return new AdminService(conf, scheduler, rmContext, this.nodesListManager);
   }
 
   @Private
   public ClientRMService getClientRMService() {
     return this.clientRM;
   }
-
-  /**
-   * return applications manager.
-   * @return
-   */
-  @Private
-  public ApplicationsManager getApplicationsManager() {
-    return applicationsManager;
-  }
   
   /**
    * return the scheduler.
@@ -307,20 +403,18 @@ public class ResourceManager extends Com
   public ResourceScheduler getResourceScheduler() {
     return this.scheduler;
   }
-  
+
   /**
    * return the resource tracking component.
    * @return
    */
   @Private
-  public RMResourceTrackerImpl getResourceTracker() {
-    return this.resourceTracker.getResourceTracker();
+  public ResourceTrackerService getResourceTrackerService() {
+    return this.resourceTracker;
   }
-  
 
   @Override
   public void recover(RMState state) throws Exception {
-    applicationsManager.recover(state);
     resourceTracker.recover(state);
     scheduler.recover(state);
   }



Mime
View raw message