Added: hadoop/common/branches/branch-0.22/mapreduce/src/c++/task-controller/tests/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/c%2B%2B/task-controller/tests/test-task-controller.c?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/c++/task-controller/tests/test-task-controller.c (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/c++/task-controller/tests/test-task-controller.c Tue Jun 5 01:02:16 2012
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../task-controller.h"
+
+#define HADOOP_CONF_DIR "/tmp"
+
+int write_config_file(char *file_name) {
+ FILE *file;
+ char const *str =
+ "mapreduce.cluster.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n";
+
+ file = fopen(file_name, "w");
+ if (file == NULL) {
+ printf("Failed to open %s.\n", file_name);
+ return EXIT_FAILURE;
+ }
+ fwrite(str, 1, strlen(str), file);
+ fclose(file);
+ return 0;
+}
+
+void test_check_variable_against_config() {
+
+ // A temporary configuration directory
+ char *conf_dir_templ = "/tmp/test-task-controller-conf-dir-XXXXXX";
+
+ // To accommodate "/conf/taskcontroller.cfg" plus the trailing '\0'
+ char template[strlen(conf_dir_templ) + strlen("/conf/taskcontroller.cfg") + 1];
+
+ strcpy(template, conf_dir_templ);
+ char *temp_dir = mkdtemp(template);
+ if (temp_dir == NULL) {
+ printf("Couldn't create a temporary dir for conf.\n");
+ goto cleanup;
+ }
+
+ // Set the configuration directory
+ hadoop_conf_dir = strdup(temp_dir);
+
+ // create the configuration directory
+ strcat(template, "/conf");
+ char *conf_dir = strdup(template);
+ mkdir(conf_dir, S_IRWXU);
+
+ // create the configuration file
+ strcat(template, "/taskcontroller.cfg");
+ if (write_config_file(template) != 0) {
+ printf("Couldn't write the configuration file.\n");
+ goto cleanup;
+ }
+
+ // Test obtaining a value for a key from the config
+ char *config_values[4] = { "/tmp/testing1", "/tmp/testing2",
+ "/tmp/testing3", "/tmp/testing4" };
+ char *value = (char *) get_value("mapreduce.cluster.local.dir");
+ if (strcmp(value, "/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4")
+ != 0) {
+ printf("Obtaining a value for a key from the config failed.\n");
+ goto cleanup;
+ }
+
+ // Test the parsing of a multiple valued key from the config
+ char **values = (char **)get_values("mapreduce.cluster.local.dir");
+ char **values_ptr = values;
+ int i = 0;
+ while (*values_ptr != NULL) {
+ printf(" value : %s\n", *values_ptr);
+ if (strcmp(*values_ptr, config_values[i++]) != 0) {
+ printf("Configured values are not read out properly. Test failed!");
+ goto cleanup;;
+ }
+ values_ptr++;
+ }
+
+ if (check_variable_against_config("mapreduce.cluster.local.dir", "/tmp/testing5") == 0) {
+ printf("Configuration should not contain /tmp/testing5! \n");
+ goto cleanup;
+ }
+
+ if (check_variable_against_config("mapreduce.cluster.local.dir", "/tmp/testing4") != 0) {
+ printf("Configuration should contain /tmp/testing4! \n");
+ goto cleanup;
+ }
+
+ cleanup:
+ if (value != NULL) {
+ free(value);
+ }
+ if (values != NULL) {
+ free(values);
+ }
+ unlink(template);
+ rmdir(conf_dir);
+ if (hadoop_conf_dir != NULL) {
+ // Remove the directory before freeing the string that names it.
+ rmdir(hadoop_conf_dir);
+ free(hadoop_conf_dir);
+ }
+}
+
+void test_get_user_directory() {
+ char *user_dir = (char *) get_user_directory("/tmp", "user");
+ printf("user_dir obtained is %s\n", user_dir);
+ int ret = 0;
+ if (strcmp(user_dir, "/tmp/taskTracker/user") != 0) {
+ ret = -1;
+ }
+ free(user_dir);
+ assert(ret == 0);
+}
+
+void test_get_job_directory() {
+ char *job_dir = (char *) get_job_directory("/tmp", "user",
+ "job_200906101234_0001");
+ printf("job_dir obtained is %s\n", job_dir);
+ int ret = 0;
+ if (strcmp(job_dir, "/tmp/taskTracker/user/jobcache/job_200906101234_0001")
+ != 0) {
+ ret = -1;
+ }
+ free(job_dir);
+ assert(ret == 0);
+}
+
+void test_get_attempt_directory() {
+ char *job_dir = (char *) get_job_directory("/tmp", "user",
+ "job_200906101234_0001");
+ printf("job_dir obtained is %s\n", job_dir);
+ char *attempt_dir = (char *) get_attempt_directory(job_dir,
+ "attempt_200906101234_0001_m_000000_0");
+ printf("attempt_dir obtained is %s\n", attempt_dir);
+ int ret = 0;
+ if (strcmp(
+ attempt_dir,
+ "/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906101234_0001_m_000000_0")
+ != 0) {
+ ret = -1;
+ }
+ free(job_dir);
+ free(attempt_dir);
+ assert(ret == 0);
+}
+
+void test_get_task_launcher_file() {
+ char *job_dir = (char *) get_job_directory("/tmp", "user",
+ "job_200906101234_0001");
+ char *task_file = (char *) get_task_launcher_file(job_dir,
+ "attempt_200906112028_0001_m_000000_0");
+ printf("task_file obtained is %s\n", task_file);
+ int ret = 0;
+ if (strcmp(
+ task_file,
+ "/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh")
+ != 0) {
+ ret = -1;
+ }
+ free(task_file);
+ assert(ret == 0);
+}
+
+void test_get_job_log_dir() {
+ char *logdir = (char *) get_job_log_dir("/tmp/testing",
+ "job_200906101234_0001");
+ printf("logdir obtained is %s\n", logdir);
+ int ret = 0;
+ if (strcmp(logdir, "/tmp/testing/userlogs/job_200906101234_0001") != 0) {
+ ret = -1;
+ }
+ free(logdir);
+ assert(ret == 0);
+}
+
+void test_get_job_acls_file() {
+ char *job_acls_file = (char *) get_job_acls_file(
+ "/tmp/testing/userlogs/job_200906101234_0001");
+ printf("job acls file obtained is %s\n", job_acls_file);
+ int ret = 0;
+ if (strcmp(job_acls_file,
+ "/tmp/testing/userlogs/job_200906101234_0001/job-acls.xml") != 0) {
+ ret = -1;
+ }
+ free(job_acls_file);
+ assert(ret == 0);
+}
+
+void test_get_task_log_dir() {
+ char *logdir = (char *) get_task_log_dir("/tmp/testing",
+ "job_200906101234_0001", "attempt_200906112028_0001_m_000000_0");
+ printf("logdir obtained is %s\n", logdir);
+ int ret = 0;
+ if (strcmp(logdir,
+ "/tmp/testing/userlogs/job_200906101234_0001/attempt_200906112028_0001_m_000000_0")
+ != 0) {
+ ret = -1;
+ }
+ free(logdir);
+ assert(ret == 0);
+}
+
+int main(int argc, char **argv) {
+ printf("\nStarting tests\n");
+ LOGFILE = stdout;
+
+ printf("\nTesting check_variable_against_config()\n");
+ test_check_variable_against_config();
+
+ printf("\nTesting get_user_directory()\n");
+ test_get_user_directory();
+
+ printf("\nTesting get_job_directory()\n");
+ test_get_job_directory();
+
+ printf("\nTesting get_attempt_directory()\n");
+ test_get_attempt_directory();
+
+ printf("\nTesting get_task_launcher_file()\n");
+ test_get_task_launcher_file();
+
+ printf("\nTesting get_job_log_dir()\n");
+ test_get_job_log_dir();
+
+ printf("\nTesting get_job_acls_file()\n");
+ test_get_job_acls_file();
+
+ printf("\nTesting get_task_log_dir()\n");
+ test_get_task_log_dir();
+
+ printf("\nFinished tests\n");
+ return 0;
+}
Added: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Tue Jun 5 01:02:16 2012
@@ -0,0 +1,657 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * A {@link TaskController} that runs the task JVMs as the user
+ * who submits the job.
+ *
+ * This class executes a setuid executable to implement methods
+ * of the {@link TaskController}, including launching the task
+ * JVM and killing it when needed, and also initializing and
+ * finalizing the task environment.
+ * <p> The setuid executable is launched using the command line:</p>
+ * <p>task-controller mapreduce.job.user.name command command-args, where</p>
+ * <p>mapreduce.job.user.name is the name of the owner who submits the job</p>
+ * <p>command is one of the cardinal value of the
+ * {@link LinuxTaskController.TaskControllerCommands} enumeration</p>
+ * <p>command-args depends on the command being launched.</p>
+ *
+ * In addition to running and killing tasks, the class also
+ * sets up appropriate access for the directories and files
+ * that will be used by the tasks.
+ */
+class LinuxTaskController extends TaskController {
+
+ private static final Log LOG =
+ LogFactory.getLog(LinuxTaskController.class);
+
+ // Name of the executable script that will contain the child
+ // JVM command line. See writeCommand for details.
+ private static final String COMMAND_FILE = "taskjvm.sh";
+
+ // Path to the setuid executable.
+ private static String taskControllerExe;
+
+ static {
+ // the task-controller is expected to be under the $HADOOP_HOME/bin
+ // directory.
+ File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
+ taskControllerExe =
+ new File(hadoopBin, "task-controller").getAbsolutePath();
+ }
+
+ public LinuxTaskController() {
+ super();
+ }
+
+ /**
+ * List of commands that the setuid script will execute.
+ */
+ enum TaskControllerCommands {
+ INITIALIZE_USER,
+ INITIALIZE_JOB,
+ INITIALIZE_DISTRIBUTEDCACHE_FILE,
+ LAUNCH_TASK_JVM,
+ INITIALIZE_TASK,
+ TERMINATE_TASK_JVM,
+ KILL_TASK_JVM,
+ RUN_DEBUG_SCRIPT,
+ SIGQUIT_TASK_JVM,
+ ENABLE_TASK_FOR_CLEANUP,
+ ENABLE_JOB_FOR_CLEANUP
+ }
+
+ @Override
+ public void setup() throws IOException {
+ super.setup();
+
+ // Check the permissions of the task-controller binary by running it with
+ // no arguments. If permissions and ownership are correct it exits with
+ // code 1; any other exit code (e.g. 24) indicates a problem.
+ String[] taskControllerCmd =
+ new String[] { getTaskControllerExecutablePath() };
+ ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd);
+ try {
+ shExec.execute();
+ } catch (ExitCodeException e) {
+ int exitCode = shExec.getExitCode();
+ if (exitCode != 1) {
+ LOG.warn("Exit code from checking binary permissions is : " + exitCode);
+ logOutput(shExec.getOutput());
+ throw new IOException("Task controller setup failed because of invalid"
+ + "permissions/ownership with exit code " + exitCode, e);
+ }
+ }
+ }
+
+ /**
+ * Launch a task JVM that will run as the owner of the job.
+ *
+ * This method launches a task JVM by executing a setuid executable that will
+ * switch to the user and run the task. Also does initialization of the first
+ * task in the same setuid process launch.
+ */
+ @Override
+ void launchTaskJVM(TaskController.TaskControllerContext context)
+ throws IOException {
+ JvmEnv env = context.env;
+ // get the JVM command line.
+ String cmdLine =
+ TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
+ env.logSize, true);
+
+ StringBuffer sb = new StringBuffer();
+ // Export all the environment variables ahead of the child command, since
+ // the loader strips environment variables beginning with LD_* when
+ // running setuid/setgid binaries.
+ for(Entry<String, String> entry : env.env.entrySet()) {
+ sb.append("export ");
+ sb.append(entry.getKey());
+ sb.append("=");
+ sb.append(entry.getValue());
+ sb.append("\n");
+ }
+ sb.append(cmdLine);
+ // write the command to a file in the
+ // task specific cache directory
+ writeCommand(sb.toString(), getTaskCacheDirectory(context,
+ context.env.workDir));
+
+ // Call the taskcontroller with the right parameters.
+ List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context,
+ context.env.workDir);
+ ShellCommandExecutor shExec = buildTaskControllerExecutor(
+ TaskControllerCommands.LAUNCH_TASK_JVM,
+ env.conf.getUser(),
+ launchTaskJVMArgs, env.workDir, env.env);
+ context.shExec = shExec;
+ try {
+ shExec.execute();
+ } catch (Exception e) {
+ int exitCode = shExec.getExitCode();
+ LOG.warn("Exit code from task is : " + exitCode);
+ // Exit codes 143 (SIGTERM) and 137 (SIGKILL) mean the task was
+ // terminated/killed forcefully. In all other cases, log the
+ // task-controller output.
+ if (exitCode != 143 && exitCode != 137) {
+ LOG.warn("Exception thrown while launching task JVM : "
+ + StringUtils.stringifyException(e));
+ LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+ logOutput(shExec.getOutput());
+ }
+ throw new IOException(e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+ logOutput(shExec.getOutput());
+ }
+ }
+
+ /**
+ * Launch the debug script process that will run as the owner of the job.
+ *
+ * This method launches the task debug script process by executing a setuid
+ * executable that will switch to the user and run the task.
+ */
+ @Override
+ void runDebugScript(DebugScriptContext context) throws IOException {
+ String debugOut = FileUtil.makeShellPath(context.stdout);
+ String cmdLine = TaskLog.buildDebugScriptCommandLine(context.args, debugOut);
+ writeCommand(cmdLine, getTaskCacheDirectory(context, context.workDir));
+ // Call the taskcontroller with the right parameters.
+ List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, context.workDir);
+ runCommand(TaskControllerCommands.RUN_DEBUG_SCRIPT, context.task.getUser(),
+ launchTaskJVMArgs, context.workDir, null);
+ }
+ /**
+ * Helper method that runs a LinuxTaskController command
+ *
+ * @param taskControllerCommand
+ * @param user
+ * @param cmdArgs
+ * @param env
+ * @throws IOException
+ */
+ private void runCommand(TaskControllerCommands taskControllerCommand,
+ String user, List<String> cmdArgs, File workDir, Map<String, String> env)
+ throws IOException {
+
+ ShellCommandExecutor shExec =
+ buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs,
+ workDir, env);
+ try {
+ shExec.execute();
+ } catch (Exception e) {
+ LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : "
+ + shExec.getExitCode());
+ LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : "
+ + StringUtils.stringifyException(e));
+ LOG.info("Output from LinuxTaskController's "
+ + taskControllerCommand.toString() + " follows:");
+ logOutput(shExec.getOutput());
+ throw new IOException(e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Output from LinuxTaskController's "
+ + taskControllerCommand.toString() + " follows:");
+ logOutput(shExec.getOutput());
+ }
+ }
+
+ /**
+ * Returns list of arguments to be passed while initializing a new task. See
+ * {@code buildTaskControllerExecutor(TaskControllerCommands, String,
+ * List<String>, JvmEnv)} documentation.
+ *
+ * @param context
+ * @return Argument to be used while launching Task VM
+ */
+ private List<String> buildInitializeTaskArgs(TaskExecContext context) {
+ List<String> commandArgs = new ArrayList<String>(3);
+ String taskId = context.task.getTaskID().toString();
+ String jobId = getJobId(context);
+ commandArgs.add(jobId);
+ if (!context.task.isTaskCleanupTask()) {
+ commandArgs.add(taskId);
+ } else {
+ commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
+ }
+ return commandArgs;
+ }
+
+ @Override
+ void initializeTask(TaskControllerContext context)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Going to do "
+ + TaskControllerCommands.INITIALIZE_TASK.toString()
+ + " for " + context.task.getTaskID().toString());
+ }
+ runCommand(TaskControllerCommands.INITIALIZE_TASK,
+ context.env.conf.getUser(),
+ buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
+ }
+
+ /**
+ * Builds the args to be passed to task-controller for enabling of task for
+ * cleanup. Last arg in this List is either $attemptId or $attemptId/work
+ */
+ private List<String> buildTaskCleanupArgs(
+ TaskControllerTaskPathDeletionContext context) {
+ List<String> commandArgs = new ArrayList<String>(3);
+ commandArgs.add(context.mapredLocalDir.toUri().getPath());
+ commandArgs.add(context.task.getJobID().toString());
+
+ String workDir = "";
+ if (context.isWorkDir) {
+ workDir = "/work";
+ }
+ if (context.task.isTaskCleanupTask()) {
+ commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
+ + workDir);
+ } else {
+ commandArgs.add(context.task.getTaskID() + workDir);
+ }
+
+ return commandArgs;
+ }
+
+ /**
+ * Builds the args to be passed to task-controller for enabling of job for
+ * cleanup. Last arg in this List is $jobid.
+ */
+ private List<String> buildJobCleanupArgs(
+ TaskControllerJobPathDeletionContext context) {
+ List<String> commandArgs = new ArrayList<String>(2);
+ commandArgs.add(context.mapredLocalDir.toUri().getPath());
+ commandArgs.add(context.jobId.toString());
+
+ return commandArgs;
+ }
+
+ /**
+ * Enables the task for cleanup by changing permissions of the specified path
+ * in the local filesystem
+ */
+ @Override
+ void enableTaskForCleanup(PathDeletionContext context)
+ throws IOException {
+ if (context instanceof TaskControllerTaskPathDeletionContext) {
+ TaskControllerTaskPathDeletionContext tContext =
+ (TaskControllerTaskPathDeletionContext) context;
+ enablePathForCleanup(tContext,
+ TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP,
+ buildTaskCleanupArgs(tContext));
+ }
+ else {
+ throw new IllegalArgumentException("PathDeletionContext provided is not "
+ + "TaskControllerTaskPathDeletionContext.");
+ }
+ }
+
+ /**
+ * Enables the job for cleanup by changing permissions of the specified path
+ * in the local filesystem
+ */
+ @Override
+ void enableJobForCleanup(PathDeletionContext context)
+ throws IOException {
+ if (context instanceof TaskControllerJobPathDeletionContext) {
+ TaskControllerJobPathDeletionContext tContext =
+ (TaskControllerJobPathDeletionContext) context;
+ enablePathForCleanup(tContext,
+ TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP,
+ buildJobCleanupArgs(tContext));
+ } else {
+ throw new IllegalArgumentException("PathDeletionContext provided is not "
+ + "TaskControllerJobPathDeletionContext.");
+ }
+ }
+
+ /**
+ * Enable a path for cleanup
+ * @param c {@link TaskControllerPathDeletionContext} for the path to be
+ * cleaned up
+ * @param command {@link TaskControllerCommands} for task/job cleanup
+ * @param cleanupArgs arguments for the {@link LinuxTaskController} to enable
+ * path cleanup
+ */
+ private void enablePathForCleanup(TaskControllerPathDeletionContext c,
+ TaskControllerCommands command,
+ List<String> cleanupArgs) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Going to do " + command.toString() + " for " + c.fullPath);
+ }
+
+ if ( c.user != null && c.fs instanceof LocalFileSystem) {
+ try {
+ runCommand(command, c.user, cleanupArgs, null, null);
+ } catch(IOException e) {
+ LOG.warn("Unable to change permissions for " + c.fullPath);
+ }
+ }
+ else {
+ throw new IllegalArgumentException("Either user is null or the "
+ + "file system is not local file system.");
+ }
+ }
+
+ private void logOutput(String output) {
+ String shExecOutput = output;
+ if (shExecOutput != null) {
+ for (String str : shExecOutput.split("\n")) {
+ LOG.info(str);
+ }
+ }
+ }
+
+ private String getJobId(TaskExecContext context) {
+ String taskId = context.task.getTaskID().toString();
+ TaskAttemptID tId = TaskAttemptID.forName(taskId);
+ String jobId = tId.getJobID().toString();
+ return jobId;
+ }
+
+ /**
+ * Returns list of arguments to be passed while launching task VM.
+ * See {@code buildTaskControllerExecutor(TaskControllerCommands,
+ * String, List<String>, JvmEnv)} documentation.
+ * @param context
+ * @return Argument to be used while launching Task VM
+ */
+ private List<String> buildLaunchTaskArgs(TaskExecContext context,
+ File workDir) {
+ List<String> commandArgs = new ArrayList<String>(3);
+ LOG.debug("getting the task directory as: "
+ + getTaskCacheDirectory(context, workDir));
+ LOG.debug("getting the tt_root as " +getDirectoryChosenForTask(
+ new File(getTaskCacheDirectory(context, workDir)),
+ context) );
+ commandArgs.add(getDirectoryChosenForTask(
+ new File(getTaskCacheDirectory(context, workDir)),
+ context));
+ commandArgs.addAll(buildInitializeTaskArgs(context));
+ return commandArgs;
+ }
+
+ // Get the directory from the list of directories configured
+ // in Configs.LOCAL_DIR chosen for storing data pertaining to
+ // this task.
+ private String getDirectoryChosenForTask(File directory,
+ TaskExecContext context) {
+ String jobId = getJobId(context);
+ String taskId = context.task.getTaskID().toString();
+ for (String dir : mapredLocalDirs) {
+ File mapredDir = new File(dir);
+ File taskDir =
+ new File(mapredDir, TaskTracker.getTaskWorkDir(context.task
+ .getUser(), jobId, taskId, context.task.isTaskCleanupTask()))
+ .getParentFile();
+ if (directory.equals(taskDir)) {
+ return dir;
+ }
+ }
+
+ LOG.error("Couldn't parse task cache directory correctly");
+ throw new IllegalArgumentException("invalid task cache directory "
+ + directory.getAbsolutePath());
+ }
+
+ /**
+ * Builds the command line for launching/terminating/killing the task JVM.
+ * The formats are as follows.
+ * <br/>
+ * For launching:
+ * <br/>
+ * {@code mapreduce.job.user.name command tt-root job_id task_id}
+ * <br/>
+ * For terminating/killing:
+ * {@code mapreduce.job.user.name command tt-root task-pid}
+ *
+ * @param command command to be executed.
+ * @param userName mapreduce.job.user.name
+ * @param cmdArgs list of extra arguments
+ * @param workDir working directory for the task-controller
+ * @param env JVM environment variables.
+ * @return {@link ShellCommandExecutor}
+ * @throws IOException
+ */
+ private ShellCommandExecutor buildTaskControllerExecutor(
+ TaskControllerCommands command, String userName, List<String> cmdArgs,
+ File workDir, Map<String, String> env)
+ throws IOException {
+ String[] taskControllerCmd = new String[3 + cmdArgs.size()];
+ taskControllerCmd[0] = getTaskControllerExecutablePath();
+ taskControllerCmd[1] = userName;
+ taskControllerCmd[2] = String.valueOf(command.ordinal());
+ int i = 3;
+ for (String cmdArg : cmdArgs) {
+ taskControllerCmd[i++] = cmdArg;
+ }
+ if (LOG.isDebugEnabled()) {
+ for (String cmd : taskControllerCmd) {
+ LOG.debug("taskctrl command = " + cmd);
+ }
+ }
+ ShellCommandExecutor shExec = null;
+ if(workDir != null && workDir.exists()) {
+ shExec = new ShellCommandExecutor(taskControllerCmd,
+ workDir, env);
+ } else {
+ shExec = new ShellCommandExecutor(taskControllerCmd);
+ }
+
+ return shExec;
+ }
+
+ // Return the task specific directory under the cache.
+ private String getTaskCacheDirectory(TaskExecContext context,
+ File workDir) {
+ // In the case of JVM reuse, the task-specific directory may differ
+ // from the one recorded in env.workDir, so it is rebuilt from the
+ // taskId every time.
+ String taskId = context.task.getTaskID().toString();
+ File cacheDirForJob = workDir.getParentFile().getParentFile();
+ if(context.task.isTaskCleanupTask()) {
+ taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
+ }
+ return new File(cacheDirForJob, taskId).getAbsolutePath();
+ }
+
+ // Write the JVM command line to a file under the specified directory.
+ // The JVM is launched via a setuid executable and the command line may
+ // contain strings supplied by the user. Hence, to prevent
+ // special-character attacks, we write the command line to a file and
+ // execute that file.
+ private void writeCommand(String cmdLine,
+ String directory) throws IOException {
+
+ PrintWriter pw = null;
+ String commandFile = directory + File.separator + COMMAND_FILE;
+ LOG.info("Writing commands to " + commandFile);
+ LOG.info("--------Commands Begin--------");
+ LOG.info(cmdLine);
+ LOG.info("--------Commands End--------");
+ try {
+ FileWriter fw = new FileWriter(commandFile);
+ BufferedWriter bw = new BufferedWriter(fw);
+ pw = new PrintWriter(bw);
+ pw.write(cmdLine);
+ } catch (IOException ioe) {
+ LOG.error("Caught IOException while writing JVM command line to file. "
+ + ioe.getMessage());
+ } finally {
+ if (pw != null) {
+ pw.close();
+ }
+ // set execute permissions for all on the file.
+ File f = new File(commandFile);
+ if (f.exists()) {
+ f.setReadable(true, false);
+ f.setExecutable(true, false);
+ }
+ }
+ }
+
+ private List<String> buildInitializeJobCommandArgs(
+ JobInitializationContext context) {
+ List<String> initJobCmdArgs = new ArrayList<String>();
+ initJobCmdArgs.add(context.jobid.toString());
+ return initJobCmdArgs;
+ }
+
+ @Override
+ void initializeJob(JobInitializationContext context)
+ throws IOException {
+ LOG.debug("Going to initialize job " + context.jobid.toString()
+ + " on the TT");
+ runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user,
+ buildInitializeJobCommandArgs(context), context.workDir, null);
+ }
+
+ @Override
+ public void initializeDistributedCacheFile(DistributedCacheFileContext context)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Going to initialize distributed cache for " + context.user
+ + " with localizedBaseDir " + context.localizedBaseDir +
+ " and uniqueString " + context.uniqueString);
+ }
+ List<String> args = new ArrayList<String>();
+ // Here, uniqueString might start with '-'. Adding -- in front of the
+ // arguments indicates that they are non-option parameters.
+ args.add("--");
+ args.add(context.localizedBaseDir.toString());
+ args.add(context.uniqueString);
+ runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE,
+ context.user, args, context.workDir, null);
+ }
+
+ @Override
+ public void initializeUser(InitializationContext context)
+ throws IOException {
+ LOG.debug("Going to initialize user directories for " + context.user
+ + " on the TT");
+ runCommand(TaskControllerCommands.INITIALIZE_USER, context.user,
+ new ArrayList<String>(), context.workDir, null);
+ }
+
+ /**
+ * Builds the command-line arguments to be passed to the LinuxTaskController
+ * binary to terminate/kill the task. See
+ * {@code buildTaskControllerExecutor(TaskControllerCommands,
+ * String, List<String>, JvmEnv)} documentation.
+ *
+ * @param context context of the task to which the kill signal has to be sent.
+ */
+ private List<String> buildKillTaskCommandArgs(TaskControllerContext
+ context){
+ List<String> killTaskJVMArgs = new ArrayList<String>();
+ killTaskJVMArgs.add(context.pid);
+ return killTaskJVMArgs;
+ }
+
+ /**
+ * Convenience method for sending the appropriate signal to the task
+ * JVM.
+ * @param context
+ * @param command
+ * @throws IOException
+ */
+ protected void signalTask(TaskControllerContext context,
+ TaskControllerCommands command) throws IOException{
+ if(context.task == null) {
+ LOG.info("Context task is null; not signaling the JVM");
+ return;
+ }
+ ShellCommandExecutor shExec = buildTaskControllerExecutor(
+ command, context.env.conf.getUser(),
+ buildKillTaskCommandArgs(context), context.env.workDir,
+ context.env.env);
+ try {
+ shExec.execute();
+ } catch (Exception e) {
+ LOG.warn("Output from task-contoller is : " + shExec.getOutput());
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ void terminateTask(TaskControllerContext context) {
+ try {
+ signalTask(context, TaskControllerCommands.TERMINATE_TASK_JVM);
+ } catch (Exception e) {
+ LOG.warn("Exception thrown while sending kill to the Task VM " +
+ StringUtils.stringifyException(e));
+ }
+ }
+
+ @Override
+ void killTask(TaskControllerContext context) {
+ try {
+ signalTask(context, TaskControllerCommands.KILL_TASK_JVM);
+ } catch (Exception e) {
+ LOG.warn("Exception thrown while sending destroy to the Task VM " +
+ StringUtils.stringifyException(e));
+ }
+ }
+
+ @Override
+ void dumpTaskStack(TaskControllerContext context) {
+ try {
+ signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
+ } catch (Exception e) {
+ LOG.warn("Exception thrown while sending SIGQUIT to the Task VM " +
+ StringUtils.stringifyException(e));
+ }
+ }
+
+ protected String getTaskControllerExecutablePath() {
+ return taskControllerExe;
+ }
+
+ @Override
+ String getRunAsUser(JobConf conf) {
+ return conf.getUser();
+ }
+}
\ No newline at end of file
Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
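For orientation, the sketch below shows how the argument vector assembled by buildTaskControllerExecutor() above is laid out for a LAUNCH_TASK_JVM call, matching the format described in the class javadoc (user name, command ordinal, tt-root, job id, attempt id). The binary path, user name and ids are hypothetical values chosen for illustration; only the ordering and the ordinal value come from the code in this commit. The taskjvm.sh written by writeCommand() then contains the exported environment variables followed by the JVM command line, and is what the setuid binary ultimately executes.

    // Illustrative sketch only -- not part of the committed file.
    // argv handed to the setuid task-controller for LAUNCH_TASK_JVM,
    // whose ordinal is 3 given the enum order declared above.
    String[] taskControllerCmd = new String[] {
        "/opt/hadoop/bin/task-controller",        // getTaskControllerExecutablePath()
        "alice",                                  // mapreduce.job.user.name (job owner)
        "3",                                      // TaskControllerCommands.LAUNCH_TASK_JVM.ordinal()
        "/tmp/mapred/local",                      // tt-root: local dir chosen for this task
        "job_200906101234_0001",                  // job id
        "attempt_200906101234_0001_m_000000_0"    // task attempt id
    };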
Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Tue Jun 5 01:02:16 2012
@@ -0,0 +1,511 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+import junit.framework.TestCase;
+
+/**
+ * The base class which starts up a cluster with LinuxTaskController as the task
+ * controller.
+ *
+ * To run test cases that use LinuxTaskController, follow these steps:
+ * <ol>
+ * <li>Build LinuxTaskController without passing any
+ * <code>-Dhadoop.conf.dir</code></li>
+ * <li>Change ownership of the built binary to root:group1, where group1 is
+ * a secondary group of the test runner.</li>
+ * <li>Change permissions on the binary so that the <em>others</em> component
+ * has no permissions on the binary</li>
+ * <li>Make the built binary setuid and setgid</li>
+ * <li>Execute the following target:
+ * <code>ant test -Dcompile.c++=true -Dtaskcontroller-path=<em>path to built binary</em>
+ * -Dtaskcontroller-ugi=<em>user,group</em></code>
+ * <br/>(Note that "path to built binary" means the directory containing task-controller -
+ * not the actual complete path of the binary itself. This path must end in ".../bin")
+ * </li>
+ * </ol>
+ *
+ */
+public class ClusterWithLinuxTaskController extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog(ClusterWithLinuxTaskController.class);
+
+ /**
+ * A wrapper around LinuxTaskController that allows overriding the path to
+ * the task-controller binary used for task management.
+ *
+ **/
+ public static class MyLinuxTaskController extends LinuxTaskController {
+ String taskControllerExePath = System.getProperty(TASKCONTROLLER_PATH)
+ + "/task-controller";
+
+ @Override
+ public void setup() throws IOException {
+ getConf().set(TTConfig.TT_GROUP, taskTrackerSpecialGroup);
+
+ // write configuration file
+ configurationFile = createTaskControllerConf(System
+ .getProperty(TASKCONTROLLER_PATH), getConf());
+ super.setup();
+ }
+
+ @Override
+ protected String getTaskControllerExecutablePath() {
+ return new File(taskControllerExePath).getAbsolutePath();
+ }
+
+ void setTaskControllerExe(String execPath) {
+ this.taskControllerExePath = execPath;
+ }
+
+ volatile static int attemptedSigQuits = 0;
+ volatile static int failedSigQuits = 0;
+
+ /** Works like LinuxTaskController, but also counts the number of
+ * attempted and failed SIGQUIT sends via the task-controller
+ * executable.
+ */
+ @Override
+ void dumpTaskStack(TaskControllerContext context) {
+ attemptedSigQuits++;
+ try {
+ signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
+ } catch (Exception e) {
+ LOG.warn("Execution sending SIGQUIT: " + StringUtils.stringifyException(e));
+ failedSigQuits++;
+ }
+ }
+ }
+
+ // cluster instances which sub classes can use
+ protected MiniMRCluster mrCluster = null;
+ protected MiniDFSCluster dfsCluster = null;
+
+ private JobConf clusterConf = null;
+ protected Path homeDirectory;
+
+ /** Changing this to a larger number needs more work for creating
+ * taskcontroller.cfg.
+ * See {@link #startCluster()} and
+ * {@link #createTaskControllerConf(String, Configuration)}
+ */
+ private static final int NUMBER_OF_NODES = 1;
+
+ static final String TASKCONTROLLER_PATH = "taskcontroller-path";
+ static final String TASKCONTROLLER_UGI = "taskcontroller-ugi";
+
+ private static File configurationFile = null;
+
+ protected UserGroupInformation jobOwner;
+
+ protected static String taskTrackerSpecialGroup = null;
+ /**
+ * Primary group of the user running the tasktracker (i.e. the user
+ * running the test).
+ */
+ protected static String taskTrackerPrimaryGroup = null;
+ static {
+ if (isTaskExecPathPassed()) {
+ try {
+ taskTrackerSpecialGroup = FileSystem.getLocal(new Configuration())
+ .getFileStatus(
+ new Path(System.getProperty(TASKCONTROLLER_PATH),
+ "task-controller")).getGroup();
+ } catch (IOException e) {
+ LOG.warn("Could not get group of the binary", e);
+ fail("Could not get group of the binary");
+ }
+ try {
+ taskTrackerPrimaryGroup =
+ UserGroupInformation.getCurrentUser().getGroupNames()[0];
+ } catch (IOException ioe) {
+ LOG.warn("Could not get primary group of the current user", ioe);
+ fail("Could not get primary group of the current user");
+ }
+ }
+ }
+
+ /*
+ * Utility method which subclasses use to start and configure the MR Cluster
+ * so they can directly submit a job.
+ */
+ protected void startCluster()
+ throws IOException, InterruptedException {
+ JobConf conf = new JobConf();
+ dfsCluster = new MiniDFSCluster(conf, NUMBER_OF_NODES, true, null);
+ conf.set(TTConfig.TT_TASK_CONTROLLER,
+ MyLinuxTaskController.class.getName());
+ conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, false);
+ mrCluster =
+ new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri()
+ .toString(), 4, null, null, conf);
+
+ clusterConf = mrCluster.createJobConf();
+
+ String ugi = System.getProperty(TASKCONTROLLER_UGI);
+ String[] splits = ugi.split(",");
+ jobOwner = UserGroupInformation.createUserForTesting(splits[0],
+ new String[]{splits[1]});
+ createHomeAndStagingDirectory(clusterConf);
+ }
+
+ private void createHomeAndStagingDirectory(JobConf conf)
+ throws IOException {
+ FileSystem fs = dfsCluster.getFileSystem();
+ String path = "/user/" + jobOwner.getUserName();
+ homeDirectory = new Path(path);
+ LOG.info("Creating Home directory : " + homeDirectory);
+ fs.mkdirs(homeDirectory);
+ changePermission(fs);
+ Path stagingArea = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT));
+ LOG.info("Creating Staging root directory : " + stagingArea);
+ fs.mkdirs(stagingArea);
+ fs.setPermission(stagingArea, new FsPermission((short)0777));
+ }
+
+ private void changePermission(FileSystem fs)
+ throws IOException {
+ fs.setOwner(homeDirectory, jobOwner.getUserName(),
+ jobOwner.getGroupNames()[0]);
+ }
+
+ static File getTaskControllerConfFile(String path) {
+ File confDirectory = new File(path, "../conf");
+ return new File(confDirectory, "taskcontroller.cfg");
+ }
+
+ /**
+ * Create taskcontroller.cfg.
+ *
+ * @param path Path to the taskcontroller binary.
+ * @param conf TaskTracker's configuration
+ * @return the created conf file
+ * @throws IOException
+ */
+ static File createTaskControllerConf(String path,
+ Configuration conf) throws IOException {
+ File confDirectory = new File(path, "../conf");
+ if (!confDirectory.exists()) {
+ confDirectory.mkdirs();
+ }
+ File configurationFile = new File(confDirectory, "taskcontroller.cfg");
+ PrintWriter writer =
+ new PrintWriter(new FileOutputStream(configurationFile));
+
+ writer.println(String.format(MRConfig.LOCAL_DIR + "=%s", conf
+ .get(MRConfig.LOCAL_DIR)));
+
+ writer
+ .println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
+ writer.println(String.format(TTConfig.TT_GROUP + "=%s",
+ conf.get(TTConfig.TT_GROUP)));
+
+ writer.flush();
+ writer.close();
+ return configurationFile;
+ }
+
+ /**
+ * Can we run the tests with LinuxTaskController?
+ *
+ * @return boolean
+ */
+ protected static boolean shouldRun() {
+ if (!isTaskExecPathPassed() || !isUserPassed()) {
+ LOG.info("Not running test.");
+ return false;
+ }
+ return true;
+ }
+
+ static boolean isTaskExecPathPassed() {
+ String path = System.getProperty(TASKCONTROLLER_PATH);
+ if (path == null || path.isEmpty()
+ || path.equals("${" + TASKCONTROLLER_PATH + "}")) {
+ LOG.info("Invalid taskcontroller-path : " + path);
+ return false;
+ }
+ return true;
+ }
+
+ private static boolean isUserPassed() {
+ String ugi = System.getProperty(TASKCONTROLLER_UGI);
+ if (ugi != null && !(ugi.equals("${" + TASKCONTROLLER_UGI + "}"))
+ && !ugi.isEmpty()) {
+ if (ugi.indexOf(",") > 1) {
+ return true;
+ }
+ LOG.info("Invalid taskcontroller-ugi : " + ugi);
+ return false;
+ }
+ LOG.info("Invalid taskcontroller-ugi : " + ugi);
+ return false;
+ }
+
+ protected JobConf getClusterConf() {
+ return new JobConf(clusterConf);
+ }
+
+ @Override
+ protected void tearDown()
+ throws Exception {
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ }
+
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+
+ if (configurationFile != null) {
+ configurationFile.delete();
+ }
+
+ super.tearDown();
+ }
+
+ /**
+ * Assert that the job is actually run by the specified user by verifying the
+ * permissions of the output part-files.
+ *
+ * @param outDir
+ * @throws IOException
+ */
+ protected void assertOwnerShip(Path outDir)
+ throws IOException {
+ FileSystem fs = outDir.getFileSystem(clusterConf);
+ assertOwnerShip(outDir, fs);
+ }
+
+ /**
+ * Assert that the job is actually run by the specified user by verifying the
+ * permissions of the output part-files.
+ *
+ * @param outDir
+ * @param fs
+ * @throws IOException
+ */
+ protected void assertOwnerShip(Path outDir, FileSystem fs)
+ throws IOException {
+ for (FileStatus status : fs.listStatus(outDir,
+ new Utils.OutputFileUtils
+ .OutputFilesFilter())) {
+ String owner = status.getOwner();
+ String group = status.getGroup();
+ LOG.info("Ownership of the file is " + status.getPath() + " is " + owner
+ + "," + group);
+ assertTrue("Output part-file's owner is not correct. Expected : "
+ + jobOwner.getUserName() + " Found : " + owner, owner
+ .equals(jobOwner.getUserName()));
+ assertTrue("Output part-file's group is not correct. Expected : "
+ + jobOwner.getGroupNames()[0] + " Found : " + group, group
+ .equals(jobOwner.getGroupNames()[0]));
+ }
+ }
+
+ /**
+ * Validates permissions of private distcache dir and its contents fully
+ */
+ public static void checkPermissionsOnPrivateDistCache(String[] localDirs,
+ String user, String taskTrackerUser, String groupOwner)
+ throws IOException {
+ // user-dir, jobcache and distcache will have
+ // 2770 permissions if jobOwner is same as tt_user
+ // 2570 permissions for any other user
+ String expectedDirPerms = taskTrackerUser.equals(user)
+ ? "drwxrws---"
+ : "dr-xrws---";
+ String expectedFilePerms = taskTrackerUser.equals(user)
+ ? "-rwxrwx---"
+ : "-r-xrwx---";
+ for (String localDir : localDirs) {
+ File distCacheDir = new File(localDir,
+ TaskTracker.getPrivateDistributedCacheDir(user));
+ if (distCacheDir.exists()) {
+ checkPermissionsOnDir(distCacheDir, user, groupOwner, expectedDirPerms,
+ expectedFilePerms);
+ }
+ }
+ }
+
+ /**
+ * Check that files expected to be localized in distributed cache for a user
+ * are present.
+ * @param localDirs List of mapred local directories.
+ * @param user User against which localization is happening
+ * @param expectedFileNames List of files expected to be localized
+ * @throws IOException
+ */
+ public static void checkPresenceOfPrivateDistCacheFiles(String[] localDirs,
+ String user, String[] expectedFileNames) throws IOException {
+ FileGatherer gatherer = new FileGatherer();
+ for (String localDir : localDirs) {
+ File distCacheDir = new File(localDir,
+ TaskTracker.getPrivateDistributedCacheDir(user));
+ findExpectedFiles(expectedFileNames, distCacheDir, gatherer);
+ }
+ assertEquals("Files expected in private distributed cache were not found",
+ expectedFileNames.length, gatherer.getCount());
+ }
+
+ /**
+ * Validates permissions and ownership of public distcache dir and its
+ * contents fully in all local dirs
+ */
+ public static void checkPermissionsOnPublicDistCache(FileSystem localFS,
+ String[] localDirs, String owner, String group) throws IOException {
+ for (String localDir : localDirs) {
+ File distCacheDir = new File(localDir,
+ TaskTracker.getPublicDistributedCacheDir());
+
+ if (distCacheDir.exists()) {
+ checkPublicFilePermissions(localFS, distCacheDir, owner, group);
+ }
+ }
+ }
+
+ /**
+ * Checks that files expected to be localized in the public distributed
+ * cache are present
+ * @param localDirs List of mapred local directories
+ * @param expectedFileNames List of expected file names.
+ * @throws IOException
+ */
+ public static void checkPresenceOfPublicDistCacheFiles(String[] localDirs,
+ String[] expectedFileNames) throws IOException {
+ FileGatherer gatherer = new FileGatherer();
+ for (String localDir : localDirs) {
+ File distCacheDir = new File(localDir,
+ TaskTracker.getPublicDistributedCacheDir());
+ findExpectedFiles(expectedFileNames, distCacheDir, gatherer);
+ }
+ assertEquals("Files expected in public distributed cache were not found",
+ expectedFileNames.length, gatherer.getCount());
+ }
+
+ /**
+ * Validates permissions and ownership on the public distributed cache files
+ */
+ private static void checkPublicFilePermissions(FileSystem localFS, File dir,
+ String owner, String group)
+ throws IOException {
+ Path dirPath = new Path(dir.getAbsolutePath());
+ TestTrackerDistributedCacheManager.checkPublicFilePermissions(localFS,
+ new Path[] {dirPath});
+ TestTrackerDistributedCacheManager.checkPublicFileOwnership(localFS,
+ new Path[] {dirPath}, owner, group);
+ if (dir.isDirectory()) {
+ File[] files = dir.listFiles();
+ for (File file : files) {
+ checkPublicFilePermissions(localFS, file, owner, group);
+ }
+ }
+ }
+
+ /**
+ * Validates permissions of the given dir and its contents fully (i.e. recursively)
+ */
+ private static void checkPermissionsOnDir(File dir, String user,
+ String groupOwner, String expectedDirPermissions,
+ String expectedFilePermissions) throws IOException {
+ TestTaskTrackerLocalization.checkFilePermissions(dir.toString(),
+ expectedDirPermissions, user, groupOwner);
+ File[] files = dir.listFiles();
+ for (File file : files) {
+ if (file.isDirectory()) {
+ checkPermissionsOnDir(file, user, groupOwner, expectedDirPermissions,
+ expectedFilePermissions);
+ } else {
+ TestTaskTrackerLocalization.checkFilePermissions(file.toString(),
+ expectedFilePermissions, user, groupOwner);
+ }
+ }
+ }
+
+ // Check which files among those expected are present in the rootDir
+ // Add those present to the FileGatherer.
+ private static void findExpectedFiles(String[] expectedFileNames,
+ File rootDir, FileGatherer gatherer) {
+
+ File[] files = rootDir.listFiles();
+ if (files == null) {
+ return;
+ }
+ for (File file : files) {
+ if (file.isDirectory()) {
+ findExpectedFiles(expectedFileNames, file, gatherer);
+ } else {
+ if (isFilePresent(expectedFileNames, file)) {
+ gatherer.addFileName(file.getName());
+ }
+ }
+ }
+
+ }
+
+ // Test if the passed file is present in the expected list of files.
+ private static boolean isFilePresent(String[] expectedFileNames, File file) {
+ boolean foundFileName = false;
+ for (String name : expectedFileNames) {
+ if (name.equals(file.getName())) {
+ foundFileName = true;
+ break;
+ }
+ }
+ return foundFileName;
+ }
+
+ // Helper class to collect a list of file names across multiple
+ // method calls. Wrapper around a collection defined for clarity
+ private static class FileGatherer {
+ List<String> foundFileNames = new ArrayList<String>();
+
+ void addFileName(String fileName) {
+ foundFileNames.add(fileName);
+ }
+
+ int getCount() {
+ return foundFileNames.size();
+ }
+ }
+}
Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
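As a point of reference, the taskcontroller.cfg generated by createTaskControllerConf() above ends up with one key per line, roughly as in the sample below. The values are illustrative only, and the literal key written for TTConfig.TT_GROUP is an assumption; the other two keys appear verbatim elsewhere in this commit (MRConfig.LOCAL_DIR in the C test, hadoop.log.dir in the writer code).

    mapreduce.cluster.local.dir=/tmp/mapred/local
    hadoop.log.dir=/home/testuser/hadoop-logs
    mapreduce.tasktracker.group=mapred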
Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java Tue Jun 5 01:02:16 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+public class TestDebugScriptWithLinuxTaskController extends
+ ClusterWithLinuxTaskController {
+
+ @Test
+ public void testDebugScriptExecutionAsDifferentUser() throws Exception {
+ if (!super.shouldRun()) {
+ return;
+ }
+ super.startCluster();
+ TestDebugScript.setupDebugScriptDirs();
+ final Path inDir = new Path("input");
+ final Path outDir = new Path("output");
+ JobConf conf = super.getClusterConf();
+ FileSystem fs = inDir.getFileSystem(conf);
+ fs.mkdirs(inDir);
+ Path p = new Path(inDir, "1.txt");
+ fs.createNewFile(p);
+ String splits[] = System
+ .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI).
+ split(",");
+ JobID jobId = UserGroupInformation.createUserForTesting(splits[0],
+ new String[]{splits[1]}).doAs(new PrivilegedExceptionAction<JobID>() {
+ public JobID run() throws IOException{
+ return TestDebugScript.runFailingMapJob(
+ TestDebugScriptWithLinuxTaskController.this.getClusterConf(),
+ inDir, outDir);
+ }
+ });
+ // construct the task id of first map task of failmap
+ TaskAttemptID taskId = new TaskAttemptID(
+ new TaskID(jobId,TaskType.MAP, 0), 0);
+ TestDebugScript.verifyDebugScriptOutput(taskId, splits[0],
+ taskTrackerSpecialGroup, "-rw-rw----");
+ TestDebugScript.cleanupDebugScriptDirs();
+ }
+}
Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Tue Jun 5 01:02:16 2012
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Test a java-based mapred job with LinuxTaskController running the jobs as a
+ * user different from the user running the cluster. See
+ * {@link ClusterWithLinuxTaskController}
+ */
+public class TestJobExecutionAsDifferentUser extends
+ ClusterWithLinuxTaskController {
+
+ public void testJobExecution()
+ throws Exception {
+ if (!shouldRun()) {
+ return;
+ }
+ startCluster();
+
+
+ jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
+ public Object run() throws Exception {
+ Path inDir = new Path("input");
+ Path outDir = new Path("output");
+
+ RunningJob job;
+ // Run a job with zero maps/reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 0, 0);
+ job.waitForCompletion();
+ assertTrue("Job failed", job.isSuccessful());
+ assertOwnerShip(outDir);
+
+ // Run a job with 1 map and zero reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 0);
+ job.waitForCompletion();
+ assertTrue("Job failed", job.isSuccessful());
+ assertOwnerShip(outDir);
+
+ // Run a normal job with maps/reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 1);
+ job.waitForCompletion();
+ assertTrue("Job failed", job.isSuccessful());
+ assertOwnerShip(outDir);
+
+ // Run a job with jvm reuse
+ JobConf myConf = getClusterConf();
+ myConf.set(JobContext.JVM_NUMTASKS_TORUN, "-1");
+ String[] args = { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" };
+ assertEquals(0, ToolRunner.run(myConf, new SleepJob(), args));
+ return null;
+ }
+ });
+
+ }
+
+ public void testEnvironment() throws Exception {
+ if (!shouldRun()) {
+ return;
+ }
+ startCluster();
+ jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
+ public Object run() throws Exception {
+
+ TestMiniMRChildTask childTask = new TestMiniMRChildTask();
+ Path inDir = new Path("input1");
+ Path outDir = new Path("output1");
+ try {
+ childTask.runTestTaskEnv(getClusterConf(), inDir, outDir, false);
+ } catch (IOException e) {
+ fail("IOException thrown while running enviroment test."
+ + e.getMessage());
+ } finally {
+ FileSystem outFs = outDir.getFileSystem(getClusterConf());
+ if (outFs.exists(outDir)) {
+ assertOwnerShip(outDir);
+ outFs.delete(outDir, true);
+ } else {
+ fail("Output directory does not exist" + outDir.toString());
+ }
+ return null;
+ }
+ }
+ });
+ }
+
+ /** Ensure that SIGQUIT can be properly sent by the LinuxTaskController
+ * if a task times out.
+ */
+ public void testTimeoutStackTrace() throws Exception {
+ if (!shouldRun()) {
+ return;
+ }
+
+ // Run a job that should timeout and trigger a SIGQUIT.
+ startCluster();
+ jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
+ public Object run() throws Exception {
+ JobConf conf = getClusterConf();
+ conf.setInt(JobContext.TASK_TIMEOUT, 10000);
+ conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
+ SleepJob sleepJob = new SleepJob();
+ sleepJob.setConf(conf);
+ Job job = sleepJob.createJob(1, 0, 30000, 1, 0, 0);
+ job.setMaxMapAttempts(1);
+ int prevNumSigQuits = MyLinuxTaskController.attemptedSigQuits;
+ job.waitForCompletion(true);
+ assertTrue("Did not detect a new SIGQUIT!",
+ prevNumSigQuits < MyLinuxTaskController.attemptedSigQuits);
+ assertEquals("A SIGQUIT attempt failed!", 0,
+ MyLinuxTaskController.failedSigQuits);
+ return null;
+ }
+ });
+ }
+}
Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java Tue Jun 5 01:02:16 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Test killing of child processes spawned by the jobs with LinuxTaskController
+ * running the jobs as a user different from the user running the cluster.
+ * See {@link ClusterWithLinuxTaskController}
+ */
+
+public class TestKillSubProcessesWithLinuxTaskController extends
+ ClusterWithLinuxTaskController {
+
+ public void testKillSubProcess() throws Exception {
+ if (!shouldRun()) {
+ return;
+ }
+ startCluster();
+ jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
+ public Object run() throws Exception {
+ JobConf myConf = getClusterConf();
+ JobTracker jt = mrCluster.getJobTrackerRunner().getJobTracker();
+
+ TestKillSubProcesses.mr = mrCluster;
+ TestKillSubProcesses sbProc = new TestKillSubProcesses();
+ sbProc.runTests(myConf, jt);
+ return null;
+ }
+ });
+ }
+}
Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java Tue Jun 5 01:02:16 2012
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestLinuxTaskController extends TestCase {
+ private static final int INVALID_TASKCONTROLLER_PERMISSIONS = 24;
+ private static File testDir = new File(System.getProperty("test.build.data",
+ "/tmp"), TestLinuxTaskController.class.getName());
+ private static String taskControllerPath = System
+ .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+
+ @Before
+ protected void setUp() throws Exception {
+ testDir.mkdirs();
+ }
+
+ @After
+ protected void tearDown() throws Exception {
+ FileUtil.fullyDelete(testDir);
+ }
+
+ public static class MyLinuxTaskController extends LinuxTaskController {
+ String taskControllerExePath = taskControllerPath + "/task-controller";
+
+ @Override
+ protected String getTaskControllerExecutablePath() {
+ return taskControllerExePath;
+ }
+ }
+
+ private void validateTaskControllerSetup(TaskController controller,
+ boolean shouldFail) throws IOException {
+ if (shouldFail) {
+ // task controller setup should fail validating permissions.
+ Throwable th = null;
+ try {
+ controller.setup();
+ } catch (IOException ie) {
+ th = ie;
+ }
+ assertNotNull("No exception during setup", th);
+ assertTrue("Exception message does not contain exit code"
+ + INVALID_TASKCONTROLLER_PERMISSIONS, th.getMessage().contains(
+ "with exit code " + INVALID_TASKCONTROLLER_PERMISSIONS));
+ } else {
+ controller.setup();
+ }
+
+ }
+
+ @Test
+ public void testTaskControllerGroup() throws Exception {
+ if (!ClusterWithLinuxTaskController.isTaskExecPathPassed()) {
+ return;
+ }
+ // cleanup configuration file.
+ ClusterWithLinuxTaskController
+ .getTaskControllerConfFile(taskControllerPath).delete();
+ Configuration conf = new Configuration();
+ // create local dirs and set in the conf.
+ File mapredLocal = new File(testDir, "mapred/local");
+ mapredLocal.mkdirs();
+ conf.set(MRConfig.LOCAL_DIR, mapredLocal.toString());
+
+ // setup task-controller without setting any group name
+ TaskController controller = new MyLinuxTaskController();
+ controller.setConf(conf);
+ validateTaskControllerSetup(controller, true);
+
+ // set an invalid group name for the task controller group
+ conf.set(TTConfig.TT_GROUP, "invalid");
+ // write the task-controller's conf file
+ ClusterWithLinuxTaskController.createTaskControllerConf(taskControllerPath,
+ conf);
+ validateTaskControllerSetup(controller, true);
+
+ conf.set(TTConfig.TT_GROUP,
+ ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+ // write the task-controller's conf file
+ ClusterWithLinuxTaskController.createTaskControllerConf(taskControllerPath,
+ conf);
+ validateTaskControllerSetup(controller, false);
+ }
+}
Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Tue Jun 5 01:02:16 2012
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Test to verify localization of a job and localization of a task on a
+ * TaskTracker when {@link LinuxTaskController} is used.
+ *
+ */
+public class TestLocalizationWithLinuxTaskController extends
+ TestTaskTrackerLocalization {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestLocalizationWithLinuxTaskController.class);
+
+ private File configFile;
+ private static String taskTrackerUserName;
+
+ @Override
+ protected boolean canRun() {
+ return ClusterWithLinuxTaskController.shouldRun();
+ }
+
+ @Override
+ protected void setUp()
+ throws Exception {
+
+ if (!canRun()) {
+ return;
+ }
+
+ super.setUp();
+
+ taskTrackerUserName = UserGroupInformation.getLoginUser()
+ .getShortUserName();
+ }
+
+ @Override
+ protected void tearDown()
+ throws Exception {
+ if (!canRun()) {
+ return;
+ }
+ super.tearDown();
+ if (configFile != null) {
+ configFile.delete();
+ }
+ }
+
+ protected TaskController createTaskController() {
+ return new MyLinuxTaskController();
+ }
+
+ protected UserGroupInformation getJobOwner() {
+ String ugi = System
+ .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+ String[] splits = ugi.split(",");
+ return UserGroupInformation.createUserForTesting(splits[0],
+ new String[] { splits[1] });
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void testTaskControllerSetup() {
+ // Do nothing.
+ }
+
+ @Override
+ protected void checkUserLocalization()
+ throws IOException {
+ // Check the directory structure and permissions
+ for (String dir : localDirs) {
+
+ File localDir = new File(dir);
+ assertTrue(MRConfig.LOCAL_DIR + " " + localDir + " isn't created!",
+ localDir.exists());
+
+ File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+ assertTrue("taskTracker sub-dir in the local-dir " + localDir
+ + "is not created!", taskTrackerSubDir.exists());
+
+ // user-dir, jobcache and distcache will have
+ // 2770 permissions if jobOwner is same as tt_user
+ // 2570 permissions for any other user
+ String expectedDirPerms = taskTrackerUserName.equals(task.getUser())
+ ? "drwxrws---"
+ : "dr-xrws---";
+
+ File userDir = new File(taskTrackerSubDir, task.getUser());
+ assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir
+ + "is not created!", userDir.exists());
+
+ checkFilePermissions(userDir.getAbsolutePath(), expectedDirPerms, task
+ .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+
+ File jobCache = new File(userDir, TaskTracker.JOBCACHE);
+ assertTrue("jobcache in the userDir " + userDir + " isn't created!",
+ jobCache.exists());
+
+ checkFilePermissions(jobCache.getAbsolutePath(), expectedDirPerms, task
+ .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+
+ // Verify the distributed cache dir.
+ File distributedCacheDir =
+ new File(localDir, TaskTracker
+ .getPrivateDistributedCacheDir(task.getUser()));
+ assertTrue("distributed cache dir " + distributedCacheDir
+ + " doesn't exists!", distributedCacheDir.exists());
+ checkFilePermissions(distributedCacheDir.getAbsolutePath(),
+ expectedDirPerms, task.getUser(),
+ ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+ }
+ }
+
+ @Override
+ protected void checkJobLocalization()
+ throws IOException {
+ // job-dir, jars-dir and subdirectories in them will have
+ // 2770 permissions if jobOwner is same as tt_user
+ // 2570 permissions for any other user
+ // Files under these dirs will have
+ // 770 permissions if jobOwner is same as tt_user
+ // 570 permissions for any other user
+ String expectedDirPerms = taskTrackerUserName.equals(task.getUser())
+ ? "drwxrws---"
+ : "dr-xrws---";
+ String expectedFilePerms = taskTrackerUserName.equals(task.getUser())
+ ? "-rwxrwx---"
+ : "-r-xrwx---";
+
+ for (String localDir : trackerFConf.getStrings(MRConfig.LOCAL_DIR)) {
+ File jobDir =
+ new File(localDir, TaskTracker.getLocalJobDir(task.getUser(), jobId
+ .toString()));
+ // check the private permissions on the job directory
+ checkFilePermissions(jobDir.getAbsolutePath(), expectedDirPerms, task
+ .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+ }
+
+ // check the private permissions of various directories
+ List<Path> dirs = new ArrayList<Path>();
+ Path jarsDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(task.getUser(),
+ jobId.toString()), trackerFConf);
+ dirs.add(jarsDir);
+ dirs.add(new Path(jarsDir, "lib"));
+ for (Path dir : dirs) {
+ checkFilePermissions(dir.toUri().getPath(), expectedDirPerms,
+ task.getUser(),
+ ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+ }
+
+ // job-work dir needs user-writable permissions, i.e. 2770, for any user
+ Path jobWorkDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(),
+ jobId.toString()), trackerFConf);
+ checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---", task
+ .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+
+ // check the private permissions of various files
+ List<Path> files = new ArrayList<Path>();
+ files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalJobConfFile(
+ task.getUser(), jobId.toString()), trackerFConf));
+ files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task
+ .getUser(), jobId.toString()), trackerFConf));
+ files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib1.jar"));
+ files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib2.jar"));
+ for (Path file : files) {
+ checkFilePermissions(file.toUri().getPath(), expectedFilePerms, task
+ .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+ }
+
+ // check job user-log directory permissions
+ File jobLogDir = TaskLog.getJobDir(jobId);
+ checkFilePermissions(jobLogDir.toString(), expectedDirPerms, task.getUser(),
+ ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+ // check job-acls.xml file permissions
+ checkFilePermissions(jobLogDir.toString() + Path.SEPARATOR
+ + TaskTracker.jobACLsFile, expectedFilePerms, task.getUser(),
+ ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+
+ // validate the content of job ACLs file
+ validateJobACLsFileContent();
+ }
+
+ @Override
+ protected void checkTaskLocalization()
+ throws IOException {
+ // check the private permissions of various directories
+ List<Path> dirs = new ArrayList<Path>();
+ dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(task
+ .getUser(), jobId.toString(), taskId.toString(),
+ task.isTaskCleanupTask()), trackerFConf));
+ dirs.add(attemptWorkDir);
+ dirs.add(new Path(attemptWorkDir, "tmp"));
+ dirs.add(new Path(attemptLogFiles[1].getParentFile().getAbsolutePath()));
+ for (Path dir : dirs) {
+ checkFilePermissions(dir.toUri().getPath(), "drwxrws---",
+ task.getUser(),
+ ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+ }
+
+ // check the private permissions of various files
+ List<Path> files = new ArrayList<Path>();
+ files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+ .getUser(), task.getJobID().toString(), task.getTaskID().toString(),
+ task.isTaskCleanupTask()), trackerFConf));
+ for (Path file : files) {
+ checkFilePermissions(file.toUri().getPath(), "-rwxrwx---", task
+ .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+ }
+ }
+}
Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
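
The comments in checkUserLocalization() and checkJobLocalization() describe modes numerically (2770, 2570, 770, 570), while the assertions compare against ls-style strings such as "drwxrws---". The two notations describe the same thing: the leading 2 is the setgid bit, which shows up as an 's' in the group-execute position. A small illustrative converter is shown below; it is not part of the patch and the class name is made up, but it reproduces the strings that checkFilePermissions compares against.

    public class ModeString {

      // Convert a numeric mode (e.g. 02770) into an ls-style string such as
      // "drwxrws---". Only the setgid bit is handled beyond the basic nine
      // permission bits, since that is all these tests rely on.
      static String toSymbolic(int mode, boolean isDir) {
        char[] perms = "rwxrwxrwx".toCharArray();
        for (int i = 0; i < 9; i++) {
          // Bit 8 is user-read, bit 0 is other-execute.
          if ((mode & (1 << (8 - i))) == 0) {
            perms[i] = '-';
          }
        }
        if ((mode & 02000) != 0) {
          // setgid: 's' if group-execute is set, 'S' otherwise.
          perms[5] = (perms[5] == 'x') ? 's' : 'S';
        }
        return (isDir ? "d" : "-") + new String(perms);
      }

      public static void main(String[] args) {
        System.out.println(toSymbolic(02770, true));   // drwxrws---
        System.out.println(toSymbolic(02570, true));   // dr-xrws---
        System.out.println(toSymbolic(0770, false));   // -rwxrwx---
        System.out.println(toSymbolic(0570, false));   // -r-xrwx---
      }
    }

This is why the expected strings switch between "drwxrws---" and "dr-xrws---": only the user triad differs (rwx vs. r-x) depending on whether the job owner is the same user that runs the TaskTracker.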
Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Tue Jun 5 01:02:16 2012
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
+import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Test the DistributedCacheManager when LinuxTaskController is used.
+ *
+ */
+public class TestTrackerDistributedCacheManagerWithLinuxTaskController extends
+ TestTrackerDistributedCacheManager {
+
+ private File configFile;
+
+ private static final Log LOG =
+ LogFactory
+ .getLog(TestTrackerDistributedCacheManagerWithLinuxTaskController.class);
+
+ @Override
+ protected void setUp()
+ throws IOException, InterruptedException {
+
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return;
+ }
+
+ TEST_ROOT_DIR =
+ new File(System.getProperty("test.build.data", "/tmp"),
+ TestTrackerDistributedCacheManagerWithLinuxTaskController.class
+ .getSimpleName()).getAbsolutePath();
+
+ super.setUp();
+
+ taskController = new MyLinuxTaskController();
+ String path =
+ System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+ String execPath = path + "/task-controller";
+ ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
+ taskController.setConf(conf);
+ taskController.setup();
+ }
+
+ @Override
+ protected void refreshConf(Configuration conf) throws IOException {
+ super.refreshConf(conf);
+ String path =
+ System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+ configFile =
+ ClusterWithLinuxTaskController.createTaskControllerConf(path, conf);
+
+ }
+
+ @Override
+ protected void tearDown()
+ throws IOException {
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return;
+ }
+ if (configFile != null) {
+ configFile.delete();
+ }
+ super.tearDown();
+ }
+
+ @Override
+ protected boolean canRun() {
+ return ClusterWithLinuxTaskController.shouldRun();
+ }
+
+ @Override
+ protected String getJobOwnerName() {
+ String ugi =
+ System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+ String userName = ugi.split(",")[0];
+ return userName;
+ }
+
+ @Override
+ protected void checkFilePermissions(Path[] localCacheFiles)
+ throws IOException {
+ String userName = getJobOwnerName();
+ String filePermissions = UserGroupInformation.getLoginUser()
+ .getShortUserName().equals(userName) ? "-rwxrwx---" : "-r-xrwx---";
+
+ for (Path p : localCacheFiles) {
+ // First make sure that the cache file has proper permissions.
+ TestTaskTrackerLocalization.checkFilePermissions(p.toUri().getPath(),
+ filePermissions, userName,
+ ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+ // Now, make sure that all the path components also have proper
+ // permissions.
+ checkPermissionOnPathComponents(p.toUri().getPath(), userName);
+ }
+
+ }
+
+ /**
+ * Verify that every directory component of the localized cache file's
+ * path has the expected owner, group and permissions.
+ *
+ * @param cachedFilePath absolute path of the localized cache file
+ * @param userName name of the job owner
+ * @throws IOException
+ */
+ private void checkPermissionOnPathComponents(String cachedFilePath,
+ String userName)
+ throws IOException {
+ // The trailing distcache/file/... string
+ String trailingStringForFirstFile =
+ cachedFilePath.replaceFirst(ROOT_MAPRED_LOCAL_DIR.getAbsolutePath()
+ + Path.SEPARATOR + "0_[0-" + (numLocalDirs - 1) + "]"
+ + Path.SEPARATOR + TaskTracker.getPrivateDistributedCacheDir(userName),
+ "");
+ LOG.info("Trailing path for cacheFirstFile is : "
+ + trailingStringForFirstFile);
+ // The leading mapreduce.cluster.local.dir/0_[0-n]/taskTracker/$user string.
+ String leadingStringForFirstFile =
+ cachedFilePath.substring(0, cachedFilePath
+ .lastIndexOf(trailingStringForFirstFile));
+ LOG.info("Leading path for cacheFirstFile is : "
+ + leadingStringForFirstFile);
+
+ String dirPermissions = UserGroupInformation.getLoginUser()
+ .getShortUserName().equals(userName) ? "drwxrws---" : "dr-xrws---";
+
+ // Now check path permissions, starting with cache file's parent dir.
+ File path = new File(cachedFilePath).getParentFile();
+ while (!path.getAbsolutePath().equals(leadingStringForFirstFile)) {
+ TestTaskTrackerLocalization.checkFilePermissions(path.getAbsolutePath(),
+ dirPermissions, userName,
+ ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+ path = path.getParentFile();
+ }
+ }
+}
Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
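
checkPermissionOnPathComponents() above performs its component-by-component check by starting at the cached file's parent directory and walking up with getParentFile() until it reaches the computed local-dir prefix, asserting owner, group and permissions at each step. A stand-alone illustration of that walk follows; the paths and class name are hypothetical, not taken from the patch.

    import java.io.File;

    public class PathComponentWalkSketch {
      public static void main(String[] args) {
        // Hypothetical local-dir root and localized cache file.
        File root = new File("/tmp/mapred/local/0_0");
        File cachedFile =
            new File(root, "taskTracker/someuser/distcache/1234/file.txt");

        // Visit every directory between the cached file and the root; the
        // test asserts owner/group/permissions at each of these stops.
        for (File dir = cachedFile.getParentFile();
             dir != null && !dir.equals(root);
             dir = dir.getParentFile()) {
          System.out.println("would check permissions on " + dir);
        }
      }
    }

Walking every component matters because a single world-readable parent directory would defeat the per-user isolation that the restrictive leaf permissions are meant to provide.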