helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject [1/2] Adding support to use YARN for provisioning
Date Tue, 07 Jan 2014 22:30:53 GMT
Updated Branches:
  refs/heads/helix-provisioning 3153c5e95 -> d23198c76


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
new file mode 100644
index 0000000..b535bd9
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
@@ -0,0 +1,692 @@
+package org.apache.helix.provisioning.yarn;
+
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Client for Distributed Shell application submission to YARN.
+ * 
+ * <p> The distributed shell client allows an application master to be launched that in turn would run 
+ * the provided shell command on a set of containers. </p>
+ * 
+ * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
+ * 
+ * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 
+ * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} 
+ * provides a way for the client to get access to cluster information and to request for a
+ * new {@link ApplicationId}. </p>
+ * 
+ * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. 
+ * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} 
+ * and application name, the priority assigned to the application and the queue
+ * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
+ * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which 
+ * the {@link ApplicationMaster} is launched. </p>
+ * 
+ * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the 
+ * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available 
+ * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the 
+ * {@link ApplicationMaster}. </p>
+ * 
+ * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the 
+ * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> 
+ * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client 
+ * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class Client {
+
+  private static final Log LOG = LogFactory.getLog(Client.class);
+
+  // Configuration used to initialize the YARN client and to resolve the file system
+  private Configuration conf;
+  // Client through which every ResourceManager call in this class is made
+  private YarnClient yarnClient;
+  // Application name registered with the RM/ASM (set from --appname)
+  private String appName = "";
+  // App master priority (set from --priority)
+  private int amPriority = 0;
+  // Queue the app master is submitted to (set from --queue)
+  private String amQueue = "";
+  // Memory, in MB, to request for the App Master container (set from --master_memory)
+  private int amMemory = 10;
+
+  // Local path of the application master jar file (set from --jar)
+  private String appMasterJar = "";
+  // Main class to invoke application master; fixed at construction time
+  private final String appMasterMainClass;
+
+  // Shell command to be executed (set from --shell_command)
+  private String shellCommand = "";
+  // Local path of the shell script; empty when no script was given
+  private String shellScriptPath = "";
+  // Args to be passed to the shell command
+  private String shellArgs = "";
+  // Env variables to be setup for the shell command, keyed by variable name
+  private Map<String, String> shellEnv = new HashMap<String, String>();
+  // Shell Command Container priority
+  private int shellCmdPriority = 0;
+
+  // Memory, in MB, to request for each container in which the shell script will be executed
+  private int containerMemory = 10;
+  // No. of containers in which the shell script needs to be executed
+  private int numContainers = 1;
+
+  // Local path of a log4j.properties file; when non-empty it is uploaded and
+  // registered as a local resource of the application master
+  private String log4jPropFile = "";
+
+  // Start time for client, captured when this object is constructed
+  private final long clientStartTime = System.currentTimeMillis();
+  // Timeout threshold for client, in milliseconds, measured from clientStartTime.
+  // The app is killed once this interval expires.
+  private long clientTimeout = 600000;
+
+  // Debug flag; when set, --debug is appended to the application master command
+  boolean debugFlag = false;
+
+  // Command line options accepted by init(String[])
+  private Options opts;
+
+  /**
+   * @param args Command line arguments 
+   */
+  public static void main(String[] args) {
+    boolean result = false;
+    try {
+      Client client = new Client();
+      LOG.info("Initializing Client");
+      try {
+        boolean doRun = client.init(args);
+        if (!doRun) {
+          System.exit(0);
+        }
+      } catch (IllegalArgumentException e) {
+        System.err.println(e.getLocalizedMessage());
+        client.printUsage();
+        System.exit(-1);
+      }
+      result = client.run();
+    } catch (Throwable t) {
+      LOG.fatal("Error running CLient", t);
+      System.exit(1);
+    }
+    if (result) {
+      LOG.info("Application completed successfully");
+      System.exit(0);     
+    } 
+    LOG.error("Application failed to complete successfully");
+    System.exit(2);
+  }
+
+  /**
+   * Creates a client that launches
+   * {@code org.apache.helix.provisioning.yarn.ApplicationMaster} as the application master.
+   *
+   * @param conf configuration used to initialize the YARN client
+   * @throws Exception declared for callers; see the delegated constructor
+   */
+  public Client(Configuration conf) throws Exception  {
+    this("org.apache.helix.provisioning.yarn.ApplicationMaster", conf);
+  }
+
+  /**
+   * Creates a client for the given application master class, initializes the YARN client
+   * with {@code conf} and registers every supported command line option.
+   *
+   * @param appMasterMainClass fully qualified main class used to start the application master
+   * @param conf configuration used to initialize the YARN client
+   */
+  Client(String appMasterMainClass, Configuration conf) {
+    this.conf = conf;
+    this.appMasterMainClass = appMasterMainClass;
+
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(conf);
+
+    // Options taking a value are registered with 'true', plain flags with 'false'
+    opts = new Options()
+        .addOption("appname", true, "Application Name. Default value - DistributedShell")
+        .addOption("priority", true, "Application Priority. Default 0")
+        .addOption("queue", true, "RM Queue in which this application is to be submitted")
+        .addOption("timeout", true, "Application timeout in milliseconds")
+        .addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master")
+        .addOption("jar", true, "Jar file containing the application master")
+        .addOption("shell_command", true, "Shell command to be executed by the Application Master")
+        .addOption("shell_script", true, "Location of the shell script to be executed")
+        .addOption("shell_args", true, "Command line args for the shell script")
+        .addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs")
+        .addOption("shell_cmd_priority", true, "Priority for the shell command containers")
+        .addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command")
+        .addOption("num_containers", true, "No. of containers on which the shell command needs to be executed")
+        .addOption("log_properties", true, "log4j.properties file")
+        .addOption("debug", false, "Dump out debug information")
+        .addOption("help", false, "Print usage");
+  }
+
+  /**
+   * Creates a client backed by a freshly constructed {@link YarnConfiguration}.
+   *
+   * @throws Exception propagated from the delegated constructor
+   */
+  public Client() throws Exception  {
+    this(new YarnConfiguration());
+  }
+
+  /**
+   * Prints the usage text for all registered command line options under the
+   * program name "Client".
+   */
+  private void printUsage() {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp("Client", opts);
+  }
+
+  /**
+   * Parses the command line and stores the resulting settings on this client.
+   *
+   * <p>{@code jar} and {@code shell_command} are mandatory. Every other option falls
+   * back to a default when it is absent.</p>
+   *
+   * @param args Raw command line arguments; must not be null or empty
+   * @return true when the client is ready to run, false when only the usage text was
+   *         requested via {@code help}
+   * @throws ParseException if the arguments cannot be parsed against the registered options
+   * @throws IllegalArgumentException if no arguments are given, a mandatory option is
+   *         missing, or a numeric option is malformed or out of range
+   */
+  public boolean init(String[] args) throws ParseException {
+
+    // Validate before parsing so that a null argument array is reported as a usage
+    // error instead of surfacing later as a NullPointerException.
+    if (args == null || args.length == 0) {
+      throw new IllegalArgumentException("No args specified for client to initialize");
+    }
+
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+
+    if (cliParser.hasOption("help")) {
+      printUsage();
+      return false;
+    }
+
+    if (cliParser.hasOption("debug")) {
+      debugFlag = true;
+    }
+
+    appName = cliParser.getOptionValue("appname", "DistributedShell");
+    amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
+    amQueue = cliParser.getOptionValue("queue", "default");
+    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
+
+    if (amMemory < 0) {
+      throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
+          + " Specified memory=" + amMemory);
+    }
+
+    if (!cliParser.hasOption("jar")) {
+      throw new IllegalArgumentException("No jar file specified for application master");
+    }
+
+    appMasterJar = cliParser.getOptionValue("jar");
+
+    if (!cliParser.hasOption("shell_command")) {
+      throw new IllegalArgumentException("No shell command specified to be executed by application master");
+    }
+    shellCommand = cliParser.getOptionValue("shell_command");
+
+    if (cliParser.hasOption("shell_script")) {
+      shellScriptPath = cliParser.getOptionValue("shell_script");
+    }
+    if (cliParser.hasOption("shell_args")) {
+      shellArgs = cliParser.getOptionValue("shell_args");
+    }
+    if (cliParser.hasOption("shell_env")) {
+      String[] envs = cliParser.getOptionValues("shell_env");
+      for (String env : envs) {
+        env = env.trim();
+        int index = env.indexOf('=');
+        if (index == -1) {
+          // A bare name without '=' defines the variable with an empty value
+          shellEnv.put(env, "");
+          continue;
+        }
+        String key = env.substring(0, index);
+        String val = "";
+        // Only take a value when something follows the first '='
+        if (index < (env.length() - 1)) {
+          val = env.substring(index + 1);
+        }
+        shellEnv.put(key, val);
+      }
+    }
+    shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
+
+    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
+    numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
+
+    if (containerMemory < 0 || numContainers < 1) {
+      throw new IllegalArgumentException("Invalid no. of containers or container memory specified, exiting."
+          + " Specified containerMemory=" + containerMemory
+          + ", numContainer=" + numContainers);
+    }
+
+    // clientTimeout is a long, so parse it as one; timeouts beyond Integer.MAX_VALUE
+    // milliseconds are then accepted instead of being rejected as malformed.
+    clientTimeout = Long.parseLong(cliParser.getOptionValue("timeout", "600000"));
+
+    log4jPropFile = cliParser.getOptionValue("log_properties", "");
+
+    return true;
+  }
+
+  /**
+   * Main run function for the client.
+   *
+   * <p>Starts the YARN client, logs cluster, node, queue and ACL information, uploads the
+   * application master jar (plus the optional log4j properties file and shell script) to
+   * the file system, builds the application master launch context, submits the
+   * application and then monitors it until it ends or the client timeout expires.</p>
+   *
+   * @return true if application completed successfully
+   * @throws IOException if a file system operation fails, or if security is enabled and
+   *         no ResourceManager principal is configured to act as token renewer
+   * @throws YarnException if a call to the ResourceManager fails
+   */
+  public boolean run() throws IOException, YarnException {
+
+    LOG.info("Running Client");
+    yarnClient.start();
+
+    YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
+    LOG.info("Got Cluster metric info from ASM"
+        + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
+
+    List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
+        NodeState.RUNNING);
+    LOG.info("Got Cluster node info from ASM");
+    for (NodeReport node : clusterNodeReports) {
+      LOG.info("Got node report from ASM for"
+          + ", nodeId=" + node.getNodeId()
+          + ", nodeAddress=" + node.getHttpAddress()
+          + ", nodeRackName=" + node.getRackName()
+          + ", nodeNumContainers=" + node.getNumContainers());
+    }
+
+    QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
+    LOG.info("Queue info"
+        + ", queueName=" + queueInfo.getQueueName()
+        + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
+        + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
+        + ", queueApplicationCount=" + queueInfo.getApplications().size()
+        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
+
+    List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
+    for (QueueUserACLInfo aclInfo : listAclInfo) {
+      for (QueueACL userAcl : aclInfo.getUserAcls()) {
+        LOG.info("User ACL Info for Queue"
+            + ", queueName=" + aclInfo.getQueueName()
+            + ", userAcl=" + userAcl.name());
+      }
+    }
+
+    // Get a new application id
+    YarnClientApplication app = yarnClient.createApplication();
+    GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
+    // TODO get min/max resource capabilities from RM and change memory ask if needed
+    // If we do not have min/max, we may not be able to correctly request
+    // the required resources from the RM for the app master
+    // Memory ask has to be a multiple of min and less than max.
+    // Dump out information about cluster capability as seen by the resource manager
+    int maxMem = appResponse.getMaximumResourceCapability().getMemory();
+    LOG.info("Max mem capability of resources in this cluster " + maxMem);
+
+    // A resource ask cannot exceed the max.
+    if (amMemory > maxMem) {
+      LOG.info("AM memory specified above max threshold of cluster. Using max value."
+          + ", specified=" + amMemory
+          + ", max=" + maxMem);
+      amMemory = maxMem;
+    }
+
+    // set the application name
+    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+    ApplicationId appId = appContext.getApplicationId();
+    appContext.setApplicationName(appName);
+
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+    // set local resources for the application master
+    // local files or archives as needed
+    // In this scenario, the jar file for the application master is part of the local resources
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+    LOG.info("Copy App Master jar from local filesystem and add to local environment");
+    // Copy the application master jar to the filesystem
+    // Create a local resource to point to the destination jar path
+    FileSystem fs = FileSystem.get(conf);
+    Path src = new Path(appMasterJar);
+    String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";
+    Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+    fs.copyFromLocalFile(false, true, src, dst);
+    FileStatus destStatus = fs.getFileStatus(dst);
+    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+
+    // Set the type of resource - file or archive
+    // archives are untarred at destination
+    // we don't need the jar file to be untarred for now
+    amJarRsrc.setType(LocalResourceType.FILE);
+    // Set visibility of the resource
+    // Setting to most private option
+    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    // Set the resource to be copied over
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+    // Set timestamp and length of file so that the framework
+    // can do basic sanity checks for the local resource
+    // after it has been copied over to ensure it is the same
+    // resource the client intended to use with the application
+    amJarRsrc.setTimestamp(destStatus.getModificationTime());
+    amJarRsrc.setSize(destStatus.getLen());
+    localResources.put("AppMaster.jar",  amJarRsrc);
+
+    // Set the log4j properties if needed
+    if (!log4jPropFile.isEmpty()) {
+      Path log4jSrc = new Path(log4jPropFile);
+      Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
+      fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
+      FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
+      LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
+      log4jRsrc.setType(LocalResourceType.FILE);
+      log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+      log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
+      log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
+      log4jRsrc.setSize(log4jFileStatus.getLen());
+      localResources.put("log4j.properties", log4jRsrc);
+    }
+
+    // The shell script has to be made available on the final container(s)
+    // where it will be executed.
+    // To do this, we need to first copy into the filesystem that is visible
+    // to the yarn framework.
+    // We do not need to set this as a local resource for the application
+    // master as the application master does not need it.
+    String hdfsShellScriptLocation = "";
+    long hdfsShellScriptLen = 0;
+    long hdfsShellScriptTimestamp = 0;
+    if (!shellScriptPath.isEmpty()) {
+      Path shellSrc = new Path(shellScriptPath);
+      String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
+      Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix);
+      fs.copyFromLocalFile(false, true, shellSrc, shellDst);
+      hdfsShellScriptLocation = shellDst.toUri().toString();
+      FileStatus shellFileStatus = fs.getFileStatus(shellDst);
+      hdfsShellScriptLen = shellFileStatus.getLen();
+      hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
+    }
+
+    // Set local resource info into app master container launch context
+    amContainer.setLocalResources(localResources);
+
+    // Set the env variables to be setup in the env where the application master will be run
+    LOG.info("Set the environment for the application master");
+    Map<String, String> env = new HashMap<String, String>();
+
+    // put location of shell script into env
+    // using the env info, the application master will create the correct local resource for the
+    // eventual containers that will be launched to execute the shell scripts
+    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
+    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
+    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
+
+    // Add AppMaster.jar location to classpath
+    // At some point we should not be required to add
+    // the hadoop specific classpaths to the env.
+    // It should be provided out of the box.
+    // For now setting all required classpaths including
+    // the classpath to "." for the application jar
+    StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$())
+      .append(File.pathSeparatorChar).append("./*");
+    for (String c : conf.getStrings(
+        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(File.pathSeparatorChar);
+      classPathEnv.append(c.trim());
+    }
+    classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
+
+    // add the runtime classpath needed for tests to work
+    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+      classPathEnv.append(':');
+      classPathEnv.append(System.getProperty("java.class.path"));
+    }
+
+    env.put("CLASSPATH", classPathEnv.toString());
+
+    amContainer.setEnvironment(env);
+
+    // Set the necessary command to execute the application master.
+    // The list is only touched by this thread, so a plain ArrayList is sufficient.
+    List<CharSequence> vargs = new ArrayList<CharSequence>(30);
+
+    // Set java executable command
+    LOG.info("Setting up app master command");
+    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+    // Set Xmx based on am memory size
+    vargs.add("-Xmx" + amMemory + "m");
+    // Set class name
+    vargs.add(appMasterMainClass);
+    // Set params for Application Master
+    vargs.add("--container_memory " + String.valueOf(containerMemory));
+    vargs.add("--num_containers " + String.valueOf(numContainers));
+    vargs.add("--priority " + String.valueOf(shellCmdPriority));
+    if (!shellCommand.isEmpty()) {
+      vargs.add("--shell_command " + shellCommand + "");
+    }
+    if (!shellArgs.isEmpty()) {
+      vargs.add("--shell_args " + shellArgs + "");
+    }
+    for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
+      vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
+    }
+    if (debugFlag) {
+      vargs.add("--debug");
+    }
+
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
+
+    // Get final command: all fragments joined by single spaces
+    StringBuilder command = new StringBuilder();
+    for (CharSequence str : vargs) {
+      command.append(str).append(" ");
+    }
+
+    LOG.info("Completed setting up app master command " + command.toString());
+    List<String> commands = new ArrayList<String>();
+    commands.add(command.toString());
+    amContainer.setCommands(commands);
+
+    // Set up resource type requirements
+    // For now, only memory is supported so we set memory requirements
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(amMemory);
+    appContext.setResource(capability);
+
+    // Setup security tokens
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Credentials credentials = new Credentials();
+      String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
+      if (tokenRenewer == null || tokenRenewer.length() == 0) {
+        throw new IOException(
+          "Can't get Master Kerberos principal for the RM to use as renewer");
+      }
+
+      // For now, only getting tokens for the default file-system.
+      final Token<?>[] tokens =
+          fs.addDelegationTokens(tokenRenewer, credentials);
+      if (tokens != null) {
+        for (Token<?> token : tokens) {
+          LOG.info("Got dt for " + fs.getUri() + "; " + token);
+        }
+      }
+      DataOutputBuffer dob = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(dob);
+      ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+      amContainer.setTokens(fsTokens);
+    }
+
+    appContext.setAMContainerSpec(amContainer);
+
+    // Set the priority for the application master
+    Priority pri = Records.newRecord(Priority.class);
+    // TODO - what is the range for priority? how to decide?
+    pri.setPriority(amPriority);
+    appContext.setPriority(pri);
+
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue(amQueue);
+
+    // Submit the application to the applications manager.
+    // The response is ignored: either a valid response object is returned on success
+    // or an exception is thrown to denote some form of a failure
+    LOG.info("Submitting application to ASM");
+
+    yarnClient.submitApplication(appContext);
+
+    // TODO
+    // Try submitting the same request again
+    // app submission failure?
+
+    // Monitor the application
+    return monitorApplication(appId);
+
+  }
+
+  /**
+   * Monitor the submitted application for completion.
+   *
+   * <p>Polls the application report once per second. The loop ends when the application
+   * reaches FINISHED, KILLED or FAILED, or when the client timeout (measured from the
+   * construction of this client) expires, in which case the application is killed.</p>
+   *
+   * @param appId Application Id of application to be monitored
+   * @return true if application completed successfully
+   * @throws YarnException if a call to the ResourceManager fails
+   * @throws IOException if a call to the ResourceManager fails
+   */
+  private boolean monitorApplication(ApplicationId appId)
+      throws YarnException, IOException {
+
+    while (true) {
+
+      // Check app status every 1 second.
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // NOTE(review): the interrupt is not re-asserted and polling continues.
+        // Restoring the flag here would make every later sleep return immediately;
+        // confirm whether an interrupt should instead abort monitoring.
+        LOG.debug("Thread sleep in monitoring loop interrupted");
+      }
+
+      // Get application report for the appId we are interested in
+      ApplicationReport report = yarnClient.getApplicationReport(appId);
+
+      // The client-to-AM token is a credential and is deliberately left out of the log.
+      LOG.info("Got application report from ASM for"
+          + ", appId=" + appId.getId()
+          + ", appDiagnostics=" + report.getDiagnostics()
+          + ", appMasterHost=" + report.getHost()
+          + ", appQueue=" + report.getQueue()
+          + ", appMasterRpcPort=" + report.getRpcPort()
+          + ", appStartTime=" + report.getStartTime()
+          + ", yarnAppState=" + report.getYarnApplicationState().toString()
+          + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
+          + ", appTrackingUrl=" + report.getTrackingUrl()
+          + ", appUser=" + report.getUser());
+
+      YarnApplicationState state = report.getYarnApplicationState();
+      FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
+      if (YarnApplicationState.FINISHED == state) {
+        if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+          LOG.info("Application has completed successfully. Breaking monitoring loop");
+          return true;
+        }
+        LOG.info("Application finished unsuccessfully."
+            + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
+            + ". Breaking monitoring loop");
+        return false;
+      }
+      if (YarnApplicationState.KILLED == state
+          || YarnApplicationState.FAILED == state) {
+        LOG.info("Application did not finish."
+            + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
+            + ". Breaking monitoring loop");
+        return false;
+      }
+
+      // Still running: give up and kill the application once the client timeout has elapsed
+      if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
+        LOG.info("Reached client specified timeout for application. Killing application");
+        forceKillApplication(appId);
+        return false;
+      }
+    }
+
+  }
+
+  /**
+   * Asks the ASM, through the YARN client, to kill the given application.
+   *
+   * @param appId Application Id to be killed.
+   * @throws YarnException if the kill request fails
+   * @throws IOException if the kill request fails
+   */
+  private void forceKillApplication(ApplicationId appId)
+      throws YarnException, IOException {
+    // TODO clarify whether multiple jobs with the same app id can be submitted and be
+    // running at the same time. If yes, can we kill a particular attempt only?
+
+    // No result is inspected here: a failure is reported through an exception.
+    yarnClient.killApplication(appId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerAskResponse.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerAskResponse.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerAskResponse.java
new file mode 100644
index 0000000..c570932
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerAskResponse.java
@@ -0,0 +1,17 @@
+package org.apache.helix.provisioning.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+/**
+ * Mutable holder for a single {@link Container}.
+ */
+public class ContainerAskResponse {
+
+  /** Container held by this response; {@code null} until one is set. */
+  Container container;
+
+  /**
+   * @return the container currently held, or {@code null} if none has been set
+   */
+  public Container getContainer() {
+    return this.container;
+  }
+
+  /**
+   * @param container the container to hold, replacing any previous one
+   */
+  public void setContainer(Container container) {
+    this.container = container;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerLaunchResponse.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerLaunchResponse.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerLaunchResponse.java
new file mode 100644
index 0000000..c91cb93
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerLaunchResponse.java
@@ -0,0 +1,5 @@
+package org.apache.helix.provisioning.yarn;
+
+/**
+ * Marker type for the result of a container launch request; it currently carries no data.
+ */
+public class ContainerLaunchResponse {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerReleaseResponse.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerReleaseResponse.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerReleaseResponse.java
new file mode 100644
index 0000000..77d50ba
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerReleaseResponse.java
@@ -0,0 +1,5 @@
+package org.apache.helix.provisioning.yarn;
+
+/**
+ * Marker type for the result of a container release request; it currently carries no data.
+ */
+public class ContainerReleaseResponse {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerStopResponse.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerStopResponse.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerStopResponse.java
new file mode 100644
index 0000000..4c0022a
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerStopResponse.java
@@ -0,0 +1,5 @@
+package org.apache.helix.provisioning.yarn;
+
+/**
+ * Marker type for the result of a container stop request; it currently carries no data.
+ */
+public class ContainerStopResponse {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/DSConstants.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/DSConstants.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/DSConstants.java
new file mode 100644
index 0000000..a9fdf3d
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/DSConstants.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.provisioning.yarn;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Constants used in both Client and Application Master.
+ * Each constant is an environment key name whose value equals the constant's own name.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class DSConstants {
+
+  /**
+   * Environment key name pointing to the shell script's location.
+   */
+  public static final String DISTRIBUTEDSHELLSCRIPTLOCATION = "DISTRIBUTEDSHELLSCRIPTLOCATION";
+
+  /**
+   * Environment key name denoting the file timestamp for the shell script.
+   * Used to validate the local resource.
+   */
+  public static final String DISTRIBUTEDSHELLSCRIPTTIMESTAMP = "DISTRIBUTEDSHELLSCRIPTTIMESTAMP";
+
+  /**
+   * Environment key name denoting the file content length for the shell script.
+   * Used to validate the local resource.
+   */
+  public static final String DISTRIBUTEDSHELLSCRIPTLEN = "DISTRIBUTEDSHELLSCRIPTLEN";
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
new file mode 100644
index 0000000..d3f410f
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.provisioning.yarn;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.SettableFuture;
+
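+/**
+ * Generic YARN application master: registers with the ResourceManager and exposes
+ * asynchronous operations to acquire, launch, stop and release containers.
+ */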
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class GenericApplicationMaster {
+
+  // Logger; also used by the callback handlers and the launch runnable in this package
+  static final Log LOG = LogFactory.getLog(GenericApplicationMaster.class);
+
+  // YARN configuration handed to the RM and NM clients when they are initialized
+  private Configuration conf;
+
+  // Handle to communicate with the Resource Manager (created in start())
+  AMRMClientAsync<ContainerRequest> amRMClient;
+
+  // Handle to communicate with the Node Manager (created in start())
+  NMClientAsync nmClientAsync;
+  // Listener that processes the responses from the Node Manager
+  NMCallbackHandler containerListener;
+
+  // Application Attempt Id ( combination of attemptId and fail count )
+  private ApplicationAttemptId appAttemptID;
+
+  // TODO
+  // For status update for clients - yet to be implemented
+  // Hostname of the container; assigned in start() just before registering with the RM
+  private String appMasterHostname = "";
+  // Port on which the app master listens for status updates from clients
+  private int appMasterRpcPort = -1;
+  // Tracking url to which app master publishes info for clients to monitor
+  private String appMasterTrackingUrl = "";
+
+  // Counter for completed containers ( complete denotes successful or failed )
+  AtomicInteger numCompletedContainers = new AtomicInteger();
+  // Number of containers the RM has allocated to us so far
+  AtomicInteger numAllocatedContainers = new AtomicInteger();
+  // Count of failed containers
+  AtomicInteger numFailedContainers = new AtomicInteger();
+  // Count of containers already requested from the RM
+  // Needed as once requested, we should not request for containers again.
+  // Only request for more if the original requirement changes.
+  AtomicInteger numRequestedContainers = new AtomicInteger();
+  // Intended to track the future handed out for each outstanding container request,
+  // keyed by the request
+  Map<ContainerRequest, SettableFuture<ContainerAskResponse>> containerRequestMap =
+      new LinkedHashMap<AMRMClient.ContainerRequest, SettableFuture<ContainerAskResponse>>();
+
+  // Serialized credentials, built in start() and passed to launched containers
+  ByteBuffer allTokens;
+
+  // Threads started to launch allocated containers; joined in finish()
+  List<Thread> launchThreads = new ArrayList<Thread>();
+
+  /**
+   * Creates an application master for the given attempt, backed by a fresh
+   * {@link YarnConfiguration}.
+   *
+   * @param appAttemptID id of the application attempt this master runs as
+   */
+  public GenericApplicationMaster(ApplicationAttemptId appAttemptID) {
+    // Default YARN configuration
+    this.conf = new YarnConfiguration();
+    this.appAttemptID = appAttemptID;
+  }
+  
+  /**
+   * Dumps the process environment and a listing of the current working directory
+   * ({@code ls -al}) to the log and to stdout, for debugging.
+   * Failures while listing the directory are logged and otherwise ignored.
+   */
+  private void dumpOutDebugInfo() {
+
+    LOG.info("Dump debug output");
+    for (Map.Entry<String, String> env : System.getenv().entrySet()) {
+      String envLine = "System env: key=" + env.getKey() + ", val=" + env.getValue();
+      LOG.info(envLine);
+      System.out.println(envLine);
+    }
+
+    BufferedReader buf = null;
+    try {
+      Process pr = Runtime.getRuntime().exec("ls -al");
+      buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
+
+      // Drain the output before waiting for the process to exit: waiting first can
+      // block forever once the child has filled the pipe buffer.
+      String line;
+      while ((line = buf.readLine()) != null) {
+        String cwdLine = "System CWD content: " + line;
+        LOG.info(cwdLine);
+        System.out.println(cwdLine);
+      }
+      pr.waitFor();
+    } catch (IOException e) {
+      LOG.warn("Failed to list the current working directory", e);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller instead of swallowing it
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while listing the current working directory", e);
+    } finally {
+      if (buf != null) {
+        try {
+          buf.close();
+        } catch (IOException ignored) {
+          // Best-effort close of a debug-only stream
+        }
+      }
+    }
+  }
+
+
+
+  /**
+   * Validates the container environment, captures the credentials to hand to launched
+   * containers, starts the ResourceManager and NodeManager clients and registers this
+   * application master with the ResourceManager.
+   *
+   * @return {@code true} once registration with the ResourceManager has returned
+   * @throws ParseException declared in the signature; no visible call here appears to throw it
+   * @throws IOException propagated from credential handling or the ResourceManager client
+   * @throws YarnException propagated from the ResourceManager client
+   * @throws RuntimeException if a required environment variable is not set
+   */
+  public boolean start() throws ParseException, IOException, YarnException {
+
+    // The debug flag is an environment variable, so parse its value directly.
+    // Boolean.getBoolean(name) would instead look up a JVM system property called name.
+    if (Boolean.parseBoolean(System.getenv("debug"))) {
+      dumpOutDebugInfo();
+    }
+
+    Map<String, String> envs = System.getenv();
+
+    String[] requiredEnvKeys = {
+        ApplicationConstants.APP_SUBMIT_TIME_ENV,
+        Environment.NM_HOST.name(),
+        Environment.NM_HTTP_PORT.name(),
+        Environment.NM_PORT.name()
+    };
+    for (String requiredKey : requiredEnvKeys) {
+      if (!envs.containsKey(requiredKey)) {
+        throw new RuntimeException(requiredKey + " not set in the environment");
+      }
+    }
+
+    LOG.info("Application master for app" + ", appId=" + appAttemptID.getApplicationId().getId()
+        + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp()
+        + ", attemptId=" + appAttemptID.getAttemptId());
+
+    LOG.info("Starting ApplicationMaster");
+
+    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+    // Remove the AM->RM token so that containers cannot access it. This has to happen
+    // before the credentials are serialized: bytes already written to the buffer are
+    // not affected by a later removal.
+    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+    while (iter.hasNext()) {
+      Token<?> token = iter.next();
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        iter.remove();
+      }
+    }
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(this);
+    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
+    amRMClient.init(conf);
+    amRMClient.start();
+
+    containerListener = createNMCallbackHandler();
+    nmClientAsync = new NMClientAsyncImpl(containerListener);
+    nmClientAsync.init(conf);
+    nmClientAsync.start();
+
+    // Setup local RPC Server to accept status requests directly from clients
+    // TODO need to setup a protocol for client to be able to communicate to
+    // the RPC server
+    // TODO use the rpc port info to register with the RM for the client to
+    // send requests to this app master
+
+    // Register self with ResourceManager
+    // This will start heartbeating to the RM
+    appMasterHostname = NetUtils.getHostname();
+    RegisterApplicationMasterResponse response =
+        amRMClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+            appMasterTrackingUrl);
+    // Dump out information about cluster capability as seen by the
+    // resource manager
+    int maxMem = response.getMaximumResourceCapability().getMemory();
+    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+    return true;
+  }
+
+
+  /**
+   * Submits a container request to the ResourceManager and returns a future for its
+   * outcome.
+   * <p>
+   * The future is recorded in {@code containerRequestMap} under the request so that it
+   * stays reachable after this method returns; previously it was dropped, so nothing
+   * could ever complete it. NOTE(review): no code visible here completes the future or
+   * removes the map entry yet, and the map is a plain {@code LinkedHashMap} - confirm
+   * where completion is meant to happen and which threads touch the map.
+   *
+   * @param containerAsk the container request to submit
+   * @return future for the response to this request
+   */
+  public Future<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) {
+    SettableFuture<ContainerAskResponse> future = SettableFuture.create();
+    // Register before submitting so the future is already reachable when the RM answers
+    containerRequestMap.put(containerAsk, future);
+    amRMClient.addContainerRequest(containerAsk);
+    numRequestedContainers.incrementAndGet();
+    return future;
+  }
+
+  /**
+   * Asks the NodeManager client to stop the given container asynchronously.
+   * <p>
+   * NOTE(review): the returned future is created here and neither stored nor completed
+   * by this method, so a caller blocking on it may wait indefinitely - confirm the
+   * intended wiring.
+   *
+   * @param container the container to stop
+   * @return a new, not yet completed future
+   */
+  public Future<ContainerStopResponse> stopContainer(Container container) {
+    SettableFuture<ContainerStopResponse> pendingStop = SettableFuture.create();
+    nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+    return pendingStop;
+  }
+
+  /**
+   * Tells the ResourceManager client to release the given assigned container.
+   * <p>
+   * NOTE(review): the returned future is created here and neither stored nor completed
+   * by this method, so a caller blocking on it may wait indefinitely - confirm the
+   * intended wiring.
+   *
+   * @param container the container to release
+   * @return a new, not yet completed future
+   */
+  public Future<ContainerReleaseResponse> releaseContainer(Container container) {
+    SettableFuture<ContainerReleaseResponse> pendingRelease = SettableFuture.create();
+    amRMClient.releaseAssignedContainer(container.getId());
+    return pendingRelease;
+  }
+
+  /**
+   * Asks the NodeManager client to start the given container asynchronously with the
+   * supplied launch context.
+   * <p>
+   * NOTE(review): the returned future is created here and neither stored nor completed
+   * by this method, so a caller blocking on it may wait indefinitely - confirm the
+   * intended wiring.
+   *
+   * @param container the allocated container to start
+   * @param containerLaunchContext launch context describing what to run in the container
+   * @return a new, not yet completed future
+   */
+  public Future<ContainerLaunchResponse> launchContainer(Container container,
+      ContainerLaunchContext containerLaunchContext) {
+    SettableFuture<ContainerLaunchResponse> pendingLaunch = SettableFuture.create();
+    nmClientAsync.startContainerAsync(container, containerLaunchContext);
+    return pendingLaunch;
+  }
+
+  /**
+   * Builds the NodeManager callback handler bound to this application master.
+   *
+   * @return a new handler that refers back to this instance
+   */
+  @VisibleForTesting
+  NMCallbackHandler createNMCallbackHandler() {
+    NMCallbackHandler handler = new NMCallbackHandler(this);
+    return handler;
+  }
+
+  public void finish() {
+    // Join all launched threads
+    // needed for when we time out
+    // and we need to release containers
+    for (Thread launchThread : launchThreads) {
+      try {
+        launchThread.join(10000);
+      } catch (InterruptedException e) {
+        LOG.info("Exception thrown in thread join: " + e.getMessage());
+        e.printStackTrace();
+      }
+    }
+
+    // When the application completes, it should stop all running containers
+    LOG.info("Application completed. Stopping running containers");
+    nmClientAsync.stop();
+
+    // When the application completes, it should send a finish application
+    // signal to the RM
+    LOG.info("Application completed. Signalling finish to RM");
+
+    FinalApplicationStatus appStatus;
+    String appMessage = null;
+    appStatus = FinalApplicationStatus.SUCCEEDED;
+    try {
+      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
+    } catch (YarnException ex) {
+      LOG.error("Failed to unregister application", ex);
+    } catch (IOException e) {
+      LOG.error("Failed to unregister application", e);
+    }
+
+    amRMClient.stop();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/LaunchContainerRunnable.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/LaunchContainerRunnable.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/LaunchContainerRunnable.java
new file mode 100644
index 0000000..c54f87f
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/LaunchContainerRunnable.java
@@ -0,0 +1,79 @@
+package org.apache.helix.provisioning.yarn;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Runnable that builds a {@link ContainerLaunchContext} for an allocated container and
+ * asks the application master's NodeManager client (the client side of the
+ * {@link ContainerManagementProtocol}) to start it.
+ * <p>
+ * The launch context currently has no environment, no local resources and an empty
+ * command list; only the application master's tokens are attached.
+ */
+class LaunchContainerRunnable implements Runnable {
+
+  // Application master that supplies the tokens and the NodeManager client
+  private final GenericApplicationMaster _genericApplicationMaster;
+
+  // Allocated container
+  Container container;
+
+  // Callback handler that is told about the container before it is started
+  NMCallbackHandler containerListener;
+
+  /**
+   * @param genericApplicationMaster application master on whose behalf the container is started
+   * @param lcontainer Allocated container
+   * @param containerListener Callback handler of the container
+   */
+  public LaunchContainerRunnable(GenericApplicationMaster genericApplicationMaster, Container lcontainer, NMCallbackHandler containerListener) {
+    _genericApplicationMaster = genericApplicationMaster;
+    this.container = lcontainer;
+    this.containerListener = containerListener;
+  }
+
+  /**
+   * Sets up the container launch context, registers the container with the callback
+   * handler and dispatches the asynchronous start request.
+   */
+  @Override
+  public void run() {
+    GenericApplicationMaster.LOG.info("Setting up container launch container for containerid=" + container.getId());
+    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+
+    // TODO set the environment, e.g. ctx.setEnvironment(...)
+
+    // No local resources are configured yet
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    ctx.setLocalResources(localResources);
+
+    // TODO add the command to execute on the allocated container; the list is empty for now
+    List<String> commands = new ArrayList<String>();
+    ctx.setCommands(commands);
+
+    // Set up tokens for the container too. Today, for normal shell commands,
+    // the container in distribute-shell doesn't need any tokens. We are
+    // populating them mainly for NodeManagers to be able to download any
+    // files in the distributed file-system. The tokens are otherwise also
+    // useful in cases, for e.g., when one is running a "hadoop dfs" command
+    // inside the distributed shell.
+    // duplicate() gives this launch its own buffer position over the shared bytes.
+    ctx.setTokens(_genericApplicationMaster.allTokens.duplicate());
+
+    // Register with the listener first so its callbacks can look the container up
+    containerListener.addContainer(container.getId(), container);
+    _genericApplicationMaster.nmClientAsync.startContainerAsync(container, ctx);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
new file mode 100644
index 0000000..df73946
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
@@ -0,0 +1,74 @@
+package org.apache.helix.provisioning.yarn;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@VisibleForTesting class NMCallbackHandler implements NMClientAsync.CallbackHandler {
+
+  private ConcurrentMap<ContainerId, Container> containers =
+      new ConcurrentHashMap<ContainerId, Container>();
+  private final GenericApplicationMaster applicationMaster;
+
+  public NMCallbackHandler(GenericApplicationMaster applicationMaster) {
+    this.applicationMaster = applicationMaster;
+  }
+
+  public void addContainer(ContainerId containerId, Container container) {
+    containers.putIfAbsent(containerId, container);
+  }
+
+  @Override
+  public void onContainerStopped(ContainerId containerId) {
+    if (GenericApplicationMaster.LOG.isDebugEnabled()) {
+      GenericApplicationMaster.LOG.debug("Succeeded to stop Container " + containerId);
+    }
+    containers.remove(containerId);
+  }
+
+  @Override
+  public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
+    if (GenericApplicationMaster.LOG.isDebugEnabled()) {
+      GenericApplicationMaster.LOG.debug("Container Status: id=" + containerId + ", status=" + containerStatus);
+    }
+  }
+
+  @Override
+  public void onContainerStarted(ContainerId containerId,
+      Map<String, ByteBuffer> allServiceResponse) {
+    if (GenericApplicationMaster.LOG.isDebugEnabled()) {
+      GenericApplicationMaster.LOG.debug("Succeeded to start Container " + containerId);
+    }
+    Container container = containers.get(containerId);
+    if (container != null) {
+      applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+    }
+  }
+
+  @Override
+  public void onStartContainerError(ContainerId containerId, Throwable t) {
+    GenericApplicationMaster.LOG.error("Failed to start Container " + containerId);
+    containers.remove(containerId);
+    applicationMaster.numCompletedContainers.incrementAndGet();
+    applicationMaster.numFailedContainers.incrementAndGet();
+  }
+
+  @Override
+  public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
+    GenericApplicationMaster.LOG.error("Failed to query the status of Container " + containerId);
+  }
+
+  @Override
+  public void onStopContainerError(ContainerId containerId, Throwable t) {
+    GenericApplicationMaster.LOG.error("Failed to stop Container " + containerId);
+    containers.remove(containerId);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
new file mode 100644
index 0000000..659411c
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
@@ -0,0 +1,109 @@
+package org.apache.helix.provisioning.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+
+/**
+ * Receives ResourceManager client callbacks on behalf of a {@link GenericApplicationMaster}:
+ * updates its container counters when containers complete and starts a launch thread for
+ * every container that gets allocated.
+ */
+class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+
+  // Application master whose counters, listener and clients this handler uses
+  private final GenericApplicationMaster _genericApplicationMaster;
+
+  /**
+   * @param genericApplicationMaster application master to report to
+   */
+  RMCallbackHandler(GenericApplicationMaster genericApplicationMaster) {
+    _genericApplicationMaster = genericApplicationMaster;
+  }
+
+  /**
+   * Updates the completed/failed/allocated/requested counters from the exit status of
+   * each completed container.
+   */
+  @Override
+  public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+    GenericApplicationMaster.LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+    for (ContainerStatus containerStatus : completedContainers) {
+      GenericApplicationMaster.LOG.info("Got container status for containerID=" + containerStatus.getContainerId()
+          + ", state=" + containerStatus.getState() + ", exitStatus="
+          + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics());
+
+      // non complete containers should not be here
+      assert (containerStatus.getState() == ContainerState.COMPLETE);
+
+      int exitStatus = containerStatus.getExitStatus();
+      if (exitStatus == 0) {
+        // container completed successfully
+        _genericApplicationMaster.numCompletedContainers.incrementAndGet();
+        GenericApplicationMaster.LOG.info("Container completed successfully." + ", containerId="
+            + containerStatus.getContainerId());
+      } else if (exitStatus == ContainerExitStatus.ABORTED) {
+        // container was killed by framework, possibly preempted
+        // we should re-try as the container was lost for some reason
+        _genericApplicationMaster.numAllocatedContainers.decrementAndGet();
+        _genericApplicationMaster.numRequestedContainers.decrementAndGet();
+        // we do not need to release the container as it would be done
+        // by the RM
+      } else {
+        // the container's process failed; this still counts as completed
+        _genericApplicationMaster.numCompletedContainers.incrementAndGet();
+        _genericApplicationMaster.numFailedContainers.incrementAndGet();
+      }
+    }
+  }
+
+  /**
+   * Counts the newly allocated containers and starts one launch thread per container.
+   */
+  @Override
+  public void onContainersAllocated(List<Container> allocatedContainers) {
+    GenericApplicationMaster.LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
+    _genericApplicationMaster.numAllocatedContainers.addAndGet(allocatedContainers.size());
+    for (Container allocatedContainer : allocatedContainers) {
+      GenericApplicationMaster.LOG.info("Launching shell command on a new container." + ", containerId="
+          + allocatedContainer.getId() + ", containerNode="
+          + allocatedContainer.getNodeId().getHost() + ":"
+          + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
+          + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory="
+          + allocatedContainer.getResource().getMemory());
+
+      LaunchContainerRunnable runnableLaunchContainer =
+          new LaunchContainerRunnable(_genericApplicationMaster, allocatedContainer, _genericApplicationMaster.containerListener);
+      Thread launchThread = new Thread(runnableLaunchContainer);
+
+      // launch and start the container on a separate thread to keep
+      // the main thread unblocked
+      // as all containers may not be allocated at one go.
+      _genericApplicationMaster.launchThreads.add(launchThread);
+      launchThread.start();
+    }
+  }
+
+  /** No action is taken on a shutdown request. */
+  @Override
+  public void onShutdownRequest() {
+  }
+
+  /** Node updates are ignored. */
+  @Override
+  public void onNodesUpdated(List<NodeReport> updatedNodes) {
+  }
+
+  /**
+   * @return the progress to deliver to the RM on the next heartbeat; a fixed 0.5
+   */
+  @Override
+  public float getProgress() {
+    return 0.5f;
+  }
+
+  /**
+   * Logs the error and stops the application master's ResourceManager client.
+   */
+  @Override
+  public void onError(Throwable e) {
+    // Record the cause before shutting down; otherwise the client stops without a trace
+    GenericApplicationMaster.LOG.error("Error reported by the ResourceManager client; stopping it", e);
+    _genericApplicationMaster.amRMClient.stop();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnApplicationMaster.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnApplicationMaster.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnApplicationMaster.java
new file mode 100644
index 0000000..55d79bb
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnApplicationMaster.java
@@ -0,0 +1,16 @@
+package org.apache.helix.provisioning.yarn;
+
+/**
+ * Entry point that is intended to <br/>
+ * <ul>
+ * <li>start zookeeper automatically</li>
+ * <li>create the cluster</li>
+ * <li>set up resource(s)</li>
+ * <li>start helix controller</li>
+ * </ul>
+ * NOTE: {@code main} is currently empty, so none of these steps is performed yet.
+ */
+public class YarnApplicationMaster {
+  public static void main(String[] args) {
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
new file mode 100644
index 0000000..bfaa209
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -0,0 +1,120 @@
+package org.apache.helix.provisioning.yarn;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.provisioner.ContainerId;
+import org.apache.helix.controller.provisioner.ContainerSpec;
+import org.apache.helix.controller.provisioner.ContainerState;
+import org.apache.helix.controller.provisioner.Provisioner;
+import org.apache.helix.controller.provisioner.TargetProviderResponse;
+
+public class YarnProvisioner implements Provisioner {
+
+  private static final Log LOG = LogFactory.getLog(YarnProvisioner.class);
+  static GenericApplicationMaster applicationMaster;
+  Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>();
+
+  @Override
+  public ContainerId allocateContainer(ContainerSpec spec) {
+    ContainerRequest containerAsk = setupContainerAskForRM(spec);
+    Future<ContainerAskResponse> requestNewContainer =
+        applicationMaster.acquireContainer(containerAsk);
+    ContainerAskResponse containerAskResponse;
+    try {
+      containerAskResponse = requestNewContainer.get();
+      ContainerId helixContainerId =
+          ContainerId.from(containerAskResponse.getContainer().getId().toString());
+      allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer());
+      return helixContainerId;
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+    }
+    return null;
+  }
+
+  @Override
+  public boolean deallocateContainer(ContainerId containerId) {
+    Future<ContainerReleaseResponse> releaseContainer =
+        applicationMaster.releaseContainer(allocatedContainersMap.get(containerId));
+    try {
+      releaseContainer.get();
+      return true;
+    } catch (InterruptedException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } catch (ExecutionException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    return false;
+  }
+
+  @Override
+  public boolean startContainer(ContainerId containerId) {
+    Container container = allocatedContainersMap.get(containerId);
+    ContainerLaunchContext containerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
+    applicationMaster.launchContainer(container, containerLaunchContext);
+    return false;
+  }
+
+  @Override
+  public boolean stopContainer(ContainerId containerId) {
+    return false;
+  }
+
+  @Override
+  public ContainerState getContainerState(ContainerId containerId) {
+    return null;
+  }
+
+  @Override
+  public void init(HelixManager helixManager) {
+
+  }
+
+  @Override
+  public TargetProviderResponse evaluateExistingContainers(Cluster cluster, ResourceId resourceId,
+      Collection<Participant> participants) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  private ContainerRequest setupContainerAskForRM(ContainerSpec spec) {
+    // setup requirements for hosts
+    // using * as any host will do for the distributed shell app
+    // set the priority for the request
+    Priority pri = Records.newRecord(Priority.class);
+    int requestPriority = 0;
+    // TODO - what is the range for priority? how to decide?
+    pri.setPriority(requestPriority);
+
+    // Set up resource type requirements
+    // For now, only memory is supported so we set memory requirements
+    Resource capability = Records.newRecord(Resource.class);
+    int memory = 1024;
+    capability.setMemory(memory);
+
+    ContainerRequest request = new ContainerRequest(capability, null, null, pri);
+    LOG.info("Requested container ask: " + request.toString());
+    return request;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/test/conf/testng.xml
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/test/conf/testng.xml b/helix-provisioning/src/test/conf/testng.xml
new file mode 100644
index 0000000..37bccf3
--- /dev/null
+++ b/helix-provisioning/src/test/conf/testng.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd">
+<suite name="Suite" parallel="none">
+  <test name="Test" preserve-order="false">
+    <packages> <!-- scan this module's own packages rather than helix-agent's -->
+      <package name="org.apache.helix.provisioning.*"/>
+    </packages>
+  </test>
+</suite>

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5c17bd1..f3d2e61 100644
--- a/pom.xml
+++ b/pom.xml
@@ -197,6 +197,7 @@ under the License.
     <module>helix-core</module>
     <module>helix-admin-webapp</module>
     <module>helix-agent</module>
+    <module>helix-provisioning</module>
     <module>helix-examples</module>
     <module>recipes</module>
     <module>site-releases</module>


Mime
View raw message