hadoop-mapreduce-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1555021 [1/2] - in /hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai...
Date: Fri, 03 Jan 2014 07:27:11 GMT
Author: szetszwo
Date: Fri Jan  3 07:26:52 2014
New Revision: 1555021

URL: http://svn.apache.org/r1555021
Log:
Merge r1550130 through r1555020 from trunk.

Added:
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/
      - copied from r1555020, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
      - copied unchanged from r1555020, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java
      - copied unchanged from r1555020, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/
      - copied from r1555020, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/
      - copied from r1555020, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/
Modified:
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksBlock.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksPage.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskAttemptInfo.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskInfo.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesAttempts.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesTasks.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskReport.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskReportPBImpl.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestBlocks.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
    hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java

Propchange: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project:r1513717-1550362
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1550130-1555020

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/CHANGES.txt?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/CHANGES.txt Fri Jan  3 07:26:52 2014
@@ -71,6 +71,15 @@ Trunk (Unreleased)
     MAPREDUCE-5014. Extend Distcp to accept a custom CopyListing.
     (Srikanth Sundarrajan via amareshwari)
 
+    MAPREDUCE-5197. Add a service for checkpointing task state.
+    (Carlo Curino via cdouglas)
+
+    MAPREDUCE-5189. Add policies and wiring to respond to preemption requests
+    from YARN. (Carlo Curino via cdouglas)
+
+    MAPREDUCE-5196. Add bookkeeping for managing checkpoints of task state.
+    (Carlo Curino via cdouglas)
+
   BUG FIXES
 
     MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.
@@ -181,6 +190,12 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5052. Job History UI and web services confusing job start time and
     job submit time (Chen He via jeagles)
 
+    MAPREDUCE-5692. Add explicit diagnostics when a task attempt is killed due
+    to speculative execution (Gera Shegalov via Sandy Ryza)
+
+    MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI
+    with Hadoop 2.0 (Gera Shegalov via Sandy Ryza)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
@@ -237,6 +252,21 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5656. bzip2 codec can drop records when reading data in splits
     (jlowe)
 
+    MAPREDUCE-5623. TestJobCleanup fails because of RejectedExecutionException
+    and NPE. (jlowe)
+
+    MAPREDUCE-5679. TestJobHistoryParsing has race condition (Liyin Liang via
+    jlowe)
+
+    MAPREDUCE-5687. Fixed failure in TestYARNRunner caused by YARN-1446. (Jian He
+    via vinodkv)
+
+    MAPREDUCE-5694. Fixed MR AppMaster to shutdown the LogManager so as to avoid
+    losing syslog in some conditions. (Mohammad Kamrul Islam via vinodkv)
+
+    MAPREDUCE-5685. Fixed a bug with JobContext getCacheFiles API inside the
+    WrappedReducer class. (Yi Song via vinodkv)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -290,6 +320,9 @@ Release 2.3.0 - UNRELEASED
     event: TA_TOO_MANY_FETCH_FAILURE at KILLED for TaskAttemptImpl (Gera
     Shegalov via jlowe)
 
+    MAPREDUCE-5674. Missing start and finish time in mapred.JobStatus.
+    (Chuan Liu via cnauroth)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt:r1513717-1550362
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1550130-1555020

Propchange: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/conf:r1513717-1550362
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1550130-1555020

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Fri Jan  3 07:26:52 2014
@@ -36,7 +36,9 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -45,9 +47,10 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -84,14 +87,17 @@ public class TaskAttemptListenerImpl ext
       .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); 
   
   private JobTokenSecretManager jobTokenSecretManager = null;
+  private AMPreemptionPolicy preemptionPolicy;
   
   public TaskAttemptListenerImpl(AppContext context,
       JobTokenSecretManager jobTokenSecretManager,
-      RMHeartbeatHandler rmHeartbeatHandler) {
+      RMHeartbeatHandler rmHeartbeatHandler,
+      AMPreemptionPolicy preemptionPolicy) {
     super(TaskAttemptListenerImpl.class.getName());
     this.context = context;
     this.jobTokenSecretManager = jobTokenSecretManager;
     this.rmHeartbeatHandler = rmHeartbeatHandler;
+    this.preemptionPolicy = preemptionPolicy;
   }
 
   @Override
@@ -225,6 +231,22 @@ public class TaskAttemptListenerImpl ext
   }
 
   @Override
+  public void preempted(TaskAttemptID taskAttemptID, TaskStatus taskStatus)
+          throws IOException, InterruptedException {
+    LOG.info("Preempted state update from " + taskAttemptID.toString());
+    // An attempt is telling us that it got preempted.
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+        TypeConverter.toYarn(taskAttemptID);
+
+    preemptionPolicy.reportSuccessfulPreemption(attemptID);
+    taskHeartbeatHandler.progressing(attemptID);
+
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(attemptID,
+            TaskAttemptEventType.TA_PREEMPTED));
+  }
+
+  @Override
   public void done(TaskAttemptID taskAttemptID) throws IOException {
     LOG.info("Done acknowledgement from " + taskAttemptID.toString());
 
@@ -246,6 +268,10 @@ public class TaskAttemptListenerImpl ext
 
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
         TypeConverter.toYarn(taskAttemptID);
+
+    // handling checkpoints
+    preemptionPolicy.handleFailedContainer(attemptID);
+
     context.getEventHandler().handle(
         new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
   }
@@ -260,6 +286,10 @@ public class TaskAttemptListenerImpl ext
 
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
         TypeConverter.toYarn(taskAttemptID);
+
+    // handling checkpoints
+    preemptionPolicy.handleFailedContainer(attemptID);
+
     context.getEventHandler().handle(
         new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
   }
@@ -290,12 +320,6 @@ public class TaskAttemptListenerImpl ext
   }
 
   @Override
-  public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
-    LOG.info("Ping from " + taskAttemptID.toString());
-    return true;
-  }
-
-  @Override
   public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo)
  throws IOException {
     diagnosticInfo = StringInterner.weakIntern(diagnosticInfo);
@@ -317,11 +341,33 @@ public class TaskAttemptListenerImpl ext
   }
 
   @Override
-  public boolean statusUpdate(TaskAttemptID taskAttemptID,
+  public AMFeedback statusUpdate(TaskAttemptID taskAttemptID,
       TaskStatus taskStatus) throws IOException, InterruptedException {
-    LOG.info("Status update from " + taskAttemptID.toString());
+
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
         TypeConverter.toYarn(taskAttemptID);
+
+    AMFeedback feedback = new AMFeedback();
+    feedback.setTaskFound(true);
+
+    // Propagating preemption to the task if TASK_PREEMPTION is enabled
+    if (getConfig().getBoolean(MRJobConfig.TASK_PREEMPTION, false)
+        && preemptionPolicy.isPreempted(yarnAttemptID)) {
+      feedback.setPreemption(true);
+      LOG.info("Setting preemption bit for task: "+ yarnAttemptID
+          + " of type " + yarnAttemptID.getTaskId().getTaskType());
+    }
+
+    if (taskStatus == null) {
+      //We are using statusUpdate only as a simple ping
+      LOG.info("Ping from " + taskAttemptID.toString());
+      taskHeartbeatHandler.progressing(yarnAttemptID);
+      return feedback;
+    }
+
+    // if we are here there is an actual status update to be processed
+    LOG.info("Status update from " + taskAttemptID.toString());
+
     taskHeartbeatHandler.progressing(yarnAttemptID);
     TaskAttemptStatus taskAttemptStatus =
         new TaskAttemptStatus();
@@ -382,7 +428,7 @@ public class TaskAttemptListenerImpl ext
     context.getEventHandler().handle(
         new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
             taskAttemptStatus));
-    return true;
+    return feedback;
   }
 
   @Override
@@ -490,4 +536,18 @@ public class TaskAttemptListenerImpl ext
     return ProtocolSignature.getProtocolSignature(this, 
         protocol, clientVersion, clientMethodsHash);
   }
+
+  // task checkpoint bookkeeping
+  @Override
+  public TaskCheckpointID getCheckpointID(TaskID taskId) {
+    TaskId tid = TypeConverter.toYarn(taskId);
+    return preemptionPolicy.getCheckpointID(tid);
+  }
+
+  @Override
+  public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+    TaskId tid = TypeConverter.toYarn(taskId);
+    preemptionPolicy.setCheckpointID(tid, cid);
+  }
+
 }
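
[Note: The statusUpdate() change above also subsumes the removed ping() method: a null TaskStatus is now treated as a plain liveness ping, and the returned AMFeedback carries a task-found flag plus an optional preemption bit back to the task. Below is a minimal task-side sketch of the new protocol; the AMFeedback getter names (getTaskFound, getPreemption) are assumed from the setters visible in this diff, not confirmed by the commit itself.]

    import java.io.IOException;
    import org.apache.hadoop.mapred.AMFeedback;
    import org.apache.hadoop.mapred.TaskAttemptID;
    import org.apache.hadoop.mapred.TaskStatus;
    import org.apache.hadoop.mapred.TaskUmbilicalProtocol;

    public class UmbilicalHeartbeatSketch {
      // One heartbeat against the new umbilical API. Passing a null
      // TaskStatus makes the call a simple ping (see statusUpdate above).
      static void heartbeatOnce(TaskUmbilicalProtocol umbilical,
          TaskAttemptID attemptId, TaskStatus statusOrNull)
          throws IOException, InterruptedException {
        AMFeedback feedback = umbilical.statusUpdate(attemptId, statusOrNull);
        if (!feedback.getTaskFound()) {        // assumed getter name
          throw new IOException("AM does not know attempt " + attemptId);
        }
        if (feedback.getPreemption()) {        // assumed getter name
          // A checkpoint-capable task would save its state here and then
          // report back via TaskUmbilicalProtocol#preempted().
          System.out.println("Preemption requested for " + attemptId);
        }
      }
    }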

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Jan  3 07:26:52 2014
@@ -102,6 +102,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
@@ -137,6 +139,7 @@ import org.apache.hadoop.yarn.security.c
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.log4j.LogManager;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -188,8 +191,8 @@ public class MRAppMaster extends Composi
   private ContainerLauncher containerLauncher;
   private EventHandler<CommitterEvent> committerEventHandler;
   private Speculator speculator;
-  private TaskAttemptListener taskAttemptListener;
-  private JobTokenSecretManager jobTokenSecretManager =
+  protected TaskAttemptListener taskAttemptListener;
+  protected JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
   private JobId jobId;
   private boolean newApiCommitter;
@@ -197,6 +200,7 @@ public class MRAppMaster extends Composi
   private JobEventDispatcher jobEventDispatcher;
   private JobHistoryEventHandler jobHistoryEventHandler;
   private SpeculatorEventDispatcher speculatorEventDispatcher;
+  private AMPreemptionPolicy preemptionPolicy;
 
   private Job job;
   private Credentials jobCredentials = new Credentials(); // Filled during init
@@ -383,8 +387,12 @@ public class MRAppMaster extends Composi
       committerEventHandler = createCommitterEventHandler(context, committer);
       addIfService(committerEventHandler);
 
+      //policy handling preemption requests from RM
+      preemptionPolicy = createPreemptionPolicy(conf);
+      preemptionPolicy.init(context);
+
       //service to handle requests to TaskUmbilicalProtocol
-      taskAttemptListener = createTaskAttemptListener(context);
+      taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy);
       addIfService(taskAttemptListener);
 
       //service to log job history events
@@ -475,6 +483,12 @@ public class MRAppMaster extends Composi
     return committer;
   }
 
+  protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) {
+    return ReflectionUtils.newInstance(conf.getClass(
+          MRJobConfig.MR_AM_PREEMPTION_POLICY,
+          NoopAMPreemptionPolicy.class, AMPreemptionPolicy.class), conf);
+  }
+
   protected boolean keepJobFiles(JobConf conf) {
     return (conf.getKeepTaskFilesPattern() != null || conf
         .getKeepFailedTaskFiles());
@@ -692,10 +706,11 @@ public class MRAppMaster extends Composi
     }
   }
 
-  protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+  protected TaskAttemptListener createTaskAttemptListener(AppContext context,
+      AMPreemptionPolicy preemptionPolicy) {
     TaskAttemptListener lis =
         new TaskAttemptListenerImpl(context, jobTokenSecretManager,
-            getRMHeartbeatHandler());
+            getRMHeartbeatHandler(), preemptionPolicy);
     return lis;
   }
 
@@ -805,7 +820,7 @@ public class MRAppMaster extends Composi
             , containerID);
       } else {
         this.containerAllocator = new RMContainerAllocator(
-            this.clientService, this.context);
+            this.clientService, this.context, preemptionPolicy);
       }
       ((Service)this.containerAllocator).init(getConfig());
       ((Service)this.containerAllocator).start();
@@ -1381,6 +1396,8 @@ public class MRAppMaster extends Composi
     } catch (Throwable t) {
       LOG.fatal("Error starting MRAppMaster", t);
       System.exit(1);
+    } finally {
+      LogManager.shutdown();
     }
   }
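
[Note: createPreemptionPolicy() above resolves the policy class from configuration via MRJobConfig.MR_AM_PREEMPTION_POLICY, defaulting to NoopAMPreemptionPolicy. A hedged sketch of how a job or test could opt into the checkpoint-based policy (CheckpointAMPreemptionPolicy appears in the test changes below) follows; the literal key string behind MR_AM_PREEMPTION_POLICY is not shown in this commit, so it is referenced symbolically.]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
    import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;

    public class PreemptionPolicyConfigSketch {
      // Mirrors the conf.getClass(...) lookup in createPreemptionPolicy():
      // select the checkpoint-aware policy instead of the no-op default.
      static Configuration withCheckpointPolicy() {
        Configuration conf = new Configuration();
        conf.setClass(MRJobConfig.MR_AM_PREEMPTION_POLICY,
            CheckpointAMPreemptionPolicy.class, AMPreemptionPolicy.class);
        // Let statusUpdate propagate the preemption bit to tasks.
        conf.setBoolean(MRJobConfig.TASK_PREEMPTION, true);
        return conf;
      }
    }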
 

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java Fri Jan  3 07:26:52 2014
@@ -47,6 +47,7 @@ public enum TaskAttemptEventType {
   TA_FAILMSG,
   TA_UPDATE,
   TA_TIMED_OUT,
+  TA_PREEMPTED,
 
   //Producer:TaskCleaner
   TA_CLEANUP_DONE,

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Jan  3 07:26:52 2014
@@ -304,6 +304,9 @@ public abstract class TaskAttemptImpl im
      .addTransition(TaskAttemptStateInternal.RUNNING,
          TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
          CLEANUP_CONTAINER_TRANSITION)
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.KILLED,
+         TaskAttemptEventType.TA_PREEMPTED, new PreemptedTransition())
 
      // Transitions from COMMIT_PENDING state
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
@@ -437,6 +440,7 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
+             TaskAttemptEventType.TA_PREEMPTED,
              // Container launch events can arrive late
              TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@@ -1552,6 +1556,12 @@ public abstract class TaskAttemptImpl im
         TaskAttemptEvent event) {
       //set the finish time
       taskAttempt.setFinishTime();
+
+      if (event instanceof TaskAttemptKillEvent) {
+        taskAttempt.addDiagnosticInfo(
+            ((TaskAttemptKillEvent) event).getMessage());
+      }
+
       //send the deallocate event to ContainerAllocator
       taskAttempt.eventHandler.handle(
           new ContainerAllocatorEvent(taskAttempt.attemptId,
@@ -1855,6 +1865,12 @@ public abstract class TaskAttemptImpl im
         LOG.debug("Not generating HistoryFinish event since start event not " +
             "generated for taskAttempt: " + taskAttempt.getID());
       }
+
+      if (event instanceof TaskAttemptKillEvent) {
+        taskAttempt.addDiagnosticInfo(
+            ((TaskAttemptKillEvent) event).getMessage());
+      }
+
 //      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
@@ -1862,6 +1878,27 @@ public abstract class TaskAttemptImpl im
     }
   }
 
+  private static class PreemptedTransition implements
+      SingleArcTransition<TaskAttemptImpl,TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      taskAttempt.setFinishTime();
+      taskAttempt.taskAttemptListener.unregister(
+          taskAttempt.attemptId, taskAttempt.jvmID);
+      taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
+          taskAttempt.attemptId,
+          taskAttempt.getAssignedContainerID(), taskAttempt.getAssignedContainerMgrAddress(),
+          taskAttempt.container.getContainerToken(),
+          ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+          taskAttempt.attemptId,
+          TaskEventType.T_ATTEMPT_KILLED));
+
+    }
+  }
+
   private static class CleanupContainerTransition implements
        SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings("unchecked")
@@ -1872,6 +1909,12 @@ public abstract class TaskAttemptImpl im
       // for it
       taskAttempt.taskAttemptListener.unregister(
           taskAttempt.attemptId, taskAttempt.jvmID);
+
+      if (event instanceof TaskAttemptKillEvent) {
+        taskAttempt.addDiagnosticInfo(
+            ((TaskAttemptKillEvent) event).getMessage());
+      }
+
       taskAttempt.reportedStatus.progress = 1.0f;
       taskAttempt.updateProgressSplits();
       //send the cleanup event to containerLauncher

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Jan  3 07:26:52 2014
@@ -69,6 +69,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
@@ -100,6 +101,7 @@ import com.google.common.annotations.Vis
 public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private static final Log LOG = LogFactory.getLog(TaskImpl.class);
+  private static final String SPECULATION = "Speculation: ";
 
   protected final JobConf conf;
   protected final Path jobFile;
@@ -374,11 +376,15 @@ public abstract class TaskImpl implement
     TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
     readLock.lock();
     try {
+      TaskAttempt bestAttempt = selectBestAttempt();
       report.setTaskId(taskId);
       report.setStartTime(getLaunchTime());
       report.setFinishTime(getFinishTime());
       report.setTaskState(getState());
-      report.setProgress(getProgress());
+      report.setProgress(bestAttempt == null ? 0f : bestAttempt.getProgress());
+      report.setStatus(bestAttempt == null
+          ? ""
+          : bestAttempt.getReport().getStateString());
 
       for (TaskAttempt attempt : attempts.values()) {
         if (TaskAttemptState.RUNNING.equals(attempt.getState())) {
@@ -398,7 +404,9 @@ public abstract class TaskImpl implement
 
       // Add a copy of counters as the last step so that their lifetime on heap
       // is as small as possible.
-      report.setCounters(TypeConverter.toYarn(getCounters()));
+      report.setCounters(TypeConverter.toYarn(bestAttempt == null
+          ? TaskAttemptImpl.EMPTY_COUNTERS
+          : bestAttempt.getCounters()));
 
       return report;
     } finally {
@@ -906,8 +914,8 @@ public abstract class TaskImpl implement
         LOG.info(task.commitAttempt
             + " already given a go for committing the task output, so killing "
             + attemptID);
-        task.eventHandler.handle(new TaskAttemptEvent(
-            attemptID, TaskAttemptEventType.TA_KILL));
+        task.eventHandler.handle(new TaskAttemptKillEvent(attemptID,
+            SPECULATION + task.commitAttempt + " committed first!"));
       }
     }
   }
@@ -932,9 +940,8 @@ public abstract class TaskImpl implement
             //  other reasons.
             !attempt.isFinished()) {
           LOG.info("Issuing kill to other attempt " + attempt.getID());
-          task.eventHandler.handle(
-              new TaskAttemptEvent(attempt.getID(), 
-                  TaskAttemptEventType.TA_KILL));
+          task.eventHandler.handle(new TaskAttemptKillEvent(attempt.getID(),
+              SPECULATION + task.successfulAttempt + " succeeded first!"));
         }
       }
       task.finished(TaskStateInternal.SUCCEEDED);
@@ -1199,8 +1206,7 @@ public abstract class TaskImpl implement
   private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
     if (attempt != null && !attempt.isFinished()) {
       eventHandler.handle(
-          new TaskAttemptEvent(attempt.getID(),
-              TaskAttemptEventType.TA_KILL));
+          new TaskAttemptKillEvent(attempt.getID(), logMsg));
     }
   }
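
[Note: These TaskImpl changes replace bare TA_KILL events with TaskAttemptKillEvent so the reason for a kill (here, losing a speculation race) travels with the event; the `event instanceof TaskAttemptKillEvent` blocks added to TaskAttemptImpl above copy that message into the attempt's diagnostics, which is what MAPREDUCE-5692 surfaces in the UI. A minimal sketch of the pattern, with a hypothetical reason string:]

    import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
    import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
    import org.apache.hadoop.yarn.event.EventHandler;

    public class KillWithDiagnosticsSketch {
      // Fire a kill that carries a human-readable reason; the kill
      // transitions in TaskAttemptImpl append it to the diagnostic info.
      static void killWithReason(EventHandler<TaskAttemptKillEvent> handler,
          TaskAttemptId attemptId) {
        handler.handle(new TaskAttemptKillEvent(attemptId,
            "Speculation: another attempt succeeded first!")); // hypothetical message
      }
    }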
 

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Jan  3 07:26:52 2014
@@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -147,13 +149,17 @@ public class RMContainerAllocator extend
   private long retryInterval;
   private long retrystartTime;
 
+  private final AMPreemptionPolicy preemptionPolicy;
+
   BlockingQueue<ContainerAllocatorEvent> eventQueue
     = new LinkedBlockingQueue<ContainerAllocatorEvent>();
 
   private ScheduleStats scheduleStats = new ScheduleStats();
 
-  public RMContainerAllocator(ClientService clientService, AppContext context) {
+  public RMContainerAllocator(ClientService clientService, AppContext context,
+      AMPreemptionPolicy preemptionPolicy) {
     super(clientService, context);
+    this.preemptionPolicy = preemptionPolicy;
     this.stopped = new AtomicBoolean(false);
   }
 
@@ -341,7 +347,7 @@ public class RMContainerAllocator extend
       }
       
     } else if (
-        event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
+      event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
   
       LOG.info("Processing the event " + event.toString());
 
@@ -361,11 +367,15 @@ public class RMContainerAllocator extend
         LOG.error("Could not deallocate container for task attemptId " + 
             aId);
       }
+      preemptionPolicy.handleCompletedContainer(event.getAttemptID());
     } else if (
         event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) {
       ContainerFailedEvent fEv = (ContainerFailedEvent) event;
       String host = getHost(fEv.getContMgrAddress());
       containerFailedOnHost(host);
+      // propagate failures to preemption policy to discard checkpoints for
+      // failed tasks
+      preemptionPolicy.handleFailedContainer(event.getAttemptID());
     }
   }
 
@@ -399,7 +409,7 @@ public class RMContainerAllocator extend
         }
         scheduledRequests.reduces.clear();
         
-        //preempt for making space for atleast one map
+        //preempt for making space for at least one map
         int premeptionLimit = Math.max(mapResourceReqt, 
             (int) (maxReducePreemptionLimit * memLimit));
         
@@ -409,7 +419,7 @@ public class RMContainerAllocator extend
         int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
         toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
         
-        LOG.info("Going to preempt " + toPreempt);
+        LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
         assignedRequests.preemptReduce(toPreempt);
       }
     }
@@ -595,6 +605,14 @@ public class RMContainerAllocator extend
     }
     
     List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
+
+    // propagate preemption requests
+    final PreemptionMessage preemptReq = response.getPreemptionMessage();
+    if (preemptReq != null) {
+      preemptionPolicy.preempt(
+          new PreemptionContext(assignedRequests), preemptReq);
+    }
+
     if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) {
       //something changed
       recalculateReduceSchedule = true;
@@ -630,7 +648,9 @@ public class RMContainerAllocator extend
         String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
         eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
             diagnostics));
-      }      
+
+        preemptionPolicy.handleCompletedContainer(attemptID);
+      }
     }
     return newContainers;
   }
@@ -1232,4 +1252,27 @@ public class RMContainerAllocator extend
         " RackLocal:" + rackLocalAssigned);
     }
   }
+
+  static class PreemptionContext extends AMPreemptionPolicy.Context {
+    final AssignedRequests reqs;
+
+    PreemptionContext(AssignedRequests reqs) {
+      this.reqs = reqs;
+    }
+    @Override
+    public TaskAttemptId getTaskAttempt(ContainerId container) {
+      return reqs.get(container);
+    }
+
+    @Override
+    public List<Container> getContainers(TaskType t){
+      if(TaskType.REDUCE.equals(t))
+        return new ArrayList<Container>(reqs.reduces.values());
+      if(TaskType.MAP.equals(t))
+        return new ArrayList<Container>(reqs.maps.values());
+      return null;
+    }
+
+  }
+
 }
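
[Note: The allocator now hands RM preemption messages to the AMPreemptionPolicy and reports completed/failed containers so the policy can discard stale checkpoints. A do-nothing policy covering exactly the calls visible in this commit might look like the sketch below, modeled on the NoopAMPreemptionPolicy default; the real AMPreemptionPolicy interface lives in the new preemption package added above and may declare further methods, so treat this as an assumption-laden outline.]

    import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
    import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
    import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
    import org.apache.hadoop.mapreduce.v2.app.AppContext;
    import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
    import org.apache.hadoop.yarn.api.records.PreemptionMessage;

    public class IgnorePreemptionPolicySketch implements AMPreemptionPolicy {
      @Override public void init(AppContext context) { }
      @Override public void preempt(Context ctx, PreemptionMessage msg) {
        // Ignore the RM's request; containers keep running until killed.
      }
      @Override public void handleFailedContainer(TaskAttemptId attempt) { }
      @Override public void handleCompletedContainer(TaskAttemptId attempt) { }
      @Override public boolean isPreempted(TaskAttemptId attempt) { return false; }
      @Override public void reportSuccessfulPreemption(TaskAttemptId attempt) { }
      @Override public TaskCheckpointID getCheckpointID(TaskId task) { return null; }
      @Override public void setCheckpointID(TaskId task, TaskCheckpointID cid) { }
    }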

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Fri Jan  3 07:26:52 2014
@@ -63,6 +63,7 @@ public class TaskPage extends AppView {
             th(".id", "Attempt").
             th(".progress", "Progress").
             th(".state", "State").
+            th(".status", "Status").
             th(".node", "Node").
             th(".logs", "Logs").
             th(".tsh", "Started").
@@ -84,6 +85,7 @@ public class TaskPage extends AppView {
         .append(ta.getId()).append("\",\"")
         .append(progress).append("\",\"")
         .append(ta.getState().toString()).append("\",\"")
+        .append(ta.getStatus()).append("\",\"")
 
         .append(nodeHttpAddr == null ? "N/A" :
           "<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>"
@@ -144,13 +146,13 @@ public class TaskPage extends AppView {
     .append("\n,aoColumnDefs:[\n")
 
     //logs column should not filterable (it includes container ID which may pollute searches)
-    .append("\n{'aTargets': [ 4 ]")
+    .append("\n{'aTargets': [ 5 ]")
     .append(", 'bSearchable': false }")
 
-    .append("\n, {'sType':'numeric', 'aTargets': [ 5, 6")
+    .append("\n, {'sType':'numeric', 'aTargets': [ 6, 7")
     .append(" ], 'mRender': renderHadoopDate }")
 
-    .append("\n, {'sType':'numeric', 'aTargets': [ 7")
+    .append("\n, {'sType':'numeric', 'aTargets': [ 8")
     .append(" ], 'mRender': renderHadoopElapsedTime }]")
 
     // Sort by id upon page load

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksBlock.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksBlock.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksBlock.java Fri Jan  3 07:26:52 2014
@@ -59,6 +59,7 @@ public class TasksBlock extends HtmlBloc
           tr().
             th("Task").
             th("Progress").
+            th("Status").
             th("State").
             th("Start Time").
             th("Finish Time").
@@ -81,6 +82,7 @@ public class TasksBlock extends HtmlBloc
       .append(join(pct, '%')).append("'> ").append("<div class='")
       .append(C_PROGRESSBAR_VALUE).append("' style='")
       .append(join("width:", pct, '%')).append("'> </div> </div>\",\"")
+      .append(info.getStatus()).append("\",\"")
 
       .append(info.getState()).append("\",\"")
       .append(info.getStartTime()).append("\",\"")

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksPage.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksPage.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksPage.java Fri Jan  3 07:26:52 2014
@@ -50,10 +50,10 @@ public class TasksPage extends AppView {
       .append(", 'mRender': parseHadoopProgress }")
 
 
-      .append("\n, {'sType':'numeric', 'aTargets': [3, 4]")
+      .append("\n, {'sType':'numeric', 'aTargets': [4, 5]")
       .append(", 'mRender': renderHadoopDate }")
 
-      .append("\n, {'sType':'numeric', 'aTargets': [5]")
+      .append("\n, {'sType':'numeric', 'aTargets': [6]")
       .append(", 'mRender': renderHadoopElapsedTime }]")
 
       // Sort by id upon page load

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskAttemptInfo.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskAttemptInfo.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskAttemptInfo.java Fri Jan  3 07:26:52 2014
@@ -25,6 +25,7 @@ import javax.xml.bind.annotation.XmlRoot
 import javax.xml.bind.annotation.XmlSeeAlso;
 import javax.xml.bind.annotation.XmlTransient;
 
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -45,6 +46,7 @@ public class TaskAttemptInfo {
   protected String id;
   protected String rack;
   protected TaskAttemptState state;
+  protected String status;
   protected String nodeHttpAddress;
   protected String diagnostics;
   protected String type;
@@ -61,29 +63,23 @@ public class TaskAttemptInfo {
   }
 
   public TaskAttemptInfo(TaskAttempt ta, TaskType type, Boolean isRunning) {
+    final TaskAttemptReport report = ta.getReport();
     this.type = type.toString();
     this.id = MRApps.toString(ta.getID());
     this.nodeHttpAddress = ta.getNodeHttpAddress();
-    this.startTime = ta.getLaunchTime();
-    this.finishTime = ta.getFinishTime();
-    this.assignedContainerId = ConverterUtils.toString(ta
-        .getAssignedContainerID());
-    this.assignedContainer = ta.getAssignedContainerID();
-    this.progress = ta.getProgress() * 100;
-    this.state = ta.getState();
+    this.startTime = report.getStartTime();
+    this.finishTime = report.getFinishTime();
+    this.assignedContainerId = ConverterUtils.toString(report.getContainerId());
+    this.assignedContainer = report.getContainerId();
+    this.progress = report.getProgress() * 100;
+    this.status = report.getStateString();
+    this.state = report.getTaskAttemptState();
     this.elapsedTime = Times
         .elapsed(this.startTime, this.finishTime, isRunning);
     if (this.elapsedTime == -1) {
       this.elapsedTime = 0;
     }
-    List<String> diagnostics = ta.getDiagnostics();
-    if (diagnostics != null && !diagnostics.isEmpty()) {
-      StringBuffer b = new StringBuffer();
-      for (String diag : diagnostics) {
-        b.append(diag);
-      }
-      this.diagnostics = b.toString();
-    }
+    this.diagnostics = report.getDiagnosticInfo();
     this.rack = ta.getNodeRackName();
   }
 
@@ -99,6 +95,10 @@ public class TaskAttemptInfo {
     return this.state.toString();
   }
 
+  public String getStatus() {
+    return status;
+  }
+
   public String getId() {
     return this.id;
   }

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskInfo.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskInfo.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskInfo.java Fri Jan  3 07:26:52 2014
@@ -43,6 +43,7 @@ public class TaskInfo {
   protected TaskState state;
   protected String type;
   protected String successfulAttempt;
+  protected String status;
 
   @XmlTransient
   int taskNum;
@@ -66,6 +67,7 @@ public class TaskInfo {
       this.elapsedTime = 0;
     }
     this.progress = report.getProgress() * 100;
+    this.status =  report.getStatus();
     this.id = MRApps.toString(task.getID());
     this.taskNum = task.getID().getId();
     this.successful = getSuccessfulAttempt(task);
@@ -121,4 +123,7 @@ public class TaskInfo {
     return null;
   }
 
+  public String getStatus() {
+    return status;
+  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Jan  3 07:26:52 2014
@@ -17,26 +17,23 @@
 */
 package org.apache.hadoop.mapred;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
-
-import junit.framework.Assert;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@@ -46,21 +43,31 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.SystemClock;
+
 import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 public class TestTaskAttemptListenerImpl {
-  public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl {
+  public static class MockTaskAttemptListenerImpl
+      extends TaskAttemptListenerImpl {
 
     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,
         RMHeartbeatHandler rmHeartbeatHandler,
-        TaskHeartbeatHandler hbHandler) {
-      super(context, jobTokenSecretManager, rmHeartbeatHandler);
+        TaskHeartbeatHandler hbHandler,
+        AMPreemptionPolicy policy) {
+
+      super(context, jobTokenSecretManager, rmHeartbeatHandler, policy);
       this.taskHeartbeatHandler = hbHandler;
     }
     
@@ -87,9 +94,16 @@ public class TestTaskAttemptListenerImpl
     RMHeartbeatHandler rmHeartbeatHandler =
         mock(RMHeartbeatHandler.class);
     TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
     MockTaskAttemptListenerImpl listener = 
       new MockTaskAttemptListenerImpl(appCtx, secret,
-          rmHeartbeatHandler, hbHandler);
+          rmHeartbeatHandler, hbHandler, policy);
     Configuration conf = new Configuration();
     listener.init(conf);
     listener.start();
@@ -144,7 +158,7 @@ public class TestTaskAttemptListenerImpl
     assertNotNull(jvmid);
     try {
       JVMId.forName("jvm_001_002_m_004_006");
-      Assert.fail();
+      fail();
     } catch (IllegalArgumentException e) {
       assertEquals(e.getMessage(),
           "TaskId string : jvm_001_002_m_004_006 is not properly formed");
@@ -190,8 +204,14 @@ public class TestTaskAttemptListenerImpl
     RMHeartbeatHandler rmHeartbeatHandler =
         mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    TaskAttemptListenerImpl listener =
-        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+        appCtx, secret, rmHeartbeatHandler, policy) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;
@@ -219,7 +239,8 @@ public class TestTaskAttemptListenerImpl
         isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
             : org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
-    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+    RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
     TaskAttemptCompletionEvent tce = recordFactory
         .newRecordInstance(TaskAttemptCompletionEvent.class);
     tce.setEventId(eventId);
@@ -244,8 +265,14 @@ public class TestTaskAttemptListenerImpl
     RMHeartbeatHandler rmHeartbeatHandler =
         mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    TaskAttemptListenerImpl listener =
-        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+        appCtx, secret, rmHeartbeatHandler, policy) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;
@@ -270,4 +297,88 @@ public class TestTaskAttemptListenerImpl
 
     listener.stop();
   }
+
+  @Test
+  public void testCheckpointIDTracking()
+    throws IOException, InterruptedException {
+
+    SystemClock clock = new SystemClock();
+
+    org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
+        mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
+    when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
+
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+
+    RMHeartbeatHandler rmHeartbeatHandler =
+        mock(RMHeartbeatHandler.class);
+
+    AppContext appCtx = mock(AppContext.class);
+    when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
+    when(appCtx.getClock()).thenReturn(clock);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+    final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+        appCtx, secret, rmHeartbeatHandler, policy) {
+      @Override
+      protected void registerHeartbeatHandler(Configuration conf) {
+        taskHeartbeatHandler = hbHandler;
+      }
+    };
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.TASK_PREEMPTION, true);
+    //conf.setBoolean("preemption.reduce", true);
+
+    listener.init(conf);
+    listener.start();
+
+    TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
+
+    List<Path> partialOut = new ArrayList<Path>();
+    partialOut.add(new Path("/prev1"));
+    partialOut.add(new Path("/prev2"));
+
+    Counters counters = mock(Counters.class);
+    final long CBYTES = 64L * 1024 * 1024;
+    final long CTIME = 4344L;
+    final Path CLOC = new Path("/test/1");
+    Counter cbytes = mock(Counter.class);
+    when(cbytes.getValue()).thenReturn(CBYTES);
+    Counter ctime = mock(Counter.class);
+    when(ctime.getValue()).thenReturn(CTIME);
+    when(counters.findCounter(eq(EnumCounter.CHECKPOINT_BYTES)))
+        .thenReturn(cbytes);
+    when(counters.findCounter(eq(EnumCounter.CHECKPOINT_MS)))
+        .thenReturn(ctime);
+
+    // propagating a taskstatus that contains a checkpoint id
+    TaskCheckpointID incid = new TaskCheckpointID(new FSCheckpointID(
+          CLOC), partialOut, counters);
+    listener.setCheckpointID(
+        org.apache.hadoop.mapred.TaskID.downgrade(tid.getTaskID()), incid);
+
+    // and try to get it back
+    CheckpointID outcid = listener.getCheckpointID(tid.getTaskID());
+    TaskCheckpointID tcid = (TaskCheckpointID) outcid;
+    assertEquals(CBYTES, tcid.getCheckpointBytes());
+    assertEquals(CTIME, tcid.getCheckpointTime());
+    assertTrue(partialOut.containsAll(tcid.getPartialCommittedOutput()));
+    assertTrue(tcid.getPartialCommittedOutput().containsAll(partialOut));
+
+    // and that the very same instance was returned
+    assertSame(incid, outcid);
+
+    listener.stop();
+
+  }
+
 }
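
The new testCheckpointIDTracking boils down to a set/get round trip through the listener. Condensed, and using only identifiers that appear in the test above (listener, tid, partialOut, and counters are assumed to be set up as shown there):

    // store a checkpoint id for the task...
    TaskCheckpointID incid = new TaskCheckpointID(
        new FSCheckpointID(new Path("/test/1")), partialOut, counters);
    listener.setCheckpointID(
        org.apache.hadoop.mapred.TaskID.downgrade(tid.getTaskID()), incid);

    // ...and the same instance comes back on the next lookup
    TaskCheckpointID outcid =
        (TaskCheckpointID) listener.getCheckpointID(tid.getTaskID());
    assertSame(incid, outcid);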

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Jan  3 07:26:52 2014
@@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
@@ -467,7 +468,8 @@ public class MRApp extends MRAppMaster {
   }
 
   @Override
-  protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+  protected TaskAttemptListener createTaskAttemptListener(
+      AppContext context, AMPreemptionPolicy policy) {
     return new TaskAttemptListener(){
       @Override
       public InetSocketAddress getAddress() {

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Fri Jan  3 07:26:52 2014
@@ -33,6 +33,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -61,6 +63,8 @@ public class MRAppBenchmark {
 
   /**
    * Runs memory and time benchmark with Mock MRApp.
+   * @param app Application to submit
+   * @throws Exception On application failure
    */
   public void run(MRApp app) throws Exception {
     Logger rootLogger = LogManager.getRootLogger();
@@ -133,6 +137,7 @@ public class MRAppBenchmark {
       protected void serviceStart() throws Exception {
         thread = new Thread(new Runnable() {
           @Override
+          @SuppressWarnings("unchecked")
           public void run() {
             ContainerAllocatorEvent event = null;
             while (!Thread.currentThread().isInterrupted()) {
@@ -192,7 +197,9 @@ public class MRAppBenchmark {
       @Override
       protected ContainerAllocator createContainerAllocator(
           ClientService clientService, AppContext context) {
-        return new RMContainerAllocator(clientService, context) {
+
+        AMPreemptionPolicy policy = new NoopAMPreemptionPolicy();
+        return new RMContainerAllocator(clientService, context, policy) {
           @Override
           protected ApplicationMasterProtocol createSchedulerProxy() {
             return new ApplicationMasterProtocol() {
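
RMContainerAllocator now takes an AMPreemptionPolicy at construction. Call sites that do not participate in preemption, like this benchmark, pass the no-op implementation; the same three-argument pattern repeats in the test changes below (clientService and context as in the surrounding method):

    AMPreemptionPolicy policy = new NoopAMPreemptionPolicy();
    ContainerAllocator allocator =
        new RMContainerAllocator(clientService, context, policy);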

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Fri Jan  3 07:26:52 2014
@@ -174,22 +174,37 @@ public class MockJobs extends MockApps {
     report.setFinishTime(System.currentTimeMillis()
         + (int) (Math.random() * DT) + 1);
     report.setProgress((float) Math.random());
+    report.setStatus("Moving average: " + Math.random());
     report.setCounters(TypeConverter.toYarn(newCounters()));
     report.setTaskState(TASK_STATES.next());
     return report;
   }
 
   public static TaskAttemptReport newTaskAttemptReport(TaskAttemptId id) {
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        id.getTaskId().getJobId().getAppId(), 0);
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
     TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
     report.setTaskAttemptId(id);
     report
         .setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT));
     report.setFinishTime(System.currentTimeMillis()
         + (int) (Math.random() * DT) + 1);
+
+    if (id.getTaskId().getTaskType() == TaskType.REDUCE) {
+      report.setShuffleFinishTime(
+          (report.getFinishTime() + report.getStartTime()) / 2);
+      report.setSortFinishTime(
+          (report.getFinishTime() + report.getShuffleFinishTime()) / 2);
+    }
+
     report.setPhase(PHASES.next());
     report.setTaskAttemptState(TASK_ATTEMPT_STATES.next());
     report.setProgress((float) Math.random());
     report.setCounters(TypeConverter.toYarn(newCounters()));
+    report.setContainerId(containerId);
+    report.setDiagnosticInfo(DIAGS.next());
+    report.setStateString("Moving average " + Math.random());
     return report;
   }
 
@@ -230,8 +245,6 @@ public class MockJobs extends MockApps {
     taid.setTaskId(tid);
     taid.setId(i);
     final TaskAttemptReport report = newTaskAttemptReport(taid);
-    final List<String> diags = Lists.newArrayList();
-    diags.add(DIAGS.next());
     return new TaskAttempt() {
       @Override
       public NodeId getNodeId() throws UnsupportedOperationException{
@@ -250,12 +263,12 @@ public class MockJobs extends MockApps {
 
       @Override
       public long getLaunchTime() {
-        return 0;
+        return report.getStartTime();
       }
 
       @Override
       public long getFinishTime() {
-        return 0;
+        return report.getFinishTime();
       }
 
       @Override
@@ -313,7 +326,7 @@ public class MockJobs extends MockApps {
 
       @Override
       public List<String> getDiagnostics() {
-        return diags;
+        return Lists.newArrayList(report.getDiagnosticInfo());
       }
 
       @Override
@@ -323,12 +336,12 @@ public class MockJobs extends MockApps {
 
       @Override
       public long getShuffleFinishTime() {
-        return 0;
+        return report.getShuffleFinishTime();
       }
 
       @Override
       public long getSortFinishTime() {
-        return 0;
+        return report.getSortFinishTime();
       }
 
       @Override
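
For reduce attempts the mock now places the shuffle and sort finish times at successive midpoints of the attempt's lifetime:

    shuffleFinishTime = (startTime + finishTime) / 2
    sortFinishTime    = (shuffleFinishTime + finishTime) / 2

which guarantees startTime < shuffleFinishTime < sortFinishTime < finishTime, so every derived phase duration is positive instead of the flat zeros the mock used to return.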

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Fri Jan  3 07:26:52 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -247,13 +248,14 @@ public class TestFail {
       super(maps, reduces, false, "TimeOutTaskMRApp", true);
     }
     @Override
-    protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+    protected TaskAttemptListener createTaskAttemptListener(
+        AppContext context, AMPreemptionPolicy policy) {
       //This will create the TaskAttemptListener with TaskHeartbeatHandler
       //RPC servers are not started
       //task time out is reduced
       //when attempt times out, heartbeat handler will send the lost event
       //leading to Attempt failure
-      return new TaskAttemptListenerImpl(getContext(), null, null) {
+      return new TaskAttemptListenerImpl(getContext(), null, null, policy) {
         @Override
         public void startRpcServer(){};
         @Override

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Jan  3 07:26:52 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
+
 import static org.mockito.Matchers.anyFloat;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.isA;
@@ -1428,14 +1430,15 @@ public class TestRMContainerAllocator {
     // Use this constructor when using a real job.
     MyContainerAllocator(MyResourceManager rm,
         ApplicationAttemptId appAttemptId, AppContext context) {
-      super(createMockClientService(), context);
+      super(createMockClientService(), context, new NoopAMPreemptionPolicy());
       this.rm = rm;
     }
 
     // Use this constructor when you are using a mocked job.
     public MyContainerAllocator(MyResourceManager rm, Configuration conf,
         ApplicationAttemptId appAttemptId, Job job) {
-      super(createMockClientService(), createAppContext(appAttemptId, job));
+      super(createMockClientService(), createAppContext(appAttemptId, job),
+          new NoopAMPreemptionPolicy());
       this.rm = rm;
       super.init(conf);
       super.start();
@@ -1444,7 +1447,8 @@ public class TestRMContainerAllocator {
     public MyContainerAllocator(MyResourceManager rm, Configuration conf,
         ApplicationAttemptId appAttemptId, Job job, Clock clock) {
       super(createMockClientService(),
-          createAppContext(appAttemptId, job, clock));
+          createAppContext(appAttemptId, job, clock),
+          new NoopAMPreemptionPolicy());
       this.rm = rm;
       super.init(conf);
       super.start();
@@ -1671,7 +1675,8 @@ public class TestRMContainerAllocator {
         ApplicationId.newInstance(1, 1));
 
     RMContainerAllocator allocator = new RMContainerAllocator(
-        mock(ClientService.class), appContext) {
+        mock(ClientService.class), appContext,
+        new NoopAMPreemptionPolicy()) {
           @Override
           protected void register() {
           }
@@ -1721,7 +1726,8 @@ public class TestRMContainerAllocator {
   @Test
   public void testCompletedContainerEvent() {
     RMContainerAllocator allocator = new RMContainerAllocator(
-        mock(ClientService.class), mock(AppContext.class));
+        mock(ClientService.class), mock(AppContext.class),
+        new NoopAMPreemptionPolicy());
     
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
         MRBuilderUtils.newTaskId(

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesAttempts.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesAttempts.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesAttempts.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesAttempts.java Fri Jan  3 07:26:52 2014
@@ -425,9 +426,9 @@ public class TestAMWebServicesAttempts e
   public void verifyAMTaskAttempt(JSONObject info, TaskAttempt att,
       TaskType ttype) throws JSONException {
     if (ttype == TaskType.REDUCE) {
-      assertEquals("incorrect number of elements", 16, info.length());
+      assertEquals("incorrect number of elements", 17, info.length());
     } else {
-      assertEquals("incorrect number of elements", 11, info.length());
+      assertEquals("incorrect number of elements", 12, info.length());
     }
 
     verifyTaskAttemptGeneric(att, ttype, info.getString("id"),
@@ -532,11 +533,11 @@ public class TestAMWebServicesAttempts e
     assertEquals("mergeFinishTime wrong", ta.getSortFinishTime(),
         mergeFinishTime);
     assertEquals("elapsedShuffleTime wrong",
-        ta.getLaunchTime() - ta.getShuffleFinishTime(), elapsedShuffleTime);
+        ta.getShuffleFinishTime() - ta.getLaunchTime(), elapsedShuffleTime);
     assertEquals("elapsedMergeTime wrong",
-        ta.getShuffleFinishTime() - ta.getSortFinishTime(), elapsedMergeTime);
+        ta.getSortFinishTime() - ta.getShuffleFinishTime(), elapsedMergeTime);
     assertEquals("elapsedReduceTime wrong",
-        ta.getSortFinishTime() - ta.getFinishTime(), elapsedReduceTime);
+        ta.getFinishTime() - ta.getSortFinishTime(), elapsedReduceTime);
   }
 
   @Test
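
Restated, the corrected expectations for the three reduce-side durations are the positive differences:

    elapsedShuffleTime = shuffleFinishTime - launchTime
    elapsedMergeTime   = sortFinishTime    - shuffleFinishTime
    elapsedReduceTime  = finishTime        - sortFinishTime

The old operand order yielded negated values, which went unnoticed only because the mock attempts previously reported zero for every timestamp.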

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesTasks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesTasks.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesTasks.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesTasks.java Fri Jan  3 07:26:52 2014
@@ -525,12 +525,13 @@ public class TestAMWebServicesTasks exte
 
   public void verifyAMSingleTask(JSONObject info, Task task)
       throws JSONException {
-    assertEquals("incorrect number of elements", 8, info.length());
+    assertEquals("incorrect number of elements", 9, info.length());
 
     verifyTaskGeneric(task, info.getString("id"), info.getString("state"),
         info.getString("type"), info.getString("successfulAttempt"),
         info.getLong("startTime"), info.getLong("finishTime"),
-        info.getLong("elapsedTime"), (float) info.getDouble("progress"));
+        info.getLong("elapsedTime"), (float) info.getDouble("progress"),
+        info.getString("status"));
   }
 
   public void verifyAMTask(JSONArray arr, Job job, String type)
@@ -555,7 +556,7 @@ public class TestAMWebServicesTasks exte
 
   public void verifyTaskGeneric(Task task, String id, String state,
       String type, String successfulAttempt, long startTime, long finishTime,
-      long elapsedTime, float progress) {
+      long elapsedTime, float progress, String status) {
 
     TaskId taskid = task.getID();
     String tid = MRApps.toString(taskid);
@@ -572,6 +573,7 @@ public class TestAMWebServicesTasks exte
     assertEquals("finishTime wrong", report.getFinishTime(), finishTime);
     assertEquals("elapsedTime wrong", finishTime - startTime, elapsedTime);
     assertEquals("progress wrong", report.getProgress() * 100, progress, 1e-3f);
+    assertEquals("status wrong", report.getStatus(), status);
   }
 
   public void verifyAMSingleTaskXML(Element element, Task task) {
@@ -582,7 +584,8 @@ public class TestAMWebServicesTasks exte
         WebServicesTestUtils.getXmlLong(element, "startTime"),
         WebServicesTestUtils.getXmlLong(element, "finishTime"),
         WebServicesTestUtils.getXmlLong(element, "elapsedTime"),
-        WebServicesTestUtils.getXmlFloat(element, "progress"));
+        WebServicesTestUtils.getXmlFloat(element, "progress"),
+        WebServicesTestUtils.getXmlString(element, "status"));
   }
 
   public void verifyAMTaskXML(NodeList nodes, Job job) {

Modified: hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Jan  3 07:26:52 2014
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.Queue
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -575,10 +576,17 @@ public class LocalJobRunner implements C
 
     // TaskUmbilicalProtocol methods
 
+    @Override
     public JvmTask getTask(JvmContext context) { return null; }
     
-    public synchronized boolean statusUpdate(TaskAttemptID taskId,
+    @Override
+    public synchronized AMFeedback statusUpdate(TaskAttemptID taskId,
         TaskStatus taskStatus) throws IOException, InterruptedException {
+      AMFeedback feedback = new AMFeedback();
+      feedback.setTaskFound(true);
+      if (null == taskStatus) {
+        return feedback;
+      }
       // Serialize as we would if distributed in order to make deep copy
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       DataOutputStream dos = new DataOutputStream(baos);
@@ -618,7 +626,7 @@ public class LocalJobRunner implements C
       }
 
       // ignore phase
-      return true;
+      return feedback;
     }
 
     /** Return the current values of the counters for this job,
@@ -654,24 +662,24 @@ public class LocalJobRunner implements C
       statusUpdate(taskid, taskStatus);
     }
 
+    @Override
     public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
       // Ignore for now
     }
     
+    @Override
     public void reportNextRecordRange(TaskAttemptID taskid, 
         SortedRanges.Range range) throws IOException {
       LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
     }
 
-    public boolean ping(TaskAttemptID taskid) throws IOException {
-      return true;
-    }
-    
+    @Override
     public boolean canCommit(TaskAttemptID taskid) 
     throws IOException {
       return true;
     }
     
+    @Override
     public void done(TaskAttemptID taskId) throws IOException {
       int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
@@ -681,11 +689,13 @@ public class LocalJobRunner implements C
       }
     }
 
+    @Override
     public synchronized void fsError(TaskAttemptID taskId, String message) 
     throws IOException {
       LOG.fatal("FSError: "+ message + "from task: " + taskId);
     }
 
+    @Override
     public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
       LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
     }
@@ -695,12 +705,30 @@ public class LocalJobRunner implements C
       LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
     }
     
+    @Override
     public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
         int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
       return new MapTaskCompletionEventsUpdate(
         org.apache.hadoop.mapred.TaskCompletionEvent.EMPTY_ARRAY, false);
     }
-    
+
+    @Override
+    public void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      // ignore
+    }
+
+    @Override
+    public TaskCheckpointID getCheckpointID(TaskID taskId) {
+      // ignore
+      return null;
+    }
+
+    @Override
+    public void setCheckpointID(TaskID downgrade, TaskCheckpointID cid) {
+      // ignore
+    }
+
   }
 
   public LocalJobRunner(Configuration conf) throws IOException {
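
With this change the umbilical's statusUpdate returns an AMFeedback record instead of a bare boolean, and the local runner always marks the task as found (tolerating a null status). On the task side the result would be consumed roughly as follows (a sketch; getTaskFound() is assumed to be the getter paired with the setTaskFound(true) call above):

    AMFeedback feedback = umbilical.statusUpdate(taskId, taskStatus);
    if (!feedback.getTaskFound()) {
      // the AM no longer knows this attempt: stop reporting and bail out
    }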


