hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cnaur...@apache.org
Subject svn commit: r1556097 [1/2] - in /hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai...
Date Tue, 07 Jan 2014 01:07:40 GMT
Author: cnauroth
Date: Tue Jan  7 01:07:33 2014
New Revision: 1556097

URL: http://svn.apache.org/r1556097
Log:
Merge trunk to HDFS-4685.

Added:
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java
      - copied unchanged from r1556096, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
      - copied unchanged from r1556096, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java
      - copied unchanged from r1556096, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java
      - copied unchanged from r1556096, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java
      - copied unchanged from r1556096, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java
Modified:
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java
    hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java

Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1553225-1556096

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt Tue Jan  7 01:07:33 2014
@@ -77,6 +77,9 @@ Trunk (Unreleased)
     MAPREDUCE-5189. Add policies and wiring to respond to preemption requests
     from YARN. (Carlo Curino via cdouglas)
 
+    MAPREDUCE-5196. Add bookkeeping for managing checkpoints of task state.
+    (Carlo Curino via cdouglas)
+
   BUG FIXES
 
     MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.
@@ -193,6 +196,8 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI
     with Hadoop 2.0 (Gera Shegalov via Sandy Ryza)
 
+    MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
@@ -258,6 +263,15 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5687. Fixed failure in TestYARNRunner caused by YARN-1446. (Jian He
     via vinodkv)
 
+    MAPREDUCE-5694. Fixed MR AppMaster to shutdown the LogManager so as to avoid
+    losing syslog in some conditions. (Mohammad Kamrul Islam via vinodkv)
+
+    MAPREDUCE-5685. Fixed a bug with JobContext getCacheFiles API inside the
+    WrappedReducer class. (Yi Song via vinodkv)
+
+    MAPREDUCE-5689. MRAppMaster does not preempt reducers when scheduled maps 
+    cannot be fulfilled. (lohit via kasha)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1553225-1556096

Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1553225-1556096

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Tue Jan  7 01:07:33 2014
@@ -36,7 +36,9 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -45,8 +47,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
@@ -229,6 +231,22 @@ public class TaskAttemptListenerImpl ext
   }
 
   @Override
+  public void preempted(TaskAttemptID taskAttemptID, TaskStatus taskStatus)
+          throws IOException, InterruptedException {
+    LOG.info("Preempted state update from " + taskAttemptID.toString());
+    // An attempt is telling us that it got preempted.
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+        TypeConverter.toYarn(taskAttemptID);
+
+    preemptionPolicy.reportSuccessfulPreemption(attemptID);
+    taskHeartbeatHandler.progressing(attemptID);
+
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(attemptID,
+            TaskAttemptEventType.TA_PREEMPTED));
+  }
+
+  @Override
   public void done(TaskAttemptID taskAttemptID) throws IOException {
     LOG.info("Done acknowledgement from " + taskAttemptID.toString());
 
@@ -250,6 +268,10 @@ public class TaskAttemptListenerImpl ext
 
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
         TypeConverter.toYarn(taskAttemptID);
+
+    // handling checkpoints
+    preemptionPolicy.handleFailedContainer(attemptID);
+
     context.getEventHandler().handle(
         new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
   }
@@ -264,6 +286,10 @@ public class TaskAttemptListenerImpl ext
 
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
         TypeConverter.toYarn(taskAttemptID);
+
+    // handling checkpoints
+    preemptionPolicy.handleFailedContainer(attemptID);
+
     context.getEventHandler().handle(
         new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
   }
@@ -294,12 +320,6 @@ public class TaskAttemptListenerImpl ext
   }
 
   @Override
-  public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
-    LOG.info("Ping from " + taskAttemptID.toString());
-    return true;
-  }
-
-  @Override
   public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo)
  throws IOException {
     diagnosticInfo = StringInterner.weakIntern(diagnosticInfo);
@@ -321,11 +341,33 @@ public class TaskAttemptListenerImpl ext
   }
 
   @Override
-  public boolean statusUpdate(TaskAttemptID taskAttemptID,
+  public AMFeedback statusUpdate(TaskAttemptID taskAttemptID,
       TaskStatus taskStatus) throws IOException, InterruptedException {
-    LOG.info("Status update from " + taskAttemptID.toString());
+
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
         TypeConverter.toYarn(taskAttemptID);
+
+    AMFeedback feedback = new AMFeedback();
+    feedback.setTaskFound(true);
+
+    // Propagating preemption to the task if TASK_PREEMPTION is enabled
+    if (getConfig().getBoolean(MRJobConfig.TASK_PREEMPTION, false)
+        && preemptionPolicy.isPreempted(yarnAttemptID)) {
+      feedback.setPreemption(true);
+      LOG.info("Setting preemption bit for task: "+ yarnAttemptID
+          + " of type " + yarnAttemptID.getTaskId().getTaskType());
+    }
+
+    if (taskStatus == null) {
+      //We are using statusUpdate only as a simple ping
+      LOG.info("Ping from " + taskAttemptID.toString());
+      taskHeartbeatHandler.progressing(yarnAttemptID);
+      return feedback;
+    }
+
+    // if we are here there is an actual status update to be processed
+    LOG.info("Status update from " + taskAttemptID.toString());
+
     taskHeartbeatHandler.progressing(yarnAttemptID);
     TaskAttemptStatus taskAttemptStatus =
         new TaskAttemptStatus();
@@ -386,7 +428,7 @@ public class TaskAttemptListenerImpl ext
     context.getEventHandler().handle(
         new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
             taskAttemptStatus));
-    return true;
+    return feedback;
   }
 
   @Override
