hadoop-mapreduce-commits mailing list archives

From acmur...@apache.org
Subject svn commit: r1082677 [28/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...
Date Thu, 17 Mar 2011 20:21:54 GMT
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/test/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/test/test-task-controller.c?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/test/test-task-controller.c (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/test/test-task-controller.c Thu Mar 17 20:21:13 2011
@@ -0,0 +1,763 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "configuration.h"
+#include "task-controller.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+
+#define TEST_ROOT "/tmp/test-task-controller"
+#define DONT_TOUCH_FILE "dont-touch-me"
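+// DONT_TOUCH_FILE is a canary: the deletion tests link to it from inside
+// task directories and then assert that it survives the delete.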
+
+static char* username = NULL;
+
+/**
+ * Run the command using the effective user id.
+ * It can't use system, since bash seems to copy the real user id into the
+ * effective id.
+ */
+void run(const char *cmd) {
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: failed to fork - %s\n", strerror(errno));
+  } else if (child == 0) {
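+    // Child: tokenize cmd into an argv vector in place; the loop below also
+    // stores the trailing NULL that execvp requires.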
+    char *cmd_copy = strdup(cmd);
+    char *ptr;
+    int words = 1;
+    for(ptr = strchr(cmd_copy, ' ');  ptr; ptr = strchr(ptr+1, ' ')) {
+      words += 1;
+    }
+    char **argv = malloc(sizeof(char *) * (words + 1));
+    ptr = strtok(cmd_copy, " ");
+    int i = 0;
+    argv[i++] = ptr;
+    while (ptr != NULL) {
+      ptr = strtok(NULL, " ");
+      argv[i++] = ptr;
+    }
+    if (execvp(argv[0], argv) != 0) {
+      printf("FAIL: exec failed in child %s - %s\n", cmd, strerror(errno));
+      exit(42);
+    }
+  } else {
+    int status = 0;
+    if (waitpid(child, &status, 0) <= 0) {
+      printf("FAIL: failed waiting for child process %s pid %d - %s\n", 
+	     cmd, child, strerror(errno));
+      exit(1);
+    }
+    if (!WIFEXITED(status)) {
+      printf("FAIL: process %s pid %d did not exit\n", cmd, child);
+      exit(1);
+    }
+    if (WEXITSTATUS(status) != 0) {
+      printf("FAIL: process %s pid %d exited with error status %d\n", cmd, 
+	     child, WEXITSTATUS(status));
+      exit(1);
+    }
+  }
+}
+
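+/*
+ * Write a minimal task-controller config into file_name. The generated file
+ * looks like:
+ *   mapred.local.dir=<TEST_ROOT>/local-1,...,<TEST_ROOT>/local-4
+ *   hadoop.log.dir=<TEST_ROOT>/logs
+ */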
+int write_config_file(char *file_name) {
+  FILE *file;
+  file = fopen(file_name, "w");
+  if (file == NULL) {
+    printf("Failed to open %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+  fprintf(file, "mapred.local.dir=" TEST_ROOT "/local-1");
+  int i;
+  for(i=2; i < 5; ++i) {
+    fprintf(file, "," TEST_ROOT "/local-%d", i);
+  }
+  fprintf(file, "\n");
+  fprintf(file, "hadoop.log.dir=" TEST_ROOT "/logs\n");
+  fclose(file);
+  return 0;
+}
+
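+// Create each configured mapred.local.dir and the taskTracker/ subdirectory
+// the controller expects beneath it.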
+void create_tt_roots() {
+  char** tt_roots = get_values("mapred.local.dir");
+  char** tt_root;
+  for(tt_root=tt_roots; *tt_root != NULL; ++tt_root) {
+    if (mkdir(*tt_root, 0755) != 0) {
+      printf("FAIL: Can't create directory %s - %s\n", *tt_root,
+	     strerror(errno));
+      exit(1);
+    }
+    char buffer[100000];
+    sprintf(buffer, "%s/taskTracker", *tt_root);
+    if (mkdir(buffer, 0755) != 0) {
+      printf("FAIL: Can't create directory %s - %s\n", buffer,
+	     strerror(errno));
+      exit(1);
+    }
+  }
+  free_values(tt_roots);
+}
+
+void test_get_user_directory() {
+  char *user_dir = get_user_directory("/tmp", "user");
+  char *expected = "/tmp/taskTracker/user";
+  if (strcmp(user_dir, expected) != 0) {
+    printf("test_get_user_directory expected %s got %s\n", user_dir, expected);
+    exit(1);
+  }
+  free(user_dir);
+}
+
+void test_get_job_directory() {
+  char *expected = "/tmp/taskTracker/user/appcache/job_200906101234_0001";
+  char *job_dir = (char *) get_job_directory("/tmp", "user",
+      "job_200906101234_0001");
+  if (strcmp(job_dir, expected) != 0) {
+    printf("FAIL: get_job_directory expected %s got %s\n", expected, job_dir);
+    exit(1);
+  }
+  free(job_dir);
+}
+
+void test_get_attempt_directory() {
+  char *attempt_dir = get_attempt_work_directory("/tmp", "owen", "job_1",
+						 "attempt_1");
+  char *expected = "/tmp/taskTracker/owen/appcache/job_1/attempt_1/work";
+  if (strcmp(attempt_dir, expected) != 0) {
+    printf("FAIL: get_attempt_work_directory expected %s got %s\n",
+           expected, attempt_dir);
+    exit(1);
+  }
+  free(attempt_dir);
+}
+
+void test_get_task_launcher_file() {
+  char *expected_file = ("/tmp/taskTracker/user/appcache/job_200906101234_0001"
+			 "/taskjvm.sh");
+  char *job_dir = get_job_directory("/tmp", "user",
+                                    "job_200906101234_0001");
+  char *task_file =  get_task_launcher_file(job_dir);
+  if (strcmp(task_file, expected_file) != 0) {
+    printf("failure to match expected task file %s vs %s\n", task_file,
+           expected_file);
+    exit(1);
+  }
+  free(job_dir);
+  free(task_file);
+}
+
+void test_get_job_log_dir() {
+  char *expected = TEST_ROOT "/logs/userlogs/job_200906101234_0001";
+  char *logdir = get_job_log_directory("job_200906101234_0001");
+  if (strcmp(logdir, expected) != 0) {
+    printf("Fail get_job_log_dir got %s expected %s\n", logdir, expected);
+    exit(1);
+  }
+  free(logdir);
+}
+
+void test_get_task_log_dir() {
+  char *logdir = get_job_log_directory("job_5/task_4");
+  char *expected = TEST_ROOT "/logs/userlogs/job_5/task_4";
+  if (strcmp(logdir, expected) != 0) {
+    printf("FAIL: get_task_log_dir expected %s got %s\n", expected, logdir);
+    exit(1);
+  }
+  free(logdir);
+}
+
+void test_check_user() {
+  printf("\nTesting test_check_user\n");
+  struct passwd *user = check_user(username);
+  if (user == NULL) {
+    printf("FAIL: failed check for user %s\n", username);
+    exit(1);
+  }
+  free(user);
+  if (check_user("lp") != NULL) {
+    printf("FAIL: failed check for system user lp\n");
+    exit(1);
+  }
+  if (check_user("root") != NULL) {
+    printf("FAIL: failed check for system user root\n");
+    exit(1);
+  }
+  if (check_user("mapred") != NULL) {
+    printf("FAIL: failed check for hadoop user mapred\n");
+    exit(1);
+  }
+}
+
+void test_check_configuration_permissions() {
+  printf("\nTesting check_configuration_permissions\n");
+  if (check_configuration_permissions("/etc/passwd") != 0) {
+    printf("FAIL: failed permission check on /etc/passwd\n");
+    exit(1);
+  }
+  if (check_configuration_permissions(TEST_ROOT) == 0) {
+    printf("FAIL: failed permission check on %s\n", TEST_ROOT);
+    exit(1);
+  }
+}
+
+void test_delete_task() {
+  if (initialize_user(username)) {
+    printf("FAIL: failed to initialized user %s\n", username);
+    exit(1);
+  }
+  char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_1");
+  char* dont_touch = get_job_directory(TEST_ROOT "/local-2", username, 
+                                       DONT_TOUCH_FILE);
+  char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-2", 
+					      username, "job_1", "task_1");
+  char buffer[100000];
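+  // Seed the task directory with traps: a symlink and a hardlink to the
+  // canary file, a dotfile, and permission-less entries, so delete_as_user
+  // must remove everything without following links out of the tree.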
+  sprintf(buffer, "mkdir -p %s/who/let/the/dogs/out/who/who", task_dir);
+  run(buffer);
+  sprintf(buffer, "touch %s", dont_touch);
+  run(buffer);
+
+  // soft link to the canary file from the task directory
+  sprintf(buffer, "ln -s %s %s/who/softlink", dont_touch, task_dir);
+  run(buffer);
+  // hard link to the canary file from the task directory
+  sprintf(buffer, "ln %s %s/who/hardlink", dont_touch, task_dir);
+  run(buffer);
+  // create a dot file in the task directory
+  sprintf(buffer, "touch %s/who/let/.dotfile", task_dir);
+  run(buffer);
+  // create a no permission file
+  sprintf(buffer, "touch %s/who/let/protect", task_dir);
+  run(buffer);
+  sprintf(buffer, "chmod 000 %s/who/let/protect", task_dir);
+  run(buffer);
+  // create a no permission directory
+  sprintf(buffer, "chmod 000 %s/who/let", task_dir);
+  run(buffer);
+
+  // delete task directory
+  int ret = delete_as_user(username, "appcache/job_1/task_1");
+  if (ret != 0) {
+    printf("FAIL: return code from delete_as_user is %d\n", ret);
+    exit(1);
+  }
+
+  // check to make sure the task directory is gone
+  if (access(task_dir, R_OK) == 0) {
+    printf("FAIL: failed to delete the directory - %s\n", task_dir);
+    exit(1);
+  }
+  // check to make sure the job directory is not gone
+  if (access(job_dir, R_OK) != 0) {
+    printf("FAIL: accidently deleted the directory - %s\n", job_dir);
+    exit(1);
+  }
+  // and that the canary file is still there
+  if (access(dont_touch, R_OK) != 0) {
+    printf("FAIL: accidently deleted file %s\n", dont_touch);
+    exit(1);
+  }
+  sprintf(buffer, "chmod -R 700 %s", job_dir);
+  run(buffer);
+  sprintf(buffer, "rm -fr %s", job_dir);
+  run(buffer);
+  free(job_dir);
+  free(task_dir);
+  free(dont_touch);
+}
+
+void test_delete_job() {
+  char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_2");
+  char* dont_touch = get_job_directory(TEST_ROOT "/local-2", username, 
+                                       DONT_TOUCH_FILE);
+  char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-2", 
+					      username, "job_2", "task_1");
+  char buffer[100000];
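+  // Same trap layout as test_delete_task, but here the whole job directory
+  // is deleted.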
+  sprintf(buffer, "mkdir -p %s/who/let/the/dogs/out/who/who", task_dir);
+  run(buffer);
+  sprintf(buffer, "touch %s", dont_touch);
+  run(buffer);
+
+  // soft link to the canary file from the task directory
+  sprintf(buffer, "ln -s %s %s/who/softlink", dont_touch, task_dir);
+  run(buffer);
+  // hard link to the canary file from the task directory
+  sprintf(buffer, "ln %s %s/who/hardlink", dont_touch, task_dir);
+  run(buffer);
+  // create a dot file in the task directory
+  sprintf(buffer, "touch %s/who/let/.dotfile", task_dir);
+  run(buffer);
+  // create a no permission file
+  sprintf(buffer, "touch %s/who/let/protect", task_dir);
+  run(buffer);
+  sprintf(buffer, "chmod 000 %s/who/let/protect", task_dir);
+  run(buffer);
+  // create a no permission directory
+  sprintf(buffer, "chmod 000 %s/who/let", task_dir);
+  run(buffer);
+
+  // delete task directory
+  int ret = delete_as_user(username, "appcache/job_2");
+  if (ret != 0) {
+    printf("FAIL: return code from delete_as_user is %d\n", ret);
+    exit(1);
+  }
+
+  // check to make sure the task directory is gone
+  if (access(task_dir, R_OK) == 0) {
+    printf("FAIL: failed to delete the directory - %s\n", task_dir);
+    exit(1);
+  }
+  // check to make sure the job directory is gone
+  if (access(job_dir, R_OK) == 0) {
+    printf("FAIL: didn't delete the directory - %s\n", job_dir);
+    exit(1);
+  }
+  // and that the canary file is still there
+  if (access(dont_touch, R_OK) != 0) {
+    printf("FAIL: accidently deleted file %s\n", dont_touch);
+    exit(1);
+  }
+  free(job_dir);
+  free(task_dir);
+  free(dont_touch);
+}
+
+
+void test_delete_user() {
+  printf("\nTesting delete_user\n");
+  char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_3");
+  if (mkdirs(job_dir, 0700) != 0) {
+    exit(1);
+  }
+  char buffer[100000];
+  sprintf(buffer, "%s/local-1/taskTracker/%s", TEST_ROOT, username);
+  if (access(buffer, R_OK) != 0) {
+    printf("FAIL: directory missing before test\n");
+    exit(1);
+  }
+  if (delete_as_user(username, "") != 0) {
+    exit(1);
+  }
+  if (access(buffer, R_OK) == 0) {
+    printf("FAIL: directory not deleted\n");
+    exit(1);
+  }
+  if (access(TEST_ROOT "/local-1", R_OK) != 0) {
+    printf("FAIL: local-1 directory does not exist\n");
+    exit(1);
+  }
+  free(job_dir);
+}
+
+void test_delete_log_directory() {
+  printf("\nTesting delete_log_directory\n");
+  char *job_log_dir = get_job_log_directory("job_1");
+  if (job_log_dir == NULL) {
+    exit(1);
+  }
+  if (create_directory_for_user(job_log_dir) != 0) {
+    exit(1);
+  }
+  free(job_log_dir);
+  char *task_log_dir = get_job_log_directory("job_1/task_2");
+  if (task_log_dir == NULL) {
+    exit(1);
+  }
+  if (mkdirs(task_log_dir, 0700) != 0) {
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_1/task_2", R_OK) != 0) {
+    printf("FAIL: can't access task directory - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (delete_log_directory("job_1/task_2") != 0) {
+    printf("FAIL: can't delete task directory\n");
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_1/task_2", R_OK) == 0) {
+    printf("FAIL: task directory not deleted\n");
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_1", R_OK) != 0) {
+    printf("FAIL: job directory not deleted - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (delete_log_directory("job_1") != 0) {
+    printf("FAIL: can't delete task directory\n");
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_1", R_OK) == 0) {
+    printf("FAIL: job directory not deleted\n");
+    exit(1);
+  }
+  free(task_log_dir);
+}
+
+void run_test_in_child(const char* test_name, void (*func)()) {
+  printf("\nRunning test %s in child process\n", test_name);
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  } else if (child == 0) {
+    func();
+    exit(0);
+  } else {
+    int status = 0;
+    if (waitpid(child, &status, 0) == -1) {
+      printf("FAIL: waitpid %d failed - %s\n", child, strerror(errno));
+      exit(1);
+    }
+    if (!WIFEXITED(status)) {
+      printf("FAIL: child %d didn't exit - %d\n", child, status);
+      exit(1);
+    }
+    if (WEXITSTATUS(status) != 0) {
+      printf("FAIL: child %d exited with bad status %d\n",
+	     child, WEXITSTATUS(status));
+      exit(1);
+    }
+  }
+}
+
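+// Fork a child that drops to the target user and sleeps, then verify that
+// signal_user_task delivers exactly the requested signal.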
+void test_signal_task() {
+  printf("\nTesting signal_task\n");
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  } else if (child == 0) {
+    if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+      exit(1);
+    }
+    sleep(3600);
+    exit(0);
+  } else {
+    printf("Child task launched as %d\n", child);
+    if (signal_user_task(username, child, SIGQUIT) != 0) {
+      exit(1);
+    }
+    int status = 0;
+    if (waitpid(child, &status, 0) == -1) {
+      printf("FAIL: waitpid failed - %s\n", strerror(errno));
+      exit(1);
+    }
+    if (!WIFSIGNALED(status)) {
+      printf("FAIL: child wasn't signalled - %d\n", status);
+      exit(1);
+    }
+    if (WTERMSIG(status) != SIGQUIT) {
+      printf("FAIL: child was killed with %d instead of %d\n", 
+	     WTERMSIG(status), SIGQUIT);
+      exit(1);
+    }
+  }
+}
+
+void test_signal_task_group() {
+  printf("\nTesting group signal_task\n");
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  } else if (child == 0) {
+    setpgrp();
+    if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+      exit(1);
+    }
+    sleep(3600);
+    exit(0);
+  }
+  printf("Child task launched as %d\n", child);
+  if (signal_user_task(username, child, SIGKILL) != 0) {
+    exit(1);
+  }
+  int status = 0;
+  if (waitpid(child, &status, 0) == -1) {
+    printf("FAIL: waitpid failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (!WIFSIGNALED(status)) {
+    printf("FAIL: child wasn't signalled - %d\n", status);
+    exit(1);
+  }
+  if (WTERMSIG(status) != SIGKILL) {
+    printf("FAIL: child was killed with %d instead of %d\n", 
+	   WTERMSIG(status), SIGKILL);
+    exit(1);
+  }
+}
+
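+// As root, write credential and job.xml fixtures, drop the effective uid back
+// to the test user, then fork a child whose initialize_job call localizes the
+// job and execs final_pgm ("touch my-touch-file") as that user.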
+void test_init_job() {
+  printf("\nTesting init job\n");
+  if (seteuid(0) != 0) {
+    printf("FAIL: seteuid to root failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  FILE* creds = fopen(TEST_ROOT "/creds.txt", "w");
+  if (creds == NULL) {
+    printf("FAIL: failed to create credentials file - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fprintf(creds, "secret key\n") < 0) {
+    printf("FAIL: fprintf failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fclose(creds) != 0) {
+    printf("FAIL: fclose failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  FILE* job_xml = fopen(TEST_ROOT "/job.xml", "w");
+  if (job_xml == NULL) {
+    printf("FAIL: failed to create job file - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fprintf(job_xml, "<jobconf/>\n") < 0) {
+    printf("FAIL: fprintf failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fclose(job_xml) != 0) {
+    printf("FAIL: fclose failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (seteuid(user_detail->pw_uid) != 0) {
+    printf("FAIL: failed to seteuid back to user - %s\n", strerror(errno));
+    exit(1);
+  }
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: failed to fork process for init_job - %s\n", 
+	   strerror(errno));
+    exit(1);
+  } else if (child == 0) {
+    char *final_pgm[] = {"touch", "my-touch-file", 0};
+    if (initialize_job(username, "job_4", TEST_ROOT "/creds.txt", 
+                       TEST_ROOT "/job.xml", final_pgm) != 0) {
+      printf("FAIL: failed in child\n");
+      exit(42);
+    }
+    // should never return
+    exit(1);
+  }
+  int status = 0;
+  if (waitpid(child, &status, 0) <= 0) {
+    printf("FAIL: failed waiting for process %d - %s\n", child, 
+	   strerror(errno));
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_4", R_OK) != 0) {
+    printf("FAIL: failed to create job log directory\n");
+    exit(1);
+  }
+  char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_4");
+  if (access(job_dir, R_OK) != 0) {
+    printf("FAIL: failed to create job directory %s\n", job_dir);
+    exit(1);
+  }
+  char buffer[100000];
+  sprintf(buffer, "%s/jobToken", job_dir);
+  if (access(buffer, R_OK) != 0) {
+    printf("FAIL: failed to create credentials %s\n", buffer);
+    exit(1);
+  }
+  sprintf(buffer, "%s/my-touch-file", job_dir);
+  if (access(buffer, R_OK) != 0) {
+    printf("FAIL: failed to create touch file %s\n", buffer);
+    exit(1);
+  }
+  free(job_dir);
+  job_dir = get_job_log_directory("job_4");
+  if (access(job_dir, R_OK) != 0) {
+    printf("FAIL: failed to create job log directory %s\n", job_dir);
+    exit(1);
+  }
+  free(job_dir);
+}
+
+void test_run_task() {
+  printf("\nTesting run task\n");
+  if (seteuid(0) != 0) {
+    printf("FAIL: seteuid to root failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  const char* script_name = TEST_ROOT "/task-script";
+  FILE* script = fopen(script_name, "w");
+  if (script == NULL) {
+    printf("FAIL: failed to create script file - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (seteuid(user_detail->pw_uid) != 0) {
+    printf("FAIL: failed to seteuid back to user - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fprintf(script, "#!/bin/bash\n"
+                     "touch foobar\n"
+                     "exit 0") < 0) {
+    printf("FAIL: fprintf failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fclose(script) != 0) {
+    printf("FAIL: fclose failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  fflush(stdout);
+  fflush(stderr);
+  char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-1", 
+					      username, "job_4", "task_1");
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: failed to fork process for init_job - %s\n", 
+	   strerror(errno));
+    exit(1);
+  } else if (child == 0) {
+    if (run_task_as_user(username, "job_4", "task_1", 
+                         task_dir, script_name) != 0) {
+      printf("FAIL: failed in child\n");
+      exit(42);
+    }
+    // should never return
+    exit(1);
+  }
+  int status = 0;
+  if (waitpid(child, &status, 0) <= 0) {
+    printf("FAIL: failed waiting for process %d - %s\n", child, 
+	   strerror(errno));
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_4/task_1", R_OK) != 0) {
+    printf("FAIL: failed to create task log directory\n");
+    exit(1);
+  }
+  if (access(task_dir, R_OK) != 0) {
+    printf("FAIL: failed to create task directory %s\n", task_dir);
+    exit(1);
+  }
+  char buffer[100000];
+  sprintf(buffer, "%s/foobar", task_dir);
+  if (access(buffer, R_OK) != 0) {
+    printf("FAIL: failed to create touch file %s\n", buffer);
+    exit(1);
+  }
+  free(task_dir);
+  task_dir = get_job_log_directory("job_4/task_1");
+  if (access(task_dir, R_OK) != 0) {
+    printf("FAIL: failed to create job log directory %s\n", task_dir);
+    exit(1);
+  }
+  free(task_dir);
+}
+
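+// Harness entry point: build a scratch TEST_ROOT, write and read the config,
+// create the local dirs, then walk through the path, deletion, and signal
+// tests; the init-job and run-task tests only run when started as root.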
+int main(int argc, char **argv) {
+  LOGFILE = stdout;
+  int my_username = 0;  /* set when username below is strdup'd and must be freed */
+
+  // clean up any junk from previous run
+  system("chmod -R u=rwx " TEST_ROOT "; rm -fr " TEST_ROOT);
+  
+  if (mkdirs(TEST_ROOT "/logs/userlogs", 0755) != 0) {
+    exit(1);
+  }
+  
+  if (write_config_file(TEST_ROOT "/test.cfg") != 0) {
+    exit(1);
+  }
+  read_config(TEST_ROOT "/test.cfg");
+
+  create_tt_roots();
+
+  if (getuid() == 0 && argc == 2) {
+    username = argv[1];
+  } else {
+    username = strdup(getpwuid(getuid())->pw_name);
+    my_username = 1;
+  }
+  set_tasktracker_uid(geteuid(), getegid());
+
+  if (set_user(username)) {
+    exit(1);
+  }
+
+  printf("\nStarting tests\n");
+
+  printf("\nTesting get_user_directory()\n");
+  test_get_user_directory();
+
+  printf("\nTesting get_job_directory()\n");
+  test_get_job_directory();
+
+  printf("\nTesting get_attempt_directory()\n");
+  test_get_attempt_directory();
+
+  printf("\nTesting get_task_launcher_file()\n");
+  test_get_task_launcher_file();
+
+  printf("\nTesting get_job_log_dir()\n");
+  test_get_job_log_dir();
+
+  test_check_configuration_permissions();
+
+  printf("\nTesting get_task_log_dir()\n");
+  test_get_task_log_dir();
+
+  printf("\nTesting delete_task()\n");
+  test_delete_task();
+
+  printf("\nTesting delete_job()\n");
+  test_delete_job();
+
+  test_delete_user();
+
+  test_check_user();
+
+  test_delete_log_directory();
+
+  // the tests that change user need to be run in a subshell, so that
+  // when they change user they don't give up our privs
+  run_test_in_child("test_signal_task", test_signal_task);
+  run_test_in_child("test_signal_task_group", test_signal_task_group);
+
+  // init job and run task can't be run if you aren't testing as root
+  if (getuid() == 0) {
+    // these tests do internal forks so that the change_owner and execs
+    // don't mess up our process.
+    test_init_job();
+    test_run_task();
+  }
+
+  seteuid(0);
+  run("rm -fr " TEST_ROOT);
+  printf("\nFinished tests\n");
+
+  if (my_username) {
+    free(username);
+  }
+  free_configurations();
+  return 0;
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/config/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/config/yarn-default.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/config/yarn-default.xml (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/config/yarn-default.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<configuration>
+<!--
+  <property>
+    <name></name>
+    <value></value>
+  </property>
+-->
+
+  <property>
+    <name>yarn.server.nodemanager.bindaddress</name>
+    <value>0.0.0.0:45454</value>
+  </property>
+
+  <property>
+    <name>yarn.server.nodemanager.local-dir</name>
+    <value>/tmp/nm-local-dir</value>
+  </property>
+
+  <property>
+    <name>yarn.server.container-manager.user-principal</name>
+    <value>nm/localhost@LOCALHOST</value>
+  </property>
+
+  <property>
+    <name>yarn.server.principal</name>
+    <value>nm/localhost@LOCALHOST</value>
+  </property>
+
+  <property>
+    <name>yarn.server.nodemanager.keytab</name>
+    <value>/etc/krb5.keytab</value>
+  </property>
+
+  <property>
+    <name>yarn.server.nodemanager.container-executor.class</name>
+    <value>org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor</value>
+  </property>
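+
+  <!-- A setuid-based alternative added by this commit:
+       org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor -->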
+
+</configuration>

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,164 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.LocalizationProtocol;
+
+public abstract class ContainerExecutor implements Configurable {
+
+  static final Log LOG = LogFactory.getLog(ContainerExecutor.class);
+  final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
+    FsPermission.createImmutable((short) 0700);
+
+  private Configuration conf;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Prepare the environment for containers in this application to execute.
+   * For $x in local.dirs
+   *   create $x/$user/$appId
+   * Copy $nmLocal/appTokens -> $x/$user/$appId
+   * Copy $nmLocal/publicEnv.sh -> $x/$user/$appId
+   * For $rsrc in private resources
+   *   Copy $rsrc -> $x/$user/filecache/[idef]
+   * For $rsrc in job resources
+   *   Copy $rsrc -> $x/$user/$appId/filecache/idef
+   * @param nmLocal path to credentials and resources localized by the NM
+   * @param localization protocol handle for reporting localized resources
+   *        back to the NM
+   * @param user user name of application owner
+   * @param appId id of the application
+   * @param logDir log directory for the application
+   * @param localDirs local directories to initialize
+   * @throws IOException For most application init failures
+   * @throws InterruptedException If application init thread is halted by NM
+   */
+  public abstract void initApplication(Path nmLocal,
+      LocalizationProtocol localization,
+      String user, String appId, Path logDir, List<Path> localDirs)
+      throws IOException, InterruptedException;
+
+  /**
+   * Launch the container on the node.
+   * @param container the container to launch
+   * @param nmLocal path to the NM-local launch script and application tokens
+   * @param user user name of the container owner
+   * @param appId id of the owning application
+   * @param appDirs per-local-dir application directories
+   * @param stdout file name for the container's standard output
+   * @param stderr file name for the container's standard error
+   * @return the exit code of the container
+   */
+  public abstract int launchContainer(Container container, Path nmLocal,
+      String user, String appId, List<Path> appDirs, String stdout,
+      String stderr) throws IOException;
+
+  public abstract boolean signalContainer(String user, int pid, Signal signal)
+      throws IOException, InterruptedException;
+
+  public abstract void deleteAsUser(String user, Path subDir, Path... basedirs)
+      throws IOException, InterruptedException;
+
+  /**
+   * The constants for the signals.
+   */
+  public enum Signal {
+    NULL(0, "NULL"), QUIT(3, "SIGQUIT"), 
+    KILL(9, "SIGKILL"), TERM(15, "SIGTERM");
+    private final int value;
+    private final String str;
+    private Signal(int value, String str) {
+      this.str = str;
+      this.value = value;
+    }
+    public int getValue() {
+      return value;
+    }
+    @Override
+    public String toString() {
+      return str;
+    }
+  }
+
+  protected void logOutput(String output) {
+    if (output != null) {
+      for (String line : output.split("\n")) {
+        LOG.info(line);
+      }
+    }
+  }
+
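+  // Probe once, at class load, whether setsid(1) is available; when it is,
+  // signalContainer can address a whole process group by negating the pid.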
+  public static final boolean isSetsidAvailable = isSetsidSupported();
+  private static boolean isSetsidSupported() {
+    ShellCommandExecutor shexec = null;
+    boolean setsidSupported = true;
+    try {
+      String[] args = {"setsid", "bash", "-c", "echo $$"};
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (IOException ioe) {
+      LOG.warn("setsid is not available on this machine. So not using it.");
+      setsidSupported = false;
+    } finally { // handle the exit code
+      LOG.info("setsid exited with exit code " + shexec.getExitCode());
+    }
+    return setsidSupported;
+  }
+
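+  /**
+   * Sleeps for the given delay and then delivers the signal to the task,
+   * e.g. to escalate to SIGKILL if an earlier SIGTERM was ignored.
+   */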
+  public static class DelayedProcessKiller extends Thread {
+    private final String user;
+    private final int pid;
+    private final long delay;
+    private final Signal signal;
+    private final ContainerExecutor containerExecutor;
+    public DelayedProcessKiller(String user, int pid, long delay, Signal signal,
+        ContainerExecutor containerExecutor) {
+      this.user = user;
+      this.pid = pid;
+      this.delay = delay;
+      this.signal = signal;
+      this.containerExecutor = containerExecutor;
+      setName("Task killer for " + pid);
+      setDaemon(false);
+    }
+    @Override
+    public void run() {
+      try {
+        Thread.sleep(delay);
+        containerExecutor.signalContainer(user, pid, signal);
+      } catch (InterruptedException e) {
+        return;
+      } catch (IOException e) {
+        LOG.warn("Exception when killing task " + pid, e);
+      }
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,37 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+
+/**
+ * Context interface for sharing information across components in the
+ * NodeManager.
+ */
+public interface Context {
+
+  public ConcurrentMap<ApplicationID, Application> getApplications();
+
+  public ConcurrentMap<ContainerID, Container> getContainers();
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,356 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.LocalizationProtocol;
+
+public class DefaultContainerExecutor extends ContainerExecutor {
+
+  static final Log LOG = LogFactory.getLog(DefaultContainerExecutor.class);
+
+  private final FileContext lfs;
+
+  DefaultContainerExecutor() {
+    try {
+      this.lfs = FileContext.getLocalFSFileContext();
+    } catch (UnsupportedFileSystemException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  DefaultContainerExecutor(FileContext lfs) {
+    this.lfs = lfs;
+  }
+
+  @Override
+  public void initApplication(Path nmLocal, LocalizationProtocol localization,
+      String user, String appId, Path logDir, List<Path> localDirs)
+      throws IOException, InterruptedException {
+    // TODO need a type
+    InetSocketAddress nmAddr =
+        ((ResourceLocalizationService) localization).getAddress();
+    FileContext appContext;
+    try {
+      appContext = FileContext.getLocalFSFileContext();
+    } catch (UnsupportedFileSystemException e) {
+      throw new YarnException("Failed to get local context", e);
+    }
+
+    ApplicationLocalizer localizer =
+      new ApplicationLocalizer(appContext, user, appId, logDir, localDirs);
+
+    createUserLocalDirs(localDirs, user);
+    createUserCacheDirs(localDirs, user);
+    createAppDirs(localDirs, user, appId);
+    createAppLogDir(logDir, appId);
+
+    Path appStorageDir = getApplicationDir(localDirs, user, appId);
+    Path filedesc = new Path(nmLocal, ApplicationLocalizer.FILECACHE_FILE);
+    lfs.util().copy(filedesc,
+        new Path(appStorageDir, ApplicationLocalizer.FILECACHE_FILE));
+
+    Path appTokens = new Path(nmLocal, ApplicationLocalizer.APPTOKEN_FILE);
+    Path tokenDst = new Path(appStorageDir, ApplicationLocalizer.APPTOKEN_FILE);
+    lfs.util().copy(appTokens, tokenDst);
+    lfs.setWorkingDirectory(appStorageDir);
+
+    // TODO: DO it over RPC for maintaining similarity?
+    localizer.runLocalization(nmAddr);
+  }
+
+  @Override
+  public int launchContainer(Container container, Path nmLocal,
+      String user, String appId, List<Path> appDirs, String stdout,
+      String stderr) throws IOException {
+    // create container dirs
+    for (Path p : appDirs) {
+      lfs.mkdir(new Path(p, container.toString()), null, false);
+    }
+    // copy launch script to work dir
+    Path appWorkDir = new Path(appDirs.get(0), container.toString());
+    Path launchScript = new Path(nmLocal, ContainerLaunch.CONTAINER_SCRIPT);
+    Path launchDst = new Path(appWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
+    lfs.util().copy(launchScript, launchDst);
+    // copy container tokens to work dir
+    Path appTokens = new Path(nmLocal, ApplicationLocalizer.APPTOKEN_FILE);
+    Path tokenDst = new Path(appWorkDir, ApplicationLocalizer.APPTOKEN_FILE);
+    lfs.util().copy(appTokens, tokenDst);
+    // create log dir under app
+    // fork script
+    ShellCommandExecutor shExec = null;
+    try {
+      lfs.setPermission(launchDst,
+          ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
+      String[] command = 
+          new String[] { "bash", "-c", launchDst.toUri().getPath().toString() };
+      shExec = new ShellCommandExecutor(command,
+          new File(appWorkDir.toUri().getPath()));
+      shExec.execute();
+    } catch (Exception e) {
+      if (null == shExec) {
+        return -1;
+      }
+      int exitCode = shExec.getExitCode();
+      LOG.warn("Exit code from task is : " + exitCode);
+      logOutput(shExec.getOutput());
+      return exitCode;
+    }
+    return 0;
+  }
+
+  @Override
+  public boolean signalContainer(String user, int pid, Signal signal)
+      throws IOException, InterruptedException {
+    final int sigpid = ContainerExecutor.isSetsidAvailable
+      ? -1 * pid
+      : pid;
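+    // kill -0 (Signal.NULL) only probes that the process (group) exists;
+    // no signal is actually delivered.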
+    try {
+      sendSignal(sigpid, Signal.NULL);
+    } catch (ExitCodeException e) {
+      return false;
+    }
+    try {
+      sendSignal(sigpid, signal);
+    } catch (IOException e) {
+      try {
+        sendSignal(sigpid, Signal.NULL);
+      } catch (IOException ignore) {
+        return false;
+      }
+      throw e;
+    }
+    return true;
+  }
+
+  /**
+   * Send a specified signal to the specified pid.
+   *
+   * @param pid the pid of the process [group] to signal
+   * @param signal the signal to send
+   */
+  protected void sendSignal(int pid, Signal signal) throws IOException {
+    String[] arg = { "kill", "-" + signal.getValue(), Integer.toString(pid) };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(arg);
+    shexec.execute();
+  }
+
+  @Override
+  public void deleteAsUser(String user, Path subDir, Path... baseDirs)
+      throws IOException, InterruptedException {
+    if (baseDirs == null || baseDirs.length == 0) {
+      LOG.info("Deleting absolute path : " + subDir);
+      lfs.delete(subDir, true);
+      return;
+    }
+    for (Path baseDir : baseDirs) {
+      Path del = new Path(baseDir, subDir);
+      LOG.info("Deleting path : " + del);
+      lfs.delete(del, true);
+    }
+  }
+
+  /** Permissions for user dir.
+   * $local.dir/usercache/$user */
+  private static final short USER_PERM = (short)0750;
+  /** Permissions for user appcache dir.
+   * $local.dir/usercache/$user/appcache */
+  private static final short APPCACHE_PERM = (short)0710;
+  /** Permissions for user filecache dir.
+   * $local.dir/usercache/$user/filecache */
+  private static final short FILECACHE_PERM = (short)0710;
+  /** Permissions for user app dir.
+   * $local.dir/usercache/$user/appcache/$appId */
+  private static final short APPDIR_PERM = (short)0710;
+  /** Permissions for user log dir.
+   * $logdir/$appId */
+  private static final short LOGDIR_PERM = (short)0710;
+
+  private Path getApplicationDir(List<Path> localDirs, String user,
+      String appId) {
+    return getApplicationDir(localDirs.get(0), user, appId);
+  }
+
+  private Path getApplicationDir(Path base, String user, String appId) {
+    return new Path(getAppcacheDir(base, user), appId);
+  }
+
+  private Path getUserCacheDir(Path base, String user) {
+    return new Path(new Path(base, ApplicationLocalizer.USERCACHE), user);
+  }
+
+  private Path getAppcacheDir(Path base, String user) {
+    return new Path(getUserCacheDir(base, user),
+        ApplicationLocalizer.APPCACHE);
+  }
+
+  private Path getFileCacheDir(Path base, String user) {
+    return new Path(getUserCacheDir(base, user),
+        ApplicationLocalizer.FILECACHE);
+  }
+
+  /**
+   * Initialize the local directories for a particular user.
+   * <ul>
+   * <li>$local.dir/usercache/$user</li>
+   * </ul>
+   */
+  private void createUserLocalDirs(List<Path> localDirs, String user)
+      throws IOException {
+    boolean userDirStatus = false;
+    FsPermission userperms = new FsPermission(USER_PERM);
+    for (Path localDir : localDirs) {
+      // create $local.dir/usercache/$user
+      try {
+        lfs.mkdir(getUserCacheDir(localDir, user), userperms, true);
+      } catch (IOException e) {
+        LOG.warn("Unable to create the user directory : " + localDir, e);
+        continue;
+      }
+      userDirStatus = true;
+    }
+    if (!userDirStatus) {
+      throw new IOException("Not able to initialize user directories "
+          + "in any of the configured local directories for user " + user);
+    }
+  }
+
+
+  /**
+   * Initialize the local cache directories for a particular user.
+   * <ul>
+   * <li>$local.dir/usercache/$user</li>
+   * <li>$local.dir/usercache/$user/appcache</li>
+   * <li>$local.dir/usercache/$user/filecache</li>
+   * </ul>
+   */
+  private void createUserCacheDirs(List<Path> localDirs, String user)
+      throws IOException {
+    LOG.info("Initializing user " + user);
+
+    boolean appcacheDirStatus = false;
+    boolean distributedCacheDirStatus = false;
+    FsPermission appCachePerms = new FsPermission(APPCACHE_PERM);
+    FsPermission fileperms = new FsPermission(FILECACHE_PERM);
+
+    for (Path localDir : localDirs) {
+      // create $local.dir/usercache/$user/appcache
+      final Path appDir = getAppcacheDir(localDir, user);
+      try {
+        lfs.mkdir(appDir, appCachePerms, true);
+        appcacheDirStatus = true;
+      } catch (IOException e) {
+        LOG.warn("Unable to create app cache directory : " + appDir, e);
+      }
+      // create $local.dir/usercache/$user/filecache
+      final Path distDir = getFileCacheDir(localDir, user);
+      try {
+        lfs.mkdir(distDir, fileperms, true);
+        distributedCacheDirStatus = true;
+      } catch (IOException e) {
+        LOG.warn("Unable to create file cache directory : " + distDir, e);
+      }
+    }
+    if (!appcacheDirStatus) {
+      throw new IOException("Not able to initialize app-cache directories "
+          + "in any of the configured local directories for user " + user);
+    }
+    if (!distributedCacheDirStatus) {
+      throw new IOException(
+          "Not able to initialize distributed-cache directories "
+              + "in any of the configured local directories for user "
+              + user);
+    }
+  }
+
+  /**
+   * Initialize the local directories for a particular user.
+   * <ul>
+   * <li>$local.dir/usercache/$user/appcache/$appid</li>
+   * </ul>
+   * @param localDirs 
+   */
+  private void createAppDirs(List<Path> localDirs, String user, String appId)
+      throws IOException {
+    boolean initAppDirStatus = false;
+    FsPermission appperms = new FsPermission(APPDIR_PERM);
+    for (Path localDir : localDirs) {
+      Path fullAppDir = getApplicationDir(localDir, user, appId);
+      if (lfs.util().exists(fullAppDir)) {
+        // this will happen on a partial execution of localizeJob. Sometimes
+        // copying job.xml to the local disk succeeds but copying job.jar
+        // might throw an exception. We should clean up and then try again.
+        lfs.delete(fullAppDir, true);
+      }
+      // create $local.dir/usercache/$user/appcache/$appId
+      try {
+        lfs.mkdir(fullAppDir, appperms, true);
+        initAppDirStatus = true;
+      } catch (IOException e) {
+        LOG.warn("Unable to create app directory " + fullAppDir.toString(), e);
+      }
+    }
+    if (!initAppDirStatus) {
+      throw new IOException("Not able to initialize app directories "
+          + "in any of the configured local directories for app "
+          + appId.toString());
+    }
+    // pick random work dir for compatibility
+    // create $local.dir/usercache/$user/appcache/$appId/work
+    Path workDir =
+        new Path(getApplicationDir(localDirs, user, appId),
+            ApplicationLocalizer.WORKDIR);
+    lfs.mkdir(workDir, null, true);
+  }
+
+  /**
+   * Create application log directory.
+   */
+  private void createAppLogDir(Path logDir, String appId)
+      throws IOException {
+    Path appUserLogDir = new Path(logDir, appId);
+    try {
+      lfs.mkdir(appUserLogDir, new FsPermission(LOGDIR_PERM), true);
+    } catch (IOException e) {
+      throw new IOException(
+          "Could not create app user log directory: " + appUserLogDir, e);
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,129 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import static java.util.concurrent.TimeUnit.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class DeletionService extends AbstractService {
+
+  static final Log LOG = LogFactory.getLog(DeletionService.class);
+
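+  // One core deletion thread, growing to a maximum that init() reads from
+  // NM_MAX_DELETE_THREADS.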
+  private final ThreadPoolExecutor sched =
+    new ThreadPoolExecutor(1, 4, 60L, SECONDS,
+        new LinkedBlockingQueue<Runnable>());
+  private final ContainerExecutor exec;
+  private final FileContext lfs = getLfs();
+  static final FileContext getLfs() {
+    try {
+      return FileContext.getLocalFSFileContext();
+    } catch (UnsupportedFileSystemException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public DeletionService(ContainerExecutor exec) {
+    super(DeletionService.class.getName());
+    this.exec = exec;
+  }
+
+  /**
+   * Delete the path(s) as this user.
+   * @param user The user to delete as, or the JVM user if null
+   * @param subDir the sub directory to delete
+   * @param baseDirs the base directories containing subDir; if empty, subDir
+   *        is deleted as an absolute path
+   */
+  public void delete(String user, Path subDir, Path... baseDirs) {
+    // TODO if parent owned by NM, rename within parent inline
+    sched.submit(new FileDeletion(user, subDir, baseDirs));
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    sched.setMaximumPoolSize(
+        conf.getInt(NM_MAX_DELETE_THREADS, DEFAULT_MAX_DELETE_THREADS));
+    super.init(conf);
+  }
+
+  @Override
+  public void stop() {
+    sched.shutdown();
+    try {
+      sched.awaitTermination(10, SECONDS);
+    } catch (InterruptedException e) {
+      sched.shutdownNow();
+    }
+    super.stop();
+  }
+
+  private class FileDeletion implements Runnable {
+    final String user;
+    final Path subDir;
+    final Path[] baseDirs;
+    FileDeletion(String user, Path subDir, Path[] baseDirs) {
+      this.user = user;
+      this.subDir = subDir;
+      this.baseDirs = baseDirs;
+    }
+    @Override
+    public void run() {
+      if (null == user) {
+        if (baseDirs == null || baseDirs.length == 0) {
+          LOG.debug("NM deleting absolute path : " + subDir);
+          try {
+            lfs.delete(subDir, true);
+          } catch (IOException e) {
+            LOG.warn("Failed to delete " + subDir);
+          }
+          return;
+        }
+        for (Path baseDir : baseDirs) {
+          Path del = new Path(baseDir, subDir);
+          LOG.debug("NM deleting path : " + del);
+          try {
+            lfs.delete(del, true);
+          } catch (IOException e) {
+            LOG.warn("Failed to delete " + subDir);
+          }
+        }
+      } else {
+        try {
+          exec.deleteAsUser(user, subDir, baseDirs);
+        } catch (IOException e) {
+          LOG.warn("Failed to delete as user " + user, e);
+        } catch (InterruptedException e) {
+          LOG.warn("Failed to delete as user " + user, e);
+        }
+      }
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,209 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.LocalizationProtocol;
+
+public class LinuxContainerExecutor extends ContainerExecutor {
+
+  private String containerExecutorExe;
+  private static final String CONTAINER_EXECUTOR_EXEC_KEY =
+    NMConfig.NM_PREFIX + "linux-container-executor.path";
+  
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    containerExecutorExe = getContainerExecutorExecutablePath(conf);
+  }
+
+  /**
+   * Commands understood by the setuid container-executor binary.
+   */
+  enum Commands {
+    INITIALIZE_JOB(0),
+    LAUNCH_CONTAINER(1),
+    SIGNAL_TASK(2),
+    DELETE_AS_USER(3),
+    DELETE_LOG_AS_USER(4);
+
+    private int value;
+    Commands(int value) {
+      this.value = value;
+    }
+    int getValue() {
+      return value;
+    }
+  }
+
+  /**
+   * Result codes returned from the C task-controller.
+   * These must match the values in task-controller.h.
+   */
+  enum ResultCode {
+    OK(0),
+    INVALID_USER_NAME(2),
+    INVALID_TASK_PID(9),
+    INVALID_TASKCONTROLLER_PERMISSIONS(22),
+    INVALID_CONFIG_FILE(24);
+
+    private final int value;
+    ResultCode(int value) {
+      this.value = value;
+    }
+    int getValue() {
+      return value;
+    }
+  }
+
+  protected String getContainerExecutorExecutablePath(Configuration conf) {
+    File hadoopBin = new File(System.getenv("YARN_HOME"), "bin");
+    String defaultPath =
+      new File(hadoopBin, "container-executor").getAbsolutePath();
+    return null == conf
+      ? defaultPath
+      : conf.get(CONTAINER_EXECUTOR_EXEC_KEY, defaultPath);
+  }
+
+  @Override
+  public void initApplication(Path nmLocal, LocalizationProtocol localization,
+      String user, String appId, Path logDir, List<Path> localDirs)
+      throws IOException, InterruptedException {
+    // TODO need a type
+    InetSocketAddress nmAddr = ((ResourceLocalizationService)localization).getAddress();
+    Path appFiles = new Path(nmLocal, ApplicationLocalizer.FILECACHE_FILE);
+    Path appTokens = new Path(nmLocal, ApplicationLocalizer.APPTOKEN_FILE);
+    List<String> command = new ArrayList<String>(
+      Arrays.asList(containerExecutorExe, 
+                    user, 
+                    Integer.toString(Commands.INITIALIZE_JOB.getValue()),
+                    appId,
+                    appTokens.toUri().getPath().toString(),
+                    appFiles.toUri().getPath().toString()));
+    File jvm =                                  // use same jvm as parent
+      new File(new File(System.getProperty("java.home"), "bin"), "java");
+    command.add(jvm.toString());
+    command.add("-classpath");
+    command.add(System.getProperty("java.class.path"));
+    command.add(ApplicationLocalizer.class.getName());
+    command.add(user);
+    command.add(appId);
+    // add the NodeManager localizer's address, to which the spawned
+    // ApplicationLocalizer reports back
+    command.add(nmAddr.getHostName());
+    command.add(Integer.toString(nmAddr.getPort()));
+    command.add(logDir.toUri().getPath().toString());
+    for (Path p : localDirs) {
+      command.add(p.toUri().getPath().toString());
+    }
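+    // Resulting argv, with hypothetical values:
+    //   container-executor <user> 0 <appId> <appTokens> <appFiles> \
+    //     <java> -classpath <cp> ApplicationLocalizer <user> <appId> \
+    //     <nmHost> <nmPort> <logDir> <localDir> ...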
+    String[] commandArray = command.toArray(new String[command.size()]);
+    ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("initApplication: " + Arrays.toString(commandArray));
+    }
+    try {
+      shExec.execute();
+      if (LOG.isDebugEnabled()) {
+        logOutput(shExec.getOutput());
+      }
+    } catch (ExitCodeException e) {
+      int exitCode = shExec.getExitCode();
+      LOG.warn("Exit code from task is : " + exitCode);
+      logOutput(shExec.getOutput());
+      throw new IOException("App initialization failed (" + exitCode + ")", e);
+    }
+  }
+
+  @Override
+  public int launchContainer(Container container, Path nmLocal,
+      String user, String appId, List<Path> appDirs, String stdout,
+      String stderr) throws IOException {
+    Path appWorkDir = new Path(appDirs.get(0), container.toString());
+    Path launchScript = new Path(nmLocal, ContainerLaunch.CONTAINER_SCRIPT);
+    Path appToken = new Path(nmLocal, ApplicationLocalizer.APPTOKEN_FILE);
+    List<String> command = new ArrayList<String>(
+      Arrays.asList(containerExecutorExe, 
+                    user, 
+                    Integer.toString(Commands.LAUNCH_CONTAINER.getValue()),
+                    appId,
+                    container.toString(),
+                    appWorkDir.toString(),
+                    launchScript.toUri().getPath().toString(),
+                    appToken.toUri().getPath().toString()));
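+    // Resulting argv, with hypothetical values:
+    //   container-executor <user> 1 <appId> <containerId> <workDir> \
+    //     <launchScript> <appToken>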
+    String[] commandArray = command.toArray(new String[command.size()]);
+    ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("launchContainer: " + Arrays.toString(commandArray));
+    }
+    try {
+      shExec.execute();
+      if (LOG.isDebugEnabled()) {
+        logOutput(shExec.getOutput());
+      }
+    } catch (ExitCodeException e) {
+      int exitCode = shExec.getExitCode();
+      LOG.warn("Exit code from container is: " + exitCode);
+      // Exit codes 143 (128+SIGTERM) and 137 (128+SIGKILL) mean the container
+      // was deliberately terminated or killed. In all other cases, log the
+      // container-executor output.
+      if (exitCode != 143 && exitCode != 137) {
+        LOG.warn("Exception thrown while launching container: ", e);
+        logOutput(shExec.getOutput());
+      }
+      return exitCode;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Output from LinuxTaskController's launchTask follows:");
+      logOutput(shExec.getOutput());
+    }
+    return 0;
+  }
+
+  @Override
+  public boolean signalContainer(String user, int pid, Signal signal)
+      throws IOException, InterruptedException {
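+    // TODO: not yet implemented -- should invoke the setuid executable
+    // with Commands.SIGNAL_TASK; for now this stub reports success.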
+    return true;
+  }
+
+  @Override
+  public void deleteAsUser(String user, Path subDir, Path... basedirs)
+      throws IOException, InterruptedException {
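+    // TODO: not yet implemented -- should invoke the setuid executable
+    // with Commands.DELETE_AS_USER.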
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,70 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+/**
+ * Stores all the configuration constant keys for the NodeManager.
+ * Every configuration key used by the NodeManager should be defined
+ * here, so that all configuration parameters are visible in one place.
+ */
+public class NMConfig {
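+  // Typical usage (illustrative):
+  //   String addr = conf.get(NMConfig.NM_BIND_ADDRESS,
+  //       NMConfig.DEFAULT_NM_BIND_ADDRESS);
+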
+  public static final String NM_PREFIX = "yarn.server.nodemanager.";
+
+  public static final String DEFAULT_NM_BIND_ADDRESS = "0.0.0.0:45454";
+
+  /** host:port address to which the NodeManager binds. */
+  public static final String NM_BIND_ADDRESS = NM_PREFIX + "address";
+
+  public static final String DEFAULT_NM_LOCALIZER_BIND_ADDRESS = "0.0.0.0:4344";
+
+  public static final String NM_LOCALIZER_BIND_ADDRESS =
+    NM_PREFIX + "localizer.address";
+
+  public static final String NM_KEYTAB = NM_PREFIX + "keytab";
+
+  public static final String NM_CONTAINER_EXECUTOR_CLASS = NM_PREFIX
+      + "container-executor.class";
+
+  public static final String NM_LOCAL_DIR = NM_PREFIX + "local-dir";
+
+  public static final String DEFAULT_NM_LOCAL_DIR = "/tmp/nm-local-dir";
+
+  public static final String NM_LOG_DIR = NM_PREFIX + "log.dir";
+
+  public static final String DEFAULT_NM_LOG_DIR = "/tmp/logs";
+
+  public static final String NM_RESOURCE = NM_PREFIX + "resource.memory.gb";
+
+  // TODO: Should this instead be dictated by RM?
+  public static final String HEARTBEAT_INTERVAL = NM_PREFIX
+      + "heartbeat-interval";
+
+  public static final int DEFAULT_HEARTBEAT_INTERVAL = 1000;
+
+  public static final String NM_MAX_DELETE_THREADS = NM_PREFIX +
+    "max.delete.threads";
+
+  public static final int DEFAULT_MAX_DELETE_THREADS = 4;
+
+  public static final String NM_MAX_PUBLIC_FETCH_THREADS = NM_PREFIX +
+    "max.public.fetch.threads";
+
+  public static final int DEFAULT_MAX_PUBLIC_FETCH_THREADS = 4;
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,166 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.IOException;
+
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.YarnServerConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.*;
+
+public class NodeManager extends CompositeService {
+
+  public NodeManager() {
+    super(NodeManager.class.getName());
+
+    Context context = new NMContext();
+
+    YarnConfiguration conf = new YarnConfiguration();
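+    // The executor class defaults to DefaultContainerExecutor; set
+    // yarn.server.nodemanager.container-executor.class to
+    // LinuxContainerExecutor to launch containers via the setuid binary.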
+    ContainerExecutor exec = ReflectionUtils.newInstance(
+        conf.getClass(NM_CONTAINER_EXECUTOR_CLASS,
+          DefaultContainerExecutor.class, ContainerExecutor.class), conf);
+    DeletionService del = new DeletionService(exec);
+    addService(del);
+
+    // StatusUpdater should be added first so that it starts first. Only
+    // after it has contacted the RM, registered, and obtained security
+    // tokens can the ContainerManager start.
+    NodeStatusUpdater nodeStatusUpdater = createNodeStatusUpdater(context);
+    addService(nodeStatusUpdater);
+
+    NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
+    addService(nodeResourceMonitor);
+
+    Service containerManager =
+        createContainerManager(context, exec, del, nodeStatusUpdater);
+    addService(containerManager);
+
+    Service webServer = createWebServer();
+    addService(webServer);
+  }
+
+  protected NodeStatusUpdater createNodeStatusUpdater(Context context) {
+    return new NodeStatusUpdaterImpl(context);
+  }
+
+  protected NodeResourceMonitor createNodeResourceMonitor() {
+    return new NodeResourceMonitorImpl();
+  }
+
+  protected Service createContainerManager(Context context,
+      ContainerExecutor exec, DeletionService del,
+      NodeStatusUpdater nodeStatusUpdater) {
+    return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater);
+  }
+
+  protected WebServer createWebServer() {
+    return new WebServer();
+  }
+
+  protected void doSecureLogin() throws IOException {
+    SecurityUtil.login(getConfig(), NM_KEYTAB,
+        YarnServerConfig.NM_SERVER_PRINCIPAL_KEY);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+          @Override
+          public void run() {
+            NodeManager.this.stop();
+          }
+        });
+    super.init(conf);
+    // TODO add local dirs to del
+  }
+
+  @Override
+  public void start() {
+    try {
+      doSecureLogin();
+    } catch (IOException e) {
+      throw new YarnException("Failed NodeManager login", e);
+    }
+    super.start();
+  }
+
+  static class NMContext implements Context {
+
+    private final ConcurrentMap<ApplicationID, Application> applications =
+        new ConcurrentHashMap<ApplicationID, Application>();
+    private final ConcurrentMap<ContainerID, Container> containers =
+      new ConcurrentSkipListMap<ContainerID,Container>(
+          new Comparator<ContainerID>() {
+            // Order containers by application id, then by container id.
+            @Override
+            public int compare(ContainerID a, ContainerID b) {
+              if (a.appID.id == b.appID.id) {
+                return a.id - b.id;
+              }
+              return a.appID.id - b.appID.id;
+            }
+            @Override
+            public boolean equals(Object other) {
+              // Guard against null to honor the equals() contract.
+              return other != null && getClass().equals(other.getClass());
+            }
+          });
+
+    public NMContext() {
+    }
+
+    @Override
+    public ConcurrentMap<ApplicationID, Application> getApplications() {
+      return this.applications;
+    }
+
+    @Override
+    public ConcurrentMap<ContainerID, Container> getContainers() {
+      return this.containers;
+    }
+
+  }
+
+  public static void main(String[] args) {
+    NodeManager nodeManager = new NodeManager();
+    YarnConfiguration conf = new YarnConfiguration();
+    nodeManager.init(conf);
+    nodeManager.start();
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,25 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.hadoop.yarn.service.Service;
+
+public interface NodeResourceMonitor extends Service {
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.hadoop.yarn.service.AbstractService;
+
+public class NodeResourceMonitorImpl extends AbstractService implements
+    NodeResourceMonitor {
+
+  public NodeResourceMonitorImpl() {
+    super(NodeResourceMonitorImpl.class.getName());
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,31 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.hadoop.yarn.service.Service;
+
+public interface NodeStatusUpdater extends Service {
+
+  byte[] getRMNMSharedSecret();
+
+  String getNodeName();
+
+  void sendOutofBandHeartBeat();
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,233 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
+import org.apache.hadoop.yarn.server.YarnServerConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerState;
+import org.apache.hadoop.yarn.HeartbeatResponse;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.NodeStatus;
+import org.apache.hadoop.yarn.RegistrationResponse;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceTracker;
+
+public class NodeStatusUpdaterImpl extends AbstractService implements
+    NodeStatusUpdater {
+
+  private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);
+
+  private final Object heartbeatMonitor = new Object();
+
+  private Context context;
+  private long heartBeatInterval;
+  private ResourceTracker resourceTracker;
+  private String rmAddress;
+  private Resource totalResource;
+  private String nodeName;
+  private NodeID nodeId;
+  private byte[] secretKeyBytes = new byte[0];
+  private boolean isStopped;
+
+  public NodeStatusUpdaterImpl(Context context) {
+    super(NodeStatusUpdaterImpl.class.getName());
+    this.context = context;
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    this.rmAddress =
+        conf.get(YarnServerConfig.RESOURCETRACKER_ADDRESS,
+            YarnServerConfig.DEFAULT_RESOURCETRACKER_BIND_ADDRESS);
+    this.heartBeatInterval =
+        conf.getLong(NMConfig.HEARTBEAT_INTERVAL,
+            NMConfig.DEFAULT_HEARTBEAT_INTERVAL);
+    int memory = conf.getInt(NMConfig.NM_RESOURCE, 8);
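+    // NM_RESOURCE is expressed in GB; Resource.memory is in MB.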
+    this.totalResource = new Resource();
+    this.totalResource.memory = memory * 1024;
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    String bindAddress =
+        getConfig().get(NMConfig.NM_BIND_ADDRESS,
+            NMConfig.DEFAULT_NM_BIND_ADDRESS);
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddress);
+    try {
+      this.nodeName =
+          InetAddress.getLocalHost().getHostAddress() + ":"
+              + addr.getPort();
+      LOG.info("Configured ContainerManager Address is " + this.nodeName);
+      // Registration has to be in start so that ContainerManager can get the
+      // perNM tokens needed to authenticate ContainerTokens.
+      registerWithRM();
+      super.start();
+      startStatusUpdater();
+    } catch (Exception e) {
+      throw new AvroRuntimeException(e);
+    }
+  }
+
+  @Override
+  public synchronized void stop() {
+    // Interrupt the updater.
+    this.isStopped = true;
+    super.stop();
+  }
+
+  protected ResourceTracker getRMClient() {
+    YarnRPC rpc = YarnRPC.create(getConfig());
+    InetSocketAddress rmAddress = NetUtils.createSocketAddr(this.rmAddress);
+    getConfig().setClass(
+        CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        RMNMSecurityInfoClass.class, SecurityInfo.class);
+    return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress,
+        getConfig());
+  }
+
+  private void registerWithRM() throws AvroRemoteException {
+    this.resourceTracker = getRMClient();
+    LOG.info("Connected to ResourceManager at " + this.rmAddress);
+    RegistrationResponse regResponse =
+        this.resourceTracker.registerNodeManager(this.nodeName,
+            this.totalResource);
+    this.nodeId = regResponse.nodeID;
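+    // The shared secret returned by the RM is later used by the
+    // ContainerManager to authenticate ContainerTokens.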
+    if (UserGroupInformation.isSecurityEnabled()) {
+      this.secretKeyBytes = regResponse.secretKey.array();
+    }
+
+    LOG.info("Registered with ResourceManager as " + this.nodeName
+        + " with total resource of " + this.totalResource);
+  }
+
+  @Override
+  public String getNodeName() {
+    return this.nodeName;
+  }
+
+  @Override
+  public byte[] getRMNMSharedSecret() {
+    return this.secretKeyBytes;
+  }
+
+  private NodeStatus getNodeStatus() {
+    NodeStatus status = new NodeStatus();
+    status.nodeId = this.nodeId;
+    status.containers =
+        new HashMap<CharSequence, List<org.apache.hadoop.yarn.Container>>();
+    Map<CharSequence, List<org.apache.hadoop.yarn.Container>> activeContainers =
+        status.containers;
+
+    int numActiveContainers = 0;
+    synchronized (this.context.getContainers()) {
+      for (Iterator<Entry<ContainerID, Container>> i =
+          this.context.getContainers().entrySet().iterator(); i.hasNext();) {
+        Entry<ContainerID, Container> e = i.next();
+        ContainerID containerId = e.getKey();
+        Container container = e.getValue();
+        CharSequence applicationId = String.valueOf(containerId.appID.id); // TODO: ID? Really?
+
+        List<org.apache.hadoop.yarn.Container> applicationContainers =
+            activeContainers.get(applicationId);
+        if (applicationContainers == null) {
+          applicationContainers = new ArrayList<org.apache.hadoop.yarn.Container>();
+          activeContainers.put(applicationId, applicationContainers);
+        }
+
+        // Clone the container to send it to the RM
+        org.apache.hadoop.yarn.Container c = container.getContainer();
+        c.hostName = this.nodeName;
+        applicationContainers.add(c);
+        ++numActiveContainers;
+        LOG.info("Sending out status for container: " + c);
+
+        if (c.state == ContainerState.COMPLETE) {
+          // Remove
+          i.remove();
+
+          LOG.info("Removed completed container " + containerId);
+        }
+      }
+    }
+
+    LOG.debug(this.nodeName + " sending out status for " + numActiveContainers
+        + " containers");
+
+    return status;
+  }
+
+  @Override
+  public void sendOutofBandHeartBeat() {
+    synchronized (this.heartbeatMonitor) {
+      this.heartbeatMonitor.notify();
+    }
+  }
+
+  protected void startStatusUpdater() {
+
+    new Thread("Node Status Updater") {
+      @Override
+      public void run() {
+        int lastHeartBeatID = 0;
+        while (!isStopped) {
+          // Send heartbeat
+          try {
+            synchronized (heartbeatMonitor) {
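+              // Wake up after heartBeatInterval ms, or earlier if an
+              // out-of-band heartbeat is requested via sendOutofBandHeartBeat().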
+              heartbeatMonitor.wait(heartBeatInterval);
+            }
+            NodeStatus nodeStatus = getNodeStatus();
+            nodeStatus.responseId = lastHeartBeatID;
+            HeartbeatResponse response =
+              resourceTracker.nodeHeartbeat(nodeStatus);
+            lastHeartBeatID = response.responseId;
+          } catch (AvroRemoteException e) {
+            LOG.error("Caught exception in status-updater", e);
+            break;
+          } catch (InterruptedException e) {
+            LOG.error("Status-updater interrupted", e);
+            break;
+          }
+        }
+      }
+    }.start();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WebServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WebServer.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WebServer.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WebServer.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,29 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.hadoop.yarn.service.AbstractService;
+
+public class WebServer extends AbstractService {
+
+  public WebServer() {
+    super(WebServer.class.getName());
+  }
+
+}


