Author: suresh
Date: Thu Nov 8 19:09:46 2012
New Revision: 1407217
URL: http://svn.apache.org/viewvc?rev=1407217&view=rev
Log:
Merging trunk to branch-trunk-win branch
Added:
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java
- copied unchanged from r1407201, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java
- copied unchanged from r1407201, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java
Modified:
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobProfile.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/QueueAclsInfo.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/QueueInfo.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskReport.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/CountersStrings.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (contents, props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsView.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedPartitioner.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/c++/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/block_forensics/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/build-contrib.xml (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/build.xml (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/data_join/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/eclipse-plugin/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/vaidya/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/examples/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/java/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/test/mapred/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/webapps/job/ (props changed)
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1401063-1407201
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt Thu Nov 8 19:09:46 2012
@@ -189,6 +189,14 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4736. Remove obsolete option [-rootDir] from TestDFSIO.
(Brandon Li via suresh)
+ MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and
+ UNASSIGNED states. (Mayank Bansal via sseth)
+
+ MAPREDUCE-1806. CombineFileInputFormat does not work with paths not on default FS. (Gera Shegalov via tucu)
+
+ MAPREDUCE-4777. In TestIFile, testIFileReaderWithCodec relies on
+ testIFileWriterWithCodec. (Sandy Ryza via tomwhite)
+
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
@@ -576,6 +584,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol
for Job, Task and TaskAttempt. (Siddarth Seth via vinodkv)
+ MAPREDUCE-4752. Reduce MR AM memory usage through String Interning (Robert
+ Evans via tgraves)
+
OPTIMIZATIONS
BUG FIXES
@@ -602,6 +613,34 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4740. only .jars can be added to the Distributed Cache
classpath. (Robert Joseph Evans via jlowe)
+ MAPREDUCE-4229. Intern counter names in the JT (Miomir Boljanovic and bobby via daryn)
+
+ MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown.
+ (Vinod Kumar Vavilapalli via jlowe)
+
+ MAPREDUCE-4730. Fix Reducer's EventFetcher to scale the map-completion
+ requests slowly to avoid HADOOP-8942. (Jason Lowe via vinodkv)
+
+ MAPREDUCE-4748. Invalid event: T_ATTEMPT_SUCCEEDED at SUCCEEDED. (jlowe)
+
+ MAPREDUCE-4724. job history web ui applications page should be sorted to
+ display last app first (tgraves via bobby)
+
+ MAPREDUCE-4746. The MR Application Master does not have a config to set
+ environment variables (Rob Parker via bobby)
+
+ MAPREDUCE-4729. job history UI not showing all job attempts. (Vinod
+ Kumar Vavilapalli via jlowe)
+
+ MAPREDUCE-4763. Repair test TestUmbilicalProtocolWithJobToken (Ivan A.
+ Veselovsky via bobby)
+
+ MAPREDUCE-4771. KeyFieldBasedPartitioner not partitioning properly when
+ configured (jlowe via bobby)
+
+ MAPREDUCE-4772. Fetch failures can take way too long for a map to be
+ restarted (bobby)
+
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1401063-1407201
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1401063-1407201
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Thu Nov 8 19:09:46 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.CompositeService;
@@ -280,6 +281,7 @@ public class TaskAttemptListenerImpl ext
@Override
public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo)
throws IOException {
+ diagnosticInfo = StringInterner.weakIntern(diagnosticInfo);
LOG.info("Diagnostics report from " + taskAttemptID.toString() + ": "
+ diagnosticInfo);
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Nov 8 19:09:46 2012
@@ -23,14 +23,17 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,6 +48,9 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -89,6 +95,7 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
@@ -107,6 +114,8 @@ import org.apache.hadoop.yarn.service.Co
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* The Map-Reduce Application Master.
* The state machine is encapsulated in the implementation of Job interface.
@@ -398,52 +407,65 @@ public class MRAppMaster extends Composi
protected void sysexit() {
System.exit(0);
}
-
- private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
- @Override
- public void handle(JobFinishEvent event) {
- // job has finished
- // this is the only job, so shut down the Appmaster
- // note in a workflow scenario, this may lead to creation of a new
- // job (FIXME?)
- // Send job-end notification
- if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
- try {
- LOG.info("Job end notification started for jobID : "
- + job.getReport().getJobId());
- JobEndNotifier notifier = new JobEndNotifier();
- notifier.setConf(getConfig());
- notifier.notify(job.getReport());
- } catch (InterruptedException ie) {
- LOG.warn("Job end notification interrupted for jobID : "
- + job.getReport().getJobId(), ie);
- }
- }
- // TODO:currently just wait for some time so clients can know the
- // final states. Will be removed once RM come on.
+ @VisibleForTesting
+ public void shutDownJob() {
+ // job has finished
+ // this is the only job, so shut down the Appmaster
+ // note in a workflow scenario, this may lead to creation of a new
+ // job (FIXME?)
+ // Send job-end notification
+ if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
+ LOG.info("Job end notification started for jobID : "
+ + job.getReport().getJobId());
+ JobEndNotifier notifier = new JobEndNotifier();
+ notifier.setConf(getConfig());
+ notifier.notify(job.getReport());
+ } catch (InterruptedException ie) {
+ LOG.warn("Job end notification interrupted for jobID : "
+ + job.getReport().getJobId(), ie);
}
+ }
- try {
- //We are finishing cleanly so this is the last retry
- isLastAMRetry = true;
- // Stop all services
- // This will also send the final report to the ResourceManager
- LOG.info("Calling stop for all the services");
- stop();
+ // TODO:currently just wait for some time so clients can know the
+ // final states. Will be removed once RM come on.
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
- } catch (Throwable t) {
- LOG.warn("Graceful stop failed ", t);
- }
+ try {
+ //We are finishing cleanly so this is the last retry
+ isLastAMRetry = true;
+ // Stop all services
+ // This will also send the final report to the ResourceManager
+ LOG.info("Calling stop for all the services");
+ MRAppMaster.this.stop();
- //Bring the process down by force.
- //Not needed after HADOOP-7140
- LOG.info("Exiting MR AppMaster..GoodBye!");
- sysexit();
+ } catch (Throwable t) {
+ LOG.warn("Graceful stop failed ", t);
+ }
+
+ //Bring the process down by force.
+ //Not needed after HADOOP-7140
+ LOG.info("Exiting MR AppMaster..GoodBye!");
+ sysexit();
+ }
+
+ private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
+ @Override
+ public void handle(JobFinishEvent event) {
+ // Create a new thread to shutdown the AM. We should not do it in-line
+ // to avoid blocking the dispatcher itself.
+ new Thread() {
+
+ @Override
+ public void run() {
+ shutDownJob();
+ }
+ }.start();
}
}
@@ -811,16 +833,21 @@ public class MRAppMaster extends Composi
@Override
public void start() {
+ amInfos = new LinkedList<AMInfo>();
+
// Pull completedTasks etc from recovery
if (inRecovery) {
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
amInfos = recoveryServ.getAMInfos();
+ } else {
+ // Get the amInfos anyways irrespective of whether recovery is enabled or
+ // not IF this is not the first AM generation
+ if (appAttemptID.getAttemptId() != 1) {
+ amInfos.addAll(readJustAMInfos());
+ }
}
- // / Create the AMInfo for the current AppMaster
- if (amInfos == null) {
- amInfos = new LinkedList<AMInfo>();
- }
+ // Create an AMInfo for the current AM generation.
AMInfo amInfo =
MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
nmPort, nmHttpPort);
@@ -878,6 +905,51 @@ public class MRAppMaster extends Composi
startJobs();
}
+ private List<AMInfo> readJustAMInfos() {
+ List<AMInfo> amInfos = new ArrayList<AMInfo>();
+ FSDataInputStream inputStream = null;
+ try {
+ inputStream =
+ RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
+ appAttemptID);
+ EventReader jobHistoryEventReader = new EventReader(inputStream);
+
+ // All AMInfos are contiguous. Track when the first AMStartedEvent
+ // appears.
+ boolean amStartedEventsBegan = false;
+
+ HistoryEvent event;
+ while ((event = jobHistoryEventReader.getNextEvent()) != null) {
+ if (event.getEventType() == EventType.AM_STARTED) {
+ if (!amStartedEventsBegan) {
+ // First AMStartedEvent.
+ amStartedEventsBegan = true;
+ }
+ AMStartedEvent amStartedEvent = (AMStartedEvent) event;
+ amInfos.add(MRBuilderUtils.newAMInfo(
+ amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
+ amStartedEvent.getContainerId(),
+ StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
+ amStartedEvent.getNodeManagerPort(),
+ amStartedEvent.getNodeManagerHttpPort()));
+ } else if (amStartedEventsBegan) {
+ // This means AMStartedEvents began and this event is a
+ // non-AMStarted event.
+ // No need to continue reading all the other events.
+ break;
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Could not parse the old history file. "
+ + "Will not have old AMinfos ", e);
+ } finally {
+ if (inputStream != null) {
+ IOUtils.closeQuietly(inputStream);
+ }
+ }
+ return amInfos;
+ }
+
/**
* This can be overridden to instantiate multiple jobs and create a
* workflow.
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu Nov 8 19:09:46 2012
@@ -68,6 +68,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -1409,16 +1410,22 @@ public class JobImpl implements org.apac
fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
job.fetchFailuresMapping.put(mapId, fetchFailures);
- //get number of running reduces
- int runningReduceTasks = 0;
+ //get number of shuffling reduces
+ int shufflingReduceTasks = 0;
for (TaskId taskId : job.reduceTasks) {
- if (TaskState.RUNNING.equals(job.tasks.get(taskId).getState())) {
- runningReduceTasks++;
+ Task task = job.tasks.get(taskId);
+ if (TaskState.RUNNING.equals(task.getState())) {
+ for(TaskAttempt attempt : task.getAttempts().values()) {
+ if(attempt.getReport().getPhase() == Phase.SHUFFLE) {
+ shufflingReduceTasks++;
+ break;
+ }
+ }
}
}
- float failureRate = runningReduceTasks == 0 ? 1.0f :
- (float) fetchFailures / runningReduceTasks;
+ float failureRate = shufflingReduceTasks == 0 ? 1.0f :
+ (float) fetchFailures / shufflingReduceTasks;
// declare faulty if fetch-failures >= max-allowed-failures
boolean isMapFaulty =
(failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Nov 8 19:09:46 2012
@@ -105,6 +105,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
@@ -200,6 +201,10 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_KILL, new KilledTransition())
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
+ .addTransition(TaskAttemptStateInternal.NEW,
+ TaskAttemptStateInternal.NEW,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Transitions from the UNASSIGNED state.
.addTransition(TaskAttemptStateInternal.UNASSIGNED,
@@ -211,6 +216,10 @@ public abstract class TaskAttemptImpl im
.addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
TaskAttemptStateInternal.FAILED, true))
+ .addTransition(TaskAttemptStateInternal.UNASSIGNED,
+ TaskAttemptStateInternal.UNASSIGNED,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Transitions from the ASSIGNED state.
.addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
@@ -932,7 +941,6 @@ public abstract class TaskAttemptImpl im
Counters counters = reportedStatus.counters;
if (counters == null) {
counters = EMPTY_COUNTERS;
-// counters.groups = new HashMap<String, CounterGroup>();
}
return counters;
} finally {
@@ -1254,9 +1262,10 @@ public abstract class TaskAttemptImpl im
(TaskAttemptContainerAssignedEvent) event;
taskAttempt.containerID = cEvent.getContainer().getId();
taskAttempt.containerNodeId = cEvent.getContainer().getNodeId();
- taskAttempt.containerMgrAddress = taskAttempt.containerNodeId
- .toString();
- taskAttempt.nodeHttpAddress = cEvent.getContainer().getNodeHttpAddress();
+ taskAttempt.containerMgrAddress = StringInterner.weakIntern(
+ taskAttempt.containerNodeId.toString());
+ taskAttempt.nodeHttpAddress = StringInterner.weakIntern(
+ cEvent.getContainer().getNodeHttpAddress());
taskAttempt.nodeRackName = RackResolver.resolve(
taskAttempt.containerNodeId.getHost()).getNetworkLocation();
taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
@@ -1702,7 +1711,6 @@ public abstract class TaskAttemptImpl im
result.stateString = "NEW";
result.taskState = TaskAttemptState.NEW;
Counters counters = EMPTY_COUNTERS;
- // counters.groups = new HashMap<String, CounterGroup>();
result.counters = counters;
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Nov 8 19:09:46 2012
@@ -75,6 +75,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -203,7 +204,10 @@ public abstract class TaskImpl implement
.addTransition(
TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
- TaskEventType.T_ATTEMPT_LAUNCHED))
+ TaskEventType.T_ATTEMPT_COMMIT_PENDING,
+ TaskEventType.T_ATTEMPT_LAUNCHED,
+ TaskEventType.T_ATTEMPT_SUCCEEDED,
+ TaskEventType.T_KILL))
// Transitions from FAILED state
.addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
@@ -664,9 +668,9 @@ public abstract class TaskImpl implement
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(-1);
String scheme = (encryptedShuffle) ? "https://" : "http://";
- tce.setMapOutputServerAddress(scheme
+ tce.setMapOutputServerAddress(StringInterner.weakIntern(scheme
+ attempt.getNodeHttpAddress().split(":")[0] + ":"
- + attempt.getShufflePort());
+ + attempt.getShufflePort()));
tce.setStatus(status);
tce.setAttemptId(attempt.getID());
int runTime = 0;
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Thu Nov 8 19:09:46 2012
@@ -30,6 +30,7 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -81,6 +82,7 @@ public class ContainerLauncherImpl exten
protected BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>();
YarnRPC rpc;
+ private final AtomicBoolean stopped;
private Container getContainer(ContainerLauncherEvent event) {
ContainerId id = event.getContainerID();
@@ -237,6 +239,7 @@ public class ContainerLauncherImpl exten
public ContainerLauncherImpl(AppContext context) {
super(ContainerLauncherImpl.class.getName());
this.context = context;
+ this.stopped = new AtomicBoolean(false);
}
@Override
@@ -271,11 +274,13 @@ public class ContainerLauncherImpl exten
@Override
public void run() {
ContainerLauncherEvent event = null;
- while (!Thread.currentThread().isInterrupted()) {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
- LOG.error("Returning, interrupted : " + e);
+ if (!stopped.get()) {
+ LOG.error("Returning, interrupted : " + e);
+ }
return;
}
int poolSize = launcherPool.getCorePoolSize();
@@ -324,6 +329,10 @@ public class ContainerLauncherImpl exten
}
public void stop() {
+ if (stopped.getAndSet(true)) {
+ // return if already stopped
+ return;
+ }
// shutdown any containers that might be left running
shutdownAllContainers();
eventHandlingThread.interrupt();
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Thu Nov 8 19:09:46 2012
@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -178,26 +177,13 @@ public class RecoveryService extends Com
}
private void parse() throws IOException {
- // TODO: parse history file based on startCount
- String jobName =
- TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
- String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
- FSDataInputStream in = null;
- Path historyFile = null;
- Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
- new Path(jobhistoryDir));
- FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
- getConfig());
- //read the previous history file
- historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
- histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
- LOG.info("History file is at " + historyFile);
- in = fc.open(historyFile);
+ FSDataInputStream in =
+ getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse();
Exception parseException = parser.getParseException();
if (parseException != null) {
- LOG.info("Got an error parsing job-history file " + historyFile +
+ LOG.info("Got an error parsing job-history file" +
", ignoring incomplete events.", parseException);
}
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
@@ -213,6 +199,28 @@ public class RecoveryService extends Com
LOG.info("Read completed tasks from history "
+ completedTasks.size());
}
+
+ public static FSDataInputStream getPreviousJobHistoryFileStream(
+ Configuration conf, ApplicationAttemptId applicationAttemptId)
+ throws IOException {
+ FSDataInputStream in = null;
+ Path historyFile = null;
+ String jobName =
+ TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
+ .toString();
+ String jobhistoryDir =
+ JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+ Path histDirPath =
+ FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
+ FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
+ // read the previous history file
+ historyFile =
+ fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
+ jobName, (applicationAttemptId.getAttemptId() - 1)));
+ LOG.info("History file is at " + historyFile);
+ in = fc.open(historyFile);
+ return in;
+ }
protected Dispatcher createRecoveryDispatcher() {
return new RecoveryDispatcher();
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Thu Nov 8 19:09:46 2012
@@ -67,7 +67,7 @@ public abstract class RMCommunicator ext
private int rmPollInterval;//millis
protected ApplicationId applicationId;
protected ApplicationAttemptId applicationAttemptId;
- private AtomicBoolean stopped;
+ private final AtomicBoolean stopped;
protected Thread allocatorThread;
@SuppressWarnings("rawtypes")
protected EventHandler eventHandler;
@@ -239,7 +239,9 @@ public abstract class RMCommunicator ext
// TODO: for other exceptions
}
} catch (InterruptedException e) {
- LOG.warn("Allocated thread interrupted. Returning.");
+ if (!stopped.get()) {
+ LOG.warn("Allocated thread interrupted. Returning.");
+ }
return;
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Nov 8 19:09:46 2012
@@ -32,6 +32,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -56,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
@@ -84,7 +86,7 @@ public class RMContainerAllocator extend
private static final Priority PRIORITY_MAP;
private Thread eventHandlingThread;
- private volatile boolean stopEventHandling;
+ private final AtomicBoolean stopped;
static {
PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
@@ -145,6 +147,7 @@ public class RMContainerAllocator extend
public RMContainerAllocator(ClientService clientService, AppContext context) {
super(clientService, context);
+ this.stopped = new AtomicBoolean(false);
}
@Override
@@ -176,11 +179,13 @@ public class RMContainerAllocator extend
ContainerAllocatorEvent event;
- while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = RMContainerAllocator.this.eventQueue.take();
} catch (InterruptedException e) {
- LOG.error("Returning, interrupted : " + e);
+ if (!stopped.get()) {
+ LOG.error("Returning, interrupted : " + e);
+ }
return;
}
@@ -234,7 +239,10 @@ public class RMContainerAllocator extend
@Override
public void stop() {
- this.stopEventHandling = true;
+ if (stopped.getAndSet(true)) {
+ // return if already stopped
+ return;
+ }
eventHandlingThread.interrupt();
super.stop();
LOG.info("Final Stats: " + getStat());
@@ -613,7 +621,7 @@ public class RMContainerAllocator extend
eventHandler.handle(new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_CONTAINER_COMPLETED));
// Send the diagnostics
- String diagnostics = cont.getDiagnostics();
+ String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
diagnostics));
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java Thu Nov 8 19:09:46 2012
@@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,10 +44,12 @@ public class TaskCleanerImpl extends Abs
private Thread eventHandlingThread;
private BlockingQueue<TaskCleanupEvent> eventQueue =
new LinkedBlockingQueue<TaskCleanupEvent>();
+ private final AtomicBoolean stopped;
public TaskCleanerImpl(AppContext context) {
super("TaskCleaner");
this.context = context;
+ this.stopped = new AtomicBoolean(false);
}
public void start() {
@@ -59,11 +62,13 @@ public class TaskCleanerImpl extends Abs
@Override
public void run() {
TaskCleanupEvent event = null;
- while (!Thread.currentThread().isInterrupted()) {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
- LOG.error("Returning, interrupted : " + e);
+ if (!stopped.get()) {
+ LOG.error("Returning, interrupted : " + e);
+ }
return;
}
// the events from the queue are handled in parallel
@@ -77,6 +82,10 @@ public class TaskCleanerImpl extends Abs
}
public void stop() {
+ if (stopped.getAndSet(true)) {
+ // return if already stopped
+ return;
+ }
eventHandlingThread.interrupt();
launcherPool.shutdown();
super.stop();
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Thu Nov 8 19:09:46 2012
@@ -18,14 +18,19 @@
package org.apache.hadoop.mapreduce.v2.app;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -37,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Assert;
import org.junit.Test;
@@ -254,6 +260,169 @@ public class TestFetchFailure {
events = job.getTaskAttemptCompletionEvents(0, 100);
Assert.assertEquals("Num completion events not correct", 2, events.length);
}
+
+ @Test
+ public void testFetchFailureMultipleReduces() throws Exception {
+ MRApp app = new MRApp(1, 3, false, this.getClass().getName(), true);
+ Configuration conf = new Configuration();
+ // map -> reduce -> fetch-failure -> map retry is incompatible with
+ // sequential, single-task-attempt approach in uber-AM, so disable:
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ //all maps would be running
+ Assert.assertEquals("Num tasks not correct",
+ 4, job.getTasks().size());
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask = it.next();
+ Task reduceTask = it.next();
+ Task reduceTask2 = it.next();
+ Task reduceTask3 = it.next();
+
+ //wait for Task state move to RUNNING
+ app.waitForState(mapTask, TaskState.RUNNING);
+ TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+ app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+ //send the done signal to the map attempt
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(mapAttempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ // wait for map success
+ app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+ TaskAttemptCompletionEvent[] events =
+ job.getTaskAttemptCompletionEvents(0, 100);
+ Assert.assertEquals("Num completion events not correct",
+ 1, events.length);
+ Assert.assertEquals("Event status not correct",
+ TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
+
+ // wait for reduce to start running
+ app.waitForState(reduceTask, TaskState.RUNNING);
+ app.waitForState(reduceTask2, TaskState.RUNNING);
+ app.waitForState(reduceTask3, TaskState.RUNNING);
+ TaskAttempt reduceAttempt =
+ reduceTask.getAttempts().values().iterator().next();
+ app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+ updateStatus(app, reduceAttempt, Phase.SHUFFLE);
+
+ TaskAttempt reduceAttempt2 =
+ reduceTask2.getAttempts().values().iterator().next();
+ app.waitForState(reduceAttempt2, TaskAttemptState.RUNNING);
+ updateStatus(app, reduceAttempt2, Phase.SHUFFLE);
+
+ TaskAttempt reduceAttempt3 =
+ reduceTask3.getAttempts().values().iterator().next();
+ app.waitForState(reduceAttempt3, TaskAttemptState.RUNNING);
+ updateStatus(app, reduceAttempt3, Phase.SHUFFLE);
+
+ //send 3 fetch failures from reduce to trigger map re execution
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+ //We should not re-launch the map task yet
+ assertEquals(TaskState.SUCCEEDED, mapTask.getState());
+ updateStatus(app, reduceAttempt2, Phase.REDUCE);
+ updateStatus(app, reduceAttempt3, Phase.REDUCE);
+
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+ //wait for map Task state move back to RUNNING
+ app.waitForState(mapTask, TaskState.RUNNING);
+
+ //map attempt must have become FAILED
+ Assert.assertEquals("Map TaskAttempt state not correct",
+ TaskAttemptState.FAILED, mapAttempt1.getState());
+
+ Assert.assertEquals("Num attempts in Map Task not correct",
+ 2, mapTask.getAttempts().size());
+
+ Iterator<TaskAttempt> atIt = mapTask.getAttempts().values().iterator();
+ atIt.next();
+ TaskAttempt mapAttempt2 = atIt.next();
+
+ app.waitForState(mapAttempt2, TaskAttemptState.RUNNING);
+ //send the done signal to the second map attempt
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(mapAttempt2.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ // wait for map success
+ app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+ //send done to reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(reduceAttempt.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //send done to reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(reduceAttempt2.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //send done to reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(reduceAttempt3.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ app.waitForState(job, JobState.SUCCEEDED);
+
+ //previous completion event now becomes obsolete
+ Assert.assertEquals("Event status not correct",
+ TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+
+ events = job.getTaskAttemptCompletionEvents(0, 100);
+ Assert.assertEquals("Num completion events not correct",
+ 6, events.length);
+ Assert.assertEquals("Event map attempt id not correct",
+ mapAttempt1.getID(), events[0].getAttemptId());
+ Assert.assertEquals("Event map attempt id not correct",
+ mapAttempt1.getID(), events[1].getAttemptId());
+ Assert.assertEquals("Event map attempt id not correct",
+ mapAttempt2.getID(), events[2].getAttemptId());
+ Assert.assertEquals("Event reduce attempt id not correct",
+ reduceAttempt.getID(), events[3].getAttemptId());
+ Assert.assertEquals("Event status not correct for map attempt1",
+ TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+ Assert.assertEquals("Event status not correct for map attempt1",
+ TaskAttemptCompletionEventStatus.FAILED, events[1].getStatus());
+ Assert.assertEquals("Event status not correct for map attempt2",
+ TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
+ Assert.assertEquals("Event status not correct for reduce attempt1",
+ TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
+
+ TaskAttemptCompletionEvent mapEvents[] =
+ job.getMapAttemptCompletionEvents(0, 2);
+ Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
+ Assert.assertArrayEquals("Unexpected map events",
+ Arrays.copyOfRange(events, 0, 2), mapEvents);
+ mapEvents = job.getMapAttemptCompletionEvents(2, 200);
+ Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
+ Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
+ }
+
+
+ private void updateStatus(MRApp app, TaskAttempt attempt, Phase phase) {
+ TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
+ status.counters = new Counters();
+ status.fetchFailedMaps = new ArrayList<TaskAttemptId>();
+ status.id = attempt.getID();
+ status.mapFinishTime = 0;
+ status.outputSize = 0;
+ status.phase = phase;
+ status.progress = 0.5f;
+ status.shuffleFinishTime = 0;
+ status.sortFinishTime = 0;
+ status.stateString = "OK";
+ status.taskState = attempt.getState();
+ TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(),
+ status);
+ app.getContext().getEventHandler().handle(event);
+ }
private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
TaskAttempt mapAttempt) {
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Thu Nov 8 19:09:46 2012
@@ -21,17 +21,15 @@ package org.apache.hadoop.mapreduce.v2.a
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.times;
import java.io.IOException;
import junit.framework.Assert;
import junit.framework.TestCase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -49,7 +47,6 @@ import org.apache.hadoop.yarn.YarnExcept
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -68,7 +65,6 @@ import org.junit.Test;
private Path stagingJobPath = new Path(stagingJobDir);
private final static RecordFactory recordFactory = RecordFactoryProvider.
getRecordFactory(null);
- private static final Log LOG = LogFactory.getLog(TestStagingCleanup.class);
@Test
public void testDeletionofStaging() throws IOException {
@@ -86,9 +82,7 @@ import org.junit.Test;
jobid.setAppId(appId);
MRAppMaster appMaster = new TestMRApp(attemptId);
appMaster.init(conf);
- EventHandler<JobFinishEvent> handler =
- appMaster.createJobFinishEventHandler();
- handler.handle(new JobFinishEvent(jobid));
+ appMaster.shutDownJob();
verify(fs).delete(stagingJobPath, true);
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Thu Nov 8 19:09:46 2012
@@ -546,6 +546,105 @@ public class TestTaskAttempt{
eventHandler.internalError);
}
+ @Test
+ public void testAppDiognosticEventOnUnassignedTask() throws Exception {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+ ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+ appId, 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+ jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+ mock(Token.class), new Credentials(), new SystemClock(), appCtx);
+
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+ ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
+ "Task got killed"));
+ assertFalse(
+ "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task",
+ eventHandler.internalError);
+ }
+
+ @Test
+ public void testAppDiognosticEventOnNewTask() throws Exception {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+ ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+ appId, 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+ jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+ mock(Token.class), new Credentials(), new SystemClock(), appCtx);
+
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+ ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+ taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
+ "Task got killed"));
+ assertFalse(
+ "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task",
+ eventHandler.internalError);
+ }
+
+
public static class MockEventHandler implements EventHandler {
public boolean internalError;
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Thu Nov 8 19:09:46 2012
@@ -418,6 +418,21 @@ public class TestTaskImpl {
killRunningTaskAttempt(getLastAttempt().getAttemptId());
}
+ @Test // Verifies that a T_KILL delivered after the task succeeded leaves it in the succeeded state.
+ public void testKillSuccessfulTask() {
+ LOG.info("--- START: testKillSuccessfulTask ---"); // log marker matches the method name
+ mockTask = createMockTask(TaskType.MAP);
+ TaskId taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ commitTaskAttempt(getLastAttempt().getAttemptId());
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED)); // the scheduled attempt reports success
+ assertTaskSucceededState();
+ mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL)); // kill request arrives after success
+ assertTaskSucceededState(); // state must be unchanged by the late kill
+ }
+
@Test
public void testTaskProgress() {
LOG.info("--- START: testTaskProgress ---");
@@ -485,7 +500,8 @@ public class TestTaskImpl {
assertTaskSucceededState();
}
- private void runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType failEvent) {
+ private void runSpeculativeTaskAttemptSucceeds(
+ TaskEventType firstAttemptFinishEvent) {
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
launchTaskAttempt(getLastAttempt().getAttemptId());
@@ -502,9 +518,9 @@ public class TestTaskImpl {
// The task should now have succeeded
assertTaskSucceededState();
- // Now fail the first task attempt, after the second has succeeded
+ // Now complete the first task attempt, after the second has succeeded
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(),
- failEvent));
+ firstAttemptFinishEvent));
// The task should still be in the succeeded state
assertTaskSucceededState();
@@ -513,25 +529,36 @@ public class TestTaskImpl {
@Test
public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
mockTask = createMockTask(TaskType.MAP);
- runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_FAILED);
+ runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
}
@Test
public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
mockTask = createMockTask(TaskType.REDUCE);
- runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_FAILED);
+ runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
}
@Test
public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
mockTask = createMockTask(TaskType.MAP);
- runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_KILLED);
+ runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
}
@Test
public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
mockTask = createMockTask(TaskType.REDUCE);
- runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_KILLED);
+ runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
}
+ @Test // MAP task: the first attempt also reports T_ATTEMPT_SUCCEEDED after the second attempt already succeeded.
+ public void testMultipleTaskAttemptsSucceed() {
+ mockTask = createMockTask(TaskType.MAP);
+ runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED); // helper asserts the task stays in the succeeded state
+ }
+
+ @Test // REDUCE task: the first attempt sends T_ATTEMPT_COMMIT_PENDING after the second attempt already succeeded.
+ public void testCommitAfterSucceeds() {
+ mockTask = createMockTask(TaskType.REDUCE);
+ runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING); // helper asserts the task stays in the succeeded state
+ }
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java Thu Nov 8 19:09:46 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.WritableUtil
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.util.StringInterner;
/**
* Status information on the current state of the Map-Reduce cluster.
@@ -141,9 +142,9 @@ public class ClusterStatus implements Wr
@Override
public void readFields(DataInput in) throws IOException {
- trackerName = Text.readString(in);
- reasonForBlackListing = Text.readString(in);
- blackListReport = Text.readString(in);
+ trackerName = StringInterner.weakIntern(Text.readString(in));
+ reasonForBlackListing = StringInterner.weakIntern(Text.readString(in));
+ blackListReport = StringInterner.weakIntern(Text.readString(in));
}
@Override
@@ -429,7 +430,7 @@ public class ClusterStatus implements Wr
int numTrackerNames = in.readInt();
if (numTrackerNames > 0) {
for (int i = 0; i < numTrackerNames; i++) {
- String name = Text.readString(in);
+ String name = StringInterner.weakIntern(Text.readString(in));
activeTrackers.add(name);
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobProfile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobProfile.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobProfile.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobProfile.java Thu Nov 8 19:09:46 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.util.StringInterner;
/**************************************************
* A JobProfile is a MapReduce primitive. Tracks a job,
@@ -176,11 +177,11 @@ public class JobProfile implements Writa
public void readFields(DataInput in) throws IOException {
jobid.readFields(in);
- this.jobFile = Text.readString(in);
- this.url = Text.readString(in);
- this.user = Text.readString(in);
- this.name = Text.readString(in);
- this.queueName = Text.readString(in);
+ this.jobFile = StringInterner.weakIntern(Text.readString(in));
+ this.url = StringInterner.weakIntern(Text.readString(in));
+ this.user = StringInterner.weakIntern(Text.readString(in));
+ this.name = StringInterner.weakIntern(Text.readString(in));
+ this.queueName = StringInterner.weakIntern(Text.readString(in));
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Thu Nov 8 19:09:46 2012
@@ -67,6 +67,7 @@ import org.apache.hadoop.util.IndexedSor
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.QuickSort;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils;
/** A Map task. */
@@ -343,7 +344,7 @@ class MapTask extends Task {
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream inFile = fs.open(file);
inFile.seek(offset);
- String className = Text.readString(inFile);
+ String className = StringInterner.weakIntern(Text.readString(inFile));
Class<T> cls;
try {
cls = (Class<T>) conf.getClassByName(className);
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Thu Nov 8 19:09:46 2012
@@ -67,6 +67,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils;
/**
@@ -467,7 +468,7 @@ abstract public class Task implements Wr
}
public void readFields(DataInput in) throws IOException {
- jobFile = Text.readString(in);
+ jobFile = StringInterner.weakIntern(Text.readString(in));
taskId = TaskAttemptID.read(in);
partition = in.readInt();
numSlotsRequired = in.readInt();
@@ -487,7 +488,7 @@ abstract public class Task implements Wr
if (taskCleanup) {
setPhase(TaskStatus.Phase.CLEANUP);
}
- user = Text.readString(in);
+ user = StringInterner.weakIntern(Text.readString(in));
extraData.readFields(in);
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java Thu Nov 8 19:09:46 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils;
/**************************************************
* Describes the current status of a task. This is
@@ -477,8 +478,8 @@ public abstract class TaskStatus impleme
setProgress(in.readFloat());
this.numSlots = in.readInt();
this.runState = WritableUtils.readEnum(in, State.class);
- setDiagnosticInfo(Text.readString(in));
- setStateString(Text.readString(in));
+ setDiagnosticInfo(StringInterner.weakIntern(Text.readString(in)));
+ setStateString(StringInterner.weakIntern(Text.readString(in)));
this.phase = WritableUtils.readEnum(in, Phase.class);
this.startTime = in.readLong();
this.finishTime = in.readLong();
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java Thu Nov 8 19:09:46 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.InputFor
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringInterner;
/**
* An {@link InputSplit} that tags another InputSplit with extra data for use
@@ -114,7 +115,7 @@ class TaggedInputSplit implements Config
}
private Class<?> readClass(DataInput in) throws IOException {
- String className = Text.readString(in);
+ String className = StringInterner.weakIntern(Text.readString(in));
try {
return conf.getClassByName(className);
} catch (ClassNotFoundException e) {
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java Thu Nov 8 19:09:46 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.WritableFact
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.util.StringInterner;
/**************************************************
* Describes the current status of a job.
@@ -456,15 +457,15 @@ public class JobStatus implements Writab
this.cleanupProgress = in.readFloat();
this.runState = WritableUtils.readEnum(in, State.class);
this.startTime = in.readLong();
- this.user = Text.readString(in);
+ this.user = StringInterner.weakIntern(Text.readString(in));
this.priority = WritableUtils.readEnum(in, JobPriority.class);
- this.schedulingInfo = Text.readString(in);
+ this.schedulingInfo = StringInterner.weakIntern(Text.readString(in));
this.finishTime = in.readLong();
this.isRetired = in.readBoolean();
- this.historyFile = Text.readString(in);
- this.jobName = Text.readString(in);
- this.trackingUrl = Text.readString(in);
- this.jobFile = Text.readString(in);
+ this.historyFile = StringInterner.weakIntern(Text.readString(in));
+ this.jobName = StringInterner.weakIntern(Text.readString(in));
+ this.trackingUrl = StringInterner.weakIntern(Text.readString(in));
+ this.jobFile = StringInterner.weakIntern(Text.readString(in));
this.isUber = in.readBoolean();
// De-serialize the job's ACLs
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Nov 8 19:09:46 2012
@@ -262,6 +262,9 @@ public interface MRJobConfig {
public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures";
public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
+
+ public static final String MAX_SHUFFLE_FETCH_RETRY_DELAY = "mapreduce.reduce.shuffle.retry-delay.max.ms";
+ public static final long DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY = 60000;
public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
@@ -486,6 +489,9 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
50;
+ public static final String MR_AM_ENV =
+ MR_AM_PREFIX + "env";
+
public static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
"mapreduce.admin.map.child.java.opts";
|