@@ -494,4 +536,18 @@ public class TaskAttemptListenerImpl ext
     return ProtocolSignature.getProtocolSignature(this, 
         protocol, clientVersion, clientMethodsHash);
   }
+
+  // task checkpoint bookkeeping
+  @Override
+  public TaskCheckpointID getCheckpointID(TaskID taskId) {
+    TaskId tid = TypeConverter.toYarn(taskId);
+    return preemptionPolicy.getCheckpointID(tid);
+  }
+
+  @Override
+  public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+    TaskId tid = TypeConverter.toYarn(taskId);
+    preemptionPolicy.setCheckpointID(tid, cid);
+  }
+
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Jan  7 01:07:33 2014
@@ -139,6 +139,7 @@ import org.apache.hadoop.yarn.security.c
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.log4j.LogManager;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -1395,6 +1396,8 @@ public class MRAppMaster extends Composi
     } catch (Throwable t) {
       LOG.fatal("Error starting MRAppMaster", t);
       System.exit(1);
+    } finally {
+      LogManager.shutdown();
     }
   }
 

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java Tue Jan  7 01:07:33 2014
@@ -47,6 +47,7 @@ public enum TaskAttemptEventType {
   TA_FAILMSG,
   TA_UPDATE,
   TA_TIMED_OUT,
+  TA_PREEMPTED,
 
   //Producer:TaskCleaner
   TA_CLEANUP_DONE,

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Jan  7 01:07:33 2014
@@ -304,6 +304,9 @@ public abstract class TaskAttemptImpl im
      .addTransition(TaskAttemptStateInternal.RUNNING,
          TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
          CLEANUP_CONTAINER_TRANSITION)
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.KILLED,
+         TaskAttemptEventType.TA_PREEMPTED, new PreemptedTransition())
 
      // Transitions from COMMIT_PENDING state
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
@@ -437,6 +440,7 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
+             TaskAttemptEventType.TA_PREEMPTED,
              // Container launch events can arrive late
              TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@@ -1874,6 +1878,27 @@ public abstract class TaskAttemptImpl im
     }
   }
 
+  private static class PreemptedTransition implements
+      SingleArcTransition<TaskAttemptImpl,TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      taskAttempt.setFinishTime();
+      taskAttempt.taskAttemptListener.unregister(
+          taskAttempt.attemptId, taskAttempt.jvmID);
+      taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
+          taskAttempt.attemptId,
+          taskAttempt.getAssignedContainerID(), taskAttempt.getAssignedContainerMgrAddress(),
+          taskAttempt.container.getContainerToken(),
+          ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+          taskAttempt.attemptId,
+          TaskEventType.T_ATTEMPT_KILLED));
+
+    }
+  }
+
   private static class CleanupContainerTransition implements
        SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings("unchecked")

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Jan  7 01:07:33 2014
@@ -229,7 +229,8 @@ public class RMContainerAllocator extend
 
     int completedMaps = getJob().getCompletedMaps();
     int completedTasks = completedMaps + getJob().getCompletedReduces();
-    if (lastCompletedTasks != completedTasks) {
+    if ((lastCompletedTasks != completedTasks) ||
+          (scheduledRequests.maps.size() > 0)) {
       lastCompletedTasks = completedTasks;
       recalculateReduceSchedule = true;
     }
@@ -347,7 +348,7 @@ public class RMContainerAllocator extend
       }
       
     } else if (
-        event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
+      event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
   
       LOG.info("Processing the event " + event.toString());
 

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java Tue Jan  7 01:07:33 2014
@@ -19,10 +19,9 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.util.List;
 
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -81,7 +80,7 @@ public interface AMPreemptionPolicy {
    * successfully preempted (for bookeeping, counters, etc..)
    * @param attemptID Task attempt that preempted
    */
-  public void reportSuccessfulPreemption(TaskAttemptID attemptID);
+  public void reportSuccessfulPreemption(TaskAttemptId attemptID);
 
   /**
    * Callback informing the policy of containers exiting with a failure. This
@@ -98,20 +97,20 @@ public interface AMPreemptionPolicy {
   public void handleCompletedContainer(TaskAttemptId attemptID);
 
   /**
-   * Method to retrieve the latest checkpoint for a given {@link TaskID}
+   * Method to retrieve the latest checkpoint for a given {@link TaskId}
    * @param taskId TaskID
    * @return CheckpointID associated with this task or null
    */
-  public TaskCheckpointID getCheckpointID(TaskID taskId);
+  public TaskCheckpointID getCheckpointID(TaskId taskId);
 
   /**
    * Method to store the latest {@link
    * org.apache.hadoop.mapreduce.checkpoint.CheckpointID} for a given {@link
-   * TaskID}. Assigning a null is akin to remove all previous checkpoints for
+   * TaskId}. Assigning a null is akin to remove all previous checkpoints for
    * this task.
    * @param taskId TaskID
    * @param cid Checkpoint to assign or <tt>null</tt> to remove it.
    */
-  public void setCheckpointID(TaskID taskId, TaskCheckpointID cid);
+  public void setCheckpointID(TaskId taskId, TaskCheckpointID cid);
 
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java Tue Jan  7 01:07:33 2014
@@ -19,11 +19,10 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -89,17 +88,17 @@ public class KillAMPreemptionPolicy impl
   }
 
   @Override
-  public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) {
+  public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
     // ignore
   }
 
   @Override
