Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/util/TestProcfsBasedProcessTree.java?rev=1091316&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/util/TestProcfsBasedProcessTree.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/util/TestProcfsBasedProcessTree.java Tue Apr 12 07:56:07 2011
@@ -0,0 +1,760 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A JUnit test for ProcfsBasedProcessTree.
+ */
+public class TestProcfsBasedProcessTree {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestProcfsBasedProcessTree.class);
+ protected static File TEST_ROOT_DIR = new File("target",
+ TestProcfsBasedProcessTree.class.getName() + "-localDir");
+
+ private ShellCommandExecutor shexec = null;
+ private String pidFile, lowestDescendant;
+ private String shellScript;
+
+ private static final int N = 6; // recursion depth of the RogueTask's shell script
+
+ private class RogueTaskThread extends Thread {
+ public void run() {
+ try {
+ Vector<String> args = new Vector<String>();
+ if (isSetsidAvailable()) {
+ args.add("setsid");
+ }
+ args.add("bash");
+ args.add("-c");
+ args.add(" echo $$ > " + pidFile + "; sh " +
+ shellScript + " " + N + ";") ;
+ shexec = new ShellCommandExecutor(args.toArray(new String[0]));
+ shexec.execute();
+ } catch (ExitCodeException ee) {
+ LOG.info("Shell Command exit with a non-zero exit code. This is" +
+ " expected as we are killing the subprocesses of the" +
+ " task intentionally. " + ee);
+ } catch (IOException ioe) {
+ LOG.info("Error executing shell command " + ioe);
+ } finally {
+ LOG.info("Exit code: " + shexec.getExitCode());
+ }
+ }
+ }
+
+ private String getRogueTaskPID() {
+ File f = new File(pidFile);
+ while (!f.exists()) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ie) {
+ break;
+ }
+ }
+
+ // read from pidFile
+ return getPidFromPidFile(pidFile);
+ }
+
+ @Before
+ public void setup() throws IOException {
+ FileContext.getLocalFSFileContext().delete(
+ new Path(TEST_ROOT_DIR.getAbsolutePath()), true);
+ }
+
+ @Test
+ public void testProcessTree() throws Exception {
+
+ try {
+ if (!ProcfsBasedProcessTree.isAvailable()) {
+ LOG.info("ProcfsBasedProcessTree is not available on this system."
+ + " Not testing.");
+ return;
+ }
+ } catch (Exception e) {
+ LOG.info(StringUtils.stringifyException(e));
+ return;
+ }
+ // create shell script
+ Random rm = new Random();
+ File tempFile =
+ new File(TEST_ROOT_DIR, getClass().getName() + "_shellScript_"
+ + rm.nextInt() + ".sh");
+ tempFile.deleteOnExit();
+ shellScript = TEST_ROOT_DIR + File.separator + tempFile.getName();
+
+ // create pid file
+ tempFile =
+ new File(TEST_ROOT_DIR, getClass().getName() + "_pidFile_"
+ + rm.nextInt() + ".pid");
+ tempFile.deleteOnExit();
+ pidFile = TEST_ROOT_DIR + File.separator + tempFile.getName();
+
+ lowestDescendant = TEST_ROOT_DIR + File.separator + "lowestDescendantPidFile";
+
+ // write to shell-script
+ try {
+ FileWriter fWriter = new FileWriter(shellScript);
+ fWriter.write(
+ "# rogue task\n" +
+ "sleep 1\n" +
+ "echo hello\n" +
+ "if [ $1 -ne 0 ]\n" +
+ "then\n" +
+ " sh " + shellScript + " $(($1-1))\n" +
+ "else\n" +
+ " echo $$ > " + lowestDescendant + "\n" +
+ " while true\n do\n" +
+ " sleep 5\n" +
+ " done\n" +
+ "fi");
+ fWriter.close();
+ } catch (IOException ioe) {
+ LOG.info("Error: " + ioe);
+ return;
+ }
+
+ Thread t = new RogueTaskThread();
+ t.start();
+ String pid = getRogueTaskPID();
+ LOG.info("Root process pid: " + pid);
+ ProcfsBasedProcessTree p = createProcessTree(pid);
+ p = p.getProcessTree(); // initialize
+ LOG.info("ProcessTree: " + p.toString());
+
+ File leaf = new File(lowestDescendant);
+ // wait until the lowest descendant process of the RogueTask starts executing
+ while (!leaf.exists()) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ie) {
+ break;
+ }
+ }
+
+ p = p.getProcessTree(); // reconstruct
+ LOG.info("ProcessTree: " + p.toString());
+
+ // Get the process-tree dump
+ String processTreeDump = p.getProcessTreeDump();
+
+ // destroy the process and all its subprocesses
+ destroyProcessTree(pid);
+
+ if (isSetsidAvailable()) { // the whole process tree should be gone
+ Assert.assertFalse("Processes in the process group are still alive",
+ isAnyProcessInTreeAlive(p));
+ } else { // the root process should be gone
+ Assert.assertFalse("Root process should no longer be alive", isAlive(pid));
+ }
+
+ LOG.info("Process-tree dump follows: \n" + processTreeDump);
+ Assert.assertTrue("Process-tree dump doesn't start with a proper header",
+ processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " +
+ "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " +
+ "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
+ for (int i = N; i >= 0; i--) {
+ String cmdLineDump = "\\|- [0-9]+ [0-9]+ [0-9]+ [0-9]+ \\(sh\\)" +
+ " [0-9]+ [0-9]+ [0-9]+ [0-9]+ sh " + shellScript + " " + i;
+ Pattern pat = Pattern.compile(cmdLineDump);
+ Matcher mat = pat.matcher(processTreeDump);
+ Assert.assertTrue("Process-tree dump doesn't contain the cmdLineDump of " + i
+ + "th process!", mat.find());
+ }
+
+ // Sometimes the thread cannot be joined when forking with a large N.
+ try {
+ t.join(2000);
+ LOG.info("RogueTaskThread successfully joined.");
+ } catch (InterruptedException ie) {
+ LOG.info("Interrupted while joining RogueTaskThread.");
+ }
+
+ // ProcessTree is gone now. Any further calls should be sane.
+ p = p.getProcessTree();
+ Assert.assertFalse("ProcessTree must have been gone", isAlive(pid));
+ Assert.assertTrue("Cumulative vmem for the gone-process is "
+ + p.getCumulativeVmem() + " . It should be zero.", p
+ .getCumulativeVmem() == 0);
+ Assert.assertTrue(p.toString().equals("[ ]"));
+ }
+
+ protected ProcfsBasedProcessTree createProcessTree(String pid) {
+ return new ProcfsBasedProcessTree(pid,
+ isSetsidAvailable());
+ }
+
+ protected ProcfsBasedProcessTree createProcessTree(String pid,
+ boolean setsidUsed, String procfsRootDir) {
+ return new ProcfsBasedProcessTree(pid, setsidUsed, procfsRootDir);
+ }
+
+ protected void destroyProcessTree(String pid) throws IOException {
+ sendSignal(pid, 9);
+ }
+
+ /**
+ * Get PID from a pid-file.
+ *
+ * @param pidFileName
+ * Name of the pid-file.
+ * @return the PID string read from the pid-file, or null if the
+ * pidFileName points to a non-existent file or if the read fails.
+ */
+ public static String getPidFromPidFile(String pidFileName) {
+ BufferedReader pidFile = null;
+ FileReader fReader = null;
+ String pid = null;
+
+ try {
+ fReader = new FileReader(pidFileName);
+ pidFile = new BufferedReader(fReader);
+ } catch (FileNotFoundException f) {
+ LOG.debug("PidFile doesn't exist : " + pidFileName);
+ return pid;
+ }
+
+ try {
+ pid = pidFile.readLine();
+ } catch (IOException i) {
+ LOG.error("Failed to read from " + pidFileName);
+ } finally {
+ try {
+ if (pidFile != null) {
+ // closing the BufferedReader also closes the wrapped FileReader
+ pidFile.close();
+ } else if (fReader != null) {
+ fReader.close();
+ }
+ } catch (IOException i) {
+ LOG.warn("Error closing the stream for " + pidFileName);
+ }
+ }
+ return pid;
+ }
+
+ public static class ProcessStatInfo {
+ // sample stat in a single line : 3910 (gpm) S 1 3910 3910 0 -1 4194624
+ // 83 0 0 0 0 0 0 0 16 0 1 0 7852 2408448 88 4294967295 134512640
+ // 134590050 3220521392 3220520036 10975138 0 0 4096 134234626
+ // 4294967295 0 0 17 1 0 0
+ String pid;
+ String name;
+ String ppid;
+ String pgrpId;
+ String session;
+ String vmem = "0";
+ String rssmemPage = "0";
+ String utime = "0";
+ String stime = "0";
+
+ public ProcessStatInfo(String[] statEntries) {
+ pid = statEntries[0];
+ name = statEntries[1];
+ ppid = statEntries[2];
+ pgrpId = statEntries[3];
+ session = statEntries[4];
+ vmem = statEntries[5];
+ if (statEntries.length > 6) {
+ rssmemPage = statEntries[6];
+ }
+ // utime and stime are read together, so require both fields to be present
+ if (statEntries.length > 8) {
+ utime = statEntries[7];
+ stime = statEntries[8];
+ }
+ }
+
+ // construct a line that mimics the procfs stat file.
+ // all unused numerical entries are set to 0.
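+ // e.g. fields {"100","proc1","1","100","100","100000","100","1000","200"} produce:
+ // "100 (proc1) S 1 100 100 0 0 0 0 0 0 0 1000 200 0 0 0 0 0 0 0 100000 100 0 ..."
+ // so utime/stime land in stat fields 14-15 and vsize/rss in fields 23-24,
+ // matching the layout of a real /proc/<pid>/stat line.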
+ public String getStatLine() {
+ return String.format("%s (%s) S %s %s %s 0 0 0" +
+ " 0 0 0 0 %s %s 0 0 0 0 0 0 0 %s %s 0 0" +
+ " 0 0 0 0 0 0 0 0" +
+ " 0 0 0 0 0",
+ pid, name, ppid, pgrpId, session,
+ utime, stime, vmem, rssmemPage);
+ }
+ }
+
+ /**
+ * A basic test that creates a few process directories and writes
+ * stat files. Verifies that the cpu time and memory are correctly
+ * computed.
+ * @throws IOException if there was a problem setting up the
+ * fake procfs directories or files.
+ */
+ @Test
+ public void testCpuAndMemoryForProcessTree() throws IOException {
+
+ // test processes
+ String[] pids = { "100", "200", "300", "400" };
+ // create the fake procfs root directory.
+ File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+ try {
+ setupProcfsRootDir(procfsRootDir);
+ setupPidDirs(procfsRootDir, pids);
+
+ // create stat objects.
+ // assuming processes 100, 200, 300 are in tree and 400 is not.
+ ProcessStatInfo[] procInfos = new ProcessStatInfo[4];
+ procInfos[0] = new ProcessStatInfo(new String[]
+ {"100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"});
+ procInfos[1] = new ProcessStatInfo(new String[]
+ {"200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"});
+ procInfos[2] = new ProcessStatInfo(new String[]
+ {"300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"});
+ procInfos[3] = new ProcessStatInfo(new String[]
+ {"400", "proc4", "1", "400", "400", "400000", "400", "4000", "800"});
+
+ writeStatFiles(procfsRootDir, pids, procInfos);
+
+ // crank up the process tree class.
+ ProcfsBasedProcessTree processTree =
+ createProcessTree("100", true, procfsRootDir.getAbsolutePath());
+ // build the process tree.
+ processTree.getProcessTree();
+
+ // verify cumulative memory
+ Assert.assertEquals("Cumulative virtual memory does not match", 600000L,
+ processTree.getCumulativeVmem());
+
+ // verify rss memory
+ long cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+ 600L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+ Assert.assertEquals("Cumulative rss memory does not match",
+ cumuRssMem, processTree.getCumulativeRssmem());
+
+ // verify cumulative cpu time
+ long cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ?
+ 7200L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L;
+ Assert.assertEquals("Cumulative cpu time does not match",
+ cumuCpuTime, processTree.getCumulativeCpuTime());
+
+ // update the stat files to check that cpu time accumulates
+ procInfos[0] = new ProcessStatInfo(new String[]
+ {"100", "proc1", "1", "100", "100", "100000", "100", "2000", "300"});
+ procInfos[1] = new ProcessStatInfo(new String[]
+ {"200", "proc2", "100", "100", "100", "200000", "200", "3000", "500"});
+ writeStatFiles(procfsRootDir, pids, procInfos);
+
+ // build the process tree.
+ processTree.getProcessTree();
+
+ // verify cumulative cpu time again
+ cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ?
+ 9400L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L;
+ Assert.assertEquals("Cumulative cpu time does not match",
+ cumuCpuTime, processTree.getCumulativeCpuTime());
+ } finally {
+ FileUtil.fullyDelete(procfsRootDir);
+ }
+ }
+
+ /**
+ * Tests that cumulative memory is computed only for
+ * processes older than a given age.
+ * @throws IOException if there was a problem setting up the
+ * fake procfs directories or files.
+ */
+ @Test
+ public void testMemForOlderProcesses() throws IOException {
+ // initial list of processes
+ String[] pids = { "100", "200", "300", "400" };
+ // create the fake procfs root directory.
+ File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+ try {
+ setupProcfsRootDir(procfsRootDir);
+ setupPidDirs(procfsRootDir, pids);
+
+ // create stat objects.
+ // assuming 100, 200 and 400 are in tree, 300 is not.
+ ProcessStatInfo[] procInfos = new ProcessStatInfo[4];
+ procInfos[0] = new ProcessStatInfo(new String[]
+ {"100", "proc1", "1", "100", "100", "100000", "100"});
+ procInfos[1] = new ProcessStatInfo(new String[]
+ {"200", "proc2", "100", "100", "100", "200000", "200"});
+ procInfos[2] = new ProcessStatInfo(new String[]
+ {"300", "proc3", "1", "300", "300", "300000", "300"});
+ procInfos[3] = new ProcessStatInfo(new String[]
+ {"400", "proc4", "100", "100", "100", "400000", "400"});
+
+ writeStatFiles(procfsRootDir, pids, procInfos);
+
+ // crank up the process tree class.
+ ProcfsBasedProcessTree processTree =
+ createProcessTree("100", true, procfsRootDir.getAbsolutePath());
+ // build the process tree.
+ processTree.getProcessTree();
+
+ // verify cumulative memory
+ Assert.assertEquals("Cumulative memory does not match",
+ 700000L, processTree.getCumulativeVmem());
+
+ // write one more process as child of 100.
+ String[] newPids = { "500" };
+ setupPidDirs(procfsRootDir, newPids);
+
+ ProcessStatInfo[] newProcInfos = new ProcessStatInfo[1];
+ newProcInfos[0] = new ProcessStatInfo(new String[]
+ {"500", "proc5", "100", "100", "100", "500000", "500"});
+ writeStatFiles(procfsRootDir, newPids, newProcInfos);
+
+ // check memory includes the new process.
+ processTree.getProcessTree();
+ Assert.assertEquals("Cumulative vmem does not include new process",
+ 1200000L, processTree.getCumulativeVmem());
+ long cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+ 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+ Assert.assertEquals("Cumulative rssmem does not include new process",
+ cumuRssMem, processTree.getCumulativeRssmem());
+
+ // however processes older than 1 iteration will retain the older value
+ Assert.assertEquals("Cumulative vmem shouldn't have included new process",
+ 700000L, processTree.getCumulativeVmem(1));
+ cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+ 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+ Assert.assertEquals("Cumulative rssmem shouldn't have included new process",
+ cumuRssMem, processTree.getCumulativeRssmem(1));
+
+ // one more process
+ newPids = new String[]{ "600" };
+ setupPidDirs(procfsRootDir, newPids);
+
+ newProcInfos = new ProcessStatInfo[1];
+ newProcInfos[0] = new ProcessStatInfo(new String[]
+ {"600", "proc6", "100", "100", "100", "600000", "600"});
+ writeStatFiles(procfsRootDir, newPids, newProcInfos);
+
+ // refresh process tree
+ processTree.getProcessTree();
+
+ // processes older than 2 iterations should be same as before.
+ Assert.assertEquals("Cumulative vmem shouldn't have included new processes",
+ 700000L, processTree.getCumulativeVmem(2));
+ cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+ 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+ Assert.assertEquals("Cumulative rssmem shouldn't have included new processes",
+ cumuRssMem, processTree.getCumulativeRssmem(2));
+
+ // processes older than 1 iteration should not include new process,
+ // but include process 500
+ Assert.assertEquals("Cumulative vmem shouldn't have included new processes",
+ 1200000L, processTree.getCumulativeVmem(1));
+ cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+ 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+ Assert.assertEquals("Cumulative rssmem shouldn't have included new processes",
+ cumuRssMem, processTree.getCumulativeRssmem(1));
+
+ // no processes are older than 3 iterations, so this should be 0
+ Assert.assertEquals("Getting non-zero vmem for processes older than 3 iterations",
+ 0L, processTree.getCumulativeVmem(3));
+ Assert.assertEquals("Getting non-zero rssmem for processes older than 3 iterations",
+ 0L, processTree.getCumulativeRssmem(3));
+ } finally {
+ FileUtil.fullyDelete(procfsRootDir);
+ }
+ }
+
+ /**
+ * Verifies that ProcfsBasedProcessTree.checkPidPgrpidForMatch() handles
+ * constructProcessInfo() returning null, which is simulated by not writing
+ * a stat file for the mock process.
+ * @throws IOException if there was a problem setting up the
+ * fake procfs directories or files.
+ */
+ @Test
+ public void testDestroyProcessTree() throws IOException {
+ // test process
+ String pid = "100";
+ // create the fake procfs root directory.
+ File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+ try {
+ setupProcfsRootDir(procfsRootDir);
+
+ // crank up the process tree class.
+ ProcfsBasedProcessTree processTree =
+ createProcessTree(pid, true, procfsRootDir.getAbsolutePath());
+
+ // Deliberately do not create a stat file for pid 100.
+ Assert.assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(
+ Integer.valueOf(pid), procfsRootDir.getAbsolutePath()));
+ } finally {
+ FileUtil.fullyDelete(procfsRootDir);
+ }
+ }
+
+ /**
+ * Test the correctness of process-tree dump.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testProcessTreeDump()
+ throws IOException {
+
+ String[] pids = { "100", "200", "300", "400", "500", "600" };
+
+ File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+ try {
+ setupProcfsRootDir(procfsRootDir);
+ setupPidDirs(procfsRootDir, pids);
+
+ int numProcesses = pids.length;
+ // Processes 200, 300, 400 and 500 are descendants of 100. 600 is not.
+ ProcessStatInfo[] procInfos = new ProcessStatInfo[numProcesses];
+ procInfos[0] = new ProcessStatInfo(new String[] {
+ "100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"});
+ procInfos[1] = new ProcessStatInfo(new String[] {
+ "200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"});
+ procInfos[2] = new ProcessStatInfo(new String[] {
+ "300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"});
+ procInfos[3] = new ProcessStatInfo(new String[] {
+ "400", "proc4", "200", "100", "100", "400000", "400", "4000", "800"});
+ procInfos[4] = new ProcessStatInfo(new String[] {
+ "500", "proc5", "400", "100", "100", "400000", "400", "4000", "800"});
+ procInfos[5] = new ProcessStatInfo(new String[] {
+ "600", "proc6", "1", "1", "1", "400000", "400", "4000", "800"});
+
+ String[] cmdLines = new String[numProcesses];
+ cmdLines[0] = "proc1 arg1 arg2";
+ cmdLines[1] = "proc2 arg3 arg4";
+ cmdLines[2] = "proc3 arg5 arg6";
+ cmdLines[3] = "proc4 arg7 arg8";
+ cmdLines[4] = "proc5 arg9 arg10";
+ cmdLines[5] = "proc6 arg11 arg12";
+
+ writeStatFiles(procfsRootDir, pids, procInfos);
+ writeCmdLineFiles(procfsRootDir, pids, cmdLines);
+
+ ProcfsBasedProcessTree processTree = createProcessTree(
+ "100", true, procfsRootDir.getAbsolutePath());
+ // build the process tree.
+ processTree.getProcessTree();
+
+ // Get the process-tree dump
+ String processTreeDump = processTree.getProcessTreeDump();
+
+ LOG.info("Process-tree dump follows: \n" + processTreeDump);
+ Assert.assertTrue("Process-tree dump doesn't start with a proper header",
+ processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " +
+ "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " +
+ "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
+ for (int i = 0; i < 5; i++) {
+ ProcessStatInfo p = procInfos[i];
+ Assert.assertTrue(
+ "Process-tree dump doesn't contain the cmdLineDump of process "
+ + p.pid, processTreeDump.contains("\t|- " + p.pid + " "
+ + p.ppid + " " + p.pgrpId + " " + p.session + " (" + p.name
+ + ") " + p.utime + " " + p.stime + " " + p.vmem + " "
+ + p.rssmemPage + " " + cmdLines[i]));
+ }
+
+ // 600 should not be in the dump
+ ProcessStatInfo p = procInfos[5];
+ Assert.assertFalse(
+ "Process-tree dump shouldn't contain the cmdLineDump of process "
+ + p.pid, processTreeDump.contains("\t|- " + p.pid + " " + p.ppid
+ + " " + p.pgrpId + " " + p.session + " (" + p.name + ") "
+ + p.utime + " " + p.stime + " " + p.vmem + " " + cmdLines[5]));
+ } finally {
+ FileUtil.fullyDelete(procfsRootDir);
+ }
+ }
+
+ protected static boolean isSetsidAvailable() {
+ ShellCommandExecutor shexec = null;
+ boolean setsidSupported = true;
+ try {
+ String[] args = {"setsid", "bash", "-c", "echo $$"};
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ } catch (IOException ioe) {
+ LOG.warn("setsid is not available on this machine. So not using it.");
+ setsidSupported = false;
+ } finally { // handle the exit code
+ LOG.info("setsid exited with exit code " + shexec.getExitCode());
+ }
+ return setsidSupported;
+ }
+
+ /**
+ * Is the root-process alive?
+ * Used only in tests.
+ * @return true if the root-process is alive, false otherwise.
+ */
+ private static boolean isAlive(String pid) {
+ try {
+ final String sigpid = isSetsidAvailable() ? "-" + pid : pid;
+ try {
+ sendSignal(sigpid, 0);
+ } catch (ExitCodeException e) {
+ return false;
+ }
+ return true;
+ } catch (IOException ignored) {
+ }
+ return false;
+ }
+
+ private static void sendSignal(String pid, int signal) throws IOException {
+ String[] arg = { "kill", "-" + signal, pid };
+ ShellCommandExecutor shexec = new ShellCommandExecutor(arg);
+ shexec.execute();
+ }
+
+ /**
+ * Is any of the subprocesses in the process-tree alive?
+ * Used only in tests.
+ * @return true if any of the processes in the process-tree is
+ * alive, false otherwise.
+ */
+ private static boolean isAnyProcessInTreeAlive(
+ ProcfsBasedProcessTree processTree) {
+ for (Integer pId : processTree.getCurrentProcessIDs()) {
+ if (isAlive(pId.toString())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Create a directory to mimic the procfs file system's root.
+ * @param procfsRootDir root directory to create.
+ * @throws IOException if an existing procfs root directory could not be deleted
+ */
+ public static void setupProcfsRootDir(File procfsRootDir)
+ throws IOException {
+ // cleanup any existing process root dir.
+ if (procfsRootDir.exists()) {
+ Assert.assertTrue(FileUtil.fullyDelete(procfsRootDir));
+ }
+
+ // create afresh
+ Assert.assertTrue(procfsRootDir.mkdirs());
+ }
+
+ /**
+ * Create PID directories under the specified procfs root directory
+ * @param procfsRootDir root directory of procfs file system
+ * @param pids the PID directories to create.
+ * @throws IOException If PID dirs could not be created
+ */
+ public static void setupPidDirs(File procfsRootDir, String[] pids)
+ throws IOException {
+ for (String pid : pids) {
+ File pidDir = new File(procfsRootDir, pid);
+ pidDir.mkdir();
+ if (!pidDir.exists()) {
+ throw new IOException ("couldn't make process directory under " +
+ "fake procfs");
+ } else {
+ LOG.info("created pid dir");
+ }
+ }
+ }
+
+ /**
+ * Write stat files under the specified pid directories with data
+ * setup in the corresponding ProcessStatInfo objects
+ * @param procfsRootDir root directory of procfs file system
+ * @param pids the PID directories under which to create the stat file
+ * @param procs corresponding ProcessStatInfo objects whose data should be
+ * written to the stat files.
+ * @throws IOException if stat files could not be written
+ */
+ public static void writeStatFiles(File procfsRootDir, String[] pids,
+ ProcessStatInfo[] procs) throws IOException {
+ for (int i=0; i<pids.length; i++) {
+ File statFile =
+ new File(new File(procfsRootDir, pids[i]),
+ ProcfsBasedProcessTree.PROCFS_STAT_FILE);
+ BufferedWriter bw = null;
+ try {
+ FileWriter fw = new FileWriter(statFile);
+ bw = new BufferedWriter(fw);
+ bw.write(procs[i].getStatLine());
+ LOG.info("wrote stat file for " + pids[i] +
+ " with contents: " + procs[i].getStatLine());
+ } finally {
+ // not handling exception - will throw an error and fail the test.
+ if (bw != null) {
+ bw.close();
+ }
+ }
+ }
+ }
+
+ private static void writeCmdLineFiles(File procfsRootDir, String[] pids,
+ String[] cmdLines)
+ throws IOException {
+ for (int i = 0; i < pids.length; i++) {
+ File cmdLineFile =
+ new File(new File(procfsRootDir, pids[i]),
+ ProcfsBasedProcessTree.PROCFS_CMDLINE_FILE);
+ BufferedWriter bw = null;
+ try {
+ bw = new BufferedWriter(new FileWriter(cmdLineFile));
+ bw.write(cmdLines[i]);
+ LOG.info("wrote command-line file for " + pids[i] + " with contents: "
+ + cmdLines[i]);
+ } finally {
+ // not handling exception - will throw an error and fail the test.
+ if (bw != null) {
+ bw.close();
+ }
+ }
+ }
+ }
+}
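
For orientation, a minimal sketch of how the fake-procfs helpers above exercise ProcfsBasedProcessTree outside of JUnit. Only the standalone wrapper and the /tmp location are illustrative assumptions; everything else uses the helpers and the ProcfsBasedProcessTree calls exactly as they appear in this test:

    import java.io.File;
    import org.apache.hadoop.util.TestProcfsBasedProcessTree;
    import org.apache.hadoop.util.TestProcfsBasedProcessTree.ProcessStatInfo;
    import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;

    public class FakeProcfsSketch {
      public static void main(String[] args) throws Exception {
        // fake procfs root with pids 100 and 200, where 200 is a child of 100
        File procfsRootDir = new File("/tmp", "proc");
        String[] pids = { "100", "200" };
        TestProcfsBasedProcessTree.setupProcfsRootDir(procfsRootDir);
        TestProcfsBasedProcessTree.setupPidDirs(procfsRootDir, pids);
        // stat fields: pid name ppid pgrpId session vmem
        ProcessStatInfo[] procs = new ProcessStatInfo[] {
            new ProcessStatInfo(new String[] { "100", "proc1", "1", "100", "100", "100000" }),
            new ProcessStatInfo(new String[] { "200", "proc2", "100", "100", "100", "200000" }) };
        TestProcfsBasedProcessTree.writeStatFiles(procfsRootDir, pids, procs);

        // point the tree at the fake root (pid, setsidUsed, procfsRootDir) and walk it
        ProcfsBasedProcessTree tree =
            new ProcfsBasedProcessTree("100", true, procfsRootDir.getAbsolutePath());
        tree.getProcessTree(); // builds / refreshes the tree
        System.out.println("cumulative vmem = " + tree.getCumulativeVmem()); // 300000
      }
    }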
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java Tue Apr 12 07:56:07 2011
@@ -151,7 +151,7 @@ public class TestRPC {
status = recordFactory.newRecordInstance(ContainerStatus.class);
status.setState(ContainerState.RUNNING);
status.setContainerId(container.getContainerId());
- status.setExitStatus(0);
+ status.setExitStatus(String.valueOf(0));
return response;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/Clock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/Clock.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/Clock.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/Clock.java Tue Apr 12 07:56:07 2011
@@ -21,8 +21,8 @@ package org.apache.hadoop;
/**
* A clock class - can be mocked out for testing.
*/
-class Clock {
- long getTime() {
+public class Clock {
+ public long getTime() {
return System.currentTimeMillis();
}
}
\ No newline at end of file
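
Making Clock and getTime() public enables the mocking the class comment promises; a minimal sketch of a test-only stand-in (the wrapper class and the fixed value are arbitrary):

    import org.apache.hadoop.Clock;

    public class ClockMockSketch {
      public static void main(String[] args) {
        Clock fixedClock = new Clock() {
          @Override
          public long getTime() {
            return 42L; // deterministic time for tests
          }
        };
        System.out.println(fixedClock.getTime()); // always 42
      }
    }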
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml Tue Apr 12 07:56:07 2011
@@ -71,8 +71,7 @@
<value>0.0.0.0:45454</value>
</property>
-<property><name>yarn.server.nodemanager.connect.rm</name><value>true</value></property>
-
+ <!-- HealthChecker's properties -->
<property>
<name>yarn.server.nodemanager.healthchecker.script.path</name>
<value></value>
@@ -130,6 +129,26 @@
<description>Arguments to be passed to the health-check script run
by the NodeManager</description>
</property>
+ <!-- End of HealthChecker's properties -->
+
+ <!-- ContainerMonitor related properties -->
+
+ <property>
+ <name>yarn.server.nodemanager.containers-monitor.monitoring-interval</name>
+ <value>3000</value>
+ </property>
+
+ <property>
+ <name>yarn.server.nodemanager.containers-monitor.resourcecalculatorplugin</name>
+ <value></value>
+ </property>
+
+ <property>
+ <name>yarn.server.nodemanager.reserved-physical-memory.mb</name>
+ <value>-1</value>
+ </property>
+
+ <!-- End of ContainerMonitor related properties -->
<!-- All MRAppMaster related configuration properties -->
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/pom.xml?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/pom.xml Tue Apr 12 07:56:07 2011
@@ -1,5 +1,10 @@
<?xml version="1.0"?>
<project>
+
+ <properties>
+ <yarn.version>1.0-SNAPSHOT</yarn.version>
+ </properties>
+
<parent>
<artifactId>yarn-server</artifactId>
<groupId>org.apache.hadoop</groupId>
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java Tue Apr 12 07:56:07 2011
@@ -50,7 +50,9 @@ public class NMConfig {
public static final String DEFAULT_NM_LOG_DIR = "/tmp/logs";
- public static final String NM_RESOURCE = NM_PREFIX + "resource.memory.gb";
+ public static final int DEFAULT_NM_VMEM_GB = 8;
+
+ public static final String NM_VMEM_GB = NM_PREFIX + "resource.memory.gb";
// TODO: Should this instead be dictated by RM?
public static final String HEARTBEAT_INTERVAL = NM_PREFIX
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Apr 12 07:56:07 2011
@@ -94,7 +94,7 @@ public class NodeStatusUpdaterImpl exten
this.heartBeatInterval =
conf.getLong(NMConfig.HEARTBEAT_INTERVAL,
NMConfig.DEFAULT_HEARTBEAT_INTERVAL);
- int memory = conf.getInt(NMConfig.NM_RESOURCE, 8);
+ int memory = conf.getInt(NMConfig.NM_VMEM_GB, NMConfig.DEFAULT_NM_VMEM_GB);
this.totalResource = recordFactory.newRecordInstance(Resource.class);
this.totalResource.setMemory(memory * 1024);
super.init(conf);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Apr 12 07:56:07 2011
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
@@ -76,6 +77,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
@@ -130,7 +132,8 @@ public class ContainerManagerImpl extend
auxiluaryServices.register(this);
addService(auxiluaryServices);
- ContainersMonitor containersMonitor = new ContainersMonitor();
+ ContainersMonitor containersMonitor =
+ new ContainersMonitorImpl(exec, dispatcher);
addService(containersMonitor);
dispatcher.register(ContainerEventType.class,
@@ -247,6 +250,9 @@ public class ContainerManagerImpl extend
// + containerID + " on this NodeManager!!");
}
dispatcher.getEventHandler().handle(
+ new ContainerDiagnosticsUpdateEvent(containerID,
+ "Container killed by the application."));
+ dispatcher.getEventHandler().handle(
new ContainerEvent(containerID, ContainerEventType.KILL_CONTAINER));
// TODO: Move this code to appropriate place once kill_container is
@@ -324,6 +330,9 @@ public class ContainerManagerImpl extend
for (org.apache.hadoop.yarn.api.records.Container container : containersFinishedEvent
.getContainersToCleanup()) {
this.dispatcher.getEventHandler().handle(
+ new ContainerDiagnosticsUpdateEvent(container.getId(),
+ "Container Killed by ResourceManager"));
+ this.dispatcher.getEventHandler().handle(
new ContainerEvent(container.getId(),
ContainerEventType.KILL_CONTAINER));
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Tue Apr 12 07:56:07 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -318,6 +319,9 @@ public class ApplicationImpl implements
// application.
for (ContainerId containerID : app.containers.keySet()) {
app.dispatcher.getEventHandler().handle(
+ new ContainerDiagnosticsUpdateEvent(containerID,
+ "Container killed on application-finish."));
+ app.dispatcher.getEventHandler().handle(
new ContainerEvent(containerID,
ContainerEventType.KILL_CONTAINER));
}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerDiagnosticsUpdateEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerDiagnosticsUpdateEvent.java?rev=1091316&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerDiagnosticsUpdateEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerDiagnosticsUpdateEvent.java Tue Apr 12 07:56:07 2011
@@ -0,0 +1,17 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class ContainerDiagnosticsUpdateEvent extends ContainerEvent {
+
+ private final String diagnosticsUpdate;
+
+ public ContainerDiagnosticsUpdateEvent(ContainerId cID, String update) {
+ super(cID, ContainerEventType.UPDATE_DIAGNOSTICS_MSG);
+ this.diagnosticsUpdate = update;
+ }
+
+ public String getDiagnosticsUpdate() {
+ return this.diagnosticsUpdate;
+ }
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java Tue Apr 12 07:56:07 2011
@@ -23,6 +23,7 @@ public enum ContainerEventType {
// Producer: ContainerManager
INIT_CONTAINER,
KILL_CONTAINER,
+ UPDATE_DIAGNOSTICS_MSG,
CONTAINER_DONE,
// DownloadManager
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Tue Apr 12 07:56:07 2011
@@ -36,8 +36,8 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
@@ -49,6 +49,7 @@ public class ContainerImpl implements Co
private final Dispatcher dispatcher;
private final ContainerLaunchContext launchContext;
private int exitCode;
+ private final StringBuilder diagnostics;
private static final Log LOG = LogFactory.getLog(Container.class);
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
@@ -57,6 +58,7 @@ public class ContainerImpl implements Co
ContainerLaunchContext launchContext) {
this.dispatcher = dispatcher;
this.launchContext = launchContext;
+ this.diagnostics = new StringBuilder();
stateMachine = stateMachineFactory.make(this);
}
@@ -64,6 +66,9 @@ public class ContainerImpl implements Co
private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
new ContainerDoneTransition();
+ private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
+ new ContainerDiagnosticsUpdateTransition();
+
// State Machine for each container.
private static StateMachineFactory
<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>
@@ -72,6 +77,9 @@ public class ContainerImpl implements Co
// From NEW State
.addTransition(ContainerState.NEW, ContainerState.LOCALIZING,
ContainerEventType.INIT_CONTAINER)
+ .addTransition(ContainerState.NEW, ContainerState.NEW,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.NEW, ContainerState.DONE,
ContainerEventType.KILL_CONTAINER, CONTAINER_DONE_TRANSITION)
@@ -80,6 +88,9 @@ public class ContainerImpl implements Co
ContainerState.LOCALIZED,
ContainerEventType.CONTAINER_RESOURCES_LOCALIZED,
new LocalizedTransition())
+ .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.LOCALIZING,
ContainerState.CONTAINER_RESOURCES_CLEANINGUP,
ContainerEventType.KILL_CONTAINER,
@@ -91,6 +102,9 @@ public class ContainerImpl implements Co
.addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition())
+ .addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
// From RUNNING State
.addTransition(ContainerState.RUNNING,
@@ -101,6 +115,9 @@ public class ContainerImpl implements Co
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition())
+ .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.RUNNING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
@@ -109,6 +126,10 @@ public class ContainerImpl implements Co
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
CONTAINER_DONE_TRANSITION)
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
+ ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.EXITED_WITH_SUCCESS,
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.KILL_CONTAINER)
@@ -117,6 +138,10 @@ public class ContainerImpl implements Co
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
CONTAINER_DONE_TRANSITION)
.addTransition(ContainerState.EXITED_WITH_FAILURE,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.KILL_CONTAINER)
@@ -125,6 +150,9 @@ public class ContainerImpl implements Co
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new ContainerKilledTransition())
+ .addTransition(ContainerState.KILLING, ContainerState.KILLING,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.KILLING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER)
.addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS,
@@ -139,10 +167,17 @@ public class ContainerImpl implements Co
ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
CONTAINER_DONE_TRANSITION)
+ .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+ ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
// From DONE
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.KILL_CONTAINER, CONTAINER_DONE_TRANSITION)
+ .addTransition(ContainerState.DONE, ContainerState.DONE,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
// create the topology tables
.installTopology();
@@ -203,7 +238,8 @@ public class ContainerImpl implements Co
ContainerStatus containerStatus = recordFactory.newRecordInstance(ContainerStatus.class);
containerStatus.setState(getCurrentState());
containerStatus.setContainerId(this.launchContext.getContainerId());
- containerStatus.setExitStatus(exitCode);
+ containerStatus.setDiagnostics(diagnostics.toString());
+ containerStatus.setExitStatus(String.valueOf(exitCode));
return containerStatus;
}
@@ -244,9 +280,12 @@ public class ContainerImpl implements Co
public void transition(ContainerImpl container, ContainerEvent event) {
// Inform the ContainersMonitor to start monitoring the container's
// resource usage.
+ // TODO: Fix pmem limits below
+ long vmemBytes =
+ container.getLaunchContext().getResource().getMemory() * 1024 * 1024L;
container.dispatcher.getEventHandler().handle(
- new ContainersMonitorEvent(
- ContainersMonitorEventType.START_MONITORING_CONTAINER));
+ new ContainerStartMonitoringEvent(container.getContainerID(),
+ vmemBytes, -1));
}
}
@@ -326,6 +365,20 @@ public class ContainerImpl implements Co
// Inform the application
container.dispatcher.getEventHandler().handle(
new ApplicationContainerFinishedEvent(container.getContainerID()));
+ // Remove the container from the resource-monitor
+ container.dispatcher.getEventHandler().handle(
+ new ContainerStopMonitoringEvent(container.getContainerID()));
+ }
+ }
+
+ static class ContainerDiagnosticsUpdateTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ ContainerDiagnosticsUpdateEvent updateEvent =
+ (ContainerDiagnosticsUpdateEvent) event;
+ container.diagnostics.append(updateEvent.getDiagnosticsUpdate())
+ .append("\n");
}
}
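
A worked instance of the MB-to-bytes conversion in the StartedTransition above (the 1024 MB value is hypothetical; the pmem limit stays at -1 until the TODO is resolved):

    long resourceMemoryMB = 1024;                      // Resource.getMemory() is in MB
    long vmemBytes = resourceMemoryMB * 1024 * 1024L;  // = 1073741824 bytes (1 GiB)
    long pmemLimit = -1;                               // physical-memory limit disabled for now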
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java?rev=1091316&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java Tue Apr 12 07:56:07 2011
@@ -0,0 +1,25 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class ContainerStartMonitoringEvent extends ContainersMonitorEvent {
+
+ private final long vmemLimit;
+ private final long pmemLimit;
+
+ public ContainerStartMonitoringEvent(ContainerId containerId,
+ long vmemLimit, long pmemLimit) {
+ super(containerId, ContainersMonitorEventType.START_MONITORING_CONTAINER);
+ this.vmemLimit = vmemLimit;
+ this.pmemLimit = pmemLimit;
+ }
+
+ public long getVmemLimit() {
+ return this.vmemLimit;
+ }
+
+ public long getPmemLimit() {
+ return this.pmemLimit;
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java?rev=1091316&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java Tue Apr 12 07:56:07 2011
@@ -0,0 +1,11 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class ContainerStopMonitoringEvent extends ContainersMonitorEvent {
+
+ public ContainerStopMonitoringEvent(ContainerId containerId) {
+ super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER);
+ }
+
+}
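
Together, the two event classes bracket a container's lifetime in the monitor. A sketch of the producing side, mirroring the ContainerImpl transitions above (containerId, vmemBytes, and dispatcher are assumed to be in scope):

    // on container start (StartedTransition): begin tracking with a vmem limit, no pmem limit
    dispatcher.getEventHandler().handle(
        new ContainerStartMonitoringEvent(containerId, vmemBytes, -1));
    // on container cleanup (ContainerDoneTransition): stop tracking
    dispatcher.getEventHandler().handle(
        new ContainerStopMonitoringEvent(containerId));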
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java Tue Apr 12 07:56:07 2011
@@ -19,18 +19,9 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
-public class ContainersMonitor extends AbstractService
- implements EventHandler<ContainersMonitorEvent> {
-
- public ContainersMonitor() {
- super("containers-monitor");
- }
-
- @Override
- public void handle(ContainersMonitorEvent monitorEvent) {
- // TODO
- }
+public interface ContainersMonitor extends Service,
+ EventHandler<ContainersMonitorEvent> {
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java Tue Apr 12 07:56:07 2011
@@ -18,14 +18,22 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.yarn.event.Event;
public class ContainersMonitorEvent extends
AbstractEvent<ContainersMonitorEventType> {
- public ContainersMonitorEvent(ContainersMonitorEventType eventType) {
+ private final ContainerId containerId;
+
+ public ContainersMonitorEvent(ContainerId containerId,
+ ContainersMonitorEventType eventType) {
super(eventType);
+ this.containerId = containerId;
+ }
+
+ public ContainerId getContainerId() {
+ return this.containerId;
}
}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java?rev=1091316&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java Tue Apr 12 07:56:07 2011
@@ -0,0 +1,495 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+
+public class ContainersMonitorImpl extends AbstractService implements
+ ContainersMonitor {
+
+ final static Log LOG = LogFactory
+ .getLog(ContainersMonitorImpl.class);
+
+ private final static String MONITORING_INTERVAL_CONFIG_KEY =
+ NMConfig.NM_PREFIX + "containers-monitor.monitoring-interval";
+ public static final String RESOURCE_CALCULATOR_PLUGIN_CONFIG_KEY =
+ NMConfig.NM_PREFIX + "containers-monitor.resourcecalculatorplugin";
+ public static final String NM_RESERVED_PHYSICALMEMORY_MB =
+ NMConfig.NM_PREFIX + "reserved-physical-memory.mb";
+
+ private final static int MONITORING_INTERVAL_DEFAULT = 3000;
+ private long monitoringInterval;
+ private MonitoringThread monitoringThread;
+
+ List<ContainerId> containersToBeRemoved;
+ Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
+ Map<ContainerId, ProcessTreeInfo> trackingContainers =
+ new HashMap<ContainerId, ProcessTreeInfo>();
+
+ final ContainerExecutor containerExecutor;
+ private final Dispatcher eventDispatcher;
+ private ResourceCalculatorPlugin resourceCalculatorPlugin;
+
+ private long maxVmemAllottedForContainers = DISABLED_MEMORY_LIMIT;
+ private long maxPmemAllottedForContainers = DISABLED_MEMORY_LIMIT;
+
+ /**
+ * A value which, if set for a memory-related configuration option, indicates
+ * that the option is turned off.
+ */
+ public static final long DISABLED_MEMORY_LIMIT = -1L;
+
+ private static final String MEMORY_USAGE_STRING =
+ "Memory usage of ProcessTree %s for container-id %s : "
+ + "Virtual %d bytes, limit : %d bytes; Physical %d bytes, limit : %d bytes";
+
+ public ContainersMonitorImpl(ContainerExecutor exec,
+ AsyncDispatcher dispatcher) {
+ super("containers-monitor");
+
+ this.containerExecutor = exec;
+ this.eventDispatcher = dispatcher;
+
+ this.containersToBeAdded = new HashMap<ContainerId, ProcessTreeInfo>();
+ this.containersToBeRemoved = new ArrayList<ContainerId>();
+ this.monitoringThread = new MonitoringThread();
+ }
+
+ @Override
+ public synchronized void init(Configuration conf) {
+ this.monitoringInterval =
+ conf.getLong(MONITORING_INTERVAL_CONFIG_KEY,
+ MONITORING_INTERVAL_DEFAULT);
+
+ Class<? extends ResourceCalculatorPlugin> clazz =
+ conf.getClass(RESOURCE_CALCULATOR_PLUGIN_CONFIG_KEY, null,
+ ResourceCalculatorPlugin.class);
+ this.resourceCalculatorPlugin =
+ ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
+ LOG.info(" Using ResourceCalculatorPlugin : "
+ + this.resourceCalculatorPlugin);
+
+ long totalPhysicalMemoryOnNM = DISABLED_MEMORY_LIMIT;
+ if (this.resourceCalculatorPlugin != null) {
+ totalPhysicalMemoryOnNM =
+ this.resourceCalculatorPlugin.getPhysicalMemorySize();
+ if (totalPhysicalMemoryOnNM <= 0) {
+ LOG.warn("NodeManager's totalPmem could not be calculated. "
+ + "Setting it to " + DISABLED_MEMORY_LIMIT);
+ totalPhysicalMemoryOnNM = DISABLED_MEMORY_LIMIT;
+ }
+ }
+
+ // ///////// Virtual memory configuration //////
+ this.maxVmemAllottedForContainers =
+ conf.getLong(NMConfig.NM_VMEM_GB, NMConfig.DEFAULT_NM_VMEM_GB);
+ this.maxVmemAllottedForContainers =
+ this.maxVmemAllottedForContainers * 1024 * 1024 * 1024L; //Normalize
+
+ if (this.maxVmemAllottedForContainers > totalPhysicalMemoryOnNM) {
+ LOG.info("maxVmemAllottedForContainers > totalPhysicalMemoryOnNM."
+ + " Thrashing might happen.");
+ }
+
+ // ///////// Physical memory configuration //////
+ long reservedPmemOnNM =
+ conf.getLong(NM_RESERVED_PHYSICALMEMORY_MB, DISABLED_MEMORY_LIMIT);
+ reservedPmemOnNM =
+ reservedPmemOnNM == DISABLED_MEMORY_LIMIT
+ ? DISABLED_MEMORY_LIMIT
+ : reservedPmemOnNM * 1024 * 1024; // normalize to bytes
+
+ if (reservedPmemOnNM == DISABLED_MEMORY_LIMIT
+ || totalPhysicalMemoryOnNM == DISABLED_MEMORY_LIMIT) {
+ this.maxPmemAllottedForContainers = DISABLED_MEMORY_LIMIT;
+ } else {
+ this.maxPmemAllottedForContainers =
+ totalPhysicalMemoryOnNM - reservedPmemOnNM;
+ }
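+
+ // Illustrative example (not from a real run): with NM_VMEM_GB = 8 and
+ // NM_RESERVED_PHYSICALMEMORY_MB = 2048 on a node reporting 16GB of
+ // physical memory, maxVmemAllottedForContainers becomes 8GB in bytes and
+ // maxPmemAllottedForContainers becomes 16GB - 2GB = 14GB in bytes; a
+ // check stays disabled (-1) if its inputs are unavailable.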
+
+ super.init(conf);
+ }
+
+ /**
+ * Is the total physical memory check enabled?
+ *
+ * @return true if total physical memory check is enabled.
+ */
+ boolean doCheckPhysicalMemory() {
+ return this.maxPmemAllottedForContainers != DISABLED_MEMORY_LIMIT;
+ }
+
+ /**
+ * Is the total virtual memory check enabled?
+ *
+ * @return true if total virtual memory check is enabled.
+ */
+ boolean doCheckVirtualMemory() {
+ return this.maxVmemAllottedForContainers != DISABLED_MEMORY_LIMIT;
+ }
+
+ @Override
+ public synchronized void start() {
+ this.monitoringThread.start();
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ this.monitoringThread.interrupt();
+ try {
+ this.monitoringThread.join();
+ } catch (InterruptedException e) {
+ // Ignore; the service is shutting down anyway.
+ }
+ super.stop();
+ }
+
+ private static class ProcessTreeInfo {
+ private ContainerId containerId;
+ private String pid;
+ private ProcfsBasedProcessTree pTree;
+ private long vmemLimit;
+ private long pmemLimit;
+
+ public ProcessTreeInfo(ContainerId containerId, String pid,
+ ProcfsBasedProcessTree pTree, long vmemLimit, long pmemLimit) {
+ this.containerId = containerId;
+ this.pid = pid;
+ this.pTree = pTree;
+ this.vmemLimit = vmemLimit;
+ this.pmemLimit = pmemLimit;
+ }
+
+ public ContainerId getContainerId() {
+ return this.containerId;
+ }
+
+ public String getPID() {
+ return this.pid;
+ }
+
+ public void setPid(String pid) {
+ this.pid = pid;
+ }
+
+ public ProcfsBasedProcessTree getProcessTree() {
+ return this.pTree;
+ }
+
+ public void setProcessTree(ProcfsBasedProcessTree pTree) {
+ this.pTree = pTree;
+ }
+
+ public long getVmemLimit() {
+ return this.vmemLimit;
+ }
+
+ /**
+ * @return Physical memory limit for the process tree in bytes
+ */
+ public long getPmemLimit() {
+ return this.pmemLimit;
+ }
+ }
+
+
+ /**
+ * Check whether a container's process tree's current memory usage is over
+ * limit.
+ *
+ * When a Java process exec's a program, it can momentarily account for
+ * double its memory size, because the JVM does a fork()+exec(), and the
+ * fork creates a copy of the parent's address space. If the monitoring
+ * thread samples the container tree's memory at that instant, it could
+ * wrongly conclude the tree is over limit and kill it through no fault of
+ * the process itself.
+ *
+ * We counter this problem with a heuristic check:
+ * - if a process tree exceeds twice the memory limit, it is killed
+ * immediately;
+ * - if processes in the tree older than the monitoring interval exceed the
+ * memory limit at all, the tree is killed. Otherwise it is given the
+ * benefit of the doubt for one more iteration.
+ *
+ * @param containerId
+ * Container Id for the container tree
+ * @param currentMemUsage
+ * Memory usage of a container tree
+ * @param curMemUsageOfAgedProcesses
+ * Memory usage of processes older than an iteration in a container
+ * tree
+ * @param vmemLimit
+ * The limit specified for the container
+ * @return true if the memory usage is more than twice the specified limit,
+ * or if processes in the tree older than this thread's monitoring
+ * interval exceed the memory limit; false otherwise.
+ */
+ boolean isProcessTreeOverLimit(String containerId,
+ long currentMemUsage,
+ long curMemUsageOfAgedProcesses,
+ long vmemLimit) {
+ boolean isOverLimit = false;
+
+ if (currentMemUsage > (2 * vmemLimit)) {
+ LOG.warn("Process tree for container: " + containerId
+ + " running over twice " + "the configured limit. Limit=" + vmemLimit
+ + ", current usage = " + currentMemUsage);
+ isOverLimit = true;
+ } else if (curMemUsageOfAgedProcesses > vmemLimit) {
+ LOG.warn("Process tree for container: " + containerId
+ + " has processes older than 1 "
+ + "iteration running over the configured limit. Limit=" + vmemLimit
+ + ", current usage = " + curMemUsageOfAgedProcesses);
+ isOverLimit = true;
+ }
+
+ return isOverLimit;
+ }
+
+ // Provided for testing convenience.
+ boolean isProcessTreeOverLimit(ProcfsBasedProcessTree pTree,
+ String containerId, long limit) {
+ long currentMemUsage = pTree.getCumulativeVmem();
+ // as processes begin with an age 1, we want to see if there are processes
+ // more than 1 iteration old.
+ long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
+ return isProcessTreeOverLimit(containerId, currentMemUsage,
+ curMemUsageOfAgedProcesses, limit);
+ }
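+
+ // Worked example of the heuristic (illustrative numbers): with a limit
+ // of 1GB, a tree using 2.5GB is killed at once (usage > 2 * limit); a
+ // tree using 1.5GB of which 1.2GB belongs to processes older than one
+ // iteration is killed (aged usage > limit); a tree using 1.5GB whose
+ // aged processes use only 0.8GB survives this iteration.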
+
+ private class MonitoringThread extends Thread {
+
+ @Override
+ public void run() {
+
+ if (!(doCheckPhysicalMemory() || doCheckVirtualMemory())) {
+ LOG.info("Neither virutal-memory nor physical-memory monitoring is " +
+ "needed. Not running the monitor-thread");
+ return;
+ }
+
+ while (true) {
+
+ // Print the processTrees for debugging.
+ if (LOG.isDebugEnabled()) {
+ StringBuffer tmp = new StringBuffer("[ ");
+ for (ProcessTreeInfo p : trackingContainers.values()) {
+ tmp.append(p.getPID());
+ tmp.append(" ");
+ }
+ LOG.debug("Current ProcessTree list : "
+ + tmp.substring(0, tmp.length()) + "]");
+ }
+
+ // Add new containers
+ synchronized (containersToBeAdded) {
+ for (Entry<ContainerId, ProcessTreeInfo> entry : containersToBeAdded
+ .entrySet()) {
+ ContainerId containerId = entry.getKey();
+ ProcessTreeInfo processTreeInfo = entry.getValue();
+ LOG.info("Starting resource-monitoring for " + containerId);
+ trackingContainers.put(containerId, processTreeInfo);
+ }
+ containersToBeAdded.clear();
+ }
+
+ // Remove finished containers
+ synchronized (containersToBeRemoved) {
+ for (ContainerId containerId : containersToBeRemoved) {
+ trackingContainers.remove(containerId);
+ LOG.info("Stopping resource-monitoring for " + containerId);
+ }
+ containersToBeRemoved.clear();
+ }
+
+ // Now do the monitoring for the trackingContainers
+ // Check memory usage and kill any overflowing containers
+ long vmemStillInUsage = 0;
+ long pmemStillInUsage = 0;
+ for (Iterator<Map.Entry<ContainerId, ProcessTreeInfo>> it =
+ trackingContainers.entrySet().iterator(); it.hasNext();) {
+
+ Map.Entry<ContainerId, ProcessTreeInfo> entry = it.next();
+ ContainerId containerId = entry.getKey();
+ ProcessTreeInfo ptInfo = entry.getValue();
+ try {
+ String pId = ptInfo.getPID();
+
+ // Initialize any uninitialized processTrees
+ if (pId == null) {
+ // get pid from ContainerId
+ pId = containerExecutor.getProcessId(ptInfo.getContainerId());
+ // pId stays null either when the container has not spawned yet
+ // or when its pid has been removed from the ContainerExecutor;
+ // start tracking only once it is available.
+ if (pId != null) {
+ LOG.debug("Tracking ProcessTree " + pId
+ + " for the first time");
+
+ ProcfsBasedProcessTree pt =
+ new ProcfsBasedProcessTree(pId,
+ ContainerExecutor.isSetsidAvailable);
+ ptInfo.setPid(pId);
+ ptInfo.setProcessTree(pt);
+ }
+ }
+ // End of initializing any uninitialized processTrees
+
+ if (pId == null) {
+ continue; // processTree cannot be tracked
+ }
+
+ LOG.debug("Constructing ProcessTree for : PID = " + pId
+ + " ContainerId = " + containerId);
+ ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
+ pTree = pTree.getProcessTree(); // get the updated process-tree
+ ptInfo.setProcessTree(pTree); // update ptInfo with the refreshed
+ // process-tree state
+ long currentVmemUsage = pTree.getCumulativeVmem();
+ long currentPmemUsage = pTree.getCumulativeRssmem();
+ // as processes begin with an age 1, we want to see if there
+ // are processes more than 1 iteration old.
+ long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
+ long curRssMemUsageOfAgedProcesses = pTree.getCumulativeRssmem(1);
+ long vmemLimit = ptInfo.getVmemLimit();
+ long pmemLimit = ptInfo.getPmemLimit();
+ LOG.info(String.format(MEMORY_USAGE_STRING, pId,
+ containerId.toString(), currentVmemUsage, vmemLimit,
+ currentPmemUsage, pmemLimit));
+
+ boolean isMemoryOverLimit = false;
+ String msg = "";
+ if (doCheckVirtualMemory()
+ && isProcessTreeOverLimit(containerId.toString(),
+ currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
+ // Container (the root process) is still alive and overflowing
+ // memory.
+ // Dump the process-tree and then clean it up.
+ msg =
+ "Container [pid="
+ + pId
+ + ",containerID="
+ + containerId
+ + "] is running beyond memory-limits. Current usage : "
+ + currentVmemUsage
+ + "bytes. Limit : "
+ + vmemLimit
+ + "bytes. Killing container. "
+ + "\nDump of the process-tree for " + containerId
+ + " : \n" + pTree.getProcessTreeDump();
+ isMemoryOverLimit = true;
+ } else if (doCheckPhysicalMemory()
+ && isProcessTreeOverLimit(containerId.toString(),
+ currentPmemUsage, curRssMemUsageOfAgedProcesses,
+ pmemLimit)) {
+ // Container (the root process) is still alive and overflowing
+ // memory.
+ // Dump the process-tree and then clean it up.
+ msg =
+ "Container [pid="
+ + pId
+ + ",tipID="
+ + containerId
+ + "] is running beyond physical memory-limits."
+ + " Current usage : "
+ + currentPmemUsage
+ + "bytes. Limit : "
+ + pmemLimit
+ + "bytes. Killing container. \nDump of the process-tree for "
+ + containerId + " : \n" + pTree.getProcessTreeDump();
+ isMemoryOverLimit = true;
+ }
+
+ if (isMemoryOverLimit) {
+ // Virtual or physical memory over limit. Fail the container and
+ // remove the corresponding process tree.
+ LOG.warn(msg);
+ // Warn if the root process is not a process-group leader, as the
+ // kill may then miss parts of the tree.
+ if (!pTree.checkPidPgrpidForMatch()) {
+ LOG.error("Killing container process with PID " + pId
+ + ", but it is not a process group leader.");
+ }
+ eventDispatcher.getEventHandler().handle(
+ new ContainerDiagnosticsUpdateEvent(containerId, msg));
+ // kill the container
+ eventDispatcher.getEventHandler().handle(
+ new ContainerEvent(containerId,
+ ContainerEventType.KILL_CONTAINER));
+ it.remove();
+ LOG.info("Removed ProcessTree with root " + pId);
+ } else {
+ // Account for the total memory in use by all containers that are
+ // still alive and within limits.
+ vmemStillInUsage += currentVmemUsage;
+ pmemStillInUsage += currentPmemUsage;
+ }
+ } catch (Exception e) {
+ // Log the exception and proceed to the next container.
+ LOG.warn("Uncaught exception in ContainerMemoryManager "
+ + "while managing memory of " + containerId, e);
+ }
+ }
+
+ try {
+ Thread.sleep(monitoringInterval);
+ } catch (InterruptedException e) {
+ LOG.warn(ContainersMonitorImpl.class.getName()
+ + " is interrupted. Exiting.");
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void handle(ContainersMonitorEvent monitoringEvent) {
+
+ if (!(doCheckPhysicalMemory() || doCheckVirtualMemory())) {
+ return;
+ }
+
+ ContainerId containerId = monitoringEvent.getContainerId();
+ switch (monitoringEvent.getType()) {
+ case START_MONITORING_CONTAINER:
+ ContainerStartMonitoringEvent startEvent =
+ (ContainerStartMonitoringEvent) monitoringEvent;
+ synchronized (this.containersToBeAdded) {
+ ProcessTreeInfo processTreeInfo =
+ new ProcessTreeInfo(containerId, null, null,
+ startEvent.getVmemLimit(), startEvent.getPmemLimit());
+ this.containersToBeAdded.put(containerId, processTreeInfo);
+ }
+ break;
+ case STOP_MONITORING_CONTAINER:
+ synchronized (this.containersToBeRemoved) {
+ this.containersToBeRemoved.add(containerId);
+ }
+ break;
+ default:
+ // TODO: Wrong event.
+ }
+ }
+}
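
End to end, the service is constructed with an executor and a dispatcher, init()'ed with the memory configuration, start()'ed to spin up the monitoring thread, and fed events via handle(). A minimal wiring sketch, assuming exec, dispatcher and containerId are in scope and using the configuration keys defined above (the ContainerStartMonitoringEvent constructor is again an assumption):

    ContainersMonitorImpl monitor = new ContainersMonitorImpl(exec, dispatcher);
    Configuration conf = new Configuration();
    // Reserve 2GB of physical memory for the node itself (key defined above).
    conf.setLong(ContainersMonitorImpl.NM_RESERVED_PHYSICALMEMORY_MB, 2048);
    monitor.init(conf);
    monitor.start();

    // Enroll a container (limits in bytes, illustrative) ...
    monitor.handle(new ContainerStartMonitoringEvent(containerId,
        2L * 1024 * 1024 * 1024, 1L * 1024 * 1024 * 1024));
    // ... and later stop tracking it, using the base event shown above.
    monitor.handle(new ContainersMonitorEvent(containerId,
        ContainersMonitorEventType.STOP_MONITORING_CONTAINER));
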
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Tue Apr 12 07:56:07 2011
@@ -124,14 +124,21 @@ public class DummyContainerManager exten
}
public static void waitForContainerState(ContainerManager containerManager,
- ContainerId containerID, ContainerState finalState)
+ ContainerId containerID, ContainerState finalState)
+ throws InterruptedException, YarnRemoteException {
+ waitForContainerState(containerManager, containerID, finalState, 20);
+ }
+
+ public static void waitForContainerState(ContainerManager containerManager,
+ ContainerId containerID, ContainerState finalState, int timeOutMax)
throws InterruptedException, YarnRemoteException {
GetContainerStatusRequest request = recordFactory.newRecordInstance(GetContainerStatusRequest.class);
request.setContainerId(containerID);
ContainerStatus containerStatus =
containerManager.getContainerStatus(request).getStatus();
int timeoutSecs = 0;
- while (!containerStatus.getState().equals(finalState) && timeoutSecs++ < 20) {
+ while (!containerStatus.getState().equals(finalState)
+ && timeoutSecs++ < timeOutMax) {
Thread.sleep(1000);
LOG.info("Waiting for container to get into state " + finalState
+ ". Current state is " + containerStatus.getState());
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Tue Apr 12 07:56:07 2011
@@ -18,10 +18,17 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -44,12 +51,28 @@ public class TestEventFlow {
private static final Log LOG = LogFactory.getLog(TestEventFlow.class);
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ private static File localDir = new File("target",
+ TestEventFlow.class.getName() + "-localDir").getAbsoluteFile();
+ private static File logDir = new File("target",
+ TestEventFlow.class.getName() + "-logDir").getAbsoluteFile();
+
@Test
public void testSuccessfulContainerLaunch() throws InterruptedException,
- YarnRemoteException {
+ IOException {
+
+ FileContext localFS = FileContext.getLocalFSFileContext();
+
+ localFS.delete(new Path(localDir.getAbsolutePath()), true);
+ localFS.delete(new Path(logDir.getAbsolutePath()), true);
+ localDir.mkdir();
+ logDir.mkdir();
+
Context context = new NMContext();
YarnConfiguration conf = new YarnConfiguration();
+ conf.set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
+ conf.set(NMConfig.NM_LOG_DIR, logDir.getAbsolutePath());
+
ContainerExecutor exec = new DefaultContainerExecutor();
DeletionService del = new DeletionService(exec);
Dispatcher dispatcher = new AsyncDispatcher();
@@ -70,7 +93,7 @@ public class TestEventFlow {
DummyContainerManager containerManager =
new DummyContainerManager(context, exec, del, nodeStatusUpdater);
- containerManager.init(new Configuration());
+ containerManager.init(conf);
containerManager.start();
ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1091316&r1=1091315&r2=1091316&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Tue Apr 12 07:56:07 2011
@@ -205,7 +205,7 @@ public class TestNodeStatusUpdater {
};
YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(NMConfig.NM_RESOURCE, 5); // 5GB
+ conf.setInt(NMConfig.NM_VMEM_GB, 5); // 5GB
conf.set(NMConfig.NM_BIND_ADDRESS, "127.0.0.1:12345");
conf.set(NMConfig.NM_LOCALIZER_BIND_ADDRESS, "127.0.0.1:12346");
conf.set(NMConfig.NM_LOG_DIR, new Path(basedir, "logs").toUri().getPath());
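
The renamed key is the same one ContainersMonitorImpl.init() normalizes from GB to bytes; a short sketch of the relationship (the arithmetic mirrors the init() code above):

    // NM_VMEM_GB is read in ContainersMonitorImpl.init() and normalized:
    conf.setInt(NMConfig.NM_VMEM_GB, 5);
    // maxVmemAllottedForContainers = 5 * 1024 * 1024 * 1024L = 5368709120 bytes
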
|