Author: vinodkv
Date: Fri Oct 19 06:01:05 2012
New Revision: 1399978
URL: http://svn.apache.org/viewvc?rev=1399978&view=rev
Log:
MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol for Job, Task and TaskAttempt. Contributed by Siddarth Seth.
Added:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskStateInternal.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptState.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskState.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1399978&r1=1399977&r2=1399978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Oct 19 06:01:05 2012
@@ -8,6 +8,9 @@ Release 0.23.5 - UNRELEASED
IMPROVEMENTS
+ MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol
+ for Job, Task and TaskAttempt. (Siddarth Seth via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java?rev=1399978&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java Fri Oct 19 06:01:05 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job;
+
+public enum JobStateInternal {
+ NEW,
+ INITED,
+ RUNNING,
+ SUCCEEDED,
+ FAILED,
+ KILL_WAIT,
+ KILLED,
+ ERROR
+}
Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java?rev=1399978&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java Fri Oct 19 06:01:05 2012
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+* TaskAttemptImpl internal state machine states.
+*
+*/
+@Private
+public enum TaskAttemptStateInternal {
+ NEW,
+ UNASSIGNED,
+ ASSIGNED,
+ RUNNING,
+ COMMIT_PENDING,
+ SUCCESS_CONTAINER_CLEANUP,
+ SUCCEEDED,
+ FAIL_CONTAINER_CLEANUP,
+ FAIL_TASK_CLEANUP,
+ FAILED,
+ KILL_CONTAINER_CLEANUP,
+ KILL_TASK_CLEANUP,
+ KILLED,
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskStateInternal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskStateInternal.java?rev=1399978&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskStateInternal.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskStateInternal.java Fri Oct 19 06:01:05 2012
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job;
+
+public enum TaskStateInternal {
+ NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED
+}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1399978&r1=1399977&r2=1399978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Oct 19 06:01:05 2012
@@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
@@ -196,150 +197,150 @@ public class JobImpl implements org.apac
new CounterUpdateTransition();
protected static final
- StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent>
+ StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
stateMachineFactory
- = new StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent>
- (JobState.NEW)
+ = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
+ (JobStateInternal.NEW)
// Transitions from NEW state
- .addTransition(JobState.NEW, JobState.NEW,
+ .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(JobState.NEW, JobState.NEW,
+ .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition
- (JobState.NEW,
- EnumSet.of(JobState.INITED, JobState.FAILED),
+ (JobStateInternal.NEW,
+ EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
JobEventType.JOB_INIT,
new InitTransition())
- .addTransition(JobState.NEW, JobState.KILLED,
+ .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
JobEventType.JOB_KILL,
new KillNewJobTransition())
- .addTransition(JobState.NEW, JobState.ERROR,
+ .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Transitions from INITED state
- .addTransition(JobState.INITED, JobState.INITED,
+ .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(JobState.INITED, JobState.INITED,
+ .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
- .addTransition(JobState.INITED, JobState.RUNNING,
+ .addTransition(JobStateInternal.INITED, JobStateInternal.RUNNING,
JobEventType.JOB_START,
new StartTransition())
- .addTransition(JobState.INITED, JobState.KILLED,
+ .addTransition(JobStateInternal.INITED, JobStateInternal.KILLED,
JobEventType.JOB_KILL,
new KillInitedJobTransition())
- .addTransition(JobState.INITED, JobState.ERROR,
+ .addTransition(JobStateInternal.INITED, JobStateInternal.ERROR,
JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Transitions from RUNNING state
- .addTransition(JobState.RUNNING, JobState.RUNNING,
+ .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition
- (JobState.RUNNING,
- EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
+ (JobStateInternal.RUNNING,
+ EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
JobEventType.JOB_TASK_COMPLETED,
new TaskCompletedTransition())
.addTransition
- (JobState.RUNNING,
- EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
+ (JobStateInternal.RUNNING,
+ EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
JobEventType.JOB_COMPLETED,
new JobNoTasksCompletedTransition())
- .addTransition(JobState.RUNNING, JobState.KILL_WAIT,
+ .addTransition(JobStateInternal.RUNNING, JobStateInternal.KILL_WAIT,
JobEventType.JOB_KILL, new KillTasksTransition())
- .addTransition(JobState.RUNNING, JobState.RUNNING,
+ .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
JobEventType.JOB_MAP_TASK_RESCHEDULED,
new MapTaskRescheduledTransition())
- .addTransition(JobState.RUNNING, JobState.RUNNING,
+ .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(JobState.RUNNING, JobState.RUNNING,
+ .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
- .addTransition(JobState.RUNNING, JobState.RUNNING,
+ .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
new TaskAttemptFetchFailureTransition())
.addTransition(
- JobState.RUNNING,
- JobState.ERROR, JobEventType.INTERNAL_ERROR,
+ JobStateInternal.RUNNING,
+ JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Transitions from KILL_WAIT state.
.addTransition
- (JobState.KILL_WAIT,
- EnumSet.of(JobState.KILL_WAIT, JobState.KILLED),
+ (JobStateInternal.KILL_WAIT,
+ EnumSet.of(JobStateInternal.KILL_WAIT, JobStateInternal.KILLED),
JobEventType.JOB_TASK_COMPLETED,
new KillWaitTaskCompletedTransition())
- .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+ .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
- .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+ .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+ .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(
- JobState.KILL_WAIT,
- JobState.ERROR, JobEventType.INTERNAL_ERROR,
+ JobStateInternal.KILL_WAIT,
+ JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
- .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+ .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// Transitions from SUCCEEDED state
- .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+ .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+ .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(
- JobState.SUCCEEDED,
- JobState.ERROR, JobEventType.INTERNAL_ERROR,
+ JobStateInternal.SUCCEEDED,
+ JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
- .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+ .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// Transitions from FAILED state
- .addTransition(JobState.FAILED, JobState.FAILED,
+ .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(JobState.FAILED, JobState.FAILED,
+ .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(
- JobState.FAILED,
- JobState.ERROR, JobEventType.INTERNAL_ERROR,
+ JobStateInternal.FAILED,
+ JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
- .addTransition(JobState.FAILED, JobState.FAILED,
+ .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// Transitions from KILLED state
- .addTransition(JobState.KILLED, JobState.KILLED,
+ .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(JobState.KILLED, JobState.KILLED,
+ .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(
- JobState.KILLED,
- JobState.ERROR, JobEventType.INTERNAL_ERROR,
+ JobStateInternal.KILLED,
+ JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
- .addTransition(JobState.KILLED, JobState.KILLED,
+ .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// No transitions from INTERNAL_ERROR state. Ignore all.
.addTransition(
- JobState.ERROR,
- JobState.ERROR,
+ JobStateInternal.ERROR,
+ JobStateInternal.ERROR,
EnumSet.of(JobEventType.JOB_INIT,
JobEventType.JOB_KILL,
JobEventType.JOB_TASK_COMPLETED,
@@ -348,12 +349,12 @@ public class JobImpl implements org.apac
JobEventType.JOB_DIAGNOSTIC_UPDATE,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.INTERNAL_ERROR))
- .addTransition(JobState.ERROR, JobState.ERROR,
+ .addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
// create the topology tables
.installTopology();
- private final StateMachine<JobState, JobEventType, JobEvent> stateMachine;
+ private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
//changing fields while the job is running
private int numMapTasks;
@@ -418,7 +419,7 @@ public class JobImpl implements org.apac
stateMachine = stateMachineFactory.make(this);
}
- protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
+ protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
return stateMachine;
}
@@ -492,9 +493,9 @@ public class JobImpl implements org.apac
readLock.lock();
try {
- JobState state = getState();
- if (state == JobState.ERROR || state == JobState.FAILED
- || state == JobState.KILLED || state == JobState.SUCCEEDED) {
+ JobStateInternal state = getInternalState();
+ if (state == JobStateInternal.ERROR || state == JobStateInternal.FAILED
+ || state == JobStateInternal.KILLED || state == JobStateInternal.SUCCEEDED) {
this.mayBeConstructFinalFullCounters();
return fullCounters;
}
@@ -559,7 +560,7 @@ public class JobImpl implements org.apac
diagsb.append(s).append("\n");
}
- if (getState() == JobState.NEW) {
+ if (getInternalState() == JobStateInternal.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
@@ -646,7 +647,7 @@ public class JobImpl implements org.apac
public JobState getState() {
readLock.lock();
try {
- return getStateMachine().getCurrentState();
+ return getExternalState(getStateMachine().getCurrentState());
} finally {
readLock.unlock();
}
@@ -667,7 +668,7 @@ public class JobImpl implements org.apac
LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
try {
writeLock.lock();
- JobState oldState = getState();
+ JobStateInternal oldState = getInternalState();
try {
getStateMachine().doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
@@ -678,9 +679,9 @@ public class JobImpl implements org.apac
JobEventType.INTERNAL_ERROR));
}
//notify the eventhandler of state change
- if (oldState != getState()) {
+ if (oldState != getInternalState()) {
LOG.info(jobId + "Job Transitioned from " + oldState + " to "
- + getState());
+ + getInternalState());
}
}
@@ -689,6 +690,24 @@ public class JobImpl implements org.apac
}
}
+ @Private
+ public JobStateInternal getInternalState() {
+ readLock.lock();
+ try {
+ return getStateMachine().getCurrentState();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private static JobState getExternalState(JobStateInternal smState) {
+ if (smState == JobStateInternal.KILL_WAIT) {
+ return JobState.KILLED;
+ } else {
+ return JobState.valueOf(smState.name());
+ }
+ }
+
//helpful in testing
protected void addTask(Task task) {
synchronized (tasksSyncHandle) {
@@ -729,7 +748,7 @@ public class JobImpl implements org.apac
return FileSystem.get(conf);
}
- static JobState checkJobCompleteSuccess(JobImpl job) {
+ static JobStateInternal checkJobCompleteSuccess(JobImpl job) {
// check for Job success
if (job.completedTaskCount == job.tasks.size()) {
try {
@@ -739,16 +758,16 @@ public class JobImpl implements org.apac
LOG.error("Could not do commit for Job", e);
job.addDiagnostic("Job commit failed: " + e.getMessage());
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
- return job.finished(JobState.FAILED);
+ return job.finished(JobStateInternal.FAILED);
}
job.logJobHistoryFinishedEvent();
- return job.finished(JobState.SUCCEEDED);
+ return job.finished(JobStateInternal.SUCCEEDED);
}
return null;
}
- JobState finished(JobState finalState) {
- if (getState() == JobState.RUNNING) {
+ JobStateInternal finished(JobStateInternal finalState) {
+ if (getInternalState() == JobStateInternal.RUNNING) {
metrics.endRunningJob(this);
}
if (finishTime == 0) setFinishTime();
@@ -943,7 +962,7 @@ public class JobImpl implements org.apac
*/
public static class InitTransition
- implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
+ implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
/**
* Note that this transition method is called directly (and synchronously)
@@ -953,7 +972,7 @@ public class JobImpl implements org.apac
* way; MR version is).
*/
@Override
- public JobState transition(JobImpl job, JobEvent event) {
+ public JobStateInternal transition(JobImpl job, JobEvent event) {
job.metrics.submittedJob(job);
job.metrics.preparingJob(job);
try {
@@ -1019,7 +1038,7 @@ public class JobImpl implements org.apac
createReduceTasks(job);
job.metrics.endPreparingJob(job);
- return JobState.INITED;
+ return JobStateInternal.INITED;
//TODO XXX Should JobInitedEvent be generated here (instead of in StartTransition)
} catch (IOException e) {
@@ -1028,7 +1047,7 @@ public class JobImpl implements org.apac
+ StringUtils.stringifyException(e));
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
job.metrics.endPreparingJob(job);
- return job.finished(JobState.FAILED);
+ return job.finished(JobStateInternal.FAILED);
}
}
@@ -1236,9 +1255,9 @@ public class JobImpl implements org.apac
JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId,
job.finishTime, 0, 0,
- JobState.KILLED.toString());
+ JobStateInternal.KILLED.toString());
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
- job.finished(JobState.KILLED);
+ job.finished(JobStateInternal.KILLED);
}
}
@@ -1248,7 +1267,7 @@ public class JobImpl implements org.apac
public void transition(JobImpl job, JobEvent event) {
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
job.addDiagnostic("Job received Kill in INITED state.");
- job.finished(JobState.KILLED);
+ job.finished(JobStateInternal.KILLED);
}
}
@@ -1329,10 +1348,10 @@ public class JobImpl implements org.apac
}
private static class TaskCompletedTransition implements
- MultipleArcTransition<JobImpl, JobEvent, JobState> {
+ MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
@Override
- public JobState transition(JobImpl job, JobEvent event) {
+ public JobStateInternal transition(JobImpl job, JobEvent event) {
job.completedTaskCount++;
LOG.info("Num completed Tasks: " + job.completedTaskCount);
JobTaskEvent taskEvent = (JobTaskEvent) event;
@@ -1348,7 +1367,7 @@ public class JobImpl implements org.apac
return checkJobForCompletion(job);
}
- protected JobState checkJobForCompletion(JobImpl job) {
+ protected JobStateInternal checkJobForCompletion(JobImpl job) {
//check for Job failure
if (job.failedMapTaskCount*100 >
job.allowedMapFailuresPercent*job.numMapTasks ||
@@ -1362,16 +1381,16 @@ public class JobImpl implements org.apac
LOG.info(diagnosticMsg);
job.addDiagnostic(diagnosticMsg);
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
- return job.finished(JobState.FAILED);
+ return job.finished(JobStateInternal.FAILED);
}
- JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
+ JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
if (jobCompleteSuccess != null) {
return jobCompleteSuccess;
}
//return the current state, Job not finished yet
- return job.getState();
+ return job.getInternalState();
}
private void taskSucceeded(JobImpl job, Task task) {
@@ -1405,17 +1424,17 @@ public class JobImpl implements org.apac
// Transition class for handling jobs with no tasks
static class JobNoTasksCompletedTransition implements
- MultipleArcTransition<JobImpl, JobEvent, JobState> {
+ MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
@Override
- public JobState transition(JobImpl job, JobEvent event) {
- JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
+ public JobStateInternal transition(JobImpl job, JobEvent event) {
+ JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
if (jobCompleteSuccess != null) {
return jobCompleteSuccess;
}
// Return the current state, Job not finished yet
- return job.getState();
+ return job.getInternalState();
}
}
@@ -1432,14 +1451,14 @@ public class JobImpl implements org.apac
private static class KillWaitTaskCompletedTransition extends
TaskCompletedTransition {
@Override
- protected JobState checkJobForCompletion(JobImpl job) {
+ protected JobStateInternal checkJobForCompletion(JobImpl job) {
if (job.completedTaskCount == job.tasks.size()) {
job.setFinishTime();
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
- return job.finished(JobState.KILLED);
+ return job.finished(JobStateInternal.KILLED);
}
//return the current state, Job not finished yet
- return job.getState();
+ return job.getInternalState();
}
}
@@ -1478,9 +1497,9 @@ public class JobImpl implements org.apac
JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId,
job.finishTime, 0, 0,
- JobState.ERROR.toString());
+ JobStateInternal.ERROR.toString());
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
- job.finished(JobState.ERROR);
+ job.finished(JobStateInternal.ERROR);
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1399978&r1=1399977&r2=1399978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Oct 19 06:01:05 2012
@@ -75,6 +75,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -128,6 +129,8 @@ import org.apache.hadoop.yarn.util.Build
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RackResolver;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Implementation of TaskAttempt interface.
*/
@@ -178,149 +181,149 @@ public abstract class TaskAttemptImpl im
= new DiagnosticInformationUpdater();
private static final StateMachineFactory
- <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+ <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
stateMachineFactory
= new StateMachineFactory
- <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
- (TaskAttemptState.NEW)
+ <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
+ (TaskAttemptStateInternal.NEW)
// Transitions from the NEW state.
- .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED,
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))
- .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED,
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true))
- .addTransition(TaskAttemptState.NEW, TaskAttemptState.KILLED,
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILL, new KilledTransition())
- .addTransition(TaskAttemptState.NEW, TaskAttemptState.FAILED,
+ .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
// Transitions from the UNASSIGNED state.
- .addTransition(TaskAttemptState.UNASSIGNED,
- TaskAttemptState.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
+ .addTransition(TaskAttemptStateInternal.UNASSIGNED,
+ TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
new ContainerAssignedTransition())
- .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.KILLED,
+ .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
- TaskAttemptState.KILLED, true))
- .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.FAILED,
+ TaskAttemptStateInternal.KILLED, true))
+ .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
- TaskAttemptState.FAILED, true))
+ TaskAttemptStateInternal.FAILED, true))
// Transitions from the ASSIGNED state.
- .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.RUNNING,
+ .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
new LaunchedContainerTransition())
- .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.ASSIGNED,
+ .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.ASSIGNED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.FAILED,
+ .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
- new DeallocateContainerTransition(TaskAttemptState.FAILED, false))
- .addTransition(TaskAttemptState.ASSIGNED,
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false))
+ .addTransition(TaskAttemptStateInternal.ASSIGNED,
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
CLEANUP_CONTAINER_TRANSITION)
// ^ If RM kills the container due to expiry, preemption etc.
- .addTransition(TaskAttemptState.ASSIGNED,
- TaskAttemptState.KILL_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.ASSIGNED,
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
- .addTransition(TaskAttemptState.ASSIGNED,
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.ASSIGNED,
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
// Transitions from RUNNING state.
- .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_UPDATE, new StatusUpdater())
- .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// If no commit is required, task directly goes to success
- .addTransition(TaskAttemptState.RUNNING,
- TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
// If commit is required, task goes through commit pending state.
- .addTransition(TaskAttemptState.RUNNING,
- TaskAttemptState.COMMIT_PENDING,
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
// Failure handling while RUNNING
- .addTransition(TaskAttemptState.RUNNING,
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
//for handling container exit without sending the done or fail msg
- .addTransition(TaskAttemptState.RUNNING,
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
CLEANUP_CONTAINER_TRANSITION)
// Timeout handling while RUNNING
- .addTransition(TaskAttemptState.RUNNING,
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
// if container killed by AM shutting down
- .addTransition(TaskAttemptState.RUNNING,
- TaskAttemptState.KILLED,
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
// Kill handling
- .addTransition(TaskAttemptState.RUNNING,
- TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION)
// Transitions from COMMIT_PENDING state
- .addTransition(TaskAttemptState.COMMIT_PENDING,
- TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+ TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
new StatusUpdater())
- .addTransition(TaskAttemptState.COMMIT_PENDING,
- TaskAttemptState.COMMIT_PENDING,
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+ TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptState.COMMIT_PENDING,
- TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+ TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
- .addTransition(TaskAttemptState.COMMIT_PENDING,
- TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION)
// if container killed by AM shutting down
- .addTransition(TaskAttemptState.COMMIT_PENDING,
- TaskAttemptState.KILLED,
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+ TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
- .addTransition(TaskAttemptState.COMMIT_PENDING,
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
- .addTransition(TaskAttemptState.COMMIT_PENDING,
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
CLEANUP_CONTAINER_TRANSITION)
- .addTransition(TaskAttemptState.COMMIT_PENDING,
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
// Transitions from SUCCESS_CONTAINER_CLEANUP state
// kill and cleanup the container
- .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
- TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
+ .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
new SucceededTransition())
.addTransition(
- TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
- TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events
- .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
- TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_CONTAINER_COMPLETED))
// Transitions from FAIL_CONTAINER_CLEANUP state.
- .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
- TaskAttemptState.FAIL_TASK_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
- .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events
- .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
- TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
@@ -333,17 +336,17 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_TIMED_OUT))
// Transitions from KILL_CONTAINER_CLEANUP
- .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP,
- TaskAttemptState.KILL_TASK_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.KILL_TASK_CLEANUP,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
- .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP,
- TaskAttemptState.KILL_CONTAINER_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events
.addTransition(
- TaskAttemptState.KILL_CONTAINER_CLEANUP,
- TaskAttemptState.KILL_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
@@ -356,16 +359,16 @@ public abstract class TaskAttemptImpl im
// Transitions from FAIL_TASK_CLEANUP
// run the task cleanup
- .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
- TaskAttemptState.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE,
+ .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
+ TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE,
new FailedTransition())
- .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
- TaskAttemptState.FAIL_TASK_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
+ TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events
- .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
- TaskAttemptState.FAIL_TASK_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
+ TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
@@ -378,16 +381,16 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
// Transitions from KILL_TASK_CLEANUP
- .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
- TaskAttemptState.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE,
+ .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
+ TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE,
new KilledTransition())
- .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
- TaskAttemptState.KILL_TASK_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
+ TaskAttemptStateInternal.KILL_TASK_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events
- .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
- TaskAttemptState.KILL_TASK_CLEANUP,
+ .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
+ TaskAttemptStateInternal.KILL_TASK_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
@@ -400,28 +403,28 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
// Transitions from SUCCEEDED
- .addTransition(TaskAttemptState.SUCCEEDED, //only possible for map attempts
- TaskAttemptState.FAILED,
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, //only possible for map attempts
+ TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
new TooManyFetchFailureTransition())
.addTransition(
- TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED,
+ TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events for SUCCEEDED state
- .addTransition(TaskAttemptState.SUCCEEDED,
- TaskAttemptState.SUCCEEDED,
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED,
+ TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED))
// Transitions from FAILED state
- .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED,
+ .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events for FAILED state
- .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED,
+ .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_ASSIGNED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
@@ -436,11 +439,11 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE))
// Transitions from KILLED state
- .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED,
+ .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events for KILLED state
- .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED,
+ .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_ASSIGNED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
@@ -457,7 +460,7 @@ public abstract class TaskAttemptImpl im
.installTopology();
private final StateMachine
- <TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+ <TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
stateMachine;
private ContainerId containerID;
@@ -855,9 +858,9 @@ public abstract class TaskAttemptImpl im
readLock.lock();
try {
// TODO: Use stateMachine level method?
- return (getState() == TaskAttemptState.SUCCEEDED ||
- getState() == TaskAttemptState.FAILED ||
- getState() == TaskAttemptState.KILLED);
+ return (getInternalState() == TaskAttemptStateInternal.SUCCEEDED ||
+ getInternalState() == TaskAttemptStateInternal.FAILED ||
+ getInternalState() == TaskAttemptStateInternal.KILLED);
} finally {
readLock.unlock();
}
@@ -934,7 +937,7 @@ public abstract class TaskAttemptImpl im
public TaskAttemptState getState() {
readLock.lock();
try {
- return stateMachine.getCurrentState();
+ return getExternalState(stateMachine.getCurrentState());
} finally {
readLock.unlock();
}
@@ -949,7 +952,7 @@ public abstract class TaskAttemptImpl im
}
writeLock.lock();
try {
- final TaskAttemptState oldState = getState();
+ final TaskAttemptStateInternal oldState = getInternalState();
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
@@ -961,16 +964,58 @@ public abstract class TaskAttemptImpl im
eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(),
JobEventType.INTERNAL_ERROR));
}
- if (oldState != getState()) {
+ if (oldState != getInternalState()) {
LOG.info(attemptId + " TaskAttempt Transitioned from "
+ oldState + " to "
- + getState());
+ + getInternalState());
}
} finally {
writeLock.unlock();
}
}
+ @VisibleForTesting
+ public TaskAttemptStateInternal getInternalState() {
+ readLock.lock();
+ try {
+ return stateMachine.getCurrentState();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private static TaskAttemptState getExternalState(
+ TaskAttemptStateInternal smState) {
+ switch (smState) {
+ case ASSIGNED:
+ case UNASSIGNED:
+ return TaskAttemptState.STARTING;
+ case COMMIT_PENDING:
+ return TaskAttemptState.COMMIT_PENDING;
+ case FAILED:
+ return TaskAttemptState.FAILED;
+ case KILLED:
+ return TaskAttemptState.KILLED;
+ // All CLEANUP states considered as RUNNING since events have not gone out
+ // to the Task yet. May be possible to consider them as a Finished state.
+ case FAIL_CONTAINER_CLEANUP:
+ case FAIL_TASK_CLEANUP:
+ case KILL_CONTAINER_CLEANUP:
+ case KILL_TASK_CLEANUP:
+ case SUCCESS_CONTAINER_CLEANUP:
+ case RUNNING:
+ return TaskAttemptState.RUNNING;
+ case NEW:
+ return TaskAttemptState.NEW;
+ case SUCCEEDED:
+ return TaskAttemptState.SUCCEEDED;
+ default:
+ throw new YarnException("Attempt to convert invalid "
+ + "stateMachineTaskAttemptState to externalTaskAttemptState: "
+ + smState);
+ }
+ }
+
//always called in write lock
private void setFinishTime() {
//set the finish time only if launch time is set
@@ -1035,7 +1080,7 @@ public abstract class TaskAttemptImpl im
private static
TaskAttemptUnsuccessfulCompletionEvent
createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
- TaskAttemptState attemptState) {
+ TaskAttemptStateInternal attemptState) {
TaskAttemptUnsuccessfulCompletionEvent tauce =
new TaskAttemptUnsuccessfulCompletionEvent(
TypeConverter.fromYarn(taskAttempt.attemptId),
@@ -1216,10 +1261,10 @@ public abstract class TaskAttemptImpl im
private static class DeallocateContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
- private final TaskAttemptState finalState;
+ private final TaskAttemptStateInternal finalState;
private final boolean withdrawsContainerRequest;
DeallocateContainerTransition
- (TaskAttemptState finalState, boolean withdrawsContainerRequest) {
+ (TaskAttemptStateInternal finalState, boolean withdrawsContainerRequest) {
this.finalState = finalState;
this.withdrawsContainerRequest = withdrawsContainerRequest;
}
@@ -1257,10 +1302,10 @@ public abstract class TaskAttemptImpl im
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
finalState);
- if(finalState == TaskAttemptState.FAILED) {
+ if(finalState == TaskAttemptStateInternal.FAILED) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt));
- } else if(finalState == TaskAttemptState.KILLED) {
+ } else if(finalState == TaskAttemptStateInternal.KILLED) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAKilled(taskAttempt));
}
@@ -1374,7 +1419,7 @@ public abstract class TaskAttemptImpl im
JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
slotMillis);
taskAttempt.eventHandler.handle(jce);
- taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED);
+ taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_SUCCEEDED));
@@ -1397,10 +1442,10 @@ public abstract class TaskAttemptImpl im
.handle(createJobCounterUpdateEventTAFailed(taskAttempt));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
- TaskAttemptState.FAILED);
+ TaskAttemptStateInternal.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
- // taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not
+ // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
// handling failed map/reduce events.
}else {
LOG.debug("Not generating HistoryFinish event since start event not " +
@@ -1412,7 +1457,7 @@ public abstract class TaskAttemptImpl im
}
@SuppressWarnings({ "unchecked" })
- private void logAttemptFinishedEvent(TaskAttemptState state) {
+ private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
//Log finished events only if an attempt started.
if (getLaunchTime() == 0) return;
if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
@@ -1466,7 +1511,7 @@ public abstract class TaskAttemptImpl im
.handle(createJobCounterUpdateEventTAFailed(taskAttempt));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
- TaskAttemptState.FAILED);
+ TaskAttemptStateInternal.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else {
@@ -1492,14 +1537,14 @@ public abstract class TaskAttemptImpl im
.handle(createJobCounterUpdateEventTAKilled(taskAttempt));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
- TaskAttemptState.KILLED);
+ TaskAttemptStateInternal.KILLED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else {
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
-// taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED); Not logging Map/Reduce attempts in case of failure.
+// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_KILLED));
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1399978&r1=1399977&r2=1399978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Oct 19 06:01:05 2012
@@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@@ -83,6 +84,8 @@ import org.apache.hadoop.yarn.state.Sing
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Implementation of Task interface.
*/
@@ -124,62 +127,62 @@ public abstract class TaskImpl implement
KILL_TRANSITION = new KillTransition();
private static final StateMachineFactory
- <TaskImpl, TaskState, TaskEventType, TaskEvent>
+ <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
stateMachineFactory
- = new StateMachineFactory<TaskImpl, TaskState, TaskEventType, TaskEvent>
- (TaskState.NEW)
+ = new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
+ (TaskStateInternal.NEW)
// define the state machine of Task
// Transitions from NEW state
- .addTransition(TaskState.NEW, TaskState.SCHEDULED,
+ .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
- .addTransition(TaskState.NEW, TaskState.KILLED,
+ .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
TaskEventType.T_KILL, new KillNewTransition())
// Transitions from SCHEDULED state
//when the first attempt is launched, the task state is set to RUNNING
- .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
+ .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
- .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT,
+ .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT,
TaskEventType.T_KILL, KILL_TRANSITION)
- .addTransition(TaskState.SCHEDULED, TaskState.SCHEDULED,
+ .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
- .addTransition(TaskState.SCHEDULED,
- EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED),
+ .addTransition(TaskStateInternal.SCHEDULED,
+ EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_FAILED,
new AttemptFailedTransition())
// Transitions from RUNNING state
- .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
- .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
new AttemptCommitPendingTransition())
- .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
- .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
TaskEventType.T_ATTEMPT_SUCCEEDED,
new AttemptSucceededTransition())
- .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_KILLED,
ATTEMPT_KILLED_TRANSITION)
- .addTransition(TaskState.RUNNING,
- EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
+ .addTransition(TaskStateInternal.RUNNING,
+ EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_FAILED,
new AttemptFailedTransition())
- .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT,
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT,
TaskEventType.T_KILL, KILL_TRANSITION)
// Transitions from KILL_WAIT state
- .addTransition(TaskState.KILL_WAIT,
- EnumSet.of(TaskState.KILL_WAIT, TaskState.KILLED),
+ .addTransition(TaskStateInternal.KILL_WAIT,
+ EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_KILLED,
new KillWaitAttemptKilledTransition())
// Ignore-able transitions.
.addTransition(
- TaskState.KILL_WAIT,
- TaskState.KILL_WAIT,
+ TaskStateInternal.KILL_WAIT,
+ TaskStateInternal.KILL_WAIT,
EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
@@ -188,31 +191,31 @@ public abstract class TaskImpl implement
TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from SUCCEEDED state
- .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
- EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED),
+ .addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
+ EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
// Ignore-able transitions.
.addTransition(
- TaskState.SUCCEEDED, TaskState.SUCCEEDED,
+ TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ADD_SPEC_ATTEMPT,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_KILLED))
// Transitions from FAILED state
- .addTransition(TaskState.FAILED, TaskState.FAILED,
+ .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from KILLED state
- .addTransition(TaskState.KILLED, TaskState.KILLED,
+ .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ADD_SPEC_ATTEMPT))
// create the topology tables
.installTopology();
- private final StateMachine<TaskState, TaskEventType, TaskEvent>
+ private final StateMachine<TaskStateInternal, TaskEventType, TaskEvent>
stateMachine;
// By default, the next TaskAttempt number is zero. Changes during recovery
@@ -243,7 +246,12 @@ public abstract class TaskImpl implement
@Override
public TaskState getState() {
- return stateMachine.getCurrentState();
+ readLock.lock();
+ try {
+ return getExternalState(getInternalState());
+ } finally {
+ readLock.unlock();
+ }
}
public TaskImpl(JobId jobId, TaskType taskType, int partition,
@@ -350,9 +358,9 @@ public abstract class TaskImpl implement
readLock.lock();
try {
// TODO: Use stateMachine level method?
- return (getState() == TaskState.SUCCEEDED ||
- getState() == TaskState.FAILED ||
- getState() == TaskState.KILLED);
+ return (getInternalState() == TaskStateInternal.SUCCEEDED ||
+ getInternalState() == TaskStateInternal.FAILED ||
+ getInternalState() == TaskStateInternal.KILLED);
} finally {
readLock.unlock();
}
@@ -427,6 +435,24 @@ public abstract class TaskImpl implement
}
}
+ @VisibleForTesting
+ public TaskStateInternal getInternalState() {
+ readLock.lock();
+ try {
+ return stateMachine.getCurrentState();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private static TaskState getExternalState(TaskStateInternal smState) {
+ if (smState == TaskStateInternal.KILL_WAIT) {
+ return TaskState.KILLED;
+ } else {
+ return TaskState.valueOf(smState.name());
+ }
+ }
+
//this is always called in read/write lock
private long getLaunchTime() {
long taskLaunchTime = 0;
@@ -478,8 +504,8 @@ public abstract class TaskImpl implement
return finishTime;
}
- private TaskState finished(TaskState finalState) {
- if (getState() == TaskState.RUNNING) {
+ private TaskStateInternal finished(TaskStateInternal finalState) {
+ if (getInternalState() == TaskStateInternal.RUNNING) {
metrics.endRunningTask(this);
}
return finalState;
@@ -494,11 +520,7 @@ public abstract class TaskImpl implement
switch (at.getState()) {
// ignore all failed task attempts
- case FAIL_CONTAINER_CLEANUP:
- case FAIL_TASK_CLEANUP:
- case FAILED:
- case KILL_CONTAINER_CLEANUP:
- case KILL_TASK_CLEANUP:
+ case FAILED:
case KILLED:
continue;
}
@@ -599,7 +621,7 @@ public abstract class TaskImpl implement
}
try {
writeLock.lock();
- TaskState oldState = getState();
+ TaskStateInternal oldState = getInternalState();
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
@@ -607,9 +629,9 @@ public abstract class TaskImpl implement
+ this.taskId, e);
internalError(event.getType());
}
- if (oldState != getState()) {
+ if (oldState != getInternalState()) {
LOG.info(taskId + " Task Transitioned from " + oldState + " to "
- + getState());
+ + getInternalState());
}
} finally {
@@ -653,7 +675,7 @@ public abstract class TaskImpl implement
}
}
- private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskState taskState) {
+ private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
TaskFinishedEvent tfe =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
TypeConverter.fromYarn(task.successfulAttempt),
@@ -664,7 +686,7 @@ public abstract class TaskImpl implement
return tfe;
}
- private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskState taskState, TaskAttemptId taId) {
+ private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
StringBuilder errorSb = new StringBuilder();
if (diag != null) {
for (String d : diag) {
@@ -763,7 +785,7 @@ public abstract class TaskImpl implement
// issue kill to all other attempts
if (task.historyTaskStartGenerated) {
TaskFinishedEvent tfe = createTaskFinishedEvent(task,
- TaskState.SUCCEEDED);
+ TaskStateInternal.SUCCEEDED);
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
tfe));
}
@@ -779,7 +801,7 @@ public abstract class TaskImpl implement
TaskAttemptEventType.TA_KILL));
}
}
- task.finished(TaskState.SUCCEEDED);
+ task.finished(TaskStateInternal.SUCCEEDED);
}
}
@@ -799,12 +821,12 @@ public abstract class TaskImpl implement
private static class KillWaitAttemptKilledTransition implements
- MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+ MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
- protected TaskState finalState = TaskState.KILLED;
+ protected TaskStateInternal finalState = TaskStateInternal.KILLED;
@Override
- public TaskState transition(TaskImpl task, TaskEvent event) {
+ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.handleTaskAttemptCompletion(
((TaskTAttemptEvent) event).getTaskAttemptID(),
TaskAttemptCompletionEventStatus.KILLED);
@@ -821,18 +843,18 @@ public abstract class TaskImpl implement
}
task.eventHandler.handle(
- new JobTaskEvent(task.taskId, finalState));
+ new JobTaskEvent(task.taskId, getExternalState(finalState)));
return finalState;
}
- return task.getState();
+ return task.getInternalState();
}
}
private static class AttemptFailedTransition implements
- MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+ MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
@Override
- public TaskState transition(TaskImpl task, TaskEvent event) {
+ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.failedAttempts++;
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
@@ -863,7 +885,7 @@ public abstract class TaskImpl implement
if (task.historyTaskStartGenerated) {
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
- TaskState.FAILED, taId);
+ TaskStateInternal.FAILED, taId);
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
taskFailedEvent));
} else {
@@ -872,13 +894,13 @@ public abstract class TaskImpl implement
}
task.eventHandler.handle(
new JobTaskEvent(task.taskId, TaskState.FAILED));
- return task.finished(TaskState.FAILED);
+ return task.finished(TaskStateInternal.FAILED);
}
return getDefaultState(task);
}
- protected TaskState getDefaultState(Task task) {
- return task.getState();
+ protected TaskStateInternal getDefaultState(TaskImpl task) {
+ return task.getInternalState();
}
protected void unSucceed(TaskImpl task) {
@@ -892,14 +914,14 @@ public abstract class TaskImpl implement
extends AttemptFailedTransition {
@Override
- public TaskState transition(TaskImpl task, TaskEvent event) {
+ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
if (event instanceof TaskTAttemptEvent) {
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
- if (task.getState() == TaskState.SUCCEEDED &&
+ if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
!castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous
// succeeded state
- return TaskState.SUCCEEDED;
+ return TaskStateInternal.SUCCEEDED;
}
}
@@ -922,8 +944,8 @@ public abstract class TaskImpl implement
}
@Override
- protected TaskState getDefaultState(Task task) {
- return TaskState.SCHEDULED;
+ protected TaskStateInternal getDefaultState(TaskImpl task) {
+ return TaskStateInternal.SCHEDULED;
}
}
@@ -934,7 +956,7 @@ public abstract class TaskImpl implement
if (task.historyTaskStartGenerated) {
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
- TaskState.KILLED, null); // TODO Verify failedAttemptId is null
+ TaskStateInternal.KILLED, null); // TODO Verify failedAttemptId is null
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
taskFailedEvent));
}else {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java?rev=1399978&r1=1399977&r2=1399978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java Fri Oct 19 06:01:05 2012
@@ -44,9 +44,9 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -365,7 +365,7 @@ public class DefaultSpeculator extends A
for (TaskAttempt taskAttempt : attempts.values()) {
if (taskAttempt.getState() == TaskAttemptState.RUNNING
- || taskAttempt.getState() == TaskAttemptState.ASSIGNED) {
+ || taskAttempt.getState() == TaskAttemptState.STARTING) {
if (++numberRunningAttempts > 1) {
return ALREADY_SPECULATING;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1399978&r1=1399977&r2=1399978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Oct 19 06:01:05 2012
@@ -50,8 +50,10 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@@ -60,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@@ -240,6 +243,24 @@ public class MRApp extends MRAppMaster {
return job;
}
+ public void waitForInternalState(TaskAttemptImpl attempt,
+ TaskAttemptStateInternal finalState) throws Exception {
+ int timeoutSecs = 0;
+ TaskAttemptReport report = attempt.getReport();
+ TaskAttemptStateInternal iState = attempt.getInternalState();
+ while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+ System.out.println("TaskAttempt Internal State is : " + iState
+ + " Waiting for Internal state : " + finalState + " progress : "
+ + report.getProgress());
+ Thread.sleep(500);
+ report = attempt.getReport();
+ iState = attempt.getInternalState();
+ }
+ System.out.println("TaskAttempt Internal State is : " + iState);
+ Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)",
+ finalState, iState);
+ }
+
public void waitForState(TaskAttempt attempt,
TaskAttemptState finalState) throws Exception {
int timeoutSecs = 0;
@@ -501,18 +522,18 @@ public class MRApp extends MRAppMaster {
//override the init transition
private final TestInitTransition initTransition = new TestInitTransition(
maps, reduces);
- StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> localFactory
- = stateMachineFactory.addTransition(JobState.NEW,
- EnumSet.of(JobState.INITED, JobState.FAILED),
+ StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
+ = stateMachineFactory.addTransition(JobStateInternal.NEW,
+ EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
JobEventType.JOB_INIT,
// This is abusive.
initTransition);
- private final StateMachine<JobState, JobEventType, JobEvent>
+ private final StateMachine<JobStateInternal, JobEventType, JobEvent>
localStateMachine;
@Override
- protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
+ protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
return localStateMachine;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1399978&r1=1399977&r2=1399978&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Fri Oct 19 06:01:05 2012
@@ -36,8 +36,10 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
@@ -190,7 +192,8 @@ public class TestFail {
Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
.size());
TaskAttempt attempt = attempts.values().iterator().next();
- app.waitForState(attempt, TaskAttemptState.ASSIGNED);
+ app.waitForInternalState((TaskAttemptImpl) attempt,
+ TaskAttemptStateInternal.ASSIGNED);
app.getDispatcher().getEventHandler().handle(
new TaskAttemptEvent(attempt.getID(),
TaskAttemptEventType.TA_CONTAINER_COMPLETED));
|