hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1611531 - in /hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project: ./ bin/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/...
Date Fri, 18 Jul 2014 02:21:28 GMT
Author: szetszwo
Date: Fri Jul 18 02:21:21 2014
New Revision: 1611531

URL: http://svn.apache.org/r1611531
Log:
Merge r1609845 through r1611528 from trunk.

Modified:
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java

Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1609845-1611528

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt Fri Jul 18 02:21:21 2014
@@ -17,6 +17,9 @@ Trunk (Unreleased)
     MAPREDUCE-5232. Add a configuration to be able to log classpath and other
     system properties on mapreduce JVMs startup.  (Sangjin Lee via vinodkv)
 
+    MAPREDUCE-5910. Make MR AM resync with RM in case of work-preserving
+    RM-restart. (Rohith via jianhe)
+
   IMPROVEMENTS
 
     MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)
@@ -153,6 +156,9 @@ Release 2.6.0 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-5971. Move the default options for distcp -p to
+    DistCpOptionSwitch. (clamb via wang)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -237,6 +243,9 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5844. Add a configurable delay to reducer-preemption. 
     (Maysam Yabandeh via kasha)
 
+    MAPREDUCE-5790. Made it easier to enable hprof profile options by default.
+    (Gera Shegalov via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES 
@@ -304,6 +313,9 @@ Release 2.5.0 - UNRELEASED
     resource configuration for deciding uber-mode on map-only jobs. (Siqi Li via
     vinodkv)
 
+    MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly 
+    assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)
+
 Release 2.4.1 - 2014-06-23 
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1609845-1611528

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh Fri Jul 18 02:21:21 2014
@@ -133,6 +133,7 @@ case $startStop in
       else
         echo no $command to stop
       fi
