Author: tgraves
Date: Wed Feb 6 22:22:22 2013
New Revision: 1443243
URL: http://svn.apache.org/viewvc?rev=1443243&view=rev
Log:
MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and UNASSIGNED states.
(Mayank Bansal via tgraves)
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1443243&r1=1443242&r2=1443243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Feb 6 22:22:22
2013
@@ -47,6 +47,9 @@ Release 0.23.7 - UNRELEASED
MAPREDUCE-4953. HadoopPipes misuses fprintf. (Andy Isaacson via tgraves)
+ MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and
+ UNASSIGNED states. (Mayank Bansal via tgraves)
+
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1443243&r1=1443242&r2=1443243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
Wed Feb 6 22:22:22 2013
@@ -196,6 +196,10 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_KILL, new KilledTransition())
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
+ .addTransition(TaskAttemptStateInternal.NEW,
+ TaskAttemptStateInternal.NEW,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Transitions from the UNASSIGNED state.
.addTransition(TaskAttemptStateInternal.UNASSIGNED,
@@ -207,6 +211,10 @@ public abstract class TaskAttemptImpl im
.addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
TaskAttemptStateInternal.FAILED, true))
+ .addTransition(TaskAttemptStateInternal.UNASSIGNED,
+ TaskAttemptStateInternal.UNASSIGNED,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Transitions from the ASSIGNED state.
.addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1443243&r1=1443242&r2=1443243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
Wed Feb 6 22:22:22 2013
@@ -630,6 +630,106 @@ public class TestTaskAttempt{
eventHandler.internalError);
}
+ @Test
+ public void testAppDiognosticEventOnUnassignedTask() throws Exception {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+ ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+ appId, 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl =
+ new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ splits, jobConf, taListener, mock(Token.class),
+ new Credentials(), new SystemClock(), appCtx);
+
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+ ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
+ "Task got killed"));
+ assertFalse(
+ "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task",
+ eventHandler.internalError);
+ }
+
+ @Test
+ public void testAppDiognosticEventOnNewTask() throws Exception {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+ ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+ appId, 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+ jobFile, 1, splits, jobConf, taListener,
+ mock(Token.class), new Credentials(), new SystemClock(), appCtx);
+
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+ ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+ taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
+ "Task got killed"));
+ assertFalse(
+ "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task",
+ eventHandler.internalError);
+ }
+
+
public static class MockEventHandler implements EventHandler {
public boolean internalError;
|