Author: vinodkv
Date: Wed Aug 3 11:50:47 2011
New Revision: 1153451
URL: http://svn.apache.org/viewvc?rev=1153451&view=rev
Log:
Fix synchronization bugs in the FIFO scheduler correctly, plus one other protobuf-related bug.
Modified:
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1153451&r1=1153450&r2=1153451&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java	Wed Aug 3 11:50:47 2011
@@ -210,6 +210,7 @@ public class RMCommunicator extends Abst
heartbeat();
} catch (Exception e) {
LOG.error("ERROR IN CONTACTING RM. ", e);
+ // TODO: for other exceptions
}
} catch (InterruptedException e) {
LOG.info("Allocated thread interrupted. Returning.");
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java?rev=1153451&r1=1153450&r2=1153451&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java	Wed Aug 3 11:50:47 2011
@@ -37,14 +37,14 @@ public class AMResponsePBImpl extends Pr
viaProto = true;
}
- public AMResponseProto getProto() {
+ public synchronized AMResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
- private void mergeLocalToBuilder() {
+ private synchronized void mergeLocalToBuilder() {
if (this.newContainersList != null) {
builder.clearNewContainers();
Iterable<ContainerProto> iterable = getProtoIterable(this.newContainersList);
@@ -60,7 +60,7 @@ public class AMResponsePBImpl extends Pr
}
}
- private void mergeLocalToProto() {
+ private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
@@ -68,7 +68,7 @@ public class AMResponsePBImpl extends Pr
viaProto = true;
}
- private void maybeInitBuilder() {
+ private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = AMResponseProto.newBuilder(proto);
}
@@ -77,29 +77,29 @@ public class AMResponsePBImpl extends Pr
@Override
- public boolean getReboot() {
+ public synchronized boolean getReboot() {
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getReboot());
}
@Override
- public void setReboot(boolean reboot) {
+ public synchronized void setReboot(boolean reboot) {
maybeInitBuilder();
builder.setReboot((reboot));
}
@Override
- public int getResponseId() {
+ public synchronized int getResponseId() {
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getResponseId());
}
@Override
- public void setResponseId(int responseId) {
+ public synchronized void setResponseId(int responseId) {
maybeInitBuilder();
builder.setResponseId((responseId));
}
@Override
- public Resource getAvailableResources() {
+ public synchronized Resource getAvailableResources() {
if (this.limit != null) {
return this.limit;
}
@@ -113,7 +113,7 @@ public class AMResponsePBImpl extends Pr
}
@Override
- public void setAvailableResources(Resource limit) {
+ public synchronized void setAvailableResources(Resource limit) {
maybeInitBuilder();
if (limit == null)
builder.clearLimit();
@@ -121,24 +121,24 @@ public class AMResponsePBImpl extends Pr
}
@Override
- public List<Container> getNewContainerList() {
+ public synchronized List<Container> getNewContainerList() {
initLocalNewContainerList();
return this.newContainersList;
}
@Override
- public Container getNewContainer(int index) {
+ public synchronized Container getNewContainer(int index) {
initLocalNewContainerList();
return this.newContainersList.get(index);
}
@Override
- public int getNewContainerCount() {
+ public synchronized int getNewContainerCount() {
initLocalNewContainerList();
return this.newContainersList.size();
}
//Once this is called. containerList will never be null - untill a getProto is called.
- private void initLocalNewContainerList() {
+ private synchronized void initLocalNewContainerList() {
if (this.newContainersList != null) {
return;
}
@@ -152,35 +152,35 @@ public class AMResponsePBImpl extends Pr
}
@Override
- public void addAllNewContainers(final List<Container> containers) {
+ public synchronized void addAllNewContainers(final List<Container> containers) {
if (containers == null)
return;
initLocalNewContainerList();
newContainersList.addAll(containers);
}
- private Iterable<ContainerProto> getProtoIterable(
+ private synchronized Iterable<ContainerProto> getProtoIterable(
final List<Container> newContainersList) {
maybeInitBuilder();
return new Iterable<ContainerProto>() {
@Override
- public Iterator<ContainerProto> iterator() {
+ public synchronized Iterator<ContainerProto> iterator() {
return new Iterator<ContainerProto>() {
Iterator<Container> iter = newContainersList.iterator();
@Override
- public boolean hasNext() {
+ public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
- public ContainerProto next() {
+ public synchronized ContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
- public void remove() {
+ public synchronized void remove() {
throw new UnsupportedOperationException();
}
@@ -191,7 +191,7 @@ public class AMResponsePBImpl extends Pr
}
@Override
- public void addNewContainer(Container containers) {
+ public synchronized void addNewContainer(Container containers) {
initLocalNewContainerList();
if (containers == null)
return;
@@ -199,36 +199,36 @@ public class AMResponsePBImpl extends Pr
}
@Override
- public void removeNewContainer(int index) {
+ public synchronized void removeNewContainer(int index) {
initLocalNewContainerList();
this.newContainersList.remove(index);
}
@Override
- public void clearNewContainers() {
+ public synchronized void clearNewContainers() {
initLocalNewContainerList();
this.newContainersList.clear();
}
//// Finished containers
@Override
- public List<Container> getFinishedContainerList() {
+ public synchronized List<Container> getFinishedContainerList() {
initLocalFinishedContainerList();
return this.finishedContainersList;
}
@Override
- public Container getFinishedContainer(int index) {
+ public synchronized Container getFinishedContainer(int index) {
initLocalFinishedContainerList();
return this.finishedContainersList.get(index);
}
@Override
- public int getFinishedContainerCount() {
+ public synchronized int getFinishedContainerCount() {
initLocalFinishedContainerList();
return this.finishedContainersList.size();
}
//Once this is called. containerList will never be null - untill a getProto is called.
- private void initLocalFinishedContainerList() {
+ private synchronized void initLocalFinishedContainerList() {
if (this.finishedContainersList != null) {
return;
}
@@ -242,7 +242,7 @@ public class AMResponsePBImpl extends Pr
}
@Override
- public void addAllFinishedContainers(final List<Container> containers) {
+ public synchronized void addAllFinishedContainers(final List<Container> containers)
{
if (containers == null)
return;
initLocalFinishedContainerList();
@@ -250,7 +250,7 @@ public class AMResponsePBImpl extends Pr
}
@Override
- public void addFinishedContainer(Container containers) {
+ public synchronized void addFinishedContainer(Container containers) {
initLocalFinishedContainerList();
if (containers == null)
return;
@@ -258,29 +258,29 @@ public class AMResponsePBImpl extends Pr
}
@Override
- public void removeFinishedContainer(int index) {
+ public synchronized void removeFinishedContainer(int index) {
initLocalFinishedContainerList();
this.finishedContainersList.remove(index);
}
@Override
- public void clearFinishedContainers() {
+ public synchronized void clearFinishedContainers() {
initLocalFinishedContainerList();
this.finishedContainersList.clear();
}
- private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
+ private synchronized ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
return new ContainerPBImpl(p);
}
- private ContainerProto convertToProtoFormat(Container t) {
+ private synchronized ContainerProto convertToProtoFormat(Container t) {
return ((ContainerPBImpl)t).getProto();
}
- private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
+ private synchronized ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p);
}
- private ResourceProto convertToProtoFormat(Resource r) {
+ private synchronized ResourceProto convertToProtoFormat(Resource r) {
return ((ResourcePBImpl) r).getProto();
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1153451&r1=1153450&r2=1153451&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java	Wed Aug 3 11:50:47 2011
@@ -691,7 +691,7 @@ public class RMAppAttemptImpl implements
// Is this container the AmContainer? If the finished container is same as
// the AMContainer, AppAttempt fails
if (appAttempt.masterContainer.getId().equals(container.getId())) {
- new BaseFinalTransition(RMAppAttemptState.FAILED).transition(
+ new FinalTransition(RMAppAttemptState.FAILED).transition(
appAttempt, containerFinishedEvent);
return RMAppAttemptState.FAILED;
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java?rev=1153451&r1=1153450&r2=1153451&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java	Wed Aug 3 11:50:47 2011
@@ -62,7 +62,8 @@ public class SchedulerApp {
return this.appSchedulingInfo.getUser();
}
- public void updateResourceRequests(List<ResourceRequest> requests) {
+ public synchronized void updateResourceRequests(
+ List<ResourceRequest> requests) {
this.appSchedulingInfo.updateResourceRequests(requests);
}
@@ -104,7 +105,7 @@ public class SchedulerApp {
}
public synchronized Collection<RMContainer> getLiveContainers() {
- return this.liveContainers.values();
+ return new ArrayList<RMContainer>(liveContainers.values());
}
public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
@@ -130,6 +131,12 @@ public class SchedulerApp {
ContainerId containerId = cont.getId();
// Inform the container
RMContainer container = getRMContainer(containerId);
+
+ if (container == null) {
+ LOG.error("Invalid container completed " + cont.getId());
+ return;
+ }
+
if (event.equals(RMContainerEventType.FINISHED)) {
// Have to send diagnostics for finished containers.
container.handle(new RMContainerFinishedEvent(containerId,
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1153451&r1=1153450&r2=1153451&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java	Wed Aug 3 11:50:47 2011
@@ -1,7 +1,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -11,7 +10,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -109,15 +107,13 @@ public class SchedulerNode {
/**
* Release an allocated container on this node.
* @param container container to be released
- * @return <code>true</code> iff the container was unused,
- * <code>false</code> otherwise
*/
- public synchronized boolean releaseContainer(Container container) {
+ public synchronized void releaseContainer(Container container) {
if (!isValidContainer(container)) {
LOG.error("Invalid container released " + container);
- return false;
+ return;
}
-
+
/* remove the containers from the nodemanger */
launchedContainers.remove(container.getId());
@@ -128,7 +124,6 @@ public class SchedulerNode {
", which currently has " + numContainers + " containers, " +
getUsedResource() + " used and " + getAvailableResource()
+ " available" + ", release resources=" + true);
- return true;
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153451&r1=1153450&r2=1153451&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java	(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java	Wed Aug 3 11:50:47 2011
@@ -210,7 +210,7 @@ public class FifoScheduler implements Re
private static final Allocation EMPTY_ALLOCATION =
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
@Override
- public synchronized Allocation allocate(
+ public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<Container> release) {
SchedulerApp application = getApplication(applicationAttemptId);
@@ -275,7 +275,7 @@ public class FifoScheduler implements Re
return nodes.get(nodeId);
}
- private void addApplication(ApplicationAttemptId appAttemptId,
+ private synchronized void addApplication(ApplicationAttemptId appAttemptId,
String queueName, String user) {
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
appAttemptId, queueName, user, null);
@@ -290,7 +290,7 @@ public class FifoScheduler implements Re
RMAppAttemptEventType.APP_ACCEPTED));
}
- private void doneApplication(
+ private synchronized void doneApplication(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState)
throws IOException {
@@ -532,7 +532,7 @@ public class FifoScheduler implements Re
return assignedContainers;
}
- private void nodeUpdate(RMNode rmNode,
+ private synchronized void nodeUpdate(RMNode rmNode,
Map<ApplicationId, List<Container>> remoteContainers) {
SchedulerNode node = getNode(rmNode.getNodeID());
@@ -562,7 +562,7 @@ public class FifoScheduler implements Re
}
@Override
- public synchronized void handle(SchedulerEvent event) {
+ public void handle(SchedulerEvent event) {
switch(event.getType()) {
case NODE_ADDED:
{
@@ -631,7 +631,8 @@ public class FifoScheduler implements Re
}
@Lock(FifoScheduler.class)
- private void containerCompleted(Container container, RMContainerEventType event) {
+ private synchronized void containerCompleted(Container container,
+ RMContainerEventType event) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
SchedulerApp application = getApplication(applicationAttemptId);
@@ -646,10 +647,10 @@ public class FifoScheduler implements Re
" with event: " + event);
return;
}
-
+
// Inform the application
application.containerCompleted(container, event);
-
+
// Inform the node
node.releaseContainer(container);
@@ -663,7 +664,7 @@ public class FifoScheduler implements Re
private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
- private void removeNode(RMNode nodeInfo) {
+ private synchronized void removeNode(RMNode nodeInfo) {
SchedulerNode node = getNode(nodeInfo.getNodeID());
// Kill running containers
for(Container container : node.getRunningContainers()) {
@@ -688,7 +689,7 @@ public class FifoScheduler implements Re
return DEFAULT_QUEUE.getQueueUserAclInfo(null);
}
- private void addNode(RMNode nodeManager) {
+ private synchronized void addNode(RMNode nodeManager) {
this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
}
|