Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java Mon Oct 15 21:09:59 2012
@@ -39,9 +39,9 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app2.AppContext;
import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventTerminated;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminated;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminating;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventNodeFailed;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted;
import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorStopRequestEvent;
@@ -61,10 +61,9 @@ import org.apache.hadoop.yarn.state.Stat
public class AMContainerImpl implements AMContainer {
private static final Log LOG = LogFactory.getLog(AMContainerImpl.class);
-
+
private final ReadLock readLock;
private final WriteLock writeLock;
- // TODO Use ContainerId or a custom JvmId.
private final ContainerId containerId;
// Container to be used for getters on capability, locality etc.
private final Container container;
@@ -88,7 +87,7 @@ public class AMContainerImpl implements
private TaskAttemptId pendingAttempt;
private TaskAttemptId runningAttempt;
- private TaskAttemptId interruptedEvent;
+ private List<TaskAttemptId> failedAssignments;
private TaskAttemptId pullAttempt;
private boolean inError = false;
@@ -109,53 +108,59 @@ public class AMContainerImpl implements
private void initStateMachineFactory() {
stateMachineFactory =
stateMachineFactory
- .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, AMContainerEventType.C_START_REQUEST, createLaunchRequestTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, AMContainerEventType.C_LAUNCH_REQUEST, createLaunchRequestTransition())
.addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtAllocatedTransition())
.addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtAllocatedTransition())
.addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_STOP_REQUEST, createStopRequestTransition())
.addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtAllocatedTransition())
- .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), createGenericErrorTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), createGenericErrorTransition())
- .addTransition(AMContainerState.LAUNCHING, EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOPPING), AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptTransition())
+ .addTransition(AMContainerState.LAUNCHING, EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptTransition())
.addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE, AMContainerEventType.C_LAUNCHED, createLaunchedTransition())
.addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_FAILED, createLaunchFailedTransition())
.addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING, AMContainerEventType.C_PULL_TA) // Is assuming the pullAttempt will be null.
.addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtLaunchingTransition())
- .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtLaunchingTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtLaunchingTransition())
.addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtLaunchingTransition())
- .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_START_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), createGenericErrorAtLaunchingTransition())
-
-
- .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.IDLE, AMContainerState.STOPPING), AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtIdleTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), createGenericErrorAtLaunchingTransition())
+
+ .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtIdleTransition())
.addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE), AMContainerEventType.C_PULL_TA, createPullTAAtIdleTransition())
.addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtIdleTransition())
- .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtIdleTransition())
- .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_TIMED_OUT, createTimedOutAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, createTimedOutAtIdleTransition())
.addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtIdleTransition())
- .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_START_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_FAILED), createGenericErrorAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), createGenericErrorAtIdleTransition())
- .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, createAssignTaskAttemptAtRunningTransition())
.addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING, AMContainerEventType.C_PULL_TA)
.addTransition(AMContainerState.RUNNING, AMContainerState.IDLE, AMContainerEventType.C_TA_SUCCEEDED, createTASucceededAtRunningTransition())
.addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtRunningTransition())
- .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtRunningTransition())
- .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_TIMED_OUT, createTimedOutAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, createStopRequestAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, createTimedOutAtRunningTransition())
.addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtRunningTransition())
- .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_START_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_STOP_FAILED), createGenericErrorAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), createGenericErrorAtRunningTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, createAssignTAAtStoppingTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtStoppingTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT)
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED, createStopFailedAtNMStopRequested()) // TODO XXX: Rename these.
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtNMStopRequestedTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT))
+ .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, createGenericErrorAtStoppingTransition())
.addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, createAssignTAAtStoppingTransition())
.addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, createCompletedAtStoppingTransition())
- .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedBaseTransition())
- .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
- .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_START_REQUEST, createGenericErrorAtStoppingTransition())
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtStoppingTransition())
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_REQUEST, createGenericErrorAtStoppingTransition()) // STOPPING (not STOP_REQUESTED, which is registered above) needs its own C_LAUNCH_REQUEST error arc.
.addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, createAssignTAAtCompletedTransition())
- .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, createNodeFailedBaseTransition())
- .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_START_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_STOP_FAILED), createGenericErrorAtStoppingTransition())
- .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT))
-
+ .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, createNodeFailedAtCompletedTransition())
+ .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT)) // Late C_LAUNCHED / duplicate C_COMPLETED are ignored; C_LAUNCH_REQUEST is handled as an error on the next line.
+ .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST), createGenericErrorAtStoppingTransition())
+
.installTopology();
}
@@ -335,8 +340,8 @@ public class AMContainerImpl implements
AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
container.inError = true;
container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
- "AMScheduler Error: TaskAttempt should not be" +
- " allocated before a launch request.");
+ "AMScheduler Error: TaskAttempt allocated to unlaunched container: "
+ + container.getContainerId());
container.sendCompletedToScheduler();
container.deAllocate();
LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId()
@@ -387,6 +392,10 @@ public class AMContainerImpl implements
}
}
+ protected void registerFailedTAAssignment(TaskAttemptId taId) {
+ failedAssignments.add(taId);
+ }
+
protected void deAllocate() {
sendEvent(new RMCommunicatorContainerDeAllocateRequestEvent(containerId));
}
@@ -396,15 +405,17 @@ public class AMContainerImpl implements
}
protected void sendTerminatedToTaskAttempt(TaskAttemptId taId, String message) {
- if (message != null) {
- sendEvent(new TaskAttemptDiagnosticsUpdateEvent(taId, message));
- }
- sendEvent(new TaskAttemptEventTerminated(taId));
+ sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
}
- protected void sendKillRequestToTaskAttempt(TaskAttemptId taId) {
- sendEvent(new TaskAttemptEventKillRequest(taId,
- "Node running the contianer failed"));
+ protected void sendTerminatingToTA(TaskAttemptId taId, String message) {
+ sendEvent(new TaskAttemptEventContainerTerminating(taId, message));
+ }
+
+ protected void sendNodeFailureToTA(AMContainerEvent event,
+ TaskAttemptId taId, String message) {
+ sendEvent(new TaskAttemptEventNodeFailed(taId, message));
+ // TODO XXX: Diag message from the node. Otherwise include the nodeId
}
protected void sendStopRequestToNM() {
@@ -439,11 +450,14 @@ public class AMContainerImpl implements
container.inError = true;
String errorMessage = "AMScheduler Error: Multiple simultaneous " +
"taskAttempt allocations to: " + container.getContainerId();
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
- errorMessage);
- container.deAllocate();
+ container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+ container.registerFailedTAAssignment(event.getTaskAttemptId());
+ // TODO XXX: Verify that it's ok to send in a NM_STOP_REQUEST. The
+ // NMCommunicator should be able to handle this. The STOP_REQUEST would
+ // only go out after the START_REQUEST.
LOG.warn(errorMessage);
- return AMContainerState.STOPPING;
+ container.sendStopRequestToNM();
+ return AMContainerState.STOP_REQUESTED;
}
container.pendingAttempt = event.getTaskAttemptId();
container.remoteTaskMap.put(event.getTaskAttemptId(),
@@ -490,7 +504,7 @@ public class AMContainerImpl implements
container.pendingAttempt = null;
if (container.lastTaskFinishTime != 0) {
long idleTimeDiff = System.currentTimeMillis() - container.lastTaskFinishTime;
- LOG.info("Computing idle time for container: " + container.getContainerId() + ", lastFinishTime: " + container.lastTaskFinishTime + ", Incremented by: " + idleTimeDiff);
+ LOG.info("XXX: Computing idle time for container: " + container.getContainerId() + ", lastFinishTime: " + container.lastTaskFinishTime + ", Incremented by: " + idleTimeDiff);
container.idleTimeBetweenTasks += System.currentTimeMillis() - container.lastTaskFinishTime;
}
LOG.info("XXX: Assigned task + [" + container.runningAttempt + "] to container: [" + container.getContainerId() + "]");
@@ -512,8 +526,8 @@ public class AMContainerImpl implements
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
if (container.pendingAttempt != null) {
AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
- container.sendEvent(new TaskAttemptDiagnosticsUpdateEvent(
- container.pendingAttempt, event.getMessage()));
+ container.sendTerminatingToTA(container.pendingAttempt,
+ event.getMessage());
}
container.deAllocate();
}
@@ -531,7 +545,8 @@ public class AMContainerImpl implements
if (container.pendingAttempt != null) {
String errorMessage = "Container" + container.getContainerId()
+ " failed. Received COMPLETED event while trying to launch";
- container.sendTerminatedToTaskAttempt(container.pendingAttempt,errorMessage);
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+ errorMessage);
LOG.warn(errorMessage);
// TODO XXX Maybe nullify pendingAttempt.
}
@@ -548,11 +563,14 @@ public class AMContainerImpl implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ if (container.pendingAttempt != null) {
+ container.sendTerminatingToTA(container.pendingAttempt,
+ " Container" + container.getContainerId() + " received a STOP_REQUEST");
+ }
container.sendStopRequestToNM();
- container.deAllocate();
}
}
-
+
protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
createNodeFailedAtLaunchingTransition() {
return new NodeFailedAtLaunching();
@@ -563,7 +581,10 @@ public class AMContainerImpl implements
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
if (container.pendingAttempt != null) {
- container.sendKillRequestToTaskAttempt(container.pendingAttempt);
+ container.sendNodeFailureToTA(cEvent, container.pendingAttempt, null);
+ // TODO XXX: Maybe include a diagnostic message along with the incoming
+ // Node failure event.
+ container.sendTerminatingToTA(container.pendingAttempt, "Node failure");
}
container.sendStopRequestToNM();
container.deAllocate();
@@ -575,7 +596,7 @@ public class AMContainerImpl implements
return new AssignTaskAttemptAtIdle();
}
- // TODO Make this the base for all assignRequests. Some more error checking in
+ // TODO XXX Make this the base for all assignRequests. Some more error checking in
// that case.
protected static class AssignTaskAttemptAtIdle
implements
@@ -588,17 +609,16 @@ public class AMContainerImpl implements
container.inError = true;
String errorMessage = "AMScheduler Error: Multiple simultaneous "
+ "taskAttempt allocations to: " + container.getContainerId();
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
- errorMessage);
+ container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+ container.registerFailedTAAssignment(event.getTaskAttemptId());
LOG.warn(errorMessage);
container.sendStopRequestToNM();
- container.deAllocate();
container.containerHeartbeatHandler.unregister(container.containerId);
- return AMContainerState.STOPPING;
+ return AMContainerState.STOP_REQUESTED;
}
container.pendingAttempt = event.getTaskAttemptId();
- // TODO LATER. Cleanup the remoteTaskMap.
+ // TODO XXX LATER. Cleanup the remoteTaskMap.
container.remoteTaskMap.put(event.getTaskAttemptId(),
event.getRemoteTask());
return AMContainerState.IDLE;
@@ -617,10 +637,12 @@ public class AMContainerImpl implements
LOG.info("Cotnainer with id: " + container.getContainerId()
+ " Completed." + " Previous state was: " + container.getState());
if (container.pendingAttempt != null) {
- container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+ "Container " + container.getContainerId() + " FINISHED.");
}
container.sendCompletedToScheduler();
container.containerHeartbeatHandler.unregister(container.containerId);
+ container.unregisterJvmFromListener(container.jvmId);
}
}
@@ -629,16 +651,13 @@ public class AMContainerImpl implements
return new StopRequestAtIdle();
}
- protected static class StopRequestAtIdle implements
- SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ protected static class StopRequestAtIdle extends StopRequestAtLaunching {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ super.transition(container, cEvent);
LOG.info("XXX: IdleTimeBetweenTasks: " + container.idleTimeBetweenTasks);
- container.sendStopRequestToNM();
- container.deAllocate();
container.containerHeartbeatHandler.unregister(container.containerId);
container.unregisterJvmFromListener(container.jvmId);
- // TODO XXXXXXXXX: Unregister from TAL so that the Container kills itself (via a kill task assignment)
}
}
@@ -648,6 +667,7 @@ public class AMContainerImpl implements
}
protected static class TimedOutAtIdle extends StopRequestAtIdle {
+ // TODO XXX: Override to change the diagnostic message that goes to the TaskAttempt. Functionality is the same.
}
protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
@@ -675,15 +695,13 @@ public class AMContainerImpl implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
- container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
+ container.sendTerminatedToTaskAttempt(container.runningAttempt,
+ "Container " + container.getContainerId()
+ + " FINISHED while task was running");
container.sendCompletedToScheduler();
container.containerHeartbeatHandler.unregister(container.containerId);
container.unregisterAttemptFromListener(container.runningAttempt);
container.unregisterJvmFromListener(container.jvmId);
- container.interruptedEvent = container.runningAttempt;
- container.runningAttempt = null;
-
-
}
}
@@ -696,10 +714,9 @@ public class AMContainerImpl implements
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.unregisterAttemptFromListener(container.runningAttempt);
-// container.unregisterJvmFromListener(container.jvmId);
+ container.sendTerminatingToTA(container.runningAttempt,
+ " Container" + container.getContainerId() + " received a STOP_REQUEST");
// TODO XXX: All running transition. verify whether runningAttempt should be null.
- container.interruptedEvent = container.runningAttempt;
- container.runningAttempt = null;
}
}
@@ -709,6 +726,7 @@ public class AMContainerImpl implements
}
protected static class TimedOutAtRunning extends StopRequestAtRunning {
+ // TODO XXX: Change the error message.
}
protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
@@ -721,12 +739,10 @@ public class AMContainerImpl implements
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
- container.sendKillRequestToTaskAttempt(container.runningAttempt);
+ container.sendNodeFailureToTA(cEvent, container.runningAttempt, null);
+ container.sendTerminatingToTA(container.runningAttempt, "Node failure");
+
container.unregisterAttemptFromListener(container.runningAttempt);
- container.unregisterJvmFromListener(container.jvmId);
- container.interruptedEvent = container.runningAttempt;
- container.runningAttempt = null;
-
}
}
@@ -744,9 +760,9 @@ public class AMContainerImpl implements
container.inError = true;
String errorMessage = "AttemptId: " + event.getTaskAttemptId()
+ " cannot be allocated to container: " + container.getContainerId()
- + " in STOPPING state";
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
- errorMessage);
+ + " in " + container.getState() + " state";
+ container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+ container.registerFailedTAAssignment(event.getTaskAttemptId());
}
}
@@ -761,6 +777,7 @@ public class AMContainerImpl implements
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
container.inError = true;
+ // TODO XXX: Anything else required in the error transitions ?
}
}
@@ -791,22 +808,32 @@ public class AMContainerImpl implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
- // XXX: Would some of these events not have gone out when entering the STOPPING state. Fix errorMessages
+ for (TaskAttemptId taId : container.failedAssignments) { container.sendTerminatedToTaskAttempt(taId, null); } // Rejected assignments were only sent TERMINATING; complete them as well. TODO XXX: Set everything to null after sending these out.
if (container.pendingAttempt != null) {
container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
}
if (container.runningAttempt != null) {
container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
}
- if (container.interruptedEvent != null) {
- container.sendTerminatedToTaskAttempt(container.interruptedEvent, null);
- }
container.sendCompletedToScheduler();
}
}
+ protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
+ createStopFailedAtNMStopRequested() {
+ return new StopFailedAtNMStopRequested();
+ }
+
+ protected static class StopFailedAtNMStopRequested implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ container.deAllocate();
+ }
+ }
- protected SingleArcTransition<AMContainerImpl, AMContainerEvent> createNodeFailedBaseTransition() {
+ protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
+ createNodeFailedBaseTransition() {
return new NodeFailedBase();
}
@@ -820,43 +847,96 @@ public class AMContainerImpl implements
// let multiple events go out and the TA should be able to handle them.
// Kill_TA going out in this case.
if (container.runningAttempt != null) {
- container.killTaskAttempt(container.runningAttempt);
+ container.sendNodeFailureToTA(cEvent, container.runningAttempt, null);
+ container.sendTerminatingToTA(container.runningAttempt, "Node Failure");
}
if (container.pendingAttempt != null) {
- container.killTaskAttempt(container.pendingAttempt);
+ container.sendNodeFailureToTA(cEvent, container.pendingAttempt, null);
}
for (TaskAttemptId attemptId : container.completedAttempts) {
// TODO XXX: Make sure TaskAttempt knows how to handle kills to REDUCEs.
-// if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
- container.killTaskAttempt(attemptId);
-// }s
+ container.sendNodeFailureToTA(cEvent, attemptId, null);
}
-
}
}
- private void killTaskAttempt(TaskAttemptId attemptId) {
- sendEvent(new TaskAttemptEventKillRequest(attemptId, "The node running the task attempt was marked as bad"));
+ protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
+ createNodeFailedAtStoppingTransition() {
+ return new NodeFailedAtSopping();
}
+ protected static class NodeFailedAtSopping extends NodeFailedBase {
+ @Override
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ // NodeFailedBase already sends NodeFailed + TERMINATING ("Node Failure")
+ // to the running attempt; sending TERMINATING again here would duplicate it.
+ super.transition(container, cEvent);
+ }
+ }
+
+ protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
+ createNodeFailedAtCompletedTransition() {
+ return new NodeFailedAtCompleted();
+ }
+
+ protected static class NodeFailedAtCompleted extends NodeFailedBase {
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ super.transition(container, cEvent);
+ if (container.runningAttempt != null) {
+ container.sendTerminatedToTaskAttempt(container.runningAttempt,
+ "Node Failure");
+ }
+ }
+ }
+
+ protected SingleArcTransition<AMContainerImpl, AMContainerEvent> createNodeFailedAtNMStopRequestedTransition() {
+ return new NodeFailedAtNMStopRequested();
+ }
+
+ protected static class NodeFailedAtNMStopRequested implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+ public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ if (container.runningAttempt != null) {
+ container.sendNodeFailureToTA(cEvent, container.runningAttempt,
+ null);
+ container.sendTerminatingToTA(container.runningAttempt, "Node Failure");
+ }
+ if (container.pendingAttempt != null) {
+ container.sendNodeFailureToTA(cEvent, container.pendingAttempt,
+ null);
+ }
+ for (TaskAttemptId attemptId : container.completedAttempts) {
+ // TODO XXX: Make sure TaskAttempt knows how to handle kills to REDUCEs.
+ container.sendNodeFailureToTA(cEvent, attemptId, null);
+ }
+ for (TaskAttemptId attemptId : container.failedAssignments) {
+ container.sendNodeFailureToTA(cEvent, attemptId, null);
+ }
+ container.deAllocate();
+ }
+ }
+
protected SingleArcTransition<AMContainerImpl, AMContainerEvent>
createNodeFailedAtIdleTransition() {
return new NodeFailedAtIdle();
}
-
- protected static class NodeFailedAtIdle implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
-
+
+ protected static class NodeFailedAtIdle implements
+ SingleArcTransition<AMContainerImpl, AMContainerEvent> {
+
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
container.sendStopRequestToNM();
container.deAllocate();
if (container.pendingAttempt != null) {
- container.sendKillRequestToTaskAttempt(container.pendingAttempt);
+ container.sendNodeFailureToTA(cEvent, container.pendingAttempt, null);
+ container.sendTerminatingToTA(container.pendingAttempt, "Node Failure");
}
for (TaskAttemptId taId : container.completedAttempts) {
- container.sendKillRequestToTaskAttempt(taId);
+ container.sendNodeFailureToTA(cEvent, taId, null);
}
container.containerHeartbeatHandler.unregister(container.containerId);
+ container.unregisterJvmFromListener(container.jvmId);
}
}
@@ -873,16 +953,18 @@ public class AMContainerImpl implements
container.inError = true;
String errorMessage = "AttemptId: " + event.getTaskAttemptId()
+ " cannot be allocated to container: " + container.getContainerId()
- + " in RUNNING state";
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), errorMessage);
+ + " in RUNNING state. Already executing TaskAttempt: "
+ + container.runningAttempt;
+ container.sendTerminatingToTA(event.getTaskAttemptId(), errorMessage);
+ container.registerFailedTAAssignment(event.getTaskAttemptId());
+
+ container.sendTerminatingToTA(container.runningAttempt, errorMessage);
+
container.sendStopRequestToNM();
- container.deAllocate();
container.unregisterAttemptFromListener(container.runningAttempt);
container.unregisterJvmFromListener(container.jvmId);
container.containerHeartbeatHandler.unregister(container.containerId);
- container.interruptedEvent = container.runningAttempt;
- container.runningAttempt = null;
- // TODO XXX: Is the TAL unregister required ?
+
}
}
@@ -926,6 +1008,7 @@ public class AMContainerImpl implements
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ // Run the parent transition first, then stop tracking heartbeats for this
+ // container and remove its JVM registration from the listener.
super.transition(container, cEvent);
container.containerHeartbeatHandler.unregister(container.containerId);
+ container.unregisterJvmFromListener(container.jvmId);
}
}
@@ -939,12 +1022,11 @@ public class AMContainerImpl implements
super.transition(container, cEvent);
container.unregisterAttemptFromListener(container.runningAttempt);
container.unregisterJvmFromListener(container.jvmId);
- container.interruptedEvent = container.runningAttempt;
- container.runningAttempt = null;
}
}
// TODO Create a generic ERROR state. Container tries informing relevant components in this case.
+ // TODO XXX: Rename all generic error transitions.
}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java Mon Oct 15 21:09:59 2012
@@ -38,7 +38,7 @@ public class AMContainerLaunchRequestEve
public AMContainerLaunchRequestEvent(ContainerId containerId, JobId jobId,
TaskType taskType, Token<JobTokenIdentifier> jobToken,
Credentials credentials, boolean shouldProfile, JobConf jobConf) {
- super(containerId, AMContainerEventType.C_START_REQUEST);
+ super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
this.jobId = jobId;
this.taskTypeForContainer = taskType;
this.jobToken = jobToken;
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerState.java Mon Oct 15 21:09:59 2012
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package org.apache.hadoop.mapreduce.v2.app2.rm.container;
public enum AMContainerState {
@@ -5,6 +22,9 @@ public enum AMContainerState {
LAUNCHING,
IDLE,
RUNNING,
+ // Indicates that an NM stop request has been attempted. That request may
+ // fail, in which case an RM stop request needs to be sent.
+ STOP_REQUESTED,
STOPPING,
COMPLETED,
}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventTaskAttemptEnded.java Mon Oct 15 21:09:59 2012
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package org.apache.hadoop.mapreduce.v2.app2.rm.node;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -11,7 +28,8 @@ public class AMNodeEventTaskAttemptEnded
private final ContainerId containerId;
private final TaskAttemptId taskAttemptId;
- public AMNodeEventTaskAttemptEnded(NodeId nodeId, ContainerId containerId, TaskAttemptId taskAttemptId, boolean failed) {
+ public AMNodeEventTaskAttemptEnded(NodeId nodeId, ContainerId containerId,
+ TaskAttemptId taskAttemptId, boolean failed) {
super(nodeId, AMNodeEventType.N_TA_ENDED);
this.failed = failed;
this.containerId = containerId;
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java Mon Oct 15 21:09:59 2012
@@ -29,7 +29,7 @@ public enum AMNodeEventType {
//Producer: RMCommunicator
N_TURNED_UNHEALTHY,
N_TURNED_HEALTHY,
- N_NODE_COUNT_UPDATED,
+ N_NODE_COUNT_UPDATED, // for blacklisting.
//Producer: AMNodeManager
N_BLACKLISTING_ENABLED,
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java Mon Oct 15 21:09:59 2012
@@ -259,6 +259,7 @@ public class AMNodeImpl implements AMNod
AMNodeEventType.N_NODE_WAS_BLACKLISTED));
return AMNodeState.BLACKLISTED;
// TODO XXX: An event likely needs to go out to the scheduler.
+ // XXX: Someone needs to update the scheduler tables by sending a zeroed request to the scheduler. Who is responsible for that?
}
}
return AMNodeState.ACTIVE;
@@ -378,6 +379,7 @@ public class AMNodeImpl implements AMNod
LOG.info("Node: " + node.getNodeId()
+ " got allocated a contaienr with id: " + event.getContainerId()
+ " while in UNHEALTHY state. Releasing it.");
+ // TODO XXX: Consider including diagnostics with this event (e.g. that the RM reported the node as unhealthy), which would then be included in the container's diagnostics.
node.sendEvent(new AMContainerEvent(event.getContainerId(),
AMContainerEventType.C_NODE_FAILED));
}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Mon Oct 15 21:09:59 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.mapreduce.jobhistory;
-import static junit.framework.Assert.assertFalse;
import static junit.framework.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
@@ -51,7 +50,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.verification.VerificationMode;
public class TestJobHistoryEventHandler {
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java Mon Oct 15 21:09:59 2012
@@ -72,8 +72,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTASucceededEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
@@ -261,7 +260,8 @@ public class MRApp extends MRAppMaster {
TaskAttemptReport report = attempt.getReport();
while (!finalState.equals(report.getTaskAttemptState()) &&
timeoutSecs++ < 20) {
- System.out.println("TaskAttempt State is : " + report.getTaskAttemptState() +
+ System.out.println("TaskAttempt State for " + attempt.getID() + " is : " +
+ report.getTaskAttemptState() +
" Waiting for state : " + finalState +
" progress : " + report.getProgress());
report = attempt.getReport();
@@ -651,24 +651,27 @@ public class MRApp extends MRAppMaster {
.getRemoteTask()));
break;
- case S_TA_STOP_REQUEST:
+ case S_TA_ENDED:
// Send out a Container_stop_request.
- AMSchedulerTAStopRequestEvent stEvent = (AMSchedulerTAStopRequestEvent) rawEvent;
- LOG.info("XXX: Handling S_TA_STOP_REQUEST for attemptId:" + stEvent.getAttemptID());
- getContext().getEventHandler().handle(
- new AMContainerEvent(attemptToContainerIdMap.get(stEvent
- .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
-
- break;
- case S_TA_SUCCEEDED:
- // No re-use in MRApp. Stop the container.
- AMSchedulerTASucceededEvent suEvent = (AMSchedulerTASucceededEvent) rawEvent;
- LOG.info("XXX: Handling S_TA_SUCCEEDED for attemptId: "
- + suEvent.getAttemptID());
- getContext().getEventHandler().handle(
- new AMContainerEvent(attemptToContainerIdMap.get(suEvent
- .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
- break;
+ AMSchedulerEventTAEnded sEvent = (AMSchedulerEventTAEnded) rawEvent;
+ LOG.info("XXX: Handling S_TA_ENDED for attemptId:"
+ + sEvent.getAttemptID() + " with state: " + sEvent.getState());
+ switch (sEvent.getState()) {
+ case FAILED:
+ case KILLED:
+ getContext().getEventHandler().handle(
+ new AMContainerEvent(attemptToContainerIdMap.get(sEvent
+ .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+ break;
+ case SUCCEEDED:
+ // No re-use in MRApp. Stop the container.
+ getContext().getEventHandler().handle(
+ new AMContainerEvent(attemptToContainerIdMap.get(sEvent
+ .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+ break;
+ default:
+ throw new YarnException("Unexpected state: " + sEvent.getState());
+ }
case S_CONTAINERS_ALLOCATED:
break;
case S_CONTAINER_COMPLETED:
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java Mon Oct 15 21:09:59 2012
@@ -195,7 +195,7 @@ public class TestFail {
// TODO XXX: This may not be a valid test.
app.getDispatcher().getEventHandler().handle(
new TaskAttemptEvent(attempt.getID(),
- TaskAttemptEventType.TA_TERMINATED));
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED));
app.waitForState(job, JobState.FAILED);
}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java Mon Oct 15 21:09:59 2012
@@ -58,7 +58,8 @@ public class TestMapReduceChildJVM {
" -Dhadoop.root.logger=INFO,CLA" +
" org.apache.hadoop.mapred.YarnChild2 127.0.0.1" +
" 54321" +
- " attempt_0_0000_m_000000_0" +
+ " job_0_0000" +
+ " MAP" +
" 0" +
" 1><LOG_DIR>/stdout" +
" 2><LOG_DIR>/stderr ]", app.myCommandLine);
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java?rev=1398523&r1=1398522&r2=1398523&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java Mon Oct 15 21:09:59 2012
@@ -45,6 +45,7 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
@@ -815,6 +816,12 @@ public class TestRMContainerAllocator {
super.handleEvent(event);
}
+ @Override
+ protected boolean shouldProfileTaskAttempt(JobConf conf,
+ org.apache.hadoop.mapred.Task remoteTask) {
+ return false;
+ }
+
static Priority getMapPriority() {
return BuilderUtils.newPriority(PRIORITY_MAP.getPriority());
}
@@ -845,6 +852,12 @@ public class TestRMContainerAllocator {
int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
recalculatedReduceSchedule = true;
}
+
+ @Override
+ protected boolean shouldProfileTaskAttempt(JobConf conf,
+ org.apache.hadoop.mapred.Task remoteTask) {
+ return false;
+ }
}
class TrackingAMContainerRequestor extends RMContainerRequestor {
@@ -928,7 +941,7 @@ public class TestRMContainerAllocator {
@Override
public void handle(Event event) {
- if (event.getType() == AMContainerEventType.C_START_REQUEST) {
+ if (event.getType() == AMContainerEventType.C_LAUNCH_REQUEST) {
launchRequests.add((AMContainerLaunchRequestEvent)event);
} else if (event.getType() == AMContainerEventType.C_ASSIGN_TA) {
assignEvents.add((AMContainerAssignTAEvent)event);
@@ -960,6 +973,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getID()).thenReturn(jobId);
when(mockJob.getProgress()).thenReturn(0.0f);
+ when(mockJob.getConf()).thenReturn(conf);
Clock clock = new ControlledClock(new SystemClock());
|