hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1153451 - in /hadoop/common/branches/MR-279/mapreduce: mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ yarn/yarn-server/yarn...
Date Wed, 03 Aug 2011 11:50:48 GMT
Author: vinodkv
Date: Wed Aug  3 11:50:47 2011
New Revision: 1153451

URL: http://svn.apache.org/viewvc?rev=1153451&view=rev
Log:
Fixing synchronization bugs in the FIFO scheduler correctly, plus one other proto-related bug.

Modified:
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
    hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1153451&r1=1153450&r2=1153451&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java	Wed Aug  3 11:50:47 2011
@@ -210,6 +210,7 @@ public class RMCommunicator extends Abst
               heartbeat();
             } catch (Exception e) {
               LOG.error("ERROR IN CONTACTING RM. ", e);
+              // TODO: for other exceptions
             }
           } catch (InterruptedException e) {
             LOG.info("Allocated thread interrupted. Returning.");

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java?rev=1153451&r1=1153450&r2=1153451&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java	Wed Aug  3 11:50:47 2011
@@ -37,14 +37,14 @@ public class AMResponsePBImpl extends Pr
     viaProto = true;
   }
   
-  public AMResponseProto getProto() {
+  public synchronized AMResponseProto getProto() {
       mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
     return proto;
   }
   
-  private void mergeLocalToBuilder() {
+  private synchronized void mergeLocalToBuilder() {
     if (this.newContainersList != null) {
       builder.clearNewContainers();
       Iterable<ContainerProto> iterable = getProtoIterable(this.newContainersList);
@@ -60,7 +60,7 @@ public class AMResponsePBImpl extends Pr
     }
   }
   
-  private void mergeLocalToProto() {
+  private synchronized void mergeLocalToProto() {
     if (viaProto) 
       maybeInitBuilder();
     mergeLocalToBuilder();
@@ -68,7 +68,7 @@ public class AMResponsePBImpl extends Pr
     viaProto = true;
   }
 
-  private void maybeInitBuilder() {
+  private synchronized void maybeInitBuilder() {
     if (viaProto || builder == null) {
       builder = AMResponseProto.newBuilder(proto);
     }
@@ -77,29 +77,29 @@ public class AMResponsePBImpl extends Pr
     
   
   @Override
-  public boolean getReboot() {
+  public synchronized boolean getReboot() {
     AMResponseProtoOrBuilder p = viaProto ? proto : builder;
     return (p.getReboot());
   }
 
   @Override
-  public void setReboot(boolean reboot) {
+  public synchronized void setReboot(boolean reboot) {
     maybeInitBuilder();
     builder.setReboot((reboot));
   }
   @Override
-  public int getResponseId() {
+  public synchronized int getResponseId() {
     AMResponseProtoOrBuilder p = viaProto ? proto : builder;
     return (p.getResponseId());
   }
 
   @Override
-  public void setResponseId(int responseId) {
+  public synchronized void setResponseId(int responseId) {
     maybeInitBuilder();
     builder.setResponseId((responseId));
   }
   @Override
-  public Resource getAvailableResources() {
+  public synchronized Resource getAvailableResources() {
     if (this.limit != null) {
       return this.limit;
     }
@@ -113,7 +113,7 @@ public class AMResponsePBImpl extends Pr
   }
 
   @Override
-  public void setAvailableResources(Resource limit) {
+  public synchronized void setAvailableResources(Resource limit) {
     maybeInitBuilder();
     if (limit == null)
       builder.clearLimit();
@@ -121,24 +121,24 @@ public class AMResponsePBImpl extends Pr
   }
 
   @Override
-  public List<Container> getNewContainerList() {
+  public synchronized List<Container> getNewContainerList() {
     initLocalNewContainerList();
     return this.newContainersList;
   }
   
   @Override
-  public Container getNewContainer(int index) {
+  public synchronized Container getNewContainer(int index) {
     initLocalNewContainerList();
     return this.newContainersList.get(index);
   }
   @Override
-  public int getNewContainerCount() {
+  public synchronized int getNewContainerCount() {
     initLocalNewContainerList();
     return this.newContainersList.size();
   }
   
   //Once this is called. containerList will never be null - untill a getProto is called.
-  private void initLocalNewContainerList() {
+  private synchronized void initLocalNewContainerList() {
     if (this.newContainersList != null) {
       return;
     }
@@ -152,35 +152,35 @@ public class AMResponsePBImpl extends Pr
   }
 
   @Override
-  public void addAllNewContainers(final List<Container> containers) {
+  public synchronized void addAllNewContainers(final List<Container> containers) {
     if (containers == null) 
       return;
     initLocalNewContainerList();
     newContainersList.addAll(containers);
   }
 
-  private Iterable<ContainerProto> getProtoIterable(
+  private synchronized Iterable<ContainerProto> getProtoIterable(
       final List<Container> newContainersList) {
     maybeInitBuilder();
     return new Iterable<ContainerProto>() {
       @Override
-      public Iterator<ContainerProto> iterator() {
+      public synchronized Iterator<ContainerProto> iterator() {
         return new Iterator<ContainerProto>() {
 
           Iterator<Container> iter = newContainersList.iterator();
 
           @Override
-          public boolean hasNext() {
+          public synchronized boolean hasNext() {
             return iter.hasNext();
           }
 
           @Override
-          public ContainerProto next() {
+          public synchronized ContainerProto next() {
             return convertToProtoFormat(iter.next());
           }
 
           @Override
-          public void remove() {
+          public synchronized void remove() {
             throw new UnsupportedOperationException();
 
           }
@@ -191,7 +191,7 @@ public class AMResponsePBImpl extends Pr
   }
   
   @Override
-  public void addNewContainer(Container containers) {
+  public synchronized void addNewContainer(Container containers) {
     initLocalNewContainerList();
     if (containers == null) 
       return;
@@ -199,36 +199,36 @@ public class AMResponsePBImpl extends Pr
   }
   
   @Override
-  public void removeNewContainer(int index) {
+  public synchronized void removeNewContainer(int index) {
     initLocalNewContainerList();
     this.newContainersList.remove(index);
   }
   @Override
-  public void clearNewContainers() {
+  public synchronized void clearNewContainers() {
     initLocalNewContainerList();
     this.newContainersList.clear();
   }
 
   //// Finished containers
   @Override
-  public List<Container> getFinishedContainerList() {
+  public synchronized List<Container> getFinishedContainerList() {
     initLocalFinishedContainerList();
     return this.finishedContainersList;
   }
   
   @Override
-  public Container getFinishedContainer(int index) {
+  public synchronized Container getFinishedContainer(int index) {
     initLocalFinishedContainerList();
     return this.finishedContainersList.get(index);
   }
   @Override
-  public int getFinishedContainerCount() {
+  public synchronized int getFinishedContainerCount() {
     initLocalFinishedContainerList();
     return this.finishedContainersList.size();
   }
   
   //Once this is called. containerList will never be null - untill a getProto is called.
-  private void initLocalFinishedContainerList() {
+  private synchronized void initLocalFinishedContainerList() {
     if (this.finishedContainersList != null) {
       return;
     }
@@ -242,7 +242,7 @@ public class AMResponsePBImpl extends Pr
   }
 
   @Override
-  public void addAllFinishedContainers(final List<Container> containers) {
+  public synchronized void addAllFinishedContainers(final List<Container> containers)
{
     if (containers == null) 
       return;
     initLocalFinishedContainerList();
@@ -250,7 +250,7 @@ public class AMResponsePBImpl extends Pr
   }
   
   @Override
-  public void addFinishedContainer(Container containers) {
+  public synchronized void addFinishedContainer(Container containers) {
     initLocalFinishedContainerList();
     if (containers == null) 
       return;
@@ -258,29 +258,29 @@ public class AMResponsePBImpl extends Pr
   }
   
   @Override
-  public void removeFinishedContainer(int index) {
+  public synchronized void removeFinishedContainer(int index) {
     initLocalFinishedContainerList();
     this.finishedContainersList.remove(index);
   }
   @Override
-  public void clearFinishedContainers() {
+  public synchronized void clearFinishedContainers() {
     initLocalFinishedContainerList();
     this.finishedContainersList.clear();
   }
 
-  private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
+  private synchronized ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
     return new ContainerPBImpl(p);
   }
 
-  private ContainerProto convertToProtoFormat(Container t) {
+  private synchronized ContainerProto convertToProtoFormat(Container t) {
     return ((ContainerPBImpl)t).getProto();
   }
 
-  private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
+  private synchronized ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
     return new ResourcePBImpl(p);
   }
 
-  private ResourceProto convertToProtoFormat(Resource r) {
+  private synchronized ResourceProto convertToProtoFormat(Resource r) {
     return ((ResourcePBImpl) r).getProto();
   }
 

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1153451&r1=1153450&r2=1153451&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java	Wed Aug  3 11:50:47 2011
@@ -691,7 +691,7 @@ public class RMAppAttemptImpl implements
       // Is this container the AmContainer? If the finished container is same as
       // the AMContainer, AppAttempt fails
       if (appAttempt.masterContainer.getId().equals(container.getId())) {
-        new BaseFinalTransition(RMAppAttemptState.FAILED).transition(
+        new FinalTransition(RMAppAttemptState.FAILED).transition(
             appAttempt, containerFinishedEvent);
         return RMAppAttemptState.FAILED;
       }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java?rev=1153451&r1=1153450&r2=1153451&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java	Wed Aug  3 11:50:47 2011
@@ -62,7 +62,8 @@ public class SchedulerApp {
     return this.appSchedulingInfo.getUser();
   }
 
-  public void updateResourceRequests(List<ResourceRequest> requests) {
+  public synchronized void updateResourceRequests(
+      List<ResourceRequest> requests) {
     this.appSchedulingInfo.updateResourceRequests(requests);
   }
 
@@ -104,7 +105,7 @@ public class SchedulerApp {
   }
 
   public synchronized Collection<RMContainer> getLiveContainers() {
-    return this.liveContainers.values();
+    return new ArrayList<RMContainer>(liveContainers.values());
   }
 
   public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
@@ -130,6 +131,12 @@ public class SchedulerApp {
     ContainerId containerId = cont.getId();
     // Inform the container
     RMContainer container = getRMContainer(containerId);
+
+    if (container == null) {
+      LOG.error("Invalid container completed " + cont.getId());
+      return;
+    }
+
     if (event.equals(RMContainerEventType.FINISHED)) {
       // Have to send diagnostics for finished containers.
       container.handle(new RMContainerFinishedEvent(containerId,

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1153451&r1=1153450&r2=1153451&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java	Wed Aug  3 11:50:47 2011
@@ -1,7 +1,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -11,7 +10,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -109,15 +107,13 @@ public class SchedulerNode {
   /**
    * Release an allocated container on this node.
    * @param container container to be released
-   * @return <code>true</code> iff the container was unused, 
-   *         <code>false</code> otherwise
    */
-  public synchronized boolean releaseContainer(Container container) {
+  public synchronized void releaseContainer(Container container) {
     if (!isValidContainer(container)) {
       LOG.error("Invalid container released " + container);
-      return false;
+      return;
     }
-    
+
     /* remove the containers from the nodemanger */
     
     launchedContainers.remove(container.getId());
@@ -128,7 +124,6 @@ public class SchedulerNode {
         ", which currently has " + numContainers + " containers, " + 
         getUsedResource() + " used and " + getAvailableResource()
         + " available" + ", release resources=" + true);
-    return true;
   }
 
 

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153451&r1=1153450&r2=1153451&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java	Wed Aug  3 11:50:47 2011
@@ -210,7 +210,7 @@ public class FifoScheduler implements Re
   private static final Allocation EMPTY_ALLOCATION = 
       new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
   @Override
-  public synchronized Allocation allocate(
+  public Allocation allocate(
       ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
       List<Container> release) {
     SchedulerApp application = getApplication(applicationAttemptId);
@@ -275,7 +275,7 @@ public class FifoScheduler implements Re
     return nodes.get(nodeId);
   }
   
-  private void addApplication(ApplicationAttemptId appAttemptId,
+  private synchronized void addApplication(ApplicationAttemptId appAttemptId,
       String queueName, String user) {
     AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
         appAttemptId, queueName, user, null);
@@ -290,7 +290,7 @@ public class FifoScheduler implements Re
             RMAppAttemptEventType.APP_ACCEPTED));
   }
 
-  private void doneApplication(
+  private synchronized void doneApplication(
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState)
       throws IOException {
@@ -532,7 +532,7 @@ public class FifoScheduler implements Re
     return assignedContainers;
   }
 
-  private void nodeUpdate(RMNode rmNode,
+  private synchronized void nodeUpdate(RMNode rmNode,
       Map<ApplicationId, List<Container>> remoteContainers) {
     SchedulerNode node = getNode(rmNode.getNodeID());
     
@@ -562,7 +562,7 @@ public class FifoScheduler implements Re
   }  
 
   @Override
-  public synchronized void handle(SchedulerEvent event) {
+  public void handle(SchedulerEvent event) {
     switch(event.getType()) {
     case NODE_ADDED:
     {
@@ -631,7 +631,8 @@ public class FifoScheduler implements Re
   }
 
   @Lock(FifoScheduler.class)
-  private void containerCompleted(Container container, RMContainerEventType event) {
+  private synchronized void containerCompleted(Container container,
+      RMContainerEventType event) {
     // Get the application for the finished container
     ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
     SchedulerApp application = getApplication(applicationAttemptId);
@@ -646,10 +647,10 @@ public class FifoScheduler implements Re
           " with event: " + event);
       return;
     }
-    
+
     // Inform the application
     application.containerCompleted(container, event);
-    
+
     // Inform the node
     node.releaseContainer(container);
 
@@ -663,7 +664,7 @@ public class FifoScheduler implements Re
   private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
 
-  private void removeNode(RMNode nodeInfo) {
+  private synchronized void removeNode(RMNode nodeInfo) {
     SchedulerNode node = getNode(nodeInfo.getNodeID());
     // Kill running containers
     for(Container container : node.getRunningContainers()) {
@@ -688,7 +689,7 @@ public class FifoScheduler implements Re
     return DEFAULT_QUEUE.getQueueUserAclInfo(null); 
   }
 
-  private void addNode(RMNode nodeManager) {
+  private synchronized void addNode(RMNode nodeManager) {
     this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
   }



Mime
View raw message