-  public TaskCheckpointID getCheckpointID(TaskID taskId) {
+  public TaskCheckpointID getCheckpointID(TaskId taskId) {
     return null;
   }
 
   @Override
-  public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+  public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
     // ignore
   }
 

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java Tue Jan  7 01:07:33 2014
@@ -17,10 +17,9 @@
 */
 package org.apache.hadoop.mapreduce.v2.app.rm.preemption;
 
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 
@@ -50,17 +49,17 @@ public class NoopAMPreemptionPolicy impl
   }
 
   @Override
-  public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) {
+  public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
     // ignore
   }
 
   @Override
-  public TaskCheckpointID getCheckpointID(TaskID taskId) {
+  public TaskCheckpointID getCheckpointID(TaskId taskId) {
     return null;
   }
 
   @Override
-  public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+  public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
     // ignore
   }
 

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Tue Jan  7 01:07:33 2014
@@ -17,26 +17,23 @@
 */
 package org.apache.hadoop.mapred;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
-
-import junit.framework.Assert;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@@ -46,21 +43,31 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.SystemClock;
+
 import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 public class TestTaskAttemptListenerImpl {
-  public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl {
+  public static class MockTaskAttemptListenerImpl
+      extends TaskAttemptListenerImpl {
 
     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,
         RMHeartbeatHandler rmHeartbeatHandler,
-        TaskHeartbeatHandler hbHandler) {
-      super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
+        TaskHeartbeatHandler hbHandler,
+        AMPreemptionPolicy policy) {
+
+      super(context, jobTokenSecretManager, rmHeartbeatHandler, policy);
       this.taskHeartbeatHandler = hbHandler;
     }
     
@@ -87,9 +94,16 @@ public class TestTaskAttemptListenerImpl
     RMHeartbeatHandler rmHeartbeatHandler =
         mock(RMHeartbeatHandler.class);
     TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
     MockTaskAttemptListenerImpl listener = 
       new MockTaskAttemptListenerImpl(appCtx, secret,
-          rmHeartbeatHandler, hbHandler);
+          rmHeartbeatHandler, hbHandler, policy);
     Configuration conf = new Configuration();
     listener.init(conf);
     listener.start();
@@ -144,7 +158,7 @@ public class TestTaskAttemptListenerImpl
     assertNotNull(jvmid);
     try {
       JVMId.forName("jvm_001_002_m_004_006");
-      Assert.fail();
+      fail();
     } catch (IllegalArgumentException e) {
       assertEquals(e.getMessage(),
           "TaskId string : jvm_001_002_m_004_006 is not properly formed");
@@ -190,8 +204,14 @@ public class TestTaskAttemptListenerImpl
     RMHeartbeatHandler rmHeartbeatHandler =
         mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    TaskAttemptListenerImpl listener =
-        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+        appCtx, secret, rmHeartbeatHandler, policy) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;
@@ -219,7 +239,8 @@ public class TestTaskAttemptListenerImpl
         isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
             : org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
-    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+    RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
     TaskAttemptCompletionEvent tce = recordFactory
         .newRecordInstance(TaskAttemptCompletionEvent.class);
     tce.setEventId(eventId);
@@ -244,8 +265,14 @@ public class TestTaskAttemptListenerImpl
     RMHeartbeatHandler rmHeartbeatHandler =
         mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    TaskAttemptListenerImpl listener =
-        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+        appCtx, secret, rmHeartbeatHandler, policy) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;
@@ -270,4 +297,88 @@ public class TestTaskAttemptListenerImpl
 
     listener.stop();
   }