+      rm -f $pid
     else
       echo no $command to stop
     fi

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Fri Jul 18 02:21:21 2014
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
@@ -438,43 +439,6 @@ public class LocalContainerLauncher exte
     }
 
     /**
-     * Within the _local_ filesystem (not HDFS), all activity takes place within
-     * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
-     * and all sub-MapTasks create the same filename ("file.out").  Rename that
-     * to something unique (e.g., "map_0.out") to avoid collisions.
-     *
-     * Longer-term, we'll modify [something] to use TaskAttemptID-based
-     * filenames instead of "file.out". (All of this is entirely internal,
-     * so there are no particular compatibility issues.)
-     */
-    private MapOutputFile renameMapOutputForReduce(JobConf conf,
-        TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
-      FileSystem localFs = FileSystem.getLocal(conf);
-      // move map output to reduce input
-      Path mapOut = subMapOutputFile.getOutputFile();
-      FileStatus mStatus = localFs.getFileStatus(mapOut);      
-      Path reduceIn = subMapOutputFile.getInputFileForWrite(
-          TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
-      Path mapOutIndex = new Path(mapOut.toString() + ".index");
-      Path reduceInIndex = new Path(reduceIn.toString() + ".index");
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Renaming map output file for task attempt "
-            + mapId.toString() + " from original location " + mapOut.toString()
-            + " to destination " + reduceIn.toString());
-      }
-      if (!localFs.mkdirs(reduceIn.getParent())) {
-        throw new IOException("Mkdirs failed to create "
-            + reduceIn.getParent().toString());
-      }
-      if (!localFs.rename(mapOut, reduceIn))
-        throw new IOException("Couldn't rename " + mapOut);
-      if (!localFs.rename(mapOutIndex, reduceInIndex))
-        throw new IOException("Couldn't rename " + mapOutIndex);
-      
-      return new RenamedMapOutputFile(reduceIn);
-    }
-
-    /**
      * Also within the local filesystem, we need to restore the initial state
      * of the directory as much as possible.  Compare current contents against
      * the saved original state and nuke everything that doesn't belong, with
@@ -506,7 +470,46 @@ public class LocalContainerLauncher exte
     }
 
   } // end EventHandler
-  
+
+  /**
+   * Within the _local_ filesystem (not HDFS), all activity takes place within
+   * a subdir inside one of the LOCAL_DIRS
+   * (${local.dir}/usercache/$user/appcache/$appId/$contId/),
+   * and all sub-MapTasks create the same filename ("file.out").  Rename that
+   * to something unique (e.g., "map_0.out") to avoid possible collisions.
+   *
+   * Longer-term, we'll modify [something] to use TaskAttemptID-based
+   * filenames instead of "file.out". (All of this is entirely internal,
+   * so there are no particular compatibility issues.)
+   */
+  @VisibleForTesting
+  protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
+      TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    // move map output to reduce input
+    Path mapOut = subMapOutputFile.getOutputFile();
+    FileStatus mStatus = localFs.getFileStatus(mapOut);
+    Path reduceIn = subMapOutputFile.getInputFileForWrite(
+        TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+    Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
+    Path reduceInIndex = new Path(reduceIn.toString() + ".index");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming map output file for task attempt "
+          + mapId.toString() + " from original location " + mapOut.toString()
+          + " to destination " + reduceIn.toString());
+    }
+    if (!localFs.mkdirs(reduceIn.getParent())) {
+      throw new IOException("Mkdirs failed to create "
+          + reduceIn.getParent().toString());
+    }
+    if (!localFs.rename(mapOut, reduceIn))
+      throw new IOException("Couldn't rename " + mapOut);
+    if (!localFs.rename(mapOutIndex, reduceInIndex))
+      throw new IOException("Couldn't rename " + mapOutIndex);
+
+    return new RenamedMapOutputFile(reduceIn);
+  }
+
   private static class RenamedMapOutputFile extends MapOutputFile {
     private Path path;
     

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Jul 18 02:21:21 2014
@@ -64,6 +64,7 @@ public class LocalContainerAllocator ext
   private int nmPort;
   private int nmHttpPort;
   private ContainerId containerId;
+  protected int lastResponseID;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -119,6 +120,11 @@ public class LocalContainerAllocator ext
     if (allocateResponse.getAMCommand() != null) {
       switch(allocateResponse.getAMCommand()) {
       case AM_RESYNC:
+        LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+            + " hence resyncing.");        
+        this.lastResponseID = 0;
+        register();
+        break;
       case AM_SHUTDOWN:
         LOG.info("Event from RM: shutting down Application Master");
         // This can happen if the RM has been restarted. If it is in that state,

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Jul 18 02:21:21 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -216,20 +217,27 @@ public abstract class RMCommunicator ext
     FinishApplicationMasterRequest request =
         FinishApplicationMasterRequest.newInstance(finishState,
           sb.toString(), historyUrl);
-    while (true) {
-      FinishApplicationMasterResponse response =
-          scheduler.finishApplicationMaster(request);
-      if (response.getIsUnregistered()) {
-        // When excepting ClientService, other services are already stopped,
-        // it is safe to let clients know the final states. ClientService
-        // should wait for some time so clients have enough time to know the
-        // final states.
-        RunningAppContext raContext = (RunningAppContext) context;
-        raContext.markSuccessfulUnregistration();
-        break;
+    try {
+      while (true) {
+        FinishApplicationMasterResponse response =
+            scheduler.finishApplicationMaster(request);
+        if (response.getIsUnregistered()) {
+          // When excepting ClientService, other services are already stopped,
+          // it is safe to let clients know the final states. ClientService
+          // should wait for some time so clients have enough time to know the
+          // final states.
+          RunningAppContext raContext = (RunningAppContext) context;
+          raContext.markSuccessfulUnregistration();
+          break;
+        }
+        LOG.info("Waiting for application to be successfully unregistered.");
+        Thread.sleep(rmPollInterval);
       }
-      LOG.info("Waiting for application to be successfully unregistered.");
-      Thread.sleep(rmPollInterval);
+    } catch (ApplicationMasterNotRegisteredException e) {
+      // RM might have restarted or failed over and so lost the fact that AM had
+      // registered before.
+      register();
+      doUnregistration();
     }
   }
 

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Jul 18 02:21:21 2014
@@ -389,6 +389,7 @@ public class RMContainerAllocator extend
           removed = true;
           assignedRequests.remove(aId);
           containersReleased++;
+          pendingRelease.add(containerId);
           release(containerId);
         }
       }
@@ -641,6 +642,15 @@ public class RMContainerAllocator extend
     if (response.getAMCommand() != null) {
       switch(response.getAMCommand()) {
       case AM_RESYNC:
+          LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+              + " hence resyncing.");
+          lastResponseID = 0;
+
+          // Registering to allow RM to discover an active AM for this
+          // application
+          register();
+          addOutstandingRequestOnResync();
+          break;
       case AM_SHUTDOWN:
         // This can happen if the RM has been restarted. If it is in that state,
         // this application must clean itself up.
@@ -700,6 +710,7 @@ public class RMContainerAllocator extend
         LOG.error("Container complete event for unknown container id "
             + cont.getContainerId());
       } else {
+        pendingRelease.remove(cont.getContainerId());
         assignedRequests.remove(attemptID);
         
         // send the container completed event to Task attempt
@@ -991,6 +1002,7 @@ public class RMContainerAllocator extend
     
     private void containerNotAssigned(Container allocated) {
       containersReleased++;
+      pendingRelease.add(allocated.getId());
       release(allocated.getId());      
     }
     

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Fri Jul 18 02:21:21 2014
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,7 +59,7 @@ public abstract class RMContainerRequest
   
   private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
 
-  private int lastResponseID;
+  protected int lastResponseID;
   private Resource availableResources;
 
   private final RecordFactory recordFactory =
@@ -77,8 +78,11 @@ public abstract class RMContainerRequest
   // numContainers dont end up as duplicates
   private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
       new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
-  private final Set<ContainerId> release = new TreeSet<ContainerId>(); 
-
+  private final Set<ContainerId> release = new TreeSet<ContainerId>();
+  // pendingRelease holds history or release requests.request is removed only if
+  // RM sends completedContainer.
+  // How it different from release? --> release is for per allocate() request.
+  protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
   private boolean nodeBlacklistingEnabled;
   private int blacklistDisablePercent;
   private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
@@ -186,6 +190,10 @@ public abstract class RMContainerRequest
     } catch (YarnException e) {
       throw new IOException(e);
     }
+
+    if (isResyncCommand(allocateResponse)) {
+      return allocateResponse;
+    }
     lastResponseID = allocateResponse.getResponseId();
     availableResources = allocateResponse.getAvailableResources();
     lastClusterNmCount = clusterNmCount;
@@ -214,6 +222,28 @@ public abstract class RMContainerRequest
     return allocateResponse;
   }
 
