drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ar...@apache.org
Subject [08/12] drill git commit: DRILL-1170: YARN integration for Drill
Date Sun, 04 Mar 2018 17:13:48 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractScheduler.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractScheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractScheduler.java
new file mode 100644
index 0000000..01b89ae
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractScheduler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.core.ContainerRequestSpec;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * Skeleton {@link Scheduler} that stores the scheduler's name, type, task
+ * specification, priority and task manager, and implements the operations
+ * that do not depend on a particular scheduling policy. The task manager
+ * defaults to an {@link AbstractTaskManager} until one is set explicitly.
+ */
+public abstract class AbstractScheduler implements Scheduler {
+  private static final Log LOG = LogFactory.getLog(AbstractScheduler.class);
+  private final String name;
+  private final String type;
+  protected TaskSpec taskSpec;
+  protected int priority;
+  protected int failCount;
+  protected TaskManager taskManager;
+  protected SchedulerState state;
+  protected boolean isTracked;
+
+  /**
+   * @param type value reported by {@link #getType()}
+   * @param name value reported by {@link #getName()}
+   */
+  public AbstractScheduler(String type, String name) {
+    this.type = type;
+    this.name = name;
+    this.taskManager = new AbstractTaskManager();
+  }
+
+  /** Replaces the task manager installed by the constructor. */
+  public void setTaskManager(TaskManager taskManager) {
+    this.taskManager = taskManager;
+  }
+
+  @Override
+  public TaskManager getTaskManager() {
+    return taskManager;
+  }
+
+  @Override
+  public void registerState(SchedulerState state) {
+    this.state = state;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String getType() {
+    return type;
+  }
+
+  /**
+   * Records the priority on this scheduler and copies it into the container
+   * request of the task specification.
+   */
+  @Override
+  public void setPriority(int priority) {
+    this.priority = priority;
+    taskSpec.containerSpec.priority = priority;
+  }
+
+  @Override
+  public boolean isTracked() {
+    return isTracked;
+  }
+
+  @Override
+  public ContainerRequestSpec getResource() {
+    return taskSpec.containerSpec;
+  }
+
+  @Override
+  public int getRequestTimeoutSec() {
+    return 0;
+  }
+
+  /** Applies a relative change by resizing to the current target plus delta. */
+  @Override
+  public void change(int delta) {
+    final int newTarget = getTarget() + delta;
+    resize(newTarget);
+  }
+
+  /**
+   * Creates {@code n} new tasks from the task specification and hands each
+   * to the registered scheduler state to be started.
+   *
+   * @param n number of tasks to create
+   */
+  protected void addTasks(int n) {
+    LOG.info( "[" + getName( ) + "] - Adding " + n + " tasks" );
+    for (int remaining = n; remaining > 0; remaining--) {
+      state.start(new Task(this, taskSpec));
+    }
+  }
+
+  /**
+   * Clamps the memory and vcore figures of the container request to the
+   * given maximum, logging a warning for each figure that had to be reduced.
+   *
+   * @param maxResource the largest container that may be requested
+   */
+  @Override
+  public void limitContainerSize(Resource maxResource) throws AMException {
+    final ContainerRequestSpec spec = taskSpec.containerSpec;
+
+    final int maxMemoryMb = maxResource.getMemory();
+    if (spec.memoryMb > maxMemoryMb) {
+      LOG.warn(taskSpec.name + " requires " + spec.memoryMb
+          + " MB but the maximum YARN container size is "
+          + maxMemoryMb + " MB");
+      spec.memoryMb = maxMemoryMb;
+    }
+
+    final int maxVCores = maxResource.getVirtualCores();
+    if (spec.vCores > maxVCores) {
+      LOG.warn(taskSpec.name + " requires " + spec.vCores
+          + " vcores but the maximum YARN container size is "
+          + maxVCores + " vcores");
+      spec.vCores = maxVCores;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractTaskManager.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractTaskManager.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractTaskManager.java
new file mode 100644
index 0000000..7acd402
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractTaskManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.drill.yarn.appMaster.Scheduler.TaskManager;
+import org.apache.drill.yarn.core.LaunchSpec;
+
+/**
+ * Default {@link TaskManager} whose callbacks take no action: it places no
+ * limit on concurrent allocations, declines every stop request, reports
+ * every task as live and uses the task's own launch specification.
+ */
+
+public class AbstractTaskManager implements TaskManager {
+
+  /** @return {@link Integer#MAX_VALUE}, i.e. no limit imposed here */
+  @Override
+  public int maxConcurrentAllocs() {
+    return Integer.MAX_VALUE;
+  }
+
+  /** Allocation callback; intentionally empty. */
+  @Override
+  public void allocated(EventContext context) {
+    // Nothing to do.
+  }
+
+  /** @return the launch specification supplied by the task itself */
+  @Override
+  public LaunchSpec getLaunchSpec(Task task) {
+    final LaunchSpec spec = task.getLaunchSpec();
+    return spec;
+  }
+
+  /** @return always {@code false} */
+  @Override
+  public boolean stop(Task task) {
+    return false;
+  }
+
+  /** Completion callback; intentionally empty. */
+  @Override
+  public void completed(EventContext context) {
+    // Nothing to do.
+  }
+
+  /** @return always {@code true} */
+  @Override
+  public boolean isLive(EventContext context) {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/BatchScheduler.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/BatchScheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/BatchScheduler.java
new file mode 100644
index 0000000..8f3aaab
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/BatchScheduler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Scheduler of type "batch" that tracks a target quantity of tasks and the
+ * number of tasks that have completed so far.
+ */
+public class BatchScheduler extends AbstractScheduler {
+  /** Target number of tasks. */
+  private int quantity;
+  /** Number of tasks reported through {@link #completed(Task)}. */
+  private int completedCount;
+
+  /**
+   * @param name scheduler name
+   * @param quantity initial target number of tasks
+   */
+  public BatchScheduler(String name, int quantity) {
+    super("batch", name);
+    this.quantity = quantity;
+  }
+
+  /**
+   * Counts a finished task; a task whose disposition is anything other than
+   * {@code COMPLETED} is also counted as a failure.
+   */
+  @Override
+  public void completed(Task task) {
+    completedCount++;
+    if (task.getDisposition() != Task.Disposition.COMPLETED) {
+      failCount++;
+    }
+  }
+
+  /**
+   * Sets the target quantity.
+   *
+   * @param level the new target
+   * @return the target now in effect
+   */
+  @Override
+  public int resize(int level) {
+    quantity = level;
+    return quantity;
+  }
+
+  @Override
+  public int getTarget() {
+    return quantity;
+  }
+
+  /**
+   * @return a two-element array: the completed count (capped at the target)
+   *         followed by the target
+   */
+  @Override
+  public int[] getProgress() {
+    return new int[] { Math.min(completedCount, quantity), quantity };
+  }
+
+  /**
+   * Brings the number of tasks in line with the target. The shortfall is the
+   * target minus the tasks that are active and the tasks already completed:
+   * a positive shortfall means more tasks must be started, a negative one
+   * means there are too many and the surplus should be cancelled.
+   */
+  @Override
+  public void adjust() {
+    int activeCount = state.getTaskCount();
+    int shortfall = quantity - activeCount - completedCount;
+    if (shortfall > 0) {
+      addTasks(shortfall);
+    } else if (shortfall < 0) {
+      cancelTasks(-shortfall);
+    }
+  }
+
+  /**
+   * Cancel any starting tasks. We don't cancel launched, in-flight tasks
+   * because there is no way to tell YARN to cancel tasks that are in the
+   * process of being launched: we have to wait for them to start
+   * before canceling.
+   *
+   * @param n maximum number of starting tasks to cancel; must be positive
+   */
+
+  private void cancelTasks(int n) {
+    for (Task task : state.getStartingTasks()) {
+      state.cancel(task);
+      if (--n == 0) {
+        break;
+      }
+    }
+  }
+
+  /** @return whether fewer tasks have completed than the target */
+  @Override
+  public boolean hasMoreTasks() {
+    return completedCount < quantity;
+  }
+
+  @Override
+  public void requestTimedOut() {
+    // Not clear what to do here. Since this case is used only for testing,
+    // deal with this case later.
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterController.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterController.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterController.java
new file mode 100644
index 0000000..6aaa18b
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterController.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * Interface which identifies the cluster controller methods that are safe to
+ * call from the {@link Dispatcher}. Methods here are either designed to be
+ * called before the event threads start or after they complete. The remainder
+ * are synchronized to coordinate between event threads.
+ */
+
+public interface ClusterController extends RegistryHandler {
+  void enableFailureCheck(boolean flag);
+
+  void registerLifecycleListener(TaskLifecycleListener listener);
+
+  void registerScheduler(Scheduler resourceGroup);
+
+  void setProperty(String key, Object value);
+
+  Object getProperty(String key);
+
+  /**
+   * Called after the dispatcher has started YARN and other server
+   * components. The controller can now begin to spin up tasks.
+   */
+
+  void started( ) throws YarnFacadeException, AMException;
+
+  /**
+   * Called by the timer ("pulse") thread to trigger time-based events.
+   *
+   * @param curTime
+   *          the current time (presumably in milliseconds; confirm against
+   *          the caller)
+   */
+
+  void tick(long curTime);
+
+  /**
+   * The RM has allocated one or more containers in response to container
+   * requests submitted to the RM.
+   *
+   * @param containers
+   *          the set of containers provided by YARN
+   */
+
+  void containersAllocated(List<Container> containers);
+
+  /**
+   * The NM reports that a container has successfully started.
+   *
+   * @param containerId
+   *          the container which started
+   */
+
+  void containerStarted(ContainerId containerId);
+
+  /**
+   * The RM API reports that an attempt to start a container has failed locally.
+   *
+   * @param containerId
+   *          the container that failed to launch
+   * @param t
+   *          the error that occurred
+   */
+
+  void taskStartFailed(ContainerId containerId, Throwable t);
+
+  /**
+   * The Node Manager reports that a container has stopped.
+   *
+   * @param containerId
+   *          the container that stopped
+   */
+  void containerStopped(ContainerId containerId);
+
+  /**
+   * The Resource Manager reports that containers have completed with the given
+   * statuses. Find the task for each container and mark them as completed.
+   *
+   * @param statuses
+   *          one completion status per container that finished
+   */
+
+  void containersCompleted(List<ContainerStatus> statuses);
+
+  float getProgress();
+
+  /**
+   * The Node Manager API reports that a request sent to the NM to stop a task
+   * has failed.
+   *
+   * @param containerId
+   *          the container that failed to stop
+   * @param t
+   *          the reason that the stop request failed
+   */
+
+  void stopTaskFailed(ContainerId containerId, Throwable t);
+
+  /**
+   * Request to resize the Drill cluster by a relative amount.
+   *
+   * @param delta
+   *          the amount of change. Can be positive (to grow) or negative (to
+   *          shrink the cluster)
+   */
+
+  void resizeDelta(int delta);
+
+  /**
+   * Request to resize the Drill cluster to the given size.
+   *
+   * @param n
+   *          the desired cluster size
+   * @return presumably the cluster size that results from the request --
+   *         TODO confirm against the implementation
+   */
+
+  int resizeTo(int n);
+
+  /**
+   * Indicates a request to gracefully shut down the cluster.
+   */
+
+  void shutDown();
+
+  /**
+   * Called by the main thread to wait for the normal shutdown of the
+   * controller. Such shutdown occurs when the admin sends a shutdown
+   * command from the UI or REST API.
+   *
+   * @return presumably whether the controller run ended successfully --
+   *         TODO confirm against the implementation
+   */
+
+  boolean waitForCompletion();
+
+  void updateRMStatus();
+
+  void setMaxRetries(int value);
+
+  /**
+   * Allow an observer to see a consistent view of the controller's
+   * state by performing the visit in a synchronized block.
+   *
+   * @param visitor
+   *          the observer to invoke
+   */
+
+  void visit( ControllerVisitor visitor );
+
+  /**
+   * Allow an observer to see a consistent view of the controller's
+   * task state by performing the visit in a synchronized block.
+   *
+   * @param visitor
+   *          the observer to invoke
+   */
+
+  void visitTasks( TaskVisitor visitor );
+
+  /**
+   * Return the target number of tasks that the controller seeks to maintain.
+   * This is the sum across all pools.
+   *
+   * @return the total target task count
+   */
+
+  int getTargetCount();
+
+  boolean isTaskLive(int id);
+
+  /**
+   * Cancels the given task, reducing the target task count. Called
+   * from the UI to allow the user to select the specific task to end
+   * when reducing cluster size.
+   *
+   * @param id
+   *          identifier of the task to cancel
+   * @return presumably whether a task was cancelled -- TODO confirm against
+   *         the implementation
+   */
+
+  boolean cancelTask(int id);
+
+  /**
+   * Whether this distribution of YARN supports disk resources.
+   *
+   * @return true if disk resources are supported
+   */
+
+  boolean supportsDiskResource();
+
+  int getFreeNodeCount();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterControllerImpl.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterControllerImpl.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterControllerImpl.java
new file mode 100644
index 0000000..3c011ec
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterControllerImpl.java
@@ -0,0 +1,785 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.appMaster.TaskLifecycleListener.Event;
+import org.apache.drill.yarn.core.DoYUtil;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
+
+/**
+ * Controls the Drill cluster by representing the current cluster state with a
+ * desired state, taking corrective action to keep the cluster in the desired
+ * state. The cluster as a whole has a state, as do each task (node) within the
+ * cluster.
+ * <p>
+ * This class is designed to allow unit tests. In general, testing the
+ * controller on a live cluster is tedious. This class encapsulates the
+ * controller algorithm so it can be driven by a simulated cluster.
+ * <p>
+ * This object is shared between threads, thus synchronized.
+ */
+
+public class ClusterControllerImpl implements ClusterController {
+  /**
+   * Controller lifecycle state.
+   */
+
+  public enum State {
+    /**
+     * Cluster is starting. Things are in a partially-built state. No tasks are
+     * started until the cluster moves to LIVE.
+     */
+
+    START,
+
+    /**
+     * Normal operating state: the controller seeks to maintain the desired
+     * number of tasks.
+     */
+
+    LIVE,
+
+    /**
+     * Controller is shutting down. Tasks are gracefully (where possible) ended;
+     * no new tasks are started. (That is, when we detect the exit of a task,
+     * the controller no longer immediately tries to start a replacement.)
+     */
+
+    ENDING,
+
+    /**
+     * The controller has shut down. All tasks and threads are stopped. The
+     * controller allows the main thread (which has been patiently waiting) to
+     * continue, allowing the AM itself to shut down. Thus, this is a very
+     * short-lived state.
+     */
+
+    ENDED,
+
+    /**
+     * Something bad happened on start-up; the AM can't start and must shut
+     * down.
+     */
+
+    FAILED
+  }
+
+  private final static int PRIORITY_OFFSET = 1;
+
+  private static final Log LOG = LogFactory.getLog(ClusterControllerImpl.class);
+
+  /**
+   * Signals the completion of the cluster run. The main program waits on this
+   * mutex until all tasks complete (batch) or the cluster is explicitly shut
+   * down (persistent tasks.)
+   */
+
+  private Object completionMutex = new Object();
+
+  /**
+   * Maximum number of retries for each task launch.
+   */
+
+  protected int maxRetries = 3;
+
+  /**
+   * Controller state.
+   *
+   * @see State
+   */
+
+  State state = State.START;
+
+  /**
+   * Definition of the task types that can be run by this controller, along with
+   * the target task levels for each.
+   */
+
+  private Map<String, SchedulerStateActions> taskPools = new HashMap<>();
+
+  /**
+   * List of task pools prioritized in the order in which tasks should start.
+   * DoY supports only one task pool at present. The idea is to, later, support
+   * multiple pools that represent, say, pool 1 as the minimum number of
+   * Drillbits to run at all times, with pool 2 as extra Drillbits to start up
+   * during peak demand.
+   * <p>
+   * The priority also gives rise to YARN request priorities which are the only
+   * tool the AM has to associate container grants with the requests to which
+   * they correspond.
+   */
+
+  private List<SchedulerStateActions> prioritizedGroups = new ArrayList<>();
+
+  /**
+   * Cluster-wide set of the IDs of YARN containers this controller has
+   * accepted. Used to skip a container that is reported as allocated again.
+   */
+
+  private Set<ContainerId> allocatedContainers = new HashSet<>();
+
+  /**
+   * Cluster-wide list of active tasks. Allows lookup from container ID to task
+   * (and then from task to task type.)
+   */
+
+  private Map<ContainerId, Task> activeContainers = new HashMap<>();
+
+  /**
+   * Tracks the tasks that have completed: either successfully (state == ENDED)
+   * or failed (state == FAILED). Eventually store this information elsewhere to
+   * avoid cluttering memory with historical data. Entries here are static
+   * copies, preserving the state at the time that the task completed.
+   */
+
+  private List<Task> completedTasks = new LinkedList<>();
+
+  /**
+   * Wrapper around the YARN API. Abstracts the details of YARN operations.
+   */
+
+  private final AMYarnFacade yarn;
+
+  /**
+   * Maximum number of new tasks to start on each "pulse" tick.
+   */
+
+  private int maxRequestsPerTick = 2;
+
+  private int stopTimoutMs = 10_000;
+
+  /**
+   * Time (in ms) between request to YARN to get an updated list of the node
+   * "inventory".
+   */
+
+  private int configPollPeriod = 60_000;
+  private long nextResourcePollTime;
+
+  /**
+   * List of nodes available in the cluster. Necessary as part of the process of
+   * ensuring that we run one Drillbit per node. (The YARN blacklist only half
+   * works for this purpose.)
+   */
+
+  private NodeInventory nodeInventory;
+
+  private long lastFailureCheckTime;
+
+  private int failureCheckPeriodMs = 60_000;
+
+  private int taskCheckPeriodMs = 10_000;
+  private long lastTaskCheckTime;
+
+  /**
+   * To increase code modularity, add-ons (such as the ZK monitor) register as
+   * lifecycle listeners that are alerted to "interesting" lifecycle events.
+   */
+
+  private List<TaskLifecycleListener> lifecycleListeners = new ArrayList<>();
+
+  /**
+   * Handy mechanism for setting properties on this controller that are
+   * available to plugins and UI without cluttering this class with member
+   * variables.
+   */
+
+  private Map<String, Object> properties = new HashMap<>();
+
+  /**
+   * When enabled, allows the controller to check for failures that result in no
+   * drillbits running. The controller will then automatically exit as no useful
+   * work can be done. Disable this to make debugging easier on a single-node
+   * cluster (lets you, say, start a "stray" drill bit and see what happens
+   * without the AM exiting.)
+   */
+
+  private boolean enableFailureCheck = true;
+
+  /**
+   * Creates a controller that performs its YARN operations through the
+   * given facade.
+   *
+   * @param yarn the YARN facade this controller keeps for later calls
+   */
+  public ClusterControllerImpl(AMYarnFacade yarn) {
+    this.yarn = yarn;
+  }
+
+  /**
+   * Turns the automatic failure check on or off.
+   *
+   * @param flag the new value of the failure-check switch
+   */
+  @Override
+  public void enableFailureCheck(boolean flag) {
+    this.enableFailureCheck = flag;
+  }
+
+  /**
+   * Registers a scheduler (task type) with the controller. Schedulers are
+   * kept in registration order, and each one is assigned a priority derived
+   * from its position in that order. Must be called before the YARN
+   * callbacks start.
+   *
+   * @param scheduler the scheduler to add; its name must not already be
+   *          registered (checked by assertion only)
+   */
+
+  @Override
+  public void registerScheduler(Scheduler scheduler) {
+    assert !taskPools.containsKey(scheduler.getName());
+    final int poolPriority = taskPools.size() + PRIORITY_OFFSET;
+    scheduler.setPriority(poolPriority);
+    final SchedulerStateActions pool = new SchedulerStateImpl(this, scheduler);
+    taskPools.put(pool.getName(), pool);
+    prioritizedGroups.add(pool);
+  }
+
+  /**
+   * Invoked once the caller has finished start-up. Builds the node
+   * inventory, asks every registered scheduler to limit its container size
+   * to the maximum resource reported in the YARN registration response, and
+   * then moves the controller to the LIVE state.
+   */
+
+  @Override
+  public synchronized void started() throws YarnFacadeException, AMException {
+    nodeInventory = new NodeInventory(yarn);
+
+    // Reconcile each scheduler's container request with the largest
+    // container that YARN says it can provide.
+
+    final RegisterApplicationMasterResponse registration = yarn.getRegistrationResponse();
+    final Resource largestContainer = registration.getMaximumResourceCapability();
+    for (SchedulerStateActions pool : prioritizedGroups) {
+      final Scheduler scheduler = pool.getScheduler();
+      scheduler.limitContainerSize(largestContainer);
+    }
+    state = State.LIVE;
+  }
+
+  /**
+   * Timer callback. While LIVE, adjusts task levels and issues container
+   * requests; while LIVE or ENDING, runs the periodic task check.
+   *
+   * @param curTime the current time passed in by the timer
+   */
+  @Override
+  public synchronized void tick(long curTime) {
+    if (state == State.LIVE) {
+      adjustTasks(curTime);
+      requestContainers();
+    }
+
+    // The state is read again here on purpose: the calls above may have
+    // moved the controller out of the LIVE state.
+
+    if (state != State.LIVE && state != State.ENDING) {
+      return;
+    }
+    checkTasks(curTime);
+  }
+
+  /**
+   * Adjust the number of running tasks to match the desired level.
+   *
+   * @param curTime
+   */
+
+  private void adjustTasks(long curTime) {
+    if (enableFailureCheck && getFreeNodeCount() == 0) {
+      checkForFailure(curTime);
+    }
+    if (state != State.LIVE) {
+      return;
+    }
+    for (SchedulerStateActions group : prioritizedGroups) {
+      group.adjustTasks();
+    }
+  }
+
+  /**
+   * Estimates how many YARN nodes could still accept a task request: the
+   * free-node count from the node inventory minus the request count of
+   * every scheduler group, never less than zero.
+   * <p>
+   * This approximation <b>does not</b> consider whether a node has
+   * sufficient resources to run a task; only whether the node itself
+   * exists.
+   *
+   * @return the estimated number of free nodes, zero or greater
+   */
+
+  @Override
+  public int getFreeNodeCount( ) {
+    int free = nodeInventory.getFreeNodeCount();
+    int inFlight = 0;
+    for (SchedulerStateActions pool : prioritizedGroups) {
+      inFlight += pool.getRequestCount( );
+    }
+    free -= inFlight;
+    return free < 0 ? 0 : free;
+  }
+
+  /**
+   * Check if the controller is unable to run any tasks. If so, and the option
+   * is enabled, then automatically exit since no useful work can be done.
+   *
+   * @param curTime
+   */
+
+  private void checkForFailure(long curTime) {
+    if (lastFailureCheckTime + failureCheckPeriodMs > curTime) {
+      return;
+    }
+    lastFailureCheckTime = curTime;
+    for (SchedulerStateActions group : prioritizedGroups) {
+      if (group.getTaskCount() > 0) {
+        return;
+      }
+    }
+    LOG.error(
+        "Application failure: no tasks are running and no nodes are available -- exiting.");
+    terminate(State.FAILED);
+  }
+
+  /**
+   * Periodically check tasks, handling any timeout issues.
+   *
+   * @param curTime
+   */
+
+  private void checkTasks(long curTime) {
+
+    // Check periodically, not on every tick.
+
+    if (lastTaskCheckTime + taskCheckPeriodMs > curTime) {
+      return;
+    }
+    lastTaskCheckTime = curTime;
+
+    // Check for task timeouts in states that have a timeout.
+
+    EventContext context = new EventContext(this);
+    for (SchedulerStateActions group : prioritizedGroups) {
+      context.setGroup(group);
+      group.checkTasks(context, curTime);
+    }
+  }
+
+  /**
+   * Periodic hook for refreshing resource information from YARN. The
+   * queries themselves (node count, available memory and vcores) are
+   * currently disabled, so the only effect is to schedule the next poll
+   * time once the current one has been reached.
+   */
+
+  @Override
+  public void updateRMStatus() {
+    final long now = System.currentTimeMillis();
+    if (now >= nextResourcePollTime) {
+      nextResourcePollTime = now + configPollPeriod;
+    }
+  }
+
+  /**
+   * Request any containers that have accumulated.
+   */
+
+  private void requestContainers() {
+    EventContext context = new EventContext(this);
+    for (SchedulerStateActions group : prioritizedGroups) {
+      context.setGroup(group);
+      if (group.requestContainers(context, maxRequestsPerTick)) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Handles containers granted by YARN. A container already seen is skipped;
+   * a container on a host that is already in use is released back to YARN;
+   * every other container is recorded and handed to the scheduler group
+   * whose priority matches the container's priority.
+   *
+   * @param containers the containers provided by YARN
+   */
+  @Override
+  public synchronized void containersAllocated(List<Container> containers) {
+    EventContext context = new EventContext(this);
+    for (Container container : containers) {
+      if (allocatedContainers.contains(container.getId())) {
+        continue;
+      }
+
+      // We should never get a container on a node in the blacklist we
+      // sent to YARN. If we do, something is wrong. Log the error and
+      // reject the container. Else, bad things happen further along as
+      // the tracking mechanisms assume one task per node.
+
+      String host = container.getNodeId().getHost();
+      if (nodeInventory.isInUse(host)) {
+        LOG.error( "Host is in use, but YARN allocated a container: " +
+                   DoYUtil.labelContainer(container) + " - container rejected." );
+        yarn.releaseContainer(container);
+        continue;
+      }
+
+      // The container is fine.
+
+      allocatedContainers.add(container.getId());
+      int priority = container.getPriority().getPriority();
+      int offset = priority - PRIORITY_OFFSET;
+
+      // Valid offsets are 0 .. size-1. The upper bound must be exclusive,
+      // otherwise an offset equal to the size reaches get() below and
+      // throws IndexOutOfBoundsException.
+      // NOTE(review): a container with an unknown priority is logged but
+      // not released back to YARN -- confirm whether that is intended.
+
+      if (offset < 0 || offset >= prioritizedGroups.size()) {
+        LOG.error("Container allocated with unknown priority " + DoYUtil.labelContainer(container));
+        continue;
+      }
+      context.setGroup(prioritizedGroups.get(offset));
+      context.group.containerAllocated(context, container);
+    }
+  }
+
+  /**
+   * Forwards a "container started" notice to the state of the task that
+   * owns the container. Containers with no active task are ignored.
+   *
+   * @param containerId the container which started
+   */
+  @Override
+  public synchronized void containerStarted(ContainerId containerId) {
+    final Task task = getTask(containerId);
+    if (task != null) {
+      final EventContext context = new EventContext(this, task);
+      context.getState().containerStarted(context);
+      LOG.trace("Container started: " + containerId);
+    }
+  }
+
+  /**
+   * Forwards a launch failure to the state of the task that owns the
+   * container. Containers with no active task are ignored.
+   *
+   * @param containerId the container that failed to launch
+   * @param t the error that occurred
+   */
+  @Override
+  public synchronized void taskStartFailed(ContainerId containerId,
+      Throwable t) {
+    final Task task = getTask(containerId);
+    if (task != null) {
+      final EventContext context = new EventContext(this, task);
+      context.getState().launchFailed(context, t);
+    }
+  }
+
+  /**
+   * Looks up the active task associated with a container.
+   *
+   * @param containerId the container to look up
+   * @return the task, or null if the container has no entry in the active
+   *         container map
+   */
+  private Task getTask(ContainerId containerId) {
+    return activeContainers.get(containerId);
+  }
+
+  /**
+   * Intentionally does nothing; the handling code below is disabled.
+   *
+   * @param containerId the container reported as stopped (unused)
+   */
+  @Override
+  public synchronized void containerStopped(ContainerId containerId) {
+    // Ignored because the node manager notification is very
+    // unreliable. Better to rely on the Resource Manager
+    // completion request.
+    // Task task = getTask(containerId);
+    // if (task == null) {
+    // return; }
+    // EventContext context = new EventContext(this, task);
+    // context.getState().containerStopped(context);
+  }
+
+  /**
+   * Forwards each container completion status to the state of the task
+   * that owns the container, then re-evaluates the controller status. A
+   * status for a container with no active task is logged and skipped.
+   *
+   * @param statuses the completion statuses reported for the containers
+   */
+  @Override
+  public synchronized void containersCompleted(List<ContainerStatus> statuses) {
+    final EventContext context = new EventContext(this);
+    for (ContainerStatus status : statuses) {
+      final ContainerId containerId = status.getContainerId();
+      final Task task = getTask(containerId);
+      if (task != null) {
+        context.setTask(task);
+        context.getState().containerCompleted(context, status);
+      } else {
+        // Will occur if a container was allocated but rejected.
+        // Any other occurrence is unexpected and an error.
+
+        LOG.warn("Container completed but no associated task state: " + containerId );
+      }
+    }
+    checkStatus();
+  }
+
+  /**
+   * Computes overall progress as completed work divided by total work,
+   * summed over all schedulers. Each scheduler reports a two-element array:
+   * element 0 is the completed amount, element 1 is the total.
+   *
+   * @return a fraction between 0 and 1; 1 when there is no work at all
+   */
+  @Override
+  public synchronized float getProgress() {
+    int completed = 0;
+    int total = 0;
+    for (SchedulerStateActions group : taskPools.values()) {
+      int[] progress = group.getScheduler().getProgress();
+      completed += progress[0];
+      total += progress[1];
+    }
+
+    // Guard the divisor (the total), not the dividend: with nothing to do
+    // the run counts as complete, while zero completed out of a positive
+    // total is 0% rather than 100%.
+
+    if (total <= 0) {
+      return 1;
+    }
+    float fraction = (float) completed / (float) total;
+    return Math.max(0f, Math.min(1f, fraction));
+  }
+
+  /**
+   * Handles a failed attempt to stop a container. The event is dropped when
+   * the container is not in the active-container map; otherwise it is
+   * forwarded, with its cause, to the state object supplied by the event
+   * context.
+   *
+   * @param containerId the container that could not be stopped
+   * @param t the failure reported for the stop request
+   */
+  @Override
+  public synchronized void stopTaskFailed(ContainerId containerId,
+      Throwable t) {
+    final Task stuckTask = getTask(containerId);
+    if (stuckTask != null) {
+      final EventContext eventContext = new EventContext(this, stuckTask);
+      eventContext.getState().stopTaskFailed(eventContext, t);
+    }
+  }
+
+  /**
+   * Asks the scheduler of the first prioritized group to change its size by
+   * the given amount.
+   *
+   * @param delta the change passed to the scheduler's {@code change} method
+   */
+  @Override
+  public synchronized void resizeDelta(int delta) {
+    // TODO: offer the delta to each scheduler in turn.
+    // For now, we support only one scheduler.
+
+    final SchedulerStateActions firstGroup = prioritizedGroups.get(0);
+    firstGroup.getScheduler().change(delta);
+  }
+
+  /**
+   * Asks the scheduler of the first prioritized group to resize to the
+   * given size.
+   *
+   * @param n the size passed to the scheduler's {@code resize} method
+   * @return the value returned by the scheduler's {@code resize} method
+   */
+  @Override
+  public synchronized int resizeTo(int n) {
+    // TODO: offer the delta to each scheduler in turn.
+    // For now, we support only one scheduler.
+
+    final SchedulerStateActions firstGroup = prioritizedGroups.get(0);
+    return firstGroup.getScheduler().resize(n);
+  }
+
+  /**
+   * Begins shutdown: moves the controller to the ENDING state, asks every
+   * prioritized group to shut down, then checks whether shutdown has
+   * already completed.
+   */
+  @Override
+  public synchronized void shutDown() {
+    LOG.info("Shut down request received");
+    state = State.ENDING;
+    final EventContext shutdownContext = new EventContext(this);
+    for (SchedulerStateActions pool : prioritizedGroups) {
+      pool.shutDown(shutdownContext);
+    }
+    checkStatus();
+  }
+
+  /**
+   * Logs the YARN registration details, then blocks the calling thread on
+   * the completion mutex until it is notified (see {@code terminate}).
+   *
+   * @return true if the controller state is ENDED when the wait returns
+   */
+  @Override
+  public boolean waitForCompletion() {
+    start();
+    synchronized (completionMutex) {
+      try {
+        // NOTE(review): this is a single, un-looped wait(). A notify()
+        // issued before this point, or a spurious wakeup, is not
+        // handled here -- confirm that is acceptable.
+        completionMutex.wait();
+        LOG.info("Controller shut down completed");
+      } catch (InterruptedException e) {
+        // Should not happen
+      }
+    }
+    return succeeded();
+  }
+
+  /**
+   * Start-up hook run at the top of {@code waitForCompletion}; currently it
+   * only logs the YARN registration report.
+   */
+  private void start() {
+    this.yarnReport();
+  }
+
+  private void yarnReport() {
+    RegisterApplicationMasterResponse response = yarn.getRegistrationResponse();
+    LOG.info("YARN queue: " + response.getQueue());
+    Resource resource = response.getMaximumResourceCapability();
+    LOG.info("YARN max resource: " + resource.getMemory() + " MB, "
+        + resource.getVirtualCores() + " cores");
+    EnumSet<SchedulerResourceTypes> types = response
+        .getSchedulerResourceTypes();
+    StringBuilder buf = new StringBuilder();
+    String sep = "";
+    for (SchedulerResourceTypes type : types) {
+      buf.append(sep);
+      buf.append(type.toString());
+      sep = ", ";
+    }
+    LOG.info("YARN scheduler resource types: " + buf.toString());
+  }
+
+  /**
+   * Checks for overall completion. Nothing happens unless the controller is
+   * in the ENDING state. While ending, the controller terminates with state
+   * ENDED as soon as every prioritized group reports that it is done.
+   */
+
+  private void checkStatus() {
+    if (state == State.ENDING) {
+      for (SchedulerStateActions pool : prioritizedGroups) {
+        if (!pool.isDone()) {
+          // At least one group is not done yet; try again on a later event.
+          return;
+        }
+      }
+      terminate(State.ENDED);
+    }
+  }
+
+  /**
+   * Records the given final state and signals the completion mutex.
+   *
+   * @param state the state to record before signalling
+   */
+  private void terminate(State state) {
+    this.state = state;
+    synchronized (completionMutex) {
+      // Wakes one thread blocked in wait() on the completion mutex;
+      // waitForCompletion() is the waiter visible in this class.
+      completionMutex.notify();
+    }
+  }
+
+  /** Returns true when the controller state is LIVE. */
+  public boolean isLive() {
+    return State.LIVE == state;
+  }
+
+  /** Returns true when the controller state is ENDED. */
+  public boolean succeeded() {
+    return State.ENDED == state;
+  }
+
+  /**
+   * Records the task in the active-container map, keyed by the task's
+   * container id, so that later container events can be matched to it.
+   */
+  public void containerAllocated(Task task) {
+    final ContainerId allocatedId = task.getContainerId();
+    activeContainers.put(allocatedId, task);
+  }
+
+  /** Returns the YARN facade held by this controller. */
+  public AMYarnFacade getYarn() {
+    return this.yarn;
+  }
+
+  /**
+   * Removes the entry for the task's container id from the active-container
+   * map.
+   */
+  public void containerReleased(Task task) {
+    final ContainerId releasedId = task.getContainerId();
+    activeContainers.remove(releasedId);
+  }
+
+  /** Appends the task to the completed-task history. */
+  public void taskEnded(Task task) {
+    this.completedTasks.add(task);
+  }
+
+  /**
+   * Records a retry in the completed-task history. A copy of the task,
+   * marked with the RETRIED disposition, is stored; the task passed in is
+   * not added to the history.
+   */
+  public void taskRetried(Task task) {
+    final Task retriedSnapshot = task.copy();
+    retriedSnapshot.disposition = Task.Disposition.RETRIED;
+    this.completedTasks.add(retriedSnapshot);
+  }
+
+  /**
+   * Notification that a task group completed; re-checks whether the whole
+   * controller is done.
+   *
+   * @param taskGroup the group that completed (not used here)
+   */
+  public void taskGroupCompleted(SchedulerStateActions taskGroup) {
+    this.checkStatus();
+  }
+
+  /** Returns the configured maximum retry count. */
+  public int getMaxRetries() {
+    return this.maxRetries;
+  }
+
+  /** Returns the stop timeout; the name indicates milliseconds. */
+  public int getStopTimeoutMs() {
+    return this.stopTimoutMs;
+  }
+
+  /**
+   * Reserves the named host in the node inventory.
+   *
+   * @param hostName the host to reserve
+   */
+  @Override
+  public synchronized void reserveHost(String hostName) {
+    this.nodeInventory.reserve(hostName);
+  }
+
+  /**
+   * Releases the named host in the node inventory.
+   *
+   * @param hostName the host to release
+   */
+  @Override
+  public synchronized void releaseHost(String hostName) {
+    this.nodeInventory.release(hostName);
+  }
+
+  /** Returns the node inventory held by this controller. */
+  public NodeInventory getNodeInventory() {
+    return this.nodeInventory;
+  }
+
+  /**
+   * Stores a value in the controller's property map under the given key.
+   *
+   * @param key the property name
+   * @param value the value to associate with the key
+   */
+  @Override
+  public void setProperty(String key, Object value) {
+    this.properties.put(key, value);
+  }
+
+  /**
+   * Looks up a value in the controller's property map.
+   *
+   * @param key the property name
+   * @return whatever the property map holds for the key
+   */
+  @Override
+  public Object getProperty(String key) {
+    final Object stored = this.properties.get(key);
+    return stored;
+  }
+
+  /**
+   * Adds a listener to the collection notified by
+   * {@code fireLifecycleChange}.
+   *
+   * @param listener the listener to add
+   */
+  @Override
+  public void registerLifecycleListener(TaskLifecycleListener listener) {
+    this.lifecycleListeners.add(listener);
+  }
+
+  /**
+   * Delivers a lifecycle event to every registered listener, in the
+   * iteration order of the listener collection.
+   *
+   * @param event the lifecycle event
+   * @param context the event context passed through to each listener
+   */
+  public void fireLifecycleChange(Event event, EventContext context) {
+    for (TaskLifecycleListener registered : this.lifecycleListeners) {
+      registered.stateChange(event, context);
+    }
+  }
+
+  /**
+   * Sets the maximum retry count returned by {@code getMaxRetries}.
+   *
+   * @param value the new maximum
+   */
+  @Override
+  public void setMaxRetries(int value) {
+    this.maxRetries = value;
+  }
+
+  /**
+   * Sums the targets reported by the schedulers of all prioritized groups.
+   *
+   * @return the combined target across all groups
+   */
+  @Override
+  public int getTargetCount() {
+    int total = 0;
+    for (SchedulerStateActions pool : prioritizedGroups) {
+      final int poolTarget = pool.getScheduler().getTarget();
+      total += poolTarget;
+    }
+    return total;
+  }
+
+  /** Returns the controller's current state. */
+  public State getState() {
+    return this.state;
+  }
+
+  /**
+   * Invokes the visitor on this controller while holding the controller's
+   * monitor.
+   *
+   * @param visitor the visitor to call back
+   */
+  @Override
+  public synchronized void visit(ControllerVisitor visitor) {
+    final ClusterControllerImpl self = this;
+    visitor.visit(self);
+  }
+
+  /**
+   * Returns the prioritized group list itself (not a copy).
+   */
+  public List<SchedulerStateActions> getPools() {
+    return this.prioritizedGroups;
+  }
+
+  /**
+   * Passes the visitor to each prioritized group, in list order, while
+   * holding the controller's monitor.
+   *
+   * @param visitor the task visitor handed to each group
+   */
+  @Override
+  public synchronized void visitTasks(TaskVisitor visitor) {
+    for (SchedulerStateActions group : this.prioritizedGroups) {
+      group.visitTaskModels(visitor);
+    }
+  }
+
+  /**
+   * Returns the completed-task history list itself (not a copy).
+   */
+  public List<Task> getHistory() {
+    return this.completedTasks;
+  }
+
+  /**
+   * Reports whether the task with the given id is live. Groups are searched
+   * in priority order and the first group that knows the id decides.
+   *
+   * @param id the task id
+   * @return the task's own liveness, or false when no group has the task
+   */
+  @Override
+  public boolean isTaskLive(int id) {
+    for (SchedulerStateActions pool : prioritizedGroups) {
+      final Task found = pool.getTask(id);
+      if (found == null) {
+        continue;
+      }
+      return found.isLive();
+    }
+    return false;
+  }
+
+  /**
+   * Cancels the task with the given id. The first group, in priority order,
+   * that knows the id cancels the task and has its scheduler reduced by one.
+   *
+   * @param id the task id
+   * @return true if a task was found and cancelled; false (with a warning
+   *         logged) when no group has the task
+   */
+  @Override
+  public synchronized boolean cancelTask(int id) {
+    for (SchedulerStateActions pool : prioritizedGroups) {
+      final Task victim = pool.getTask(id);
+      if (victim == null) {
+        continue;
+      }
+      pool.cancel(victim);
+      pool.getScheduler().change(-1);
+      return true;
+    }
+    LOG.warn( "Requested to cancel task, but no task found: " + id );
+    return false;
+  }
+
+  /**
+   * Delivers a completion acknowledgement for a task to the state object
+   * supplied by the event context, then removes the named entry from the
+   * task's properties.
+   *
+   * @param task the task being acknowledged
+   * @param propertyKey task property to remove afterwards; null to remove
+   *          nothing
+   */
+  @Override
+  public synchronized void completionAck(Task task, String propertyKey) {
+    final EventContext ackContext = new EventContext(this);
+    ackContext.setTask(task);
+    ackContext.getState().completionAck(ackContext);
+    if (propertyKey == null) {
+      return;
+    }
+    task.properties.remove(propertyKey);
+  }
+
+  /**
+   * Delivers a start acknowledgement for a task. When both a key and a
+   * value are given, the pair is stored in the task's properties first;
+   * the acknowledgement is then handed to the state object supplied by the
+   * event context.
+   *
+   * @param task the task being acknowledged
+   * @param propertyKey task property to set; may be null
+   * @param value value for the property; may be null
+   */
+  @Override
+  public synchronized void startAck(Task task, String propertyKey,
+      Object value) {
+    final boolean hasProperty = propertyKey != null && value != null;
+    if (hasProperty) {
+      task.properties.put(propertyKey, value);
+    }
+    final EventContext ackContext = new EventContext(this);
+    ackContext.setTask(task);
+    ackContext.getState().startAck(ackContext);
+  }
+
+  /**
+   * Reports whether disk resources are supported, as answered by the YARN
+   * facade.
+   */
+  @Override
+  public boolean supportsDiskResource() {
+    final AMYarnFacade facade = getYarn();
+    return facade.supportsDiskResource();
+  }
+
+  @Override
+  public void registryDown() { shutDown( ); }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerFactory.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerFactory.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerFactory.java
new file mode 100644
index 0000000..b8d6e06
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Factory that assembles a {@link Dispatcher}.
+ */
+public interface ControllerFactory {
+
+  /**
+   * Checked failure raised by {@link #build()}; wraps the underlying cause.
+   * (A class nested in an interface is implicitly public and static.)
+   */
+  class ControllerFactoryException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param msg description of the failure
+     * @param e the underlying cause
+     */
+    public ControllerFactoryException(String msg, Exception e) {
+      super(msg, e);
+    }
+  }
+
+  /**
+   * Builds the dispatcher.
+   *
+   * @return the assembled dispatcher
+   * @throws ControllerFactoryException if the dispatcher cannot be built
+   */
+  Dispatcher build() throws ControllerFactoryException;
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerVisitor.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerVisitor.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerVisitor.java
new file mode 100644
index 0000000..5774d7d
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerVisitor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Callback that is handed a cluster controller to inspect or act on.
+ */
+public interface ControllerVisitor {
+  /**
+   * Called with the controller being visited.
+   *
+   * @param controller the controller being visited
+   */
+  void visit(ClusterController controller);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Dispatcher.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Dispatcher.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Dispatcher.java
new file mode 100644
index 0000000..f5257e6
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Dispatcher.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.appMaster.AMRegistrar.AMRegistrationException;
+import org.apache.drill.yarn.appMaster.AMYarnFacade.YarnAppHostReport;
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+/**
+ * Dispatches YARN, timer and ZooKeeper events to the cluster controller.
+ * Allows the controller to be independent of the plumbing needed to
+ * receive events. Divides work among
+ * various components to separate concerns. Three streams of events
+ * feed into an app master "strategy". The three streams are
+ * <ol>
+ * <li>Resource manager</li>
+ * <li>Node manager</li>
+ * <li>Timer</li>
+ * </ol>
+ * <p>
+ * This class is "lightly" multi-threaded: it responds to events
+ * from the RM, NM and timer threads. Within each of these, events
+ * are sequential. So, synchronization is needed across the three event
+ * types, but not within event types. (That is, we won't see two RM events,
+ * say, occurring at the same time from separate threads.)
+ */
+
+public class Dispatcher
+{
+  private static final Log LOG = LogFactory.getLog(Dispatcher.class);
+
+  /**
+   * Handle YARN Resource Manager events. This is a separate class to clarify
+   * which events are from the Resource Manager.
+   */
+
+  private class ResourceCallback implements AMRMClientAsync.CallbackHandler {
+    @Override
+    public void onContainersAllocated(List<Container> containers) {
+      LOG.trace("NM: Containers allocated: " + containers.size());
+      controller.containersAllocated(containers);
+    }
+
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> statuses) {
+      LOG.trace("NM: Containers completed: " + statuses.size());
+      controller.containersCompleted(statuses);
+    }
+
+    @Override
+    public void onShutdownRequest() {
+      LOG.trace("RM: Shutdown request");
+      controller.shutDown();
+    }
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {
+      LOG.trace("RM: Nodes updated, count= " + updatedNodes.size());
+    }
+
+    @Override
+    public float getProgress() {
+      // getProgress is called on each fetch from the NM response queue.
+      // This is a good time to update status, even if it looks a bit
+      // bizarre...
+
+      controller.updateRMStatus();
+      return controller.getProgress();
+    }
+
+    @Override
+    public void onError(Throwable e) {
+      LOG.error("Fatal RM Error: " + e.getMessage());
+      LOG.error("AM Shutting down!");
+      controller.shutDown();
+    }
+  }
+
+  /**
+   * Handle YARN Node Manager events. This is a separate class to clarify which
+   * events are, in fact, from the node manager.
+   */
+
+  public class NodeCallback implements NMClientAsync.CallbackHandler {
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      LOG.trace("CNM: ontainer start error: " + containerId, t);
+      controller.taskStartFailed(containerId, t);
+    }
+
+    @Override
+    public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+      LOG.trace("NM: Container started: " + containerId);
+      controller.containerStarted(containerId);
+    }
+
+    @Override
+    public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+      LOG.trace("NM: Container status: " + containerId + " - "
+          + containerStatus.toString());
+    }
+
+    @Override
+    public void onGetContainerStatusError(ContainerId containerId,
+        Throwable t) {
+      LOG.trace("NM: Container error: " + containerId, t);
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+      LOG.trace("NM: Stop container error: " + containerId, t);
+      controller.stopTaskFailed(containerId, t);
+    }
+
+    @Override
+    public void onContainerStopped(ContainerId containerId) {
+      LOG.trace("NM: Container stopped: " + containerId);
+      controller.containerStopped(containerId);
+    }
+  }
+
+  /**
+   * Handle timer events: a constant tick to handle time-based actions such as
+   * timeouts.
+   */
+
+  public class TimerCallback implements PulseRunnable.PulseCallback {
+    /**
+     * The lifecycle of each task is driven by RM and NM callbacks. We use the
+     * timer to start the process. While this is overkill here, in a real app,
+     * we'd check requested resource levels (which might change) and number of
+     * tasks (which might change if tasks die), and take corrective action:
+     * adding or removing tasks.
+     */
+
+    @Override
+    public void onTick(long curTime) {
+      for (Pollable pollable : pollables) {
+        pollable.tick(curTime);
+      }
+      controller.tick(curTime);
+    }
+  }
+
+  private AMYarnFacade yarn;
+  private ClusterController controller;
+
+  /**
+   * Add-on tools that are called once on each timer tick.
+   */
+
+  private List<Pollable> pollables = new ArrayList<>();
+
+  /**
+   * Add-ons for which the dispatcher should managed the start/end lifecycle.
+   */
+
+  private List<DispatcherAddOn> addOns = new ArrayList<>();
+  private String trackingUrl;
+  private AMRegistrar amRegistrar;
+  private int httpPort;
+  private PulseRunnable timer;
+  private Thread pulseThread;
+  private final int timerPeriodMs;
+
+  public Dispatcher(int timerPeriodMs) {
+    this.timerPeriodMs = timerPeriodMs;
+  }
+
+  public void setYarn(AMYarnFacade yarn) throws YarnFacadeException {
+    this.yarn = yarn;
+    controller = new ClusterControllerImpl(yarn);
+  }
+
+  public ClusterController getController() {
+    return controller;
+  }
+
+  public void registerPollable(Pollable pollable) {
+    pollables.add(pollable);
+  }
+
+  public void registerAddOn(DispatcherAddOn addOn) {
+    addOns.add(addOn);
+  }
+
+  public void setHttpPort(int port) {
+    httpPort = port;
+  }
+
+  public void setTrackingUrl(String trackingUrl) {
+    this.trackingUrl = trackingUrl;
+  }
+
+  public String getTrackingUrl() {
+    return yarn.getTrackingUrl();
+  }
+
+  public void setAMRegistrar(AMRegistrar registrar) {
+    amRegistrar = registrar;
+  }
+
+  /**
+   * Start the dispatcher by initializing YARN and registering the AM.
+   *
+   * @return true if successful, false if the dispatcher did not start.
+   */
+
+  public boolean start() throws YarnFacadeException {
+
+    // Start the connection to YARN to get information about this app, and to
+    // create a session we can use to report problems.
+
+    try {
+      setup();
+    } catch (AMException e) {
+      String msg = e.getMessage();
+      LOG.error("Fatal error: " + msg);
+      yarn.finish(false, msg);
+      return false;
+    }
+
+    // Ensure that this is the only AM. If not, shut down the AM,
+    // reporting to YARN that this is a failure and the message explaining
+    // the conflict. Report this as a SUCCESS run so that YARN does not
+    // attempt to retry the AM.
+
+    try {
+      register();
+    } catch (AMRegistrationException e) {
+      LOG.error(e.getMessage(), e);
+      yarn.finish(true, e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  public void run() throws YarnFacadeException {
+    // Only if registration is successful do we start the pulse thread
+    // which will cause containers to be requested.
+
+    startTimer();
+
+    // Run until the controller decides to shut down.
+
+    LOG.trace("Running");
+    boolean success = controller.waitForCompletion();
+
+    // Shut down.
+
+    LOG.trace("Finishing");
+    finish(success, null);
+  }
+
+  private void setup() throws YarnFacadeException, AMException {
+    LOG.trace("Starting YARN agent");
+    yarn.start(new ResourceCallback(), new NodeCallback());
+    String url = trackingUrl.replace("<port>", Integer.toString(httpPort));
+    if (DrillOnYarnConfig.config().getBoolean(DrillOnYarnConfig.HTTP_ENABLE_SSL)) {
+      url = url.replace("http:", "https:");
+    }
+    LOG.trace("Registering YARN application, URL: " + url);
+    yarn.register(url);
+    controller.started();
+
+    for (DispatcherAddOn addOn : addOns) {
+      addOn.start(controller);
+    }
+  }
+
+  private void register() throws AMRegistrationException {
+    if (amRegistrar == null) {
+      LOG.warn(
+          "No AM Registrar provided: cannot check if this is the only AM for the Drill cluster.");
+    } else {
+      YarnAppHostReport rpt = yarn.getAppHostReport();
+      amRegistrar.register(rpt.amHost, httpPort, rpt.appId);
+    }
+  }
+
+  private void startTimer() {
+    timer = new PulseRunnable(timerPeriodMs, new TimerCallback());
+
+    // Start the pulse thread after registering so that we're in
+    // a state where we can interact with the RM.
+
+    pulseThread = new Thread(timer);
+    pulseThread.setName("Pulse");
+    pulseThread.start();
+  }
+
+  private void finish(boolean success, String msg) throws YarnFacadeException {
+    for (DispatcherAddOn addOn : addOns) {
+      addOn.finish(controller);
+    }
+
+    LOG.trace("Shutting down YARN agent");
+
+    // Stop the timer thread first. This ensures that the
+    // timer events don't try to use the YARN API during
+    // shutdown.
+
+    stopTimer();
+    yarn.finish(success, msg);
+  }
+
+  private void stopTimer() {
+    timer.stop();
+    try {
+      pulseThread.join();
+    } catch (InterruptedException e) {
+      // Ignore
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DispatcherAddOn.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DispatcherAddOn.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DispatcherAddOn.java
new file mode 100644
index 0000000..5c7100b
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DispatcherAddOn.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Interface for an add-on to the dispatcher that
+ * should be started at start of the run and ended
+ * at the end of the run.
+ */
+
+public interface DispatcherAddOn {
+  /**
+   * Called when the run starts.
+   *
+   * @param controller the cluster controller for this run
+   */
+  void start(ClusterController controller);
+
+  /**
+   * Called when the run ends.
+   *
+   * @param controller the cluster controller for this run
+   */
+  void finish(ClusterController controller);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillApplicationMaster.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillApplicationMaster.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillApplicationMaster.java
new file mode 100644
index 0000000..c0db9a1
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillApplicationMaster.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.appMaster.ControllerFactory.ControllerFactoryException;
+import org.apache.drill.yarn.appMaster.http.WebServer;
+import org.apache.drill.yarn.core.DoyConfigException;
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+
+/**
+ * Application Master for Drill. The name is visible when using the "jps"
+ * command and is chosen to make sense on a busy YARN node.
+ * <p>
+ * To debug this AM use the customized unmanaged AM launcher in this
+ * jar. (The "stock" YARN version does not give you time to attach
+ * the debugger.)
+ * <pre><code>
+ * TARGET_JAR=/your-git-folder/drill-yarn/target/drill-yarn-1.6-SNAPSHOT.jar
+ * TARGET_CLASS=org.apache.drill.yarn.appMaster.DrillApplicationMaster
+ * LAUNCHER_JAR=$TARGET_JAR
+ * LAUNCHER_CLASS=org.apache.drill.yarn.mock.UnmanagedAMLauncher
+ * $HH/bin/hadoop jar $LAUNCHER_JAR \
+ *   $LAUNCHER_CLASS -classpath $TARGET_JAR \
+ *   -cmd "java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 \
+ *   $TARGET_CLASS"
+ * </code></pre>
+ */
+
+public class DrillApplicationMaster {
+  private static final Log LOG = LogFactory
+      .getLog(DrillApplicationMaster.class);
+
+  /**
+   * Entry point. Loads the configuration, builds and starts the dispatcher,
+   * starts the web server, then runs the dispatcher until the cluster shuts
+   * down. Any failure along the way exits the process with status -1.
+   */
+  public static void main(String[] args) {
+    LOG.trace("Drill Application Master starting.");
+    loadConfig();
+
+    final Dispatcher dispatcher = buildDispatcher();
+    if (dispatcher == null) {
+      return;
+    }
+
+    // Start the Dispatcher. This will return false if this AM conflicts with
+    // a running AM.
+
+    if (!startDispatcher(dispatcher)) {
+      return;
+    }
+
+    // Create and start the web server. Do this after starting the AM
+    // so that we don't learn about a conflict via a web server port
+    // conflict.
+
+    final WebServer webServer = new WebServer(dispatcher);
+    startWebServer(webServer);
+
+    // Run the dispatcher until the cluster shuts down.
+
+    runDispatcher(dispatcher, webServer);
+  }
+
+  /**
+   * Loads the configuration. Assumes that the user's Drill-on-YARN
+   * configuration was archived along with the Drill software in
+   * the $DRILL_HOME/conf directory, and that $DRILL_HOME/conf is
+   * on the class-path.
+   */
+  private static void loadConfig() {
+    try {
+      DrillOnYarnConfig.load().setAmDrillHome();
+    } catch (DoyConfigException e) {
+      System.err.println(e.getMessage());
+      System.exit(-1);
+    }
+  }
+
+  /**
+   * Builds the dispatcher using the Drillbit factory. Allows inserting
+   * other factories for testing, or if we need to manage a cluster of
+   * processes other than Drillbits.
+   *
+   * @return the dispatcher; null only on the path after a failed build,
+   *         where the process has already been told to exit
+   */
+  private static Dispatcher buildDispatcher() {
+    // Dispatcher am = (new SimpleBatchFactory( )).build( );
+    // Dispatcher am = (new MockDrillbitFactory( )).build( );
+    try {
+      return (new DrillControllerFactory()).build();
+    } catch (ControllerFactoryException e) {
+      LOG.error("Setup failed, exiting: " + e.getMessage(), e);
+      System.exit(-1);
+      return null;
+    }
+  }
+
+  /**
+   * Starts the dispatcher.
+   *
+   * @return false if the dispatcher reported that it did not start
+   */
+  private static boolean startDispatcher(Dispatcher dispatcher) {
+    try {
+      return dispatcher.start();
+    } catch (Throwable e) {
+      LOG.error("Fatal error, exiting: " + e.getMessage(), e);
+      System.exit(-1);
+      // Only reached if exit() returns; mirrors falling through the catch.
+      return true;
+    }
+  }
+
+  /** Starts the web server, exiting the process if it cannot start. */
+  private static void startWebServer(WebServer webServer) {
+    try {
+      webServer.start();
+    } catch (Exception e) {
+      LOG.error("Web server setup failed, exiting: " + e.getMessage(), e);
+      System.exit(-1);
+    }
+  }
+
+  /**
+   * Runs the dispatcher to completion, then closes the web server,
+   * ignoring any failure to close.
+   */
+  private static void runDispatcher(Dispatcher dispatcher,
+      WebServer webServer) {
+    try {
+      dispatcher.run();
+    } catch (Throwable e) {
+      LOG.error("Fatal error, exiting: " + e.getMessage(), e);
+      System.exit(-1);
+    } finally {
+      try {
+        webServer.close();
+      } catch (Exception e) {
+        // Ignore
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillControllerFactory.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillControllerFactory.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillControllerFactory.java
new file mode 100644
index 0000000..013fdba
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillControllerFactory.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.core.ContainerRequestSpec;
+import org.apache.drill.yarn.core.DfsFacade;
+import org.apache.drill.yarn.core.DfsFacade.DfsFacadeException;
+import org.apache.drill.yarn.core.DoYUtil;
+import org.apache.drill.yarn.core.DoyConfigException;
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+import org.apache.drill.yarn.core.LaunchSpec;
+import org.apache.drill.yarn.appMaster.http.AMSecurityManagerImpl;
+import org.apache.drill.yarn.core.ClusterDef;
+import org.apache.drill.yarn.zk.ZKClusterCoordinatorDriver;
+import org.apache.drill.yarn.zk.ZKRegistry;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+
+import com.typesafe.config.Config;
+
+/**
+ * Builds a controller for a cluster of Drillbits. The AM is designed to be
+ * mostly generic; only this class contains knowledge that the tasks being
+ * managed are drillbits. This design ensures that we can add other Drill
+ * components in the future without the need to make major changes to the AM
+ * logic.
+ * <p>
+ * The controller consists of a generic dispatcher and cluster controller, along
+ * with a Drill-specific scheduler and task launch specification. Drill also
+ * includes an interface to ZooKeeper to monitor Drillbits.
+ * <p>
+ * The AM is launched by YARN. All it knows is what is in its launch environment
+ * or configuration files. The client must set up all the information that the
+ * AM needs. Static information appears in configuration files. But, dynamic
+ * information (or that which is inconvenient to repeat in configuration files)
+ * must arrive in environment variables. See {@link DrillOnYarnConfig} for more
+ * information.
+ */
+
+public class DrillControllerFactory implements ControllerFactory {
+  private static final Log LOG = LogFactory.getLog(DrillControllerFactory.class);
+  private Config config = DrillOnYarnConfig.config();
+  private String drillArchivePath;
+  private String siteArchivePath;
+  private boolean localized;
+
+  /**
+   * Builds the dispatcher for a cluster of Drillbits: prepares the localized
+   * resources and the Drillbit task specification, wires up the YARN facade,
+   * registers the Drillbit scheduler for the first cluster group in the
+   * configuration, attaches the ZooKeeper integration and finally applies
+   * the HTTP, auto-shutdown and security settings.
+   *
+   * @return the configured dispatcher
+   * @throws ControllerFactoryException
+   *           if resource preparation, YARN setup or the Drill-on-YARN
+   *           configuration fails; the original exception is kept as the
+   *           cause
+   */
+
+  @Override
+  public Dispatcher build() throws ControllerFactoryException {
+    LOG.info(
+        "Initializing AM for " + config.getString(DrillOnYarnConfig.APP_NAME));
+    Dispatcher dispatcher;
+    try {
+      Map<String, LocalResource> resources = prepareResources();
+
+      TaskSpec taskSpec = buildDrillTaskSpec(resources);
+
+      // Prepare dispatcher
+
+      int timerPeriodMs = config.getInt(DrillOnYarnConfig.AM_TICK_PERIOD_MS);
+      dispatcher = new Dispatcher(timerPeriodMs);
+      int pollPeriodMs = config.getInt(DrillOnYarnConfig.AM_POLL_PERIOD_MS);
+      AMYarnFacadeImpl yarn = new AMYarnFacadeImpl(pollPeriodMs);
+      dispatcher.setYarn(yarn);
+      dispatcher.getController()
+          .setMaxRetries(config.getInt(DrillOnYarnConfig.DRILLBIT_MAX_RETRIES));
+
+      // Read through the same config field as every other setting here.
+
+      int requestTimeoutSecs = config
+          .getInt(DrillOnYarnConfig.DRILLBIT_REQUEST_TIMEOUT_SEC);
+      int maxExtraNodes = config
+          .getInt(DrillOnYarnConfig.DRILLBIT_MAX_EXTRA_NODES);
+
+      // Assume basic scheduler for now: only the first cluster group is used.
+      // The group adjusts the task spec after the scheduler has been
+      // registered; the scheduler was handed the same TaskSpec instance.
+
+      ClusterDef.ClusterGroup pool = ClusterDef.getCluster(config, 0);
+      Scheduler scheduler = new DrillbitScheduler(pool.getName(), taskSpec,
+          pool.getCount(), requestTimeoutSecs, maxExtraNodes);
+      dispatcher.getController().registerScheduler(scheduler);
+      pool.modifyTaskSpec(taskSpec);
+
+      // ZooKeeper setup
+
+      buildZooKeeper(config, dispatcher);
+    } catch (YarnFacadeException | DoyConfigException e) {
+      throw new ControllerFactoryException("Drill AM initialization failed", e);
+    }
+
+    // Tracking Url. The URL is a template; the <host> and <port> markers are
+    // presumably filled in by the dispatcher -- confirm in Dispatcher.
+    // TODO: HTTPS support
+
+    dispatcher.setHttpPort(config.getInt(DrillOnYarnConfig.HTTP_PORT));
+    if (config.getBoolean(DrillOnYarnConfig.HTTP_ENABLED)) {
+      dispatcher.setTrackingUrl("http://<host>:<port>/redirect");
+    }
+
+    // Enable/disable check for auto shutdown when no nodes are running.
+
+    dispatcher.getController().enableFailureCheck(
+        config.getBoolean(DrillOnYarnConfig.AM_ENABLE_AUTO_SHUTDOWN));
+
+    // Define the security manager
+
+    AMSecurityManagerImpl.setup();
+
+    return dispatcher;
+  }
+
+  /**
+   * Describes the files ("resources" in YARN terminology) that YARN should
+   * download ("localize") for each Drillbit: the Drill software archive and,
+   * when one is configured, the user's site archive. As a side effect this
+   * records the localized flag and the two archive paths in this factory.
+   *
+   * @return the resources to localize, or null when the DFS facade reports
+   *         that Drill is not localized
+   * @throws YarnFacadeException
+   *           if the DFS facade fails; the DFS exception is kept as the cause
+   */
+
+  private Map<String, LocalResource> prepareResources()
+      throws YarnFacadeException {
+    try {
+      DfsFacade dfsFacade = new DfsFacade(config);
+      localized = dfsFacade.isLocalized();
+      if (localized) {
+        dfsFacade.connect();
+        Map<String, LocalResource> localResources = new HashMap<>();
+        DrillOnYarnConfig doyConfig = DrillOnYarnConfig.instance();
+
+        // The Drill archive is always localized.
+
+        drillArchivePath = doyConfig.getDrillArchiveDfsPath();
+        DfsFacade.Localizer drillLocalizer = new DfsFacade.Localizer(dfsFacade,
+            drillArchivePath);
+        String drillKey = config.getString(DrillOnYarnConfig.DRILL_ARCHIVE_KEY);
+        drillLocalizer.defineResources(localResources, drillKey);
+        LOG.info(
+            "Localizing " + drillArchivePath + " with key \"" + drillKey + "\"");
+
+        // The site archive is optional.
+
+        siteArchivePath = doyConfig.getSiteArchiveDfsPath();
+        if (siteArchivePath != null) {
+          DfsFacade.Localizer siteLocalizer = new DfsFacade.Localizer(dfsFacade,
+              siteArchivePath);
+          String siteKey = config.getString(DrillOnYarnConfig.SITE_ARCHIVE_KEY);
+          siteLocalizer.defineResources(localResources, siteKey);
+          LOG.info(
+              "Localizing " + siteArchivePath + " with key \"" + siteKey + "\"");
+        }
+        return localResources;
+      }
+
+      // Nothing to localize.
+
+      return null;
+    } catch (DfsFacadeException e) {
+      throw new YarnFacadeException(
+          "Failed to get DFS status for Drill archive", e);
+    }
+  }
+
+  /**
+   * Assembles the task specification used to launch a Drillbit: the container
+   * resource request, the launch environment, the launch command with its
+   * arguments, and any localized resources.
+   * <p>
+   * The launch goes through the YARN-specific yarn-drillbit.sh script, and
+   * the environment variables set here are the inputs that script expects.
+   * Many details must be exactly right. To validate the result, echo the
+   * launch command from the Drill launch script and compare it with the
+   * launch_container.sh script found in YARN's NM private container
+   * directory.
+   * <p>
+   * The command built here is Linux-specific; the usual adjustments are
+   * needed to adapt it to Windows.
+   *
+   * @param resources
+   *          localized resources to attach to the launch, or null when there
+   *          are none
+   * @return the task specification for a Drillbit
+   * @throws DoyConfigException
+   *           if the Drill-on-YARN configuration accessors used here report
+   *           a configuration error
+   */
+
+  private TaskSpec buildDrillTaskSpec(Map<String, LocalResource> resources)
+      throws DoyConfigException {
+    DrillOnYarnConfig doy = DrillOnYarnConfig.instance();
+
+    // Resources requested from YARN for each Drillbit container.
+
+    ContainerRequestSpec request = new ContainerRequestSpec();
+    request.memoryMb = config.getInt(DrillOnYarnConfig.DRILLBIT_MEMORY);
+    request.vCores = config.getInt(DrillOnYarnConfig.DRILLBIT_VCORES);
+    request.disks = config.getDouble(DrillOnYarnConfig.DRILLBIT_DISKS);
+
+    LaunchSpec launch = new LaunchSpec();
+
+    // DRILL_HOME is either a non-localized location or, more typically, the
+    // expanded Drill directory under the container's working directory. In
+    // the localized case the current working directory is the container
+    // directory, so only the name of the Drill folder under it is needed.
+
+    String remoteHome = doy.getRemoteDrillHome();
+    launch.env.put("DRILL_HOME", remoteHome);
+    LOG.trace("Drillbit DRILL_HOME: " + remoteHome);
+
+    // Memory settings: heap, direct memory and code cache.
+
+    addIfSet(launch, DrillOnYarnConfig.DRILLBIT_HEAP, "DRILL_HEAP");
+    addIfSet(launch, DrillOnYarnConfig.DRILLBIT_DIRECT_MEM,
+        "DRILL_MAX_DIRECT_MEMORY");
+    addIfSet(launch, DrillOnYarnConfig.DRILLBIT_CODE_CACHE,
+        "DRILLBIT_CODE_CACHE_SIZE");
+
+    // Extra JVM arguments from the config file.
+
+    addIfSet(launch, DrillOnYarnConfig.DRILLBIT_VM_ARGS, "DRILL_JVM_OPTS");
+
+    // User-specified native library path.
+
+    addIfSet(launch, DrillOnYarnConfig.JAVA_LIB_PATH,
+        DrillOnYarnConfig.DOY_LIBPATH_ENV_VAR);
+
+    // Drill logs. LOG_DIR_EXPANSION_VAR is a marker that is replaced by the
+    // container log directory.
+
+    boolean yarnLogsDisabled = config
+        .getBoolean(DrillOnYarnConfig.DISABLE_YARN_LOGS);
+    if (!yarnLogsDisabled) {
+      launch.env.put("DRILL_YARN_LOG_DIR",
+          ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+    }
+
+    // Launch-script debugging.
+
+    boolean debugLaunch = config
+        .getBoolean(DrillOnYarnConfig.DRILLBIT_DEBUG_LAUNCH);
+    if (debugLaunch) {
+      launch.env.put(DrillOnYarnConfig.DRILL_DEBUG_ENV_VAR, "1");
+    }
+
+    // HADOOP_HOME is deliberately not passed here: it should be set in
+    // drill-env.sh since it is needed for client launch as well as the AM.
+
+    // Garbage collection (gc) logging. In drillbit.sh the gc log can go
+    // anywhere; under YARN all logs go to the YARN log directory and the gc
+    // log file is always called "gc.log".
+
+    boolean logGc = config.getBoolean(DrillOnYarnConfig.DRILLBIT_LOG_GC);
+    if (logGc) {
+      launch.env.put("ENABLE_GC_LOG", "1");
+    }
+
+    // Class path prefix and main class path.
+
+    addIfSet(launch, DrillOnYarnConfig.DRILLBIT_PREFIX_CLASSPATH,
+        DrillOnYarnConfig.DRILL_CLASSPATH_PREFIX_ENV_VAR);
+    addIfSet(launch, DrillOnYarnConfig.DRILLBIT_CLASSPATH,
+        DrillOnYarnConfig.DRILL_CLASSPATH_ENV_VAR);
+
+    // Drill-config.sh has dedicated entries for Hadoop and Hbase. A general
+    // extension class path avoids adding more such one-off cases, while the
+    // Hadoop and Hbase entries remain for backward compatibility.
+
+    addIfSet(launch, DrillOnYarnConfig.DRILLBIT_EXTN_CLASSPATH,
+        "EXTN_CLASSPATH");
+    addIfSet(launch, DrillOnYarnConfig.HADOOP_CLASSPATH,
+        "DRILL_HADOOP_CLASSPATH");
+    addIfSet(launch, DrillOnYarnConfig.HBASE_CLASSPATH,
+        "DRILL_HBASE_CLASSPATH");
+
+    // There is no equivalent of niceness here: YARN controls the niceness
+    // of its child processes.
+
+    // The launch script under YARN. DRILL_HOME can be referenced because all
+    // environment variables are set before this command is issued.
+
+    launch.command = "$DRILL_HOME/bin/yarn-drillbit.sh";
+
+    // Site (configuration) directory, when one is given.
+
+    String remoteSiteDir = doy.getRemoteSiteDir();
+    if (remoteSiteDir != null) {
+      launch.cmdArgs.add("--site");
+      launch.cmdArgs.add(remoteSiteDir);
+    }
+
+    // Localized resources, when there are any.
+
+    if (resources != null) {
+      launch.resources.putAll(resources);
+    }
+
+    // Tie the container request and the launch description together.
+
+    TaskSpec spec = new TaskSpec();
+    spec.name = "Drillbit";
+    spec.containerSpec = request;
+    spec.launchSpec = launch;
+    spec.maxRetries = config.getInt(DrillOnYarnConfig.DRILLBIT_MAX_RETRIES);
+    return spec;
+  }
+
+  /**
+   * Utility method to create an environment variable in the process launch
+   * specification if a given Drill-on-YARN configuration variable is set,
+   * copying the config value to the environment variable. A parameter that
+   * is absent from the configuration, null or blank is treated as not set
+   * and leaves the launch specification untouched.
+   *
+   * @param spec
+   *          the launch specification whose environment is updated
+   * @param configParam
+   *          path of the configuration parameter to read
+   * @param envVar
+   *          name of the environment variable to define
+   */
+
+  public void addIfSet(LaunchSpec spec, String configParam, String envVar) {
+
+    // Config.getString() throws when the path is missing or null, so test
+    // for presence first: an unset parameter must be skipped, not fatal.
+
+    if (!config.hasPath(configParam)) {
+      return;
+    }
+    String value = config.getString(configParam);
+    if (!DoYUtil.isBlank(value)) {
+      spec.env.put(envVar, value);
+    }
+  }
+
+  /**
+   * Dispatcher add-on that forwards the add-on start and finish callbacks,
+   * along with the cluster controller, to a {@link ZKRegistry}.
+   */
+  public static class ZKRegistryAddOn implements DispatcherAddOn {
+    // The registry that receives the forwarded callbacks.
+    ZKRegistry zkRegistry;
+
+    /**
+     * @param zkRegistry
+     *          the registry to which callbacks are forwarded
+     */
+    public ZKRegistryAddOn(ZKRegistry zkRegistry) {
+      this.zkRegistry = zkRegistry;
+    }
+
+    /** Forwards the start callback to the registry. */
+    @Override
+    public void start(ClusterController controller) {
+      zkRegistry.start(controller);
+    }
+
+    /** Forwards the finish callback to the registry. */
+    @Override
+    public void finish(ClusterController controller) {
+      zkRegistry.finish(controller);
+    }
+  }
+
+  /**
+   * Sets up the Drill-on-YARN ZooKeeper integration. The cluster coordinator
+   * driver acts as a builder: it is given the connection, retry and port
+   * settings read from the configuration, and is then wrapped in a
+   * {@link ZKRegistry} that is hooked into the dispatcher and its controller.
+   *
+   * @param config
+   *          the configuration from which the ZooKeeper and Drillbit port
+   *          settings are read
+   * @param dispatcher
+   *          the dispatcher that receives the registry add-on and AM
+   *          registrar
+   */
+
+  private void buildZooKeeper(Config config, Dispatcher dispatcher) {
+
+    // Connection identity.
+
+    String connectStr = config.getString(DrillOnYarnConfig.ZK_CONNECT);
+    String rootPath = config.getString(DrillOnYarnConfig.ZK_ROOT);
+    String cluster = config.getString(DrillOnYarnConfig.CLUSTER_ID);
+
+    // Failure and retry handling.
+
+    int failTimeoutMs = config.getInt(DrillOnYarnConfig.ZK_FAILURE_TIMEOUT_MS);
+    int retries = config.getInt(DrillOnYarnConfig.ZK_RETRY_COUNT);
+    int retryWaitMs = config.getInt(DrillOnYarnConfig.ZK_RETRY_DELAY_MS);
+
+    // Drillbit ports. The third port passed to the driver is derived as the
+    // bit port plus one.
+
+    int drillUserPort = config.getInt(DrillOnYarnConfig.DRILLBIT_USER_PORT);
+    int drillBitPort = config.getInt(DrillOnYarnConfig.DRILLBIT_BIT_PORT);
+
+    ZKClusterCoordinatorDriver zkDriver = new ZKClusterCoordinatorDriver()
+        .setConnect(connectStr, rootPath, cluster)
+        .setFailureTimoutMs(failTimeoutMs)
+        .setRetryCount(retries)
+        .setRetryDelayMs(retryWaitMs)
+        .setPorts(drillUserPort, drillBitPort, drillBitPort + 1);
+    ZKRegistry registry = new ZKRegistry(zkDriver);
+    dispatcher.registerAddOn(new ZKRegistryAddOn(registry));
+
+    // The registry follows the controller lifecycle: the ZK driver is
+    // started and stopped in conjunction with the controller.
+
+    dispatcher.getController().registerLifecycleListener(registry);
+
+    // The driver doubles as the registrar of the AM for the cluster.
+
+    dispatcher.setAMRegistrar(zkDriver);
+
+    // The UI reports unmanaged drillbits and so needs the registry. It is
+    // published as a controller property to avoid unnecessary code
+    // dependencies.
+
+    dispatcher.getController().setProperty(ZKRegistry.CONTROLLER_PROPERTY,
+        registry);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillbitScheduler.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillbitScheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillbitScheduler.java
new file mode 100644
index 0000000..76936b5
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillbitScheduler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Scheduler for Drillbits that caps resize requests and reports a
+ * configurable container request timeout. It passes the fixed type label
+ * "basic" to its superclass.
+ */
+
+public class DrillbitScheduler extends AbstractDrillbitScheduler {
+
+  /** Value returned by {@link #getRequestTimeoutSec()}. */
+  private final int requestTimeoutSecs;
+
+  /** Margin added on top of the free-node estimate when capping a resize. */
+  private final int maxExtraNodes;
+
+  /**
+   * @param name
+   *          scheduler name, passed to the superclass
+   * @param taskSpec
+   *          specification of the tasks this scheduler runs
+   * @param quantity
+   *          initial number of tasks, passed to the superclass
+   * @param requestTimeoutSecs
+   *          request timeout, in seconds, reported by this scheduler
+   * @param maxExtraNodes
+   *          number of nodes a resize may exceed the free-node estimate by
+   */
+
+  public DrillbitScheduler(String name, TaskSpec taskSpec, int quantity,
+                           int requestTimeoutSecs, int maxExtraNodes) {
+    super("basic", name, quantity);
+    this.maxExtraNodes = maxExtraNodes;
+    this.requestTimeoutSecs = requestTimeoutSecs;
+    this.taskSpec = taskSpec;
+  }
+
+  /**
+   * Set the number of running tasks to the quantity given, capped at the
+   * current quantity plus the estimated number of free YARN nodes plus a
+   * small margin. This avoids a common user error in which someone requests
+   * 20 nodes on a 5-node cluster.
+   *
+   * @param level
+   *          the requested number of tasks
+   * @return the result of the superclass resize for the capped level
+   */
+
+  @Override
+  public int resize(int level) {
+    int ceiling = quantity + state.getController().getFreeNodeCount()
+        + maxExtraNodes;
+    int capped = (level < ceiling) ? level : ceiling;
+    return super.resize(capped);
+  }
+
+  /**
+   * @return the request timeout, in seconds, given at construction
+   */
+
+  @Override
+  public int getRequestTimeoutSec() {
+    return requestTimeoutSecs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/EventContext.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/EventContext.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/EventContext.java
new file mode 100644
index 0000000..bec8cf9
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/EventContext.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.drill.yarn.appMaster.Scheduler.TaskManager;
+
+/**
+ * Bundle of the objects involved in handling an event: the YARN facade, the
+ * cluster controller and, once set, the task and its scheduler group.
+ */
+
+public class EventContext {
+  public final AMYarnFacade yarn;
+  public final ClusterControllerImpl controller;
+  public SchedulerStateImpl group;
+  public Task task;
+
+  /**
+   * Creates a context for the given controller, using the controller's YARN
+   * facade. No task or group is set.
+   *
+   * @param controller
+   *          the cluster controller
+   */
+
+  public EventContext(ClusterControllerImpl controller) {
+    this.controller = controller;
+    this.yarn = controller.getYarn();
+  }
+
+  /**
+   * Same as {@link #EventContext(ClusterControllerImpl)}, for callers that
+   * hold the controller through its interface. The controller is cast to
+   * the implementation class.
+   *
+   * @param controller
+   *          the cluster controller
+   */
+
+  public EventContext(ClusterController controller) {
+    this((ClusterControllerImpl) controller);
+  }
+
+  /**
+   * Creates a context for the given controller and task; the group is taken
+   * from the task.
+   *
+   * @param controller
+   *          the cluster controller
+   * @param task
+   *          the task the event concerns
+   */
+
+  public EventContext(ClusterControllerImpl controller, Task task) {
+    this(controller);
+    setTask(task);
+  }
+
+  /**
+   * For testing only: leaves both the controller and YARN facade null and
+   * does not set the group.
+   *
+   * @param task
+   *          the task the event concerns
+   */
+
+  public EventContext(Task task) {
+    this.yarn = null;
+    this.controller = null;
+    this.task = task;
+  }
+
+  /**
+   * Sets the task and replaces the group with the task's group.
+   *
+   * @param task
+   *          the task the event concerns
+   */
+
+  public void setTask(Task task) {
+    this.task = task;
+    this.group = task.getGroup();
+  }
+
+  /**
+   * @return the state of the current task
+   */
+
+  public TaskState getState() {
+    return task.state;
+  }
+
+  /**
+   * Sets the group, casting it to the implementation class.
+   *
+   * @param group
+   *          the scheduler group
+   */
+
+  public void setGroup(SchedulerStateActions group) {
+    this.group = (SchedulerStateImpl) group;
+  }
+
+  /**
+   * @return the task manager of the scheduler that owns the current group
+   */
+
+  public TaskManager getTaskManager() {
+    return group.getScheduler().getTaskManager();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/NodeInventory.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/NodeInventory.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/NodeInventory.java
new file mode 100644
index 0000000..ec20307
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/NodeInventory.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+
+/**
+ * Creates an AM-side inventory of cluster nodes. Used to track node
+ * reservations (container allocations) to prevent requesting multiple
+ * containers on the same node. Tracks blacklisted nodes that have failed too
+ * often. Since YARN will discard our blacklist if we add too many nodes, tracks
+ * when a container is allocated on a blacklisted node and signals that the
+ * cluster is in a bad state.
+ */
+
+public class NodeInventory {
+  private static final Log LOG = LogFactory.getLog(NodeInventory.class);
+
+  /**
+   * Indicates the case in which we've failed so many nodes that YARN has
+   * cancelled some of our blacklist entries and we've received a container for
+   * a blacklisted node. At this point, we should stop adding new tasks else
+   * we'll get into a nasty loop.
+   */
+  private boolean failed;
+
+  private Map<String, String> nodeMap = new HashMap<>();
+
+  /**
+   * The set of nodes that YARN reports are available.
+   * Not clear if these are all nodes in the cluster, or just those usable
+   * by the current app (when the app is associated to a queue that
+   * uses node labels.)
+   */
+
+  private Map<String, NodeReport> yarnNodes = new HashMap<>();
+
+  /**
+   * The set of nodes in use by Drill. Includes both nodes on which the AM
+   * has requested to run Drillbits, and those nodes found to be running
+   * "stray" Drillbits started outside of DoY.
+   */
+
+  private Set<String> nodesInUse = new HashSet<>();
+
+  /**
+   * Nodes that have failed (typically due to mis-configuration) and
+   * are to be excluded from future container requests.
+   */
+
+  private Set<String> blacklist = new HashSet<>();
+  private final AMYarnFacade yarn;
+
+  /**
+   * Creates the inventory and immediately populates it from the node
+   * reports returned by the given YARN facade.
+   *
+   * @param yarn
+   *          facade used to fetch node reports and to update YARN's blacklist
+   * @throws YarnFacadeException
+   *           if building the node map fails
+   */
+  public NodeInventory(AMYarnFacade yarn) throws YarnFacadeException {
+    this.yarn = yarn;
+    buildNodeMap();
+  }
+
+  /**
+   * Fetches the node reports from YARN and indexes them by host name: the
+   * HTTP address goes into the node map and the report itself into the YARN
+   * node table. When info logging is enabled, also logs one summary line per
+   * node.
+   *
+   * @throws YarnFacadeException
+   *           if the node reports cannot be fetched
+   */
+
+  private void buildNodeMap() throws YarnFacadeException {
+    List<NodeReport> reports = yarn.getNodeReports();
+    for (NodeReport report : reports) {
+      String host = report.getNodeId().getHost();
+      nodeMap.put(host, report.getHttpAddress());
+      yarnNodes.put(host, report);
+    }
+    if (!LOG.isInfoEnabled()) {
+      return;
+    }
+    LOG.info("YARN Node report");
+    for (NodeReport report : reports) {
+      StringBuilder line = new StringBuilder("Node: ");
+      line.append(report.getHttpAddress());
+      line.append(", Rack: ").append(report.getRackName());
+      line.append(" has ").append(report.getCapability().getMemory());
+      line.append(" MB, ").append(report.getCapability().getVirtualCores());
+      line.append(" vcores, labels: ").append(report.getNodeLabels());
+      LOG.info(line.toString());
+    }
+  }
+
+  /**
+   * @return true once {@link #reserve(String)} has been asked to reserve a
+   *         host that is in the blacklist
+   */
+  public boolean isFailed() {
+    return failed;
+  }
+
+  /**
+   * Reserves the host on which the given container was allocated.
+   *
+   * @param container
+   *          the container whose node host is reserved
+   */
+  public void reserve(Container container) {
+    reserve(container.getNodeId().getHost());
+  }
+
+  public void reserve(String hostName) {
+    if (blacklist.contains(hostName)) {
+      LOG.error( "Node to be reserved is in the blacklist: " + hostName );
+      failed = true;
+    }
+    if (nodesInUse.contains(hostName)) {
+      LOG.error( "Node to be reserved is already in use: " + hostName );
+      return;
+    }
+    if (!yarnNodes.containsKey(hostName)) {
+      LOG.warn( "Node to be reserved was not in YARN node inventory: " + hostName );
+    }
+    nodesInUse.add(hostName);
+    yarn.blacklistNode(hostName);
+  }
+
+  /**
+   * Releases the host on which the given container was allocated.
+   *
+   * @param container
+   *          the container whose node host is released
+   */
+  public void release(Container container) {
+    release(container.getNodeId().getHost());
+  }
+
+  /**
+   * Releases the reservation on the given host. Hosts that are not in the
+   * YARN node inventory are ignored. The host is removed from the in-use set
+   * and, unless it is also in our blacklist of failed nodes, from YARN's
+   * blacklist as well.
+   *
+   * @param hostName
+   *          the host to release
+   */
+
+  public void release(String hostName) {
+    if (!yarnNodes.containsKey(hostName)) {
+      return;
+    }
+    nodesInUse.remove(hostName);
+
+    // Both reserve() and blacklist() put the host on YARN's blacklist. A host
+    // can be in use and blacklisted at the same time; lifting the YARN entry
+    // for such a host would let YARN allocate a container on a failed node.
+
+    if (!blacklist.contains(hostName)) {
+      yarn.removeBlacklist(hostName);
+    }
+  }
+
+  /**
+   * Adds the given host to our blacklist and to YARN's blacklist. Hosts that
+   * are not in the YARN node inventory are ignored. The host is expected not
+   * to be in use when this is called.
+   *
+   * @param hostName
+   *          the host to blacklist
+   */
+
+  public void blacklist(String hostName) {
+    if (yarnNodes.containsKey(hostName)) {
+      assert !nodesInUse.contains(hostName);
+      blacklist.add(hostName);
+      yarn.blacklistNode(hostName);
+      LOG.info("Node blacklisted: " + hostName);
+    }
+  }
+
+  /**
+   * Determine the number of free nodes in the YARN cluster. The free set is the
+   * set of all YARN nodes minus those that are allocated and those that are
+   * blacklisted. Note that a node might be both in use and blacklisted if
+   * DoY blacklists a node, but then the user starts a "stray" Drillbit on
+   * that same node.
+   * <p>
+   * This number is an approximation: the set of nodes managed by YARN can
+   * change any time, and in-flight container requests will consume a node,
+   * but since the request is not yet completed, we don't know which node
+   * will be assigned, so the node does not yet appear in the in-use list.
+   *
+   * @return an approximation of the free node count
+   */
+
+  public int getFreeNodeCount() {
+    Set<String> free = new HashSet<>( );
+    free.addAll( yarnNodes.keySet() );
+    free.removeAll( nodesInUse );
+    free.removeAll( blacklist );
+    return free.size( );
+  }
+
+  /**
+   * Provides a snapshot of the blacklist (the failed nodes), for display to
+   * the user or similar purposes. Changes to the returned list do not affect
+   * the inventory.
+   *
+   * @return a copy of the blacklist.
+   */
+
+  public List<String> getBlacklist() {
+    return new ArrayList<>(blacklist);
+  }
+
+  /**
+   * Reports whether the given host is unavailable for a new container.
+   *
+   * @param hostName
+   *          the host to check
+   * @return true if the host is reserved (in use by a container) or
+   * blacklisted (failed.)
+   */
+
+  public boolean isInUse(String hostName) {
+    if (nodesInUse.contains(hostName)) {
+      return true;
+    }
+    return blacklist.contains(hostName);
+  }
+}


Mime
View raw message