+
+  @Test
+  public void testCheckpointIDTracking()
+    throws IOException, InterruptedException{
+
+    SystemClock clock = new SystemClock();
+
+    org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
+        mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
+    when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
+
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+
+    RMHeartbeatHandler rmHeartbeatHandler =
+        mock(RMHeartbeatHandler.class);
+
+    AppContext appCtx = mock(AppContext.class);
+    when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
+    when(appCtx.getClock()).thenReturn(clock);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+    final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+        appCtx, secret, rmHeartbeatHandler, policy) {
+      @Override
+      protected void registerHeartbeatHandler(Configuration conf) {
+        taskHeartbeatHandler = hbHandler;
+      }
+    };
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.TASK_PREEMPTION, true);
+    //conf.setBoolean("preemption.reduce", true);
+
+    listener.init(conf);
+    listener.start();
+
+    TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
+
+    List<Path> partialOut = new ArrayList<Path>();
+    partialOut.add(new Path("/prev1"));
+    partialOut.add(new Path("/prev2"));
+
+    Counters counters = mock(Counters.class);
+    final long CBYTES = 64L * 1024 * 1024;
+    final long CTIME = 4344L;
+    final Path CLOC = new Path("/test/1");
+    Counter cbytes = mock(Counter.class);
+    when(cbytes.getValue()).thenReturn(CBYTES);
+    Counter ctime = mock(Counter.class);
+    when(ctime.getValue()).thenReturn(CTIME);
+    when(counters.findCounter(eq(EnumCounter.CHECKPOINT_BYTES)))
+        .thenReturn(cbytes);
+    when(counters.findCounter(eq(EnumCounter.CHECKPOINT_MS)))
+        .thenReturn(ctime);
+
+    // propagating a taskstatus that contains a checkpoint id
+    TaskCheckpointID incid = new TaskCheckpointID(new FSCheckpointID(
+          CLOC), partialOut, counters);
+    listener.setCheckpointID(
+        org.apache.hadoop.mapred.TaskID.downgrade(tid.getTaskID()), incid);
+
+    // and try to get it back
+    CheckpointID outcid = listener.getCheckpointID(tid.getTaskID());
+    TaskCheckpointID tcid = (TaskCheckpointID) outcid;
+    assertEquals(CBYTES, tcid.getCheckpointBytes());
+    assertEquals(CTIME, tcid.getCheckpointTime());
+    assertTrue(partialOut.containsAll(tcid.getPartialCommittedOutput()));
+    assertTrue(tcid.getPartialCommittedOutput().containsAll(partialOut));
+
+    //assert it worked
+    assert outcid == incid;
+
+    listener.stop();
+
+  }
+
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Jan  7 01:07:33 2014
@@ -1604,6 +1604,21 @@ public class TestRMContainerAllocator {
         numPendingReduces, 
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator).rampDownReduces(anyInt());
+
+    // Test reduce ramp-down for when there are scheduled maps.
+    // Since we have two scheduled Maps, rampDownReduces
+    // should be invoked twice.
+    scheduledMaps = 2;
+    assignedReduces = 2;
+    doReturn(10 * 1024).when(allocator).getMemLimit();
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, times(2)).rampDownReduces(anyInt());
   }
 
   private static class RecalculateContainerAllocator extends MyContainerAllocator {

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Jan  7 01:07:33 2014
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.Queue
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -575,10 +576,17 @@ public class LocalJobRunner implements C
 
     // TaskUmbilicalProtocol methods
 
+    @Override
     public JvmTask getTask(JvmContext context) { return null; }
     
-    public synchronized boolean statusUpdate(TaskAttemptID taskId,
+    @Override
+    public synchronized AMFeedback statusUpdate(TaskAttemptID taskId,
         TaskStatus taskStatus) throws IOException, InterruptedException {
+      AMFeedback feedback = new AMFeedback();
+      feedback.setTaskFound(true);
+      if (null == taskStatus) {
+        return feedback;
+      }
       // Serialize as we would if distributed in order to make deep copy
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       DataOutputStream dos = new DataOutputStream(baos);
@@ -618,7 +626,7 @@ public class LocalJobRunner implements C
       }
 
       // ignore phase
-      return true;
+      return feedback;
     }
 
     /** Return the current values of the counters for this job,
@@ -654,24 +662,24 @@ public class LocalJobRunner implements C
       statusUpdate(taskid, taskStatus);
     }
 
+    @Override
     public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
       // Ignore for now
     }
     
+    @Override
     public void reportNextRecordRange(TaskAttemptID taskid, 
         SortedRanges.Range range) throws IOException {
       LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
     }
 
-    public boolean ping(TaskAttemptID taskid) throws IOException {
-      return true;
-    }
-    
+    @Override
     public boolean canCommit(TaskAttemptID taskid) 
     throws IOException {
       return true;
     }
     
+    @Override
     public void done(TaskAttemptID taskId) throws IOException {
       int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
@@ -681,11 +689,13 @@ public class LocalJobRunner implements C
       }
     }
 
+    @Override
     public synchronized void fsError(TaskAttemptID taskId, String message) 
     throws IOException {
       LOG.fatal("FSError: "+ message + "from task: " + taskId);
     }
 
+    @Override
     public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
       LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
     }
@@ -695,12 +705,30 @@ public class LocalJobRunner implements C
       LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
     }
     
+    @Override
     public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
         int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
       return new MapTaskCompletionEventsUpdate(
         org.apache.hadoop.mapred.TaskCompletionEvent.EMPTY_ARRAY, false);
     }
-    
+
+    @Override
+    public void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      // ignore
+    }
+
+    @Override
+    public TaskCheckpointID getCheckpointID(TaskID taskId) {
+      // ignore
+      return null;
+    }
+
+    @Override
+    public void setCheckpointID(TaskID downgrade, TaskCheckpointID cid) {
+      // ignore
+    }
+
   }
 
   public LocalJobRunner(Configuration conf) throws IOException {

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Tue Jan  7 01:07:33 2014
@@ -44,6 +44,8 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -82,12 +84,11 @@ public class TestMRWithDistributedCache 
 
   private static final Log LOG =
     LogFactory.getLog(TestMRWithDistributedCache.class);
+  
+  private static class DistributedCacheChecker {
 
-  public static class DistributedCacheChecker extends
-      Mapper<LongWritable, Text, NullWritable, NullWritable> {
-
-    @Override
-    public void setup(Context context) throws IOException {
+    public void setup(TaskInputOutputContext<?, ?, ?, ?> context)
+        throws IOException {
       Configuration conf = context.getConfiguration();
       Path[] localFiles = context.getLocalCacheFiles();
       URI[] files = context.getCacheFiles();
@@ -101,6 +102,10 @@ public class TestMRWithDistributedCache 
       TestCase.assertEquals(2, files.length);
       TestCase.assertEquals(2, archives.length);
 
+      // Check the file names
+      TestCase.assertTrue(files[0].getPath().endsWith("distributed.first"));
+      TestCase.assertTrue(files[1].getPath().endsWith("distributed.second.jar"));
+      
       // Check lengths of the files
       TestCase.assertEquals(1, fs.getFileStatus(localFiles[0]).getLen());
       TestCase.assertTrue(fs.getFileStatus(localFiles[1]).getLen() > 1);
@@ -130,8 +135,28 @@ public class TestMRWithDistributedCache 
       TestCase.assertTrue("second file should be symlinked too",
           expectedAbsentSymlinkFile.exists());
     }
+
   }
-  
+
+  public static class DistributedCacheCheckerMapper extends
+      Mapper<LongWritable, Text, NullWritable, NullWritable> {
+
+    @Override
+    protected void setup(Context context) throws IOException,
+        InterruptedException {
+      new DistributedCacheChecker().setup(context);
+    }
+  }
+
+  public static class DistributedCacheCheckerReducer extends
+      Reducer<LongWritable, Text, NullWritable, NullWritable> {
+
+    @Override
+    public void setup(Context context) throws IOException {
+      new DistributedCacheChecker().setup(context);
+    }
+  }
+
   private void testWithConf(Configuration conf) throws IOException,
       InterruptedException, ClassNotFoundException, URISyntaxException {
     // Create a temporary file of length 1.
@@ -146,7 +171,8 @@ public class TestMRWithDistributedCache 
 
 
     Job job = Job.getInstance(conf);
-    job.setMapperClass(DistributedCacheChecker.class);
+    job.setMapperClass(DistributedCacheCheckerMapper.class);
+    job.setReducerClass(DistributedCacheCheckerReducer.class);
     job.setOutputFormatClass(NullOutputFormat.class);
     FileInputFormat.setInputPaths(job, first);
     // Creates the Job Configuration

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Tue Jan  7 01:07:33 2014
@@ -949,12 +949,29 @@ public class JobConf extends Configurati
     return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
   }
 
+  /**
+   * Get the user defined {@link WritableComparable} comparator for
+   * grouping keys of inputs to the combiner.
+   *
+   * @return comparator set by the user for grouping values.
+   * @see #setCombinerKeyGroupingComparator(Class) for details.
+   */
+  public RawComparator getCombinerKeyGroupingComparator() {
+    Class<? extends RawComparator> theClass = getClass(
+        JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
+    if (theClass == null) {
+      return getOutputKeyComparator();
+    }
+
+    return ReflectionUtils.newInstance(theClass, this);
+  }
+
   /** 
    * Get the user defined {@link WritableComparable} comparator for 
    * grouping keys of inputs to the reduce.
    * 
    * @return comparator set by the user for grouping values.
-   * @see #setOutputValueGroupingComparator(Class) for details.  
+   * @see #setOutputValueGroupingComparator(Class) for details.
    */
   public RawComparator getOutputValueGroupingComparator() {
     Class<? extends RawComparator> theClass = getClass(
@@ -966,6 +983,37 @@ public class JobConf extends Configurati
     return ReflectionUtils.newInstance(theClass, this);
   }
 
+  /**
+   * Set the user defined {@link RawComparator} comparator for
+   * grouping keys in the input to the combiner.
+   * <p/>
+   * <p>This comparator should be provided if the equivalence rules for keys
+   * for sorting the intermediates are different from those for grouping keys
+   * before each call to
+   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
+   * <p/>
+   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
+   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
+   * <p/>
+   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
+   * how keys are sorted, this can be used in conjunction to simulate
+   * <i>secondary sort on values</i>.</p>
+   * <p/>
+   * <p><i>Note</i>: This is not a guarantee of the combiner sort being
+   * <i>stable</i> in any sense. (In any case, with the order of available
+   * map-outputs to the combiner being non-deterministic, it wouldn't make
+   * that much sense.)</p>
+   *
+   * @param theClass the comparator class to be used for grouping keys for the
+   * combiner. It should implement <code>RawComparator</code>.
+   * @see #setOutputKeyComparatorClass(Class)
+   */
+  public void setCombinerKeyGroupingComparator(
+      Class<? extends RawComparator> theClass) {
+    setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
+        theClass, RawComparator.class);
+  }
+
   /** 
    * Set the user defined {@link RawComparator} comparator for 
    * grouping keys in the input to the reduce.
@@ -989,7 +1037,8 @@ public class JobConf extends Configurati
    * 
    * @param theClass the comparator class to be used for grouping keys. 
    *                 It should implement <code>RawComparator</code>.
-   * @see #setOutputKeyComparatorClass(Class)                 
+   * @see #setOutputKeyComparatorClass(Class)
+   * @see #setCombinerKeyGroupingComparator(Class)
    */
   public void setOutputValueGroupingComparator(
       Class<? extends RawComparator> theClass) {

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Tue Jan  7 01:07:33 2014
@@ -187,6 +187,7 @@ abstract public class Task implements Wr
   protected SecretKey tokenSecret;
   protected SecretKey shuffleSecret;
   protected GcTimeUpdater gcUpdater;
+  final AtomicBoolean mustPreempt = new AtomicBoolean(false);
 
   ////////////////////////////////////////////
   // Constructors
@@ -711,6 +712,7 @@ abstract public class Task implements Wr
         }
         try {
           boolean taskFound = true; // whether TT knows about this task
+          AMFeedback amFeedback = null;
           // sleep for a bit
           synchronized(lock) {
             if (taskDone.get()) {
@@ -728,12 +730,14 @@ abstract public class Task implements Wr
             taskStatus.statusUpdate(taskProgress.get(),
                                     taskProgress.toString(), 
                                     counters);
-            taskFound = umbilical.statusUpdate(taskId, taskStatus);
+            amFeedback = umbilical.statusUpdate(taskId, taskStatus);
+            taskFound = amFeedback.getTaskFound();
             taskStatus.clearStatus();
           }
           else {
             // send ping 
-            taskFound = umbilical.ping(taskId);
+            amFeedback = umbilical.statusUpdate(taskId, null);
+            taskFound = amFeedback.getTaskFound();
           }
 
           // if Task Tracker is not aware of our task ID (probably because it died and 
@@ -744,6 +748,17 @@ abstract public class Task implements Wr
             System.exit(66);
           }
 
+          // Set a flag that says we should preempt; this is read by
+          // ReduceTasks at points in the execution where it is
+          // safe/easy to preempt
+          boolean lastPreempt = mustPreempt.get();
+          mustPreempt.set(mustPreempt.get() || amFeedback.getPreemption());
+
+          if (lastPreempt ^ mustPreempt.get()) {
+            LOG.info("PREEMPTION TASK: setting mustPreempt to " +
+                mustPreempt.get() + " given " + amFeedback.getPreemption() +
+                " for "+ taskId + " task status: " +taskStatus.getPhase());
+          }
           sendProgress = resetProgressFlag(); 
           remainingRetries = MAX_RETRIES;
         } 
@@ -992,10 +1007,17 @@ abstract public class Task implements Wr
   public void done(TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, InterruptedException {
-    LOG.info("Task:" + taskId + " is done."
-             + " And is in the process of committing");
     updateCounters();
-
+    if (taskStatus.getRunState() == TaskStatus.State.PREEMPTED ) {
+      // If we are preempted, do no output promotion; signal done and exit
+      committer.commitTask(taskContext);
+      umbilical.preempted(taskId, taskStatus);
+      taskDone.set(true);
+      reporter.stopCommunicationThread();
+      return;
+    }
+    LOG.info("Task:" + taskId + " is done."
+        + " And is in the process of committing");
     boolean commitRequired = isCommitRequired();
     if (commitRequired) {
       int retries = MAX_RETRIES;
@@ -1054,7 +1076,7 @@ abstract public class Task implements Wr
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+        if (!umbilical.statusUpdate(getTaskID(), taskStatus).getTaskFound()) {
           LOG.warn("Parent died.  Exiting "+taskId);
           System.exit(66);
         }
@@ -1098,8 +1120,8 @@ abstract public class Task implements Wr
     if (isMapTask() && conf.getNumReduceTasks() > 0) {
       try {
         Path mapOutput =  mapOutputFile.getOutputFile();
-        FileSystem localFS = FileSystem.getLocal(conf);
-        return localFS.getFileStatus(mapOutput).getLen();
+        FileSystem fs = mapOutput.getFileSystem(conf);
+        return fs.getFileStatus(mapOutput).getLen();
       } catch (IOException e) {
         LOG.warn ("Could not find output size " , e);
       }
@@ -1553,7 +1575,8 @@ abstract public class Task implements Wr
       combinerClass = cls;
       keyClass = (Class<K>) job.getMapOutputKeyClass();
       valueClass = (Class<V>) job.getMapOutputValueClass();
-      comparator = (RawComparator<K>) job.getOutputKeyComparator();
+      comparator = (RawComparator<K>)
+          job.getCombinerKeyGroupingComparator();
     }
 
     @SuppressWarnings("unchecked")
@@ -1602,7 +1625,7 @@ abstract public class Task implements Wr
       this.taskId = taskId;
       keyClass = (Class<K>) context.getMapOutputKeyClass();
       valueClass = (Class<V>) context.getMapOutputValueClass();
-      comparator = (RawComparator<K>) context.getSortComparator();
+      comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
       this.committer = committer;
     }
 

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java Tue Jan  7 01:07:33 2014
@@ -51,7 +51,7 @@ public abstract class TaskStatus impleme
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
-                            COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
+                            COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN, PREEMPTED}
     
   private final TaskAttemptID taskid;
   private float progress;

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Jan  7 01:07:33 2014
@@ -24,6 +24,9 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSelector;
 import org.apache.hadoop.security.token.TokenInfo;
 
@@ -64,9 +67,10 @@ public interface TaskUmbilicalProtocol e
    * Version 17 Modified TaskID to be aware of the new TaskTypes
    * Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
    * Version 19 Added fatalError for child to communicate fatal errors to TT
+   * Version 20 Added methods to manage checkpoints
    * */
 
-  public static final long versionID = 19L;
+  public static final long versionID = 20L;
   
   /**
    * Called when a child task process starts, to get its task.
@@ -78,7 +82,8 @@ public interface TaskUmbilicalProtocol e
   JvmTask getTask(JvmContext context) throws IOException;
   
   /**
-   * Report child's progress to parent.
+   * Report child's progress to parent. Also signals that the child is still alive
+   * (formerly done via ping). Returns an AMFeedback used to propagate preemption requests.
    * 
    * @param taskId task-id of the child
    * @param taskStatus status of the child
@@ -86,7 +91,7 @@ public interface TaskUmbilicalProtocol e
    * @throws InterruptedException
    * @return True if the task is known
    */
-  boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
+  AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
   throws IOException, InterruptedException;
   
   /** Report error messages back to parent.  Calls should be sparing, since all
@@ -105,11 +110,6 @@ public interface TaskUmbilicalProtocol e
   void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range) 
     throws IOException;
 
-  /** Periodically called by child to check if parent is still alive. 
-   * @return True if the task is known
-   */
-  boolean ping(TaskAttemptID taskid) throws IOException;
-
   /** Report that the task is successfully completed.  Failure is assumed if
    * the task process exits without calling this.
    * @param taskid task's id
@@ -161,4 +161,33 @@ public interface TaskUmbilicalProtocol e
                                                        TaskAttemptID id) 
   throws IOException;
 
+  /**
+   * Report to the AM that the task has been successfully preempted.
+   *
+   * @param taskId task's id
+   * @param taskStatus status of the child
+   * @throws IOException
+   */
+  void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+      throws IOException, InterruptedException;
+
+  /**
+   * Return the latest CheckpointID for the given TaskID. This provides
+   * the task with a way to locate the checkpointed data and restart from
+   * that point in the computation.
+   *
+   * @param taskID task's id
+   * @return the most recent checkpoint (if any) for this task
+   * @throws IOException
+   */
+  TaskCheckpointID getCheckpointID(TaskID taskID);
+
+  /**
+   * Send a CheckpointID for a given TaskID to be stored in the AM,
+   * to later restart a task from this checkpoint.
+   * @param tid
+   * @param cid
+   */
+  void setCheckpointID(TaskID tid, TaskCheckpointID cid);
+
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Tue Jan  7 01:07:33 2014
@@ -949,10 +949,26 @@ public class Job extends JobContextImpl 
   }
 
   /**
+   * Define the comparator that controls which keys are grouped together
+   * for a single call to combiner,
+   * {@link Reducer#reduce(Object, Iterable,
+   * org.apache.hadoop.mapreduce.Reducer.Context)}
+   *
+   * @param cls the raw comparator to use
+   * @throws IllegalStateException if the job is submitted
+   */
+  public void setCombinerKeyGroupingComparatorClass(
+      Class<? extends RawComparator> cls) throws IllegalStateException {
+    ensureState(JobState.DEFINE);
+    conf.setCombinerKeyGroupingComparator(cls);
+  }
+
+  /**
    * Define the comparator that controls how the keys are sorted before they
    * are passed to the {@link Reducer}.
    * @param cls the raw comparator
    * @throws IllegalStateException if the job is submitted
+   * @see #setCombinerKeyGroupingComparatorClass(Class)
    */
   public void setSortComparatorClass(Class<? extends RawComparator> cls
                                      ) throws IllegalStateException {
@@ -967,6 +983,7 @@ public class Job extends JobContextImpl 
    *                       org.apache.hadoop.mapreduce.Reducer.Context)}
    * @param cls the raw comparator to use
    * @throws IllegalStateException if the job is submitted
+   * @see #setCombinerKeyGroupingComparatorClass(Class)
    */
   public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                          ) throws IllegalStateException {

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java Tue Jan  7 01:07:33 2014
@@ -167,13 +167,23 @@ public interface JobContext extends MRJo
    */
   public String getJar();
 
-  /** 
-   * Get the user defined {@link RawComparator} comparator for 
-   * grouping keys of inputs to the reduce.
-   * 
+  /**
+   * Get the user defined {@link RawComparator} comparator for
+   * grouping keys of inputs to the combiner.
+   *
    * @return comparator set by the user for grouping values.
-   * @see Job#setGroupingComparatorClass(Class) for details.  
+   * @see Job#setCombinerKeyGroupingComparatorClass(Class)
    */
+  public RawComparator<?> getCombinerKeyGroupingComparator();
+
+    /**
+     * Get the user defined {@link RawComparator} comparator for
+     * grouping keys of inputs to the reduce.
+     *
+     * @return comparator set by the user for grouping values.
+     * @see Job#setGroupingComparatorClass(Class)
+     * @see #getCombinerKeyGroupingComparator()
+     */
   public RawComparator<?> getGroupingComparator();
   
   /**

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Jan  7 01:07:33 2014
@@ -93,6 +93,8 @@ public interface MRJobConfig {
 
   public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
 
+  public static final String COMBINER_GROUP_COMPARATOR_CLASS = "mapreduce.job.combiner.group.comparator.class";
+
   public static final String GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class";
 
   public static final String WORKING_DIR = "mapreduce.job.working.dir";

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java Tue Jan  7 01:07:33 2014
@@ -34,37 +34,31 @@ import org.apache.hadoop.mapred.Counters
  * cost of checkpoints and other counters. This is sent by the task to the AM
  * to be stored and provided to the next execution of the same task.
  */
-public class TaskCheckpointID implements CheckpointID{
+public class TaskCheckpointID implements CheckpointID {
 
-  FSCheckpointID rawId;
-  private List<Path> partialOutput;
-  private Counters counters;
+  final FSCheckpointID rawId;
+  private final List<Path> partialOutput;
+  private final Counters counters;
 
   public TaskCheckpointID() {
-    this.rawId = new FSCheckpointID();
-    this.partialOutput = new ArrayList<Path>();
+    this(new FSCheckpointID(), new ArrayList<Path>(), new Counters());
   }
 
   public TaskCheckpointID(FSCheckpointID rawId, List<Path> partialOutput,
           Counters counters) {
     this.rawId = rawId;
     this.counters = counters;
-    if(partialOutput == null)
-      this.partialOutput = new ArrayList<Path>();
-    else
-      this.partialOutput = partialOutput;
+    this.partialOutput = null == partialOutput
+      ? new ArrayList<Path>()
+      : partialOutput;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     counters.write(out);
-    if (partialOutput == null) {
-      WritableUtils.writeVLong(out, 0L);
-    } else {
-      WritableUtils.writeVLong(out, partialOutput.size());
-      for(Path p:partialOutput){
-        Text.writeString(out, p.toString());
-      }
+    WritableUtils.writeVLong(out, partialOutput.size());
+    for (Path p : partialOutput) {
+      Text.writeString(out, p.toString());
     }
     rawId.write(out);
   }
@@ -74,21 +68,22 @@ public class TaskCheckpointID implements
     partialOutput.clear();
     counters.readFields(in);
     long numPout = WritableUtils.readVLong(in);
-    for(int i=0;i<numPout;i++)
+    for (int i = 0; i < numPout; i++) {
       partialOutput.add(new Path(Text.readString(in)));
+    }
     rawId.readFields(in);
   }
 
   @Override
-  public boolean equals(Object other){
+  public boolean equals(Object other) {
     if (other instanceof TaskCheckpointID){
-      return this.rawId.equals(((TaskCheckpointID)other).rawId) &&
-             this.counters.equals(((TaskCheckpointID) other).counters) &&
-             this.partialOutput.containsAll(((TaskCheckpointID) other).partialOutput) &&
-             ((TaskCheckpointID) other).partialOutput.containsAll(this.partialOutput);
-    } else {
-      return false;
+      TaskCheckpointID o = (TaskCheckpointID) other;
+      return rawId.equals(o.rawId) &&
+             counters.equals(o.counters) &&
+             partialOutput.containsAll(o.partialOutput) &&
+             o.partialOutput.containsAll(partialOutput);
     }
+    return false;
   }
 
   @Override
@@ -110,7 +105,7 @@ public class TaskCheckpointID implements
     return counters.findCounter(EnumCounter.CHECKPOINT_MS).getValue();
   }
 
-  public String toString(){
+  public String toString() {
     return rawId.toString() + " counters:" + counters;
 
   }

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Tue Jan  7 01:07:33 2014
@@ -167,6 +167,11 @@ class ChainMapContextImpl<KEYIN, VALUEIN
   }
 
   @Override
+  public RawComparator<?> getCombinerKeyGroupingComparator() {
+    return base.getCombinerKeyGroupingComparator();
+  }
+
+  @Override
   public RawComparator<?> getGroupingComparator() {
     return base.getGroupingComparator();
   }

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Tue Jan  7 01:07:33 2014
@@ -160,6 +160,11 @@ class ChainReduceContextImpl<KEYIN, VALU
   }
 
   @Override
+  public RawComparator<?> getCombinerKeyGroupingComparator() {
+    return base.getCombinerKeyGroupingComparator();
+  }
+
+  @Override
   public RawComparator<?> getGroupingComparator() {
     return base.getGroupingComparator();
   }

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Tue Jan  7 01:07:33 2014
@@ -169,6 +169,11 @@ public class WrappedMapper<KEYIN, VALUEI
     }
 
     @Override
+    public RawComparator<?> getCombinerKeyGroupingComparator() {
+      return mapContext.getCombinerKeyGroupingComparator();
+    }
+
+    @Override
     public RawComparator<?> getGroupingComparator() {
       return mapContext.getGroupingComparator();
     }

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Tue Jan  7 01:07:33 2014
@@ -137,7 +137,7 @@ public class WrappedReducer<KEYIN, VALUE
 
     @Override
     public URI[] getCacheFiles() throws IOException {
-      return reduceContext.getCacheArchives();
+      return reduceContext.getCacheFiles();
     }
 
     @Override
@@ -162,6 +162,11 @@ public class WrappedReducer<KEYIN, VALUE
     }
 
     @Override
