Author: cnauroth
Date: Tue Jan 7 01:07:33 2014
New Revision: 1556097
URL: http://svn.apache.org/r1556097
Log:
Merge trunk to HDFS-4685.
Added:
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java
- copied unchanged from r1556096, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
- copied unchanged from r1556096, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java
- copied unchanged from r1556096, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java
- copied unchanged from r1556096, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java
- copied unchanged from r1556096, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java
Modified:
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed)
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1553225-1556096
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt Tue Jan 7 01:07:33 2014
@@ -77,6 +77,9 @@ Trunk (Unreleased)
MAPREDUCE-5189. Add policies and wiring to respond to preemption requests
from YARN. (Carlo Curino via cdouglas)
+ MAPREDUCE-5196. Add bookkeeping for managing checkpoints of task state.
+ (Carlo Curino via cdouglas)
+
BUG FIXES
MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.
@@ -193,6 +196,8 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI
with Hadoop 2.0 (Gera Shegalov via Sandy Ryza)
+ MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu)
+
OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
@@ -258,6 +263,15 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5687. Fixed failure in TestYARNRunner caused by YARN-1446. (Jian He
via vinodkv)
+ MAPREDUCE-5694. Fixed MR AppMaster to shutdown the LogManager so as to avoid
+ losing syslog in some conditions. (Mohammad Kamrul Islam via vinodkv)
+
+ MAPREDUCE-5685. Fixed a bug with JobContext getCacheFiles API inside the
+ WrappedReducer class. (Yi Song via vinodkv)
+
+ MAPREDUCE-5689. MRAppMaster does not preempt reducers when scheduled maps
+ cannot be fulfilled. (lohit via kasha)
+
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1553225-1556096
Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1553225-1556096
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Tue Jan 7 01:07:33 2014
@@ -36,7 +36,9 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -45,8 +47,8 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
@@ -229,6 +231,22 @@ public class TaskAttemptListenerImpl ext
}
@Override
+ public void preempted(TaskAttemptID taskAttemptID, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ LOG.info("Preempted state update from " + taskAttemptID.toString());
+ // An attempt is telling us that it got preempted.
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+ TypeConverter.toYarn(taskAttemptID);
+
+ preemptionPolicy.reportSuccessfulPreemption(attemptID);
+ taskHeartbeatHandler.progressing(attemptID);
+
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_PREEMPTED));
+ }
+
+ @Override
public void done(TaskAttemptID taskAttemptID) throws IOException {
LOG.info("Done acknowledgement from " + taskAttemptID.toString());
@@ -250,6 +268,10 @@ public class TaskAttemptListenerImpl ext
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
+
+ // handling checkpoints
+ preemptionPolicy.handleFailedContainer(attemptID);
+
context.getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
}
@@ -264,6 +286,10 @@ public class TaskAttemptListenerImpl ext
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
+
+ // handling checkpoints
+ preemptionPolicy.handleFailedContainer(attemptID);
+
context.getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
}
@@ -294,12 +320,6 @@ public class TaskAttemptListenerImpl ext
}
@Override
- public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
- LOG.info("Ping from " + taskAttemptID.toString());
- return true;
- }
-
- @Override
public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo)
throws IOException {
diagnosticInfo = StringInterner.weakIntern(diagnosticInfo);
@@ -321,11 +341,33 @@ public class TaskAttemptListenerImpl ext
}
@Override
- public boolean statusUpdate(TaskAttemptID taskAttemptID,
+ public AMFeedback statusUpdate(TaskAttemptID taskAttemptID,
TaskStatus taskStatus) throws IOException, InterruptedException {
- LOG.info("Status update from " + taskAttemptID.toString());
+
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
TypeConverter.toYarn(taskAttemptID);
+
+ AMFeedback feedback = new AMFeedback();
+ feedback.setTaskFound(true);
+
+ // Propagating preemption to the task if TASK_PREEMPTION is enabled
+ if (getConfig().getBoolean(MRJobConfig.TASK_PREEMPTION, false)
+ && preemptionPolicy.isPreempted(yarnAttemptID)) {
+ feedback.setPreemption(true);
+ LOG.info("Setting preemption bit for task: "+ yarnAttemptID
+ + " of type " + yarnAttemptID.getTaskId().getTaskType());
+ }
+
+ if (taskStatus == null) {
+ //We are using statusUpdate only as a simple ping
+ LOG.info("Ping from " + taskAttemptID.toString());
+ taskHeartbeatHandler.progressing(yarnAttemptID);
+ return feedback;
+ }
+
+ // if we are here there is an actual status update to be processed
+ LOG.info("Status update from " + taskAttemptID.toString());
+
taskHeartbeatHandler.progressing(yarnAttemptID);
TaskAttemptStatus taskAttemptStatus =
new TaskAttemptStatus();
@@ -386,7 +428,7 @@ public class TaskAttemptListenerImpl ext
context.getEventHandler().handle(
new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
taskAttemptStatus));
- return true;
+ return feedback;
}
@Override
@@ -494,4 +536,18 @@ public class TaskAttemptListenerImpl ext
return ProtocolSignature.getProtocolSignature(this,
protocol, clientVersion, clientMethodsHash);
}
+
+  // task checkpoint bookkeeping
+ @Override
+ public TaskCheckpointID getCheckpointID(TaskID taskId) {
+ TaskId tid = TypeConverter.toYarn(taskId);
+ return preemptionPolicy.getCheckpointID(tid);
+ }
+
+ @Override
+ public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+ TaskId tid = TypeConverter.toYarn(taskId);
+ preemptionPolicy.setCheckpointID(tid, cid);
+ }
+
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Jan 7 01:07:33 2014
@@ -139,6 +139,7 @@ import org.apache.hadoop.yarn.security.c
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.log4j.LogManager;
import com.google.common.annotations.VisibleForTesting;
@@ -1395,6 +1396,8 @@ public class MRAppMaster extends Composi
} catch (Throwable t) {
LOG.fatal("Error starting MRAppMaster", t);
System.exit(1);
+ } finally {
+ LogManager.shutdown();
}
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java Tue Jan 7 01:07:33 2014
@@ -47,6 +47,7 @@ public enum TaskAttemptEventType {
TA_FAILMSG,
TA_UPDATE,
TA_TIMED_OUT,
+ TA_PREEMPTED,
//Producer:TaskCleaner
TA_CLEANUP_DONE,
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Jan 7 01:07:33 2014
@@ -304,6 +304,9 @@ public abstract class TaskAttemptImpl im
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptEventType.TA_PREEMPTED, new PreemptedTransition())
// Transitions from COMMIT_PENDING state
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
@@ -437,6 +440,7 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
+ TaskAttemptEventType.TA_PREEMPTED,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@@ -1874,6 +1878,27 @@ public abstract class TaskAttemptImpl im
}
}
+ private static class PreemptedTransition implements
+ SingleArcTransition<TaskAttemptImpl,TaskAttemptEvent> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ taskAttempt.setFinishTime();
+ taskAttempt.taskAttemptListener.unregister(
+ taskAttempt.attemptId, taskAttempt.jvmID);
+ taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
+ taskAttempt.attemptId,
+ taskAttempt.getAssignedContainerID(), taskAttempt.getAssignedContainerMgrAddress(),
+ taskAttempt.container.getContainerToken(),
+ ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+ taskAttempt.attemptId,
+ TaskEventType.T_ATTEMPT_KILLED));
+
+ }
+ }
+
private static class CleanupContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Jan 7 01:07:33 2014
@@ -229,7 +229,8 @@ public class RMContainerAllocator extend
int completedMaps = getJob().getCompletedMaps();
int completedTasks = completedMaps + getJob().getCompletedReduces();
- if (lastCompletedTasks != completedTasks) {
+ if ((lastCompletedTasks != completedTasks) ||
+ (scheduledRequests.maps.size() > 0)) {
lastCompletedTasks = completedTasks;
recalculateReduceSchedule = true;
}
@@ -347,7 +348,7 @@ public class RMContainerAllocator extend
}
} else if (
- event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
+ event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
LOG.info("Processing the event " + event.toString());
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java Tue Jan 7 01:07:33 2014
@@ -19,10 +19,9 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.List;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.yarn.api.records.Container;
@@ -81,7 +80,7 @@ public interface AMPreemptionPolicy {
* successfully preempted (for bookeeping, counters, etc..)
* @param attemptID Task attempt that preempted
*/
- public void reportSuccessfulPreemption(TaskAttemptID attemptID);
+ public void reportSuccessfulPreemption(TaskAttemptId attemptID);
/**
* Callback informing the policy of containers exiting with a failure. This
@@ -98,20 +97,20 @@ public interface AMPreemptionPolicy {
public void handleCompletedContainer(TaskAttemptId attemptID);
/**
- * Method to retrieve the latest checkpoint for a given {@link TaskID}
+ * Method to retrieve the latest checkpoint for a given {@link TaskId}
* @param taskId TaskID
* @return CheckpointID associated with this task or null
*/
- public TaskCheckpointID getCheckpointID(TaskID taskId);
+ public TaskCheckpointID getCheckpointID(TaskId taskId);
/**
* Method to store the latest {@link
* org.apache.hadoop.mapreduce.checkpoint.CheckpointID} for a given {@link
- * TaskID}. Assigning a null is akin to remove all previous checkpoints for
+   * TaskId}. Assigning a null is akin to removing all previous checkpoints for
* this task.
* @param taskId TaskID
* @param cid Checkpoint to assign or <tt>null</tt> to remove it.
*/
- public void setCheckpointID(TaskID taskId, TaskCheckpointID cid);
+ public void setCheckpointID(TaskId taskId, TaskCheckpointID cid);
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java Tue Jan 7 01:07:33 2014
@@ -19,11 +19,10 @@ package org.apache.hadoop.mapreduce.v2.a
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -89,17 +88,17 @@ public class KillAMPreemptionPolicy impl
}
@Override
- public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) {
+ public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
// ignore
}
@Override
- public TaskCheckpointID getCheckpointID(TaskID taskId) {
+ public TaskCheckpointID getCheckpointID(TaskId taskId) {
return null;
}
@Override
- public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+ public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
// ignore
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java Tue Jan 7 01:07:33 2014
@@ -17,10 +17,9 @@
*/
package org.apache.hadoop.mapreduce.v2.app.rm.preemption;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@@ -50,17 +49,17 @@ public class NoopAMPreemptionPolicy impl
}
@Override
- public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) {
+ public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
// ignore
}
@Override
- public TaskCheckpointID getCheckpointID(TaskID taskId) {
+ public TaskCheckpointID getCheckpointID(TaskId taskId) {
return null;
}
@Override
- public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+ public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
// ignore
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Tue Jan 7 01:07:33 2014
@@ -17,26 +17,23 @@
*/
package org.apache.hadoop.mapred;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
-
-import junit.framework.Assert;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@@ -46,21 +43,31 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.SystemClock;
+
import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
public class TestTaskAttemptListenerImpl {
- public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl {
+ public static class MockTaskAttemptListenerImpl
+ extends TaskAttemptListenerImpl {
public MockTaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler,
- TaskHeartbeatHandler hbHandler) {
- super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
+ TaskHeartbeatHandler hbHandler,
+ AMPreemptionPolicy policy) {
+
+ super(context, jobTokenSecretManager, rmHeartbeatHandler, policy);
this.taskHeartbeatHandler = hbHandler;
}
@@ -87,9 +94,16 @@ public class TestTaskAttemptListenerImpl
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+ Dispatcher dispatcher = mock(Dispatcher.class);
+ EventHandler ea = mock(EventHandler.class);
+ when(dispatcher.getEventHandler()).thenReturn(ea);
+
+ when(appCtx.getEventHandler()).thenReturn(ea);
+ CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+ policy.init(appCtx);
MockTaskAttemptListenerImpl listener =
new MockTaskAttemptListenerImpl(appCtx, secret,
- rmHeartbeatHandler, hbHandler);
+ rmHeartbeatHandler, hbHandler, policy);
Configuration conf = new Configuration();
listener.init(conf);
listener.start();
@@ -144,7 +158,7 @@ public class TestTaskAttemptListenerImpl
assertNotNull(jvmid);
try {
JVMId.forName("jvm_001_002_m_004_006");
- Assert.fail();
+ fail();
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(),
"TaskId string : jvm_001_002_m_004_006 is not properly formed");
@@ -190,8 +204,14 @@ public class TestTaskAttemptListenerImpl
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
- TaskAttemptListenerImpl listener =
- new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
+ Dispatcher dispatcher = mock(Dispatcher.class);
+ EventHandler ea = mock(EventHandler.class);
+ when(dispatcher.getEventHandler()).thenReturn(ea);
+ when(appCtx.getEventHandler()).thenReturn(ea);
+ CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+ policy.init(appCtx);
+ TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+ appCtx, secret, rmHeartbeatHandler, policy) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
@@ -219,7 +239,8 @@ public class TestTaskAttemptListenerImpl
isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
: org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
- RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
TaskAttemptCompletionEvent tce = recordFactory
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(eventId);
@@ -244,8 +265,14 @@ public class TestTaskAttemptListenerImpl
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
- TaskAttemptListenerImpl listener =
- new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
+ Dispatcher dispatcher = mock(Dispatcher.class);
+ EventHandler ea = mock(EventHandler.class);
+ when(dispatcher.getEventHandler()).thenReturn(ea);
+ when(appCtx.getEventHandler()).thenReturn(ea);
+ CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+ policy.init(appCtx);
+ TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+ appCtx, secret, rmHeartbeatHandler, policy) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
@@ -270,4 +297,88 @@ public class TestTaskAttemptListenerImpl
listener.stop();
}
+
+ @Test
+ public void testCheckpointIDTracking()
+ throws IOException, InterruptedException{
+
+ SystemClock clock = new SystemClock();
+
+ org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
+ mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
+ when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
+
+ Dispatcher dispatcher = mock(Dispatcher.class);
+ EventHandler ea = mock(EventHandler.class);
+ when(dispatcher.getEventHandler()).thenReturn(ea);
+
+ RMHeartbeatHandler rmHeartbeatHandler =
+ mock(RMHeartbeatHandler.class);
+
+ AppContext appCtx = mock(AppContext.class);
+ when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
+ when(appCtx.getClock()).thenReturn(clock);
+ when(appCtx.getEventHandler()).thenReturn(ea);
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+ when(appCtx.getEventHandler()).thenReturn(ea);
+ CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+ policy.init(appCtx);
+ TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+ appCtx, secret, rmHeartbeatHandler, policy) {
+ @Override
+ protected void registerHeartbeatHandler(Configuration conf) {
+ taskHeartbeatHandler = hbHandler;
+ }
+ };
+
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRJobConfig.TASK_PREEMPTION, true);
+ //conf.setBoolean("preemption.reduce", true);
+
+ listener.init(conf);
+ listener.start();
+
+ TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
+
+ List<Path> partialOut = new ArrayList<Path>();
+ partialOut.add(new Path("/prev1"));
+ partialOut.add(new Path("/prev2"));
+
+ Counters counters = mock(Counters.class);
+ final long CBYTES = 64L * 1024 * 1024;
+ final long CTIME = 4344L;
+ final Path CLOC = new Path("/test/1");
+ Counter cbytes = mock(Counter.class);
+ when(cbytes.getValue()).thenReturn(CBYTES);
+ Counter ctime = mock(Counter.class);
+ when(ctime.getValue()).thenReturn(CTIME);
+ when(counters.findCounter(eq(EnumCounter.CHECKPOINT_BYTES)))
+ .thenReturn(cbytes);
+ when(counters.findCounter(eq(EnumCounter.CHECKPOINT_MS)))
+ .thenReturn(ctime);
+
+ // propagating a taskstatus that contains a checkpoint id
+ TaskCheckpointID incid = new TaskCheckpointID(new FSCheckpointID(
+ CLOC), partialOut, counters);
+ listener.setCheckpointID(
+ org.apache.hadoop.mapred.TaskID.downgrade(tid.getTaskID()), incid);
+
+ // and try to get it back
+ CheckpointID outcid = listener.getCheckpointID(tid.getTaskID());
+ TaskCheckpointID tcid = (TaskCheckpointID) outcid;
+ assertEquals(CBYTES, tcid.getCheckpointBytes());
+ assertEquals(CTIME, tcid.getCheckpointTime());
+ assertTrue(partialOut.containsAll(tcid.getPartialCommittedOutput()));
+ assertTrue(tcid.getPartialCommittedOutput().containsAll(partialOut));
+
+ //assert it worked
+ assert outcid == incid;
+
+ listener.stop();
+
+ }
+
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Jan 7 01:07:33 2014
@@ -1604,6 +1604,21 @@ public class TestRMContainerAllocator {
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampDownReduces(anyInt());
+
+ // Test reduce ramp-down for when there are scheduled maps
+ // Since we have two scheduled Maps, rampDownReducers
+ // should be invoked twice.
+ scheduledMaps = 2;
+ assignedReduces = 2;
+ doReturn(10 * 1024).when(allocator).getMemLimit();
+ allocator.scheduleReduces(
+ totalMaps, succeededMaps,
+ scheduledMaps, scheduledReduces,
+ assignedMaps, assignedReduces,
+ mapResourceReqt, reduceResourceReqt,
+ numPendingReduces,
+ maxReduceRampupLimit, reduceSlowStart);
+ verify(allocator, times(2)).rampDownReduces(anyInt());
}
private static class RecalculateContainerAllocator extends MyContainerAllocator {
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Jan 7 01:07:33 2014
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.Queue
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -575,10 +576,17 @@ public class LocalJobRunner implements C
// TaskUmbilicalProtocol methods
+ @Override
public JvmTask getTask(JvmContext context) { return null; }
- public synchronized boolean statusUpdate(TaskAttemptID taskId,
+ @Override
+ public synchronized AMFeedback statusUpdate(TaskAttemptID taskId,
TaskStatus taskStatus) throws IOException, InterruptedException {
+ AMFeedback feedback = new AMFeedback();
+ feedback.setTaskFound(true);
+ if (null == taskStatus) {
+ return feedback;
+ }
// Serialize as we would if distributed in order to make deep copy
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
@@ -618,7 +626,7 @@ public class LocalJobRunner implements C
}
// ignore phase
- return true;
+ return feedback;
}
/** Return the current values of the counters for this job,
@@ -654,24 +662,24 @@ public class LocalJobRunner implements C
statusUpdate(taskid, taskStatus);
}
+ @Override
public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
// Ignore for now
}
+ @Override
public void reportNextRecordRange(TaskAttemptID taskid,
SortedRanges.Range range) throws IOException {
LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
}
- public boolean ping(TaskAttemptID taskid) throws IOException {
- return true;
- }
-
+ @Override
public boolean canCommit(TaskAttemptID taskid)
throws IOException {
return true;
}
+ @Override
public void done(TaskAttemptID taskId) throws IOException {
int taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
@@ -681,11 +689,13 @@ public class LocalJobRunner implements C
}
}
+ @Override
public synchronized void fsError(TaskAttemptID taskId, String message)
throws IOException {
LOG.fatal("FSError: "+ message + "from task: " + taskId);
}
+ @Override
public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
}
@@ -695,12 +705,30 @@ public class LocalJobRunner implements C
LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
}
+ @Override
public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
return new MapTaskCompletionEventsUpdate(
org.apache.hadoop.mapred.TaskCompletionEvent.EMPTY_ARRAY, false);
}
-
+
+ @Override
+ public void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ // ignore
+ }
+
+ @Override
+ public TaskCheckpointID getCheckpointID(TaskID taskId) {
+ // ignore
+ return null;
+ }
+
+ @Override
+ public void setCheckpointID(TaskID downgrade, TaskCheckpointID cid) {
+ // ignore
+ }
+
}
public LocalJobRunner(Configuration conf) throws IOException {
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Tue Jan 7 01:07:33 2014
@@ -44,6 +44,8 @@ import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -82,12 +84,11 @@ public class TestMRWithDistributedCache
private static final Log LOG =
LogFactory.getLog(TestMRWithDistributedCache.class);
+
+ private static class DistributedCacheChecker {
- public static class DistributedCacheChecker extends
- Mapper<LongWritable, Text, NullWritable, NullWritable> {
-
- @Override
- public void setup(Context context) throws IOException {
+ public void setup(TaskInputOutputContext<?, ?, ?, ?> context)
+ throws IOException {
Configuration conf = context.getConfiguration();
Path[] localFiles = context.getLocalCacheFiles();
URI[] files = context.getCacheFiles();
@@ -101,6 +102,10 @@ public class TestMRWithDistributedCache
TestCase.assertEquals(2, files.length);
TestCase.assertEquals(2, archives.length);
+ // Check the file name
+ TestCase.assertTrue(files[0].getPath().endsWith("distributed.first"));
+ TestCase.assertTrue(files[1].getPath().endsWith("distributed.second.jar"));
+
// Check lengths of the files
TestCase.assertEquals(1, fs.getFileStatus(localFiles[0]).getLen());
TestCase.assertTrue(fs.getFileStatus(localFiles[1]).getLen() > 1);
@@ -130,8 +135,28 @@ public class TestMRWithDistributedCache
TestCase.assertTrue("second file should be symlinked too",
expectedAbsentSymlinkFile.exists());
}
+
}
-
+
+ public static class DistributedCacheCheckerMapper extends
+ Mapper<LongWritable, Text, NullWritable, NullWritable> {
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ new DistributedCacheChecker().setup(context);
+ }
+ }
+
+ public static class DistributedCacheCheckerReducer extends
+ Reducer<LongWritable, Text, NullWritable, NullWritable> {
+
+ @Override
+ public void setup(Context context) throws IOException {
+ new DistributedCacheChecker().setup(context);
+ }
+ }
+
private void testWithConf(Configuration conf) throws IOException,
InterruptedException, ClassNotFoundException, URISyntaxException {
// Create a temporary file of length 1.
@@ -146,7 +171,8 @@ public class TestMRWithDistributedCache
Job job = Job.getInstance(conf);
- job.setMapperClass(DistributedCacheChecker.class);
+ job.setMapperClass(DistributedCacheCheckerMapper.class);
+ job.setReducerClass(DistributedCacheCheckerReducer.class);
job.setOutputFormatClass(NullOutputFormat.class);
FileInputFormat.setInputPaths(job, first);
// Creates the Job Configuration
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Tue Jan 7 01:07:33 2014
@@ -949,12 +949,29 @@ public class JobConf extends Configurati
return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
}
+ /**
+ * Get the user defined {@link WritableComparable} comparator for
+ * grouping keys of inputs to the combiner.
+ *
+ * @return comparator set by the user for grouping values.
+ * @see #setCombinerKeyGroupingComparator(Class) for details.
+ */
+ public RawComparator getCombinerKeyGroupingComparator() {
+ Class<? extends RawComparator> theClass = getClass(
+ JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
+ if (theClass == null) {
+ return getOutputKeyComparator();
+ }
+
+ return ReflectionUtils.newInstance(theClass, this);
+ }
+
/**
* Get the user defined {@link WritableComparable} comparator for
* grouping keys of inputs to the reduce.
*
* @return comparator set by the user for grouping values.
- * @see #setOutputValueGroupingComparator(Class) for details.
+ * @see #setOutputValueGroupingComparator(Class) for details.
*/
public RawComparator getOutputValueGroupingComparator() {
Class<? extends RawComparator> theClass = getClass(
@@ -966,6 +983,37 @@ public class JobConf extends Configurati
return ReflectionUtils.newInstance(theClass, this);
}
+ /**
+ * Set the user defined {@link RawComparator} comparator for
+ * grouping keys in the input to the combiner.
+ * <p/>
+ * <p>This comparator should be provided if the equivalence rules for keys
+ * for sorting the intermediates are different from those for grouping keys
+ * before each call to
+ * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
+ * <p/>
+ * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
+ * in a single call to the reduce function if K1 and K2 compare as equal.</p>
+ * <p/>
+ * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
+ * how keys are sorted, this can be used in conjunction to simulate
+ * <i>secondary sort on values</i>.</p>
+ * <p/>
+ * <p><i>Note</i>: This is not a guarantee of the combiner sort being
+ * <i>stable</i> in any sense. (In any case, with the order of available
+ * map-outputs to the combiner being non-deterministic, it wouldn't make
+ * that much sense.)</p>
+ *
+ * @param theClass the comparator class to be used for grouping keys for the
+ * combiner. It should implement <code>RawComparator</code>.
+ * @see #setOutputKeyComparatorClass(Class)
+ */
+ public void setCombinerKeyGroupingComparator(
+ Class<? extends RawComparator> theClass) {
+ setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
+ theClass, RawComparator.class);
+ }
+
/**
* Set the user defined {@link RawComparator} comparator for
* grouping keys in the input to the reduce.
@@ -989,7 +1037,8 @@ public class JobConf extends Configurati
*
* @param theClass the comparator class to be used for grouping keys.
* It should implement <code>RawComparator</code>.
- * @see #setOutputKeyComparatorClass(Class)
+ * @see #setOutputKeyComparatorClass(Class)
+ * @see #setCombinerKeyGroupingComparator(Class)
*/
public void setOutputValueGroupingComparator(
Class<? extends RawComparator> theClass) {
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Tue Jan 7 01:07:33 2014
@@ -187,6 +187,7 @@ abstract public class Task implements Wr
protected SecretKey tokenSecret;
protected SecretKey shuffleSecret;
protected GcTimeUpdater gcUpdater;
+ final AtomicBoolean mustPreempt = new AtomicBoolean(false);
////////////////////////////////////////////
// Constructors
@@ -711,6 +712,7 @@ abstract public class Task implements Wr
}
try {
boolean taskFound = true; // whether TT knows about this task
+ AMFeedback amFeedback = null;
// sleep for a bit
synchronized(lock) {
if (taskDone.get()) {
@@ -728,12 +730,14 @@ abstract public class Task implements Wr
taskStatus.statusUpdate(taskProgress.get(),
taskProgress.toString(),
counters);
- taskFound = umbilical.statusUpdate(taskId, taskStatus);
+ amFeedback = umbilical.statusUpdate(taskId, taskStatus);
+ taskFound = amFeedback.getTaskFound();
taskStatus.clearStatus();
}
else {
// send ping
- taskFound = umbilical.ping(taskId);
+ amFeedback = umbilical.statusUpdate(taskId, null);
+ taskFound = amFeedback.getTaskFound();
}
// if Task Tracker is not aware of our task ID (probably because it died and
@@ -744,6 +748,17 @@ abstract public class Task implements Wr
System.exit(66);
}
+ // Set a flag that says we should preempt; this is read by
+ // ReduceTasks in places of the execution where it is
+ // safe/easy to preempt
+ boolean lastPreempt = mustPreempt.get();
+ mustPreempt.set(mustPreempt.get() || amFeedback.getPreemption());
+
+ if (lastPreempt ^ mustPreempt.get()) {
+ LOG.info("PREEMPTION TASK: setting mustPreempt to " +
+ mustPreempt.get() + " given " + amFeedback.getPreemption() +
+ " for "+ taskId + " task status: " +taskStatus.getPhase());
+ }
sendProgress = resetProgressFlag();
remainingRetries = MAX_RETRIES;
}
@@ -992,10 +1007,17 @@ abstract public class Task implements Wr
public void done(TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, InterruptedException {
- LOG.info("Task:" + taskId + " is done."
- + " And is in the process of committing");
updateCounters();
-
+ if (taskStatus.getRunState() == TaskStatus.State.PREEMPTED ) {
+ // If we are preempted, do no output promotion; signal done and exit
+ committer.commitTask(taskContext);
+ umbilical.preempted(taskId, taskStatus);
+ taskDone.set(true);
+ reporter.stopCommunicationThread();
+ return;
+ }
+ LOG.info("Task:" + taskId + " is done."
+ + " And is in the process of committing");
boolean commitRequired = isCommitRequired();
if (commitRequired) {
int retries = MAX_RETRIES;
@@ -1054,7 +1076,7 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+ if (!umbilical.statusUpdate(getTaskID(), taskStatus).getTaskFound()) {
LOG.warn("Parent died. Exiting "+taskId);
System.exit(66);
}
@@ -1098,8 +1120,8 @@ abstract public class Task implements Wr
if (isMapTask() && conf.getNumReduceTasks() > 0) {
try {
Path mapOutput = mapOutputFile.getOutputFile();
- FileSystem localFS = FileSystem.getLocal(conf);
- return localFS.getFileStatus(mapOutput).getLen();
+ FileSystem fs = mapOutput.getFileSystem(conf);
+ return fs.getFileStatus(mapOutput).getLen();
} catch (IOException e) {
LOG.warn ("Could not find output size " , e);
}
@@ -1553,7 +1575,8 @@ abstract public class Task implements Wr
combinerClass = cls;
keyClass = (Class<K>) job.getMapOutputKeyClass();
valueClass = (Class<V>) job.getMapOutputValueClass();
- comparator = (RawComparator<K>) job.getOutputKeyComparator();
+ comparator = (RawComparator<K>)
+ job.getCombinerKeyGroupingComparator();
}
@SuppressWarnings("unchecked")
@@ -1602,7 +1625,7 @@ abstract public class Task implements Wr
this.taskId = taskId;
keyClass = (Class<K>) context.getMapOutputKeyClass();
valueClass = (Class<V>) context.getMapOutputValueClass();
- comparator = (RawComparator<K>) context.getSortComparator();
+ comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
this.committer = committer;
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java Tue Jan 7 01:07:33 2014
@@ -51,7 +51,7 @@ public abstract class TaskStatus impleme
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
- COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
+ COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN, PREEMPTED}
private final TaskAttemptID taskid;
private float progress;
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Jan 7 01:07:33 2014
@@ -24,6 +24,9 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSelector;
import org.apache.hadoop.security.token.TokenInfo;
@@ -64,9 +67,10 @@ public interface TaskUmbilicalProtocol e
* Version 17 Modified TaskID to be aware of the new TaskTypes
* Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
* Version 19 Added fatalError for child to communicate fatal errors to TT
+ * Version 20 Added methods to manage checkpoints
* */
- public static final long versionID = 19L;
+ public static final long versionID = 20L;
/**
* Called when a child task process starts, to get its task.
@@ -78,7 +82,8 @@ public interface TaskUmbilicalProtocol e
JvmTask getTask(JvmContext context) throws IOException;
/**
- * Report child's progress to parent.
+ * Report child's progress to parent. Also invoked to report still alive (used
+ * to be in ping). It reports an AMFeedback used to propagate preemption requests.
*
* @param taskId task-id of the child
* @param taskStatus status of the child
@@ -86,7 +91,7 @@ public interface TaskUmbilicalProtocol e
* @throws InterruptedException
* @return True if the task is known
*/
- boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+ AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException;
/** Report error messages back to parent. Calls should be sparing, since all
@@ -105,11 +110,6 @@ public interface TaskUmbilicalProtocol e
void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range)
throws IOException;
- /** Periodically called by child to check if parent is still alive.
- * @return True if the task is known
- */
- boolean ping(TaskAttemptID taskid) throws IOException;
-
/** Report that the task is successfully completed. Failure is assumed if
* the task process exits without calling this.
* @param taskid task's id
@@ -161,4 +161,33 @@ public interface TaskUmbilicalProtocol e
TaskAttemptID id)
throws IOException;
+ /**
+   * Report to the AM that the task has been successfully preempted.
+ *
+ * @param taskId task's id
+ * @param taskStatus status of the child
+ * @throws IOException
+ */
+ void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException;
+
+ /**
+ * Return the latest CheckpointID for the given TaskID. This provides
+ * the task with a way to locate the checkpointed data and restart from
+ * that point in the computation.
+ *
+ * @param taskID task's id
+ * @return the most recent checkpoint (if any) for this task
+ * @throws IOException
+ */
+ TaskCheckpointID getCheckpointID(TaskID taskID);
+
+ /**
+ * Send a CheckpointID for a given TaskID to be stored in the AM,
+ * to later restart a task from this checkpoint.
+ * @param tid
+ * @param cid
+ */
+ void setCheckpointID(TaskID tid, TaskCheckpointID cid);
+
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Tue Jan 7 01:07:33 2014
@@ -949,10 +949,26 @@ public class Job extends JobContextImpl
}
/**
+ * Define the comparator that controls which keys are grouped together
+ * for a single call to combiner,
+ * {@link Reducer#reduce(Object, Iterable,
+ * org.apache.hadoop.mapreduce.Reducer.Context)}
+ *
+ * @param cls the raw comparator to use
+ * @throws IllegalStateException if the job is submitted
+ */
+ public void setCombinerKeyGroupingComparatorClass(
+ Class<? extends RawComparator> cls) throws IllegalStateException {
+ ensureState(JobState.DEFINE);
+ conf.setCombinerKeyGroupingComparator(cls);
+ }
+
+ /**
* Define the comparator that controls how the keys are sorted before they
* are passed to the {@link Reducer}.
* @param cls the raw comparator
* @throws IllegalStateException if the job is submitted
+ * @see #setCombinerKeyGroupingComparatorClass(Class)
*/
public void setSortComparatorClass(Class<? extends RawComparator> cls
) throws IllegalStateException {
@@ -967,6 +983,7 @@ public class Job extends JobContextImpl
* org.apache.hadoop.mapreduce.Reducer.Context)}
* @param cls the raw comparator to use
* @throws IllegalStateException if the job is submitted
+ * @see #setCombinerKeyGroupingComparatorClass(Class)
*/
public void setGroupingComparatorClass(Class<? extends RawComparator> cls
) throws IllegalStateException {
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java Tue Jan 7 01:07:33 2014
@@ -167,13 +167,23 @@ public interface JobContext extends MRJo
*/
public String getJar();
- /**
- * Get the user defined {@link RawComparator} comparator for
- * grouping keys of inputs to the reduce.
- *
+ /**
+ * Get the user defined {@link RawComparator} comparator for
+ * grouping keys of inputs to the combiner.
+ *
* @return comparator set by the user for grouping values.
- * @see Job#setGroupingComparatorClass(Class) for details.
+ * @see Job#setCombinerKeyGroupingComparatorClass(Class)
*/
+ public RawComparator<?> getCombinerKeyGroupingComparator();
+
+ /**
+ * Get the user defined {@link RawComparator} comparator for
+ * grouping keys of inputs to the reduce.
+ *
+ * @return comparator set by the user for grouping values.
+ * @see Job#setGroupingComparatorClass(Class)
+ * @see #getCombinerKeyGroupingComparator()
+ */
public RawComparator<?> getGroupingComparator();
/**
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Jan 7 01:07:33 2014
@@ -93,6 +93,8 @@ public interface MRJobConfig {
public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
+ public static final String COMBINER_GROUP_COMPARATOR_CLASS = "mapreduce.job.combiner.group.comparator.class";
+
public static final String GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class";
public static final String WORKING_DIR = "mapreduce.job.working.dir";
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java Tue Jan 7 01:07:33 2014
@@ -34,37 +34,31 @@ import org.apache.hadoop.mapred.Counters
* cost of checkpoints and other counters. This is sent by the task to the AM
* to be stored and provided to the next execution of the same task.
*/
-public class TaskCheckpointID implements CheckpointID{
+public class TaskCheckpointID implements CheckpointID {
- FSCheckpointID rawId;
- private List<Path> partialOutput;
- private Counters counters;
+ final FSCheckpointID rawId;
+ private final List<Path> partialOutput;
+ private final Counters counters;
public TaskCheckpointID() {
- this.rawId = new FSCheckpointID();
- this.partialOutput = new ArrayList<Path>();
+ this(new FSCheckpointID(), new ArrayList<Path>(), new Counters());
}
public TaskCheckpointID(FSCheckpointID rawId, List<Path> partialOutput,
Counters counters) {
this.rawId = rawId;
this.counters = counters;
- if(partialOutput == null)
- this.partialOutput = new ArrayList<Path>();
- else
- this.partialOutput = partialOutput;
+ this.partialOutput = null == partialOutput
+ ? new ArrayList<Path>()
+ : partialOutput;
}
@Override
public void write(DataOutput out) throws IOException {
counters.write(out);
- if (partialOutput == null) {
- WritableUtils.writeVLong(out, 0L);
- } else {
- WritableUtils.writeVLong(out, partialOutput.size());
- for(Path p:partialOutput){
- Text.writeString(out, p.toString());
- }
+ WritableUtils.writeVLong(out, partialOutput.size());
+ for (Path p : partialOutput) {
+ Text.writeString(out, p.toString());
}
rawId.write(out);
}
@@ -74,21 +68,22 @@ public class TaskCheckpointID implements
partialOutput.clear();
counters.readFields(in);
long numPout = WritableUtils.readVLong(in);
- for(int i=0;i<numPout;i++)
+ for (int i = 0; i < numPout; i++) {
partialOutput.add(new Path(Text.readString(in)));
+ }
rawId.readFields(in);
}
@Override
- public boolean equals(Object other){
+ public boolean equals(Object other) {
if (other instanceof TaskCheckpointID){
- return this.rawId.equals(((TaskCheckpointID)other).rawId) &&
- this.counters.equals(((TaskCheckpointID) other).counters) &&
- this.partialOutput.containsAll(((TaskCheckpointID) other).partialOutput) &&
- ((TaskCheckpointID) other).partialOutput.containsAll(this.partialOutput);
- } else {
- return false;
+ TaskCheckpointID o = (TaskCheckpointID) other;
+ return rawId.equals(o.rawId) &&
+ counters.equals(o.counters) &&
+ partialOutput.containsAll(o.partialOutput) &&
+ o.partialOutput.containsAll(partialOutput);
}
+ return false;
}
@Override
@@ -110,7 +105,7 @@ public class TaskCheckpointID implements
return counters.findCounter(EnumCounter.CHECKPOINT_MS).getValue();
}
- public String toString(){
+ public String toString() {
return rawId.toString() + " counters:" + counters;
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Tue Jan 7 01:07:33 2014
@@ -167,6 +167,11 @@ class ChainMapContextImpl<KEYIN, VALUEIN
}
@Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return base.getCombinerKeyGroupingComparator();
+ }
+
+ @Override
public RawComparator<?> getGroupingComparator() {
return base.getGroupingComparator();
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Tue Jan 7 01:07:33 2014
@@ -160,6 +160,11 @@ class ChainReduceContextImpl<KEYIN, VALU
}
@Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return base.getCombinerKeyGroupingComparator();
+ }
+
+ @Override
public RawComparator<?> getGroupingComparator() {
return base.getGroupingComparator();
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Tue Jan 7 01:07:33 2014
@@ -169,6 +169,11 @@ public class WrappedMapper<KEYIN, VALUEI
}
@Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return mapContext.getCombinerKeyGroupingComparator();
+ }
+
+ @Override
public RawComparator<?> getGroupingComparator() {
return mapContext.getGroupingComparator();
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Tue Jan 7 01:07:33 2014
@@ -137,7 +137,7 @@ public class WrappedReducer<KEYIN, VALUE
@Override
public URI[] getCacheFiles() throws IOException {
- return reduceContext.getCacheArchives();
+ return reduceContext.getCacheFiles();
}
@Override
@@ -162,6 +162,11 @@ public class WrappedReducer<KEYIN, VALUE
}
@Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return reduceContext.getCombinerKeyGroupingComparator();
+ }
+
+ @Override
public RawComparator<?> getGroupingComparator() {
return reduceContext.getGroupingComparator();
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Tue Jan 7 01:07:33 2014
@@ -252,6 +252,17 @@ public class JobContextImpl implements J
return conf.getJar();
}
+ /**
+ * Get the user defined {@link RawComparator} comparator for
+ * grouping keys of inputs to the combiner.
+ *
+ * @return comparator set by the user for grouping values.
+ * @see Job#setCombinerKeyGroupingComparatorClass(Class) for details.
+ */
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return conf.getCombinerKeyGroupingComparator();
+ }
+
/**
* Get the user defined {@link RawComparator} comparator for
* grouping keys of inputs to the reduce.
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Tue Jan 7 01:07:33 2014
@@ -582,7 +582,7 @@ public class MergeManagerImpl<K, V> impl
Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
RawComparator<K> comparator =
- (RawComparator<K>)job.getOutputKeyComparator();
+ (RawComparator<K>)job.getCombinerKeyGroupingComparator();
try {
CombineValuesIterator values = new CombineValuesIterator(
kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1553225-1556096
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java Tue Jan 7 01:07:33 2014
@@ -88,6 +88,8 @@ import org.apache.hadoop.yarn.util.Recor
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* This module is responsible for talking to the
* JobClient (user facing).
@@ -142,7 +144,8 @@ public class HistoryClientService extend
super.serviceStart();
}
- private void initializeWebApp(Configuration conf) {
+ @VisibleForTesting
+ protected void initializeWebApp(Configuration conf) {
webApp = new HsWebApp(history);
InetSocketAddress bindAddress = MRWebAppUtil.getJHSWebBindAddress(conf);
// NOTE: there should be a .at(InetSocketAddress)
|