Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskController.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskController.java Tue Jun 5 02:33:44 2012
@@ -14,11 +14,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -26,16 +28,12 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+
import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
/**
* Controls initialization, finalization and clean up of tasks, and
@@ -47,402 +45,249 @@ import org.apache.hadoop.classification.
* performing the actual actions.
*
* <br/>
+ *
+ * NOTE: This is an internal-only class and is not intended for users!
*/
-@InterfaceAudience.Private
public abstract class TaskController implements Configurable {
+ /**
+ * The constants for the signals that can be sent to a task process via
+ * {@link TaskController#signalTask(String, int, Signal)}.
+ * Each constant pairs a numeric signal value with a display name.
+ */
+ public enum Signal {
+ NULL(0, "NULL"), QUIT(3, "SIGQUIT"),
+ KILL(9, "SIGKILL"), TERM(15, "SIGTERM");
+ // numeric signal value, returned by getValue()
+ private final int value;
+ // display name, returned by toString()
+ private final String str;
+ private Signal(int value, String str) {
+ this.str = str;
+ this.value = value;
+ }
+ /** @return the numeric value of this signal */
+ public int getValue() {
+ return value;
+ }
+ /** @return the display name of this signal, e.g. "SIGTERM" */
+ @Override
+ public String toString() {
+ return str;
+ }
+ }
+
private Configuration conf;
-
+
public static final Log LOG = LogFactory.getLog(TaskController.class);
+ //Name of the executable script that will contain the child
+ // JVM command line. See writeCommand for details.
+ protected static final String COMMAND_FILE = "taskjvm.sh";
+
+ protected LocalDirAllocator allocator;
+
+ final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
+ FsPermission.createImmutable((short) 0700); // rwx--------
+
public Configuration getConf() {
return conf;
}
- // The list of directory paths specified in the variable Configs.LOCAL_DIR
- // This is used to determine which among the list of directories is picked up
- // for storing data for a particular task.
- protected String[] mapredLocalDirs;
-
public void setConf(Configuration conf) {
this.conf = conf;
- mapredLocalDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
}
/**
- * Sets up the permissions of the following directories on all the configured
- * disks:
- * <ul>
- * <li>mapreduce.cluster.local.directories</li>
- * <li>Hadoop log directories</li>
- * </ul>
+ * Does initialization and setup.
+ * @param allocator the local dir allocator to use
*/
- public void setup() throws IOException {
- FileSystem localFs = FileSystem.getLocal(conf);
-
- for (String localDir : this.mapredLocalDirs) {
- // Set up the mapreduce.cluster.local.directories.
- File mapredlocalDir = new File(localDir);
- if (!mapredlocalDir.isDirectory() && !mapredlocalDir.mkdirs()) {
- LOG.warn("Unable to create mapreduce.cluster.local.directory : "
- + mapredlocalDir.getPath());
- } else {
- localFs.setPermission(new Path(mapredlocalDir.getCanonicalPath()),
- new FsPermission((short)0755));
- }
- }
-
- // Set up the user log directory
- File taskLog = TaskLog.getUserLogDir();
- if (!taskLog.isDirectory() && !taskLog.mkdirs()) {
- LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
- } else {
- localFs.setPermission(new Path(taskLog.getCanonicalPath()),
- new FsPermission((short)0755));
- }
- DiskChecker.checkDir(TaskLog.getUserLogDir());
- }
-
+ public abstract void setup(LocalDirAllocator allocator) throws IOException;
+
/**
- * Take task-controller specific actions to initialize job. This involves
- * setting appropriate permissions to job-files so as to secure the files to
- * be accessible only by the user's tasks.
- *
+ * Create all of the directories necessary for the job to start and download
+ * all of the job and private distributed cache files.
+ * Creates both the user directories and the job log directory.
+ * @param user the user name
+ * @param jobid the job
+ * @param credentials a filename containing the job secrets
+ * @param jobConf the path to the localized configuration file
+ * @param taskTracker the connection to the task tracker
+ * @param ttAddr the tasktracker's RPC address
* @throws IOException
*/
- abstract void initializeJob(JobInitializationContext context) throws IOException;
-
+ public abstract void initializeJob(String user, String jobid,
+ Path credentials, Path jobConf,
+ TaskUmbilicalProtocol taskTracker,
+ InetSocketAddress ttAddr)
+ throws IOException, InterruptedException;
+
/**
- * Take task-controller specific actions to initialize the distributed cache
- * file. This involves setting appropriate permissions for these files so as
- * to secure them to be accessible only their owners.
- *
- * @param context
+ * Create all of the directories for the task and launch the child jvm.
+ * @param user the user name
+ * @param jobId the jobId in question
+ * @param attemptId the attempt id (cleanup attempts have .cleanup suffix)
+ * @param setup list of shell commands to execute before the jvm
+ * @param jvmArguments list of jvm arguments
+ * @param currentWorkDirectory the full path of the cwd for the task
+ * @param stdout the file to redirect stdout to
+ * @param stderr the file to redirect stderr to
+ * @return the exit code for the task
* @throws IOException
*/
- public abstract void initializeDistributedCacheFile(DistributedCacheFileContext context)
- throws IOException;
-
- /**
- * Launch a task JVM
- *
- * This method defines how a JVM will be launched to run a task. Each
- * task-controller should also do an
- * {@link #initializeTask(TaskControllerContext)} inside this method so as to
- * initialize the task before launching it. This is for reasons of
- * task-controller specific optimizations w.r.t combining initialization and
- * launching of tasks.
- *
- * @param context the context associated to the task
- */
- abstract void launchTaskJVM(TaskControllerContext context)
- throws IOException;
-
+ public abstract
+ int launchTask(String user,
+ String jobId,
+ String attemptId,
+ List<String> setup,
+ List<String> jvmArguments,
+ File currentWorkDirectory,
+ String stdout,
+ String stderr) throws IOException;
+
/**
- * Top level cleanup a task JVM method.
- * <ol>
- * <li>Sends a graceful termiante signal to task JVM to allow subprocesses
- * to cleanup.</li>
- * <li>Sends a forceful kill signal to task JVM, terminating all its
- * sub-processes forcefully.</li>
- * </ol>
- *
- * @param context the task for which kill signal has to be sent.
+ * Send a signal to a task pid as the user. Always signal the process group.
+ * An implementation may elect to signal the pid directly if the former is
+ * unavailable or fails.
+ * @param user the user name
+ * @param taskPid the pid of the task
+ * @param signal the id of the signal to send
+ * @return false if the process does not exist
+ * @throws IOException If the task controller failed to signal the process
+ * (group), but the process exists.
*/
- final void destroyTaskJVM(TaskControllerContext context) {
- // Send SIGTERM to try to ask for a polite exit.
- terminateTask(context);
-
- try {
- Thread.sleep(context.sleeptimeBeforeSigkill);
- } catch (InterruptedException e) {
- LOG.warn("Sleep interrupted : " +
- StringUtils.stringifyException(e));
- }
-
- killTask(context);
- }
-
- /** Perform initializing actions required before a task can run.
- *
- * For instance, this method can be used to setup appropriate
- * access permissions for files and directories that will be
- * used by tasks. Tasks use the job cache, log, and distributed cache
- * directories and files as part of their functioning. Typically,
- * these files are shared between the daemon and the tasks
- * themselves. So, a TaskController that is launching tasks
- * as different users can implement this method to setup
- * appropriate ownership and permissions for these directories
- * and files.
- */
- abstract void initializeTask(TaskControllerContext context)
- throws IOException;
-
- static class TaskExecContext {
- // task being executed
- Task task;
- }
+ public abstract boolean signalTask(String user, int taskPid,
+ Signal signal) throws IOException;
+
/**
- * Contains task information required for the task controller.
+ * Delete the user's files under all of the task tracker root directories.
+ * @param user the user name
+ * @param subDir the path relative to base directories
+ * @param baseDirs the base directories (absolute paths)
+ * @throws IOException
*/
- static class TaskControllerContext extends TaskExecContext {
- ShellCommandExecutor shExec; // the Shell executor executing the JVM for this task.
-
- // Information used only when this context is used for launching new tasks.
- JvmEnv env; // the JVM environment for the task.
-
- // Information used only when this context is used for destroying a task jvm.
- String pid; // process handle of task JVM.
- long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after
sending SIGTERM
- }
-
+ public abstract void deleteAsUser(String user,
+ String subDir,
+ String... baseDirs) throws IOException;
+
/**
- * Contains info related to the path of the file/dir to be deleted. This info
- * is needed by task-controller to build the full path of the file/dir
+ * Delete the user's files under the userlogs directory.
+ * @param user the user to work as
+ * @param subDir the path under the userlogs directory.
+ * @throws IOException
*/
- static abstract class TaskControllerPathDeletionContext
- extends PathDeletionContext {
- TaskController taskController;
- String user;
-
- /**
- * mapredLocalDir is the base dir under which to-be-deleted jobLocalDir,
- * taskWorkDir or taskAttemptDir exists. fullPath of jobLocalDir,
- * taskAttemptDir or taskWorkDir is built using mapredLocalDir, jobId,
- * taskId, etc.
- */
- Path mapredLocalDir;
-
- public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
- TaskController taskController,
- String user) {
- super(fs, null);
- this.taskController = taskController;
- this.mapredLocalDir = mapredLocalDir;
+ public abstract void deleteLogAsUser(String user,
+ String subDir) throws IOException;
+
+ static class DeletionContext extends CleanupQueue.PathDeletionContext {
+ private TaskController controller;
+ private boolean isLog;
+ private String user;
+ private String subDir;
+ private String[] baseDirs;
+ DeletionContext(TaskController controller, boolean isLog, String user,
+ String subDir, String[] baseDirs) {
+ super(null, null);
+ this.controller = controller;
+ this.isLog = isLog;
this.user = user;
+ this.subDir = subDir;
+ this.baseDirs = baseDirs;
}
-
+
@Override
- protected String getPathForCleanup() {
- if (fullPath == null) {
- fullPath = buildPathForDeletion();
+ protected void deletePath() throws IOException {
+ if (isLog) {
+ controller.deleteLogAsUser(user, subDir);
+ } else {
+ controller.deleteAsUser(user, subDir, baseDirs);
}
- return fullPath;
}
- /**
- * Return the component of the path under the {@link #mapredLocalDir} to be
- * cleaned up. Its the responsibility of the class that extends
- * {@link TaskControllerPathDeletionContext} to provide the correct
- * component. For example
- * - For task related cleanups, either the task-work-dir or task-local-dir
- * might be returned depending on jvm reuse.
- * - For job related cleanup, simply the job-local-dir might be returned.
- */
- abstract protected String getPath();
-
- /**
- * Builds the path of taskAttemptDir OR taskWorkDir based on
- * mapredLocalDir, jobId, taskId, etc
- */
- String buildPathForDeletion() {
- return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + getPath();
+ @Override
+ public String toString() {
+ return (isLog ? "log(" : "dir(") +
+ user + "," + subDir + ")";
}
}
-
- /** Contains info related to the path of the file/dir to be deleted. This info
- * is needed by task-controller to build the full path of the task-work-dir or
- * task-local-dir depending on whether the jvm is reused or not.
- */
- static class TaskControllerTaskPathDeletionContext
- extends TaskControllerPathDeletionContext {
- final Task task;
- final boolean isWorkDir;
-
- public TaskControllerTaskPathDeletionContext(FileSystem fs,
- Path mapredLocalDir, Task task, boolean isWorkDir,
- TaskController taskController) {
- super(fs, mapredLocalDir, taskController, task.getUser());
- this.task = task;
- this.isWorkDir = isWorkDir;
- }
-
- /**
- * Returns the taskWorkDir or taskLocalDir based on whether
- * {@link TaskControllerTaskPathDeletionContext} is configured to delete
- * the workDir.
- */
- @Override
- protected String getPath() {
- String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(),
- task.getJobID().toString(), task.getTaskID().toString(),
- task.isTaskCleanupTask())
- : TaskTracker.getLocalTaskDir(task.getUser(),
- task.getJobID().toString(), task.getTaskID().toString(),
- task.isTaskCleanupTask());
- return subDir;
+
+ /**
+ * Returns the local unix user that a given job will run as.
+ */
+ public String getRunAsUser(JobConf conf) {
+ return System.getProperty("user.name");
+ }
+
+ //Write the JVM command line to a file under the specified directory
+ // Note that the JVM will be launched using a setuid executable, and
+ // could potentially contain strings defined by a user. Hence, to
+ // prevent special character attacks, we write the command line to
+ // a file and execute it.
+ // Returns the path component of commandFile qualified against fs.
+ // Failures to create, write or close the file are reported as an
+ // IOException instead of being logged and ignored, so callers never
+ // execute a missing or truncated command file.
+ protected static String writeCommand(String cmdLine, FileSystem fs,
Path commandFile) throws IOException {
+ PrintWriter pw = null;
+ LOG.info("Writing commands to " + commandFile);
+ try {
+ pw = new PrintWriter(FileSystem.create(
+ fs, commandFile, TASK_LAUNCH_SCRIPT_PERMISSION));
+ pw.write(cmdLine);
+ } finally {
+ if (pw != null) {
+ pw.close();
+ }
}
-
- /**
- * Makes the path(and its subdirectories recursively) fully deletable by
- * setting proper permissions(770) by task-controller
- */
- @Override
- protected void enablePathForCleanup() throws IOException {
- getPathForCleanup();// allow init of fullPath, if not inited already
- if (fs.exists(new Path(fullPath))) {
- taskController.enableTaskForCleanup(this);
+ // PrintWriter never throws from write() or close(); it only records the
+ // failure, which checkError() reports (including after close()).
+ if (pw.checkError()) {
+ throw new IOException("Error writing JVM command line to " + commandFile);
+ }
+ return commandFile.makeQualified(fs).toUri().getPath();
+ }
+
+ /**
+ * Logs each newline-separated line of the given output at INFO level.
+ * A null output is ignored.
+ */
+ protected void logOutput(String output) {
+ if (output != null) {
+ for (String line : output.split("\n")) {
+ LOG.info(line);
}
}
}
- /** Contains info related to the path of the file/dir to be deleted. This info
- * is needed by task-controller to build the full path of the job-local-dir.
- */
- static class TaskControllerJobPathDeletionContext
- extends TaskControllerPathDeletionContext {
- final JobID jobId;
-
- public TaskControllerJobPathDeletionContext(FileSystem fs,
- Path mapredLocalDir, JobID id, String user,
+ public static final boolean isSetsidAvailable = isSetsidSupported();
+ // Probes for setsid by running {"setsid", "bash", "-c", "echo $$"}; any
+ // IOException from the probe means setsid is treated as unavailable.
+ private static boolean isSetsidSupported() {
+ ShellCommandExecutor shexec = null;
+ boolean setsidSupported = true;
+ try {
+ String[] args = {"setsid", "bash", "-c", "echo $$"};
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ } catch (IOException ioe) {
+ LOG.warn("setsid is not available on this machine. So not using it.");
+ setsidSupported = false;
+ } finally { // handle the exit code
+ // shexec is still null if the executor itself could not be constructed;
+ // dereferencing it here would mask that exception with an NPE.
+ if (shexec != null) {
+ LOG.info("setsid exited with exit code " + shexec.getExitCode());
+ }
+ }
+ return setsidSupported;
+ }
+
+ public static class DelayedProcessKiller extends Thread {
+ private final String user;
+ private final int pid;
+ private final long delay;
+ private final Signal signal;
+ private final TaskController taskController;
+ public DelayedProcessKiller(String user, int pid, long delay, Signal signal,
TaskController taskController) {
- super(fs, mapredLocalDir, taskController, user);
- this.jobId = id;
- }
-
- /**
- * Returns the jobLocalDir of the job to be cleaned up.
- */
- @Override
- protected String getPath() {
- return TaskTracker.getLocalJobDir(user, jobId.toString());
+ this.user = user;
+ this.pid = pid;
+ this.delay = delay;
+ this.signal = signal;
+ this.taskController = taskController;
+ setName("Task killer for " + pid);
+ setDaemon(false);
}
-
- /**
- * Makes the path(and its sub-directories recursively) fully deletable by
- * setting proper permissions(770) by task-controller
- */
@Override
- protected void enablePathForCleanup() throws IOException {
- getPathForCleanup();// allow init of fullPath, if not inited already
- if (fs.exists(new Path(fullPath))) {
- taskController.enableJobForCleanup(this);
+ public void run() {
+ try {
+ // Wait `delay` milliseconds, then deliver the signal. The boolean result
+ // of signalTask (false if the process does not exist) is ignored.
+ Thread.sleep(delay);
+ taskController.signalTask(user, pid, signal);
+ } catch (InterruptedException e) {
+ // Interrupting this thread during the wait cancels the signal.
+ return;
+ } catch (IOException e) {
+ LOG.warn("Exception when killing task " + pid, e);
}
}
}
-
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- public static class InitializationContext {
- public File workDir;
- public String user;
-
- public InitializationContext() {
- }
-
- public InitializationContext(String user, File workDir) {
- this.user = user;
- this.workDir = workDir;
- }
- }
-
- /**
- * This is used for initializing the private localized files in distributed
- * cache. Initialization would involve changing permission, ownership and etc.
- */
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- public static class DistributedCacheFileContext extends InitializationContext {
- // base directory under which file has been localized
- Path localizedBaseDir;
- // the unique string used to construct the localized path
- String uniqueString;
-
- public DistributedCacheFileContext(String user, File workDir,
- Path localizedBaseDir, String uniqueString) {
- super(user, workDir);
- this.localizedBaseDir = localizedBaseDir;
- this.uniqueString = uniqueString;
- }
-
- public Path getLocalizedUniqueDir() {
- return new Path(localizedBaseDir, new Path(TaskTracker
- .getPrivateDistributedCacheDir(user), uniqueString));
- }
- }
-
- static class JobInitializationContext extends InitializationContext {
- JobID jobid;
- }
-
- static class DebugScriptContext extends TaskExecContext {
- List<String> args;
- File workDir;
- File stdout;
- }
-
- /**
- * Sends a graceful terminate signal to taskJVM and it sub-processes.
- *
- * @param context task context
- */
- abstract void terminateTask(TaskControllerContext context);
-
- /**
- * Sends a KILL signal to forcefully terminate the taskJVM and its
- * sub-processes.
- *
- * @param context task context
- */
- abstract void killTask(TaskControllerContext context);
-
-
- /**
- * Sends a QUIT signal to direct the task JVM (and sub-processes) to
- * dump their stack to stdout.
- *
- * @param context task context.
- */
- abstract void dumpTaskStack(TaskControllerContext context);
-
- /**
- * Initialize user on this TaskTracer in a TaskController specific manner.
- *
- * @param context
- * @throws IOException
- */
- public abstract void initializeUser(InitializationContext context)
- throws IOException;
-
- /**
- * Launch the task debug script
- *
- * @param context
- * @throws IOException
- */
- abstract void runDebugScript(DebugScriptContext context)
- throws IOException;
-
- /**
- * Enable the task for cleanup by changing permissions of the path
- * @param context path deletion context
- * @throws IOException
- */
- abstract void enableTaskForCleanup(PathDeletionContext context)
- throws IOException;
-
- /**
- * Enable the job for cleanup by changing permissions of the path
- * @param context path deletion context
- * @throws IOException
- */
- abstract void enableJobForCleanup(PathDeletionContext context)
- throws IOException;
-
- /**
- * Returns the local unix user that a given job will run as.
- */
- String getRunAsUser(JobConf conf) {
- return System.getProperty("user.name");
- }
}
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java Tue Jun 5 02:33:44 2012
@@ -43,8 +43,8 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -168,7 +168,14 @@ public class TaskLog {
static File getAttemptDir(TaskAttemptID taskid, boolean isCleanup) {
String cleanupSuffix = isCleanup ? ".cleanup" : "";
- return new File(getJobDir(taskid.getJobID()), taskid + cleanupSuffix);
+ return getAttemptDir(taskid.getJobID().toString(),
+ taskid.toString() + cleanupSuffix);
+ }
+
+ static File getAttemptDir(String jobid, String taskid) {
+ // taskid should be fully formed and it should have the optional
+ // .cleanup suffix
+ return new File(getJobDir(jobid), taskid);
}
private static long prevOutLength;
private static long prevErrLength;
@@ -487,21 +494,23 @@ public class TaskLog {
String stdout = FileUtil.makeShellPath(stdoutFilename);
String stderr = FileUtil.makeShellPath(stderrFilename);
- StringBuffer mergedCmd = new StringBuffer();
+ StringBuilder mergedCmd = new StringBuilder();
// Export the pid of taskJvm to env variable JVM_PID.
// Currently pid is not used on Windows
if (!Shell.WINDOWS) {
- mergedCmd.append(" export JVM_PID=`echo $$` ; ");
+ mergedCmd.append("export JVM_PID=`echo $$` ; ");
}
- if (setup != null && setup.size() > 0) {
- mergedCmd.append(addCommand(setup, false));
- mergedCmd.append(";");
+ if (setup != null) {
+ for (String s : setup) {
+ mergedCmd.append(s);
+ mergedCmd.append("\n");
+ }
}
if (tailLength > 0) {
mergedCmd.append("(");
- } else if(ProcessTree.isSetsidAvailable && useSetsid &&
+ } else if(TaskController.isSetsidAvailable && useSetsid &&
!Shell.WINDOWS) {
mergedCmd.append("exec setsid ");
} else {
@@ -574,7 +583,7 @@ public class TaskLog {
*/
public static String addCommand(List<String> cmd, boolean isExecutable)
throws IOException {
- StringBuffer command = new StringBuffer();
+ StringBuilder command = new StringBuilder();
for(String s: cmd) {
command.append('\'');
if (isExecutable) {
@@ -623,11 +632,21 @@ public class TaskLog {
/**
* Get the user log directory for the job jobid.
*
- * @param jobid
+ * @param jobid string representation of the jobid
+ * @return user log directory for the job
+ */
+ public static File getJobDir(String jobid) {
+ return new File(getUserLogDir(), jobid);
+ }
+
+ /**
+ * Get the user log directory for the job jobid.
+ *
+ * @param jobid the jobid object
* @return user log directory for the job
*/
public static File getJobDir(JobID jobid) {
- return new File(getUserLogDir(), jobid.toString());
+ return getJobDir(jobid.toString());
}
} // TaskLog
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue Jun 5 02:33:44 2012
@@ -34,7 +34,6 @@ import org.apache.hadoop.mapred.TaskTrac
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.StringUtils;
/**
@@ -46,14 +45,13 @@ class TaskMemoryManagerThread extends Th
private static Log LOG = LogFactory.getLog(TaskMemoryManagerThread.class);
private TaskTracker taskTracker;
- private long monitoringInterval;
-
- private long maxMemoryAllowedForAllTasks;
private long maxRssMemoryAllowedForAllTasks;
- private Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
- private Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
- private List<TaskAttemptID> tasksToBeRemoved;
+ private final long monitoringInterval;
+ private final long maxMemoryAllowedForAllTasks;
+ private final Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
+ private final Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
+ private final List<TaskAttemptID> tasksToBeRemoved;
private static final String MEMORY_USAGE_STRING =
"Memory usage of ProcessTree %s for task-id %s : Virutal %d bytes, " +
@@ -91,7 +89,8 @@ class TaskMemoryManagerThread extends Th
this.monitoringInterval = monitoringInterval;
}
- public void addTask(TaskAttemptID tid, long memLimit, long memLimitPhysical) {
+ public void addTask(TaskAttemptID tid, long memLimit,
+ long memLimitPhysical) {
synchronized (tasksToBeAdded) {
LOG.debug("Tracking ProcessTree " + tid + " for the first time");
ProcessTreeInfo ptInfo =
@@ -204,19 +203,10 @@ class TaskMemoryManagerThread extends Th
if (pId != null) {
// pId will be null, either if the JVM is not spawned yet or if
// the JVM is removed from jvmIdToPid
- long sleeptimeBeforeSigkill =
- taskTracker
- .getJobConf()
- .getLong(
- TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
- ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
- // create process tree object
- ProcfsBasedProcessTree pt =
- new ProcfsBasedProcessTree(pId,
- ProcessTree.isSetsidAvailable, sleeptimeBeforeSigkill);
LOG.debug("Tracking ProcessTree " + pId + " for the first time");
+ ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(pId,
+ TaskController.isSetsidAvailable);
ptInfo.setPid(pId);
ptInfo.setProcessTree(pt);
}
@@ -280,9 +270,13 @@ class TaskMemoryManagerThread extends Th
// Virtual or physical memory over limit. Fail the task and remove
// the corresponding process tree
LOG.warn(msg);
+ // warn if not a leader
+ if (!pTree.checkPidPgrpidForMatch()) {
+ LOG.error("Killed task process with PID " + pId +
+ " but it is not a process group leader.");
+ }
+ // kill the task
taskTracker.cleanUpOverMemoryTask(tid, true, msg);
- // Now destroy the ProcessTree, remove it from monitoring map.
- pTree.destroy(true/*in the background*/);
it.remove();
LOG.info("Removed ProcessTree with root " + pId);
} else {
@@ -294,8 +288,7 @@ class TaskMemoryManagerThread extends Th
} catch (Exception e) {
// Log the exception and proceed to the next task.
LOG.warn("Uncaught exception in TaskMemoryManager "
- + "while managing memory of " + tid + " : "
- + StringUtils.stringifyException(e));
+ + "while managing memory of " + tid, e);
}
}
@@ -518,8 +511,6 @@ class TaskMemoryManagerThread extends Th
taskTracker.cleanUpOverMemoryTask(tid, false, msg);
// Now destroy the ProcessTree, remove it from monitoring map.
ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
- ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
- pTree.destroy(true/*in the background*/);
processTreeInfoMap.remove(tid);
LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
}
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Jun 5 02:33:44 2012
@@ -20,16 +20,16 @@ package org.apache.hadoop.mapred;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.net.URI;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Vector;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -67,21 +67,30 @@ abstract class TaskRunner extends Thread
private boolean exitCodeSet = false;
private static String SYSTEM_PATH_SEPARATOR = System.getProperty("path.separator");
+ static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
private TaskTracker tracker;
private TaskDistributedCacheManager taskDistributedCacheManager;
+ private String[] localdirs;
+ // Used to pick one of the configured local dirs for a task's work dir.
+ private static final Random rand = new Random();
protected JobConf conf;
JvmManager jvmManager;
public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker,
- JobConf conf) {
+ JobConf conf, TaskTracker.RunningJob rjob
+ ) throws IOException {
this.tip = tip;
this.t = tip.getTask();
this.tracker = tracker;
this.conf = conf;
this.jvmManager = tracker.getJvmManagerInstance();
+ this.localdirs = conf.getLocalDirs();
+ taskDistributedCacheManager = rjob.distCacheMgr;
}
public Task getTask() { return t; }
@@ -154,27 +163,13 @@ abstract class TaskRunner extends Thread
//before preparing the job localize
//all the archives
TaskAttemptID taskid = t.getTaskID();
- final LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
- final File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
-
- // We don't create any symlinks yet, so presence/absence of workDir
- // actually on the file system doesn't matter.
- tip.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() throws IOException {
- taskDistributedCacheManager =
- tracker.getTrackerDistributedCacheManager()
- .newTaskDistributedCacheManager(conf);
- taskDistributedCacheManager.setup(lDirAlloc, workDir, TaskTracker
- .getPrivateDistributedCacheDir(conf.getUser()),
- TaskTracker.getPublicDistributedCacheDir());
- return null;
- }
- });
-
- // Set up the child task's configuration. After this call, no localization
- // of files should happen in the TaskTracker's process space. Any changes to
- // the conf object after this will NOT be reflected to the child.
- setupChildTaskConfiguration(lDirAlloc);
+ // Only compute where the attempt's work dir lives and pass that to the
+ // child; the child does the actual directory creation. One configured
+ // local dir is picked uniformly at random (no space or health check).
+ if (localdirs.length == 0) {
+ // Random.nextInt(0) would otherwise fail with an opaque
+ // IllegalArgumentException.
+ throw new IOException("No local directories configured; cannot place"
+ + " the work directory for " + taskid);
+ }
+ final File workDir =
+ new File(new Path(localdirs[rand.nextInt(localdirs.length)],
+ TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),
+ taskid.toString(),
+ t.isTaskCleanupTask())).toString());
// Build classpath
List<String> classPaths =
@@ -189,7 +184,7 @@ abstract class TaskRunner extends Thread
tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
// set memory limit using ulimit if feasible and necessary ...
- List<String> setup = getVMSetupCmd();
+ String setup = getVMSetupCmd();
// Set up the redirection of the task's stdout and stderr streams
File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
@@ -202,7 +197,20 @@ abstract class TaskRunner extends Thread
errorInfo = getVMEnvironment(errorInfo, workDir, conf, env,
taskid, logSize);
- launchJvmAndWait(setup, vargs, stdout, stderr, logSize, workDir, env);
+ // Flatten the env into shell "export" commands that run before the JVM
+ // is started, followed by the ulimit setup fragment. Each value is
+ // wrapped in double quotes, so '\' and '"' inside it are escaped to keep
+ // the quoting intact; '$' and '`' are intentionally left unescaped so
+ // any shell expansion inside values behaves exactly as before.
+ List<String> setupCmds = new ArrayList<String>();
+ for (Entry<String, String> entry : env.entrySet()) {
+ String value = String.valueOf(entry.getValue())
+ .replace("\\", "\\\\")
+ .replace("\"", "\\\"");
+ setupCmds.add("export " + entry.getKey() + "=\"" + value + "\"");
+ }
+ setupCmds.add(setup);
+
+ launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
if (exitCodeSet) {
if (!killed && exitCode != 0) {
@@ -231,14 +239,6 @@ abstract class TaskRunner extends Thread
LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
}
} finally {
- try{
- if (taskDistributedCacheManager != null) {
- taskDistributedCacheManager.release();
- }
- }catch(IOException ie){
- LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
- }
-
+ // NOTE(review): the per-attempt taskDistributedCacheManager.release() was
+ // removed; the manager now comes from the RunningJob (rjob.distCacheMgr).
+ // Confirm it is released when the job, rather than the task, is cleaned up.
// It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
// *false* since the task has either
// a) SUCCEEDED - which means commit has been done
@@ -247,11 +247,11 @@ abstract class TaskRunner extends Thread
}
}
- void launchJvmAndWait(List<String> setup, Vector<String> vargs, File stdout,
- File stderr, long logSize, File workDir, Map<String, String> env)
- throws InterruptedException {
+ /**
+ * Launches the child JVM through the JvmManager and then blocks on
+ * {@code lock} until the {@code done} flag is set.
+ *
+ * @param setup shell commands run before the JVM command. The environment
+ *        is no longer passed separately, so it must already be encoded in
+ *        these commands (the caller emits "export" statements)
+ * @param vargs the JVM command line
+ * @param stdout file receiving the child's stdout
+ * @param stderr file receiving the child's stderr
+ * @param logSize log size passed through to the JvmManager
+ * @param workDir the child's working directory
+ * @throws InterruptedException if interrupted while waiting on the lock
+ * @throws IOException propagated from the JvmManager launch
+ */
+ void launchJvmAndWait(List <String> setup, Vector<String> vargs, File stdout,
+ File stderr, long logSize, File workDir)
+ throws InterruptedException, IOException {
jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout,
- stderr, logSize, workDir, env, conf));
+ stderr, logSize, workDir, conf));
synchronized (lock) {
while (!done) {
lock.wait();
@@ -303,7 +303,7 @@ abstract class TaskRunner extends Thread
.isTaskCleanupTask()), conf);
// write the child's task configuration file to the local disk
- writeLocalTaskFile(localTaskFile.toString(), conf);
+ JobLocalizer.writeLocalJobFile(localTaskFile, conf);
// Set the final job file in the task. The child needs to know the correct
// path to job.xml. So set this path accordingly.
@@ -313,21 +313,21 @@ abstract class TaskRunner extends Thread
/**
- * @return
+ * Builds the shell fragment that applies the child's memory ulimit before
+ * the task JVM is launched.
+ *
+ * @return the single-quoted ulimit command words followed by a newline, or
+ *         an empty string when no ulimit is configured or no ulimit command
+ *         is available
*/
- private List<String> getVMSetupCmd() {
-
- int ulimit = getChildUlimit(conf);
+ private String getVMSetupCmd() {
+ final int ulimit = getChildUlimit(conf);
if (ulimit <= 0) {
- return null;
+ return "";
}
- List<String> setup = null;
- String[] ulimitCmd = Shell.getUlimitMemoryCommand(ulimit);
- if (ulimitCmd != null) {
- setup = new ArrayList<String>();
- for (String arg : ulimitCmd) {
- setup.add(arg);
- }
+ final String[] ulimitCmd = Shell.getUlimitMemoryCommand(ulimit);
+ // getUlimitMemoryCommand may return null (the previous implementation
+ // guarded against it); treat that as "no limit" instead of hitting an NPE.
+ if (ulimitCmd == null) {
+ return "";
+ }
+ StringBuilder command = new StringBuilder();
+ for (String arg : ulimitCmd) {
+ command.append('\'').append(arg).append("' ");
}
- return setup;
+ command.append("\n");
+ return command.toString();
}
/**
@@ -423,7 +423,7 @@ abstract class TaskRunner extends Thread
vargs.add(javaOptsSplit[i]);
}
- Path childTmpDir = createChildTmpDir(workDir, conf);
+ // Only the path is needed for -Djava.io.tmpdir here, so the directory is
+ // not created at this point (createDir == false).
+ Path childTmpDir = createChildTmpDir(workDir, conf, false);
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
// Add classpath.
@@ -473,7 +473,7 @@ abstract class TaskRunner extends Thread
+ * @param createDir when true and the tmp dir is relative to workDir, the
+ *        directory is created on the local file system; when false only
+ *        the path is computed
* @throws IOException
*/
static Path createChildTmpDir(File workDir,
- JobConf conf)
+ JobConf conf, boolean createDir)
throws IOException {
// add java.io.tmpdir given by mapreduce.task.tmp.dir
@@ -483,10 +483,13 @@ abstract class TaskRunner extends Thread
// if temp directory path is not absolute, prepend it with workDir.
if (!tmpDir.isAbsolute()) {
tmpDir = new Path(workDir.toString(), tmp);
-
- FileSystem localFs = FileSystem.getLocal(conf);
- if (!localFs.mkdirs(tmpDir) && localFs.getFileStatus(tmpDir).isFile()) {
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+ // Create the directory only when asked to. A false return from mkdirs
+ // is treated as fatal only if the path is not a directory afterwards.
+ if (createDir) {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ if (!localFs.mkdirs(tmpDir) &&
+ !localFs.getFileStatus(tmpDir).isDir()) {
+ throw new IOException("Mkdirs failed to create " +
+ tmpDir.toString());
+ }
}
}
return tmpDir;
@@ -533,6 +536,7 @@ abstract class TaskRunner extends Thread
ldLibraryPath.append(oldLdLibraryPath);
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+ // Tell the child where its work directory is; the TaskTracker no longer
+ // creates it, the child does.
+ env.put(HADOOP_WORK_DIR, workDir.toString());
// put jobTokenFile name into env
String jobTokenFile = conf.get(TokenCache.JOB_TOKENS_FILENAME);
@@ -593,25 +597,6 @@ abstract class TaskRunner extends Thread
}
/**
- * Write the task specific job-configuration file.
- *
- * @param localFs
- * @throws IOException
- */
- private static void writeLocalTaskFile(String jobFile, JobConf conf)
- throws IOException {
- Path localTaskFile = new Path(jobFile);
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(localTaskFile, true);
- OutputStream out = localFs.create(localTaskFile);
- try {
- conf.writeXml(out);
- } finally {
- out.close();
- }
- }
-
- /**
* Prepare the Configs.LOCAL_DIR for the child. The child is sand-boxed now.
* Whenever it uses LocalDirAllocator from now on inside the child, it will
* only see files inside the attempt-directory. This is done in the Child's
@@ -635,15 +620,11 @@ abstract class TaskRunner extends Thread
}
- /** Creates the working directory pathname for a task attempt. */
- static File formWorkDir(LocalDirAllocator lDirAlloc,
- TaskAttemptID task, boolean isCleanup, JobConf conf)
+ /**
+ * Resolves {@code MRConstants.WORKDIR} against the directories known to
+ * {@code lDirAlloc}, via {@code lDirAlloc.getLocalPathToRead}. It no longer
+ * builds a per-attempt path and does not create anything on disk.
+ *
+ * @param lDirAlloc allocator over the local directories to search
+ * @param conf configuration passed through to the allocator
+ * @return the resolved work-directory path
+ * @throws IOException propagated from the allocator lookup
+ */
+ static Path formWorkDir(LocalDirAllocator lDirAlloc, JobConf conf)
throws IOException {
Path workDir =
- lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
- conf.getUser(), task.getJobID().toString(), task.toString(),
- isCleanup), conf);
-
- return new File(workDir.toString());
+ lDirAlloc.getLocalPathToRead(MRConstants.WORKDIR, conf);
+ return workDir;
}
private static void appendSystemClasspaths(List<String> classPaths) {
@@ -735,7 +716,7 @@ abstract class TaskRunner extends Thread
}
}
- createChildTmpDir(workDir, conf);
+ // createDir == true: this call actually creates the child's tmp directory.
+ createChildTmpDir(workDir, conf, true);
}
/**
@@ -759,8 +740,10 @@ abstract class TaskRunner extends Thread
/**
* Kill the child process
+ * @throws InterruptedException
+ * @throws IOException
*/
- public void kill() {
+ public void kill() throws IOException, InterruptedException {
killed = true;
jvmManager.taskKilled(this);
signalDone();
|