+  protected boolean isResyncCommand(AllocateResponse allocateResponse) {
+    return allocateResponse.getAMCommand() != null
+        && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
+  }
+
+  protected void addOutstandingRequestOnResync() {
+    for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
+        .values()) {
+      for (Map<Resource, ResourceRequest> capabalities : rr.values()) {
+        for (ResourceRequest request : capabalities.values()) {
+          addResourceRequestToAsk(request);
+        }
+      }
+    }
+    if (!ignoreBlacklisting.get()) {
+      blacklistAdditions.addAll(blacklistedNodes);
+    }
+    if (!pendingRelease.isEmpty()) {
+      release.addAll(pendingRelease);
+    }
+  }
+
   // May be incorrect if there's multiple NodeManagers running on a single host.
   // knownNodeCount is based on node managers, not hosts. blacklisting is
   // currently based on hosts.

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java Fri Jul 18 02:21:21 2014
@@ -18,17 +18,26 @@
 
 package org.apache.hadoop.mapred;
 
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -46,6 +55,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -53,6 +65,36 @@ import org.mockito.stubbing.Answer;
 public class TestLocalContainerLauncher {
   private static final Log LOG =
       LogFactory.getLog(TestLocalContainerLauncher.class);
+  private static File testWorkDir;
+  private static final String[] localDirs = new String[2];
+
+  private static void delete(File dir) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path p = fs.makeQualified(new Path(dir.getAbsolutePath()));
+    fs.delete(p, true);
+  }
+
+  @BeforeClass
+  public static void setupTestDirs() throws IOException {
+    testWorkDir = new File("target",
+        TestLocalContainerLauncher.class.getCanonicalName());
+    testWorkDir.delete();
+    testWorkDir.mkdirs();
+    testWorkDir = testWorkDir.getAbsoluteFile();
+    for (int i = 0; i < localDirs.length; i++) {
+      final File dir = new File(testWorkDir, "local-" + i);
+      dir.mkdirs();
+      localDirs[i] = dir.toString();
+    }
+  }
+
+  @AfterClass
+  public static void cleanupTestDirs() throws IOException {
+    if (testWorkDir != null) {
+      delete(testWorkDir);
+    }
+  }
 
   @SuppressWarnings("rawtypes")
   @Test(timeout=10000)
