Author: vinodkv
Date: Wed Aug 3 11:48:09 2011
New Revision: 1153446
URL: http://svn.apache.org/viewvc?rev=1153446&view=rev
Log:
Fixing tests: compilation and execution.
Added:
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java
Removed:
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerFinishedSchedulerEvent.java
Modified:
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java Wed Aug 3 11:48:09 2011
@@ -1,5 +1,5 @@
package org.apache.hadoop.yarn.api.records;
public enum ContainerState {
- INITIALIZING, RUNNING, COMPLETE
-}
+ NEW, RUNNING, COMPLETE
+}
\ No newline at end of file
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto Wed Aug 3 11:48:09 2011
@@ -31,7 +31,7 @@ message ResourceProto {
}
enum ContainerStateProto {
- C_INITIALIZING = 1;
+ C_NEW = 1;
C_RUNNING = 2;
C_COMPLETE = 3;
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Wed Aug 3 11:48:09 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -164,23 +165,24 @@ public class BuilderUtils {
public static Container newContainer(RecordFactory recordFactory,
ApplicationAttemptId appAttemptId, int containerId, NodeId nodeId,
- String containerManagerAddress, String nodeHttpAddress,
- Resource resource) {
+ String nodeHttpAddress, Resource resource) {
ContainerId containerID =
newContainerId(recordFactory, appAttemptId, containerId);
- return newContainer(containerID, nodeId, containerManagerAddress,
- nodeHttpAddress, resource);
+ return newContainer(containerID, nodeId, nodeHttpAddress, resource);
}
- public static Container newContainer(ContainerId containerId, NodeId nodeId,
- String containerManagerAddress, String nodeHttpAddress,
- Resource resource) {
+ public static Container newContainer(ContainerId containerId,
+ NodeId nodeId, String nodeHttpAddress, Resource resource) {
Container container = recordFactory.newRecordInstance(Container.class);
container.setId(containerId);
container.setNodeId(nodeId);
container.setNodeHttpAddress(nodeHttpAddress);
container.setResource(resource);
- container.setState(ContainerState.INITIALIZING);
+ container.setState(ContainerState.NEW);
+ ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
+ containerStatus.setContainerId(containerId);
+ containerStatus.setState(ContainerState.NEW);
+ container.setContainerStatus(containerStatus);
return container;
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Wed Aug 3 11:48:09 2011
@@ -251,7 +251,6 @@ public class ContainerImpl implements Co
case LOCALIZING:
case LOCALIZATION_FAILED:
case LOCALIZED:
- return org.apache.hadoop.yarn.api.records.ContainerState.INITIALIZING;
case RUNNING:
case EXITED_WITH_SUCCESS:
case EXITED_WITH_FAILURE:
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Wed Aug 3 11:48:09 2011
@@ -18,6 +18,7 @@ public enum RMAppAttemptEventType {
UNREGISTERED,
// Source: Containers
+ CONTAINER_ACQUIRED,
CONTAINER_ALLOCATED,
CONTAINER_FINISHED,
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Aug 3 11:48:09 2011
@@ -3,10 +3,8 @@ package org.apache.hadoop.yarn.server.re
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -17,7 +15,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -31,15 +28,14 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@@ -93,9 +89,6 @@ public class RMAppAttemptImpl implements
private String finalState;
private final StringBuilder diagnostics = new StringBuilder();
- private static final CancelContainerTransition CANCEL_CONTAINER_TRANSITION
- = new CancelContainerTransition();
-
private static final StateMachineFactory<RMAppAttemptImpl,
RMAppAttemptState,
RMAppAttemptEventType,
@@ -130,6 +123,10 @@ public class RMAppAttemptImpl implements
new BaseFinalTransition(RMAppAttemptState.KILLED))
// Transitions from ALLOCATED State
+ .addTransition(RMAppAttemptState.ALLOCATED,
+ RMAppAttemptState.ALLOCATED,
+ RMAppAttemptEventType.CONTAINER_ACQUIRED,
+ new ContainerAcquiredTransition())
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FAILED,
@@ -159,6 +156,10 @@ public class RMAppAttemptImpl implements
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.CONTAINER_ALLOCATED)
.addTransition(
+ RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
+ RMAppAttemptEventType.CONTAINER_ACQUIRED,
+ new ContainerAcquiredTransition())
+ .addTransition(
RMAppAttemptState.RUNNING,
EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FAILED),
RMAppAttemptEventType.CONTAINER_FINISHED,
@@ -173,34 +174,29 @@ public class RMAppAttemptImpl implements
new FinalTransition(RMAppAttemptState.KILLED))
// Transitions from FAILED State
- .addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED,
- RMAppAttemptEventType.CONTAINER_ALLOCATED,
- CANCEL_CONTAINER_TRANSITION)
.addTransition(
RMAppAttemptState.FAILED,
RMAppAttemptState.FAILED,
- EnumSet.of(RMAppAttemptEventType.EXPIRE,
+ EnumSet.of(
+ RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.STATUS_UPDATE,
+ RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.CONTAINER_FINISHED))
// Transitions from FINISHED State
- .addTransition(RMAppAttemptState.FINISHED, RMAppAttemptState.FINISHED,
- RMAppAttemptEventType.CONTAINER_ALLOCATED,
- CANCEL_CONTAINER_TRANSITION)
.addTransition(
RMAppAttemptState.FINISHED,
RMAppAttemptState.FINISHED,
- EnumSet.of(RMAppAttemptEventType.EXPIRE,
+ EnumSet.of(
+ RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.UNREGISTERED,
+ RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.KILL))
// Transitions from KILLED State
- .addTransition(RMAppAttemptState.KILLED, RMAppAttemptState.KILLED,
- RMAppAttemptEventType.CONTAINER_ALLOCATED,
- CANCEL_CONTAINER_TRANSITION)
.addTransition(
RMAppAttemptState.KILLED,
RMAppAttemptState.KILLED,
@@ -209,6 +205,7 @@ public class RMAppAttemptImpl implements
RMAppAttemptEventType.LAUNCH_FAILED,
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.REGISTERED,
+ RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
@@ -424,8 +421,11 @@ public class RMAppAttemptImpl implements
}
}
- private static final List<Container> EMPTY_CONTAINER_LIST =
+ private static final List<Container> EMPTY_CONTAINER_RELEASE_LIST =
new ArrayList<Container>();
+ private static final List<ResourceRequest> EMPTY_CONTAINER_REQUEST_LIST =
+ new ArrayList<ResourceRequest>();
+
private static final class ScheduleTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
@@ -444,7 +444,7 @@ public class RMAppAttemptImpl implements
+ appAttempt.applicationAttemptId + " required " + request);
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
- Collections.singletonList(request), EMPTY_CONTAINER_LIST);
+ Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST);
}
}
@@ -453,14 +453,14 @@ public class RMAppAttemptImpl implements
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
+ // Acquire the AM container from the scheduler.
+ Allocation amContainerAllocation = appAttempt.scheduler.allocate(
+ appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
+ EMPTY_CONTAINER_RELEASE_LIST);
+
// Set the masterContainer
- RMAppAttemptContainerAllocatedEvent allocatedEvent
- = (RMAppAttemptContainerAllocatedEvent) event;
- appAttempt.masterContainer = allocatedEvent.getContainer();
-
- // Make the AM container as acquired.
- appAttempt.eventHandler.handle(new RMContainerEvent(allocatedEvent
- .getContainer().getId(), RMContainerEventType.ACQUIRED));
+ appAttempt.masterContainer = amContainerAllocation.getContainers().get(
+ 0);
// Send event to launch the AM Container
appAttempt.eventHandler.handle(new AMLauncherEvent(
@@ -665,6 +665,17 @@ public class RMAppAttemptImpl implements
}
}
+ private static final class ContainerAcquiredTransition extends
+ BaseTransition {
+ @Override
+ public void transition(RMAppAttemptImpl appAttempt,
+ RMAppAttemptEvent event) {
+ RMAppAttemptContainerAcquiredEvent acquiredEvent
+ = (RMAppAttemptContainerAcquiredEvent) event;
+ appAttempt.ranNodes.add(acquiredEvent.getContainer().getNodeId());
+ }
+ }
+
private static final class ContainerFinishedTransition
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@@ -692,18 +703,4 @@ public class RMAppAttemptImpl implements
return RMAppAttemptState.RUNNING;
}
}
-
- private static final class CancelContainerTransition extends BaseTransition {
- @Override
- public void transition(RMAppAttemptImpl appAttempt,
- RMAppAttemptEvent event) {
- RMAppAttemptContainerAllocatedEvent containerAllocatedEvent
- = (RMAppAttemptContainerAllocatedEvent) event;
- // Kill this container.
- appAttempt.eventHandler.handle(new RMContainerEvent(
- containerAllocatedEvent.getContainer().getId(),
- RMContainerEventType.KILL));
- }
- }
-
}
Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java?rev=1153446&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java Wed Aug 3 11:48:09 2011
@@ -0,0 +1,22 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+
+public class RMAppAttemptContainerAcquiredEvent extends RMAppAttemptEvent {
+
+ private final Container container;
+
+ public RMAppAttemptContainerAcquiredEvent(ApplicationAttemptId appAttemptId,
+ Container container) {
+ super(appAttemptId, RMAppAttemptEventType.CONTAINER_ACQUIRED);
+ this.container = container;
+ }
+
+ public Container getContainer() {
+ return this.container;
+ }
+
+}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java Wed Aug 3 11:48:09 2011
@@ -5,14 +5,14 @@ public enum RMContainerEventType {
// Source: scheduler
START,
- // Source: App
+ // Source: SchedulerApp
ACQUIRED,
KILL, // Also from Node on NodeRemoval
LAUNCHED,
FINISHED,
- // Source: ApplicationMasterService
+ // Source: ApplicationMasterService->Scheduler
RELEASED,
// Source: ContainerAllocationExpirer
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Wed Aug 3 11:48:09 2011
@@ -12,6 +12,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
@@ -58,7 +59,7 @@ public class RMContainerImpl implements
// Transitions from RUNNING state
.addTransition(RMContainerState.RUNNING, RMContainerState.COMPLETED,
- RMContainerEventType.FINISHED, new FinishedTransition())
+ RMContainerEventType.FINISHED, new ContainerCompletedTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.KILLED,
RMContainerEventType.KILL, new KillTransition())
@@ -147,8 +148,8 @@ public class RMContainerImpl implements
" on container " + this.containerId);
}
if (oldState != getState()) {
- LOG.info(nodeId + " Container Transitioned from " + oldState + " to "
- + getState());
+ LOG.info(event.getContainerId() + " Container Transitioned from "
+ + oldState + " to " + getState());
}
}
@@ -157,7 +158,7 @@ public class RMContainerImpl implements
}
}
- private static class RMContainerTransition implements
+ private static class BaseTransition implements
SingleArcTransition<RMContainerImpl, RMContainerEvent> {
@Override
@@ -167,7 +168,7 @@ public class RMContainerImpl implements
}
private static final class ContainerStartedTransition extends
- RMContainerTransition {
+ BaseTransition {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
@@ -176,16 +177,20 @@ public class RMContainerImpl implements
}
}
- private static final class AcquiredTransition extends RMContainerTransition {
+ private static final class AcquiredTransition extends BaseTransition {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
// Register with containerAllocationExpirer.
container.containerAllocationExpirer.register(container.getContainer());
+
+ // Tell the appAttempt
+ container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
+ container.getApplicationAttemptId(), container.getContainer()));
}
}
- private static final class LaunchedTransition extends RMContainerTransition {
+ private static final class LaunchedTransition extends BaseTransition {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
@@ -194,19 +199,11 @@ public class RMContainerImpl implements
}
}
- private static class FinishedTransition extends RMContainerTransition {
+ private static class FinishedTransition extends BaseTransition {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
- RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
-
- // Update container-status for diagnostics. Today we completely
- // replace it on finish. We may just need to update diagnostics.
- // ^TODO
- container.container.setContainerStatus(finishedEvent
- .getRemoteContainerStatus());
-
// Inform AppAttempt
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
container.appAttemptId, container.container));
@@ -221,7 +218,7 @@ public class RMContainerImpl implements
// Unregister from containerAllocationExpirer.
container.containerAllocationExpirer.unregister(container.getContainer());
- // Inform AppAttempt, scheduler etc.
+ // Inform AppAttempt
super.transition(container, event);
}
}
@@ -238,11 +235,27 @@ public class RMContainerImpl implements
container.eventHandler.handle(new RMNodeCleanContainerEvent(
container.nodeId, container.containerId));
- // Inform appAttempt and scheduler
+ // Inform appAttempt
super.transition(container, event);
}
}
-
+ private static final class ContainerCompletedTransition extends
+ FinishedTransition {
+ @Override
+ public void transition(RMContainerImpl container, RMContainerEvent event) {
+
+ RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
+
+ // Update container-status for diagnostics. Today we completely
+ // replace it on finish. We may just need to update diagnostics.
+ // ^TODO
+ container.container.setContainerStatus(finishedEvent
+ .getRemoteContainerStatus());
+
+ // Inform appAttempt
+ super.transition(container, event);
+ }
+ }
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Wed Aug 3 11:48:09 2011
@@ -20,7 +20,10 @@ package org.apache.hadoop.yarn.server.re
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -33,7 +36,9 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -42,6 +47,9 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -81,6 +89,11 @@ public class RMNodeImpl implements RMNod
private final NodeHealthStatus nodeHealthStatus = recordFactory
.newRecordInstance(NodeHealthStatus.class);
+ /* set of containers that have just launched */
+ private final Map<ContainerId, Container> justLaunchedContainers =
+ new HashMap<ContainerId, Container>();
+
+
/* set of containers that need to be cleaned */
private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
new ContainerIdComparator());
@@ -338,9 +351,45 @@ public class RMNodeImpl implements RMNod
return RMNodeState.UNHEALTHY;
}
+ // Filter the map to only obtain just launched containers and finished
+ // containers.
+ Map<ApplicationId, List<Container>> remoteAppContainersMap = statusEvent
+ .getContainersCollection();
+ Map<ApplicationId, List<Container>> containersMapForScheduler = new HashMap<ApplicationId, List<Container>>(
+ remoteAppContainersMap.size());
+ for (Entry<ApplicationId, List<Container>> entrySet : remoteAppContainersMap
+ .entrySet()) {
+
+ ApplicationId appId = entrySet.getKey();
+ List<Container> remoteContainerList = entrySet.getValue();
+
+ if (!containersMapForScheduler.containsKey(appId)) {
+ containersMapForScheduler.put(appId, new ArrayList<Container>(
+ remoteContainerList.size()));
+ }
+ List<Container> entryForThisApp = containersMapForScheduler
+ .get(appId);
+
+ for (Container remoteContainer : remoteContainerList) {
+
+ // Process running containers
+ ContainerId containerId = remoteContainer.getId();
+ if (remoteContainer.getState() == ContainerState.RUNNING) {
+ if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
+ // Just launched container. RM knows about it the first time.
+ rmNode.justLaunchedContainers.put(containerId, remoteContainer);
+ entryForThisApp.add(remoteContainer);
+ }
+ } else {
+ // A finished container
+ rmNode.justLaunchedContainers.remove(containerId);
+ entryForThisApp.add(remoteContainer);
+ }
+ }
+ }
+
rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeUpdateSchedulerEvent(rmNode, statusEvent
- .getContainersCollection()));
+ new NodeUpdateSchedulerEvent(rmNode, containersMapForScheduler));
return RMNodeState.RUNNING;
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java Wed Aug 3 11:48:09 2011
@@ -41,8 +41,8 @@ public class SchedulerApp {
private Map<ContainerId, RMContainer> liveContainers
= new HashMap<ContainerId, RMContainer>();
- private List<Container> newlyAllocatedContainers =
- new ArrayList<Container>();
+ private List<RMContainer> newlyAllocatedContainers =
+ new ArrayList<RMContainer>();
public SchedulerApp(AppSchedulingInfo application, Queue queue) {
this.appSchedulingInfo = application;
@@ -78,10 +78,6 @@ public class SchedulerApp {
public List<Container> getCurrentContainers() {
return this.appSchedulingInfo.getCurrentContainers();
}
-
- public synchronized Collection<RMContainer> getLiveContainers() {
- return liveContainers.values();
- }
public Collection<Priority> getPriorities() {
return this.appSchedulingInfo.getPriorities();
@@ -107,18 +103,16 @@ public class SchedulerApp {
return this.queue;
}
+ public synchronized Collection<RMContainer> getLiveContainers() {
+ return this.liveContainers.values();
+ }
+
public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
- // Kill all 'live' containers
- for (RMContainer container : getLiveContainers()) {
- completedContainer(container.getContainer(),
- RMContainerEventType.KILL);
- }
-
// Cleanup all scheduling information
this.appSchedulingInfo.stop(rmAppAttemptFinalState);
}
- synchronized public void launchContainer(ContainerId containerId) {
+ synchronized public void containerLaunchedOnNode(ContainerId containerId) {
// Inform the container
RMContainer rmContainer =
getRMContainer(containerId);
@@ -131,7 +125,7 @@ public class SchedulerApp {
SchedulerApp application) {
}
- synchronized public void completedContainer(Container cont,
+ synchronized public void containerCompleted(Container cont,
RMContainerEventType event) {
ContainerId containerId = cont.getId();
// Inform the container
@@ -143,7 +137,7 @@ public class SchedulerApp {
} else {
container.handle(new RMContainerEvent(containerId, event));
}
- LOG.info("Completed container: " + container +
+ LOG.info("Completed container: " + container.getContainerId() +
" in state: " + container.getState());
// Remove from the list of containers
@@ -174,7 +168,7 @@ public class SchedulerApp {
+ c.getNodeId().toString());
// Add it to allContainers list.
- newlyAllocatedContainers.add(c);
+ newlyAllocatedContainers.add(container);
liveContainers.put(c.getId(), container);
}
@@ -183,9 +177,15 @@ public class SchedulerApp {
}
synchronized public List<Container> pullNewlyAllocatedContainers() {
- List<Container> allocatedContainers = newlyAllocatedContainers;
- newlyAllocatedContainers = new ArrayList<Container>();
- return allocatedContainers;
+ List<Container> returnContainerList = new ArrayList<Container>(
+ newlyAllocatedContainers.size());
+ for (RMContainer rmContainer : newlyAllocatedContainers) {
+ rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
+ RMContainerEventType.ACQUIRED));
+ returnContainerList.add(rmContainer.getContainer());
+ }
+ newlyAllocatedContainers.clear();
+ return returnContainerList;
}
public Resource getCurrentConsumption() {
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Aug 3 11:48:09 2011
@@ -859,7 +859,7 @@ public class LeafQueue implements Queue
BuilderUtils.newContainer(this.recordFactory,
application.getApplicationAttemptId(),
application.getNewContainerId(),
- node.getNodeID(), node.getNodeAddress(),
+ node.getNodeID(),
node.getHttpAddress(), capability);
// If security is enabled, send the container-tokens too.
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Aug 3 11:48:09 2011
@@ -224,28 +224,30 @@ public class FifoScheduler implements Re
// Release containers
for (Container releasedContainer : release) {
- completedContainer(releasedContainer, RMContainerEventType.RELEASED);
+ containerCompleted(releasedContainer, RMContainerEventType.RELEASED);
}
synchronized (application) {
- LOG.debug("allocate: pre-update" +
- " applicationId=" + applicationAttemptId +
- " application=" + application);
- application.showRequests();
-
- // Update application requests
- application.updateResourceRequests(ask);
-
- LOG.debug("allocate: post-update" +
- " applicationId=" + applicationAttemptId +
- " application=" + application);
- application.showRequests();
+ if (!ask.isEmpty()) {
+ LOG.debug("allocate: pre-update" +
+ " applicationId=" + applicationAttemptId +
+ " application=" + application);
+ application.showRequests();
+
+ // Update application requests
+ application.updateResourceRequests(ask);
+
+ LOG.debug("allocate: post-update" +
+ " applicationId=" + applicationAttemptId +
+ " application=" + application);
+ application.showRequests();
+
+ LOG.debug("allocate:" +
+ " applicationId=" + applicationAttemptId +
+ " #ask=" + ask.size());
+ }
- LOG.debug("allocate:" +
- " applicationId=" + applicationAttemptId +
- " #ask=" + ask.size());
-
return new Allocation(
application.pullNewlyAllocatedContainers(),
application.getHeadroom());
@@ -300,6 +302,11 @@ public class FifoScheduler implements Re
" has completed!");
}
+ // Kill all 'live' containers
+ for (RMContainer container : application.getLiveContainers()) {
+ containerCompleted(container.getContainer(), RMContainerEventType.KILL);
+ }
+
// Clean up pending requests, metrics etc.
application.stop(rmAppAttemptFinalState);
@@ -488,12 +495,12 @@ public class FifoScheduler implements Re
BuilderUtils.newContainer(recordFactory,
application.getApplicationAttemptId(),
application.getNewContainerId(),
- node.getRMNode().getNodeID(), node.getRMNode().getNodeAddress(),
+ node.getRMNode().getNodeID(),
node.getRMNode().getHttpAddress(), capability);
- RMContainer rmContainer =
- new RMContainerImpl(container,
- application.getApplicationAttemptId(), node.getNodeID(),
- null, this.rmContext.getContainerAllocationExpirer());
+ RMContainer rmContainer = new RMContainerImpl(container, application
+ .getApplicationAttemptId(), node.getNodeID(), this.rmContext
+ .getDispatcher().getEventHandler(), this.rmContext
+ .getContainerAllocationExpirer());
// If security is enabled, send the container-tokens too.
if (UserGroupInformation.isSecurityEnabled()) {
@@ -528,17 +535,15 @@ public class FifoScheduler implements Re
}
private synchronized void nodeUpdate(RMNode rmNode,
- Map<ApplicationId, List<Container>> containers) {
+ Map<ApplicationId, List<Container>> remoteContainers) {
SchedulerNode node = getNode(rmNode.getNodeID());
- // Process completed containers
- for (List<Container> appContainers : containers.values()) {
+ for (List<Container> appContainers : remoteContainers.values()) {
for (Container container : appContainers) {
- if (container.getContainerStatus().getState() == ContainerState.RUNNING
- || container.getContainerStatus().getState() == ContainerState.INITIALIZING) {
- launchContainer(container, node);
+ if (container.getState() == ContainerState.RUNNING) {
+ containerLaunchedOnNode(container, node);
} else { // has to COMPLETE
- completedContainer(container, RMContainerEventType.FINISHED);
+ containerCompleted(container, RMContainerEventType.FINISHED);
}
}
}
@@ -603,7 +608,7 @@ public class FifoScheduler implements Re
{
ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent) event;
- completedContainer(containerExpiredEvent.getContainer(),
+ containerCompleted(containerExpiredEvent.getContainer(),
RMContainerEventType.EXPIRE);
}
break;
@@ -612,7 +617,7 @@ public class FifoScheduler implements Re
}
}
- private void launchContainer(Container container, SchedulerNode node) {
+ private void containerLaunchedOnNode(Container container, SchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
SchedulerApp application = getApplication(applicationAttemptId);
@@ -623,11 +628,11 @@ public class FifoScheduler implements Re
return;
}
- application.launchContainer(container.getId());
+ application.containerLaunchedOnNode(container.getId());
}
@Lock(FifoScheduler.class)
- private synchronized void completedContainer(Container container, RMContainerEventType event) {
+ private synchronized void containerCompleted(Container container, RMContainerEventType event) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
SchedulerApp application = getApplication(applicationAttemptId);
@@ -644,7 +649,7 @@ public class FifoScheduler implements Re
}
// Inform the application
- application.completedContainer(container, event);
+ application.containerCompleted(container, event);
// Inform the node
node.releaseContainer(container);
@@ -663,7 +668,7 @@ public class FifoScheduler implements Re
SchedulerNode node = getNode(nodeInfo.getNodeID());
// Kill running containers
for(Container container : node.getRunningContainers()) {
- completedContainer(container, RMContainerEventType.KILL);
+ containerCompleted(container, RMContainerEventType.KILL);
}
//Remove the node
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Wed Aug 3 11:48:09 2011
@@ -266,14 +266,16 @@ public class Application {
// Get resources from the ResourceManager
resourceManager.getResourceScheduler().allocate(applicationAttemptId,
- new ArrayList<ResourceRequest>(ask));
+ new ArrayList<ResourceRequest>(ask), new ArrayList<Container>());
System.out.println("-=======" + applicationAttemptId);
System.out.println("----------" + resourceManager.getRMContext().getRMApps()
.get(applicationId).getRMAppAttempt(applicationAttemptId));
- List<Container> containers = resourceManager.getRMContext().getRMApps()
- .get(applicationId).getRMAppAttempt(applicationAttemptId)
- .pullNewlyAllocatedContainers();
+ List<Container> containers = null;
+ // TODO: Fix
+// resourceManager.getRMContext().getRMApps()
+// .get(applicationId).getRMAppAttempt(applicationAttemptId)
+// .pullNewlyAllocatedContainers();
// Clear state for next interaction with ResourceManager
ask.clear();
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Wed Aug 3 11:48:09 2011
@@ -72,7 +72,10 @@ public class MockAM {
}
public AMResponse schedule() throws Exception {
- return allocate(releases, requests);
+ AMResponse response = allocate(requests, releases);
+ requests.clear();
+ releases.clear();
+ return response;
}
public AMResponse allocate(
@@ -85,7 +88,7 @@ public class MockAM {
cont.setId(id);
//TOOD: set all fields
}
- return allocate(toRelease, reqs);
+ return allocate(reqs, toRelease);
}
public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
@@ -122,7 +125,7 @@ public class MockAM {
}
public AMResponse allocate(
- List<Container> releases, List<ResourceRequest> resourceRequest)
+ List<ResourceRequest> resourceRequest, List<Container> releases)
throws Exception {
AllocateRequest req = Records.newRecord(AllocateRequest.class);
req.setResponseId(++responseId);
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Wed Aug 3 11:48:09 2011
@@ -93,7 +93,6 @@ public class MockNodes {
recordFactory.newRecordInstance(NodeHealthStatus.class);
final Resource used = newUsedResource(perNode);
final Resource avail = newAvailResource(perNode, used);
- final int containers = (int)(Math.random() * 8);
return new RMNode() {
@Override
public NodeId getNodeID() {
@@ -126,22 +125,11 @@ public class MockNodes {
}
@Override
- public int getNumContainers() {
- return containers;
- }
-
- @Override
public NodeHealthStatus getNodeHealthStatus() {
return nodeHealthStatus;
}
@Override
- public List<Container> getRunningContainers() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public int getCommandPort() {
return nid;
}
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Wed Aug 3 11:48:09 2011
@@ -170,7 +170,7 @@ public class NodeManager implements Cont
Container container =
BuilderUtils.newContainer(containerLaunchContext.getContainerId(),
- this.nodeId, containerManagerAddress, nodeHttpAddress,
+ this.nodeId, nodeHttpAddress,
containerLaunchContext.getResource());
applicationContainers.add(container);
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Wed Aug 3 11:48:09 2011
@@ -47,12 +47,12 @@ public class TestApplicationCleanup {
//kick the scheduler
nm1.nodeHeartbeat(true);
- List<Container> conts = am.allocate(new ArrayList<Container>(),
- new ArrayList<ResourceRequest>()).getNewContainerList();
+ List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<Container>()).getNewContainerList();
int contReceived = conts.size();
while (contReceived < request) {
- conts = am.allocate(new ArrayList<Container>(),
- new ArrayList<ResourceRequest>()).getNewContainerList();
+ conts = am.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<Container>()).getNewContainerList();
contReceived += conts.size();
Log.info("Got " + contReceived + " containers. Waiting to get " + request);
Thread.sleep(2000);
@@ -73,7 +73,8 @@ public class TestApplicationCleanup {
HeartbeatResponse resp = nm1.nodeHeartbeat(true);
contsToClean = resp.getContainersToCleanupList();
apps = resp.getApplicationsToCleanupList();
- Log.info("Waiting to get cleanup events.." + cleanedConts);
+ Log.info("Waiting to get cleanup events.. cleanedConts: "
+ + cleanedConts + " cleanedApps: " + cleanedApps);
cleanedConts += contsToClean.size();
cleanedApps += apps.size();
Thread.sleep(1000);
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Wed Aug 3 11:48:09 2011
@@ -66,7 +66,7 @@ public class TestFifoScheduler {
MockNM nm2 = rm.registerNode("h2:5678", 4 * GB);
RMApp app1 = rm.submitApp(2048);
- // kick the scheduling, 2 GB given to AM1, remaining 4GB
+ // kick the scheduling, 2 GB given to AM1, remaining 4GB on nm1
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
@@ -75,7 +75,7 @@ public class TestFifoScheduler {
nm1.getNodeId()).getMemory());
RMApp app2 = rm.submitApp(2048);
- // kick the scheduling, 2GB given to AM, remaining 2 GB
+ // kick the scheduling, 2GB given to AM, remaining 2 GB on nm2
nm2.nodeHeartbeat(true);
RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
@@ -85,32 +85,32 @@ public class TestFifoScheduler {
// add request for containers
am1.addRequests(new String[] { "h1", "h2" }, GB, 1, 1);
- am1.schedule(); // send the request
+ AMResponse am1Response = am1.schedule(); // send the request
// add request for containers
am2.addRequests(new String[] { "h1", "h2" }, 3 * GB, 0, 1);
- am2.schedule(); // send the request
+ AMResponse am2Response = am2.schedule(); // send the request
// kick the scheduler, 1 GB and 3 GB given to AM1 and AM2, remaining 0
nm1.nodeHeartbeat(true);
- while (attempt1.getNewlyAllocatedContainers().size() < 1) {
+ while (am1Response.getNewContainerCount() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(1000);
+ am1Response = am1.schedule();
}
- while (attempt2.getNewlyAllocatedContainers().size() < 1) {
+ while (am2Response.getNewContainerCount() < 1) {
LOG.info("Waiting for containers to be created for app 2...");
Thread.sleep(1000);
+ am2Response = am2.schedule();
}
// kick the scheduler, nothing given remaining 2 GB.
nm2.nodeHeartbeat(true);
- AMResponse resp1 = am1.schedule(); // get allocations
- List<Container> allocated1 = resp1.getNewContainerList();
+ List<Container> allocated1 = am1Response.getNewContainerList();
Assert.assertEquals(1, allocated1.size());
Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory());
Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
- AMResponse resp2 = am2.schedule(); // get allocations
- List<Container> allocated2 = resp2.getNewContainerList();
+ List<Container> allocated2 = am2Response.getNewContainerList();
Assert.assertEquals(1, allocated2.size());
Assert.assertEquals(3 * GB, allocated2.get(0).getResource().getMemory());
Assert.assertEquals(nm1.getNodeId(), allocated2.get(0).getNodeId());
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1153446&r1=1153445&r2=1153446&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Wed Aug 3 11:48:09 2011
@@ -28,7 +28,7 @@ public class TestRM {
rootLogger.setLevel(Level.DEBUG);
MockRM rm = new MockRM();
rm.start();
- MockNM nm1 = rm.registerNode("h1:1234", 5000);
+ MockNM nm1 = rm.registerNode("h1:1234", 5120);
RMApp app = rm.submitApp(2000);
@@ -49,8 +49,8 @@ public class TestRM {
rootLogger.setLevel(Level.DEBUG);
MockRM rm = new MockRM();
rm.start();
- MockNM nm1 = rm.registerNode("h1:1234", 5000);
- MockNM nm2 = rm.registerNode("h2:5678", 10000);
+ MockNM nm1 = rm.registerNode("h1:1234", 5120);
+ MockNM nm2 = rm.registerNode("h2:5678", 10240);
RMApp app = rm.submitApp(2000);
@@ -67,13 +67,13 @@ public class TestRM {
//kick the scheduler
nm1.nodeHeartbeat(true);
- List<Container> conts = am.allocate(new ArrayList<Container>(),
- new ArrayList<ResourceRequest>()).getNewContainerList();
+ List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<Container>()).getNewContainerList();
int contReceived = conts.size();
while (contReceived < 3) {//only 3 containers are available on node1
- conts = am.allocate(new ArrayList<Container>(),
- new ArrayList<ResourceRequest>()).getNewContainerList();
- contReceived += conts.size();
+ conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<Container>()).getNewContainerList());
+ contReceived = conts.size();
LOG.info("Got " + contReceived + " containers. Waiting to get " + 3);
Thread.sleep(2000);
}
@@ -81,13 +81,13 @@ public class TestRM {
//send node2 heartbeat
nm2.nodeHeartbeat(true);
- conts = am.allocate(new ArrayList<Container>(),
- new ArrayList<ResourceRequest>()).getNewContainerList();
+ conts = am.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<Container>()).getNewContainerList();
contReceived = conts.size();
while (contReceived < 10) {
- conts = am.allocate(new ArrayList<Container>(),
- new ArrayList<ResourceRequest>()).getNewContainerList();
- contReceived += conts.size();
+ conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<Container>()).getNewContainerList());
+ contReceived = conts.size();
LOG.info("Got " + contReceived + " containers. Waiting to get " + 10);
Thread.sleep(2000);
}
|