Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JvmManager.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JvmManager.java Tue Jun 5 02:33:44 2012
@@ -1,20 +1,20 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.hadoop.mapred;
@@ -30,280 +30,264 @@ import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskController.DelayedProcessKiller;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
+import static org.apache.hadoop.mapred.TaskController.Signal;
class JvmManager {
- public static final Log LOG =
- LogFactory.getLog(JvmManager.class);
+public static final Log LOG =
+ LogFactory.getLog(JvmManager.class);
- private JvmManagerForType mapJvmManager;
+private JvmManagerForType mapJvmManager;
- private JvmManagerForType reduceJvmManager;
-
- public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
- File stdout,File stderr,long logSize, File workDir,
- Map<String,String> env, JobConf conf) {
- return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,conf);
- }
-
- public JvmManager(TaskTracker tracker) {
- mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(),
- true, tracker);
- reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
- false, tracker);
- }
+private JvmManagerForType reduceJvmManager;
- JvmManagerForType getJvmManagerForType(TaskType type) {
- if (type.equals(TaskType.MAP)) {
- return mapJvmManager;
- } else if (type.equals(TaskType.REDUCE)) {
- return reduceJvmManager;
- }
- return null;
- }
-
- public void stop() {
- mapJvmManager.stop();
- reduceJvmManager.stop();
+public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
+ File stdout,File stderr,long logSize, File workDir,
+ JobConf conf) {
+ return new JvmEnv(setup,vargs,stdout,stderr,workDir,conf);
+}
+
+public JvmManager(TaskTracker tracker) {
+ mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(),
+ true, tracker);
+ reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
+ false, tracker);
+}
+
+JvmManagerForType getJvmManagerForType(TaskType type) {
+ if (type.equals(TaskType.MAP)) {
+ return mapJvmManager;
+ } else if (type.equals(TaskType.REDUCE)) {
+ return reduceJvmManager;
}
+ return null;
+}
- public boolean isJvmKnown(JVMId jvmId) {
- if (jvmId.isMapJVM()) {
- return mapJvmManager.isJvmknown(jvmId);
- } else {
- return reduceJvmManager.isJvmknown(jvmId);
- }
+public void stop() throws IOException, InterruptedException {
+ mapJvmManager.stop();
+ reduceJvmManager.stop();
+}
+
+public boolean isJvmKnown(JVMId jvmId) {
+ if (jvmId.isMapJVM()) {
+ return mapJvmManager.isJvmknown(jvmId);
+ } else {
+ return reduceJvmManager.isJvmknown(jvmId);
}
+}
- /*
- * Saves pid of the given taskJvm
- */
- void setPidToJvm(JVMId jvmId, String pid) {
- if (jvmId.isMapJVM()) {
- mapJvmManager.setPidForJvm(jvmId, pid);
- }
- else {
- reduceJvmManager.setPidForJvm(jvmId, pid);
- }
+/*
+ * Saves pid of the given taskJvm
+ */
+void setPidToJvm(JVMId jvmId, String pid) {
+ if (jvmId.isMapJVM()) {
+ mapJvmManager.setPidForJvm(jvmId, pid);
}
-
- /*
- * Returns the pid of the task
- */
- String getPid(TaskRunner t) {
- if (t != null && t.getTask() != null) {
- if (t.getTask().isMapTask()) {
- return mapJvmManager.getPidByRunningTask(t);
- } else {
- return reduceJvmManager.getPidByRunningTask(t);
- }
- }
- return null;
+ else {
+ reduceJvmManager.setPidForJvm(jvmId, pid);
}
-
- public void launchJvm(TaskRunner t, JvmEnv env) {
+}
+
+/*
+ * Returns the pid of the task
+ */
+String getPid(TaskRunner t) {
+ if (t != null && t.getTask() != null) {
if (t.getTask().isMapTask()) {
- mapJvmManager.reapJvm(t, env);
+ return mapJvmManager.getPidByRunningTask(t);
} else {
- reduceJvmManager.reapJvm(t, env);
+ return reduceJvmManager.getPidByRunningTask(t);
}
}
+ return null;
+}
- public TaskInProgress getTaskForJvm(JVMId jvmId)
- throws IOException {
- if (jvmId.isMapJVM()) {
- return mapJvmManager.getTaskForJvm(jvmId);
- } else {
- return reduceJvmManager.getTaskForJvm(jvmId);
- }
- }
- public void taskFinished(TaskRunner tr) {
- if (tr.getTask().isMapTask()) {
- mapJvmManager.taskFinished(tr);
- } else {
- reduceJvmManager.taskFinished(tr);
- }
+public void launchJvm(TaskRunner t, JvmEnv env)
+ throws IOException, InterruptedException {
+ if (t.getTask().isMapTask()) {
+ mapJvmManager.reapJvm(t, env);
+ } else {
+ reduceJvmManager.reapJvm(t, env);
}
+}
- public void taskKilled(TaskRunner tr) {
- if (tr.getTask().isMapTask()) {
- mapJvmManager.taskKilled(tr);
- } else {
- reduceJvmManager.taskKilled(tr);
- }
+public TaskInProgress getTaskForJvm(JVMId jvmId)
+ throws IOException {
+ if (jvmId.isMapJVM()) {
+ return mapJvmManager.getTaskForJvm(jvmId);
+ } else {
+ return reduceJvmManager.getTaskForJvm(jvmId);
}
-
- void dumpStack(TaskRunner tr) {
- if (tr.getTask().isMapTask()) {
- mapJvmManager.dumpStack(tr);
- } else {
- reduceJvmManager.dumpStack(tr);
- }
+}
+public void taskFinished(TaskRunner tr) {
+ if (tr.getTask().isMapTask()) {
+ mapJvmManager.taskFinished(tr);
+ } else {
+ reduceJvmManager.taskFinished(tr);
}
+}
- public void killJvm(JVMId jvmId) {
- if (jvmId.isMap) {
- mapJvmManager.killJvm(jvmId);
- } else {
- reduceJvmManager.killJvm(jvmId);
- }
- }
-
- /**
- * Adds the task's work dir to the cleanup queue of taskTracker for
- * asynchronous deletion of work dir.
- * @param tracker taskTracker
- * @param task the task whose work dir needs to be deleted
- * @throws IOException
- */
- static void deleteWorkDir(TaskTracker tracker, Task task) throws IOException {
- tracker.getCleanupThread().addToQueue(
- TaskTracker.buildTaskControllerTaskPathDeletionContexts(
- tracker.getLocalFileSystem(),
- tracker.getLocalFiles(tracker.getJobConf(), ""),
- task, true /* workDir */,
- tracker.getTaskController()));
- }
-
- static class JvmManagerForType {
- //Mapping from the JVM IDs to running Tasks
- Map <JVMId,TaskRunner> jvmToRunningTask =
- new HashMap<JVMId, TaskRunner>();
- //Mapping from the tasks to JVM IDs
- Map <TaskRunner,JVMId> runningTaskToJvm =
- new HashMap<TaskRunner, JVMId>();
- //Mapping from the JVM IDs to Reduce JVM processes
- Map <JVMId, JvmRunner> jvmIdToRunner =
- new HashMap<JVMId, JvmRunner>();
-
- int maxJvms;
- boolean isMap;
-
- TaskTracker tracker;
-
- Random rand = new Random(System.currentTimeMillis());
-
- public JvmManagerForType(int maxJvms, boolean isMap,
- TaskTracker tracker) {
- this.maxJvms = maxJvms;
- this.isMap = isMap;
- this.tracker = tracker;
- }
-
- synchronized public void setRunningTaskForJvm(JVMId jvmId,
- TaskRunner t) {
- jvmToRunningTask.put(jvmId, t);
- runningTaskToJvm.put(t,jvmId);
- jvmIdToRunner.get(jvmId).setTaskRunner(t);
- }
-
- synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
- throws IOException {
- if (jvmToRunningTask.containsKey(jvmId)) {
- //Incase of JVM reuse, tasks are returned to previously launched
- //JVM via this method. However when a new task is launched
- //the task being returned has to be initialized.
- TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
- JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
- Task task = taskRunner.getTaskInProgress().getTask();
-
- // Initialize task dirs
- TaskControllerContext context =
- new TaskController.TaskControllerContext();
- context.env = jvmRunner.env;
- context.task = task;
- // If we are returning the same task as which the JVM was launched
- // we don't initialize task once again.
- if (!jvmRunner.env.conf.get(JobContext.TASK_ATTEMPT_ID).equals(
- task.getTaskID().toString())) {
- try {
- tracker.getTaskController().initializeTask(context);
- } catch (IOException e) {
- LOG.warn("Failed to initialize the new task "
- + task.getTaskID().toString() + " to be given to JVM with id "
- + jvmId);
- throw e;
- }
- }
-
- return taskRunner.getTaskInProgress();
- }
- return null;
- }
+public void taskKilled(TaskRunner tr
+ ) throws IOException, InterruptedException {
+ if (tr.getTask().isMapTask()) {
+ mapJvmManager.taskKilled(tr);
+ } else {
+ reduceJvmManager.taskKilled(tr);
+ }
+}
- synchronized String getPidByRunningTask(TaskRunner t) {
- JVMId id = runningTaskToJvm.get(t);
- if (id != null) {
- return jvmIdToRunner.get(id).getPid();
- }
- return null;
- }
+public void killJvm(JVMId jvmId) throws IOException, InterruptedException {
+ if (jvmId.isMap) {
+ mapJvmManager.killJvm(jvmId);
+ } else {
+ reduceJvmManager.killJvm(jvmId);
+ }
+}
- synchronized void setPidForJvm(JVMId jvmId, String pid) {
- JvmRunner runner = jvmIdToRunner.get(jvmId);
- assert runner != null : "Task must have a runner to set a pid";
- runner.setPid(pid);
- }
-
- synchronized public boolean isJvmknown(JVMId jvmId) {
- return jvmIdToRunner.containsKey(jvmId);
- }
+/**
+ * Adds the task's work dir to the cleanup queue of taskTracker for
+ * asynchronous deletion of work dir.
+ * @param tracker taskTracker
+ * @param task the task whose work dir needs to be deleted
+ */
+static void deleteWorkDir(TaskTracker tracker, Task task) {
+ String user = task.getUser();
+ String jobid = task.getJobID().toString();
+ String taskid = task.getTaskID().toString();
+ String workDir = TaskTracker.getTaskWorkDir(user, jobid, taskid,
+ task.isTaskCleanupTask());
+ tracker.getCleanupThread().addToQueue(
+ new TaskController.DeletionContext(tracker.getTaskController(), false,
+ user,
+ workDir, tracker.getLocalDirs()));
+
+}
- synchronized public void taskFinished(TaskRunner tr) {
- JVMId jvmId = runningTaskToJvm.remove(tr);
- if (jvmId != null) {
- jvmToRunningTask.remove(jvmId);
- JvmRunner jvmRunner;
- if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
- jvmRunner.taskRan();
- }
- }
+static class JvmManagerForType {
+ //Mapping from the JVM IDs to running Tasks
+ Map <JVMId,TaskRunner> jvmToRunningTask =
+ new HashMap<JVMId, TaskRunner>();
+ //Mapping from the tasks to JVM IDs
+ Map <TaskRunner,JVMId> runningTaskToJvm =
+ new HashMap<TaskRunner, JVMId>();
+ //Mapping from the JVM IDs to Reduce JVM processes
+ Map <JVMId, JvmRunner> jvmIdToRunner =
+ new HashMap<JVMId, JvmRunner>();
+ //Mapping from the JVM IDs to process IDs
+ Map <JVMId, String> jvmIdToPid =
+ new HashMap<JVMId, String>();
+
+ final int maxJvms;
+ final boolean isMap;
+ final TaskTracker tracker;
+ final long sleeptimeBeforeSigkill;
+ final Random rand = new Random();
+
+ static final String DELAY_BEFORE_KILL_KEY =
+ "mapred.tasktracker.tasks.sleeptime-before-sigkill";
+ // number of milliseconds to wait between TERM and KILL.
+ private static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 250;
+
+ public JvmManagerForType(int maxJvms, boolean isMap,
+ TaskTracker tracker) {
+ this.maxJvms = maxJvms;
+ this.isMap = isMap;
+ this.tracker = tracker;
+ sleeptimeBeforeSigkill =
+ tracker.getJobConf().getLong(DELAY_BEFORE_KILL_KEY,
+ DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+ }
+
+ synchronized public void setRunningTaskForJvm(JVMId jvmId,
+ TaskRunner t) {
+ jvmToRunningTask.put(jvmId, t);
+ runningTaskToJvm.put(t,jvmId);
+ jvmIdToRunner.get(jvmId).setBusy(true);
+ }
+
+ synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
+ throws IOException {
+ final TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
+ return null == taskRunner ? null : taskRunner.getTaskInProgress();
+ //if (jvmToRunningTask.containsKey(jvmId)) {
+ // //Incase of JVM reuse, tasks are returned to previously launched
+ // //JVM via this method. However when a new task is launched
+ // //the task being returned has to be initialized.
+ // TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
+ // // TODO retained for MAPREDUCE-1100
+ // JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
+ // Task task = taskRunner.getTaskInProgress().getTask();
+
+ // return taskRunner.getTaskInProgress();
+ //}
+ //return null;
+ }
+
+ synchronized String getPidByRunningTask(TaskRunner t) {
+ JVMId id = runningTaskToJvm.get(t);
+ if (id != null) {
+ return jvmIdToPid.get(id);
}
+ return null;
+ }
- synchronized public void taskKilled(TaskRunner tr) {
- JVMId jvmId = runningTaskToJvm.remove(tr);
- if (jvmId != null) {
- jvmToRunningTask.remove(jvmId);
- killJvm(jvmId);
- }
- }
+ synchronized void setPidForJvm(JVMId jvmId, String pid) {
+ JvmRunner runner = jvmIdToRunner.get(jvmId);
+ assert runner != null : "Task must have a runner to set a pid";
+ jvmIdToPid.put(jvmId, pid);
+ }
+
+ synchronized public boolean isJvmknown(JVMId jvmId) {
+ return jvmIdToRunner.containsKey(jvmId);
+ }
- synchronized public void killJvm(JVMId jvmId) {
+ synchronized public void taskFinished(TaskRunner tr) {
+ JVMId jvmId = runningTaskToJvm.remove(tr);
+ if (jvmId != null) {
+ jvmToRunningTask.remove(jvmId);
JvmRunner jvmRunner;
if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
- killJvmRunner(jvmRunner);
+ jvmRunner.taskRan();
}
}
-
- private synchronized void killJvmRunner(JvmRunner jvmRunner) {
- jvmRunner.kill();
- removeJvm(jvmRunner.jvmId);
- }
+ }
- void dumpStack(TaskRunner tr) {
- JvmRunner jvmRunner = null;
- synchronized (this) {
- JVMId jvmId = runningTaskToJvm.get(tr);
- if (null != jvmId) {
- jvmRunner = jvmIdToRunner.get(jvmId);
- }
- }
+ synchronized public void taskKilled(TaskRunner tr
+ ) throws IOException,
+ InterruptedException {
+ JVMId jvmId = runningTaskToJvm.remove(tr);
+ if (jvmId != null) {
+ jvmToRunningTask.remove(jvmId);
+ killJvm(jvmId);
+ }
+ }
- // Don't want to hold JvmManager lock while dumping stacks for one
- // task.
- if (null != jvmRunner) {
- jvmRunner.dumpChildStacks();
- }
+ synchronized public void killJvm(JVMId jvmId)
+ throws IOException, InterruptedException {
+ JvmRunner jvmRunner;
+ if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
+ killJvmRunner(jvmRunner);
}
+ }
+
+ private synchronized void killJvmRunner(JvmRunner jvmRunner)
+ throws IOException, InterruptedException {
+ jvmRunner.kill();
+ removeJvm(jvmRunner.jvmId);
+ }
- synchronized public void stop() {
+
+ synchronized public void stop()
+ throws IOException, InterruptedException {
//since the kill() method invoked later on would remove
//an entry from the jvmIdToRunner map, we create a
//copy of the values and iterate over it (if we don't
@@ -320,7 +304,7 @@ class JvmManager {
jvmIdToRunner.remove(jvmId);
}
private synchronized void reapJvm(
- TaskRunner t, JvmEnv env) {
+ TaskRunner t, JvmEnv env) throws IOException, InterruptedException {
if (t.getTaskInProgress().wasKilled()) {
//the task was killed in-flight
//no need to do the rest of the operations
@@ -409,7 +393,7 @@ class JvmManager {
private synchronized void spawnNewJvm(JobID jobId, JvmEnv env,
TaskRunner t) {
- JvmRunner jvmRunner = new JvmRunner(env,jobId);
+ JvmRunner jvmRunner = new JvmRunner(env, jobId, t.getTask());
jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
//spawn the JVM in a new thread. Note that there will be very little
//extra overhead of launching the new thread for a new JVM since
@@ -443,83 +427,90 @@ class JvmManager {
volatile int numTasksRan;
final int numTasksToRun;
JVMId jvmId;
- private ShellCommandExecutor shexec; // shell terminal for running the task
- //context used for starting JVM
- private TaskControllerContext initalContext;
+ volatile boolean busy = true;
+ private Task firstTask;
- public JvmRunner(JvmEnv env, JobID jobId) {
+ public JvmRunner(JvmEnv env, JobID jobId, Task firstTask) {
this.env = env;
this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm();
-
- this.initalContext = new TaskControllerContext();
- initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
- .getLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
- ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+ this.firstTask = firstTask;
LOG.info("In JvmRunner constructed JVM ID: " + jvmId);
}
+
+ @Override
public void run() {
- runChild(env);
+ try {
+ runChild(env);
+ } catch (InterruptedException ie) {
+ return;
+ } catch (IOException e) {
+ LOG.warn("Caught IOException in JVMRunner", e);
+ } catch (Throwable e) {
+ LOG.error("Caught Throwable in JVMRunner. Aborting TaskTracker.", e);
+ System.exit(1);
+ } finally {
+ // TODO MR-1100
+ //jvmFinished();
+ }
}
- public void runChild(JvmEnv env) {
+ public void runChild(JvmEnv env)
+ throws IOException, InterruptedException {
+ int exitCode = 0;
try {
env.vargs.add(Integer.toString(jvmId.getId()));
- //Launch the task controller to run task JVM
- initalContext.env = env;
- tracker.getTaskController().launchTaskJVM(initalContext);
+ TaskRunner runner = jvmToRunningTask.get(jvmId);
+ if (runner != null) {
+ Task task = runner.getTask();
+ //Launch the task controller to run task JVM
+ String user = task.getUser();
+ TaskAttemptID taskAttemptId = task.getTaskID();
+ String taskAttemptIdStr = task.isTaskCleanupTask() ?
+ (taskAttemptId.toString() + TaskTracker.TASK_CLEANUP_SUFFIX) :
+ taskAttemptId.toString();
+ exitCode = tracker.getTaskController().launchTask(user,
+ jvmId.jobId.toString(), taskAttemptIdStr, env.setup,
+ env.vargs, env.workDir, env.stdout.toString(),
+ env.stderr.toString());
+ }
} catch (IOException ioe) {
// do nothing
// error and output are appropriately redirected
} finally { // handle the exit code
- shexec = initalContext.shExec;
- if (shexec == null) {
- return;
- }
-
+ // although the process has exited before we get here,
+ // make sure the entire process group has also been killed.
kill();
- int exitCode = shexec.getExitCode();
updateOnJvmExit(jvmId, exitCode);
LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode
+ ". Number of tasks it ran: " + numTasksRan);
- try {
- // In case of jvm-reuse,
- //the task jvm cleans up the common workdir for every
- //task at the beginning of each task in the task JVM.
- //For the last task, we do it here.
- if (env.conf.getNumTasksToExecutePerJvm() != 1) {
- deleteWorkDir(tracker, initalContext.task);
- }
- } catch (IOException ie){}
- }
- }
-
- synchronized void setPid(String pid) {
- assert initalContext != null;
- initalContext.pid = pid;
- }
-
- synchronized String getPid() {
- if (initalContext != null) {
- return initalContext.pid;
- } else {
- return null;
+ deleteWorkDir(tracker, firstTask);
}
}
/**
- * Kills the process. Also kills its subprocesses if the process(root of subtree
- * of processes) is created using setsid.
+ * Kills the process. Also kills its subprocesses if the process(root of
+ * subtree of processes) is created using setsid.
*/
- synchronized void kill() {
+ synchronized void kill() throws IOException, InterruptedException {
if (!killed) {
TaskController controller = tracker.getTaskController();
// Check inital context before issuing a kill to prevent situations
// where kill is issued before task is launched.
- if (initalContext != null && initalContext.env != null) {
- // Destroy the task jvm
- controller.destroyTaskJVM(initalContext);
+ String pidStr = jvmIdToPid.get(jvmId);
+ if (pidStr != null) {
+ String user = env.conf.getUser();
+ int pid = Integer.parseInt(pidStr);
+ // start a thread that will kill the process dead
+ if (sleeptimeBeforeSigkill > 0) {
+ controller.signalTask(user, pid, Signal.QUIT);
+ controller.signalTask(user, pid, Signal.TERM);
+ new DelayedProcessKiller(user, pid, sleeptimeBeforeSigkill,
+ Signal.KILL, tracker.getTaskController()).start();
+ } else {
+ controller.signalTask(user, pid, Signal.KILL);
+ }
} else {
LOG.info(String.format("JVM Not killed %s but just removed", jvmId
.toString()));
@@ -528,46 +519,19 @@ class JvmManager {
}
}
- /** Send a signal to the JVM requesting that it dump a stack trace,
- * and wait for a timeout interval to give this signal time to be
- * processed.
- */
- void dumpChildStacks() {
- if (!killed) {
- TaskController controller = tracker.getTaskController();
- // Check inital context before issuing a signal to prevent situations
- // where signal is issued before task is launched.
- if (initalContext != null && initalContext.env != null) {
- // signal the task jvm
- controller.dumpTaskStack(initalContext);
-
- // We're going to kill the jvm with SIGKILL after this,
- // so we should wait for a few seconds first to ensure that
- // the SIGQUIT has time to be processed.
- try {
- Thread.sleep(initalContext.sleeptimeBeforeSigkill);
- } catch (InterruptedException e) {
- LOG.warn("Sleep interrupted : " +
- StringUtils.stringifyException(e));
- }
- }
- }
- }
-
- public synchronized void taskRan() {
- initalContext.task = null;
+ public void taskRan() {
+ busy = false;
numTasksRan++;
}
public boolean ranAll() {
return(numTasksRan == numTasksToRun);
}
- public synchronized void setTaskRunner(TaskRunner runner) {
- initalContext.task = runner.getTask();
- assert initalContext.task != null;
+ public void setBusy(boolean busy) {
+ this.busy = busy;
}
public synchronized boolean isBusy() {
- return initalContext.task != null;
+ return busy;
}
}
}
@@ -577,19 +541,15 @@ class JvmManager {
File stdout;
File stderr;
File workDir;
- long logSize;
JobConf conf;
- Map<String, String> env;
- public JvmEnv(List<String> setup, Vector<String> vargs, File stdout,
- File stderr, long logSize, File workDir, Map<String,String> env,
- JobConf conf) {
+ public JvmEnv(List <String> setup, Vector<String> vargs, File stdout,
+ File stderr, File workDir, JobConf conf) {
this.setup = setup;
this.vargs = vargs;
this.stdout = stdout;
this.stderr = stderr;
this.workDir = workDir;
- this.env = env;
this.conf = conf;
}
}
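The rewritten JvmRunner.kill() above replaces the old TaskControllerContext/destroyTaskJVM path with direct signalling: a QUIT so the JVM logs a thread dump, a TERM for an orderly exit, and a DelayedProcessKiller thread that delivers KILL once sleeptimeBeforeSigkill milliseconds have passed. A minimal standalone sketch of that escalation follows; it uses only the TaskController, Signal, and DelayedProcessKiller types the patch imports, while the GracefulKiller wrapper itself is illustrative and not part of the patch.

    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import org.apache.hadoop.mapred.TaskController.DelayedProcessKiller;
    import static org.apache.hadoop.mapred.TaskController.Signal;

    class GracefulKiller {
      private final TaskController controller;
      private final long gracePeriodMs;  // plays the role of sleeptimeBeforeSigkill

      GracefulKiller(TaskController controller, long gracePeriodMs) {
        this.controller = controller;
        this.gracePeriodMs = gracePeriodMs;
      }

      void kill(String user, int pid) throws IOException {
        if (gracePeriodMs > 0) {
          controller.signalTask(user, pid, Signal.QUIT);  // JVM logs a stack dump
          controller.signalTask(user, pid, Signal.TERM);  // request an orderly exit
          // Escalate to SIGKILL off-thread once the grace period expires.
          new DelayedProcessKiller(user, pid, gracePeriodMs, Signal.KILL,
              controller).start();
        } else {
          controller.signalTask(user, pid, Signal.KILL);  // no grace period configured
        }
      }
    }

Sending QUIT ahead of TERM preserves what the removed dumpStack/dumpChildStacks code used to do: the stack trace lands in the task log before the process goes away.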
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Tue Jun 5 02:33:44 2012
@@ -14,30 +14,27 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+*/
package org.apache.hadoop.mapred;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
-import java.io.PrintWriter;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.mapred.TaskController.Signal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* A {@link TaskController} that runs the task JVMs as the user
@@ -48,8 +45,8 @@ import org.apache.hadoop.util.Shell.Shel
* JVM and killing it when needed, and also initializing and
* finalizing the task environment.
* <p> The setuid executable is launched using the command line:</p>
- * <p>task-controller mapreduce.job.user.name command command-args, where</p>
- * <p>mapreduce.job.user.name is the name of the owner who submits the job</p>
+ * <p>task-controller user-name command command-args, where</p>
+ * <p>user-name is the name of the owner who submits the job</p>
* <p>command is one of the cardinal value of the
* {@link LinuxTaskController.TaskControllerCommands} enumeration</p>
* <p>command-args depends on the command being launched.</p>
@@ -62,52 +59,78 @@ class LinuxTaskController extends TaskCo
private static final Log LOG =
LogFactory.getLog(LinuxTaskController.class);
-
- // Name of the executable script that will contain the child
- // JVM command line. See writeCommand for details.
- private static final String COMMAND_FILE = "taskjvm.sh";
// Path to the setuid executable.
- private static String taskControllerExe;
+ private String taskControllerExe;
+ private static final String TASK_CONTROLLER_EXEC_KEY =
+ "mapreduce.tasktracker.task-controller.exe";
- static {
- // the task-controller is expected to be under the $HADOOP_HOME/bin
- // directory.
- File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
- taskControllerExe =
- new File(hadoopBin, "task-controller").getAbsolutePath();
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ taskControllerExe = getTaskControllerExecutablePath(conf);
}
-
+
public LinuxTaskController() {
super();
}
-
+
+ protected String getTaskControllerExecutablePath(Configuration conf) {
+ File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
+ String defaultTaskController =
+ new File(hadoopBin, "task-controller").getAbsolutePath();
+ return null == conf
+ ? defaultTaskController
+ : conf.get(TASK_CONTROLLER_EXEC_KEY, defaultTaskController);
+ }
+
/**
* List of commands that the setuid script will execute.
*/
- enum TaskControllerCommands {
- INITIALIZE_USER,
- INITIALIZE_JOB,
- INITIALIZE_DISTRIBUTEDCACHE_FILE,
- LAUNCH_TASK_JVM,
- INITIALIZE_TASK,
- TERMINATE_TASK_JVM,
- KILL_TASK_JVM,
- RUN_DEBUG_SCRIPT,
- SIGQUIT_TASK_JVM,
- ENABLE_TASK_FOR_CLEANUP,
- ENABLE_JOB_FOR_CLEANUP
+ enum Commands {
+ INITIALIZE_JOB(0),
+ LAUNCH_TASK_JVM(1),
+ SIGNAL_TASK(2),
+ DELETE_AS_USER(3),
+ DELETE_LOG_AS_USER(4);
+
+ private int value;
+ Commands(int value) {
+ this.value = value;
+ }
+ int getValue() {
+ return value;
+ }
+ }
+
+ /**
+ * Result codes returned from the C task-controller.
+ * These must match the values in task-controller.h.
+ */
+ enum ResultCode {
+ OK(0),
+ INVALID_USER_NAME(2),
+ INVALID_TASK_PID(9),
+ INVALID_TASKCONTROLLER_PERMISSIONS(22),
+ INVALID_CONFIG_FILE(24);
+
+ private final int value;
+ ResultCode(int value) {
+ this.value = value;
+ }
+ int getValue() {
+ return value;
+ }
}
@Override
- public void setup() throws IOException {
- super.setup();
-
+ public void setup(LocalDirAllocator allocator) throws IOException {
+
// Check the permissions of the task-controller binary by running it plainly.
- // If permissions are correct, it returns an error code 1, else it returns
+ // If permissions are correct, it returns an error code 1, else it returns
// 24 or something else if some other bugs are also present.
String[] taskControllerCmd =
- new String[] { getTaskControllerExecutablePath() };
+ new String[] { taskControllerExe };
ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd);
try {
shExec.execute();
@@ -120,52 +143,96 @@ class LinuxTaskController extends TaskCo
+ "permissions/ownership with exit code " + exitCode, e);
}
}
+ this.allocator = allocator;
}
+
- /**
- * Launch a task JVM that will run as the owner of the job.
- *
- * This method launches a task JVM by executing a setuid executable that will
- * switch to the user and run the task. Also does initialization of the first
- * task in the same setuid process launch.
- */
@Override
- void launchTaskJVM(TaskController.TaskControllerContext context)
- throws IOException {
- JvmEnv env = context.env;
- // get the JVM command line.
- String cmdLine =
- TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
- env.logSize, true);
-
- StringBuffer sb = new StringBuffer();
- //export out all the environment variable before child command as
- //the setuid/setgid binaries would not be getting, any environmental
- //variables which begin with LD_*.
- for(Entry<String, String> entry : env.env.entrySet()) {
- sb.append("export ");
- sb.append(entry.getKey());
- sb.append("=");
- sb.append(entry.getValue());
- sb.append("\n");
- }
- sb.append(cmdLine);
- // write the command to a file in the
- // task specific cache directory
- writeCommand(sb.toString(), getTaskCacheDirectory(context,
- context.env.workDir));
-
- // Call the taskcontroller with the right parameters.
- List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context,
- context.env.workDir);
- ShellCommandExecutor shExec = buildTaskControllerExecutor(
- TaskControllerCommands.LAUNCH_TASK_JVM,
- env.conf.getUser(),
- launchTaskJVMArgs, env.workDir, env.env);
- context.shExec = shExec;
+ public void initializeJob(String user, String jobid, Path credentials,
+ Path jobConf, TaskUmbilicalProtocol taskTracker,
+ InetSocketAddress ttAddr
+ ) throws IOException, InterruptedException {
+ List<String> command = new ArrayList<String>(
+ Arrays.asList(taskControllerExe,
+ user,
+ Integer.toString(Commands.INITIALIZE_JOB.getValue()),
+ jobid,
+ credentials.toUri().getPath().toString(),
+ jobConf.toUri().getPath().toString()));
+ File jvm = // use same jvm as parent
+ new File(new File(System.getProperty("java.home"), "bin"), "java");
+ command.add(jvm.toString());
+ command.add("-classpath");
+ command.add(System.getProperty("java.class.path"));
+ command.add("-Dhadoop.log.dir=" + TaskLog.getBaseLogDir());
+ command.add("-Dhadoop.root.logger=INFO,console");
+ command.add(JobLocalizer.class.getName()); // main of JobLocalizer
+ command.add(user);
+ command.add(jobid);
+ // add the task tracker's reporting address
+ command.add(ttAddr.getHostName());
+ command.add(Integer.toString(ttAddr.getPort()));
+ String[] commandArray = command.toArray(new String[0]);
+ ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("initializeJob: " + Arrays.toString(commandArray));
+ }
try {
shExec.execute();
+ if (LOG.isDebugEnabled()) {
+ logOutput(shExec.getOutput());
+ }
+ } catch (ExitCodeException e) {
+ int exitCode = shExec.getExitCode();
+ logOutput(shExec.getOutput());
+ throw new IOException("Job initialization failed (" + exitCode + ")", e);
+ }
+ }
+
+ @Override
+ public int launchTask(String user,
+ String jobId,
+ String attemptId,
+ List<String> setup,
+ List<String> jvmArguments,
+ File currentWorkDirectory,
+ String stdout,
+ String stderr) throws IOException {
+
+ ShellCommandExecutor shExec = null;
+ try {
+ FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw();
+ long logSize = 0; //TODO, Ref BUG:2854624
+ // get the JVM command line.
+ String cmdLine =
+ TaskLog.buildCommandLine(setup, jvmArguments,
+ new File(stdout), new File(stderr), logSize, true);
+
+ // write the command to a file in the
+ // task specific cache directory
+ Path p = new Path(allocator.getLocalPathForWrite(
+ TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId),
+ getConf()), COMMAND_FILE);
+ String commandFile = writeCommand(cmdLine, rawFs, p);
+
+ String[] command =
+ new String[]{taskControllerExe,
+ user,
+ Integer.toString(Commands.LAUNCH_TASK_JVM.getValue()),
+ jobId,
+ attemptId,
+ currentWorkDirectory.toString(),
+ commandFile};
+ shExec = new ShellCommandExecutor(command);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("launchTask: " + Arrays.toString(command));
+ }
+ shExec.execute();
} catch (Exception e) {
+ if (shExec == null) {
+ return -1;
+ }
int exitCode = shExec.getExitCode();
LOG.warn("Exit code from task is : " + exitCode);
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the task was
@@ -177,481 +244,78 @@ class LinuxTaskController extends TaskCo
LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
logOutput(shExec.getOutput());
}
- throw new IOException(e);
+ return exitCode;
}
if (LOG.isDebugEnabled()) {
- LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+ LOG.debug("Output from LinuxTaskController's launchTask follows:");
logOutput(shExec.getOutput());
}
+ return 0;
}
-
- /**
- * Launch the debug script process that will run as the owner of the job.
- *
- * This method launches the task debug script process by executing a setuid
- * executable that will switch to the user and run the task.
- */
+
@Override
- void runDebugScript(DebugScriptContext context) throws IOException {
- String debugOut = FileUtil.makeShellPath(context.stdout);
- String cmdLine = TaskLog.buildDebugScriptCommandLine(context.args, debugOut);
- writeCommand(cmdLine, getTaskCacheDirectory(context, context.workDir));
- // Call the taskcontroller with the right parameters.
- List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, context.workDir);
- runCommand(TaskControllerCommands.RUN_DEBUG_SCRIPT, context.task.getUser(),
- launchTaskJVMArgs, context.workDir, null);
- }
- /**
- * Helper method that runs a LinuxTaskController command
- *
- * @param taskControllerCommand
- * @param user
- * @param cmdArgs
- * @param env
- * @throws IOException
- */
- private void runCommand(TaskControllerCommands taskControllerCommand,
- String user, List<String> cmdArgs, File workDir, Map<String, String> env)
- throws IOException {
-
- ShellCommandExecutor shExec =
- buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs,
- workDir, env);
- try {
- shExec.execute();
- } catch (Exception e) {
- LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : "
- + shExec.getExitCode());
- LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : "
- + StringUtils.stringifyException(e));
- LOG.info("Output from LinuxTaskController's "
- + taskControllerCommand.toString() + " follows:");
- logOutput(shExec.getOutput());
- throw new IOException(e);
+ public void deleteAsUser(String user, String subDir, String... baseDirs)
+ throws IOException {
+ List<String> command = new ArrayList<String>(
+ Arrays.asList(
+ taskControllerExe,
+ user,
+ Integer.toString(Commands.DELETE_AS_USER.getValue()),
+ subDir));
+ for (String baseDir : baseDirs) {
+ command.add(baseDir);
}
+ String[] commandArray = command.toArray(new String[0]);
+ ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
if (LOG.isDebugEnabled()) {
- LOG.info("Output from LinuxTaskController's "
- + taskControllerCommand.toString() + " follows:");
- logOutput(shExec.getOutput());
- }
- }
-
- /**
- * Returns list of arguments to be passed while initializing a new task. See
- * {@code buildTaskControllerExecutor(TaskControllerCommands, String,
- * List<String>, JvmEnv)} documentation.
- *
- * @param context
- * @return Argument to be used while launching Task VM
- */
- private List<String> buildInitializeTaskArgs(TaskExecContext context) {
- List<String> commandArgs = new ArrayList<String>(3);
- String taskId = context.task.getTaskID().toString();
- String jobId = getJobId(context);
- commandArgs.add(jobId);
- if (!context.task.isTaskCleanupTask()) {
- commandArgs.add(taskId);
- } else {
- commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
+ LOG.debug("deleteAsUser: " + Arrays.toString(commandArray));
}
- return commandArgs;
+ shExec.execute();
}
@Override
- void initializeTask(TaskControllerContext context)
- throws IOException {
+ public void deleteLogAsUser(String user, String subDir) throws IOException {
+ String[] command =
+ new String[]{taskControllerExe,
+ user,
+ Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()),
+ subDir};
+ ShellCommandExecutor shExec = new ShellCommandExecutor(command);
if (LOG.isDebugEnabled()) {
- LOG.debug("Going to do "
- + TaskControllerCommands.INITIALIZE_TASK.toString()
- + " for " + context.task.getTaskID().toString());
- }
- runCommand(TaskControllerCommands.INITIALIZE_TASK,
- context.env.conf.getUser(),
- buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
- }
-
- /**
- * Builds the args to be passed to task-controller for enabling of task for
- * cleanup. Last arg in this List is either $attemptId or $attemptId/work
- */
- private List<String> buildTaskCleanupArgs(
- TaskControllerTaskPathDeletionContext context) {
- List<String> commandArgs = new ArrayList<String>(3);
- commandArgs.add(context.mapredLocalDir.toUri().getPath());
- commandArgs.add(context.task.getJobID().toString());
-
- String workDir = "";
- if (context.isWorkDir) {
- workDir = "/work";
- }
- if (context.task.isTaskCleanupTask()) {
- commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
- + workDir);
- } else {
- commandArgs.add(context.task.getTaskID() + workDir);
+ LOG.debug("deleteLogAsUser: " + Arrays.toString(command));
}
-
- return commandArgs;
+ shExec.execute();
}
- /**
- * Builds the args to be passed to task-controller for enabling of job for
- * cleanup. Last arg in this List is $jobid.
- */
- private List<String> buildJobCleanupArgs(
- TaskControllerJobPathDeletionContext context) {
- List<String> commandArgs = new ArrayList<String>(2);
- commandArgs.add(context.mapredLocalDir.toUri().getPath());
- commandArgs.add(context.jobId.toString());
-
- return commandArgs;
- }
-
- /**
- * Enables the task for cleanup by changing permissions of the specified path
- * in the local filesystem
- */
- @Override
- void enableTaskForCleanup(PathDeletionContext context)
- throws IOException {
- if (context instanceof TaskControllerTaskPathDeletionContext) {
- TaskControllerTaskPathDeletionContext tContext =
- (TaskControllerTaskPathDeletionContext) context;
- enablePathForCleanup(tContext,
- TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP,
- buildTaskCleanupArgs(tContext));
- }
- else {
- throw new IllegalArgumentException("PathDeletionContext provided is not "
- + "TaskControllerTaskPathDeletionContext.");
- }
- }
-
- /**
- * Enables the job for cleanup by changing permissions of the specified path
- * in the local filesystem
- */
@Override
- void enableJobForCleanup(PathDeletionContext context)
- throws IOException {
- if (context instanceof TaskControllerJobPathDeletionContext) {
- TaskControllerJobPathDeletionContext tContext =
- (TaskControllerJobPathDeletionContext) context;
- enablePathForCleanup(tContext,
- TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP,
- buildJobCleanupArgs(tContext));
- } else {
- throw new IllegalArgumentException("PathDeletionContext provided is not "
- + "TaskControllerJobPathDeletionContext.");
- }
- }
-
- /**
- * Enable a path for cleanup
- * @param c {@link TaskControllerPathDeletionContext} for the path to be
- * cleaned up
- * @param command {@link TaskControllerCommands} for task/job cleanup
- * @param cleanupArgs arguments for the {@link LinuxTaskController} to enable
- * path cleanup
- */
- private void enablePathForCleanup(TaskControllerPathDeletionContext c,
- TaskControllerCommands command,
- List<String> cleanupArgs) {
+ public boolean signalTask(String user, int taskPid,
+ Signal signal) throws IOException {
+ String[] command =
+ new String[]{taskControllerExe,
+ user,
+ Integer.toString(Commands.SIGNAL_TASK.getValue()),
+ Integer.toString(taskPid),
+ Integer.toString(signal.getValue())};
+ ShellCommandExecutor shExec = new ShellCommandExecutor(command);
if (LOG.isDebugEnabled()) {
- LOG.debug("Going to do " + command.toString() + " for " + c.fullPath);
- }
-
- if ( c.user != null && c.fs instanceof LocalFileSystem) {
- try {
- runCommand(command, c.user, cleanupArgs, null, null);
- } catch(IOException e) {
- LOG.warn("Unable to change permissions for " + c.fullPath);
- }
- }
- else {
- throw new IllegalArgumentException("Either user is null or the "
- + "file system is not local file system.");
- }
- }
-
- private void logOutput(String output) {
- String shExecOutput = output;
- if (shExecOutput != null) {
- for (String str : shExecOutput.split("\n")) {
- LOG.info(str);
- }
+ LOG.debug("signalTask: " + Arrays.toString(command));
}
- }
-
- private String getJobId(TaskExecContext context) {
- String taskId = context.task.getTaskID().toString();
- TaskAttemptID tId = TaskAttemptID.forName(taskId);
- String jobId = tId.getJobID().toString();
- return jobId;
- }
-
- /**
- * Returns list of arguments to be passed while launching task VM.
- * See {@code buildTaskControllerExecutor(TaskControllerCommands,
- * String, List<String>, JvmEnv)} documentation.
- * @param context
- * @return Argument to be used while launching Task VM
- */
- private List<String> buildLaunchTaskArgs(TaskExecContext context,
- File workDir) {
- List<String> commandArgs = new ArrayList<String>(3);
- LOG.debug("getting the task directory as: "
- + getTaskCacheDirectory(context, workDir));
- LOG.debug("getting the tt_root as " +getDirectoryChosenForTask(
- new File(getTaskCacheDirectory(context, workDir)),
- context) );
- commandArgs.add(getDirectoryChosenForTask(
- new File(getTaskCacheDirectory(context, workDir)),
- context));
- commandArgs.addAll(buildInitializeTaskArgs(context));
- return commandArgs;
- }
-
- // Get the directory from the list of directories configured
- // in Configs.LOCAL_DIR chosen for storing data pertaining to
- // this task.
- private String getDirectoryChosenForTask(File directory,
- TaskExecContext context) {
- String jobId = getJobId(context);
- String taskId = context.task.getTaskID().toString();
- for (String dir : mapredLocalDirs) {
- File mapredDir = new File(dir);
- File taskDir =
- new File(mapredDir, TaskTracker.getTaskWorkDir(context.task
- .getUser(), jobId, taskId, context.task.isTaskCleanupTask()))
- .getParentFile();
- if (directory.equals(taskDir)) {
- return dir;
- }
- }
-
- LOG.error("Couldn't parse task cache directory correctly");
- throw new IllegalArgumentException("invalid task cache directory "
- + directory.getAbsolutePath());
- }
-
- /**
- * Builds the command line for launching/terminating/killing task JVM.
- * Following is the format for launching/terminating/killing task JVM
- * <br/>
- * For launching following is command line argument:
- * <br/>
- * {@code mapreduce.job.user.name command tt-root job_id task_id}
- * <br/>
- * For terminating/killing task jvm.
- * {@code mapreduce.job.user.name command tt-root task-pid}
- *
- * @param command command to be executed.
- * @param userName mapreduce.job.user.name
- * @param cmdArgs list of extra arguments
- * @param workDir working directory for the task-controller
- * @param env JVM environment variables.
- * @return {@link ShellCommandExecutor}
- * @throws IOException
- */
- private ShellCommandExecutor buildTaskControllerExecutor(
- TaskControllerCommands command, String userName, List<String> cmdArgs,
- File workDir, Map<String, String> env)
- throws IOException {
- String[] taskControllerCmd = new String[3 + cmdArgs.size()];
- taskControllerCmd[0] = getTaskControllerExecutablePath();
- taskControllerCmd[1] = userName;
- taskControllerCmd[2] = String.valueOf(command.ordinal());
- int i = 3;
- for (String cmdArg : cmdArgs) {
- taskControllerCmd[i++] = cmdArg;
- }
- if (LOG.isDebugEnabled()) {
- for (String cmd : taskControllerCmd) {
- LOG.debug("taskctrl command = " + cmd);
- }
- }
- ShellCommandExecutor shExec = null;
- if(workDir != null && workDir.exists()) {
- shExec = new ShellCommandExecutor(taskControllerCmd,
- workDir, env);
- } else {
- shExec = new ShellCommandExecutor(taskControllerCmd);
- }
-
- return shExec;
- }
-
- // Return the task specific directory under the cache.
- private String getTaskCacheDirectory(TaskExecContext context,
- File workDir) {
- // In the case of JVM reuse, the task specific directory
- // is different from what is set with respect with
- // env.workDir. Hence building this from the taskId everytime.
- String taskId = context.task.getTaskID().toString();
- File cacheDirForJob = workDir.getParentFile().getParentFile();
- if(context.task.isTaskCleanupTask()) {
- taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
- }
- return new File(cacheDirForJob, taskId).getAbsolutePath();
- }
-
- // Write the JVM command line to a file under the specified directory
- // Note that the JVM will be launched using a setuid executable, and
- // could potentially contain strings defined by a user. Hence, to
- // prevent special character attacks, we write the command line to
- // a file and execute it.
- private void writeCommand(String cmdLine,
- String directory) throws IOException {
-
- PrintWriter pw = null;
- String commandFile = directory + File.separator + COMMAND_FILE;
- LOG.info("Writing commands to " + commandFile);
- LOG.info("--------Commands Begin--------");
- LOG.info(cmdLine);
- LOG.info("--------Commands End--------");
- try {
- FileWriter fw = new FileWriter(commandFile);
- BufferedWriter bw = new BufferedWriter(fw);
- pw = new PrintWriter(bw);
- pw.write(cmdLine);
- } catch (IOException ioe) {
- LOG.error("Caught IOException while writing JVM command line to file. "
- + ioe.getMessage());
- } finally {
- if (pw != null) {
- pw.close();
- }
- // set execute permissions for all on the file.
- File f = new File(commandFile);
- if (f.exists()) {
- f.setReadable(true, false);
- f.setExecutable(true, false);
- }
- }
- }
-
- private List<String> buildInitializeJobCommandArgs(
- JobInitializationContext context) {
- List<String> initJobCmdArgs = new ArrayList<String>();
- initJobCmdArgs.add(context.jobid.toString());
- return initJobCmdArgs;
- }
-
- @Override
- void initializeJob(JobInitializationContext context)
- throws IOException {
- LOG.debug("Going to initialize job " + context.jobid.toString()
- + " on the TT");
- runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user,
- buildInitializeJobCommandArgs(context), context.workDir, null);
- }
-
- @Override
- public void initializeDistributedCacheFile(DistributedCacheFileContext context)
- throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Going to initialize distributed cache for " + context.user
- + " with localizedBaseDir " + context.localizedBaseDir +
- " and uniqueString " + context.uniqueString);
- }
- List<String> args = new ArrayList<String>();
- // Here, uniqueString might start with '-'. Adding -- in front of the
- // arguments indicates that they are non-option parameters.
- args.add("--");
- args.add(context.localizedBaseDir.toString());
- args.add(context.uniqueString);
- runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE,
- context.user, args, context.workDir, null);
- }
-
- @Override
- public void initializeUser(InitializationContext context)
- throws IOException {
- LOG.debug("Going to initialize user directories for " + context.user
- + " on the TT");
- runCommand(TaskControllerCommands.INITIALIZE_USER, context.user,
- new ArrayList<String>(), context.workDir, null);
- }
-
- /**
- * API which builds the command line to be pass to LinuxTaskController
- * binary to terminate/kill the task. See
- * {@code buildTaskControllerExecutor(TaskControllerCommands,
- * String, List<String>, JvmEnv)} documentation.
- *
- *
- * @param context context of task which has to be passed kill signal.
- *
- */
- private List<String> buildKillTaskCommandArgs(TaskControllerContext
- context){
- List<String> killTaskJVMArgs = new ArrayList<String>();
- killTaskJVMArgs.add(context.pid);
- return killTaskJVMArgs;
- }
-
- /**
- * Convenience method used to sending appropriate signal to the task
- * VM
- * @param context
- * @param command
- * @throws IOException
- */
- protected void signalTask(TaskControllerContext context,
- TaskControllerCommands command) throws IOException{
- if(context.task == null) {
- LOG.info("Context task is null; not signaling the JVM");
- return;
- }
- ShellCommandExecutor shExec = buildTaskControllerExecutor(
- command, context.env.conf.getUser(),
- buildKillTaskCommandArgs(context), context.env.workDir,
- context.env.env);
try {
shExec.execute();
- } catch (Exception e) {
- LOG.warn("Output from task-contoller is : " + shExec.getOutput());
- throw new IOException(e);
- }
- }
-
- @Override
- void terminateTask(TaskControllerContext context) {
- try {
- signalTask(context, TaskControllerCommands.TERMINATE_TASK_JVM);
- } catch (Exception e) {
- LOG.warn("Exception thrown while sending kill to the Task VM " +
- StringUtils.stringifyException(e));
- }
- }
-
- @Override
- void killTask(TaskControllerContext context) {
- try {
- signalTask(context, TaskControllerCommands.KILL_TASK_JVM);
- } catch (Exception e) {
- LOG.warn("Exception thrown while sending destroy to the Task VM " +
- StringUtils.stringifyException(e));
- }
- }
-
- @Override
- void dumpTaskStack(TaskControllerContext context) {
- try {
- signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
- } catch (Exception e) {
- LOG.warn("Exception thrown while sending SIGQUIT to the Task VM " +
- StringUtils.stringifyException(e));
+ } catch (ExitCodeException e) {
+ int ret_code = shExec.getExitCode();
+ if (ret_code == ResultCode.INVALID_TASK_PID.getValue()) {
+ return false;
+ }
+ logOutput(shExec.getOutput());
+ throw new IOException("Problem signalling task " + taskPid + " with " +
+ signal + "; exit = " + ret_code);
}
- }
-
- protected String getTaskControllerExecutablePath() {
- return taskControllerExe;
+ return true;
}
@Override
- String getRunAsUser(JobConf conf) {
+ public String getRunAsUser(JobConf conf) {
return conf.getUser();
}
-}
\ No newline at end of file
+}
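With this rewrite the setuid binary is always invoked as "task-controller user-name command command-args", where command is one of the stable numeric codes in the Commands enum and failures come back as ResultCode values that must match task-controller.h. A hedged usage sketch follows (the demo class, user name, and pid are illustrative, not from the patch); signalTask returns false on INVALID_TASK_PID, so callers can treat an already-exited JVM as success.

    package org.apache.hadoop.mapred;  // LinuxTaskController is package-private

    import static org.apache.hadoop.mapred.TaskController.Signal;

    public class SignalTaskDemo {
      public static void main(String[] args) throws Exception {
        LinuxTaskController controller = new LinuxTaskController();
        // setConf resolves mapreduce.tasktracker.task-controller.exe,
        // falling back to $HADOOP_HOME/bin/task-controller.
        controller.setConf(new JobConf());
        // argv sent: task-controller alice 2 4242 <signal number>  (SIGNAL_TASK = 2)
        if (!controller.signalTask("alice", 4242, Signal.TERM)) {
          System.out.println("pid 4242 already gone; nothing to signal");
        }
      }
    }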
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Jun 5 02:33:44 2012
@@ -38,7 +38,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.ClusterMetrics;
@@ -79,6 +78,7 @@ public class LocalJobRunner implements C
private AtomicInteger map_tasks = new AtomicInteger(0);
private int reduce_tasks = 0;
final Random rand = new Random();
+ private final TaskController taskController = new DefaultTaskController();
private JobTrackerInstrumentation myMetrics = null;
@@ -116,7 +116,7 @@ public class LocalJobRunner implements C
private FileSystem localFs;
boolean killed = false;
- private TrackerDistributedCacheManager trackerDistributerdCacheManager;
+ private TrackerDistributedCacheManager trackerDistributedCacheManager;
private TaskDistributedCacheManager taskDistributedCacheManager;
public long getProtocolVersion(String protocol, long clientVersion) {
@@ -134,14 +134,12 @@ public class LocalJobRunner implements C
// Manage the distributed cache. If there are files to be copied,
// this will trigger localFile to be re-written again.
- this.trackerDistributerdCacheManager =
- new TrackerDistributedCacheManager(conf, new DefaultTaskController());
+ this.trackerDistributedCacheManager =
+ new TrackerDistributedCacheManager(conf);
this.taskDistributedCacheManager =
- trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
- taskDistributedCacheManager.setup(
- new LocalDirAllocator(MRConfig.LOCAL_DIR),
- new File(systemJobDir.toString()),
- "archive", "archive");
+ trackerDistributedCacheManager.newTaskDistributedCacheManager(
+ jobid, conf);
+ taskDistributedCacheManager.setupCache(conf, "archive", "archive");
if (DistributedCache.getSymlink(conf)) {
// This is not supported largely because,
@@ -458,7 +456,7 @@ public class LocalJobRunner implements C
localFs.delete(localJobFile, true); // delete local copy
// Cleanup distributed cache
taskDistributedCacheManager.release();
- trackerDistributerdCacheManager.purgeCache();
+ trackerDistributedCacheManager.purgeCache();
} catch (IOException e) {
LOG.warn("Error cleaning up "+id+": "+e);
}
@@ -532,6 +530,14 @@ public class LocalJobRunner implements C
public boolean ping(TaskAttemptID taskid) throws IOException {
return true;
}
+
+ @Override
+ public void updatePrivateDistributedCacheSizes(
+ org.apache.hadoop.mapreduce.JobID jobId,
+ long[] sizes)
+ throws IOException {
+ trackerDistributedCacheManager.setArchiveSizes(jobId, sizes);
+ }
public boolean canCommit(TaskAttemptID taskid)
throws IOException {
@@ -578,6 +584,7 @@ public class LocalJobRunner implements C
this.fs = FileSystem.getLocal(conf);
this.conf = conf;
myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
+ taskController.setConf(conf);
}
// JobSubmissionProtocol methods
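On the LocalJobRunner side the patch tracks the new distributed-cache API: TrackerDistributedCacheManager no longer takes a TaskController, per-task managers are created per job id, and localization happens through a single setupCache call. Pulled out of the diff into one lifecycle sketch (jobid, conf, and the jobId/sizes pair are assumed from the enclosing Job class):

    TrackerDistributedCacheManager trackerCache =
        new TrackerDistributedCacheManager(conf);          // TaskController argument dropped
    TaskDistributedCacheManager taskCache =
        trackerCache.newTaskDistributedCacheManager(jobid, conf);
    taskCache.setupCache(conf, "archive", "archive");      // localize the cache entries

    // ... job runs; tasks report localized sizes back through
    // updatePrivateDistributedCacheSizes ...
    trackerCache.setArchiveSizes(jobId, sizes);

    taskCache.release();          // per-job release
    trackerCache.purgeCache();    // tracker-wide cleanup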
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java Tue Jun 5 02:33:44 2012
@@ -129,8 +129,10 @@ class MapTask extends Task {
@Override
public TaskRunner createRunner(TaskTracker tracker,
- TaskTracker.TaskInProgress tip) {
- return new MapTaskRunner(tip, tracker, this.conf);
+ TaskTracker.TaskInProgress tip,
+ TaskTracker.RunningJob rjob
+ ) throws IOException {
+ return new MapTaskRunner(tip, tracker, this.conf, rjob);
}
@Override
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTaskRunner.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTaskRunner.java Tue Jun 5 02:33:44 2012
@@ -17,14 +17,17 @@
*/
package org.apache.hadoop.mapred;
+import java.io.IOException;
+
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.log4j.Level;
/** Runs a map task. */
class MapTaskRunner extends TaskRunner {
- public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf) {
- super(task, tracker, conf);
+ public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf,
+ TaskTracker.RunningJob rjob) throws IOException {
+ super(task, tracker, conf, rjob);
}
@Override
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Jun 5 02:33:44 2012
@@ -141,9 +141,10 @@ public class ReduceTask extends Task {
}
@Override
- public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip)
- throws IOException {
- return new ReduceTaskRunner(tip, tracker, this.conf);
+ public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip,
+ TaskTracker.RunningJob rjob
+ ) throws IOException {
+ return new ReduceTaskRunner(tip, tracker, this.conf, rjob);
}
@Override
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Tue Jun 5 02:33:44 2012
@@ -26,9 +26,10 @@ import org.apache.log4j.Level;
class ReduceTaskRunner extends TaskRunner {
public ReduceTaskRunner(TaskInProgress task, TaskTracker tracker,
- JobConf conf) throws IOException {
+ JobConf conf, TaskTracker.RunningJob rjob
+ ) throws IOException {
- super(task, tracker, conf);
+ super(task, tracker, conf, rjob);
}
public void close() throws IOException {
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java Tue Jun 5 02:33:44 2012
@@ -468,7 +468,9 @@ abstract public class Task implements Wr
/** Return an approprate thread runner for this task.
* @param tip TODO*/
public abstract TaskRunner createRunner(TaskTracker tracker,
- TaskTracker.TaskInProgress tip) throws IOException;
+ TaskTracker.TaskInProgress tip,
+ TaskTracker.RunningJob rjob
+ ) throws IOException;
/** The number of milliseconds between progress reports. */
public static final int PROGRESS_INTERVAL = 3000;
|