+    public RawComparator<?> getCombinerKeyGroupingComparator() {
+      return reduceContext.getCombinerKeyGroupingComparator();
+    }
+
+    @Override
     public RawComparator<?> getGroupingComparator() {
       return reduceContext.getGroupingComparator();
     }

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Tue Jan  7 01:07:33 2014
@@ -252,6 +252,17 @@ public class JobContextImpl implements J
     return conf.getJar();
   }
 
+  /**
+   * Get the user defined {@link RawComparator} comparator for
+   * grouping keys of inputs to the combiner.
+   *
+   * @return comparator set by the user for grouping values.
+   * @see Job#setCombinerKeyGroupingComparatorClass(Class) for details.
+   */
+  public RawComparator<?> getCombinerKeyGroupingComparator() {
+    return conf.getCombinerKeyGroupingComparator();
+  }
+
   /** 
    * Get the user defined {@link RawComparator} comparator for 
    * grouping keys of inputs to the reduce.

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Tue Jan  7 01:07:33 2014
@@ -582,7 +582,7 @@ public class MergeManagerImpl<K, V> impl
     Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
     Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
     RawComparator<K> comparator = 
-      (RawComparator<K>)job.getOutputKeyComparator();
+      (RawComparator<K>)job.getCombinerKeyGroupingComparator();
     try {
       CombineValuesIterator values = new CombineValuesIterator(
           kvIter, comparator, keyClass, valClass, job, Reporter.NULL,

Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1553225-1556096

Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java?rev=1556097&r1=1556096&r2=1556097&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java Tue Jan  7 01:07:33 2014
@@ -88,6 +88,8 @@ import org.apache.hadoop.yarn.util.Recor
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This module is responsible for talking to the
  * JobClient (user facing).
@@ -142,7 +144,8 @@ public class HistoryClientService extend
     super.serviceStart();
   }
 
-  private void initializeWebApp(Configuration conf) {
+  @VisibleForTesting
+  protected void initializeWebApp(Configuration conf) {
     webApp = new HsWebApp(history);
     InetSocketAddress bindAddress = MRWebAppUtil.getJHSWebBindAddress(conf);
     // NOTE: there should be a .at(InetSocketAddress)



Mime
View raw message