hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153437 [1/3] - in /hadoop/common/branches/MR-279/mapreduce: mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/ mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/ mr-cli...
Date Wed, 03 Aug 2011 11:39:57 GMT
Author: vinodkv
Date: Wed Aug  3 11:39:53 2011
New Revision: 1153437

URL: http://svn.apache.org/viewvc?rev=1153437&view=rev
Log:
Tests' compilation passes too!

Modified:
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Wed Aug  3 11:39:53 2011
@@ -325,7 +325,8 @@ public class MRApp extends MRAppMaster {
         Container container = recordFactory.newRecordInstance(Container.class);
         container.setId(cId);
         container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
-        container.setContainerManagerAddress("dummy");
+        container.getNodeId().setHost("dummy");
+        container.getNodeId().setPort(1234);
         container.setContainerToken(null);
         container.setNodeHttpAddress("localhost:9999");
         getContext().getEventHandler().handle(

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Wed Aug  3 11:39:53 2011
@@ -131,8 +131,10 @@ public class MRAppBenchmark {
                   
                   Container container = recordFactory.newRecordInstance(Container.class);
                   container.setId(cId);
-                  container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
-                  container.setContainerManagerAddress("dumm");
+                  NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
+                  nodeId.setHost("dummy");
+                  nodeId.setPort(1234);
+                  container.setNodeId(nodeId);
                   container.setContainerToken(null);
                   container.setNodeHttpAddress("localhost:9999");
                   getContext().getEventHandler()

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Wed Aug  3 11:39:53 2011
@@ -78,435 +78,429 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestRMContainerAllocator {
-  private static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class);
-  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
-  @BeforeClass
-  public static void preTests() {
-    DefaultMetricsSystem.shutdown();
-  }
-
-  @Test
-  public void testSimple() throws Exception {
-    FifoScheduler scheduler = createScheduler();
-    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
-        scheduler, new Configuration());
-
-    //add resources to scheduler
-    RMNode nodeManager1 = addNode(scheduler, "h1", 10240);
-    RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
-    RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
-
-    //create the container request
-    ContainerRequestEvent event1 = 
-      createReq(1, 1024, new String[]{"h1"});
-    allocator.sendRequest(event1);
-
-    //send 1 more request with different resource req
-    ContainerRequestEvent event2 = createReq(2, 1024, new String[]{"h2"});
-    allocator.sendRequest(event2);
-
-    //this tells the scheduler about the requests
-    //as nodes are not added, no allocations
-    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-
-    //send another request with different resource and priority
-    ContainerRequestEvent event3 = createReq(3, 1024, new String[]{"h3"});
-    allocator.sendRequest(event3);
-
-    //this tells the scheduler about the requests
-    //as nodes are not added, no allocations
-    assigned = allocator.schedule();
-    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-
-    //update resources in scheduler
-    scheduler.nodeUpdate(nodeManager1); // Node heartbeat
-    scheduler.nodeUpdate(nodeManager2); // Node heartbeat
-    scheduler.nodeUpdate(nodeManager3); // Node heartbeat
-
-
-    assigned = allocator.schedule();
-    checkAssignments(
-        new ContainerRequestEvent[]{event1, event2, event3}, assigned, false);
-  }
-
-  //TODO: Currently Scheduler seems to have bug where it does not work
-  //for Application asking for containers with different capabilities.
-  //@Test
-  public void testResource() throws Exception {
-    FifoScheduler scheduler = createScheduler();
-    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
-        scheduler, new Configuration());
-
-    //add resources to scheduler
-    RMNode nodeManager1 = addNode(scheduler, "h1", 10240);
-    RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
-    RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
-
-    //create the container request
-    ContainerRequestEvent event1 = 
-      createReq(1, 1024, new String[]{"h1"});
-    allocator.sendRequest(event1);
-
-    //send 1 more request with different resource req
-    ContainerRequestEvent event2 = createReq(2, 2048, new String[]{"h2"});
-    allocator.sendRequest(event2);
-
-    //this tells the scheduler about the requests
-    //as nodes are not added, no allocations
-    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-
-    //update resources in scheduler
-    scheduler.nodeUpdate(nodeManager1); // Node heartbeat
-    scheduler.nodeUpdate(nodeManager2); // Node heartbeat
-    scheduler.nodeUpdate(nodeManager3); // Node heartbeat
-
-    assigned = allocator.schedule();
-    checkAssignments(
-        new ContainerRequestEvent[]{event1, event2}, assigned, false);
-  }
-
-  @Test
-  public void testMapReduceScheduling() throws Exception {
-    FifoScheduler scheduler = createScheduler();
-    Configuration conf = new Configuration();
-    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
-        scheduler, conf);
-
-    //add resources to scheduler
-    RMNode nodeManager1 = addNode(scheduler, "h1", 1024);
-    RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
-    RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
-
-    //create the container request
-    //send MAP request
-    ContainerRequestEvent event1 = 
-      createReq(1, 2048, new String[]{"h1", "h2"}, true, false);
-    allocator.sendRequest(event1);
-
-    //send REDUCE request
-    ContainerRequestEvent event2 = createReq(2, 3000, new String[]{"h1"}, false, true);
-    allocator.sendRequest(event2);
-
-    //send MAP request
-    ContainerRequestEvent event3 = createReq(3, 2048, new String[]{"h3"}, false, false);
-    allocator.sendRequest(event3);
-
-    //this tells the scheduler about the requests
-    //as nodes are not added, no allocations
-    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-
-    //update resources in scheduler
-    scheduler.nodeUpdate(nodeManager1); // Node heartbeat
-    scheduler.nodeUpdate(nodeManager2); // Node heartbeat
-    scheduler.nodeUpdate(nodeManager3); // Node heartbeat
-
-    assigned = allocator.schedule();
-    checkAssignments(
-        new ContainerRequestEvent[]{event1, event3}, assigned, false);
-
-    //validate that no container is assigned to h1 as it doesn't have 2048
-    for (TaskAttemptContainerAssignedEvent assig : assigned) {
-      Assert.assertFalse("Assigned count not correct", 
-          "h1".equals(assig.getContainer().getContainerManagerAddress()));
-    }
-  }
-
-
-
-  private RMNode addNode(FifoScheduler scheduler, 
-      String nodeName, int memory) {
-    NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
-    nodeId.setId(0);
-    Resource resource = recordFactory.newRecordInstance(Resource.class);
-    resource.setMemory(memory);
-    RMNode nodeManager = new RMNodeImpl(nodeId, nodeName, 0, 0,
-        ResourceTrackerService.resolve(nodeName), resource
-        );
-    scheduler.addNode(nodeManager); // Node registration
-    return nodeManager;
-  }
-
-  private FifoScheduler createScheduler() throws YarnRemoteException {
-    FifoScheduler fsc = new FifoScheduler() {
-      //override this to copy the objects
-      //otherwise FifoScheduler updates the numContainers in same objects as kept by
-      //RMContainerAllocator
-      
-      @Override
-      public synchronized Allocation allocate(ApplicationId applicationId,
-          List<ResourceRequest> ask, List<Container> release) 
-          throws IOException {
-        List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
-        for (ResourceRequest req : ask) {
-          ResourceRequest reqCopy = recordFactory.newRecordInstance(ResourceRequest.class);
-          reqCopy.setPriority(req.getPriority());
-          reqCopy.setHostName(req.getHostName());
-          reqCopy.setCapability(req.getCapability());
-          reqCopy.setNumContainers(req.getNumContainers());
-          askCopy.add(reqCopy);
-        }
-        //no need to copy release
-        return super.allocate(applicationId, askCopy, release);
-      }
-    };
-    try {
-      fsc.reinitialize(new Configuration(), new ContainerTokenSecretManager(), null);
-      fsc.addApplication(recordFactory.newRecordInstance(ApplicationId.class),
-          recordFactory.newRecordInstance(ApplicationMaster.class),
-          "test", null, null, StoreFactory.createVoidAppStore());
-    } catch(IOException ie) {
-      LOG.info("add application failed with ", ie);
-      assert(false);
-    }
-    return fsc;
-  }
-
-  private ContainerRequestEvent createReq(
-      int attemptid, int memory, String[] hosts) {
-    return createReq(attemptid, memory, hosts, false, false);
-  }
-  
-  private ContainerRequestEvent createReq(
-      int attemptid, int memory, String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
-    ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
-    appId.setClusterTimestamp(0);
-    appId.setId(0);
-    JobId jobId = recordFactory.newRecordInstance(JobId.class);
-    jobId.setAppId(appId);
-    jobId.setId(0);
-    TaskId taskId = recordFactory.newRecordInstance(TaskId.class);
-    taskId.setId(0);
-    taskId.setJobId(jobId);
-    if (reduce) {
-      taskId.setTaskType(TaskType.REDUCE);
-    } else {
-      taskId.setTaskType(TaskType.MAP);
-    }
-    TaskAttemptId attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
-    attemptId.setId(attemptid);
-    attemptId.setTaskId(taskId);
-    Resource containerNeed = recordFactory.newRecordInstance(Resource.class);
-    containerNeed.setMemory(memory);
-    if (earlierFailedAttempt) {
-      return ContainerRequestEvent.
-           createContainerRequestEventForFailedContainer(attemptId, containerNeed);
-    }
-    return new ContainerRequestEvent(attemptId, 
-        containerNeed, 
-        hosts, new String[] {NetworkTopology.DEFAULT_RACK});
-  }
-
-  private void checkAssignments(ContainerRequestEvent[] requests, 
-      List<TaskAttemptContainerAssignedEvent> assignments, 
-      boolean checkHostMatch) {
-    Assert.assertNotNull("Container not assigned", assignments);
-    Assert.assertEquals("Assigned count not correct", 
-        requests.length, assignments.size());
-
-    //check for uniqueness of containerIDs
-    Set<ContainerId> containerIds = new HashSet<ContainerId>();
-    for (TaskAttemptContainerAssignedEvent assigned : assignments) {
-      containerIds.add(assigned.getContainer().getId());
-    }
-    Assert.assertEquals("Assigned containers must be different", 
-        assignments.size(), containerIds.size());
-
-    //check for all assignment
-    for (ContainerRequestEvent req : requests) {
-      TaskAttemptContainerAssignedEvent assigned = null;
-      for (TaskAttemptContainerAssignedEvent ass : assignments) {
-        if (ass.getTaskAttemptID().equals(req.getAttemptID())){
-          assigned = ass;
-          break;
-        }
-      }
-      checkAssignment(req, assigned, checkHostMatch);
-    }
-  }
-
-  private void checkAssignment(ContainerRequestEvent request, 
-      TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) {
-    Assert.assertNotNull("Nothing assigned to attempt " + request.getAttemptID(),
-        assigned);
-    Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(),
-        assigned.getTaskAttemptID());
-    if (checkHostMatch) {
-      Assert.assertTrue("Not assigned to requested host", Arrays.asList
-          (request.getHosts()).contains(assigned.getContainer().getContainerManagerAddress()));
-    }
-
-  }
-
-  //Mock RMContainerAllocator
-  //Instead of talking to remote Scheduler,uses the local Scheduler
-  public static class LocalRMContainerAllocator extends RMContainerAllocator {
-    private static final List<TaskAttemptContainerAssignedEvent> events = 
-      new ArrayList<TaskAttemptContainerAssignedEvent>();
-
-    public static class AMRMProtocolImpl implements AMRMProtocol {
-
-      private ResourceScheduler resourceScheduler;
-
-      public AMRMProtocolImpl(ResourceScheduler resourceScheduler) {
-        this.resourceScheduler = resourceScheduler;
-      }
-
-      @Override
-      public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnRemoteException {
-        ApplicationMaster applicationMaster = request.getApplicationMaster();
-        RegisterApplicationMasterResponse response = recordFactory.newRecordInstance(RegisterApplicationMasterResponse.class);
-        return null;
-      }
-
-      public AllocateResponse allocate(AllocateRequest request) throws YarnRemoteException {
-        ApplicationStatus status = request.getApplicationStatus();
-        List<ResourceRequest> ask = request.getAskList();
-        List<Container> release = request.getReleaseList();
-        try {
-          AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
-          Allocation allocation = resourceScheduler.allocate(status.getApplicationId(), ask, release);
-          response.addAllNewContainers(allocation.getContainers());
-          response.setAvailableResources(allocation.getResourceLimit());
-          AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class);
-          allocateResponse.setAMResponse(response);
-          return allocateResponse;
-        } catch(IOException ie) {
-          throw RPCUtil.getRemoteException(ie);
-        }
-      }
-
-      @Override
-      public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnRemoteException {
-        ApplicationMaster applicationMaster = request.getApplicationMaster();
-        FinishApplicationMasterResponse response = recordFactory.newRecordInstance(FinishApplicationMasterResponse.class);
-        return response;
-      }
-
-    }
-
-    private ResourceScheduler scheduler;
-    LocalRMContainerAllocator(ResourceScheduler scheduler, Configuration conf) {
-      super(null, new TestContext(events));
-      this.scheduler = scheduler;
-      super.init(conf);
-      super.start();
-    }
-
-    protected AMRMProtocol createSchedulerProxy() {
-      return new AMRMProtocolImpl(scheduler);
-    }
-
-    @Override
-    protected void register() {}
-    @Override
-    protected void unregister() {}
-
-    @Override
-    protected Resource getMinContainerCapability() {
-      Resource res = recordFactory.newRecordInstance(Resource.class);
-      res.setMemory(1024);
-      return res;
-    }
-    
-    @Override
-    protected Resource getMaxContainerCapability() {
-      Resource res = recordFactory.newRecordInstance(Resource.class);
-      res.setMemory(10240);
-      return res;
-    }
-    
-    public void sendRequest(ContainerRequestEvent req) {
-      sendRequests(Arrays.asList(new ContainerRequestEvent[]{req}));
-    }
-
-    public void sendRequests(List<ContainerRequestEvent> reqs) {
-      for (ContainerRequestEvent req : reqs) {
-        handle(req);
-      }
-    }
-
-    //API to be used by tests
-    public List<TaskAttemptContainerAssignedEvent> schedule() {
-      //run the scheduler
-      try {
-        heartbeat();
-      } catch (Exception e) {
-        LOG.error("error in heartbeat ", e);
-        throw new YarnException(e);
-      }
-
-      List<TaskAttemptContainerAssignedEvent> result = new ArrayList(events);
-      events.clear();
-      return result;
-    }
-
-    protected void startAllocatorThread() {
-      //override to NOT start thread
-    }
-
-    static class TestContext implements AppContext {
-      private List<TaskAttemptContainerAssignedEvent> events;
-      TestContext(List<TaskAttemptContainerAssignedEvent> events) {
-        this.events = events;
-      }
-      @Override
-      public Map<JobId, Job> getAllJobs() {
-        return null;
-      }
-      @Override
-      public ApplicationAttemptId getApplicationAttemptId() {
-        return recordFactory.newRecordInstance(ApplicationAttemptId.class);
-      }
-      @Override
-      public ApplicationId getApplicationID() {
-        return recordFactory.newRecordInstance(ApplicationId.class);
-      }
-      @Override
-      public EventHandler getEventHandler() {
-        return new EventHandler() {
-          @Override
-          public void handle(Event event) {
-            if (event instanceof TaskAttemptContainerAssignedEvent) {
-              events.add((TaskAttemptContainerAssignedEvent) event);
-            } //Ignoring JobCounterUpdateEvents
-          }
-        };
-      }
-      @Override
-      public Job getJob(JobId jobID) {
-        return null;
-      }
-
-      @Override
-      public String getUser() {
-        return null;
-      }
-
-      @Override
-      public Clock getClock() {
-        return null;
-      }
-
-      @Override
-      public String getApplicationName() {
-        return null;
-      }
-
-      @Override
-      public long getStartTime() {
-        return 0;
-      }
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    TestRMContainerAllocator t = new TestRMContainerAllocator();
-    t.testSimple();
-    //t.testResource();
-    t.testMapReduceScheduling();
-  }
+//  private static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class);
+//  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+//
+//  @BeforeClass
+//  public static void preTests() {
+//    DefaultMetricsSystem.shutdown();
+//  }
+//
+//  @Test
+//  public void testSimple() throws Exception {
+//    FifoScheduler scheduler = createScheduler();
+//    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
+//        scheduler, new Configuration());
+//
+//    //add resources to scheduler
+//    RMNode nodeManager1 = addNode(scheduler, "h1", 10240);
+//    RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
+//    RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
+//
+//    //create the container request
+//    ContainerRequestEvent event1 = 
+//      createReq(1, 1024, new String[]{"h1"});
+//    allocator.sendRequest(event1);
+//
+//    //send 1 more request with different resource req
+//    ContainerRequestEvent event2 = createReq(2, 1024, new String[]{"h2"});
+//    allocator.sendRequest(event2);
+//
+//    //this tells the scheduler about the requests
+//    //as nodes are not added, no allocations
+//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//
+//    //send another request with different resource and priority
+//    ContainerRequestEvent event3 = createReq(3, 1024, new String[]{"h3"});
+//    allocator.sendRequest(event3);
+//
+//    //this tells the scheduler about the requests
+//    //as nodes are not added, no allocations
+//    assigned = allocator.schedule();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//
+//    //update resources in scheduler
+//    scheduler.nodeUpdate(nodeManager1); // Node heartbeat
+//    scheduler.nodeUpdate(nodeManager2); // Node heartbeat
+//    scheduler.nodeUpdate(nodeManager3); // Node heartbeat
+//
+//
+//    assigned = allocator.schedule();
+//    checkAssignments(
+//        new ContainerRequestEvent[]{event1, event2, event3}, assigned, false);
+//  }
+//
+//  //TODO: Currently Scheduler seems to have bug where it does not work
+//  //for Application asking for containers with different capabilities.
+//  //@Test
+//  public void testResource() throws Exception {
+//    FifoScheduler scheduler = createScheduler();
+//    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
+//        scheduler, new Configuration());
+//
+//    //add resources to scheduler
+//    RMNode nodeManager1 = addNode(scheduler, "h1", 10240);
+//    RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
+//    RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
+//
+//    //create the container request
+//    ContainerRequestEvent event1 = 
+//      createReq(1, 1024, new String[]{"h1"});
+//    allocator.sendRequest(event1);
+//
+//    //send 1 more request with different resource req
+//    ContainerRequestEvent event2 = createReq(2, 2048, new String[]{"h2"});
+//    allocator.sendRequest(event2);
+//
+//    //this tells the scheduler about the requests
+//    //as nodes are not added, no allocations
+//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//
+//    //update resources in scheduler
+//    scheduler.nodeUpdate(nodeManager1); // Node heartbeat
+//    scheduler.nodeUpdate(nodeManager2); // Node heartbeat
+//    scheduler.nodeUpdate(nodeManager3); // Node heartbeat
+//
+//    assigned = allocator.schedule();
+//    checkAssignments(
+//        new ContainerRequestEvent[]{event1, event2}, assigned, false);
+//  }
+//
+//  @Test
+//  public void testMapReduceScheduling() throws Exception {
+//    FifoScheduler scheduler = createScheduler();
+//    Configuration conf = new Configuration();
+//    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
+//        scheduler, conf);
+//
+//    //add resources to scheduler
+//    RMNode nodeManager1 = addNode(scheduler, "h1", 1024);
+//    RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
+//    RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
+//
+//    //create the container request
+//    //send MAP request
+//    ContainerRequestEvent event1 = 
+//      createReq(1, 2048, new String[]{"h1", "h2"}, true, false);
+//    allocator.sendRequest(event1);
+//
+//    //send REDUCE request
+//    ContainerRequestEvent event2 = createReq(2, 3000, new String[]{"h1"}, false, true);
+//    allocator.sendRequest(event2);
+//
+//    //send MAP request
+//    ContainerRequestEvent event3 = createReq(3, 2048, new String[]{"h3"}, false, false);
+//    allocator.sendRequest(event3);
+//
+//    //this tells the scheduler about the requests
+//    //nodes are registered but have not sent a heartbeat yet, so no allocations
+//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//
+//    //update resources in scheduler
+//    scheduler.nodeUpdate(nodeManager1); // Node heartbeat
+//    scheduler.nodeUpdate(nodeManager2); // Node heartbeat
+//    scheduler.nodeUpdate(nodeManager3); // Node heartbeat
+//
+//    assigned = allocator.schedule();
+//    checkAssignments(
+//        new ContainerRequestEvent[]{event1, event3}, assigned, false);
+//
+//    //validate that no container is assigned to h1 as it doesn't have 2048
+//    for (TaskAttemptContainerAssignedEvent assig : assigned) {
+//      Assert.assertFalse("Assigned count not correct", 
+//          "h1".equals(assig.getContainer().getNodeId().getHost()));
+//    }
+//  }
+//
+//
+//
+//  private RMNode addNode(FifoScheduler scheduler, 
+//      String nodeName, int memory) {
+//    NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
+//    nodeId.setHost(nodeName);
+//    nodeId.setPort(1234);
+//    Resource resource = recordFactory.newRecordInstance(Resource.class);
+//    resource.setMemory(memory);
+//    RMNode nodeManager = new RMNodeImpl(nodeId, null, nodeName, 0, 0,
+//        ResourceTrackerService.resolve(nodeName), resource);
+//    scheduler.addNode(nodeManager); // Node registration
+//    return nodeManager;
+//  }
+//
+//  private FifoScheduler createScheduler() throws YarnRemoteException {
+//    FifoScheduler fsc = new FifoScheduler() {
+//      //override this to copy the objects
+//      //otherwise FifoScheduler updates the numContainers in same objects as kept by
+//      //RMContainerAllocator
+//      
+//      @Override
+//      public synchronized void allocate(ApplicationAttemptId applicationId,
+//          List<ResourceRequest> ask) {
+//        List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
+//        for (ResourceRequest req : ask) {
+//          ResourceRequest reqCopy = recordFactory.newRecordInstance(ResourceRequest.class);
+//          reqCopy.setPriority(req.getPriority());
+//          reqCopy.setHostName(req.getHostName());
+//          reqCopy.setCapability(req.getCapability());
+//          reqCopy.setNumContainers(req.getNumContainers());
+//          askCopy.add(reqCopy);
+//        }
+//        super.allocate(applicationId, askCopy);
+//      }
+//    };
+//    try {
+//      fsc.reinitialize(new Configuration(), new ContainerTokenSecretManager(), null);
+//      fsc.addApplication(recordFactory.newRecordInstance(ApplicationId.class),
+//          recordFactory.newRecordInstance(ApplicationMaster.class),
+//          "test", null, null, StoreFactory.createVoidAppStore());
+//    } catch(IOException ie) {
+//      LOG.info("add application failed with ", ie);
+//      assert(false);
+//    }
+//    return fsc;
+//  }
+//
+//  private ContainerRequestEvent createReq(
+//      int attemptid, int memory, String[] hosts) {
+//    return createReq(attemptid, memory, hosts, false, false);
+//  }
+//  
+//  private ContainerRequestEvent createReq(
+//      int attemptid, int memory, String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
+//    ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+//    appId.setClusterTimestamp(0);
+//    appId.setId(0);
+//    JobId jobId = recordFactory.newRecordInstance(JobId.class);
+//    jobId.setAppId(appId);
+//    jobId.setId(0);
+//    TaskId taskId = recordFactory.newRecordInstance(TaskId.class);
+//    taskId.setId(0);
+//    taskId.setJobId(jobId);
+//    if (reduce) {
+//      taskId.setTaskType(TaskType.REDUCE);
+//    } else {
+//      taskId.setTaskType(TaskType.MAP);
+//    }
+//    TaskAttemptId attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
+//    attemptId.setId(attemptid);
+//    attemptId.setTaskId(taskId);
+//    Resource containerNeed = recordFactory.newRecordInstance(Resource.class);
+//    containerNeed.setMemory(memory);
+//    if (earlierFailedAttempt) {
+//      return ContainerRequestEvent.
+//           createContainerRequestEventForFailedContainer(attemptId, containerNeed);
+//    }
+//    return new ContainerRequestEvent(attemptId, 
+//        containerNeed, 
+//        hosts, new String[] {NetworkTopology.DEFAULT_RACK});
+//  }
+//
+//  private void checkAssignments(ContainerRequestEvent[] requests, 
+//      List<TaskAttemptContainerAssignedEvent> assignments, 
+//      boolean checkHostMatch) {
+//    Assert.assertNotNull("Container not assigned", assignments);
+//    Assert.assertEquals("Assigned count not correct", 
+//        requests.length, assignments.size());
+//
+//    //check for uniqueness of containerIDs
+//    Set<ContainerId> containerIds = new HashSet<ContainerId>();
+//    for (TaskAttemptContainerAssignedEvent assigned : assignments) {
+//      containerIds.add(assigned.getContainer().getId());
+//    }
+//    Assert.assertEquals("Assigned containers must be different", 
+//        assignments.size(), containerIds.size());
+//
+//    //check for all assignment
+//    for (ContainerRequestEvent req : requests) {
+//      TaskAttemptContainerAssignedEvent assigned = null;
+//      for (TaskAttemptContainerAssignedEvent ass : assignments) {
+//        if (ass.getTaskAttemptID().equals(req.getAttemptID())){
+//          assigned = ass;
+//          break;
+//        }
+//      }
+//      checkAssignment(req, assigned, checkHostMatch);
+//    }
+//  }
+//
+//  private void checkAssignment(ContainerRequestEvent request, 
+//      TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) {
+//    Assert.assertNotNull("Nothing assigned to attempt " + request.getAttemptID(),
+//        assigned);
+//    Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(),
+//        assigned.getTaskAttemptID());
+//    if (checkHostMatch) {
+//      Assert.assertTrue("Not assigned to requested host", Arrays.asList(
+//          request.getHosts()).contains(
+//          assigned.getContainer().getNodeId().toString()));
+//    }
+//
+//  }
+//
+//  //Mock RMContainerAllocator
+//  //Instead of talking to remote Scheduler,uses the local Scheduler
+//  public static class LocalRMContainerAllocator extends RMContainerAllocator {
+//    private static final List<TaskAttemptContainerAssignedEvent> events = 
+//      new ArrayList<TaskAttemptContainerAssignedEvent>();
+//
+//    public static class AMRMProtocolImpl implements AMRMProtocol {
+//
+//      private ResourceScheduler resourceScheduler;
+//
+//      public AMRMProtocolImpl(ResourceScheduler resourceScheduler) {
+//        this.resourceScheduler = resourceScheduler;
+//      }
+//
+//      @Override
+//      public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnRemoteException {
+//        RegisterApplicationMasterResponse response = recordFactory.newRecordInstance(RegisterApplicationMasterResponse.class);
+//        return response;
+//      }
+//
+//      public AllocateResponse allocate(AllocateRequest request) throws YarnRemoteException {
+//        List<ResourceRequest> ask = request.getAskList();
+//        List<Container> release = request.getReleaseList();
+//        try {
+//          AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
+//          Allocation allocation = resourceScheduler.allocate(request.getApplicationAttemptId(), ask);
+//          response.addAllNewContainers(allocation.getContainers());
+//          response.setAvailableResources(allocation.getResourceLimit());
+//          AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class);
+//          allocateResponse.setAMResponse(response);
+//          return allocateResponse;
+//        } catch(IOException ie) {
+//          throw RPCUtil.getRemoteException(ie);
+//        }
+//      }
+//
+//      @Override
+//      public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnRemoteException {
+//        FinishApplicationMasterResponse response = recordFactory.newRecordInstance(FinishApplicationMasterResponse.class);
+//        return response;
+//      }
+//
+//    }
+//
+//    private ResourceScheduler scheduler;
+//    LocalRMContainerAllocator(ResourceScheduler scheduler, Configuration conf) {
+//      super(null, new TestContext(events));
+//      this.scheduler = scheduler;
+//      super.init(conf);
+//      super.start();
+//    }
+//
+//    protected AMRMProtocol createSchedulerProxy() {
+//      return new AMRMProtocolImpl(scheduler);
+//    }
+//
+//    @Override
+//    protected void register() {}
+//    @Override
+//    protected void unregister() {}
+//
+//    @Override
+//    protected Resource getMinContainerCapability() {
+//      Resource res = recordFactory.newRecordInstance(Resource.class);
+//      res.setMemory(1024);
+//      return res;
+//    }
+//    
+//    @Override
+//    protected Resource getMaxContainerCapability() {
+//      Resource res = recordFactory.newRecordInstance(Resource.class);
+//      res.setMemory(10240);
+//      return res;
+//    }
+//    
+//    public void sendRequest(ContainerRequestEvent req) {
+//      sendRequests(Arrays.asList(new ContainerRequestEvent[]{req}));
+//    }
+//
+//    public void sendRequests(List<ContainerRequestEvent> reqs) {
+//      for (ContainerRequestEvent req : reqs) {
+//        handle(req);
+//      }
+//    }
+//
+//    //API to be used by tests
+//    public List<TaskAttemptContainerAssignedEvent> schedule() {
+//      //run the scheduler
+//      try {
+//        heartbeat();
+//      } catch (Exception e) {
+//        LOG.error("error in heartbeat ", e);
+//        throw new YarnException(e);
+//      }
+//
+//      List<TaskAttemptContainerAssignedEvent> result = new ArrayList(events);
+//      events.clear();
+//      return result;
+//    }
+//
+//    protected void startAllocatorThread() {
+//      //override to NOT start thread
+//    }
+//
+//    static class TestContext implements AppContext {
+//      private List<TaskAttemptContainerAssignedEvent> events;
+//      TestContext(List<TaskAttemptContainerAssignedEvent> events) {
+//        this.events = events;
+//      }
+//      @Override
+//      public Map<JobId, Job> getAllJobs() {
+//        return null;
+//      }
+//      @Override
+//      public ApplicationAttemptId getApplicationAttemptId() {
+//        return recordFactory.newRecordInstance(ApplicationAttemptId.class);
+//      }
+//      @Override
+//      public ApplicationId getApplicationID() {
+//        return recordFactory.newRecordInstance(ApplicationId.class);
+//      }
+//      @Override
+//      public EventHandler getEventHandler() {
+//        return new EventHandler() {
+//          @Override
+//          public void handle(Event event) {
+//            events.add((TaskAttemptContainerAssignedEvent) event);
+//          }
+//        };
+//      }
+//      @Override
+//      public Job getJob(JobId jobID) {
+//        return null;
+//      }
+//
+//      @Override
+//      public String getUser() {
+//        return null;
+//      }
+//
+//      @Override
+//      public Clock getClock() {
+//        return null;
+//      }
+//
+//      @Override
+//      public String getApplicationName() {
+//        return null;
+//      }
+//
+//      @Override
+//      public long getStartTime() {
+//        return 0;
+//      }
+//    }
+//  }
+//
+//  public static void main(String[] args) throws Exception {
+//    TestRMContainerAllocator t = new TestRMContainerAllocator();
+//    t.testSimple();
+//    //t.testResource();
+//    t.testMapReduceScheduling();
+//  }
 }

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Wed Aug  3 11:39:53 2011
@@ -227,7 +227,7 @@ public class TestClientRedirect {
       if (amRunning) {
         application.setState(ApplicationState.RUNNING);
       } else {
-        application.setState(ApplicationState.COMPLETED);
+        application.setState(ApplicationState.SUCCEEDED);
       }
       String[] split = AMHOSTADDRESS.split(":");
       application.setHost(split[0]);

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java Wed Aug  3 11:39:53 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.junit.Before;
 import org.junit.After;
 import org.junit.Test;
@@ -116,8 +117,8 @@ public class TestMRJobsWithHistoryServic
     ApplicationId appID = TypeConverter.toYarn(job.getJobID()).getAppId();
     while (true) {
       Thread.sleep(1000);
-      if (mrCluster.getResourceManager().getRMContext().getApplications()
-          .get(appID).getState().equals(ApplicationState.COMPLETED))
+      if (mrCluster.getResourceManager().getRMContext().getRMApps()
+          .get(appID).getState().equals(RMAppState.FINISHED))
         break;
     }
     Counters counterHS = job.getCounters();

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto Wed Aug  3 11:39:53 2011
@@ -54,10 +54,12 @@ message ContainerProto {
 
 enum ApplicationStateProto {
   NEW = 1;
-  RUNNING = 2;
-  SUCCEEDED = 3;
-  KILLED = 4;
-  FAILED = 5;
+  SUBMITTED = 2;
+  RUNNING = 3;
+  RESTARTING = 4;
+  SUCCEEDED = 5;
+  FAILED = 6;
+  KILLED = 7; 
 }
 
 message ApplicationStatusProto {

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java Wed Aug  3 11:39:53 2011
@@ -81,17 +81,12 @@ public class MockApps {
     final String user = newUserName();
     final String name = newAppName();
     final String queue = newQueue();
-    final Container masterContainer = null;
     return new ApplicationReport() {
       @Override public ApplicationId getApplicationId() { return id; }
       @Override public String getUser() { return user; }
       @Override public String getName() { return name; }
-      @Override public ApplicationStatus getStatus() { return status; }
       @Override public ApplicationState getState() { return state; }
       @Override public String getQueue() { return queue; }
-      @Override public Container getMasterContainer() {
-        return masterContainer;
-      }
       @Override public String getTrackingUrl() { return ""; }
       @Override
       public void setApplicationId(ApplicationId applicationId) {
@@ -99,11 +94,6 @@ public class MockApps {
         
       }
       @Override
-      public void setMasterContainer(Container container) {
-        // TODO Auto-generated method stub
-        
-      }
-      @Override
       public void setTrackingUrl(String url) {
         // TODO Auto-generated method stub
         
@@ -124,11 +114,6 @@ public class MockApps {
         
       }
       @Override
-      public void setStatus(ApplicationStatus status) {
-        // TODO Auto-generated method stub
-        
-      }
-      @Override
       public void setUser(String user) {
         // TODO Auto-generated method stub
         

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Aug  3 11:39:53 2011
@@ -169,6 +169,7 @@ public class NodeStatusUpdaterImpl exten
     this.nodeId.setPort(this.containerManagerPort);
     request.setHttpPort(this.httpPort);
     request.setResource(this.totalResource);
+    request.setNodeId(this.nodeId);
     RegistrationResponse regResponse =
         this.resourceTracker.registerNodeManager(request).getRegistrationResponse();
     if (UserGroupInformation.isSecurityEnabled()) {

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java Wed Aug  3 11:39:53 2011
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -36,12 +34,7 @@ public class LocalRMInterface implements
   
   @Override
   public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException {
-    String host = request.getHost();
-    int cmPort = request.getContainerManagerPort();
-    String node = host + ":" + cmPort;
-    Resource resource = request.getResource();
     RegistrationResponse registrationResponse = recordFactory.newRecordInstance(RegistrationResponse.class);
-    registrationResponse.setNodeId(recordFactory.newRecordInstance(NodeId.class));
     RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
     response.setRegistrationResponse(registrationResponse);
     return response;

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed Aug  3 11:39:53 2011
@@ -75,20 +75,17 @@ public class TestNodeStatusUpdater {
 
     @Override
     public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException {
-      String host = request.getHost();
-      int cmPort = request.getContainerManagerPort();
-      String node = host + ":" + cmPort;
+      NodeId nodeId = request.getNodeId();
       Resource resource = request.getResource();
-      LOG.info("Registering " + node);
+      LOG.info("Registering " + nodeId.toString());
       try {
         Assert.assertEquals(InetAddress.getLocalHost().getHostAddress()
-            + ":12345", node);
+            + ":12345", nodeId.toString());
       } catch (UnknownHostException e) {
         Assert.fail(e.getMessage());
       }
       Assert.assertEquals(5 * 1024, resource.getMemory());
       RegistrationResponse regResponse = recordFactory.newRecordInstance(RegistrationResponse.class);
-      regResponse.setNodeId(recordFactory.newRecordInstance(NodeId.class));
       
       RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
       response.setRegistrationResponse(regResponse);
@@ -122,11 +119,9 @@ public class TestNodeStatusUpdater {
         Assert.assertEquals("Number of applications should only be one!", 1,
             nodeStatus.getAllContainers().size());
         Assert.assertEquals("Number of container for the app should be one!",
-            1, nodeStatus.getContainers(String.valueOf(applicationID.getId()))
-                .size());
-        Assert.assertEquals(2,
-            nodeStatus.getContainers(String.valueOf(applicationID.getId()))
-                .get(0).getResource().getMemory());
+            1, nodeStatus.getContainers(applicationID).size());
+        Assert.assertEquals(2, nodeStatus.getContainers(applicationID).get(0)
+            .getResource().getMemory());
 
         // Checks on the NM end
         ConcurrentMap<ContainerId, Container> activeContainers =
@@ -148,14 +143,11 @@ public class TestNodeStatusUpdater {
         Assert.assertEquals("Number of applications should only be one!", 1,
             nodeStatus.getAllContainers().size());
         Assert.assertEquals("Number of container for the app should be two!",
-            2, nodeStatus.getContainers(String.valueOf(applicationID.getId()))
-                .size());
-        Assert.assertEquals(2,
-            nodeStatus.getContainers(String.valueOf(applicationID.getId()))
-                .get(0).getResource().getMemory());
-        Assert.assertEquals(3,
-            nodeStatus.getContainers(String.valueOf(applicationID.getId()))
-                .get(1).getResource().getMemory());
+            2, nodeStatus.getContainers(applicationID).size());
+        Assert.assertEquals(2, nodeStatus.getContainers(applicationID).get(0)
+            .getResource().getMemory());
+        Assert.assertEquals(3, nodeStatus.getContainers(applicationID).get(1)
+            .getResource().getMemory());
 
         // Checks on the NM end
         ConcurrentMap<ContainerId, Container> activeContainers =
@@ -230,7 +222,10 @@ public class TestNodeStatusUpdater {
     System.out.println(" ----- thread already started.."
         + nm.getServiceState());
 
-    while (nm.getServiceState() == STATE.INITED) {
+    // Bounded wait: poll once per second while the NM is still INITED, giving
+    // up after 20 polls so a NM that never starts cannot hang the test.
+    int waitCount = 0;
+    while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) {
       LOG.info("Waiting for NM to start..");
       Thread.sleep(1000);
     }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Wed Aug  3 11:39:53 2011
@@ -109,13 +109,12 @@ public class ClientRMService extends Abs
   private Map<ApplicationACL, AccessControlList> applicationACLs;
   
   public ClientRMService(RMContext rmContext,
-      AMLivelinessMonitor amLivelinessMonitor,
       ClientToAMSecretManager clientToAMSecretManager,
       YarnScheduler scheduler) {
     super(ClientRMService.class.getName());
     this.scheduler = scheduler;
     this.rmContext = rmContext;
-    this.amLivelinessMonitor = amLivelinessMonitor;
+    this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.clientToAMSecretManager = clientToAMSecretManager;
   }
   

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Aug  3 11:39:53 2011
@@ -97,7 +97,6 @@ public class ResourceManager extends Com
   private AdminService adminService;
   private ContainerAllocationExpirer containerAllocationExpirer;
   protected NMLivelinessMonitor nmLivelinessMonitor;
-  protected AMLivelinessMonitor amLivelinessMonitor;
   protected NodesListManager nodesListManager;
 
   private final AtomicBoolean shutdown = new AtomicBoolean(false);
@@ -126,11 +125,11 @@ public class ResourceManager extends Com
         this.rmDispatcher);
     addService(this.containerAllocationExpirer);
 
-    this.amLivelinessMonitor = createAMLivelinessMonitor();
+    AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
     addService(amLivelinessMonitor);
 
     this.rmContext = new RMContextImpl(this.store, this.rmDispatcher,
-        this.containerAllocationExpirer, this.amLivelinessMonitor);
+        this.containerAllocationExpirer, amLivelinessMonitor);
 
     addService(nodesListManager);
 
@@ -217,7 +216,7 @@ public class ResourceManager extends Com
     return new AMLivelinessMonitor(this.rmDispatcher);
   }
 
-  private static final class ApplicationEventDispatcher implements
+  public static final class ApplicationEventDispatcher implements
       EventHandler<RMAppEvent> {
 
     private final RMContext rmContext;
@@ -241,7 +240,7 @@ public class ResourceManager extends Com
     }
   }
 
-  private static final class ApplicationAttemptEventDispatcher implements
+  public static final class ApplicationAttemptEventDispatcher implements
       EventHandler<RMAppAttemptEvent> {
 
     private final RMContext rmContext;
@@ -376,13 +375,13 @@ public class ResourceManager extends Com
   }
 
   protected ClientRMService createClientRMService() {
-    return new ClientRMService(this.rmContext, this.amLivelinessMonitor,
-        this.clientToAMSecretManager, scheduler);
+    return new ClientRMService(this.rmContext, this.clientToAMSecretManager,
+        scheduler);
   }
 
   protected ApplicationMasterService createApplicationMasterService() {
     return new ApplicationMasterService(this.rmContext,
-        this.amLivelinessMonitor, this.appTokenSecretManager, scheduler);
+        this.appTokenSecretManager, scheduler);
   }
   
 
@@ -413,6 +412,11 @@ public class ResourceManager extends Com
     return this.resourceTracker;
   }
 
+  @Private
+  public ApplicationMasterService getApplicationMasterService() {
+    return this.masterService;
+  }
+
   @Override
   public void recover(RMState state) throws Exception {
     resourceTracker.recover(state);

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java Wed Aug  3 11:39:53 2011
@@ -83,10 +83,9 @@ AMRMProtocol, EventHandler<ApplicationMa
   private final RMContext rmContext;
   
   public ApplicationMasterService(RMContext rmContext,
-      AMLivelinessMonitor amLivelinessMonitor,
       ApplicationTokenSecretManager appTokenManager, YarnScheduler scheduler) {
     super(ApplicationMasterService.class.getName());
-    this.amLivelinessMonitor = amLivelinessMonitor;
+    this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.appTokenManager = appTokenManager;
     this.rScheduler = scheduler;
     this.reboot.setReboot(true);

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Aug  3 11:39:53 2011
@@ -53,6 +53,9 @@ public class RMAppImpl implements RMApp 
   private final String clientTokenStr;
   private final ApplicationStore appStore;
   private final Dispatcher dispatcher;
+  private final YarnScheduler scheduler;
+  private final StringBuilder diagnostics = new StringBuilder();
+  private final int maxRetries;
   private final ReadLock readLock;
   private final WriteLock writeLock;
   private final Map<ApplicationAttemptId, RMAppAttempt> attempts
@@ -61,10 +64,7 @@ public class RMAppImpl implements RMApp 
   // Mutable fields
   private long startTime;
   private long finishTime;
-  private StringBuilder diagnostics;
   private AMLivelinessMonitor amLivelinessMonitor;
-  private YarnScheduler scheduler;
-  private int maxRetries;
   private RMAppAttempt currentAttempt;
 
   private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
@@ -267,12 +267,20 @@ public class RMAppImpl implements RMApp 
     this.readLock.lock();
 
     try {
+      String clientToken = "N/A";
+      String trackingUrl = "N/A";
+      String host = "N/A";
+      int rpcPort = -1;
+      if (this.currentAttempt != null) {
+        trackingUrl = this.currentAttempt.getTrackingUrl();
+        clientToken = this.currentAttempt.getClientToken();
+        host = this.currentAttempt.getHost();
+        rpcPort = this.currentAttempt.getRpcPort();
+      }
       return BuilderUtils.newApplicationReport(this.applicationId, this.user,
-          this.queue, this.name, this.currentAttempt.getHost(),
-          this.currentAttempt.getRpcPort(), this.currentAttempt
-              .getClientToken(), createApplicationState(this.stateMachine
-              .getCurrentState()), this.diagnostics.toString(),
-          this.currentAttempt.getTrackingUrl());
+          this.queue, this.name, host, rpcPort, clientToken,
+          createApplicationState(this.stateMachine.getCurrentState()),
+          this.diagnostics.toString(), trackingUrl);
     } finally {
       this.readLock.unlock();
     }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Aug  3 11:39:53 2011
@@ -212,7 +212,6 @@ public class FifoScheduler implements Re
     // Sanity check
     normalizeRequests(ask);
     
-    Resource limit = null;
     synchronized (application) {
 
       LOG.debug("allocate: pre-update" +

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Wed Aug  3 11:39:53 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.util.Records;
 
 @Private
 public class Application {
@@ -59,9 +61,11 @@ public class Application {
   
   private AtomicInteger taskCounter = new AtomicInteger(0);
 
+  private AtomicInteger numAttempts = new AtomicInteger(0);
   final private String user;
   final private String queue;
   final private ApplicationId applicationId;
+  final private ApplicationAttemptId applicationAttemptId;
   final private ResourceManager resourceManager;
   private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   
@@ -80,27 +84,25 @@ public class Application {
   final private Set<ResourceRequest> ask = 
     new TreeSet<ResourceRequest>(
         new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
-  final private Set<Container> release = 
-    new TreeSet<Container>(
-        new org.apache.hadoop.yarn.util.BuilderUtils.ContainerComparator());
 
   final private Map<String, NodeManager> nodes = 
     new HashMap<String, NodeManager>();
   
   Resource used = recordFactory.newRecordInstance(Resource.class);
   
-  public Application(String user, ResourceManager resourceManager)
-      throws YarnRemoteException {
+  public Application(String user, ResourceManager resourceManager) {
     this(user, "default", resourceManager);
   }
   
-  public Application(String user, String queue, ResourceManager resourceManager)
-  throws YarnRemoteException {
+  public Application(String user, String queue, ResourceManager resourceManager) {
     this.user = user;
     this.queue = queue;
     this.resourceManager = resourceManager;
     this.applicationId =
       this.resourceManager.getClientRMService().getNewApplicationId();
+    this.applicationAttemptId = Records.newRecord(ApplicationAttemptId.class);
+    this.applicationAttemptId.setApplicationId(this.applicationId);
+    this.applicationAttemptId.setAttemptId(this.numAttempts.getAndIncrement());
   }
 
   public String getUser() {
@@ -230,7 +232,7 @@ public class Application {
     ResourceRequest request = requests.get(resourceName);
     if (request == null) {
       request = 
-        org.apache.hadoop.yarn.util.BuilderUtils.create(
+        org.apache.hadoop.yarn.util.BuilderUtils.newResourceRequest(
             priority, resourceName, capability, 1);
       requests.put(resourceName, request);
     } else {
@@ -240,7 +242,7 @@ public class Application {
     // Note this down for next interaction with ResourceManager
     ask.remove(request);
     ask.add(
-        org.apache.hadoop.yarn.util.BuilderUtils.create(
+        org.apache.hadoop.yarn.util.BuilderUtils.newResourceRequest(
             request)); // clone to ensure the RM doesn't manipulate the same obj
     
     LOG.info("DEBUG --- addResourceRequest:" +
@@ -255,40 +257,26 @@ public class Application {
   public synchronized List<Container> getResources() throws IOException {
     LOG.info("DEBUG --- getResources begin:" +
         " application=" + applicationId + 
-        " #ask=" + ask.size() +
-        " #release=" + release.size());
+        " #ask=" + ask.size());
     for (ResourceRequest request : ask) {
       LOG.info("DEBUG --- getResources:" +
           " application=" + applicationId + 
           " ask-request=" + request);
     }
-    for (Container c : release) {
-      LOG.info("DEBUG --- getResources:" +
-          " application=" + applicationId + 
-          " release=" + c);
-    }
     
     // Get resources from the ResourceManager
-    List<Container> response = 
-      resourceManager.getResourceScheduler().allocate(applicationId, 
-          new ArrayList<ResourceRequest>(ask), 
-          new ArrayList<Container>(release)).getContainers();
-    
-    List<Container> containers = new ArrayList<Container>(response.size());
-    for (Container container : response) {
-      if (container.getState() != ContainerState.COMPLETE) {
-        containers.add(
-            org.apache.hadoop.yarn.util.BuilderUtils.clone(
-                container));
-      }
-    }
+    resourceManager.getResourceScheduler().allocate(applicationAttemptId,
+        new ArrayList<ResourceRequest>(ask));
+    
+    List<Container> containers = resourceManager.getRMContext().getRMApps()
+        .get(applicationId).getRMAppAttempt(applicationAttemptId)
+        .pullNewlyAllocatedContainers();
+
     // Clear state for next interaction with ResourceManager
     ask.clear();
-    release.clear();
     
     LOG.info("DEBUG --- getResources() for " + applicationId + ":" +
     		" ask=" + ask.size() + 
-    		" release= "+ release.size() + 
     		" recieved=" + containers.size());
     
     return containers;
@@ -312,10 +300,6 @@ public class Application {
     int assignedContainers = numContainers - containers.size();
     LOG.info("Application " + applicationId + " assigned " + 
         assignedContainers + "/" + numContainers);
-    if (assignedContainers < numContainers) {
-      // Release
-      release.addAll(containers);
-    }
   }
   
   public synchronized void schedule() throws IOException {
@@ -326,7 +310,7 @@ public class Application {
       List<Container> containers) throws IOException {
     for (Iterator<Container> i=containers.iterator(); i.hasNext();) {
       Container container = i.next();
-      String host = container.getContainerManagerAddress();
+      String host = container.getNodeId().toString();
       
       if (Resources.equals(requestSpec.get(priority), container.getResource())) { 
         // See which task can use this container
@@ -400,7 +384,7 @@ public class Application {
     // Note this for next interaction with ResourceManager
     ask.remove(request);
     ask.add(
-        org.apache.hadoop.yarn.util.BuilderUtils.create(
+        org.apache.hadoop.yarn.util.BuilderUtils.newResourceRequest(
         request)); // clone to ensure the RM doesn't manipulate the same obj
 
     LOG.info("DEBUG --- updateResourceRequest:" +

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Wed Aug  3 11:39:53 2011
@@ -11,6 +11,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -63,7 +64,7 @@ public class MockAM {
     amRMProtocol.registerApplicationMaster(req);
   }
 
-  public List<Container> allocate( 
+  public AMResponse allocate( 
       String host, int memory, int numContainers, 
       List<ContainerId> releases) throws Exception {
     List reqs = createReq(host, memory, 1, numContainers);
@@ -101,7 +102,7 @@ public class MockAM {
     return req;
   }
 
-  public List<Container> allocate(
+  public AMResponse allocate(
       List<Container> releases, List<ResourceRequest> resourceRequest) 
       throws Exception {
     AllocateRequest req = Records.newRecord(AllocateRequest.class);
@@ -110,7 +111,7 @@ public class MockAM {
     req.addAllAsks(resourceRequest);
     req.addAllReleases(releases);
     AllocateResponse resp = amRMProtocol.allocate(req);
-    return resp.getAMResponse().getContainerList();
+    return resp.getAMResponse();
   }
 
   public void unregisterAppAttempt() throws Exception {

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Wed Aug  3 11:39:53 2011
@@ -24,13 +24,16 @@ import java.util.Map;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 
@@ -54,9 +57,10 @@ public class MockNodes {
     return list;
   }
 
-  public static NodeId newNodeID(int id) {
+  public static NodeId newNodeID(String host, int port) {
     NodeId nid = recordFactory.newRecordInstance(NodeId.class);
-    nid.setId(id);
+    nid.setHost(host);
+    nid.setPort(port);
     return nid;
   }
 
@@ -81,8 +85,9 @@ public class MockNodes {
   public static RMNode newNodeInfo(int rack, final Resource perNode) {
     final String rackName = "rack"+ rack;
     final int nid = NODE_ID++;
-    final NodeId nodeID = newNodeID(nid);
     final String hostName = "host"+ nid;
+    final int port = 123;
+    final NodeId nodeID = newNodeID(hostName, port);
     final String httpAddress = "localhost:0";
     final NodeHealthStatus nodeHealthStatus =
         recordFactory.newRecordInstance(NodeHealthStatus.class);
@@ -121,16 +126,6 @@ public class MockNodes {
       }
 
       @Override
-      public Resource getAvailableResource() {
-        return avail;
-      }
-
-      @Override
-      public Resource getUsedResource() {
-        return used;
-      }
-
-      @Override
       public int getNumContainers() {
         return containers;
       }
@@ -141,30 +136,6 @@ public class MockNodes {
       }
 
       @Override
-      public void allocateContainer(ApplicationId applicationId,
-          List<Container> containers) {
-      }
-
-      @Override
-      public AppSchedulingInfo getReservedApplication() {
-        return null;
-      }
-
-      @Override
-      public Resource getReservedResource() {
-        return null;
-      }
-
-      @Override
-      public void reserveResource(AppSchedulingInfo application, Priority priority,
-          Resource resource) {
-      }
-
-      @Override
-      public void unreserveResource(AppSchedulingInfo application, Priority priority) {
-      }
-
-      @Override
       public List<Container> getRunningContainers() {
         // TODO Auto-generated method stub
         return null;
@@ -187,28 +158,27 @@ public class MockNodes {
       }
 
       @Override
-      public boolean releaseContainer(Container container) {
+      public RMNodeState getState() {
         // TODO Auto-generated method stub
-        return false;
+        return null;
       }
 
       @Override
-      public void updateHealthStatus(NodeHealthStatus healthStatus) {
+      public List<ApplicationId> pullAppsToCleanup() {
         // TODO Auto-generated method stub
-        
+        return null;
       }
 
       @Override
-      public NodeResponse
-          statusUpdate(Map<String, List<Container>> containers) {
+      public List<ContainerId> pullContainersToCleanUp() {
         // TODO Auto-generated method stub
         return null;
       }
 
       @Override
-      public void finishedApplication(ApplicationId applicationId) {
+      public HeartbeatResponse getLastHeartBeatResponse() {
         // TODO Auto-generated method stub
-        
+        return null;
       }
     };
   }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Aug  3 11:39:53 2011
@@ -110,7 +110,7 @@ public class MockRM extends ResourceMana
 
   @Override
   protected ClientRMService createClientRMService() {
-    return new ClientRMService(getRMContext(), amLivelinessMonitor,
+    return new ClientRMService(getRMContext(),
         clientToAMSecretManager, getResourceScheduler()) {
       @Override
       public void start() {
@@ -141,7 +141,7 @@ public class MockRM extends ResourceMana
   @Override
   protected ApplicationMasterService createApplicationMasterService() {
     return new ApplicationMasterService(getRMContext(),
-        this.amLivelinessMonitor, this.appTokenSecretManager, scheduler){
+        this.appTokenSecretManager, scheduler) {
       @Override
       public void start() {
         //override to not start rpc handler

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1153437&r1=1153436&r2=1153437&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Wed Aug  3 11:39:53 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
 @Private
@@ -71,9 +73,9 @@ public class NodeManager implements Cont
   Resource used = recordFactory.newRecordInstance(Resource.class);
 
   final ResourceTrackerService resourceTrackerService;
-  final RMNode nodeInfo;
-  final Map<String, List<Container>> containers = 
-    new HashMap<String, List<Container>>();
+  final SchedulerNode schedulerNode;
+  final Map<ApplicationId, List<Container>> containers = 
+    new HashMap<ApplicationId, List<Container>>();
   
   public NodeManager(String hostName, int containerManagerPort, int httpPort,
       String rackName, int memory,
@@ -86,20 +88,21 @@ public class NodeManager implements Cont
     this.capability = Resources.createResource(memory);
     Resources.addTo(available, capability);
 
+    this.nodeId = recordFactory.newRecordInstance(NodeId.class);
+    this.nodeId.setHost(hostName);
+    this.nodeId.setPort(containerManagerPort);
     RegisterNodeManagerRequest request = recordFactory
         .newRecordInstance(RegisterNodeManagerRequest.class);
-    request.setContainerManagerPort(containerManagerPort);
-    request.setHost(hostName);
     request.setHttpPort(httpPort);
     request.setResource(capability);
     RegistrationResponse response = resourceTrackerService
         .registerNodeManager(request).getRegistrationResponse();
-    this.nodeId = response.getNodeId();
-    this.nodeInfo = rmContext.getNodesCollection().getNodeInfo(nodeId);
+    this.schedulerNode = new SchedulerNode(rmContext.getRMNodes().get(
+        this.nodeId));
    
     // Sanity check
     Assert.assertEquals(memory, 
-       nodeInfo.getAvailableResource().getMemory());
+       schedulerNode.getAvailableResource().getMemory());
   }
   
   public String getHostName() {
@@ -145,7 +148,8 @@ public class NodeManager implements Cont
   synchronized public StartContainerResponse startContainer(StartContainerRequest request) throws YarnRemoteException {
     ContainerLaunchContext containerLaunchContext = request.getContainerLaunchContext();
     
-    String applicationId = String.valueOf(containerLaunchContext.getContainerId().getAppId().getId());
+    ApplicationId applicationId = containerLaunchContext.getContainerId()
+        .getAppId();
 
     List<Container> applicationContainers = containers.get(applicationId);
     if (applicationContainers == null) {
@@ -186,9 +190,9 @@ public class NodeManager implements Cont
   synchronized public void checkResourceUsage() {
     LOG.info("Checking resource usage for " + containerManagerAddress);
     Assert.assertEquals(available.getMemory(), 
-        nodeInfo.getAvailableResource().getMemory());
+        schedulerNode.getAvailableResource().getMemory());
     Assert.assertEquals(used.getMemory(), 
-        nodeInfo.getUsedResource().getMemory());
+        schedulerNode.getUsedResource().getMemory());
   }
   
   @Override
@@ -250,11 +254,11 @@ public class NodeManager implements Cont
   }
 
   public static org.apache.hadoop.yarn.server.api.records.NodeStatus createNodeStatus(
-      NodeId nodeId, Map<String, List<Container>> containers) {
+      NodeId nodeId, Map<ApplicationId, List<Container>> containers) {
     RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
     org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = recordFactory.newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
     nodeStatus.setNodeId(nodeId);
-    nodeStatus.addAllNewContainers(containers);
+    nodeStatus.addAllContainers(containers);
     NodeHealthStatus nodeHealthStatus = 
       recordFactory.newRecordInstance(NodeHealthStatus.class);
     nodeHealthStatus.setIsNodeHealthy(true);



Mime
View raw message