@@ -141,4 +183,35 @@ public class TestLocalContainerLauncher 
     when(container.getNodeId()).thenReturn(nodeId);
     return container;
   }
+
+
+  @Test
+  public void testRenameMapOutputForReduce() throws Exception {
+    final JobConf conf = new JobConf();
+
+    final MROutputFiles mrOutputFiles = new MROutputFiles();
+    mrOutputFiles.setConf(conf);
+
+    // make sure both dirs are distinct
+    //
+    conf.set(MRConfig.LOCAL_DIR, localDirs[0].toString());
+    final Path mapOut = mrOutputFiles.getOutputFileForWrite(1);
+    conf.set(MRConfig.LOCAL_DIR, localDirs[1].toString());
+    final Path mapOutIdx = mrOutputFiles.getOutputIndexFileForWrite(1);
+    Assert.assertNotEquals("Paths must be different!",
+        mapOut.getParent(), mapOutIdx.getParent());
+
+    // make both dirs part of LOCAL_DIR
+    conf.setStrings(MRConfig.LOCAL_DIR, localDirs);
+
+    final FileContext lfc = FileContext.getLocalFSFileContext(conf);
+    lfc.create(mapOut, EnumSet.of(CREATE)).close();
+    lfc.create(mapOutIdx, EnumSet.of(CREATE)).close();
+
+    final JobId jobId = MRBuilderUtils.newJobId(12345L, 1, 2);
+    final TaskId tid = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 0);
+
+    LocalContainerLauncher.renameMapOutputForReduce(conf, taid, mrOutputFiles);
+  }
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java Fri Jul 18 02:21:21 2014
@@ -78,6 +78,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -87,6 +88,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
@@ -95,9 +97,13 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -618,6 +624,10 @@ public class TestRMContainerAllocator {
       super(conf);
     }
 
+    public MyResourceManager(Configuration conf, RMStateStore store) {
+      super(conf, store);
+    }
+
     @Override
     public void serviceStart() throws Exception {
       super.serviceStart();
@@ -1426,6 +1436,13 @@ public class TestRMContainerAllocator {
         rm.getMyFifoScheduler().lastBlacklistRemovals.size());
   }
 
