hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1082677 [3/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapreduc...
Date Thu, 17 Mar 2011 20:21:54 GMT
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+/**
+ * A simple wrapper for increasing the visibility.
+ */
+public class WrappedJvmID extends JVMId {
+
+  public WrappedJvmID(JobID jobID, boolean mapTask, int nextInt) {
+    super(jobID, mapTask, nextInt);
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,320 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import static java.util.concurrent.TimeUnit.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+
+import org.apache.hadoop.mapred.YarnChild;
+
+import org.apache.log4j.LogManager;
+
+/**
+ * The main() for MapReduce task processes.
+ */
+class YarnChild {
+
+  public static final Log LOG = LogFactory.getLog(YarnChild.class);
+
+  static volatile TaskAttemptID taskid = null;
+
+  public static void main(String[] args) throws Throwable {
+    LOG.debug("Child starting");
+
+    final JobConf defaultConf = new JobConf();
+    defaultConf.addResource(YARNApplicationConstants.JOB_CONF_FILE);
+    UserGroupInformation.setConfiguration(defaultConf);
+
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    final InetSocketAddress address = new InetSocketAddress(host, port);
+    final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
+    final String logLocation = args[3];
+    int jvmIdInt = Integer.parseInt(args[4]);
+    JVMId jvmId = new JVMId(firstTaskid.getJobID(),
+        firstTaskid.getTaskType() == TaskType.MAP, jvmIdInt);
+
+    // initialize metrics
+    DefaultMetricsSystem.initialize(
+        StringUtils.camelize(firstTaskid.getTaskType().name()) +"Task");
+
+    if (null == System.getenv().get(Constants.HADOOP_WORK_DIR)) {
+      throw new IOException("Environment variable " +
+          Constants.HADOOP_WORK_DIR + " is not set");
+    }
+    Token<JobTokenIdentifier> jt = loadCredentials(defaultConf, address);
+
+    // Create TaskUmbilicalProtocol as actual task owner.
+    UserGroupInformation taskOwner =
+      UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
+    taskOwner.addToken(jt);
+    final TaskUmbilicalProtocol umbilical =
+      taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
+      @Override
+      public TaskUmbilicalProtocol run() throws Exception {
+        return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+            TaskUmbilicalProtocol.versionID, address, defaultConf);
+      }
+    });
+
+    // report non-pid to application master
+    JvmContext context = new JvmContext(jvmId, "-1000");
+    Task task = null;
+    UserGroupInformation childUGI = null;
+
+    try {
+      int idleLoopCount = 0;
+      JvmTask myTask = null;;
+      // poll for new task
+      for (int idle = 0; null == myTask; ++idle) {
+        SECONDS.sleep(Math.min(idle * 500, 1500));
+        myTask = umbilical.getTask(context);
+      }
+      if (myTask.shouldDie()) {
+        return;
+      }
+
+      task = myTask.getTask();
+      YarnChild.taskid = task.getTaskID();
+
+      // Create the job-conf and set credentials
+      final JobConf job =
+        configureTask(task, defaultConf.getCredentials(), jt, logLocation);
+
+      //create the index file so that the log files
+      //are viewable immediately
+      TaskLog.syncLogs(logLocation, taskid, false);
+
+      // Initiate Java VM metrics
+      JvmMetrics.initSingleton(jvmId.toString(), job.getSessionId());
+      LOG.debug("Remote user: " + job.get("user.name"));
+      childUGI = UserGroupInformation.createRemoteUser(job.get("user.name"));
+      // Add tokens to new user so that it may execute its task correctly.
+      for(Token<?> token : UserGroupInformation.getCurrentUser().getTokens()) {
+        childUGI.addToken(token);
+      }
+
+      // Create a final reference to the task for the doAs block
+      final Task taskFinal = task;
+      childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          try {
+            // use job-specified working directory
+            FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
+            taskFinal.run(job, umbilical);             // run the task
+          } finally {
+            TaskLog.syncLogs(logLocation, taskid, false);
+          }
+          return null;
+        }
+      });
+    } catch (FSError e) {
+      LOG.fatal("FSError from child", e);
+      umbilical.fsError(taskid, e.getMessage());
+    } catch (Exception exception) {
+      LOG.warn("Exception running child : "
+          + StringUtils.stringifyException(exception));
+      try {
+        if (task != null) {
+          // do cleanup for the task
+          if (childUGI == null) { // no need to job into doAs block
+            task.taskCleanup(umbilical);
+          } else {
+            final Task taskFinal = task;
+            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                taskFinal.taskCleanup(umbilical);
+                return null;
+              }
+            });
+          }
+        }
+      } catch (Exception e) {
+        LOG.info("Exception cleaning up: " + StringUtils.stringifyException(e));
+      }
+      // Report back any failures, for diagnostic purposes
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      exception.printStackTrace(new PrintStream(baos));
+      if (taskid != null) {
+        umbilical.reportDiagnosticInfo(taskid, baos.toString());
+      }
+    } catch (Throwable throwable) {
+      LOG.fatal("Error running child : "
+    	        + StringUtils.stringifyException(throwable));
+      if (taskid != null) {
+        Throwable tCause = throwable.getCause();
+        String cause = tCause == null
+                                 ? throwable.getMessage()
+                                 : StringUtils.stringifyException(tCause);
+        umbilical.fatalError(taskid, cause);
+      }
+    } finally {
+      RPC.stopProxy(umbilical);
+      DefaultMetricsSystem.shutdown();
+      // Shutting down log4j of the child-vm...
+      // This assumes that on return from Task.run()
+      // there is no more logging done.
+      LogManager.shutdown();
+    }
+  }
+
+  static Token<JobTokenIdentifier> loadCredentials(JobConf conf,
+      InetSocketAddress address) throws IOException {
+    //load token cache storage
+    String jobTokenFile = new Path("appTokens").makeQualified(FileSystem.getLocal(conf)).toUri().getPath();
+    Credentials credentials =
+      TokenCache.loadTokens(jobTokenFile, conf);
+    LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() +
+        "; from file=" + jobTokenFile);
+    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
+    jt.setService(new Text(address.getAddress().getHostAddress() + ":"
+        + address.getPort()));
+    UserGroupInformation current = UserGroupInformation.getCurrentUser();
+    current.addToken(jt);
+    for (Token<? extends TokenIdentifier> tok : credentials.getAllTokens()) {
+      current.addToken(tok);
+    }
+    // Set the credentials
+    conf.setCredentials(credentials);
+    return jt;
+  }
+
+  static void startTaskLogSync(final String logLocation) {
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      public void run() {
+        try {
+          if (taskid != null) {
+            TaskLog.syncLogs(logLocation, taskid, false);
+          }
+        } catch (Throwable throwable) { }
+      }
+    });
+    Thread t = new Thread() {
+      public void run() {
+        //every so often wake up and syncLogs so that we can track
+        //logs of the currently running task
+        while (true) {
+          try {
+            Thread.sleep(5000);
+            if (taskid != null) {
+              TaskLog.syncLogs(logLocation, taskid, false);
+            }
+          } catch (InterruptedException ie) {
+          } catch (Throwable iee) {
+            LOG.error("Error in syncLogs: " + iee);
+            System.exit(-1);
+          }
+        }
+      }
+    };
+    t.setName("syncLogs");
+    t.setDaemon(true);
+    t.start();
+  }
+
+  static void configureLocalDirs(Task task, JobConf job) {
+    String[] localSysDirs = StringUtils.getTrimmedStrings(
+        System.getenv(YARNApplicationConstants.LOCAL_DIR_ENV));
+    job.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
+    LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR));
+  }
+
+  static JobConf configureTask(Task task, Credentials credentials,
+      Token<JobTokenIdentifier> jt, String logLocation) throws IOException {
+    final JobConf job = new JobConf(YARNApplicationConstants.JOB_CONF_FILE);
+    job.setCredentials(credentials);
+    // set tcp nodelay
+    job.setBoolean("ipc.client.tcpnodelay", true);
+    job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
+        YarnOutputFiles.class, MapOutputFile.class);
+    // set the jobTokenFile into task
+    task.setJobTokenSecret(
+        JobTokenSecretManager.createSecretKey(jt.getPassword()));
+
+    // setup the child's MRConfig.LOCAL_DIR.
+    configureLocalDirs(task, job);
+
+    // setup the child's attempt directories
+    // Do the task-type specific localization
+    task.localizeConfiguration(job);
+    //write the localized task jobconf
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
+    Path localTaskFile =
+      lDirAlloc.getLocalPathForWrite(Constants.JOBFILE, job);
+    writeLocalJobFile(localTaskFile, job);
+    task.setJobFile(localTaskFile.toString());
+    task.setConf(job);
+    return job;
+  }
+
+  private static final FsPermission urw_gr =
+    FsPermission.createImmutable((short) 0640);
+
+  /**
+   * Write the task specific job-configuration file.
+   * @throws IOException
+   */
+  public static void writeLocalJobFile(Path jobFile, JobConf conf)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(jobFile);
+    OutputStream out = null;
+    try {
+      out = FileSystem.create(localFs, jobFile, urw_gr);
+      conf.writeXml(out);
+    } finally {
+      IOUtils.cleanup(LOG, out);
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,243 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class YarnOutputFiles extends MapOutputFile {
+
+  private JobConf conf;
+
+  //static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
+  //static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";
+  //static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
+  static final String JOB_OUTPUT_DIR = "output";
+  static final String TMP_DIR = "%s/tmp";
+
+  public YarnOutputFiles() {
+  }
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private LocalDirAllocator lDirAlloc = 
+    new LocalDirAllocator(MRConfig.LOCAL_DIR);
+
+  private Path getAttemptOutputDir() {
+    return new Path(JOB_OUTPUT_DIR, conf.get(JobContext.TASK_ATTEMPT_ID));
+  }
+  
+  /**
+   * Return the path to local map output file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFile() throws IOException {
+    Path attemptOutput =
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFileForWrite(long size) throws IOException {
+    Path attemptOutput = 
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    // TODO
+    Path outputDir = new Path(existing.getParent().getParent().getParent(),
+        JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(JobContext.TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFile() throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING +
+                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING +
+                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    // TODO
+    Path outputDir = new Path(existing.getParent().getParent().getParent(),
+        JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(JobContext.TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING +
+                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(TMP_DIR, conf.get(JobContext.TASK_ATTEMPT_ID)) +
+          "/spill" + spillNumber + ".out", conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   * 
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(TMP_DIR, conf.get(JobContext.TASK_ATTEMPT_ID)) +
+          "/spill" + spillNumber + ".out", size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(TMP_DIR, conf.get(JobContext.TASK_ATTEMPT_ID)) +
+          "/spill" + spillNumber + ".out.index", conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   * 
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(TMP_DIR, conf.get(JobContext.TASK_ATTEMPT_ID)) +
+          "/spill" + spillNumber + ".out.index", size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   * 
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException 
+   */
+  public Path getInputFile(int mapId) throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Create a local reduce input file name.
+   * 
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
+      long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        REDUCE_INPUT_FILE_FORMAT_STRING,
+        getAttemptOutputDir().toString(), mapId.getId()),
+        size, conf);
+  }
+
+  /** Removes all of the files related to a task. */
+  public void removeAll() throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+
+public class JobHistoryEvent extends AbstractEvent<EventType>{
+
+  private final JobID jobID;
+  private final HistoryEvent historyEvent;
+
+  public JobHistoryEvent(JobID jobID, HistoryEvent historyEvent) {
+    super(historyEvent.getEventType());
+    this.jobID = jobID;
+    this.historyEvent = historyEvent;
+  }
+
+  public JobID getJobID() {
+    return jobID;
+  }
+
+  public HistoryEvent getHistoryEvent() {
+    return historyEvent;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,309 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+
+/**
+ * The job history events get routed to this class. This class writes the 
+ * Job history events to the local file and moves the local file to HDFS on 
+ * job completion.
+ * JobHistory implementation is in this package to access package private 
+ * classes.
+ */
+public class JobHistoryEventHandler 
+    implements EventHandler<JobHistoryEvent> {
+
+  private FileContext logDirFc; // log Dir FileContext
+  private FileContext doneDirFc; // done Dir FileContext
+  private Configuration conf;
+
+  private Path logDir = null;
+  private Path done = null; // folder for completed jobs
+
+  private static final Log LOG = LogFactory.getLog(
+      JobHistoryEventHandler.class);
+
+  private EventWriter eventWriter = null;
+
+  private static final Map<JobID, MetaInfo> fileMap =
+    Collections.<JobID,MetaInfo>synchronizedMap(new HashMap<JobID,MetaInfo>());
+
+  static final FsPermission HISTORY_DIR_PERMISSION =
+    FsPermission.createImmutable((short) 0750); // rwxr-x---
+  
+  public static final FsPermission HISTORY_FILE_PERMISSION =
+    FsPermission.createImmutable((short) 0740); // rwxr-----
+
+  public JobHistoryEventHandler(Configuration conf) {
+    this.conf = conf;
+/*
+    String localDir = conf.get("yarn.server.nodemanager.jobhistory",
+        "file:///" +
+        new File(System.getProperty("yarn.log.dir")).getAbsolutePath() +
+        File.separator + "history");
+*/
+    String localDir = conf.get("yarn.server.nodemanager.jobhistory.localdir",
+      "file:///tmp/yarn");
+    logDir = new Path(localDir);
+    String  doneLocation =
+      conf.get("yarn.server.nodemanager.jobhistory",
+      "file:///tmp/yarn/done");
+    if (doneLocation != null) {
+      try {
+        done = FileContext.getFileContext(conf).makeQualified(new Path(doneLocation));
+        doneDirFc = FileContext.getFileContext(done.toUri(), conf);
+        if (!doneDirFc.util().exists(done))
+          doneDirFc.mkdir(done,
+            new FsPermission(HISTORY_DIR_PERMISSION), true);
+        } catch (IOException e) {
+          LOG.info("error creating done directory on dfs " + e);
+          throw new YarnException(e);
+      }
+    }
+    try {
+      logDirFc = FileContext.getFileContext(logDir.toUri(), conf);
+      if (!logDirFc.util().exists(logDir)) {
+        logDirFc.mkdir(logDir, new FsPermission(HISTORY_DIR_PERMISSION), true);
+      }
+    } catch (IOException ioe) {
+      LOG.info("Mkdirs failed to create " +
+          logDir.toString());
+      throw new YarnException(ioe);
+    }
+  }
+
+  /**
+   * Create an event writer for the Job represented by the jobID.
+   * This should be the first call to history for a job
+   * @param jobId
+   * @throws IOException
+   */
+  protected void setupEventWriter(JobID jobId)
+  throws IOException {
+    if (logDir == null) {
+      throw new IOException("Missing Log Directory for History");
+    }
+
+    MetaInfo oldFi = fileMap.get(jobId);
+
+    long submitTime = (oldFi == null ? System.currentTimeMillis() : oldFi.submitTime);
+
+    Path logFile = getJobHistoryFile(logDir, jobId);
+    // String user = conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
+    String user = conf.get(MRJobConfig.USER_NAME);
+    if (user == null) {
+      throw new IOException("User is null while setting up jobhistory eventwriter" );
+    }
+    String jobName = TypeConverter.fromYarn(jobId).toString();
+    EventWriter writer = (oldFi == null) ? null : oldFi.writer;
+ 
+    if (writer == null) {
+      try {
+        FSDataOutputStream out = logDirFc.create(logFile, EnumSet
+            .of(CreateFlag.OVERWRITE));
+        writer = new EventWriter(out);
+      } catch (IOException ioe) {
+        LOG.info("Could not create log file for job " + jobName);
+        throw ioe;
+      }
+    }
+    this.eventWriter = writer;
+    /*TODO Storing the job conf on the log dir if required*/
+    MetaInfo fi = new MetaInfo(logFile, writer, submitTime, user, jobName);
+    fileMap.put(jobId, fi);
+  }
+
+  /** Close the event writer for this id 
+   * @throws IOException */
+  public void closeWriter(JobID id) throws IOException {
+    try {
+      final MetaInfo mi = fileMap.get(id);
+      if (mi != null) {
+        mi.closeWriter();
+      }
+    } catch (IOException e) {
+      LOG.info("Error closing writer for JobID: " + id);
+      throw e;
+    }
+  }
+
+  public synchronized void handle(JobHistoryEvent event) {
+    // check for first event from a job
+    if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
+      try {
+        setupEventWriter(event.getJobID());
+      } catch (IOException ioe) {
+        LOG.error("Error JobHistoryEventHandler in handle " + ioe);
+        throw new YarnException(ioe);
+      }
+    }
+    MetaInfo mi = fileMap.get(event.getJobID());
+    EventWriter writer = fileMap.get(event.getJobID()).writer;
+    try {
+      HistoryEvent historyEvent = event.getHistoryEvent();
+      mi.writeEvent(historyEvent);
+    } catch (IOException e) {
+      LOG.error("in handler ioException " + e);
+      throw new YarnException(e);
+    }
+    // check for done
+    if (event.getHistoryEvent().getEventType().equals(EventType.JOB_FINISHED)) {
+      JobFinishedEvent jfe = (JobFinishedEvent) event.getHistoryEvent();
+      String statusstoredir = done + "/status/" + mi.user + "/" + mi.jobName;
+      try {
+        writeStatus(statusstoredir, jfe);
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        throw new YarnException(e);
+      }
+      try {
+        closeEventWriter(event.getJobID());
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+    }
+  }
+
+  protected void closeEventWriter(JobID jobId) throws IOException {
+    final MetaInfo mi = fileMap.get(jobId);
+    try {
+      Path fromLocalFile = mi.getHistoryFile();
+      // Path toPath = new Path(done, mi.jobName);
+      String jobhistorydir = done + "/" + mi.user + "/";
+      Path jobhistorydirpath =
+    	  logDirFc.makeQualified(new Path(jobhistorydir));
+      logDirFc.mkdir(jobhistorydirpath,
+         new FsPermission(HISTORY_DIR_PERMISSION), true);
+      // Path localFile = new Path(fromLocalFile);
+      Path localQualifiedFile =
+    	  logDirFc.makeQualified(fromLocalFile);
+      Path jobHistoryFile =
+    	  logDirFc.makeQualified(new Path(jobhistorydirpath, mi.jobName));
+      if (mi != null) {
+        mi.closeWriter();
+      }
+      moveToDoneNow(localQualifiedFile, jobHistoryFile);
+      logDirFc.delete(localQualifiedFile, true);
+    } catch (IOException e) {
+      LOG.info("Error closing writer for JobID: " + jobId);
+      throw e;
+    }
+  }
+
+  private static class MetaInfo {
+    private Path historyFile;
+    private EventWriter writer;
+    long submitTime;
+    String user;
+    String jobName;
+
+    MetaInfo(Path historyFile, EventWriter writer, long submitTime,
+             String user, String jobName) {
+      this.historyFile = historyFile;
+      this.writer = writer;
+      this.submitTime = submitTime;
+      this.user = user;
+      this.jobName = jobName;
+    }
+
+    Path getHistoryFile() { return historyFile; }
+
+    synchronized void closeWriter() throws IOException {
+      if (writer != null) {
+        writer.close();
+      }
+      writer = null;
+    }
+
+    synchronized void writeEvent(HistoryEvent event) throws IOException {
+      if (writer != null) {
+        writer.write(event);
+      }
+    }
+  }
+
+  /**
+   * Get the job history file path
+   */
+  public static Path getJobHistoryFile(Path dir, JobID jobId) {
+    return new Path(dir, TypeConverter.fromYarn(jobId).toString());
+  }
+
+/*
+ * 
+ */
+  private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
+    //check if path exists, in case of retries it may not exist
+    if (logDirFc.util().exists(fromPath)) {
+      LOG.info("Moving " + fromPath.toString() + " to " +
+          toPath.toString());
+      //TODO temporarily removing the existing dst
+      if (logDirFc.util().exists(toPath)) {
+        logDirFc.delete(toPath, true);
+      }
+      boolean copied = logDirFc.util().copy(fromPath, toPath);
+      if (copied)
+          LOG.info("Copied to done location: "+ toPath);
+      else 
+          LOG.info("copy failed");
+      doneDirFc.setPermission(toPath,
+          new FsPermission(HISTORY_FILE_PERMISSION));
+    }
+  }  
+
+  private void writeStatus(String statusstoredir, HistoryEvent event) throws IOException {
+    try {
+      Path statusstorepath = doneDirFc.makeQualified(new Path(statusstoredir));
+      doneDirFc.mkdir(statusstorepath,
+         new FsPermission(HISTORY_DIR_PERMISSION), true);
+      Path toPath = new Path(statusstoredir, "jobstats");
+      FSDataOutputStream out = doneDirFc.create(toPath, EnumSet
+           .of(CreateFlag.OVERWRITE));
+      EventWriter writer = new EventWriter(out);
+      writer.write(event);
+      writer.close();
+      out.close();
+    } catch (IOException ioe) {
+        LOG.error("Status file write failed" +ioe);
+        throw ioe;
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+
+/**
+ * Context interface for sharing information across components in YARN App.
+ */
+public interface AppContext {
+
+  ApplicationID getApplicationID();
+
+  CharSequence getUser();
+
+  Job getJob(JobID jobID);
+
+  Map<JobID, Job> getAllJobs();
+
+  EventHandler getEventHandler();
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/Clock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/Clock.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/Clock.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/Clock.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,37 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+/**
+ * A clock class - can be mocked out for testing.
+ */
+public class Clock {
+  long previous = Long.MIN_VALUE;
+
+  long getMeasuredTime() {
+    return System.currentTimeMillis();
+  }
+
+  public long getTime() {
+    // Make the result monotonic even if the time is glitched back due
+    //  to some sort of leap second
+    previous = Math.max(previous, getMeasuredTime());
+    return previous;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,453 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+
+/**
+ * The Map-Reduce Application Master.
+ * The state machine is encapsulated in the implementation of Job interface.
+ * All state changes happens via Job interface. Each event 
+ * results in a Finite State Transition in Job.
+ * 
+ * MR AppMaster is the composition of loosely coupled services. The services 
+ * interact with each other via events. The components resembles the 
+ * Actors model. The component acts on received event and send out the 
+ * events to other components.
+ * This keeps it highly concurrent with no or minimal synchronization needs.
+ * 
+ * The events are dispatched by a central Dispatch mechanism. All components
+ * register to the Dispatcher.
+ * 
+ * The information is shared across different components using AppContext.
+ */
+
+public class MRAppMaster extends CompositeService {
+
+  private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
+
+  private final Clock clock;
+
+  private ApplicationID appID;
+  private AppContext context;
+  private Dispatcher dispatcher;
+  private ClientService clientService;
+  private ContainerAllocator containerAllocator;
+  private ContainerLauncher containerLauncher;
+  private TaskCleaner taskCleaner;
+  private Speculator speculator;
+  private TaskAttemptListener taskAttemptListener;
+  private JobTokenSecretManager jobTokenSecretManager =
+      new JobTokenSecretManager();
+
+  public MRAppMaster(ApplicationID applicationId) {
+    this(applicationId, null);
+  }
+
+  public MRAppMaster(ApplicationID applicationId, Clock clock) {
+    super(MRAppMaster.class.getName());
+    if (clock == null) {
+      clock = new Clock();
+    }
+    this.clock = clock;
+    this.appID = applicationId;
+    LOG.info("Created MRAppMaster for application " + applicationId);
+  }
+
+  @Override
+  public void init(final Configuration conf) {
+    context = new RunningAppContext(); 
+
+    dispatcher = new AsyncDispatcher();
+    addIfService(dispatcher);
+
+    //service to handle requests to TaskUmbilicalProtocol
+    taskAttemptListener = createTaskAttemptListener(context);
+    addIfService(taskAttemptListener);
+
+    //service to do the task cleanup
+    taskCleaner = createTaskCleaner(context);
+    addIfService(taskCleaner);
+    //service to launch allocated containers via NodeManager
+    containerLauncher = createContainerLauncher(context);
+    addIfService(containerLauncher);
+    
+    //service to handle requests from JobClient
+    clientService = createClientService(context);
+    addIfService(clientService);
+
+    //service to allocate containers from RM
+    containerAllocator = createContainerAllocator(clientService, context);
+    addIfService(containerAllocator);
+
+    //register the event dispatchers
+    dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+    dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
+    dispatcher.register(JobEventType.class, new JobEventDispatcher());
+    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+    dispatcher.register(TaskAttemptEventType.class, 
+        new TaskAttemptEventDispatcher());
+    dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
+    
+    if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
+        || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
+      //optional service to speculate on task attempts' progress
+      speculator = createSpeculator(conf, context);
+      addIfService(speculator);
+      dispatcher.register(Speculator.EventType.class, speculator);
+    } else {
+      dispatcher.register
+          (Speculator.EventType.class, new NullSpeculatorEventHandler());
+    }
+
+    super.init(conf);
+  }
+
+  static class NullSpeculatorEventHandler
+      implements EventHandler<SpeculatorEvent> {
+    @Override
+    public void handle(SpeculatorEvent event) {
+      // no code
+    }
+  }
+
+  protected void addIfService(Object object) {
+    if (object instanceof Service) {
+      addService((Service) object);
+    }
+  }
+
+  protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+      Configuration conf) {
+    return new EventHandler<JobHistoryEvent>() {
+      @Override
+      public void handle(JobHistoryEvent event) {
+      }
+    };
+    //TODO use the real job history handler.
+    //return new JobHistoryEventHandler(conf);
+  }
+
+  protected Speculator createSpeculator(Configuration conf, AppContext context) {
+    Class<? extends Speculator> speculatorClass;
+
+    try {
+      speculatorClass
+          // "yarn.mapreduce.job.speculator.class"
+          = conf.getClass(YarnMRJobConfig.SPECULATOR_CLASS,
+                          DefaultSpeculator.class,
+                          Speculator.class);
+      Constructor<? extends Speculator> speculatorConstructor
+          = speculatorClass.getConstructor
+               (Configuration.class, AppContext.class);
+      Speculator result = speculatorConstructor.newInstance(conf, context);
+
+      return result;
+    } catch (InstantiationException ex) {
+      LOG.error("Can't make a speculator -- check "
+          + YarnMRJobConfig.SPECULATOR_CLASS + " " + ex);
+      throw new YarnException(ex);
+    } catch (IllegalAccessException ex) {
+      LOG.error("Can't make a speculator -- check "
+          + YarnMRJobConfig.SPECULATOR_CLASS + " " + ex);
+      throw new YarnException(ex);
+    } catch (InvocationTargetException ex) {
+      LOG.error("Can't make a speculator -- check "
+          + YarnMRJobConfig.SPECULATOR_CLASS + " " + ex);
+      throw new YarnException(ex);
+    } catch (NoSuchMethodException ex) {
+      LOG.error("Can't make a speculator -- check "
+          + YarnMRJobConfig.SPECULATOR_CLASS + " " + ex);
+      throw new YarnException(ex);
+    }
+  }
+
+  protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+    TaskAttemptListener lis =
+        new TaskAttemptListenerImpl(context, jobTokenSecretManager);
+    return lis;
+  }
+
+  protected TaskCleaner createTaskCleaner(AppContext context) {
+    return new TaskCleanerImpl(context);
+  }
+
+  protected ContainerLauncher createContainerLauncher(AppContext context) {
+    return new ContainerLauncherImpl(context);
+  }
+  
+  protected ContainerAllocator createContainerAllocator(ClientService
+      clientService, AppContext context) {
+    //return new StaticContainerAllocator(context);
+    return new RMContainerAllocator(clientService, context);
+  }
+
+  //TODO:should have an interface for MRClientService
+  protected ClientService createClientService(AppContext context) {
+    return new MRClientService(context);
+  }
+
+  public ApplicationID getAppID() {
+    return appID;
+  }
+
+  public AppContext getContext() {
+    return context;
+  }
+
+  public Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  //Returns null if speculation is not enabled
+  public Speculator getSpeculator() {
+    return speculator;
+  }
+
+  public ContainerAllocator getContainerAllocator() {
+    return containerAllocator;
+  }
+  
+  public ContainerLauncher getContainerLauncher() {
+    return containerLauncher;
+  }
+
+  public TaskAttemptListener getTaskAttemptListener() {
+    return taskAttemptListener;
+  }
+
+  class RunningAppContext implements AppContext {
+
+    private Map<JobID, Job> jobs = new ConcurrentHashMap<JobID, Job>();
+   
+    @Override
+    public ApplicationID getApplicationID() {
+      return appID;
+    }
+
+    @Override
+    public Job getJob(JobID jobID) {
+      return jobs.get(jobID);
+    }
+
+    @Override
+    public Map<JobID, Job> getAllJobs() {
+      return jobs;
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return dispatcher.getEventHandler();
+    }
+
+    @Override
+    public CharSequence getUser() {
+      return getConfig().get(MRJobConfig.USER_NAME);
+    }
+
+  }
+
+  @Override
+  public void start() {
+    startJobs();
+    //start all the components
+    super.start();
+  }
+
+  /**
+   * This can be overridden to instantiate multiple jobs and create a 
+   * workflow.
+   */
+  protected void startJobs() {
+
+    Configuration config = getConfig();
+
+    Credentials fsTokens = new Credentials();
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      // Read the file-system tokens from the localized tokens-file.
+      try {
+        Path jobSubmitDir =
+            FileContext.getLocalFSFileContext().makeQualified(
+                new Path(new File(YARNApplicationConstants.JOB_SUBMIT_DIR)
+                    .getAbsolutePath()));
+        Path jobTokenFile =
+            new Path(jobSubmitDir, YarnConfiguration.APPLICATION_TOKENS_FILE);
+        fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, config));
+        LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+            + jobTokenFile);
+
+        UserGroupInformation currentUser =
+            UserGroupInformation.getCurrentUser();
+        for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
+          LOG.info(" --- DEBUG: Token of kind " + tk.getKind()
+              + "in current ugi in the AppMaster for service "
+              + tk.getService());
+          currentUser.addToken(tk); // For use by AppMaster itself.
+        }
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+    }
+
+    //create single job
+    Job job =
+        new JobImpl(appID, config, dispatcher.getEventHandler(),
+            taskAttemptListener, jobTokenSecretManager, fsTokens);
+    ((RunningAppContext) context).jobs.put(job.getID(), job);
+
+    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+        createJobHistoryHandler(config));
+    dispatcher.register(JobFinishEvent.Type.class,
+        new EventHandler<JobFinishEvent>() {
+          @Override
+          public void handle(JobFinishEvent event) {
+            // job has finished
+            // this is the only job, so shutdown the Appmaster
+            // note in a workflow scenario, this may lead to creation of a new
+            // job
+
+            // TODO:currently just wait for sometime so clients can know the
+            // final states. Will be removed once RM come on.
+            try {
+              Thread.sleep(15000);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+            LOG.info("Calling stop for all the services");
+            try {
+              stop();
+            } catch (Throwable t) {
+              LOG.warn("Graceful stop failed ", t);
+            }
+            //TODO: this is required because rpc server does not shutdown
+            //inspite of calling server.stop().
+            //Bring the process down by force.
+            //Not needed after HADOOP-7140
+            LOG.info("Exiting MR AppMaster..GoodBye!");
+            System.exit(0);
+          }
+        });
+
+    /** create a job event for job intialization **/
+    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
+    /** send init on the job. this triggers the job execution.**/
+    dispatcher.getEventHandler().handle(initJobEvent);
+  }
+
+  private class JobEventDispatcher implements EventHandler<JobEvent> {
+    @Override
+    public void handle(JobEvent event) {
+      ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);
+    }
+  }  
+  private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+    @Override
+    public void handle(TaskEvent event) {
+      Task task = context.getJob(event.getTaskID().jobID).getTask(
+          event.getTaskID());
+      ((EventHandler<TaskEvent>)task).handle(event);
+    }
+  }
+
+  private class TaskAttemptEventDispatcher 
+          implements EventHandler<TaskAttemptEvent> {
+    @Override
+    public void handle(TaskAttemptEvent event) {
+      Job job = context.getJob(event.getTaskAttemptID().taskID.jobID);
+      Task task = job.getTask(event.getTaskAttemptID().taskID);
+      TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
+      ((EventHandler<TaskAttemptEvent>) attempt).handle(event);
+    }
+  }
+
+  public static void main(String[] args) {
+    try {
+      //Configuration.addDefaultResource("job.xml");
+      ApplicationID applicationId = new ApplicationID();
+      applicationId.clusterTimeStamp = Long.valueOf(args[0]);
+      applicationId.id = Integer.valueOf(args[1]);
+      MRAppMaster appMaster = new MRAppMaster(applicationId);
+      YarnConfiguration conf = new YarnConfiguration(new JobConf());
+      conf.addResource(new Path(YARNApplicationConstants.JOB_CONF_FILE));
+      conf.set(MRJobConfig.USER_NAME, 
+          System.getProperty("user.name")); 
+      UserGroupInformation.setConfiguration(conf);
+      appMaster.init(conf);
+      appMaster.start();
+    } catch (Throwable t) {
+      LOG.error("Caught throwable. Exiting:", t);
+      System.exit(1);
+    }
+  } 
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMasterConstants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMasterConstants.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMasterConstants.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMasterConstants.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+public interface MRAppMasterConstants {
+
+  public static final String CONTAINERLAUNCHER_THREADPOOL_SIZE =
+      "yarn.mapreduce.containerlauncher.threadpool-size";
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,35 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+public interface TaskAttemptListener {
+
+  InetSocketAddress getAddress();
+
+  void register(TaskAttemptID attemptID, Task task, WrappedJvmID jvmID);
+
+  void unregister(TaskAttemptID attemptID, WrappedJvmID jvmID);
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,133 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+/**
+ * This class keeps track of tasks that have already been launched. It
+ * determines if a task is alive and running or marks a task as dead if it does
+ * not hear from it for a long time.
+ * 
+ */
+public class TaskHeartbeatHandler extends AbstractService {
+
+  private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class);
+
+  //thread which runs periodically to see the last time since a heartbeat is
+  //received from a task.
+  private Thread lostTaskCheckerThread;
+  private volatile boolean stopped;
+  private int taskTimeOut = 5*60*1000;//5 mins
+
+  private EventHandler eventHandler;
+
+  private Map<TaskAttemptID, Long> runningAttempts 
+    = new HashMap<TaskAttemptID, Long>();
+
+  public TaskHeartbeatHandler(EventHandler eventHandler) {
+    super("TaskHeartbeatHandler");
+    this.eventHandler = eventHandler;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+   super.init(conf);
+   taskTimeOut = conf.getInt("mapreduce.task.timeout", 5*60*1000);
+  }
+
+  @Override
+  public void start() {
+    lostTaskCheckerThread = new Thread(new PingChecker());
+    lostTaskCheckerThread.start();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+    lostTaskCheckerThread.interrupt();
+    super.stop();
+  }
+
+  public synchronized void receivedPing(TaskAttemptID attemptID) {
+    //only put for the registered attempts
+    if (runningAttempts.containsKey(attemptID)) {
+      runningAttempts.put(attemptID, System.currentTimeMillis());
+    }
+  }
+
+  public synchronized void register(TaskAttemptID attemptID) {
+    runningAttempts.put(attemptID, System.currentTimeMillis());
+  }
+
+  public synchronized void unregister(TaskAttemptID attemptID) {
+    runningAttempts.remove(attemptID);
+  }
+
+  private class PingChecker implements Runnable {
+
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        synchronized (TaskHeartbeatHandler.this) {
+          Iterator<Map.Entry<TaskAttemptID, Long>> iterator = 
+            runningAttempts.entrySet().iterator();
+
+          //avoid calculating current time everytime in loop
+          long currentTime = System.currentTimeMillis();
+
+          while (iterator.hasNext()) {
+            Map.Entry<TaskAttemptID, Long> entry = iterator.next();
+            if (currentTime > entry.getValue() + taskTimeOut) {
+              //task is lost, remove from the list and raise lost event
+              iterator.remove();
+              eventHandler.handle(
+                  new TaskAttemptDiagnosticsUpdateEvent(entry.getKey(),
+                      "AttemptID:" + entry.getKey().toString() + 
+                      " Timed out after " + taskTimeOut/1000 + " secs"));
+              eventHandler.handle(new TaskAttemptEvent(entry
+                  .getKey(), TaskAttemptEventType.TA_TIMED_OUT));
+            }
+          }
+        }
+        try {
+          Thread.sleep(taskTimeOut);
+        } catch (InterruptedException e) {
+          LOG.info("TaskHeartbeatHandler thread interrupted");
+          break;
+        }
+      }
+    }
+    
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.client;
+
+import java.net.InetSocketAddress;
+
+public interface ClientService {
+
+  InetSocketAddress getBindAddress();
+
+  int getHttpPort();
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,250 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.client;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Server;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.WebApps;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+/**
+ * This module is responsible for talking to the 
+ * jobclient (user facing).
+ *
+ */
+public class MRClientService extends AbstractService 
+    implements ClientService {
+
+  private static final Log LOG = LogFactory.getLog(MRClientService.class);
+  
+  private MRClientProtocol protocolHandler;
+  private Server server;
+  private WebApp webApp;
+  private InetSocketAddress bindAddress;
+  private AppContext appContext;
+  private EventHandler<Event> handler;
+
+  public MRClientService(AppContext appContext) {
+    super("MRClientService");
+    this.appContext = appContext;
+    this.protocolHandler = new MRClientProtocolHandler(appContext);
+  }
+
+  public void start() {
+    Configuration conf = new Configuration(getConfig()); // Just for not messing up sec-info class config
+    YarnRPC rpc = YarnRPC.create(conf);
+    InetSocketAddress address = NetUtils.createSocketAddr("0.0.0.0:0");
+    InetAddress hostNameResolved = null;
+    try {
+      hostNameResolved = address.getAddress().getLocalHost();
+    } catch (UnknownHostException e) {
+      throw new YarnException(e);
+    }
+
+    ClientToAMSecretManager secretManager = null;
+    if (UserGroupInformation.isSecurityEnabled()) {
+      secretManager = new ClientToAMSecretManager();
+      String secretKeyStr =
+          System.getenv(YarnConfiguration.APPLICATION_CLIENT_SECRET_ENV_NAME);
+      byte[] bytes = Base64.decodeBase64(secretKeyStr);
+      ApplicationTokenIdentifier identifier =
+          new ApplicationTokenIdentifier(this.appContext.getApplicationID());
+      secretManager.setMasterKey(identifier, bytes);
+      conf.setClass(
+          CommonConfigurationKeysPublic.HADOOP_SECURITY_INFO_CLASS_NAME,
+          SchedulerSecurityInfo.class, SecurityInfo.class); // Same for now.
+    }
+    server =
+        rpc.getServer(MRClientProtocol.class, protocolHandler, address,
+            conf, secretManager);
+    server.start();
+    this.bindAddress =
+        NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
+            + ":" + server.getPort());
+    LOG.info("Instantiated MRClientService at " + this.bindAddress);
+    try {
+      webApp = WebApps.$for("yarn", AppContext.class, appContext).with(conf).
+          start(new AMWebApp());
+    } catch (Exception e) {
+      LOG.error("Webapps failed to start. Ignoring for now:", e);
+    }
+    super.start();
+  }
+
+  public void stop() {
+    server.close();
+    if (webApp != null) {
+      webApp.stop();
+    }
+    super.stop();
+  }
+
+  @Override
+  public InetSocketAddress getBindAddress() {
+    return bindAddress;
+  }
+
+  @Override
+  public int getHttpPort() {
+    return webApp.port();
+  }
+
+  class MRClientProtocolHandler implements MRClientProtocol {
+
+    private AppContext appContext;
+
+    private Job getJob(JobID jobID) throws AvroRemoteException {
+      Job job = appContext.getJob(jobID);
+      if (job == null) {
+        throw RPCUtil.getRemoteException("Unknown job " + jobID);
+      }
+      return job;
+    }
+    
+    MRClientProtocolHandler(AppContext appContext) {
+      this.appContext = appContext;
+    }
+
+    @Override
+    public Counters getCounters(JobID jobID) throws AvroRemoteException {
+      Job job = getJob(jobID);
+      return job.getCounters();
+    }
+
+    @Override
+    public JobReport getJobReport(JobID jobID) throws AvroRemoteException {
+      Job job = getJob(jobID);
+      return job.getReport();
+    }
+
+    @Override
+    public TaskAttemptReport getTaskAttemptReport(TaskAttemptID taskAttemptID)
+        throws AvroRemoteException {
+      Job job = getJob(taskAttemptID.taskID.jobID);
+      return job.getTask(taskAttemptID.taskID).
+          getAttempt(taskAttemptID).getReport();
+    }
+
+    @Override
+    public TaskReport getTaskReport(TaskID taskID) throws AvroRemoteException {
+      Job job = appContext.getJob(taskID.jobID);
+      return job.getTask(taskID).getReport();
+    }
+
+    @Override
+    public List<TaskAttemptCompletionEvent> getTaskAttemptCompletionEvents(JobID jobID, 
+        int fromEventId, int maxEvents) throws AvroRemoteException {
+      Job job = appContext.getJob(jobID);
+      return Arrays.asList(job.getTaskAttemptCompletionEvents(fromEventId, maxEvents));
+    }
+
+    @Override
+    public Void killJob(JobID jobID) throws AvroRemoteException {
+      getJob(jobID);
+      handler.handle(
+          new JobEvent(jobID, JobEventType.JOB_KILL));
+      return null;
+    }
+
+    @Override
+    public Void killTask(TaskID taskID) throws AvroRemoteException {
+      getJob(taskID.jobID);
+      handler.handle(
+          new TaskEvent(taskID, TaskEventType.T_KILL));
+      return null;
+    }
+
+    @Override
+    public Void killTaskAttempt(TaskAttemptID taskAttemptID)
+        throws AvroRemoteException {
+      getJob(taskAttemptID.taskID.jobID);
+     handler.handle(
+          new TaskAttemptEvent(taskAttemptID, 
+              TaskAttemptEventType.TA_KILL));
+      return null;
+    }
+
+    @Override
+    public List<CharSequence> getDiagnostics(TaskAttemptID taskAttemptID)
+        throws AvroRemoteException {
+      Job job = getJob(taskAttemptID.taskID.jobID);
+      return job.getTask(taskAttemptID.taskID).
+                 getAttempt(taskAttemptID).getDiagnostics();
+    }
+
+    @Override
+    public List<TaskReport> getTaskReports(JobID jobID, TaskType taskType)
+        throws AvroRemoteException {
+      Job job = appContext.getJob(jobID);
+      List<TaskReport> reports = new ArrayList<TaskReport>();
+      Collection<Task> tasks = job.getTasks(taskType).values();
+      for (Task task : tasks) {
+        reports.add(task.getReport());
+      }
+      return reports;
+    }
+
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,53 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+/**
+ * Main interface to interact with the job. Provides only getters. 
+ */
+public interface Job {
+
+  JobID getID();
+  CharSequence getName();
+  JobState getState();
+  JobReport getReport();
+  Counters getCounters();
+  Map<TaskID,Task> getTasks();
+  Map<TaskID,Task> getTasks(TaskType taskType);
+  Task getTask(TaskID taskID);
+  List<String> getDiagnostics();
+  int getTotalMaps();
+  int getTotalReduces();
+  int getCompletedMaps();
+  int getCompletedReduces();
+
+  TaskAttemptCompletionEvent[]
+      getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job;
+
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+/**
+ * Read only view of Task.
+ */
+public interface Task {
+  TaskID getID();
+  TaskReport getReport();
+  TaskState getState();
+  Counters getCounters();
+  float getProgress();
+  TaskType getType();
+  Map<TaskAttemptID, TaskAttempt> getAttempts();
+  TaskAttempt getAttempt(TaskAttemptID attemptID);
+
+  /** Has Task reached the final state or not.
+   */
+  boolean isFinished();
+
+  /**
+   * Can the output of the taskAttempt be committed. Note that once the task
+   * gives a go for a commit, further canCommit requests from any other attempts
+   * should return false.
+   * 
+   * @param taskAttemptID
+   * @return whether the attempt's output can be committed or not.
+   */
+  boolean canCommit(TaskAttemptID taskAttemptID);
+
+  
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,61 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+
+/**
+ * Read only view of TaskAttempt.
+ */
+public interface TaskAttempt {
+  TaskAttemptID getID();
+  TaskAttemptReport getReport();
+  List<CharSequence> getDiagnostics();
+  Counters getCounters();
+  float getProgress();
+  TaskAttemptState getState();
+
+  /** Has attempt reached the final state or not.
+   */
+  boolean isFinished();
+
+  /**If container Assigned then return container ID, otherwise null.
+   */
+  ContainerID getAssignedContainerID();
+
+  /**If container Assigned then return container mgr address, otherwise null.
+   */
+  String getAssignedContainerMgrAddress();
+
+  /** Returns time at which container is launched. If container is not launched
+   * yet, returns 0.
+   */
+  long getLaunchTime();
+
+  /** Returns attempt's finish time. If attempt is not finished
+   *  yet, returns 0.
+   */
+  long getFinishTime();
+}



Mime
View raw message