hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153445 [2/2] - in /hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager: ./ recovery/ rmapp/attempt/ rmcontainer/ rmnode/ scheduler/ scheduler/cap...
Date Wed, 03 Aug 2011 11:46:49 GMT
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153445&r1=1153444&r2=1153445&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java	Wed Aug  3 11:46:46 2011
@@ -37,10 +37,10 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.Lock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -61,10 +61,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -72,9 +72,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerFinishedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -169,11 +170,11 @@ public class FifoScheduler implements Re
   };
 
   public synchronized Resource getUsedResource(NodeId nodeId) {
-    return nodes.get(nodeId).getUsedResource();
+    return getNode(nodeId).getUsedResource();
   }
 
   public synchronized Resource getAvailableResource(NodeId nodeId) {
-    return nodes.get(nodeId).getAvailableResource();
+    return getNode(nodeId).getAvailableResource();
   }
 
   @Override
@@ -206,19 +207,26 @@ public class FifoScheduler implements Re
     }
   }
 
+  private static final Allocation EMPTY_ALLOCATION = 
+      new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
   @Override
-  public synchronized void allocate(ApplicationAttemptId applicationAttemptId,
-      List<ResourceRequest> ask) {
+  public Allocation allocate(ApplicationAttemptId applicationAttemptId,
+      List<ResourceRequest> ask, List<Container> release) {
     SchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.error("Calling allocate on removed " +
           "or non existant application " + applicationAttemptId);
-      return;
+      return EMPTY_ALLOCATION;
     }
 
     // Sanity check
     normalizeRequests(ask);
-    
+
+    // Release containers
+    for (Container releasedContainer : release) {
+      completedContainer(releasedContainer, RMContainerEventType.RELEASED);
+    }
+
     synchronized (application) {
 
       LOG.debug("allocate: pre-update" +
@@ -237,16 +245,13 @@ public class FifoScheduler implements Re
       LOG.debug("allocate:" +
           " applicationId=" + applicationAttemptId + 
           " #ask=" + ask.size());
+      
+      return new Allocation(
+          application.pullNewlyAllocatedContainers(), 
+          application.getHeadroom());
     }
   }
 
-  @Override
-  public Resource getResourceLimit(ApplicationAttemptId applicationAttemptId) {
-    SchedulerApp application = getApplication(applicationAttemptId);
-    // TODO: What if null?
-    return application.getHeadroom();
-  }
-  
   private void normalizeRequests(List<ResourceRequest> asks) {
     for (ResourceRequest ask : asks) {
       normalizeRequest(ask);
@@ -256,9 +261,9 @@ public class FifoScheduler implements Re
   private void normalizeRequest(ResourceRequest ask) {
     int memory = ask.getCapability().getMemory();
     // FIXME: TestApplicationCleanup is relying on unnormalized behavior.
-    //ask.capability.memory = MINIMUM_MEMORY *
     memory = MINIMUM_MEMORY *
     ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
+    ask.setCapability(Resources.createResource(memory));
   }
 
   private synchronized SchedulerApp getApplication(
@@ -266,6 +271,10 @@ public class FifoScheduler implements Re
     return applications.get(applicationAttemptId);
   }
 
+  private synchronized SchedulerNode getNode(NodeId nodeId) {
+    return nodes.get(nodeId);
+  }
+  
   private synchronized void addApplication(ApplicationAttemptId appAttemptId,
       String queueName, String user) {
     AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
@@ -297,7 +306,7 @@ public class FifoScheduler implements Re
     // Remove the application
     applications.remove(applicationAttemptId);
   }
-
+  
   /**
    * Heart of the scheduler...
    * 
@@ -473,15 +482,18 @@ public class FifoScheduler implements Re
       Math.min(assignableContainers, availableContainers);
 
     if (assignedContainers > 0) {
-      List<Container> containers =
-        new ArrayList<Container>(assignedContainers);
       for (int i=0; i < assignedContainers; ++i) {
+        // Create the container
         Container container =
             BuilderUtils.newContainer(recordFactory,
                 application.getApplicationAttemptId(),
                 application.getNewContainerId(),
                 node.getRMNode().getNodeID(), node.getRMNode().getNodeAddress(),
                 node.getRMNode().getHttpAddress(), capability);
+        RMContainer rmContainer = 
+            new RMContainerImpl(container, 
+                application.getApplicationAttemptId(), node.getNodeID(), 
+                null, this.rmContext.getContainerAllocationExpirer());
         
         // If security is enabled, send the container-tokens too.
         if (UserGroupInformation.isSecurityEnabled()) {
@@ -499,100 +511,84 @@ public class FifoScheduler implements Re
           containerToken.setService(container.getNodeId().toString());
           container.setContainerToken(containerToken);
         }
-        containers.add(container);
+        
+        // Allocate!
+        application.allocate(type, node, priority, request, 
+            Collections.singletonList(rmContainer));
+        node.allocateContainer(application.getApplicationId(), 
+            container);
       }
-      application.allocate(type, node, priority, request, containers);
-      addAllocatedContainers(node, application.getApplicationAttemptId(),
-          containers);
+      
+      // Update total usage
       Resources.addTo(usedResource,
           Resources.multiply(capability, assignedContainers));
     }
+    
     return assignedContainers;
   }
 
-  private synchronized void killContainers(List<Container> containers) {
-    applicationCompletedContainers(containers);
-  }
-
-  private synchronized void applicationCompletedContainers(List<Container> containers)
{
-    for (Container c : containers) {
-      applicationCompletedContainer(c);
-    }
-  }
-
-  private synchronized void applicationCompletedContainer(Container c) {
-      SchedulerApp app = applications.get(c.getId().getAppAttemptId());
-      /** this is possible, since an application can be removed from scheduler but
-       * the nodemanger is just updating about a completed container.
-       */
-      if (app != null) {
-        app.completedContainer(c, c.getResource());
-      }
-  }
-
-  private List<Container> getCompletedContainers(Map<String, List<Container>>
allContainers) {
-    if (allContainers == null) {
-      return new ArrayList<Container>();
-    }
-    List<Container> completedContainers = new ArrayList<Container>();
-    // Iterate through the running containers and update their status
-    for (Map.Entry<String, List<Container>> e : 
-      allContainers.entrySet()) {
-      for (Container c: e.getValue()) {
-        if (c.getState() == ContainerState.COMPLETE) {
-          completedContainers.add(c);
+  private synchronized void nodeUpdate(RMNode rmNode,
+      Map<ApplicationId, List<Container>> containers) {
+    SchedulerNode node = getNode(rmNode.getNodeID());
+    
+    // Process completed containers
+    for (List<Container> appContainers : containers.values()) {
+      for (Container container : appContainers) {
+        if (container.getContainerStatus().getState() == ContainerState.RUNNING
+            || container.getContainerStatus().getState() == ContainerState.INITIALIZING)
{
+          launchContainer(container, node);
+        } else { // has to COMPLETE
+          completedContainer(container, RMContainerEventType.FINISHED);
         }
       }
     }
-    return completedContainers;
-  }
-
-  private synchronized void nodeUpdate(RMNode rmNode,
-      Map<String, List<Container>> containers) {
-    SchedulerNode node = this.nodes.get(rmNode.getNodeID());
-    node.statusUpdate(containers);
-
-    applicationCompletedContainers(getCompletedContainers(containers));
 
-    LOG.info("Node heartbeat " + rmNode.getNodeID() + " resource = " + node.getAvailableResource());
+    LOG.info("Node heartbeat " + rmNode.getNodeID() + 
+        " available resource = " + node.getAvailableResource());
+    
     if (Resources.greaterThanOrEqual(node.getAvailableResource(),
         minimumAllocation)) {
       assignContainers(node);
     }
+    
     metrics.setAvailableResourcesToQueue(
         Resources.subtract(clusterResource, usedResource));
     LOG.info("Node after allocation " + rmNode.getNodeID() + " resource = "
         + node.getAvailableResource());
-
-    // TODO: Add the list of containers to be preempted when we support
   }  
 
   @Override
   public synchronized void handle(SchedulerEvent event) {
     switch(event.getType()) {
     case NODE_ADDED:
+    {
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
       addNode(nodeAddedEvent.getAddedRMNode());
-      break;
+    }
+    break;
     case NODE_REMOVED:
+    {
       NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
       removeNode(nodeRemovedEvent.getRemovedRMNode());
-      break;
+    }
+    break;
     case NODE_UPDATE:
-      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
-      Map<ApplicationId, List<Container>> contAppMapping = nodeUpdatedEvent.getContainers();
-      Map<String, List<Container>> conts = new HashMap<String, List<Container>>();
-      for (Map.Entry<ApplicationId, List<Container>> entry : contAppMapping.entrySet())
{
-        conts.put(entry.getKey().toString(), entry.getValue());
-      }
-      nodeUpdate(nodeUpdatedEvent.getRMNode(), conts);
-      break;
+    {
+      NodeUpdateSchedulerEvent nodeUpdatedEvent = 
+      (NodeUpdateSchedulerEvent)event;
+      nodeUpdate(nodeUpdatedEvent.getRMNode(), 
+          nodeUpdatedEvent.getContainers());
+    }
+    break;
     case APP_ADDED:
+    {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
           .getQueue(), appAddedEvent.getUser());
-      break;
+    }
+    break;
     case APP_REMOVED:
+    {
       AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
       try {
         doneApplication(appRemovedEvent.getApplicationAttemptID(),
@@ -601,30 +597,82 @@ public class FifoScheduler implements Re
         LOG.error("Unable to remove application "
             + appRemovedEvent.getApplicationAttemptID(), ie);
       }
-      break;
-    case CONTAINER_FINISHED:
-      ContainerFinishedSchedulerEvent containerFinishedEvent = (ContainerFinishedSchedulerEvent)
event;
-      Container container = containerFinishedEvent.getContainer();
-      applicationCompletedContainer(container);
-      this.rmContext.getRMContainers().remove(container.getId());
-      releaseContainer(container.getId().getAppId(), container);
-      break;
+    }
+    break;
+    case CONTAINER_EXPIRED:
+    {
+      ContainerExpiredSchedulerEvent containerExpiredEvent = 
+          (ContainerExpiredSchedulerEvent) event;
+      completedContainer(containerExpiredEvent.getContainer(), 
+          RMContainerEventType.EXPIRE);
+    }
+    break;
     default:
       LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
     }
   }
 
+  private void launchContainer(Container container, SchedulerNode node) {
+    // Get the application for the finished container
+    ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
+    SchedulerApp application = getApplication(applicationAttemptId);
+    if (application == null) {
+      LOG.info("Unknown application: " + applicationAttemptId + 
+          " launched container " + container.getId() +
+          " on node: " + node);
+      return;
+    }
+    
+    application.launchContainer(container.getId());
+  }
+
+  @Lock(FifoScheduler.class)
+  private synchronized void completedContainer(Container container, RMContainerEventType
event) {
+    // Get the application for the finished container
+    ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
+    SchedulerApp application = getApplication(applicationAttemptId);
+    
+    // Get the node on which the container was allocated
+    SchedulerNode node = getNode(container.getNodeId());
+    
+    if (application == null) {
+      LOG.info("Unknown application: " + applicationAttemptId + 
+          " released container " + container.getId() +
+          " on node: " + node + 
+          " with event: " + event);
+      return;
+    }
+    
+    // Inform the application
+    application.completedContainer(container, event);
+    
+    // Inform the node
+    node.releaseContainer(container);
 
+    LOG.info("Application " + applicationAttemptId + 
+        " released container " + container.getId() +
+        " on node: " + node + 
+        " with event: " + event);
+     
+  }
+  
   private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
 
   private synchronized void removeNode(RMNode nodeInfo) {
+    SchedulerNode node = getNode(nodeInfo.getNodeID());
+    // Kill running containers
+    for(Container container : node.getRunningContainers()) {
+      completedContainer(container, RMContainerEventType.KILL);
+    }
+    
+    //Remove the node
     this.nodes.remove(nodeInfo.getNodeID());
+    
+    // Update cluster metrics
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
-    killContainers(nodeInfo.getRunningContainers());
   }
 
-
   @Override
   public QueueInfo getQueueInfo(String queueName,
       boolean includeChildQueues, boolean recursive) {
@@ -641,35 +689,6 @@ public class FifoScheduler implements Re
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
   }
 
-  private synchronized void releaseContainer(ApplicationId applicationId, 
-      Container container) {
-    // Reap containers
-    LOG.info("Application " + applicationId + " released container " +
-        container.getId());
-    nodes.get(container.getNodeId()).releaseContainer(container);
-  }
-
-  private synchronized void addAllocatedContainers(SchedulerNode node,
-      ApplicationAttemptId appAttemptId, List<Container> containers) {
-    node.allocateContainer(appAttemptId.getApplicationId(), containers);
-    for (Container container : containers) {
-      // Create the container and 'start' it.
-      ContainerId containerId = container.getId();
-      RMContainer rmContainer = new RMContainerImpl(containerId,
-          appAttemptId, node.getNodeID(), container, this.rmContext
-              .getDispatcher().getEventHandler(), this.rmContext
-              .getContainerAllocationExpirer());
-      if (this.rmContext.getRMContainers().putIfAbsent(containerId,
-          rmContainer) != null) {
-        LOG.error("Duplicate container addition! ContainerID :  "
-            + containerId);
-      } else {
-        this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMContainerEvent(containerId, RMContainerEventType.START));
-      }
-    }
-  }
-
   @Override
   public void recover(RMState state) {
 //    for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet())
{
@@ -679,4 +698,11 @@ public class FifoScheduler implements Re
 //      app.allocate(appInfo.getContainers());
 //    }
   }
+
+  @Override
+  public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
+    SchedulerNode node = getNode(nodeId);
+    return new SchedulerNodeReport(
+        node.getUsedResource(), node.getNumContainers());
+  }
 }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java?rev=1153445&r1=1153444&r2=1153445&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java	Wed Aug  3 11:46:46 2011
@@ -71,7 +71,8 @@ class NodesPage extends RmView {
             td(health.getIsNodeHealthy() ? "Healthy" : "Unhealthy").
             td(Times.format(health.getLastHealthReportTime())).
             td(String.valueOf(health.getHealthReport())).
-            td(String.valueOf(ni.getNumContainers())).
+            // TODO: acm: refactor2 FIXME
+            //td(String.valueOf(ni.getNumContainers())).
             // TODO: FIXME Vinodkv
 //            td(String.valueOf(ni.getUsedResource().getMemory())).
 //            td(String.valueOf(ni.getAvailableResource().getMemory())).



Mime
View raw message