+  private static void assertAsksAndReleases(int expectedAsk,
+      int expectedRelease, MyResourceManager rm) {
+    Assert.assertEquals(expectedAsk, rm.getMyFifoScheduler().lastAsk.size());
+    Assert.assertEquals(expectedRelease,
+        rm.getMyFifoScheduler().lastRelease.size());
+  }
+
   private static class MyFifoScheduler extends FifoScheduler {
 
     public MyFifoScheduler(RMContext rmContext) {
@@ -1440,6 +1457,7 @@ public class TestRMContainerAllocator {
     }
     
     List<ResourceRequest> lastAsk = null;
+    List<ContainerId> lastRelease = null;
     List<String> lastBlacklistAdditions;
     List<String> lastBlacklistRemovals;
     
@@ -1458,6 +1476,7 @@ public class TestRMContainerAllocator {
         askCopy.add(reqCopy);
       }
       lastAsk = ask;
+      lastRelease = release;
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
       return super.allocate(
@@ -1505,6 +1524,20 @@ public class TestRMContainerAllocator {
     return new ContainerFailedEvent(attemptId, host);    
   }
   
+  private ContainerAllocatorEvent createDeallocateEvent(JobId jobId,
+      int taskAttemptId, boolean reduce) {
+    TaskId taskId;
+    if (reduce) {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+    } else {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    }
+    TaskAttemptId attemptId =
+        MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId);
+    return new ContainerAllocatorEvent(attemptId,
+        ContainerAllocator.EventType.CONTAINER_DEALLOCATE);
+  }
+
   private void checkAssignments(ContainerRequestEvent[] requests,
       List<TaskAttemptContainerAssignedEvent> assignments,
       boolean checkHostMatch) {
@@ -1557,6 +1590,7 @@ public class TestRMContainerAllocator {
     = new ArrayList<JobUpdatedNodesEvent>();
     private MyResourceManager rm;
     private boolean isUnregistered = false;
+    private AllocateResponse allocateResponse;
     private static AppContext createAppContext(
         ApplicationAttemptId appAttemptId, Job job) {
       AppContext context = mock(AppContext.class);
@@ -1668,6 +1702,10 @@ public class TestRMContainerAllocator {
       super.handleEvent(f);
     }
     
+    public void sendDeallocate(ContainerAllocatorEvent f) {
+      super.handleEvent(f);
+    }
+
     // API to be used by tests
     public List<TaskAttemptContainerAssignedEvent> schedule()
         throws Exception {
@@ -1713,6 +1751,20 @@ public class TestRMContainerAllocator {
     public boolean isUnregistered() {
       return isUnregistered;
     }
+
+    public void updateSchedulerProxy(MyResourceManager rm) {
+      scheduler = rm.getApplicationMasterService();
+    }
+
+    @Override
+    protected AllocateResponse makeRemoteRequest() throws IOException {
+      allocateResponse = super.makeRemoteRequest();
+      return allocateResponse;
+    }
+
+    public boolean isResyncCommand() {
+      return super.isResyncCommand(allocateResponse);
+    }
   }
 
   @Test
@@ -2022,6 +2074,198 @@ public class TestRMContainerAllocator {
     Assert.assertTrue(allocator.isUnregistered());
   }
   
+  // Step-1 : AM send allocate request for 2 ContainerRequests and 1
+  // blackListeNode
+  // Step-2 : 2 containers are allocated by RM.
+  // Step-3 : AM Send 1 containerRequest(event3) and 1 releaseRequests to
+  // RM
+  // Step-4 : On RM restart, AM(does not know RM is restarted) sends
+  // additional containerRequest(event4) and blacklisted nodes.
+  // Intern RM send resync command
+  // Step-5 : On Resync,AM sends all outstanding
+  // asks,release,blacklistAaddition
+  // and another containerRequest(event5)
+  // Step-6 : RM allocates containers i.e event3,event4 and cRequest5
+  @Test
+  public void testRMContainerAllocatorResendsRequestsOnRMRestart()
+      throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MyResourceManager rm1 = new MyResourceManager(conf, memStore);
+    rm1.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm1.submitApp(1024);
+    dispatcher.await();
+
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+    rm1.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator allocator =
+        new MyContainerAllocator(rm1, conf, appAttemptId, mockJob);
+
+    // Step-1 : AM send allocate request for 2 ContainerRequests and 1
+    // blackListeNode
+    // create the container request
+    // send MAP request
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 1024, new String[] { "h1" });
+    allocator.sendRequest(event1);
+
+    ContainerRequestEvent event2 =
+        createReq(jobId, 2, 2048, new String[] { "h1", "h2" });
+    allocator.sendRequest(event2);
+
+    // Send events to blacklist h2
+    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h2", false);
+    allocator.sendFailure(f1);
+
+    // send allocate request and 1 blacklisted nodes
+    List<TaskAttemptContainerAssignedEvent> assignedContainers =
+        allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    // Why ask is 3, not 4? --> ask from blacklisted node h2 is removed
+    assertAsksAndReleases(3, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(1, 0, rm1);
+
+    nm1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    // Step-2 : 2 containers are allocated by RM.
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 2", 2,
+        assignedContainers.size());
+    assertAsksAndReleases(0, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    assignedContainers = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    assertAsksAndReleases(3, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    // Step-3 : AM Send 1 containerRequest(event3) and 1 releaseRequests to
+    // RM
+    // send container request
+    ContainerRequestEvent event3 =
+        createReq(jobId, 3, 1000, new String[] { "h1" });
+    allocator.sendRequest(event3);
+
+    // send deallocate request
+    ContainerAllocatorEvent deallocate1 =
+        createDeallocateEvent(jobId, 1, false);
+    allocator.sendDeallocate(deallocate1);
+
+    assignedContainers = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    assertAsksAndReleases(3, 1, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    // Phase-2 start 2nd RM is up
+    MyResourceManager rm2 = new MyResourceManager(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    allocator.updateSchedulerProxy(rm2);
+    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
+
+    // NM should be rebooted on heartbeat, even first heartbeat for nm2
+    NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
+
+    // new NM to represent NM re-register
+    nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Step-4 : On RM restart, AM(does not know RM is restarted) sends
+    // additional containerRequest(event4) and blacklisted nodes.
+    // Intern RM send resync command
+
+    // send deallocate request, release=1
+    ContainerAllocatorEvent deallocate2 =
+        createDeallocateEvent(jobId, 2, false);
+    allocator.sendDeallocate(deallocate2);
+
+    // Send events to blacklist nodes h3
+    ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h3", false);
+    allocator.sendFailure(f2);
+
+    ContainerRequestEvent event4 =
+        createReq(jobId, 4, 2000, new String[] { "h1", "h2" });
+    allocator.sendRequest(event4);
+
+    // send allocate request to 2nd RM and get resync command
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertTrue("Last allocate response is not RESYNC",
+        allocator.isResyncCommand());
+
+    // Step-5 : On Resync,AM sends all outstanding
+    // asks,release,blacklistAaddition
+    // and another containerRequest(event5)
+    ContainerRequestEvent event5 =
+        createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" });
+    allocator.sendRequest(event5);
+
+    // send all outstanding request again.
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+    assertAsksAndReleases(3, 2, rm2);
+    assertBlacklistAdditionsAndRemovals(2, 0, rm2);
+
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Step-6 : RM allocates containers i.e event3,event4 and cRequest5
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+
+    Assert.assertEquals("Number of container should be 3", 3,
+        assignedContainers.size());
+
+    for (TaskAttemptContainerAssignedEvent assig : assignedContainers) {
+      Assert.assertTrue("Assigned count not correct",
+          "h1".equals(assig.getContainer().getNodeId().getHost()));
+    }
+
+    rm1.stop();
+    rm2.stop();
+
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Jul 18 02:21:21 2014
@@ -671,7 +671,7 @@
 
   <property>
     <name>mapreduce.task.profile.params</name>
-    <value></value>
+    <value>-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s</value>
     <description>JVM profiler parameters used to profile map and reduce task
       attempts. This string may contain a single format specifier %s that will
       be replaced by the path to profile.out in the task attempt log directory.

Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1609845-1611528

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java Fri Jul 18 02:21:21 2014
@@ -29,11 +29,7 @@ public class TestJobConf {
   @Test
   public void testProfileParamsDefaults() {
     JobConf configuration = new JobConf();
-
-    Assert.assertNull(configuration.get(MRJobConfig.TASK_PROFILE_PARAMS));
-
     String result = configuration.getProfileParams();
-
     Assert.assertNotNull(result);
     Assert.assertTrue(result.contains("file=%s"));
     Assert.assertTrue(result.startsWith("-agentlib:hprof"));

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java Fri Jul 18 02:21:21 2014
@@ -24,6 +24,7 @@ import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.junit.AfterClass;
 import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
@@ -39,8 +40,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestMRJobsWithProfiler {
@@ -51,6 +51,8 @@ public class TestMRJobsWithProfiler {
   private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
     EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
 
+  private static final int PROFILED_TASK_ID = 1;
+
   private static MiniMRYarnCluster mrCluster;
 
   private static final Configuration CONF = new Configuration();
@@ -69,8 +71,8 @@ public class TestMRJobsWithProfiler {
 
   private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
 
-  @Before
-  public void setup() throws InterruptedException, IOException {
+  @BeforeClass
+  public static void setup() throws InterruptedException, IOException {
 
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -79,7 +81,7 @@ public class TestMRJobsWithProfiler {
     }
 
     if (mrCluster == null) {
-      mrCluster = new MiniMRYarnCluster(getClass().getName());
+      mrCluster = new MiniMRYarnCluster(TestMRJobsWithProfiler.class.getName());
       mrCluster.init(CONF);
       mrCluster.start();
     }
@@ -90,8 +92,8 @@ public class TestMRJobsWithProfiler {
     localFs.setPermission(APP_JAR, new FsPermission("700"));
   }
 
-  @After
-  public void tearDown() {
+  @AfterClass
+  public static void tearDown() {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
           + " not found. Not running test.");
@@ -103,10 +105,19 @@ public class TestMRJobsWithProfiler {
     }
   }
 
+  @Test (timeout = 150000)
+  public void testDefaultProfiler() throws Exception {
+    LOG.info("Starting testDefaultProfiler");
+    testProfilerInternal(true);
+  }
 
   @Test (timeout = 150000)
-  public void testProfiler() throws IOException, InterruptedException,
-      ClassNotFoundException {
+  public void testDifferentProfilers() throws Exception {
+    LOG.info("Starting testDefaultProfiler");
+    testProfilerInternal(false);
+  }
+
+  private void testProfilerInternal(boolean useDefault) throws Exception {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
         + " not found. Not running test.");
@@ -117,18 +128,19 @@ public class TestMRJobsWithProfiler {
     final JobConf sleepConf = new JobConf(mrCluster.getConfig());
 
     sleepConf.setProfileEnabled(true);
-    // profile map split 1
-    sleepConf.setProfileTaskRange(true, "1");
-    // profile reduce of map output partitions 1
-    sleepConf.setProfileTaskRange(false, "1");
-
-    // use hprof for map to profile.out
-    sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
-        "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
-      + "file=%s");
+    sleepConf.setProfileTaskRange(true, String.valueOf(PROFILED_TASK_ID));
+    sleepConf.setProfileTaskRange(false, String.valueOf(PROFILED_TASK_ID));
+
+    if (!useDefault) {
+      // use hprof for map to profile.out
+      sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
+          "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
+              + "file=%s");
+
+      // use Xprof for reduce to stdout
+      sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
+    }
 
-    // use Xprof for reduce to stdout
-    sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
     sleepJob.setConf(sleepConf);
 
     // 2-map-2-reduce SleepJob
@@ -205,8 +217,8 @@ public class TestMRJobsWithProfiler {
         TaskLog.LogName.PROFILE.toString());
       final Path stdoutPath = new Path(dirEntry.getValue(),
         TaskLog.LogName.STDOUT.toString());
-      if (tid.getTaskType() == TaskType.MAP) {
-        if (tid.getTaskID().getId() == 1) {
+      if (useDefault || tid.getTaskType() == TaskType.MAP) {
+        if (tid.getTaskID().getId() == PROFILED_TASK_ID) {
           // verify profile.out
           final BufferedReader br = new BufferedReader(new InputStreamReader(
             localFs.open(profilePath)));
@@ -222,7 +234,8 @@ public class TestMRJobsWithProfiler {
       } else {
         Assert.assertFalse("hprof file should not exist",
           localFs.exists(profilePath));
-        if (tid.getTaskID().getId() == 1) {
+        if (tid.getTaskID().getId() == PROFILED_TASK_ID) {
+          // reducer is profiled with Xprof
           final BufferedReader br = new BufferedReader(new InputStreamReader(
             localFs.open(stdoutPath)));
           boolean flatProfFound = false;



Mime
View raw message