hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153450 - in /hadoop/common/branches/MR-279/mapreduce/yarn: yarn-common/src/main/java/org/apache/hadoop/yarn/security/ yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/ yarn-server/yarn-server-resourceman...
Date Wed, 03 Aug 2011 11:49:56 GMT
Author: vinodkv
Date: Wed Aug  3 11:49:55 2011
New Revision: 1153450

URL: http://svn.apache.org/viewvc?rev=1153450&view=rev
Log:
Fixing synchronization issues in FifoScheduler.

Modified:
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/ContainerTokenSecretManager.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java?rev=1153450&r1=1153449&r2=1153450&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java Wed Aug  3 11:49:55 2011
@@ -66,7 +66,7 @@ public class ContainerTokenIdentifier ex
 
   @Override
   public void write(DataOutput out) throws IOException {
-    LOG.info("Writing ContainerTokenIdentifier to RPC layer");
+    LOG.debug("Writing ContainerTokenIdentifier to RPC layer");
     out.writeInt(this.containerId.getAppId().getId());
     out.writeInt(this.containerId.getId());
     // TODO: Cluster time-stamp?

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/ContainerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/ContainerTokenSecretManager.java?rev=1153450&r1=1153449&r2=1153450&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/ContainerTokenSecretManager.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/ContainerTokenSecretManager.java Wed Aug  3 11:49:55 2011
@@ -41,7 +41,7 @@ public class ContainerTokenSecretManager
   public SecretKey createAndGetSecretKey(CharSequence hostName) {
     String hostNameStr = hostName.toString();
     if (!this.secretkeys.containsKey(hostNameStr)) {
-      LOG.info("Creating secretKey for NM " + hostNameStr);
+      LOG.debug("Creating secretKey for NM " + hostNameStr);
       this.secretkeys.put(hostNameStr,
           createSecretKey("mySecretKey".getBytes()));
     }
@@ -55,8 +55,9 @@ public class ContainerTokenSecretManager
 
   @Override
   public byte[] createPassword(ContainerTokenIdentifier identifier) {
-    LOG.info("Creating password for " + identifier.getContainerID()
-        + " to be run on NM " + identifier.getNmHostName() + " ======= " + this.secretkeys.get(identifier.getNmHostName()));
+    LOG.debug("Creating password for " + identifier.getContainerID()
+        + " to be run on NM " + identifier.getNmHostName() + " "
+        + this.secretkeys.get(identifier.getNmHostName()));
     return createPassword(identifier.getBytes(),
         this.secretkeys.get(identifier.getNmHostName()));
   }
@@ -64,7 +65,7 @@ public class ContainerTokenSecretManager
   @Override
   public byte[] retrievePassword(ContainerTokenIdentifier identifier)
       throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
-    LOG.info("Retrieving password for " + identifier.getContainerID()
+    LOG.debug("Retrieving password for " + identifier.getContainerID()
         + " to be run on NM " + identifier.getNmHostName());
     return createPassword(identifier.getBytes(),
         this.secretkeys.get(identifier.getNmHostName()));

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153450&r1=1153449&r2=1153450&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Aug  3 11:49:55 2011
@@ -210,8 +210,9 @@ public class FifoScheduler implements Re
   private static final Allocation EMPTY_ALLOCATION = 
       new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
   @Override
-  public Allocation allocate(ApplicationAttemptId applicationAttemptId,
-      List<ResourceRequest> ask, List<Container> release) {
+  public synchronized Allocation allocate(
+      ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
+      List<Container> release) {
     SchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.error("Calling allocate on removed " +
@@ -227,31 +228,28 @@ public class FifoScheduler implements Re
       containerCompleted(releasedContainer, RMContainerEventType.RELEASED);
     }
 
-    synchronized (application) {
+    if (!ask.isEmpty()) {
+      LOG.debug("allocate: pre-update" +
+          " applicationId=" + applicationAttemptId + 
+          " application=" + application);
+      application.showRequests();
 
-      if (!ask.isEmpty()) {
-        LOG.debug("allocate: pre-update" +
-            " applicationId=" + applicationAttemptId + 
-            " application=" + application);
-        application.showRequests();
-  
-        // Update application requests
-        application.updateResourceRequests(ask);
-  
-        LOG.debug("allocate: post-update" +
-            " applicationId=" + applicationAttemptId + 
-            " application=" + application);
-        application.showRequests();
-  
-        LOG.debug("allocate:" +
-            " applicationId=" + applicationAttemptId + 
-            " #ask=" + ask.size());
-      }
+      // Update application requests
+      application.updateResourceRequests(ask);
 
-      return new Allocation(
-          application.pullNewlyAllocatedContainers(), 
-          application.getHeadroom());
+      LOG.debug("allocate: post-update" +
+          " applicationId=" + applicationAttemptId + 
+          " application=" + application);
+      application.showRequests();
+
+      LOG.debug("allocate:" +
+          " applicationId=" + applicationAttemptId + 
+          " #ask=" + ask.size());
     }
+
+    return new Allocation(
+        application.pullNewlyAllocatedContainers(), 
+        application.getHeadroom());
   }
 
   private void normalizeRequests(List<ResourceRequest> asks) {
@@ -268,16 +266,16 @@ public class FifoScheduler implements Re
     ask.setCapability(Resources.createResource(memory));
   }
 
-  private synchronized SchedulerApp getApplication(
+  private SchedulerApp getApplication(
       ApplicationAttemptId applicationAttemptId) {
     return applications.get(applicationAttemptId);
   }
 
-  private synchronized SchedulerNode getNode(NodeId nodeId) {
+  private SchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
   
-  private synchronized void addApplication(ApplicationAttemptId appAttemptId,
+  private void addApplication(ApplicationAttemptId appAttemptId,
       String queueName, String user) {
     AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
         appAttemptId, queueName, user, null);
@@ -292,7 +290,7 @@ public class FifoScheduler implements Re
             RMAppAttemptEventType.APP_ACCEPTED));
   }
 
-  private synchronized void doneApplication(
+  private void doneApplication(
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState)
       throws IOException {
@@ -319,7 +317,7 @@ public class FifoScheduler implements Re
    * 
    * @param node node on which resources are available to be allocated
    */
-  private synchronized void assignContainers(SchedulerNode node) {
+  private void assignContainers(SchedulerNode node) {
     LOG.debug("assignContainers:" +
         " node=" + node.getRMNode().getNodeAddress() + 
         " #applications=" + applications.size());
@@ -534,7 +532,7 @@ public class FifoScheduler implements Re
     return assignedContainers;
   }
 
-  private synchronized void nodeUpdate(RMNode rmNode,
+  private void nodeUpdate(RMNode rmNode,
       Map<ApplicationId, List<Container>> remoteContainers) {
     SchedulerNode node = getNode(rmNode.getNodeID());
     
@@ -548,18 +546,19 @@ public class FifoScheduler implements Re
       }
     }
 
-    LOG.info("Node heartbeat " + rmNode.getNodeID() + 
-        " available resource = " + node.getAvailableResource());
-    
     if (Resources.greaterThanOrEqual(node.getAvailableResource(),
         minimumAllocation)) {
+      LOG.info("Node heartbeat " + rmNode.getNodeID() + 
+          " available resource = " + node.getAvailableResource());
+      
       assignContainers(node);
+
+      LOG.info("Node after allocation " + rmNode.getNodeID() + " resource = "
+          + node.getAvailableResource());
     }
     
     metrics.setAvailableResourcesToQueue(
         Resources.subtract(clusterResource, usedResource));
-    LOG.info("Node after allocation " + rmNode.getNodeID() + " resource = "
-        + node.getAvailableResource());
   }  
 
   @Override
@@ -632,7 +631,7 @@ public class FifoScheduler implements Re
   }
 
   @Lock(FifoScheduler.class)
-  private synchronized void containerCompleted(Container container, RMContainerEventType event) {
+  private void containerCompleted(Container container, RMContainerEventType event) {
     // Get the application for the finished container
     ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
     SchedulerApp application = getApplication(applicationAttemptId);
@@ -664,7 +663,7 @@ public class FifoScheduler implements Re
   private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
 
-  private synchronized void removeNode(RMNode nodeInfo) {
+  private void removeNode(RMNode nodeInfo) {
     SchedulerNode node = getNode(nodeInfo.getNodeID());
     // Kill running containers
     for(Container container : node.getRunningContainers()) {
@@ -689,7 +688,7 @@ public class FifoScheduler implements Re
     return DEFAULT_QUEUE.getQueueUserAclInfo(null); 
   }
 
-  private synchronized void addNode(RMNode nodeManager) {
+  private void addNode(RMNode nodeManager) {
     this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
   }



Mime
View raw message