Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+/**
+ * A simple wrapper around {@link JVMId} to increase its visibility
+ * outside this package.
+ */
+public class WrappedJvmID extends JVMId {
+
+ public WrappedJvmID(JobID jobID, boolean mapTask, int nextInt) {
+ super(jobID, mapTask, nextInt);
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,320 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import static java.util.concurrent.TimeUnit.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+
+import org.apache.log4j.LogManager;
+
+/**
+ * The main() for MapReduce task processes.
+ */
+class YarnChild {
+
+ public static final Log LOG = LogFactory.getLog(YarnChild.class);
+
+ static volatile TaskAttemptID taskid = null;
+
+ public static void main(String[] args) throws Throwable {
+ LOG.debug("Child starting");
+
+ final JobConf defaultConf = new JobConf();
+ defaultConf.addResource(YARNApplicationConstants.JOB_CONF_FILE);
+ UserGroupInformation.setConfiguration(defaultConf);
+
+ String host = args[0];
+ int port = Integer.parseInt(args[1]);
+ final InetSocketAddress address = new InetSocketAddress(host, port);
+ final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
+ final String logLocation = args[3];
+ int jvmIdInt = Integer.parseInt(args[4]);
+ JVMId jvmId = new JVMId(firstTaskid.getJobID(),
+ firstTaskid.getTaskType() == TaskType.MAP, jvmIdInt);
+
+ // initialize metrics
+ DefaultMetricsSystem.initialize(
+        StringUtils.camelize(firstTaskid.getTaskType().name()) + "Task");
+
+ if (null == System.getenv().get(Constants.HADOOP_WORK_DIR)) {
+ throw new IOException("Environment variable " +
+ Constants.HADOOP_WORK_DIR + " is not set");
+ }
+ Token<JobTokenIdentifier> jt = loadCredentials(defaultConf, address);
+
+ // Create TaskUmbilicalProtocol as actual task owner.
+ UserGroupInformation taskOwner =
+ UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
+ taskOwner.addToken(jt);
+ final TaskUmbilicalProtocol umbilical =
+ taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
+ @Override
+ public TaskUmbilicalProtocol run() throws Exception {
+ return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+ TaskUmbilicalProtocol.versionID, address, defaultConf);
+ }
+ });
+
+    // report a placeholder (non-)pid of -1000 to the application master
+ JvmContext context = new JvmContext(jvmId, "-1000");
+ Task task = null;
+ UserGroupInformation childUGI = null;
+
+ try {
+      JvmTask myTask = null;
+      // poll for a new task; back off 500ms per idle pass, capped at 1.5s
+      for (int idle = 0; null == myTask; ++idle) {
+        MILLISECONDS.sleep(Math.min(idle * 500, 1500));
+ myTask = umbilical.getTask(context);
+ }
+ if (myTask.shouldDie()) {
+ return;
+ }
+
+ task = myTask.getTask();
+ YarnChild.taskid = task.getTaskID();
+
+ // Create the job-conf and set credentials
+ final JobConf job =
+ configureTask(task, defaultConf.getCredentials(), jt, logLocation);
+
+ //create the index file so that the log files
+ //are viewable immediately
+ TaskLog.syncLogs(logLocation, taskid, false);
+
+ // Initiate Java VM metrics
+ JvmMetrics.initSingleton(jvmId.toString(), job.getSessionId());
+ LOG.debug("Remote user: " + job.get("user.name"));
+ childUGI = UserGroupInformation.createRemoteUser(job.get("user.name"));
+ // Add tokens to new user so that it may execute its task correctly.
+ for(Token<?> token : UserGroupInformation.getCurrentUser().getTokens()) {
+ childUGI.addToken(token);
+ }
+
+ // Create a final reference to the task for the doAs block
+ final Task taskFinal = task;
+ childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ try {
+ // use job-specified working directory
+ FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
+ taskFinal.run(job, umbilical); // run the task
+ } finally {
+ TaskLog.syncLogs(logLocation, taskid, false);
+ }
+ return null;
+ }
+ });
+ } catch (FSError e) {
+ LOG.fatal("FSError from child", e);
+ umbilical.fsError(taskid, e.getMessage());
+ } catch (Exception exception) {
+ LOG.warn("Exception running child : "
+ + StringUtils.stringifyException(exception));
+ try {
+ if (task != null) {
+ // do cleanup for the task
+        if (childUGI == null) { // no need to go into the doAs block
+ task.taskCleanup(umbilical);
+ } else {
+ final Task taskFinal = task;
+ childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ taskFinal.taskCleanup(umbilical);
+ return null;
+ }
+ });
+ }
+ }
+ } catch (Exception e) {
+ LOG.info("Exception cleaning up: " + StringUtils.stringifyException(e));
+ }
+ // Report back any failures, for diagnostic purposes
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ exception.printStackTrace(new PrintStream(baos));
+ if (taskid != null) {
+ umbilical.reportDiagnosticInfo(taskid, baos.toString());
+ }
+ } catch (Throwable throwable) {
+ LOG.fatal("Error running child : "
+ + StringUtils.stringifyException(throwable));
+ if (taskid != null) {
+ Throwable tCause = throwable.getCause();
+ String cause = tCause == null
+ ? throwable.getMessage()
+ : StringUtils.stringifyException(tCause);
+ umbilical.fatalError(taskid, cause);
+ }
+ } finally {
+ RPC.stopProxy(umbilical);
+ DefaultMetricsSystem.shutdown();
+ // Shutting down log4j of the child-vm...
+ // This assumes that on return from Task.run()
+ // there is no more logging done.
+ LogManager.shutdown();
+ }
+ }
+
+ static Token<JobTokenIdentifier> loadCredentials(JobConf conf,
+ InetSocketAddress address) throws IOException {
+ //load token cache storage
+    String jobTokenFile = new Path("appTokens")
+        .makeQualified(FileSystem.getLocal(conf)).toUri().getPath();
+ Credentials credentials =
+ TokenCache.loadTokens(jobTokenFile, conf);
+ LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() +
+ "; from file=" + jobTokenFile);
+ Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
+ jt.setService(new Text(address.getAddress().getHostAddress() + ":"
+ + address.getPort()));
+ UserGroupInformation current = UserGroupInformation.getCurrentUser();
+ current.addToken(jt);
+ for (Token<? extends TokenIdentifier> tok : credentials.getAllTokens()) {
+ current.addToken(tok);
+ }
+ // Set the credentials
+ conf.setCredentials(credentials);
+ return jt;
+ }
+
+ static void startTaskLogSync(final String logLocation) {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ try {
+ if (taskid != null) {
+ TaskLog.syncLogs(logLocation, taskid, false);
+ }
+ } catch (Throwable throwable) { }
+ }
+ });
+ Thread t = new Thread() {
+ public void run() {
+ //every so often wake up and syncLogs so that we can track
+ //logs of the currently running task
+ while (true) {
+ try {
+ Thread.sleep(5000);
+ if (taskid != null) {
+ TaskLog.syncLogs(logLocation, taskid, false);
+ }
+        } catch (InterruptedException ie) {
+          // ignored; wake up and sync again
+ } catch (Throwable iee) {
+ LOG.error("Error in syncLogs: " + iee);
+ System.exit(-1);
+ }
+ }
+ }
+ };
+ t.setName("syncLogs");
+ t.setDaemon(true);
+ t.start();
+ }
+
+ static void configureLocalDirs(Task task, JobConf job) {
+ String[] localSysDirs = StringUtils.getTrimmedStrings(
+ System.getenv(YARNApplicationConstants.LOCAL_DIR_ENV));
+ job.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
+ LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR));
+ }
+
+ static JobConf configureTask(Task task, Credentials credentials,
+ Token<JobTokenIdentifier> jt, String logLocation) throws IOException {
+ final JobConf job = new JobConf(YARNApplicationConstants.JOB_CONF_FILE);
+ job.setCredentials(credentials);
+ // set tcp nodelay
+ job.setBoolean("ipc.client.tcpnodelay", true);
+ job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
+ YarnOutputFiles.class, MapOutputFile.class);
+ // set the jobTokenFile into task
+ task.setJobTokenSecret(
+ JobTokenSecretManager.createSecretKey(jt.getPassword()));
+
+ // setup the child's MRConfig.LOCAL_DIR.
+ configureLocalDirs(task, job);
+
+ // setup the child's attempt directories
+ // Do the task-type specific localization
+ task.localizeConfiguration(job);
+ //write the localized task jobconf
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
+ Path localTaskFile =
+ lDirAlloc.getLocalPathForWrite(Constants.JOBFILE, job);
+ writeLocalJobFile(localTaskFile, job);
+ task.setJobFile(localTaskFile.toString());
+ task.setConf(job);
+ return job;
+ }
+
+ private static final FsPermission urw_gr =
+ FsPermission.createImmutable((short) 0640);
+
+ /**
+   * Write the task-specific job-configuration file to the local filesystem.
+   * @param jobFile local path the configuration is written to
+   * @param conf the configuration to serialize
+   * @throws IOException
+ */
+ public static void writeLocalJobFile(Path jobFile, JobConf conf)
+ throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(jobFile, true); // remove any stale copy first
+ OutputStream out = null;
+ try {
+ out = FileSystem.create(localFs, jobFile, urw_gr);
+ conf.writeXml(out);
+ } finally {
+ IOUtils.cleanup(LOG, out);
+ }
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,243 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories
+ * they need to write to/read from for intermediate files. These methods are
+ * called from the child (task) process.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class YarnOutputFiles extends MapOutputFile {
+
+ private JobConf conf;
+
+  // MAP_OUTPUT_FILENAME_STRING ("file.out"), MAP_OUTPUT_INDEX_SUFFIX_STRING
+  // (".index") and REDUCE_INPUT_FILE_FORMAT_STRING ("%s/map_%d.out") are
+  // inherited from MapOutputFile.
+ static final String JOB_OUTPUT_DIR = "output";
+ static final String TMP_DIR = "%s/tmp";
+
+ public YarnOutputFiles() {
+ }
+
+ // assume configured to $localdir/usercache/$user/appcache/$appId
+ private LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator(MRConfig.LOCAL_DIR);
+
+ private Path getAttemptOutputDir() {
+ return new Path(JOB_OUTPUT_DIR, conf.get(JobContext.TASK_ATTEMPT_ID));
+ }
+
+ /**
+   * Return the path to the local map output file created earlier.
+ *
+ * @return path
+ * @throws IOException
+ */
+ public Path getOutputFile() throws IOException {
+ Path attemptOutput =
+ new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING);
+ return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
+ }
+
+ /**
+ * Create a local map output file name.
+ *
+ * @param size the size of the file
+ * @return path
+ * @throws IOException
+ */
+ public Path getOutputFileForWrite(long size) throws IOException {
+ Path attemptOutput =
+ new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING);
+ return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
+ }
+
+ /**
+ * Create a local map output file name on the same volume.
+ */
+ public Path getOutputFileForWriteInVolume(Path existing) {
+ // TODO
+ Path outputDir = new Path(existing.getParent().getParent().getParent(),
+ JOB_OUTPUT_DIR);
+ Path attemptOutputDir = new Path(outputDir,
+ conf.get(JobContext.TASK_ATTEMPT_ID));
+ return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING);
+ }
+
+ /**
+   * Return the path to a local map output index file created earlier.
+ *
+ * @return path
+ * @throws IOException
+ */
+ public Path getOutputIndexFile() throws IOException {
+ Path attemptIndexOutput =
+ new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING +
+ MAP_OUTPUT_INDEX_SUFFIX_STRING);
+ return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
+ }
+
+ /**
+ * Create a local map output index file name.
+ *
+ * @param size the size of the file
+ * @return path
+ * @throws IOException
+ */
+ public Path getOutputIndexFileForWrite(long size) throws IOException {
+ Path attemptIndexOutput =
+ new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING +
+ MAP_OUTPUT_INDEX_SUFFIX_STRING);
+ return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
+ size, conf);
+ }
+
+ /**
+ * Create a local map output index file name on the same volume.
+ */
+ public Path getOutputIndexFileForWriteInVolume(Path existing) {
+ // TODO
+ Path outputDir = new Path(existing.getParent().getParent().getParent(),
+ JOB_OUTPUT_DIR);
+ Path attemptOutputDir = new Path(outputDir,
+ conf.get(JobContext.TASK_ATTEMPT_ID));
+ return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING +
+ MAP_OUTPUT_INDEX_SUFFIX_STRING);
+ }
+
+ /**
+ * Return a local map spill file created earlier.
+ *
+ * @param spillNumber the number
+ * @return path
+ * @throws IOException
+ */
+ public Path getSpillFile(int spillNumber) throws IOException {
+ return lDirAlloc.getLocalPathToRead(
+ String.format(TMP_DIR, conf.get(JobContext.TASK_ATTEMPT_ID)) +
+ "/spill" + spillNumber + ".out", conf);
+ }
+
+ /**
+ * Create a local map spill file name.
+ *
+ * @param spillNumber the number
+ * @param size the size of the file
+ * @return path
+ * @throws IOException
+ */
+ public Path getSpillFileForWrite(int spillNumber, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(
+ String.format(TMP_DIR, conf.get(JobContext.TASK_ATTEMPT_ID)) +
+ "/spill" + spillNumber + ".out", size, conf);
+ }
+
+ /**
+   * Return a local map spill index file created earlier.
+ *
+ * @param spillNumber the number
+ * @return path
+ * @throws IOException
+ */
+ public Path getSpillIndexFile(int spillNumber) throws IOException {
+ return lDirAlloc.getLocalPathToRead(
+ String.format(TMP_DIR, conf.get(JobContext.TASK_ATTEMPT_ID)) +
+ "/spill" + spillNumber + ".out.index", conf);
+ }
+
+ /**
+ * Create a local map spill index file name.
+ *
+ * @param spillNumber the number
+ * @param size the size of the file
+ * @return path
+ * @throws IOException
+ */
+ public Path getSpillIndexFileForWrite(int spillNumber, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(
+ String.format(TMP_DIR, conf.get(JobContext.TASK_ATTEMPT_ID)) +
+ "/spill" + spillNumber + ".out.index", size, conf);
+ }
+
+ /**
+   * Return a local reduce input file created earlier.
+ *
+ * @param mapId a map task id
+ * @return path
+ * @throws IOException
+ */
+ public Path getInputFile(int mapId) throws IOException {
+ throw new UnsupportedOperationException("Incompatible with LocalRunner");
+ }
+
+ /**
+ * Create a local reduce input file name.
+ *
+ * @param mapId a map task id
+ * @param size the size of the file
+ * @return path
+ * @throws IOException
+ */
+ public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
+ long size) throws IOException {
+ return lDirAlloc.getLocalPathForWrite(String.format(
+ REDUCE_INPUT_FILE_FORMAT_STRING,
+ getAttemptOutputDir().toString(), mapId.getId()),
+ size, conf);
+ }
+
+ /** Removes all of the files related to a task. */
+ public void removeAll() throws IOException {
+ throw new UnsupportedOperationException("Incompatible with LocalRunner");
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ if (conf instanceof JobConf) {
+ this.conf = (JobConf) conf;
+ } else {
+ this.conf = new JobConf(conf);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+
+public class JobHistoryEvent extends AbstractEvent<EventType> {
+
+ private final JobID jobID;
+ private final HistoryEvent historyEvent;
+
+ public JobHistoryEvent(JobID jobID, HistoryEvent historyEvent) {
+ super(historyEvent.getEventType());
+ this.jobID = jobID;
+ this.historyEvent = historyEvent;
+ }
+
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ public HistoryEvent getHistoryEvent() {
+ return historyEvent;
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,309 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+
+/**
+ * Job history events are routed to this class, which writes them to a local
+ * file and moves that file to HDFS on job completion.
+ * The implementation lives in this package so that it can access
+ * package-private JobHistory classes.
+ */
+public class JobHistoryEventHandler
+ implements EventHandler<JobHistoryEvent> {
+
+ private FileContext logDirFc; // log Dir FileContext
+ private FileContext doneDirFc; // done Dir FileContext
+ private Configuration conf;
+
+ private Path logDir = null;
+ private Path done = null; // folder for completed jobs
+
+ private static final Log LOG = LogFactory.getLog(
+ JobHistoryEventHandler.class);
+
+ private EventWriter eventWriter = null;
+
+ private static final Map<JobID, MetaInfo> fileMap =
+ Collections.<JobID,MetaInfo>synchronizedMap(new HashMap<JobID,MetaInfo>());
+
+ static final FsPermission HISTORY_DIR_PERMISSION =
+ FsPermission.createImmutable((short) 0750); // rwxr-x---
+
+ public static final FsPermission HISTORY_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0740); // rwxr-----
+
+ public JobHistoryEventHandler(Configuration conf) {
+ this.conf = conf;
+ String localDir = conf.get("yarn.server.nodemanager.jobhistory.localdir",
+ "file:///tmp/yarn");
+ logDir = new Path(localDir);
+ String doneLocation =
+ conf.get("yarn.server.nodemanager.jobhistory",
+ "file:///tmp/yarn/done");
+ if (doneLocation != null) {
+ try {
+ done = FileContext.getFileContext(conf).makeQualified(new Path(doneLocation));
+ doneDirFc = FileContext.getFileContext(done.toUri(), conf);
+        if (!doneDirFc.util().exists(done)) {
+          doneDirFc.mkdir(done,
+              new FsPermission(HISTORY_DIR_PERMISSION), true);
+        }
+ } catch (IOException e) {
+ LOG.info("error creating done directory on dfs " + e);
+ throw new YarnException(e);
+ }
+ }
+ try {
+ logDirFc = FileContext.getFileContext(logDir.toUri(), conf);
+ if (!logDirFc.util().exists(logDir)) {
+ logDirFc.mkdir(logDir, new FsPermission(HISTORY_DIR_PERMISSION), true);
+ }
+ } catch (IOException ioe) {
+ LOG.info("Mkdirs failed to create " +
+ logDir.toString());
+ throw new YarnException(ioe);
+ }
+ }
+
+ /**
+ * Create an event writer for the Job represented by the jobID.
+   * This should be the first call to history for a job.
+   * @param jobId the job to open a history writer for
+ * @throws IOException
+ */
+ protected void setupEventWriter(JobID jobId)
+ throws IOException {
+ if (logDir == null) {
+ throw new IOException("Missing Log Directory for History");
+ }
+
+ MetaInfo oldFi = fileMap.get(jobId);
+
+ long submitTime = (oldFi == null ? System.currentTimeMillis() : oldFi.submitTime);
+
+ Path logFile = getJobHistoryFile(logDir, jobId);
+ String user = conf.get(MRJobConfig.USER_NAME);
+ if (user == null) {
+ throw new IOException("User is null while setting up jobhistory eventwriter" );
+ }
+ String jobName = TypeConverter.fromYarn(jobId).toString();
+ EventWriter writer = (oldFi == null) ? null : oldFi.writer;
+
+ if (writer == null) {
+ try {
+ FSDataOutputStream out = logDirFc.create(logFile, EnumSet
+ .of(CreateFlag.OVERWRITE));
+ writer = new EventWriter(out);
+ } catch (IOException ioe) {
+ LOG.info("Could not create log file for job " + jobName);
+ throw ioe;
+ }
+ }
+ this.eventWriter = writer;
+ /*TODO Storing the job conf on the log dir if required*/
+ MetaInfo fi = new MetaInfo(logFile, writer, submitTime, user, jobName);
+ fileMap.put(jobId, fi);
+ }
+
+  /**
+   * Close the event writer for the given job id.
+   * @throws IOException
+   */
+ public void closeWriter(JobID id) throws IOException {
+ try {
+ final MetaInfo mi = fileMap.get(id);
+ if (mi != null) {
+ mi.closeWriter();
+ }
+ } catch (IOException e) {
+ LOG.info("Error closing writer for JobID: " + id);
+ throw e;
+ }
+ }
+
+ public synchronized void handle(JobHistoryEvent event) {
+ // check for first event from a job
+ if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
+ try {
+ setupEventWriter(event.getJobID());
+ } catch (IOException ioe) {
+ LOG.error("Error JobHistoryEventHandler in handle " + ioe);
+ throw new YarnException(ioe);
+ }
+ }
+    MetaInfo mi = fileMap.get(event.getJobID());
+ try {
+ HistoryEvent historyEvent = event.getHistoryEvent();
+ mi.writeEvent(historyEvent);
+ } catch (IOException e) {
+ LOG.error("in handler ioException " + e);
+ throw new YarnException(e);
+ }
+ // check for done
+ if (event.getHistoryEvent().getEventType().equals(EventType.JOB_FINISHED)) {
+ JobFinishedEvent jfe = (JobFinishedEvent) event.getHistoryEvent();
+ String statusstoredir = done + "/status/" + mi.user + "/" + mi.jobName;
+ try {
+ writeStatus(statusstoredir, jfe);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ try {
+ closeEventWriter(event.getJobID());
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ }
+ }
+
+ protected void closeEventWriter(JobID jobId) throws IOException {
+    final MetaInfo mi = fileMap.get(jobId);
+    if (mi == null) {
+      throw new IOException("No event writer found for JobID: " + jobId);
+    }
+    try {
+      Path fromLocalFile = mi.getHistoryFile();
+      String jobhistorydir = done + "/" + mi.user + "/";
+      Path jobhistorydirpath =
+          logDirFc.makeQualified(new Path(jobhistorydir));
+      logDirFc.mkdir(jobhistorydirpath,
+          new FsPermission(HISTORY_DIR_PERMISSION), true);
+      Path localQualifiedFile =
+          logDirFc.makeQualified(fromLocalFile);
+      Path jobHistoryFile =
+          logDirFc.makeQualified(new Path(jobhistorydirpath, mi.jobName));
+      mi.closeWriter();
+ moveToDoneNow(localQualifiedFile, jobHistoryFile);
+ logDirFc.delete(localQualifiedFile, true);
+ } catch (IOException e) {
+ LOG.info("Error closing writer for JobID: " + jobId);
+ throw e;
+ }
+ }
+
+ private static class MetaInfo {
+ private Path historyFile;
+ private EventWriter writer;
+ long submitTime;
+ String user;
+ String jobName;
+
+ MetaInfo(Path historyFile, EventWriter writer, long submitTime,
+ String user, String jobName) {
+ this.historyFile = historyFile;
+ this.writer = writer;
+ this.submitTime = submitTime;
+ this.user = user;
+ this.jobName = jobName;
+ }
+
+ Path getHistoryFile() { return historyFile; }
+
+ synchronized void closeWriter() throws IOException {
+ if (writer != null) {
+ writer.close();
+ }
+ writer = null;
+ }
+
+ synchronized void writeEvent(HistoryEvent event) throws IOException {
+ if (writer != null) {
+ writer.write(event);
+ }
+ }
+ }
+
+ /**
+ * Get the job history file path
+ */
+ public static Path getJobHistoryFile(Path dir, JobID jobId) {
+ return new Path(dir, TypeConverter.fromYarn(jobId).toString());
+ }
+
+  /**
+   * Move the history file from the local log directory to the done
+   * location, replacing any existing copy.
+   */
+ private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
+ //check if path exists, in case of retries it may not exist
+ if (logDirFc.util().exists(fromPath)) {
+ LOG.info("Moving " + fromPath.toString() + " to " +
+ toPath.toString());
+ //TODO temporarily removing the existing dst
+ if (logDirFc.util().exists(toPath)) {
+ logDirFc.delete(toPath, true);
+ }
+ boolean copied = logDirFc.util().copy(fromPath, toPath);
+      if (copied) {
+        LOG.info("Copied to done location: " + toPath);
+      } else {
+        LOG.error("Copy to done location failed: " + toPath);
+      }
+ doneDirFc.setPermission(toPath,
+ new FsPermission(HISTORY_FILE_PERMISSION));
+ }
+ }
+
+ private void writeStatus(String statusstoredir, HistoryEvent event) throws IOException {
+ try {
+ Path statusstorepath = doneDirFc.makeQualified(new Path(statusstoredir));
+ doneDirFc.mkdir(statusstorepath,
+ new FsPermission(HISTORY_DIR_PERMISSION), true);
+ Path toPath = new Path(statusstoredir, "jobstats");
+ FSDataOutputStream out = doneDirFc.create(toPath, EnumSet
+ .of(CreateFlag.OVERWRITE));
+ EventWriter writer = new EventWriter(out);
+ writer.write(event);
+ writer.close();
+ out.close();
+ } catch (IOException ioe) {
+ LOG.error("Status file write failed" +ioe);
+ throw ioe;
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+
+/**
+ * Context interface for sharing information across components in YARN App.
+ */
+public interface AppContext {
+
+ ApplicationID getApplicationID();
+
+ CharSequence getUser();
+
+ Job getJob(JobID jobID);
+
+ Map<JobID, Job> getAllJobs();
+
+ EventHandler getEventHandler();
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/Clock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/Clock.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/Clock.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/Clock.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,37 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+/**
+ * A clock class - can be mocked out for testing.
+ */
+public class Clock {
+ long previous = Long.MIN_VALUE;
+
+ long getMeasuredTime() {
+ return System.currentTimeMillis();
+ }
+
+ public long getTime() {
+    // Make the result monotonic even if the system clock is stepped
+    // backwards (e.g. by an NTP or leap-second adjustment)
+ previous = Math.max(previous, getMeasuredTime());
+ return previous;
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,453 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+
+/**
+ * The Map-Reduce Application Master.
+ * The state machine is encapsulated in the implementation of the Job
+ * interface. All state changes happen via the Job interface, and each event
+ * results in a finite state transition in the Job.
+ *
+ * The MR AppMaster is a composition of loosely coupled services. The services
+ * interact with each other via events, in the style of the Actor model: each
+ * component acts on the events it receives and sends events out to other
+ * components.
+ * This keeps the AppMaster highly concurrent, with little or no need for
+ * synchronization.
+ *
+ * The events are dispatched by a central dispatch mechanism with which all
+ * components register (see the dispatcher sketch following this file).
+ *
+ * Information is shared across components using the AppContext.
+ */
+public class MRAppMaster extends CompositeService {
+
+ private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
+
+ private final Clock clock;
+
+ private ApplicationID appID;
+ private AppContext context;
+ private Dispatcher dispatcher;
+ private ClientService clientService;
+ private ContainerAllocator containerAllocator;
+ private ContainerLauncher containerLauncher;
+ private TaskCleaner taskCleaner;
+ private Speculator speculator;
+ private TaskAttemptListener taskAttemptListener;
+ private JobTokenSecretManager jobTokenSecretManager =
+ new JobTokenSecretManager();
+
+ public MRAppMaster(ApplicationID applicationId) {
+ this(applicationId, null);
+ }
+
+ public MRAppMaster(ApplicationID applicationId, Clock clock) {
+ super(MRAppMaster.class.getName());
+ if (clock == null) {
+ clock = new Clock();
+ }
+ this.clock = clock;
+ this.appID = applicationId;
+ LOG.info("Created MRAppMaster for application " + applicationId);
+ }
+
+ @Override
+ public void init(final Configuration conf) {
+ context = new RunningAppContext();
+
+ dispatcher = new AsyncDispatcher();
+ addIfService(dispatcher);
+
+ //service to handle requests to TaskUmbilicalProtocol
+ taskAttemptListener = createTaskAttemptListener(context);
+ addIfService(taskAttemptListener);
+
+ //service to do the task cleanup
+ taskCleaner = createTaskCleaner(context);
+ addIfService(taskCleaner);
+ //service to launch allocated containers via NodeManager
+ containerLauncher = createContainerLauncher(context);
+ addIfService(containerLauncher);
+
+ //service to handle requests from JobClient
+ clientService = createClientService(context);
+ addIfService(clientService);
+
+ //service to allocate containers from RM
+ containerAllocator = createContainerAllocator(clientService, context);
+ addIfService(containerAllocator);
+
+ //register the event dispatchers
+ dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+ dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
+ dispatcher.register(JobEventType.class, new JobEventDispatcher());
+ dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+ dispatcher.register(TaskAttemptEventType.class,
+ new TaskAttemptEventDispatcher());
+ dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
+
+ if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
+ || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
+ //optional service to speculate on task attempts' progress
+ speculator = createSpeculator(conf, context);
+ addIfService(speculator);
+ dispatcher.register(Speculator.EventType.class, speculator);
+ } else {
+ dispatcher.register
+ (Speculator.EventType.class, new NullSpeculatorEventHandler());
+ }
+
+ super.init(conf);
+ }
+
+ static class NullSpeculatorEventHandler
+ implements EventHandler<SpeculatorEvent> {
+ @Override
+ public void handle(SpeculatorEvent event) {
+ // no code
+ }
+ }
+
+ protected void addIfService(Object object) {
+ if (object instanceof Service) {
+ addService((Service) object);
+ }
+ }
+
+ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+ Configuration conf) {
+ return new EventHandler<JobHistoryEvent>() {
+ @Override
+ public void handle(JobHistoryEvent event) {
+ }
+ };
+ //TODO use the real job history handler.
+ //return new JobHistoryEventHandler(conf);
+ }
+
+ protected Speculator createSpeculator(Configuration conf, AppContext context) {
+ Class<? extends Speculator> speculatorClass;
+
+ try {
+ speculatorClass
+ // "yarn.mapreduce.job.speculator.class"
+ = conf.getClass(YarnMRJobConfig.SPECULATOR_CLASS,
+ DefaultSpeculator.class,
+ Speculator.class);
+ Constructor<? extends Speculator> speculatorConstructor
+ = speculatorClass.getConstructor
+ (Configuration.class, AppContext.class);
+ Speculator result = speculatorConstructor.newInstance(conf, context);
+
+ return result;
+ } catch (InstantiationException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + YarnMRJobConfig.SPECULATOR_CLASS + " " + ex);
+ throw new YarnException(ex);
+ } catch (IllegalAccessException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + YarnMRJobConfig.SPECULATOR_CLASS + " " + ex);
+ throw new YarnException(ex);
+ } catch (InvocationTargetException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + YarnMRJobConfig.SPECULATOR_CLASS + " " + ex);
+ throw new YarnException(ex);
+ } catch (NoSuchMethodException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + YarnMRJobConfig.SPECULATOR_CLASS + " " + ex);
+ throw new YarnException(ex);
+ }
+ }
+
+ protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+ TaskAttemptListener lis =
+ new TaskAttemptListenerImpl(context, jobTokenSecretManager);
+ return lis;
+ }
+
+ protected TaskCleaner createTaskCleaner(AppContext context) {
+ return new TaskCleanerImpl(context);
+ }
+
+ protected ContainerLauncher createContainerLauncher(AppContext context) {
+ return new ContainerLauncherImpl(context);
+ }
+
+ protected ContainerAllocator createContainerAllocator(ClientService
+ clientService, AppContext context) {
+ return new RMContainerAllocator(clientService, context);
+ }
+
+ //TODO:should have an interface for MRClientService
+ protected ClientService createClientService(AppContext context) {
+ return new MRClientService(context);
+ }
+
+ public ApplicationID getAppID() {
+ return appID;
+ }
+
+ public AppContext getContext() {
+ return context;
+ }
+
+ public Dispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ //Returns null if speculation is not enabled
+ public Speculator getSpeculator() {
+ return speculator;
+ }
+
+ public ContainerAllocator getContainerAllocator() {
+ return containerAllocator;
+ }
+
+ public ContainerLauncher getContainerLauncher() {
+ return containerLauncher;
+ }
+
+ public TaskAttemptListener getTaskAttemptListener() {
+ return taskAttemptListener;
+ }
+
+ class RunningAppContext implements AppContext {
+
+ private Map<JobID, Job> jobs = new ConcurrentHashMap<JobID, Job>();
+
+ @Override
+ public ApplicationID getApplicationID() {
+ return appID;
+ }
+
+ @Override
+ public Job getJob(JobID jobID) {
+ return jobs.get(jobID);
+ }
+
+ @Override
+ public Map<JobID, Job> getAllJobs() {
+ return jobs;
+ }
+
+ @Override
+ public EventHandler getEventHandler() {
+ return dispatcher.getEventHandler();
+ }
+
+ @Override
+ public CharSequence getUser() {
+ return getConfig().get(MRJobConfig.USER_NAME);
+ }
+
+ }
+
+ @Override
+ public void start() {
+ startJobs();
+ //start all the components
+ super.start();
+ }
+
+ /**
+ * This can be overridden to instantiate multiple jobs and create a
+ * workflow.
+ */
+ protected void startJobs() {
+
+ Configuration config = getConfig();
+
+ Credentials fsTokens = new Credentials();
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // Read the file-system tokens from the localized tokens-file.
+ try {
+ Path jobSubmitDir =
+ FileContext.getLocalFSFileContext().makeQualified(
+ new Path(new File(YARNApplicationConstants.JOB_SUBMIT_DIR)
+ .getAbsolutePath()));
+ Path jobTokenFile =
+ new Path(jobSubmitDir, YarnConfiguration.APPLICATION_TOKENS_FILE);
+ fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, config));
+ LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+ + jobTokenFile);
+
+ UserGroupInformation currentUser =
+ UserGroupInformation.getCurrentUser();
+ for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
+ LOG.info(" --- DEBUG: Token of kind " + tk.getKind()
+ + "in current ugi in the AppMaster for service "
+ + tk.getService());
+ currentUser.addToken(tk); // For use by AppMaster itself.
+ }
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ }
+
+ //create single job
+ Job job =
+ new JobImpl(appID, config, dispatcher.getEventHandler(),
+ taskAttemptListener, jobTokenSecretManager, fsTokens);
+ ((RunningAppContext) context).jobs.put(job.getID(), job);
+
+ dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+ createJobHistoryHandler(config));
+ dispatcher.register(JobFinishEvent.Type.class,
+ new EventHandler<JobFinishEvent>() {
+ @Override
+ public void handle(JobFinishEvent event) {
+            // job has finished
+            // this is the only job, so shut down the AppMaster
+            // note that in a workflow scenario, this may instead lead to
+            // the creation of a new job
+
+            // TODO: currently we just wait for some time so that clients
+            // can learn the final states. Will be removed once the RM is up.
+ try {
+ Thread.sleep(15000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ LOG.info("Calling stop for all the services");
+ try {
+ stop();
+ } catch (Throwable t) {
+ LOG.warn("Graceful stop failed ", t);
+ }
+            //TODO: this is required because the RPC server does not shut
+            //down in spite of calling server.stop().
+ //Bring the process down by force.
+ //Not needed after HADOOP-7140
+ LOG.info("Exiting MR AppMaster..GoodBye!");
+ System.exit(0);
+ }
+ });
+
+    // create a job event for job initialization
+    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
+    // send init to the job; this triggers the job execution
+ dispatcher.getEventHandler().handle(initJobEvent);
+ }
+
+ private class JobEventDispatcher implements EventHandler<JobEvent> {
+ @Override
+ public void handle(JobEvent event) {
+ ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);
+ }
+ }
+ private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+ @Override
+ public void handle(TaskEvent event) {
+ Task task = context.getJob(event.getTaskID().jobID).getTask(
+ event.getTaskID());
+ ((EventHandler<TaskEvent>)task).handle(event);
+ }
+ }
+
+ private class TaskAttemptEventDispatcher
+ implements EventHandler<TaskAttemptEvent> {
+ @Override
+ public void handle(TaskAttemptEvent event) {
+ Job job = context.getJob(event.getTaskAttemptID().taskID.jobID);
+ Task task = job.getTask(event.getTaskAttemptID().taskID);
+ TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
+ ((EventHandler<TaskAttemptEvent>) attempt).handle(event);
+ }
+ }
+
+ public static void main(String[] args) {
+ try {
+ //Configuration.addDefaultResource("job.xml");
+ ApplicationID applicationId = new ApplicationID();
+ applicationId.clusterTimeStamp = Long.valueOf(args[0]);
+ applicationId.id = Integer.valueOf(args[1]);
+ MRAppMaster appMaster = new MRAppMaster(applicationId);
+ YarnConfiguration conf = new YarnConfiguration(new JobConf());
+ conf.addResource(new Path(YARNApplicationConstants.JOB_CONF_FILE));
+ conf.set(MRJobConfig.USER_NAME,
+ System.getProperty("user.name"));
+ UserGroupInformation.setConfiguration(conf);
+ appMaster.init(conf);
+ appMaster.start();
+ } catch (Throwable t) {
+ LOG.error("Caught throwable. Exiting:", t);
+ System.exit(1);
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMasterConstants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMasterConstants.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMasterConstants.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMasterConstants.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+public interface MRAppMasterConstants {
+
+ public static final String CONTAINERLAUNCHER_THREADPOOL_SIZE =
+ "yarn.mapreduce.containerlauncher.threadpool-size";
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,35 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+public interface TaskAttemptListener {
+
+ InetSocketAddress getAddress();
+
+ void register(TaskAttemptID attemptID, Task task, WrappedJvmID jvmID);
+
+ void unregister(TaskAttemptID attemptID, WrappedJvmID jvmID);
+
+}
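A hypothetical caller sketch (not part of this patch) showing the intended pairing of register and unregister around a child JVM's lifetime:

    import java.net.InetSocketAddress;

    import org.apache.hadoop.mapred.Task;
    import org.apache.hadoop.mapred.WrappedJvmID;
    import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
    import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;

    public class LaunchSketch {
      // Register before the child JVM starts and unregister when it is done,
      // so the listener only serves attempts it knows about.
      void launch(TaskAttemptListener listener, TaskAttemptID attemptID,
          Task task, WrappedJvmID jvmID) {
        listener.register(attemptID, task, jvmID);
        try {
          InetSocketAddress addr = listener.getAddress();
          // ... start the container, passing addr so the child JVM can
          // connect back and report progress ...
        } finally {
          listener.unregister(attemptID, jvmID);
        }
      }
    }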
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,133 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+/**
+ * This class keeps track of tasks that have already been launched. It
+ * determines whether a task is still alive and running, and marks a task as
+ * timed out if no heartbeat is received from it within the configured
+ * timeout.
+ */
+public class TaskHeartbeatHandler extends AbstractService {
+
+ private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class);
+
+  //thread which runs periodically to check how long it has been since the
+  //last heartbeat was received from each task.
+ private Thread lostTaskCheckerThread;
+ private volatile boolean stopped;
+ private int taskTimeOut = 5*60*1000;//5 mins
+
+ private EventHandler eventHandler;
+
+ private Map<TaskAttemptID, Long> runningAttempts
+ = new HashMap<TaskAttemptID, Long>();
+
+ public TaskHeartbeatHandler(EventHandler eventHandler) {
+ super("TaskHeartbeatHandler");
+ this.eventHandler = eventHandler;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ taskTimeOut = conf.getInt("mapreduce.task.timeout", 5*60*1000);
+ }
+
+ @Override
+ public void start() {
+ lostTaskCheckerThread = new Thread(new PingChecker());
+ lostTaskCheckerThread.start();
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ stopped = true;
+ lostTaskCheckerThread.interrupt();
+ super.stop();
+ }
+
+ public synchronized void receivedPing(TaskAttemptID attemptID) {
+    //only update the timestamp for attempts that are still registered
+ if (runningAttempts.containsKey(attemptID)) {
+ runningAttempts.put(attemptID, System.currentTimeMillis());
+ }
+ }
+
+ public synchronized void register(TaskAttemptID attemptID) {
+ runningAttempts.put(attemptID, System.currentTimeMillis());
+ }
+
+ public synchronized void unregister(TaskAttemptID attemptID) {
+ runningAttempts.remove(attemptID);
+ }
+
+ private class PingChecker implements Runnable {
+
+ @Override
+ public void run() {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ synchronized (TaskHeartbeatHandler.this) {
+ Iterator<Map.Entry<TaskAttemptID, Long>> iterator =
+ runningAttempts.entrySet().iterator();
+
+          //avoid recomputing the current time on every loop iteration
+ long currentTime = System.currentTimeMillis();
+
+ while (iterator.hasNext()) {
+ Map.Entry<TaskAttemptID, Long> entry = iterator.next();
+ if (currentTime > entry.getValue() + taskTimeOut) {
+ //task is lost, remove from the list and raise lost event
+ iterator.remove();
+ eventHandler.handle(
+ new TaskAttemptDiagnosticsUpdateEvent(entry.getKey(),
+ "AttemptID:" + entry.getKey().toString() +
+ " Timed out after " + taskTimeOut/1000 + " secs"));
+ eventHandler.handle(new TaskAttemptEvent(entry
+ .getKey(), TaskAttemptEventType.TA_TIMED_OUT));
+ }
+ }
+ }
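+        // note: the check interval equals the timeout, so a lost task may
+        // only be detected up to roughly twice the timeout after its last
+        // heartbeat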
+ try {
+ Thread.sleep(taskTimeOut);
+ } catch (InterruptedException e) {
+ LOG.info("TaskHeartbeatHandler thread interrupted");
+ break;
+ }
+ }
+ }
+
+ }
+
+}
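A minimal lifecycle sketch for the handler above; the EventHandler named dispatcher, the Configuration named conf, and the TaskAttemptID named attemptID are assumed to exist and are not part of this patch:

    TaskHeartbeatHandler hb = new TaskHeartbeatHandler(dispatcher);
    hb.init(conf);               // picks up mapreduce.task.timeout
    hb.start();                  // starts the PingChecker thread

    hb.register(attemptID);      // when the attempt launches
    hb.receivedPing(attemptID);  // on each status update from the task
    hb.unregister(attemptID);    // when the attempt finishes normally
    // attempts silent past the timeout get TA_TIMED_OUT events
    hb.stop();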
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ClientService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.client;
+
+import java.net.InetSocketAddress;
+
+public interface ClientService {
+
+ InetSocketAddress getBindAddress();
+
+ int getHttpPort();
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,250 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.client;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Server;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.WebApps;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+/**
+ * This module is responsible for communicating with the
+ * job client (user facing).
+ */
+public class MRClientService extends AbstractService
+ implements ClientService {
+
+ private static final Log LOG = LogFactory.getLog(MRClientService.class);
+
+ private MRClientProtocol protocolHandler;
+ private Server server;
+ private WebApp webApp;
+ private InetSocketAddress bindAddress;
+ private AppContext appContext;
+ private EventHandler<Event> handler;
+
+  public MRClientService(AppContext appContext) {
+    super("MRClientService");
+    this.appContext = appContext;
+    // The 'handler' field is otherwise never assigned and would NPE in the
+    // kill* methods below; assumes AppContext exposes the event dispatcher
+    // via getEventHandler().
+    this.handler = appContext.getEventHandler();
+    this.protocolHandler = new MRClientProtocolHandler(appContext);
+  }
+
+ public void start() {
+    Configuration conf = new Configuration(getConfig()); // Copy so the sec-info class setting below does not leak into the shared config
+ YarnRPC rpc = YarnRPC.create(conf);
+ InetSocketAddress address = NetUtils.createSocketAddr("0.0.0.0:0");
+ InetAddress hostNameResolved = null;
+ try {
+      hostNameResolved = InetAddress.getLocalHost();
+ } catch (UnknownHostException e) {
+ throw new YarnException(e);
+ }
+
+ ClientToAMSecretManager secretManager = null;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ secretManager = new ClientToAMSecretManager();
+ String secretKeyStr =
+ System.getenv(YarnConfiguration.APPLICATION_CLIENT_SECRET_ENV_NAME);
+ byte[] bytes = Base64.decodeBase64(secretKeyStr);
+ ApplicationTokenIdentifier identifier =
+ new ApplicationTokenIdentifier(this.appContext.getApplicationID());
+ secretManager.setMasterKey(identifier, bytes);
+ conf.setClass(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_INFO_CLASS_NAME,
+ SchedulerSecurityInfo.class, SecurityInfo.class); // Same for now.
+ }
+ server =
+ rpc.getServer(MRClientProtocol.class, protocolHandler, address,
+ conf, secretManager);
+ server.start();
+ this.bindAddress =
+ NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
+ + ":" + server.getPort());
+ LOG.info("Instantiated MRClientService at " + this.bindAddress);
+ try {
+ webApp = WebApps.$for("yarn", AppContext.class, appContext).with(conf).
+ start(new AMWebApp());
+ } catch (Exception e) {
+ LOG.error("Webapps failed to start. Ignoring for now:", e);
+ }
+ super.start();
+ }
+
+ public void stop() {
+ server.close();
+ if (webApp != null) {
+ webApp.stop();
+ }
+ super.stop();
+ }
+
+ @Override
+ public InetSocketAddress getBindAddress() {
+ return bindAddress;
+ }
+
+ @Override
+ public int getHttpPort() {
+ return webApp.port();
+ }
+
+ class MRClientProtocolHandler implements MRClientProtocol {
+
+ private AppContext appContext;
+
+ private Job getJob(JobID jobID) throws AvroRemoteException {
+ Job job = appContext.getJob(jobID);
+ if (job == null) {
+ throw RPCUtil.getRemoteException("Unknown job " + jobID);
+ }
+ return job;
+ }
+
+ MRClientProtocolHandler(AppContext appContext) {
+ this.appContext = appContext;
+ }
+
+ @Override
+ public Counters getCounters(JobID jobID) throws AvroRemoteException {
+ Job job = getJob(jobID);
+ return job.getCounters();
+ }
+
+ @Override
+ public JobReport getJobReport(JobID jobID) throws AvroRemoteException {
+ Job job = getJob(jobID);
+ return job.getReport();
+ }
+
+ @Override
+ public TaskAttemptReport getTaskAttemptReport(TaskAttemptID taskAttemptID)
+ throws AvroRemoteException {
+ Job job = getJob(taskAttemptID.taskID.jobID);
+ return job.getTask(taskAttemptID.taskID).
+ getAttempt(taskAttemptID).getReport();
+ }
+
+ @Override
+ public TaskReport getTaskReport(TaskID taskID) throws AvroRemoteException {
+      Job job = getJob(taskID.jobID); // null-checked helper fails cleanly on unknown jobs
+ return job.getTask(taskID).getReport();
+ }
+
+ @Override
+ public List<TaskAttemptCompletionEvent> getTaskAttemptCompletionEvents(JobID jobID,
+ int fromEventId, int maxEvents) throws AvroRemoteException {
+      Job job = getJob(jobID); // null-checked helper
+ return Arrays.asList(job.getTaskAttemptCompletionEvents(fromEventId, maxEvents));
+ }
+
+ @Override
+ public Void killJob(JobID jobID) throws AvroRemoteException {
+ getJob(jobID);
+ handler.handle(
+ new JobEvent(jobID, JobEventType.JOB_KILL));
+ return null;
+ }
+
+ @Override
+ public Void killTask(TaskID taskID) throws AvroRemoteException {
+ getJob(taskID.jobID);
+ handler.handle(
+ new TaskEvent(taskID, TaskEventType.T_KILL));
+ return null;
+ }
+
+ @Override
+ public Void killTaskAttempt(TaskAttemptID taskAttemptID)
+ throws AvroRemoteException {
+ getJob(taskAttemptID.taskID.jobID);
+ handler.handle(
+ new TaskAttemptEvent(taskAttemptID,
+ TaskAttemptEventType.TA_KILL));
+ return null;
+ }
+
+ @Override
+ public List<CharSequence> getDiagnostics(TaskAttemptID taskAttemptID)
+ throws AvroRemoteException {
+ Job job = getJob(taskAttemptID.taskID.jobID);
+ return job.getTask(taskAttemptID.taskID).
+ getAttempt(taskAttemptID).getDiagnostics();
+ }
+
+ @Override
+ public List<TaskReport> getTaskReports(JobID jobID, TaskType taskType)
+ throws AvroRemoteException {
+      Job job = getJob(jobID); // null-checked helper
+ List<TaskReport> reports = new ArrayList<TaskReport>();
+ Collection<Task> tasks = job.getTasks(taskType).values();
+ for (Task task : tasks) {
+ reports.add(task.getReport());
+ }
+ return reports;
+ }
+
+ }
+}
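A hypothetical client-side sketch (not part of this patch) of querying the service; the YarnRPC.getProxy signature is assumed here, and conf, service, and jobID are placeholders:

    YarnRPC rpc = YarnRPC.create(conf);
    MRClientProtocol proxy = (MRClientProtocol) rpc.getProxy(
        MRClientProtocol.class, service.getBindAddress(), conf);
    JobReport report = proxy.getJobReport(jobID); // throws AvroRemoteException
                                                  // if the job is unknown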
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,53 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+/**
+ * Main interface to interact with the job. Provides only getters.
+ */
+public interface Job {
+
+ JobID getID();
+ CharSequence getName();
+ JobState getState();
+ JobReport getReport();
+ Counters getCounters();
+ Map<TaskID,Task> getTasks();
+ Map<TaskID,Task> getTasks(TaskType taskType);
+ Task getTask(TaskID taskID);
+ List<String> getDiagnostics();
+ int getTotalMaps();
+ int getTotalReduces();
+ int getCompletedMaps();
+ int getCompletedReduces();
+
+ TaskAttemptCompletionEvent[]
+ getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
+}
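As an illustration of the read-only contract, overall completion can be derived from the getters alone; a sketch (not part of this patch):

    static float completion(Job job) {
      int total = job.getTotalMaps() + job.getTotalReduces();
      if (total == 0) {
        return 1.0f; // a job with no tasks is trivially complete
      }
      int done = job.getCompletedMaps() + job.getCompletedReduces();
      return (float) done / total;
    }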
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job;
+
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+/**
+ * Read-only view of a Task.
+ */
+public interface Task {
+ TaskID getID();
+ TaskReport getReport();
+ TaskState getState();
+ Counters getCounters();
+ float getProgress();
+ TaskType getType();
+ Map<TaskAttemptID, TaskAttempt> getAttempts();
+ TaskAttempt getAttempt(TaskAttemptID attemptID);
+
+  /** Whether the task has reached its final state.
+   */
+ boolean isFinished();
+
+  /**
+   * Whether the output of the given task attempt can be committed. Note that
+   * once the task grants one attempt permission to commit, subsequent
+   * canCommit requests from any other attempt must return false.
+   *
+   * @param taskAttemptID the attempt asking to commit its output
+   * @return whether the attempt's output can be committed or not.
+   */
+ boolean canCommit(TaskAttemptID taskAttemptID);
+
+
+}
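The canCommit contract above implies a first-wins commit protocol; a hypothetical committer-side sketch (not part of this patch):

    static void maybeCommit(Task task, TaskAttemptID attemptID) {
      if (task.canCommit(attemptID)) {
        // this attempt now owns the commit; promote its output
      } else {
        // another attempt already holds the commit; discard this output
      }
    }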
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,61 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+
+/**
+ * Read-only view of a TaskAttempt.
+ */
+public interface TaskAttempt {
+ TaskAttemptID getID();
+ TaskAttemptReport getReport();
+ List<CharSequence> getDiagnostics();
+ Counters getCounters();
+ float getProgress();
+ TaskAttemptState getState();
+
+  /** Whether the attempt has reached its final state.
+   */
+ boolean isFinished();
+
+  /** If a container has been assigned, returns its ID; otherwise null.
+   */
+ ContainerID getAssignedContainerID();
+
+  /** If a container has been assigned, returns the container manager's
+   * address; otherwise null.
+   */
+ String getAssignedContainerMgrAddress();
+
+  /** Returns the time at which the container was launched, or 0 if it has
+   * not been launched yet.
+   */
+ long getLaunchTime();
+
+  /** Returns the attempt's finish time, or 0 if the attempt has not
+   * finished yet.
+   */
+ long getFinishTime();
+}
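Since getLaunchTime() and getFinishTime() use 0 as a "not yet" sentinel, elapsed-time computations must guard both values; a sketch (not part of this patch):

    static long elapsedMillis(TaskAttempt attempt, long now) {
      long launch = attempt.getLaunchTime();
      if (launch == 0) {
        return 0; // container not launched yet
      }
      long finish = attempt.getFinishTime();
      return (finish == 0 ? now : finish) - launch;
    }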
|