http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractScheduler.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractScheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractScheduler.java
new file mode 100644
index 0000000..01b89ae
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractScheduler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.core.ContainerRequestSpec;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public abstract class AbstractScheduler implements Scheduler {
+ private static final Log LOG = LogFactory.getLog(AbstractScheduler.class);
+ private final String name;
+ private final String type;
+ protected TaskSpec taskSpec;
+ protected int priority;
+ protected int failCount;
+ protected TaskManager taskManager;
+ protected SchedulerState state;
+ protected boolean isTracked;
+
+ public AbstractScheduler(String type, String name) {
+ this.type = type;
+ this.name = name;
+ taskManager = new AbstractTaskManager();
+ }
+
+ public void setTaskManager(TaskManager taskManager) {
+ this.taskManager = taskManager;
+ }
+
+ @Override
+ public void registerState(SchedulerState state) {
+ this.state = state;
+ }
+
+ @Override
+ public void setPriority(int priority) {
+ this.priority = priority;
+ taskSpec.containerSpec.priority = priority;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public TaskManager getTaskManager() {
+ return taskManager;
+ }
+
+ @Override
+ public void change(int delta) {
+ resize(getTarget() + delta);
+ }
+
+ protected void addTasks(int n) {
+ LOG.info( "[" + getName( ) + "] - Adding " + n + " tasks" );
+ for (int i = 0; i < n; i++) {
+ state.start(new Task(this, taskSpec));
+ }
+ }
+
+ @Override
+ public boolean isTracked() {
+ return isTracked;
+ }
+
+ @Override
+ public ContainerRequestSpec getResource() {
+ return taskSpec.containerSpec;
+ }
+
+ @Override
+ public void limitContainerSize(Resource maxResource) throws AMException {
+ if (taskSpec.containerSpec.memoryMb > maxResource.getMemory()) {
+ LOG.warn(taskSpec.name + " requires " + taskSpec.containerSpec.memoryMb
+ + " MB but the maximum YARN container size is "
+ + maxResource.getMemory() + " MB");
+ taskSpec.containerSpec.memoryMb = maxResource.getMemory();
+ }
+ if (taskSpec.containerSpec.vCores > maxResource.getVirtualCores()) {
+ LOG.warn(taskSpec.name + " requires " + taskSpec.containerSpec.vCores
+ + " vcores but the maximum YARN container size is "
+ + maxResource.getVirtualCores() + " vcores");
+ taskSpec.containerSpec.vCores = maxResource.getVirtualCores();
+ }
+ }
+
+ @Override
+ public int getRequestTimeoutSec() { return 0; }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractTaskManager.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractTaskManager.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractTaskManager.java
new file mode 100644
index 0000000..7acd402
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractTaskManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.drill.yarn.appMaster.Scheduler.TaskManager;
+import org.apache.drill.yarn.core.LaunchSpec;
+
+/**
+ * Default {@link TaskManager} that adds no policy of its own. It sets no
+ * practical limit on concurrent allocations, ignores allocation and completion
+ * events, launches each task with the task's own launch spec, returns
+ * {@code false} from {@link #stop(Task)}, and reports every task as live.
+ */
+
+public class AbstractTaskManager implements TaskManager {
+
+ /**
+  * @return {@link Integer#MAX_VALUE}, i.e. no practical limit
+  */
+ @Override
+ public int maxConcurrentAllocs() {
+   return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public void allocated(EventContext context) {
+   // No action on allocation.
+ }
+
+ /**
+  * @return the launch spec carried by the task itself
+  */
+ @Override
+ public LaunchSpec getLaunchSpec(Task task) {
+   return task.getLaunchSpec();
+ }
+
+ /**
+  * @return always {@code false}
+  */
+ @Override
+ public boolean stop(Task task) {
+   return false;
+ }
+
+ @Override
+ public void completed(EventContext context) {
+   // No action on completion.
+ }
+
+ /**
+  * @return always {@code true}
+  */
+ @Override
+ public boolean isLive(EventContext context) {
+   return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/BatchScheduler.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/BatchScheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/BatchScheduler.java
new file mode 100644
index 0000000..8f3aaab
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/BatchScheduler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+public class BatchScheduler extends AbstractScheduler {
+ /** Number of tasks the batch should run to completion. */
+ private int quantity;
+ /** Number of tasks that have finished, successfully or not. */
+ private int completedCount;
+
+ public BatchScheduler(String name, int quantity) {
+   super("batch", name);
+   this.quantity = quantity;
+ }
+
+ /**
+  * Counts a finished task. Any disposition other than {@code COMPLETED}
+  * also counts as a failure.
+  */
+ @Override
+ public void completed(Task task) {
+   completedCount++;
+   if (task.getDisposition() != Task.Disposition.COMPLETED) {
+     failCount++;
+   }
+ }
+
+ @Override
+ public int resize(int level) {
+   quantity = level;
+   return quantity;
+ }
+
+ @Override
+ public int getTarget() {
+   return quantity;
+ }
+
+ /**
+  * @return a two-element array: tasks completed (capped at the target) and
+  *         the target quantity
+  */
+ @Override
+ public int[] getProgress() {
+   return new int[] { Math.min(completedCount, quantity), quantity };
+ }
+
+ /**
+  * Brings the number of active plus completed tasks in line with the target
+  * quantity. It starts the shortfall when below target and cancels starting
+  * tasks when above it.
+  */
+ @Override
+ public void adjust() {
+   int activeCount = state.getTaskCount();
+   int delta = quantity - activeCount - completedCount;
+   if (delta > 0) {
+     // Fewer tasks running or finished than requested: start the missing ones.
+     addTasks(delta);
+   } else if (delta < 0) {
+     // Target was lowered below what is running or done: trim the excess.
+     cancelTasks(-delta);
+   }
+ }
+
+ /**
+  * Cancel up to {@code n} starting tasks. We don't cancel launched, in-flight
+  * tasks because there is no way to tell YARN to cancel tasks that are in the
+  * process of being launched: we have to wait for them to start before
+  * canceling.
+  *
+  * @param n maximum number of starting tasks to cancel; no-op if not positive
+  */
+ private void cancelTasks(int n) {
+   if (n <= 0) {
+     return;
+   }
+   // NOTE(review): cancel() may modify the collection returned by
+   // getStartingTasks() during iteration -- confirm that it returns a copy.
+   for (Task task : state.getStartingTasks()) {
+     state.cancel(task);
+     if (--n == 0) {
+       break;
+     }
+   }
+ }
+
+ /**
+  * @return {@code true} while fewer tasks have finished than the target
+  */
+ @Override
+ public boolean hasMoreTasks() {
+   return completedCount < quantity;
+ }
+
+ @Override
+ public void requestTimedOut() {
+   // Not clear what to do here. Since this case is used only for testing,
+   // deal with this case later.
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterController.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterController.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterController.java
new file mode 100644
index 0000000..6aaa18b
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterController.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * Interface which identifies the cluster controller methods that are safe to
+ * call from the {@link Dispatcher}. Methods here are either designed to be
+ * called before the event threads start or after they complete. The remainder
+ * are synchronized to coordinate between event threads.
+ */
+
+public interface ClusterController extends RegistryHandler {
+
+ /**
+  * Enables or disables the check that ends the controller when no tasks are
+  * running and no nodes are available. Disabling it eases debugging on a
+  * single-node cluster.
+  *
+  * @param flag {@code true} to enable the check
+  */
+ void enableFailureCheck(boolean flag);
+
+ /**
+  * Registers a listener to be alerted to task lifecycle events.
+  *
+  * @param listener the listener to add
+  */
+ void registerLifecycleListener(TaskLifecycleListener listener);
+
+ /**
+  * Registers a scheduler (task pool). Registration order determines the order
+  * in which tasks start. Registration must happen before the YARN callbacks
+  * start.
+  *
+  * @param resourceGroup the scheduler to register
+  */
+ void registerScheduler(Scheduler resourceGroup);
+
+ /**
+  * Sets a named property that plugins and the UI can read.
+  *
+  * @param key property name
+  * @param value property value
+  */
+ void setProperty(String key, Object value);
+
+ /**
+  * Reads a named property.
+  *
+  * @param key property name
+  * @return the value set by {@link #setProperty(String, Object)}; presumably
+  *         {@code null} when unset -- confirm against the implementation
+  */
+ Object getProperty(String key);
+
+ /**
+  * Called after the dispatcher has started YARN and other server
+  * components. The controller can now begin to spin up tasks.
+  */
+
+ void started() throws YarnFacadeException, AMException;
+
+ /**
+  * Called by the timer ("pulse") thread to trigger time-based events.
+  *
+  * @param curTime
+  *          the current time, in milliseconds
+  */
+
+ void tick(long curTime);
+
+ /**
+  * The RM has allocated one or more containers in response to container
+  * requests submitted to the RM.
+  *
+  * @param containers
+  *          the set of containers provided by YARN
+  */
+
+ void containersAllocated(List<Container> containers);
+
+ /**
+  * The NM reports that a container has successfully started.
+  *
+  * @param containerId
+  *          the container which started
+  */
+
+ void containerStarted(ContainerId containerId);
+
+ /**
+  * The RM API reports that an attempt to start a container has failed locally.
+  *
+  * @param containerId
+  *          the container that failed to launch
+  * @param t
+  *          the error that occurred
+  */
+
+ void taskStartFailed(ContainerId containerId, Throwable t);
+
+ /**
+  * The Node Manager reports that a container has stopped.
+  *
+  * @param containerId
+  *          the container that stopped
+  */
+ void containerStopped(ContainerId containerId);
+
+ /**
+  * The Resource Manager reports that containers have completed with the given
+  * statuses. Find the task for each container and mark them as completed.
+  *
+  * @param statuses
+  *          the completion status of each finished container
+  */
+
+ void containersCompleted(List<ContainerStatus> statuses);
+
+ /**
+  * Reports the progress of the tasks managed by this controller, summed
+  * across all schedulers.
+  *
+  * @return the overall progress; presumably a fraction in [0, 1] -- confirm
+  */
+ float getProgress();
+
+ /**
+  * The Node Manager API reports that a request sent to the NM to stop a task
+  * has failed.
+  *
+  * @param containerId
+  *          the container that failed to stop
+  * @param t
+  *          the reason that the stop request failed
+  */
+
+ void stopTaskFailed(ContainerId containerId, Throwable t);
+
+ /**
+  * Request to resize the Drill cluster by a relative amount.
+  *
+  * @param delta
+  *          the amount of change. Can be positive (to grow) or negative (to
+  *          shrink the cluster)
+  */
+
+ void resizeDelta(int delta);
+
+ /**
+  * Request to resize the Drill cluster to the given size.
+  *
+  * @param n
+  *          the desired cluster size
+  * @return the resulting target size reported by the scheduler
+  */
+
+ int resizeTo(int n);
+
+ /**
+  * Indicates a request to gracefully shut down the cluster.
+  */
+
+ void shutDown();
+
+ /**
+  * Called by the main thread to wait for the normal shutdown of the
+  * controller. Such shutdown occurs when the admin sends a shutdown
+  * command from the UI or REST API.
+  *
+  * @return {@code true} if the controller ended normally, {@code false} if it
+  *         failed
+  */
+
+ boolean waitForCompletion();
+
+ /**
+  * Polls YARN for an update on available resources.
+  */
+ void updateRMStatus();
+
+ /**
+  * Sets the maximum number of retries for each task launch.
+  *
+  * @param value the retry limit
+  */
+ void setMaxRetries(int value);
+
+ /**
+  * Allow an observer to see a consistent view of the controller's
+  * state by performing the visit in a synchronized block.
+  *
+  * @param visitor the observer to run
+  */
+
+ void visit(ControllerVisitor visitor);
+
+ /**
+  * Allow an observer to see a consistent view of the controller's
+  * task state by performing the visit in a synchronized block.
+  *
+  * @param visitor the observer to run against each task
+  */
+
+ void visitTasks(TaskVisitor visitor);
+
+ /**
+  * Return the target number of tasks that the controller seeks to maintain.
+  * This is the sum across all pools.
+  *
+  * @return the total target task count
+  */
+
+ int getTargetCount();
+
+ /**
+  * Reports whether the task with the given ID is live.
+  *
+  * @param id the task ID
+  * @return {@code true} if the task is live
+  */
+ boolean isTaskLive(int id);
+
+ /**
+  * Cancels the given task, reducing the target task count. Called
+  * from the UI to allow the user to select the specific task to end
+  * when reducing cluster size.
+  *
+  * @param id the task ID
+  * @return presumably {@code true} if the task was found and canceled --
+  *         confirm against the implementation
+  */
+
+ boolean cancelTask(int id);
+
+ /**
+  * Whether this distribution of YARN supports disk resources.
+  *
+  * @return {@code true} if disk resources are supported
+  */
+
+ boolean supportsDiskResource();
+
+ /**
+  * Returns the approximate number of YARN nodes that can accept a task
+  * request.
+  *
+  * @return the approximate free node count, never negative
+  */
+ int getFreeNodeCount();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterControllerImpl.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterControllerImpl.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterControllerImpl.java
new file mode 100644
index 0000000..3c011ec
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterControllerImpl.java
@@ -0,0 +1,785 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.appMaster.TaskLifecycleListener.Event;
+import org.apache.drill.yarn.core.DoYUtil;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
+
+/**
+ * Controls the Drill cluster by comparing the current cluster state with a
+ * desired state, taking corrective action to keep the cluster in the desired
+ * state. The cluster as a whole has a state, as does each task (node) within
+ * the cluster.
+ * <p>
+ * This class is designed to allow unit tests. In general, testing the
+ * controller on a live cluster is tedious. This class encapsulates the
+ * controller algorithm so it can be driven by a simulated cluster.
+ * <p>
+ * This object is shared between threads, thus synchronized.
+ */
+
+public class ClusterControllerImpl implements ClusterController {
+ /**
+ * Controller lifecycle state. Transitions made in this class: START to LIVE
+ * in started(); any state to ENDING in shutDown(); ENDING to ENDED once every
+ * task group is done (checkStatus()); and to FAILED from checkForFailure().
+ */
+
+ public enum State {
+ /**
+ * Cluster is starting. Things are in a partially-built state. No tasks are
+ * started until the cluster moves to LIVE.
+ */
+
+ START,
+
+ /**
+ * Normal operating state: the controller seeks to maintain the desired
+ * number of tasks.
+ */
+
+ LIVE,
+
+ /**
+ * Controller is shutting down. Tasks are gracefully (where possible) ended;
+ * no new tasks are started. (That is, when we detect the exit of a task,
+ * the controller no longer immediately tries to start a replacement.)
+ */
+
+ ENDING,
+
+ /**
+ * The controller has shut down. All tasks and threads are stopped. The
+ * controller allows the main thread (which has been patiently waiting) to
+ * continue, allowing the AM itself to shut down. Thus, this is a very
+ * short-lived state.
+ */
+
+ ENDED,
+
+ /**
+ * Something bad happened and the AM must shut down. This covers a failed
+ * start-up, and also the run-time case where no tasks are running and no
+ * nodes are available (see checkForFailure()).
+ */
+
+ FAILED
+ }
+
+ private final static int PRIORITY_OFFSET = 1;
+
+ private static final Log LOG = LogFactory.getLog(ClusterControllerImpl.class);
+
+ /**
+ * Signals the completion of the cluster run. The main program waits on this
+ * mutex until all tasks complete (batch) or the cluster is explicitly shut
+ * down (persistent tasks.) Notified by terminate().
+ */
+
+ private Object completionMutex = new Object();
+
+ /**
+ * Maximum number of retries for each task launch.
+ */
+
+ protected int maxRetries = 3;
+
+ /**
+ * Controller state. Written from synchronized methods and terminate().
+ * NOTE(review): not volatile, yet isLive() and succeeded() read it without
+ * synchronization.
+ *
+ * @see State
+ */
+
+ State state = State.START;
+
+ /**
+ * Definition of the task types that can be run by this controller, along with
+ * the target task levels for each. Keyed by task group name.
+ */
+
+ private Map<String, SchedulerStateActions> taskPools = new HashMap<>();
+
+ /**
+ * List of task pools prioritized in the order in which tasks should start.
+ * DoY supports only one task pool at present. The idea is to, later, support
+ * multiple pools that represent, say, pool 1 as the minimum number of
+ * Drillbits to run at all times, with pool 2 as extra Drillbits to start up
+ * during peak demand.
+ * <p>
+ * The priority also gives rise to YARN request priorities which are the only
+ * tool the AM has to associate container grants with the requests to which
+ * they correspond: a container's priority minus PRIORITY_OFFSET is its
+ * group's index in this list.
+ */
+
+ private List<SchedulerStateActions> prioritizedGroups = new ArrayList<>();
+
+ /**
+ * IDs of the containers accepted by containersAllocated(). Used to ignore
+ * repeated allocation notices for the same container.
+ */
+
+ private Set<ContainerId> allocatedContainers = new HashSet<>();
+
+ /**
+ * Cluster-wide list of active tasks. Allows lookup from container ID to task
+ * (and then from task to task type.)
+ */
+
+ private Map<ContainerId, Task> activeContainers = new HashMap<>();
+
+ /**
+ * Tracks the tasks that have completed: either successfully (state == ENDED)
+ * or failed (state == FAILED), plus copies of retried tasks (see
+ * taskRetried()). Eventually store this information elsewhere to avoid
+ * cluttering memory with historical data. Entries here are static copies,
+ * preserving the state at the time that the task completed.
+ */
+
+ private List<Task> completedTasks = new LinkedList<>();
+
+ /**
+ * Wrapper around the YARN API. Abstracts the details of YARN operations.
+ */
+
+ private final AMYarnFacade yarn;
+
+ /**
+ * Maximum number of new tasks to start on each "pulse" tick.
+ */
+
+ private int maxRequestsPerTick = 2;
+
+ // Stop timeout in milliseconds, exposed through getStopTimeoutMs().
+ // NOTE(review): field name is misspelled ("Timout"); presumably how long to
+ // wait for a task to stop -- confirm.
+ private int stopTimoutMs = 10_000;
+
+ /**
+ * Time (in ms) between request to YARN to get an updated list of the node
+ * "inventory". Used with nextResourcePollTime by updateRMStatus().
+ */
+
+ private int configPollPeriod = 60_000;
+ private long nextResourcePollTime;
+
+ /**
+ * List of nodes available in the cluster. Necessary as part of the process of
+ * ensuring that we run one Drillbit per node. (The YARN blacklist only half
+ * works for this purpose.) Created in started(); null before then.
+ */
+
+ private NodeInventory nodeInventory;
+
+ // Time of the last checkForFailure() evaluation; the check runs at most once
+ // per failureCheckPeriodMs.
+ private long lastFailureCheckTime;
+
+ private int failureCheckPeriodMs = 60_000;
+
+ // checkTasks() runs at most once per taskCheckPeriodMs, tracked by
+ // lastTaskCheckTime.
+ private int taskCheckPeriodMs = 10_000;
+ private long lastTaskCheckTime;
+
+ /**
+ * To increase code modularity, add-ons (such as the ZK monitor) register as
+ * lifecycle listeners that are alerted to "interesting" lifecycle events.
+ */
+
+ private List<TaskLifecycleListener> lifecycleListeners = new ArrayList<>();
+
+ /**
+ * Handy mechanism for setting properties on this controller that are
+ * available to plugins and UI without cluttering this class with member
+ * variables.
+ */
+
+ private Map<String, Object> properties = new HashMap<>();
+
+ /**
+ * When enabled, allows the controller to check for failures that result in no
+ * drillbits running. The controller will then automatically exit as no useful
+ * work can be done. Disable this to make debugging easier on a single-node
+ * cluster (lets you, say, start a "stray" drill bit and see what happens
+ * without the AM exiting.)
+ */
+
+ private boolean enableFailureCheck = true;
+
+ /**
+  * Creates a controller that performs all YARN operations through the given
+  * facade.
+  *
+  * @param yarn wrapper around the YARN API
+  */
+ public ClusterControllerImpl(AMYarnFacade yarn) {
+   this.yarn = yarn;
+ }
+
+ /**
+  * Turns the "no tasks running and no free nodes" exit check on or off.
+  *
+  * @param flag {@code true} to enable the check
+  */
+ @Override
+ public void enableFailureCheck(boolean flag) {
+   enableFailureCheck = flag;
+ }
+
+ /**
+  * Define a task type. Registration order is important: the controller starts
+  * tasks in the order that they are registered, and the registration position
+  * sets the YARN request priority used to map allocated containers back to
+  * this scheduler. Must happen before the YARN callbacks start.
+  *
+  * @param scheduler the scheduler to register; its name must be unique
+  * @throws IllegalArgumentException if a scheduler with the same name is
+  *           already registered
+  */
+
+ @Override
+ public void registerScheduler(Scheduler scheduler) {
+   // An assert alone is disabled in production. A duplicate would overwrite the
+   // taskPools entry while still growing prioritizedGroups, leaving the two
+   // out of step and giving later schedulers colliding priorities.
+   if (taskPools.containsKey(scheduler.getName())) {
+     throw new IllegalArgumentException(
+         "Scheduler already registered: " + scheduler.getName());
+   }
+   scheduler.setPriority(taskPools.size() + PRIORITY_OFFSET);
+   SchedulerStateActions taskGroup = new SchedulerStateImpl(this, scheduler);
+   taskPools.put(taskGroup.getName(), taskGroup);
+   prioritizedGroups.add(taskGroup);
+ }
+
+ /**
+  * Called when the caller has completed start-up and the controller should
+  * become live. Builds the node inventory, then clamps every scheduler's
+  * container request to the largest container YARN can grant (ensuring a
+  * graceful exit rather than an unsatisfiable request), then moves to LIVE.
+  *
+  * @throws YarnFacadeException presumably from building the node inventory
+  * @throws AMException from a scheduler's limitContainerSize()
+  */
+
+ @Override
+ public synchronized void started() throws YarnFacadeException, AMException {
+   nodeInventory = new NodeInventory(yarn);
+
+   RegisterApplicationMasterResponse registration = yarn.getRegistrationResponse();
+   Resource largest = registration.getMaximumResourceCapability();
+   for (SchedulerStateActions pool : prioritizedGroups) {
+     Scheduler scheduler = pool.getScheduler();
+     scheduler.limitContainerSize(largest);
+   }
+
+   state = State.LIVE;
+ }
+
+ @Override
+ public synchronized void tick(long curTime) {
+ if (state == State.LIVE) {
+ adjustTasks(curTime);
+ requestContainers();
+ }
+ if (state == State.LIVE || state == State.ENDING) {
+ checkTasks(curTime);
+ }
+ }
+
+ /**
+  * Adjust the number of running tasks to match the desired level. When the
+  * failure check is enabled and no nodes are free, first checks whether the
+  * controller should give up.
+  *
+  * @param curTime the current time, in milliseconds
+  */
+
+ private void adjustTasks(long curTime) {
+   if (enableFailureCheck) {
+     if (getFreeNodeCount() == 0) {
+       checkForFailure(curTime);
+     }
+   }
+   // The failure check may have moved the controller out of LIVE.
+   if (state == State.LIVE) {
+     for (SchedulerStateActions pool : prioritizedGroups) {
+       pool.adjustTasks();
+     }
+   }
+ }
+
+ /**
+  * Get the approximate number of free YARN nodes (those that can accept a
+  * task request). Starts with the free-node count from the node inventory,
+  * then subtracts every in-flight request, since by definition a pending
+  * request has not yet been given a node.
+  * <p>
+  * This approximation <b>does not</b> consider whether the node has
+  * sufficient resources to run a task; only whether the node itself exists.
+  *
+  * @return the approximate free node count, never negative
+  */
+
+ @Override
+ public int getFreeNodeCount() {
+   int inventoryFree = nodeInventory.getFreeNodeCount();
+   int inFlight = 0;
+   for (SchedulerStateActions pool : prioritizedGroups) {
+     inFlight += pool.getRequestCount();
+   }
+   return Math.max(0, inventoryFree - inFlight);
+ }
+
+ /**
+  * Check if the controller is unable to run any tasks. If no group has any
+  * task, log an error and terminate in the FAILED state. Runs at most once
+  * per {@code failureCheckPeriodMs}.
+  *
+  * @param curTime the current time, in milliseconds
+  */
+
+ private void checkForFailure(long curTime) {
+   boolean tooSoon = curTime < lastFailureCheckTime + failureCheckPeriodMs;
+   if (tooSoon) {
+     return;
+   }
+   lastFailureCheckTime = curTime;
+
+   boolean anyTasks = false;
+   for (SchedulerStateActions pool : prioritizedGroups) {
+     if (pool.getTaskCount() > 0) {
+       anyTasks = true;
+       break;
+     }
+   }
+   if (anyTasks) {
+     return;
+   }
+
+   LOG.error(
+       "Application failure: no tasks are running and no nodes are available -- exiting.");
+   terminate(State.FAILED);
+ }
+
+ /**
+  * Periodically (at most once per {@code taskCheckPeriodMs}) asks each task
+  * group to check its tasks for timeouts.
+  *
+  * @param curTime the current time, in milliseconds
+  */
+
+ private void checkTasks(long curTime) {
+   long nextCheckTime = lastTaskCheckTime + taskCheckPeriodMs;
+   if (curTime < nextCheckTime) {
+     return;
+   }
+   lastTaskCheckTime = curTime;
+
+   EventContext context = new EventContext(this);
+   for (SchedulerStateActions pool : prioritizedGroups) {
+     context.setGroup(pool);
+     pool.checkTasks(context, curTime);
+   }
+ }
+
+ /**
+  * Get an update from YARN on available resources.
+  * <p>
+  * At present the YARN query itself is not implemented. The method only
+  * rate-limits itself: once {@code configPollPeriod} ms have passed since the
+  * last poll, it schedules the next poll and otherwise has no effect.
+  */
+
+ @Override
+ public void updateRMStatus() {
+   long curTime = System.currentTimeMillis();
+   if (nextResourcePollTime > curTime) {
+     return;
+   }
+   // TODO: query YARN for the node count and the available memory/vcores
+   // and log them here.
+   nextResourcePollTime = curTime + configPollPeriod;
+ }
+
+ /**
+  * Request any containers that have accumulated, visiting groups in priority
+  * order. If a group returns {@code true}, the pass ends there and
+  * lower-priority groups make no requests on this tick.
+  */
+
+ private void requestContainers() {
+   EventContext context = new EventContext(this);
+   for (SchedulerStateActions pool : prioritizedGroups) {
+     context.setGroup(pool);
+     boolean stopHere = pool.requestContainers(context, maxRequestsPerTick);
+     if (stopHere) {
+       break;
+     }
+   }
+ }
+
+ @Override
+ public synchronized void containersAllocated(List<Container> containers) {
+ EventContext context = new EventContext(this);
+ for (Container container : containers) {
+ if (allocatedContainers.contains(container.getId())) {
+ continue;
+ }
+
+ // We should never get a container on a node in the blacklist we
+ // sent to YARN. If we do, something is wrong. Log the error and
+ // reject the container. Else, bad things happen further along as
+ // the tracking mechanisms assume one task per node.
+
+ String host = container.getNodeId().getHost();
+ if (nodeInventory.isInUse(host)) {
+ LOG.error( "Host is in use, but YARN allocated a container: " +
+ DoYUtil.labelContainer(container) + " - container rejected." );
+ yarn.releaseContainer(container);
+ continue;
+ }
+
+ // The container is fine.
+
+ allocatedContainers.add(container.getId());
+ int priority = container.getPriority().getPriority();
+ int offset = priority - PRIORITY_OFFSET;
+ if (offset < 0 || offset > prioritizedGroups.size()) {
+ LOG.error("Container allocated with unknown priority " + DoYUtil.labelContainer(container));
+ continue;
+ }
+ context.setGroup(prioritizedGroups.get(offset));
+ context.group.containerAllocated(context, container);
+ }
+ }
+
+ /**
+  * Forwards a Node Manager "container started" notice to the owning task's
+  * state. Unknown containers are ignored.
+  *
+  * @param containerId the container that started
+  */
+ @Override
+ public synchronized void containerStarted(ContainerId containerId) {
+   Task owner = getTask(containerId);
+   if (owner != null) {
+     EventContext context = new EventContext(this, owner);
+     context.getState().containerStarted(context);
+     LOG.trace("Container started: " + containerId);
+   }
+ }
+
+ /**
+  * Forwards a failed container launch to the owning task's state. Unknown
+  * containers are ignored.
+  *
+  * @param containerId the container that failed to launch
+  * @param t the launch error
+  */
+ @Override
+ public synchronized void taskStartFailed(ContainerId containerId,
+     Throwable t) {
+   Task owner = getTask(containerId);
+   if (owner != null) {
+     EventContext context = new EventContext(this, owner);
+     context.getState().launchFailed(context, t);
+   }
+ }
+
+ /**
+  * @param containerId a YARN container ID
+  * @return the active task running in that container, or {@code null} if none
+  */
+ private Task getTask(ContainerId containerId) {
+   Task owner = activeContainers.get(containerId);
+   return owner;
+ }
+
+ /**
+  * Deliberately ignored. Node Manager stop notifications are very unreliable,
+  * so container completion is taken from the Resource Manager via
+  * containersCompleted() instead.
+  *
+  * @param containerId the container the Node Manager reports as stopped
+  */
+ @Override
+ public synchronized void containerStopped(ContainerId containerId) {
+   // Intentionally empty; see the method comment.
+ }
+
+ /**
+  * Applies Resource Manager completion reports: each status is routed to the
+  * owning task's state, after which overall completion is rechecked.
+  * Statuses for unknown containers are logged and skipped.
+  *
+  * @param statuses completion status of each finished container
+  */
+ @Override
+ public synchronized void containersCompleted(List<ContainerStatus> statuses) {
+   EventContext context = new EventContext(this);
+   for (ContainerStatus status : statuses) {
+     Task task = getTask(status.getContainerId());
+     if (task == null) {
+       // Will occur if a container was allocated but rejected.
+       // Any other occurrence is unexpected and an error.
+       LOG.warn("Container completed but no associated task state: " + status.getContainerId());
+       continue;
+     }
+     context.setTask(task);
+     context.getState().containerCompleted(context, status);
+   }
+   checkStatus();
+ }
+
+ /**
+  * Reports overall progress as the fraction of target work completed, summed
+  * across all schedulers. Each scheduler's getProgress() supplies
+  * {done, target}.
+  *
+  * @return progress in [0, 1]; 1 when there is no target work at all
+  */
+ @Override
+ public synchronized float getProgress() {
+   int done = 0;
+   int target = 0;
+   for (SchedulerStateActions group : taskPools.values()) {
+     int[] progress = group.getScheduler().getProgress();
+     done += progress[0];
+     target += progress[1];
+   }
+   if (target <= 0) {
+     // Nothing to run: avoid dividing by zero and report completion.
+     return 1;
+   }
+   float fraction = (float) done / (float) target;
+   // Clamp in case a scheduler reports more done than its target.
+   return Math.min(1.0f, Math.max(0.0f, fraction));
+ }
+
+ /**
+  * Forwards a failed stop request to the owning task's state. Unknown
+  * containers are ignored.
+  *
+  * @param containerId the container that failed to stop
+  * @param t the reason the stop failed
+  */
+ @Override
+ public synchronized void stopTaskFailed(ContainerId containerId,
+     Throwable t) {
+   Task owner = getTask(containerId);
+   if (owner != null) {
+     EventContext context = new EventContext(this, owner);
+     context.getState().stopTaskFailed(context, t);
+   }
+ }
+
+ @Override
+ public synchronized void resizeDelta(int delta) {
+ // TODO: offer the delta to each scheduler in turn.
+ // For now, we support only one scheduler.
+
+ prioritizedGroups.get(0).getScheduler().change(delta);
+ }
+
+ @Override
+ public synchronized int resizeTo(int n) {
+ // TODO: offer the delta to each scheduler in turn.
+ // For now, we support only one scheduler.
+
+ return prioritizedGroups.get(0).getScheduler().resize(n);
+ }
+
+ /**
+  * Moves the controller to ENDING, asks every task group to shut down, then
+  * checks whether the controller can end immediately.
+  */
+ @Override
+ public synchronized void shutDown() {
+   LOG.info("Shut down request received");
+   state = State.ENDING;
+   EventContext context = new EventContext(this);
+   for (SchedulerStateActions pool : prioritizedGroups) {
+     pool.shutDown(context);
+   }
+   checkStatus();
+ }
+
+ /**
+  * Logs the YARN registration details, then blocks the calling (main) thread
+  * until the controller reaches ENDED or FAILED.
+  *
+  * @return {@code true} if the controller ended normally (ENDED)
+  */
+ @Override
+ public boolean waitForCompletion() {
+   start();
+   synchronized (completionMutex) {
+     // Wait on the terminal state, not on a single wakeup. A lone wait()
+     // misses a notify() that terminate() sent before we got here (hanging
+     // the AM), and it can return early on a spurious wakeup. terminate()
+     // writes the state before synchronizing on this mutex, so the write is
+     // visible here.
+     while (state != State.ENDED && state != State.FAILED) {
+       try {
+         completionMutex.wait();
+       } catch (InterruptedException e) {
+         // Preserve the interrupt for the caller rather than swallowing it.
+         Thread.currentThread().interrupt();
+         LOG.warn("Interrupted while waiting for the controller to shut down");
+         return succeeded();
+       }
+     }
+     LOG.info("Controller shut down completed");
+   }
+   return succeeded();
+ }
+
+ /**
+  * Run-time start hook; currently just logs the YARN registration details.
+  */
+ private void start() {
+   this.yarnReport();
+ }
+
+ /**
+  * Logs the queue, the maximum container size and the scheduler resource
+  * types from the AM registration response.
+  */
+ private void yarnReport() {
+   RegisterApplicationMasterResponse registration = yarn.getRegistrationResponse();
+   LOG.info("YARN queue: " + registration.getQueue());
+
+   Resource maxCapability = registration.getMaximumResourceCapability();
+   LOG.info("YARN max resource: " + maxCapability.getMemory() + " MB, "
+       + maxCapability.getVirtualCores() + " cores");
+
+   EnumSet<SchedulerResourceTypes> resourceTypes = registration.getSchedulerResourceTypes();
+   StringBuilder typeList = new StringBuilder();
+   boolean first = true;
+   for (SchedulerResourceTypes resourceType : resourceTypes) {
+     if (!first) {
+       typeList.append(", ");
+     }
+     typeList.append(resourceType.toString());
+     first = false;
+   }
+   LOG.info("YARN scheduler resource types: " + typeList.toString());
+ }
+
+  /**
+   * Checks for overall completion. Outside the {@code ENDING} state this does
+   * nothing. While ending, the controller terminates with state
+   * {@code ENDED} once every task group reports that it is done.
+   */
+
+  private void checkStatus() {
+    if (state != State.ENDING) {
+      return;
+    }
+    boolean allDone = true;
+    for (SchedulerStateActions pool : prioritizedGroups) {
+      if (!pool.isDone()) {
+        allDone = false;
+        break;
+      }
+    }
+    if (allDone) {
+      terminate(State.ENDED);
+    }
+  }
+
+  /**
+   * Records the final controller state, then notifies the thread waiting on
+   * {@code completionMutex} in {@code waitForCompletion()}.
+   *
+   * @param finalState the state the controller ends in
+   */
+  private void terminate(State finalState) {
+    state = finalState;
+    // The state is written before taking the mutex so that a waiter that
+    // wakes up under the mutex observes the new value.
+    synchronized (completionMutex) {
+      completionMutex.notify();
+    }
+  }
+
+  /**
+   * @return true while the controller is in the {@code LIVE} state
+   */
+  public boolean isLive() {
+    return State.LIVE == state;
+  }
+
+  /**
+   * @return true if the controller has reached the {@code ENDED} state
+   */
+  public boolean succeeded() {
+    return State.ENDED == state;
+  }
+
+  /**
+   * Registers the task's container as active, keyed by its container id.
+   *
+   * @param task the task that was given a container
+   */
+  public void containerAllocated(Task task) {
+    this.activeContainers.put(task.getContainerId(), task);
+  }
+
+  /**
+   * @return the YARN facade this controller talks to
+   */
+  public AMYarnFacade getYarn() {
+    return this.yarn;
+  }
+
+  /**
+   * Drops the task's container from the set of active containers.
+   *
+   * @param task the task whose container was released
+   */
+  public void containerReleased(Task task) {
+    this.activeContainers.remove(task.getContainerId());
+  }
+
+  /**
+   * Appends a finished task to the completed-task history.
+   *
+   * @param task the task that ended
+   */
+  public void taskEnded(Task task) {
+    this.completedTasks.add(task);
+  }
+
+  /**
+   * Records a retry in the history. A copy of the task, marked with
+   * disposition {@code RETRIED}, is added; the task itself is not modified.
+   *
+   * @param task the task being retried
+   */
+  public void taskRetried(Task task) {
+    Task retryRecord = task.copy();
+    retryRecord.disposition = Task.Disposition.RETRIED;
+    this.completedTasks.add(retryRecord);
+  }
+
+  /**
+   * Notification that a task group finished. The group argument is not
+   * inspected; overall completion is simply re-evaluated.
+   *
+   * @param taskGroup the group that completed
+   */
+  public void taskGroupCompleted(SchedulerStateActions taskGroup) {
+    this.checkStatus();
+  }
+
+  /**
+   * @return the current maximum retry count
+   */
+  public int getMaxRetries() {
+    return this.maxRetries;
+  }
+
+  /**
+   * @return the stop timeout held by this controller, in milliseconds (as the
+   *         name indicates)
+   */
+  public int getStopTimeoutMs() {
+    return this.stopTimoutMs;
+  }
+
+  /**
+   * Marks a host as reserved in the node inventory.
+   *
+   * @param hostName the host to reserve
+   */
+  @Override
+  public synchronized void reserveHost(String hostName) {
+    this.nodeInventory.reserve(hostName);
+  }
+
+  /**
+   * Releases a previously reserved host in the node inventory.
+   *
+   * @param hostName the host to release
+   */
+  @Override
+  public synchronized void releaseHost(String hostName) {
+    this.nodeInventory.release(hostName);
+  }
+
+  /**
+   * @return the node inventory used for host reservations
+   */
+  public NodeInventory getNodeInventory() {
+    return this.nodeInventory;
+  }
+
+  /**
+   * Stores a controller-level property under the given key.
+   *
+   * @param key property name
+   * @param value property value
+   */
+  @Override
+  public void setProperty(String key, Object value) {
+    this.properties.put(key, value);
+  }
+
+  /**
+   * Looks up a controller-level property stored via {@code setProperty}.
+   *
+   * @param key property name
+   * @return the stored value as returned by the underlying property map
+   */
+  @Override
+  public Object getProperty(String key) {
+    return this.properties.get(key);
+  }
+
+  /**
+   * Adds a listener that {@code fireLifecycleChange} will notify.
+   *
+   * @param listener the listener to add
+   */
+  @Override
+  public void registerLifecycleListener(TaskLifecycleListener listener) {
+    this.lifecycleListeners.add(listener);
+  }
+
+  /**
+   * Forwards a task lifecycle event to every registered listener, in the
+   * listener collection's iteration order.
+   *
+   * @param event the lifecycle event
+   * @param context the event context passed through to each listener
+   */
+  public void fireLifecycleChange(Event event, EventContext context) {
+    for (TaskLifecycleListener subscriber : lifecycleListeners) {
+      subscriber.stateChange(event, context);
+    }
+  }
+
+  /**
+   * Replaces the maximum retry count.
+   *
+   * @param value the new maximum retry count
+   */
+  @Override
+  public void setMaxRetries(int value) {
+    this.maxRetries = value;
+  }
+
+ @Override
+ public int getTargetCount() {
+ int count = 0;
+ for (SchedulerStateActions group : prioritizedGroups) {
+ count += group.getScheduler().getTarget();
+ }
+ return count;
+ }
+
+  /**
+   * @return the controller's current state
+   */
+  public State getState() {
+    return this.state;
+  }
+
+  /**
+   * Hands this controller to the visitor while holding the controller's lock.
+   *
+   * @param visitor the visitor to invoke
+   */
+  @Override
+  public synchronized void visit(ControllerVisitor visitor) {
+    visitor.visit(this);
+  }
+
+  /**
+   * @return the prioritized task groups; this is the controller's own list,
+   *         not a copy
+   */
+  public List<SchedulerStateActions> getPools() {
+    return this.prioritizedGroups;
+  }
+
+  /**
+   * Applies the visitor to the task models of every prioritized group, in
+   * priority order, while holding the controller's lock.
+   *
+   * @param visitor the task visitor to apply
+   */
+  @Override
+  public synchronized void visitTasks(TaskVisitor visitor) {
+    for (SchedulerStateActions group : prioritizedGroups) {
+      group.visitTaskModels(visitor);
+    }
+  }
+
+  /**
+   * @return the completed-task history; this is the controller's own list,
+   *         not a copy
+   */
+  public List<Task> getHistory() {
+    return this.completedTasks;
+  }
+
+  /**
+   * Reports whether the task with the given id is live. The first group (in
+   * priority order) that knows the id decides the answer.
+   *
+   * @param id the task id
+   * @return the task's {@code isLive()} value, or false if no group knows it
+   */
+  @Override
+  public boolean isTaskLive(int id) {
+    Task found = null;
+    for (SchedulerStateActions group : prioritizedGroups) {
+      found = group.getTask(id);
+      if (found != null) {
+        break;
+      }
+    }
+    return found != null && found.isLive();
+  }
+
+  /**
+   * Cancels the task with the given id in the first group that owns it, and
+   * lowers that group's scheduler target by one.
+   *
+   * @param id the task id
+   * @return true if a matching task was found and cancelled
+   */
+  @Override
+  public synchronized boolean cancelTask(int id) {
+    for (SchedulerStateActions pool : prioritizedGroups) {
+      Task target = pool.getTask(id);
+      if (target == null) {
+        continue;
+      }
+      pool.cancel(target);
+      // Shrink the scheduler's target by the one task just cancelled.
+      pool.getScheduler().change(-1);
+      return true;
+    }
+    LOG.warn("Requested to cancel task, but no task found: " + id);
+    return false;
+  }
+
+  /**
+   * Delivers a completion acknowledgement to the task's current state, then
+   * removes the given property (if any) from the task.
+   *
+   * @param task the acknowledged task
+   * @param propertyKey task property to remove afterwards, or null for none
+   */
+  @Override
+  public synchronized void completionAck(Task task, String propertyKey) {
+    EventContext ctx = new EventContext(this);
+    ctx.setTask(task);
+    ctx.getState().completionAck(ctx);
+    if (propertyKey == null) {
+      return;
+    }
+    task.properties.remove(propertyKey);
+  }
+
+ @Override
+ public synchronized void startAck(Task task, String propertyKey,
+ Object value) {
+ if (propertyKey != null && value != null) {
+ task.properties.put(propertyKey, value);
+ }
+ EventContext context = new EventContext(this);
+ context.setTask(task);
+ context.getState().startAck(context);
+ }
+
+  /**
+   * @return whatever the YARN facade reports for disk-resource support
+   */
+  @Override
+  public boolean supportsDiskResource() {
+    return this.getYarn().supportsDiskResource();
+  }
+
+ @Override
+ public void registryDown() { shutDown( ); }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerFactory.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerFactory.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerFactory.java
new file mode 100644
index 0000000..b8d6e06
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Factory that assembles a {@link Dispatcher} (and the controller behind it)
+ * for a particular kind of managed cluster.
+ */
+public interface ControllerFactory {
+  /**
+   * Raised when the factory cannot build the dispatcher. Carries a message
+   * and the underlying cause.
+   */
+  class ControllerFactoryException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public ControllerFactoryException(String msg, Exception e) {
+      super(msg, e);
+    }
+  }
+
+  /**
+   * Builds a fully configured dispatcher.
+   *
+   * @return the new dispatcher
+   * @throws ControllerFactoryException if the dispatcher cannot be built
+   */
+  Dispatcher build() throws ControllerFactoryException;
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerVisitor.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerVisitor.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerVisitor.java
new file mode 100644
index 0000000..5774d7d
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerVisitor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Callback that is handed a cluster controller so it can inspect it.
+ * Implementations such as {@code ClusterControllerImpl.visit()} invoke it
+ * while holding the controller's lock.
+ */
+public interface ControllerVisitor {
+  /**
+   * Visits the given controller.
+   *
+   * @param controller the controller being visited
+   */
+  void visit(ClusterController controller);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Dispatcher.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Dispatcher.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Dispatcher.java
new file mode 100644
index 0000000..f5257e6
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Dispatcher.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.appMaster.AMRegistrar.AMRegistrationException;
+import org.apache.drill.yarn.appMaster.AMYarnFacade.YarnAppHostReport;
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+/**
+ * Dispatches YARN, timer and ZooKeeper events to the cluster controller.
+ * Allows the controller to be independent of the plumbing needed to
+ * receive events. Divides work among
+ * various components to separate concerns. Three streams of events
+ * feed into an app master "strategy". The three streams are
+ * <ol>
+ * <li>Resource manager</li>
+ * <li>Node manager</li>
+ * <li>Timer</li>
+ * </ol>
+ * <p>
+ * This class is "lightly" multi-threaded: it responds to events
+ * from the RM, NM and timer threads. Within each of these, events
+ * are sequential. So, synchronization is needed across the three event
+ * types, but not within event types. (That is, we won't see two RM events,
+ * say, occurring at the same time from separate threads.)
+ */
+
+public class Dispatcher
+{
+ private static final Log LOG = LogFactory.getLog(Dispatcher.class);
+
+ /**
+ * Handle YARN Resource Manager events. This is a separate class to clarify
+ * which events are from the Resource Manager.
+ */
+
+ private class ResourceCallback implements AMRMClientAsync.CallbackHandler {
+ @Override
+ public void onContainersAllocated(List<Container> containers) {
+ LOG.trace("NM: Containers allocated: " + containers.size());
+ controller.containersAllocated(containers);
+ }
+
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> statuses) {
+ LOG.trace("NM: Containers completed: " + statuses.size());
+ controller.containersCompleted(statuses);
+ }
+
+ @Override
+ public void onShutdownRequest() {
+ LOG.trace("RM: Shutdown request");
+ controller.shutDown();
+ }
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {
+ LOG.trace("RM: Nodes updated, count= " + updatedNodes.size());
+ }
+
+ @Override
+ public float getProgress() {
+ // getProgress is called on each fetch from the NM response queue.
+ // This is a good time to update status, even if it looks a bit
+ // bizarre...
+
+ controller.updateRMStatus();
+ return controller.getProgress();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ LOG.error("Fatal RM Error: " + e.getMessage());
+ LOG.error("AM Shutting down!");
+ controller.shutDown();
+ }
+ }
+
+ /**
+ * Handle YARN Node Manager events. This is a separate class to clarify which
+ * events are, in fact, from the node manager.
+ */
+
+ public class NodeCallback implements NMClientAsync.CallbackHandler {
+ @Override
+ public void onStartContainerError(ContainerId containerId, Throwable t) {
+ LOG.trace("CNM: ontainer start error: " + containerId, t);
+ controller.taskStartFailed(containerId, t);
+ }
+
+ @Override
+ public void onContainerStarted(ContainerId containerId,
+ Map<String, ByteBuffer> allServiceResponse) {
+ LOG.trace("NM: Container started: " + containerId);
+ controller.containerStarted(containerId);
+ }
+
+ @Override
+ public void onContainerStatusReceived(ContainerId containerId,
+ ContainerStatus containerStatus) {
+ LOG.trace("NM: Container status: " + containerId + " - "
+ + containerStatus.toString());
+ }
+
+ @Override
+ public void onGetContainerStatusError(ContainerId containerId,
+ Throwable t) {
+ LOG.trace("NM: Container error: " + containerId, t);
+ }
+
+ @Override
+ public void onStopContainerError(ContainerId containerId, Throwable t) {
+ LOG.trace("NM: Stop container error: " + containerId, t);
+ controller.stopTaskFailed(containerId, t);
+ }
+
+ @Override
+ public void onContainerStopped(ContainerId containerId) {
+ LOG.trace("NM: Container stopped: " + containerId);
+ controller.containerStopped(containerId);
+ }
+ }
+
+ /**
+ * Handle timer events: a constant tick to handle time-based actions such as
+ * timeouts.
+ */
+
+ public class TimerCallback implements PulseRunnable.PulseCallback {
+ /**
+ * The lifecycle of each task is driven by RM and NM callbacks. We use the
+ * timer to start the process. While this is overkill here, in a real app,
+ * we'd check requested resource levels (which might change) and number of
+ * tasks (which might change if tasks die), and take corrective action:
+ * adding or removing tasks.
+ */
+
+ @Override
+ public void onTick(long curTime) {
+ for (Pollable pollable : pollables) {
+ pollable.tick(curTime);
+ }
+ controller.tick(curTime);
+ }
+ }
+
+ private AMYarnFacade yarn;
+ private ClusterController controller;
+
+ /**
+ * Add-on tools that are called once on each timer tick.
+ */
+
+ private List<Pollable> pollables = new ArrayList<>();
+
+ /**
+ * Add-ons for which the dispatcher should managed the start/end lifecycle.
+ */
+
+ private List<DispatcherAddOn> addOns = new ArrayList<>();
+ private String trackingUrl;
+ private AMRegistrar amRegistrar;
+ private int httpPort;
+ private PulseRunnable timer;
+ private Thread pulseThread;
+ private final int timerPeriodMs;
+
+ public Dispatcher(int timerPeriodMs) {
+ this.timerPeriodMs = timerPeriodMs;
+ }
+
+ public void setYarn(AMYarnFacade yarn) throws YarnFacadeException {
+ this.yarn = yarn;
+ controller = new ClusterControllerImpl(yarn);
+ }
+
+ public ClusterController getController() {
+ return controller;
+ }
+
+ public void registerPollable(Pollable pollable) {
+ pollables.add(pollable);
+ }
+
+ public void registerAddOn(DispatcherAddOn addOn) {
+ addOns.add(addOn);
+ }
+
+ public void setHttpPort(int port) {
+ httpPort = port;
+ }
+
+ public void setTrackingUrl(String trackingUrl) {
+ this.trackingUrl = trackingUrl;
+ }
+
+ public String getTrackingUrl() {
+ return yarn.getTrackingUrl();
+ }
+
+ public void setAMRegistrar(AMRegistrar registrar) {
+ amRegistrar = registrar;
+ }
+
+ /**
+ * Start the dispatcher by initializing YARN and registering the AM.
+ *
+ * @return true if successful, false if the dispatcher did not start.
+ */
+
+ public boolean start() throws YarnFacadeException {
+
+ // Start the connection to YARN to get information about this app, and to
+ // create a session we can use to report problems.
+
+ try {
+ setup();
+ } catch (AMException e) {
+ String msg = e.getMessage();
+ LOG.error("Fatal error: " + msg);
+ yarn.finish(false, msg);
+ return false;
+ }
+
+ // Ensure that this is the only AM. If not, shut down the AM,
+ // reporting to YARN that this is a failure and the message explaining
+ // the conflict. Report this as a SUCCESS run so that YARN does not
+ // attempt to retry the AM.
+
+ try {
+ register();
+ } catch (AMRegistrationException e) {
+ LOG.error(e.getMessage(), e);
+ yarn.finish(true, e.getMessage());
+ return false;
+ }
+ return true;
+ }
+
+ public void run() throws YarnFacadeException {
+ // Only if registration is successful do we start the pulse thread
+ // which will cause containers to be requested.
+
+ startTimer();
+
+ // Run until the controller decides to shut down.
+
+ LOG.trace("Running");
+ boolean success = controller.waitForCompletion();
+
+ // Shut down.
+
+ LOG.trace("Finishing");
+ finish(success, null);
+ }
+
+ private void setup() throws YarnFacadeException, AMException {
+ LOG.trace("Starting YARN agent");
+ yarn.start(new ResourceCallback(), new NodeCallback());
+ String url = trackingUrl.replace("<port>", Integer.toString(httpPort));
+ if (DrillOnYarnConfig.config().getBoolean(DrillOnYarnConfig.HTTP_ENABLE_SSL)) {
+ url = url.replace("http:", "https:");
+ }
+ LOG.trace("Registering YARN application, URL: " + url);
+ yarn.register(url);
+ controller.started();
+
+ for (DispatcherAddOn addOn : addOns) {
+ addOn.start(controller);
+ }
+ }
+
+ private void register() throws AMRegistrationException {
+ if (amRegistrar == null) {
+ LOG.warn(
+ "No AM Registrar provided: cannot check if this is the only AM for the Drill cluster.");
+ } else {
+ YarnAppHostReport rpt = yarn.getAppHostReport();
+ amRegistrar.register(rpt.amHost, httpPort, rpt.appId);
+ }
+ }
+
+ private void startTimer() {
+ timer = new PulseRunnable(timerPeriodMs, new TimerCallback());
+
+ // Start the pulse thread after registering so that we're in
+ // a state where we can interact with the RM.
+
+ pulseThread = new Thread(timer);
+ pulseThread.setName("Pulse");
+ pulseThread.start();
+ }
+
+ private void finish(boolean success, String msg) throws YarnFacadeException {
+ for (DispatcherAddOn addOn : addOns) {
+ addOn.finish(controller);
+ }
+
+ LOG.trace("Shutting down YARN agent");
+
+ // Stop the timer thread first. This ensures that the
+ // timer events don't try to use the YARN API during
+ // shutdown.
+
+ stopTimer();
+ yarn.finish(success, msg);
+ }
+
+ private void stopTimer() {
+ timer.stop();
+ try {
+ pulseThread.join();
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DispatcherAddOn.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DispatcherAddOn.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DispatcherAddOn.java
new file mode 100644
index 0000000..5c7100b
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DispatcherAddOn.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * An extension that the dispatcher starts when the run begins and finishes
+ * when the run ends. Register instances with
+ * {@code Dispatcher.registerAddOn()}.
+ */
+
+public interface DispatcherAddOn {
+  /**
+   * Called by the dispatcher during setup, after the application has
+   * registered with YARN and the controller has been started.
+   *
+   * @param controller the cluster controller
+   */
+  void start(ClusterController controller);
+
+  /**
+   * Called by the dispatcher at the end of the run, before the timer thread
+   * is stopped and the outcome is reported to YARN.
+   *
+   * @param controller the cluster controller
+   */
+  void finish(ClusterController controller);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillApplicationMaster.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillApplicationMaster.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillApplicationMaster.java
new file mode 100644
index 0000000..c0db9a1
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillApplicationMaster.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.appMaster.ControllerFactory.ControllerFactoryException;
+import org.apache.drill.yarn.appMaster.http.WebServer;
+import org.apache.drill.yarn.core.DoyConfigException;
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+
+/**
+ * Application Master for Drill. The name is visible when using the "jps"
+ * command and is chosen to make sense on a busy YARN node.
+ * <p>
+ * To debug this AM use the customized unmanaged AM launcher in this
+ * jar. (The "stock" YARN version does not give you time to attach
+ * the debugger.)
+ * <pre><code>
+ * TARGET_JAR=/your-git-folder/drill-yarn/target/drill-yarn-1.6-SNAPSHOT.jar
+ * TARGET_CLASS=org.apache.drill.yarn.appMaster.ApplicationMaster
+ * LAUNCHER_JAR=$TARGET_JAR
+ * LAUNCHER_CLASS=org.apache.drill.yarn.mock.UnmanagedAMLauncher
+ * $HH/bin/hadoop jar $LAUNCHER_JAR \
+ * $LAUNCHER_CLASS -classpath $TARGET_JAR \
+ * -cmd "java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 \
+ * $TARGET_CLASS"
+ * </pre></code>
+ */
+
+public class DrillApplicationMaster {
+ private static final Log LOG = LogFactory
+ .getLog(DrillApplicationMaster.class);
+
+ public static void main(String[] args) {
+ LOG.trace("Drill Application Master starting.");
+
+ // Load the configuration. Assumes that the user's Drill-on-YARN
+ // configuration was archived along with the Drill software in
+ // the $DRILL_HOME/conf directory, and that $DRILL_HOME/conf is
+ // on the class-path.
+
+ try {
+ DrillOnYarnConfig.load().setAmDrillHome();
+ } catch (DoyConfigException e) {
+ System.err.println(e.getMessage());
+ System.exit(-1);
+ }
+
+ // Build the dispatcher using the Drillbit factory. Allows inserting
+ // other factories for testing, or if we need to manage a cluster of
+ // processes other than Drillbits.
+
+ // Dispatcher am = (new SimpleBatchFactory( )).build( );
+ // Dispatcher am = (new MockDrillbitFactory( )).build( );
+ Dispatcher dispatcher;
+ try {
+ dispatcher = (new DrillControllerFactory()).build();
+ } catch (ControllerFactoryException e) {
+ LOG.error("Setup failed, exiting: " + e.getMessage(), e);
+ System.exit(-1);
+ return;
+ }
+
+ // Start the Dispatcher. This will return false if this AM conflicts with
+ // a running AM.
+
+ try {
+ if (!dispatcher.start()) {
+ return;
+ }
+ } catch (Throwable e) {
+ LOG.error("Fatal error, exiting: " + e.getMessage(), e);
+ System.exit(-1);
+ }
+
+ // Create and start the web server. Do this after starting the AM
+ // so that we don't learn about a conflict via the a web server port
+ // conflict.
+
+ WebServer webServer = new WebServer(dispatcher);
+ try {
+ webServer.start();
+ } catch (Exception e) {
+ LOG.error("Web server setup failed, exiting: " + e.getMessage(), e);
+ System.exit(-1);
+ }
+
+ // Run the dispatcher until the cluster shuts down.
+
+ try {
+ dispatcher.run();
+ } catch (Throwable e) {
+ LOG.error("Fatal error, exiting: " + e.getMessage(), e);
+ System.exit(-1);
+ } finally {
+ try {
+ webServer.close();
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillControllerFactory.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillControllerFactory.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillControllerFactory.java
new file mode 100644
index 0000000..013fdba
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillControllerFactory.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.core.ContainerRequestSpec;
+import org.apache.drill.yarn.core.DfsFacade;
+import org.apache.drill.yarn.core.DfsFacade.DfsFacadeException;
+import org.apache.drill.yarn.core.DoYUtil;
+import org.apache.drill.yarn.core.DoyConfigException;
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+import org.apache.drill.yarn.core.LaunchSpec;
+import org.apache.drill.yarn.appMaster.http.AMSecurityManagerImpl;
+import org.apache.drill.yarn.core.ClusterDef;
+import org.apache.drill.yarn.zk.ZKClusterCoordinatorDriver;
+import org.apache.drill.yarn.zk.ZKRegistry;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+
+import com.typesafe.config.Config;
+
+/**
+ * Builds a controller for a cluster of Drillbits. The AM is designed to be
+ * mostly generic; only this class contains knowledge that the tasks being
+ * managed are Drillbits. This design ensures that we can add other Drill
+ * components in the future without the need to make major changes to the AM
+ * logic.
+ * <p>
+ * The controller consists of a generic dispatcher and cluster controller, along
+ * with a Drill-specific scheduler and task launch specification. Drill also
+ * includes an interface to ZooKeeper to monitor Drillbits.
+ * <p>
+ * The AM is launched by YARN. All it knows is what is in its launch environment
+ * or configuration files. The client must set up all the information that the
+ * AM needs. Static information appears in configuration files. But dynamic
+ * information (or that which is inconvenient to repeat in configuration files)
+ * must arrive in environment variables. See {@link DrillOnYarnConfig} for more
+ * information.
+ */
+
+public class DrillControllerFactory implements ControllerFactory {
+  private static final Log LOG = LogFactory.getLog(DrillControllerFactory.class);
+
+  /**
+   * Drill-on-YARN configuration, captured when this factory is constructed and
+   * used for every setting read below.
+   */
+  private final Config config = DrillOnYarnConfig.config();
+
+  /** DFS path of the Drill archive; set only when the software is localized. */
+  private String drillArchivePath;
+
+  /** DFS path of the site archive (may be null); set only when localized. */
+  private String siteArchivePath;
+
+  /** Result of {@code DfsFacade.isLocalized()} for the current configuration. */
+  private boolean localized;
+
+  /**
+   * Builds the dispatcher for the AM: YARN facade, Drillbit scheduler,
+   * ZooKeeper integration, tracking URL, auto-shutdown check and security
+   * manager.
+   *
+   * @return the configured dispatcher
+   * @throws ControllerFactoryException if resource preparation, YARN facade
+   *         setup or Drill-on-YARN configuration validation fails
+   */
+  @Override
+  public Dispatcher build() throws ControllerFactoryException {
+    LOG.info(
+        "Initializing AM for " + config.getString(DrillOnYarnConfig.APP_NAME));
+    Dispatcher dispatcher;
+    try {
+      Map<String, LocalResource> resources = prepareResources();
+
+      TaskSpec taskSpec = buildDrillTaskSpec(resources);
+
+      // Prepare dispatcher
+
+      int timerPeriodMs = config.getInt(DrillOnYarnConfig.AM_TICK_PERIOD_MS);
+      dispatcher = new Dispatcher(timerPeriodMs);
+      int pollPeriodMs = config.getInt(DrillOnYarnConfig.AM_POLL_PERIOD_MS);
+      AMYarnFacadeImpl yarn = new AMYarnFacadeImpl(pollPeriodMs);
+      dispatcher.setYarn(yarn);
+      dispatcher.getController()
+          .setMaxRetries(config.getInt(DrillOnYarnConfig.DRILLBIT_MAX_RETRIES));
+
+      // Use the cached config consistently rather than re-fetching it.
+      int requestTimeoutSecs =
+          config.getInt(DrillOnYarnConfig.DRILLBIT_REQUEST_TIMEOUT_SEC);
+      int maxExtraNodes = config.getInt(DrillOnYarnConfig.DRILLBIT_MAX_EXTRA_NODES);
+
+      // Assume basic scheduler (first cluster group only) for now.
+      ClusterDef.ClusterGroup pool = ClusterDef.getCluster(config, 0);
+      Scheduler scheduler = new DrillbitScheduler(pool.getName(), taskSpec,
+          pool.getCount(), requestTimeoutSecs, maxExtraNodes);
+      dispatcher.getController().registerScheduler(scheduler);
+
+      // Called after the scheduler is created; the scheduler keeps a
+      // reference to this same TaskSpec instance (see DrillbitScheduler),
+      // so in-place changes made here are visible to it.
+      pool.modifyTaskSpec(taskSpec);
+
+      // ZooKeeper setup
+
+      buildZooKeeper(config, dispatcher);
+    } catch (YarnFacadeException | DoyConfigException e) {
+      throw new ControllerFactoryException("Drill AM initialization failed", e);
+    }
+
+    // Tracking URL
+    // TODO: HTTPS support
+
+    dispatcher.setHttpPort(config.getInt(DrillOnYarnConfig.HTTP_PORT));
+    if (config.getBoolean(DrillOnYarnConfig.HTTP_ENABLED)) {
+      // <host> and <port> are placeholders; presumably substituted later by
+      // the dispatcher (TODO confirm).
+      dispatcher.setTrackingUrl("http://<host>:<port>/redirect");
+    }
+
+    // Enable/disable check for auto shutdown when no nodes are running.
+
+    dispatcher.getController().enableFailureCheck(
+        config.getBoolean(DrillOnYarnConfig.AM_ENABLE_AUTO_SHUTDOWN));
+
+    // Define the security manager
+
+    AMSecurityManagerImpl.setup();
+
+    return dispatcher;
+  }
+
+  /**
+   * Prepare the files ("resources" in YARN terminology) that YARN should
+   * download ("localize") for the Drillbit. We need both the Drill software and
+   * the user's site-specific configuration.
+   *
+   * @return map from localization key to YARN resource; empty when the Drill
+   *         software is not localized (never {@code null})
+   * @throws YarnFacadeException if the DFS status of an archive cannot be
+   *         obtained
+   */
+
+  private Map<String, LocalResource> prepareResources()
+      throws YarnFacadeException {
+    try {
+      DfsFacade dfs = new DfsFacade(config);
+      localized = dfs.isLocalized();
+      if (!localized) {
+        return new HashMap<>();
+      }
+      dfs.connect();
+      Map<String, LocalResource> resources = new HashMap<>();
+      DrillOnYarnConfig drillConfig = DrillOnYarnConfig.instance();
+
+      // Localize the Drill archive.
+
+      drillArchivePath = drillConfig.getDrillArchiveDfsPath();
+      DfsFacade.Localizer localizer = new DfsFacade.Localizer(dfs,
+          drillArchivePath);
+      String key = config.getString(DrillOnYarnConfig.DRILL_ARCHIVE_KEY);
+      localizer.defineResources(resources, key);
+      LOG.info("Localizing " + drillArchivePath + " with key \"" + key + "\"");
+
+      // Localize the site archive, if any.
+
+      siteArchivePath = drillConfig.getSiteArchiveDfsPath();
+      if (siteArchivePath != null) {
+        localizer = new DfsFacade.Localizer(dfs, siteArchivePath);
+        key = config.getString(DrillOnYarnConfig.SITE_ARCHIVE_KEY);
+        localizer.defineResources(resources, key);
+        LOG.info("Localizing " + siteArchivePath + " with key \"" + key + "\"");
+      }
+      return resources;
+    } catch (DfsFacadeException e) {
+      throw new YarnFacadeException(
+          "Failed to get DFS status for Drill archive", e);
+    }
+  }
+
+  /**
+   * Constructs the Drill launch command. The launch uses the YARN-specific
+   * yarn-drillbit.sh script, setting up the required input environment
+   * variables.
+   * <p>
+   * This is an exercise in getting many details just right. The code here sets
+   * the environment variables required by (and documented in) yarn-drillbit.sh.
+   * The easiest way to understand this code is to insert an "echo" statement in
+   * drillbit.sh to echo the launch command there. Then, look in YARN's NM
+   * private container directory for the launch_container.sh script to see the
+   * command generated by the following code. Compare the two to validate that
+   * the code does the right thing.
+   * <p>
+   * This method is very Linux-specific. The usual adjustments must be made to
+   * adapt it to Windows.
+   *
+   * @param resources localized resources to attach to the launch spec; may be
+   *        {@code null} or empty when nothing is localized
+   * @return the task spec describing the Drillbit container and its launch
+   * @throws DoyConfigException if the Drill-on-YARN configuration is invalid
+   */
+
+  private TaskSpec buildDrillTaskSpec(Map<String, LocalResource> resources)
+      throws DoyConfigException {
+    DrillOnYarnConfig doyConfig = DrillOnYarnConfig.instance();
+
+    // Drillbit launch description
+
+    ContainerRequestSpec containerSpec = new ContainerRequestSpec();
+    containerSpec.memoryMb = config.getInt(DrillOnYarnConfig.DRILLBIT_MEMORY);
+    containerSpec.vCores = config.getInt(DrillOnYarnConfig.DRILLBIT_VCORES);
+    containerSpec.disks = config.getDouble(DrillOnYarnConfig.DRILLBIT_DISKS);
+
+    LaunchSpec drillbitSpec = new LaunchSpec();
+
+    // The drill home location is either a non-localized location,
+    // or, more typically, the expanded Drill directory under the
+    // container's working directory. When the localized directory,
+    // we rely on the fact that the current working directory is
+    // set to the container directory, so we just need the name
+    // of the Drill folder under the cwd.
+
+    String drillHome = doyConfig.getRemoteDrillHome();
+    drillbitSpec.env.put("DRILL_HOME", drillHome);
+    LOG.trace("Drillbit DRILL_HOME: " + drillHome);
+
+    // Heap memory
+
+    addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_HEAP, "DRILL_HEAP");
+
+    // Direct memory
+
+    addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_DIRECT_MEM,
+        "DRILL_MAX_DIRECT_MEMORY");
+
+    // Code cache
+
+    addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_CODE_CACHE,
+        "DRILLBIT_CODE_CACHE_SIZE");
+
+    // Any additional VM arguments from the config file.
+
+    addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_VM_ARGS,
+        "DRILL_JVM_OPTS");
+
+    // Any user-specified library path
+
+    addIfSet(drillbitSpec, DrillOnYarnConfig.JAVA_LIB_PATH,
+        DrillOnYarnConfig.DOY_LIBPATH_ENV_VAR);
+
+    // Drill logs.
+    // Relies on the LOG_DIR_EXPANSION_VAR marker which is replaced by
+    // the container log directory.
+
+    if (!config.getBoolean(DrillOnYarnConfig.DISABLE_YARN_LOGS)) {
+      drillbitSpec.env.put("DRILL_YARN_LOG_DIR",
+          ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+    }
+
+    // Debug option.
+
+    if (config.getBoolean(DrillOnYarnConfig.DRILLBIT_DEBUG_LAUNCH)) {
+      drillbitSpec.env.put(DrillOnYarnConfig.DRILL_DEBUG_ENV_VAR, "1");
+    }
+
+    // Hadoop home is not set here: it should be set in drill-env.sh since it
+    // is needed for client launch as well as the AM.
+
+    // Garbage collection (gc) logging. In drillbit.sh logging can be
+    // configured to go anywhere. In YARN, all logs go to the YARN log
+    // directory; the gc log file is always called "gc.log".
+
+    if (config.getBoolean(DrillOnYarnConfig.DRILLBIT_LOG_GC)) {
+      drillbitSpec.env.put("ENABLE_GC_LOG", "1");
+    }
+
+    // Class path additions.
+
+    addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_PREFIX_CLASSPATH,
+        DrillOnYarnConfig.DRILL_CLASSPATH_PREFIX_ENV_VAR);
+    addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_CLASSPATH,
+        DrillOnYarnConfig.DRILL_CLASSPATH_ENV_VAR);
+
+    // Drill-config.sh has specific entries for Hadoop and HBase. To prevent
+    // an endless number of such one-off cases, we add a general extension
+    // class path. But, we retain Hadoop and HBase for backward compatibility.
+
+    addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_EXTN_CLASSPATH,
+        "EXTN_CLASSPATH");
+    addIfSet(drillbitSpec, DrillOnYarnConfig.HADOOP_CLASSPATH,
+        "DRILL_HADOOP_CLASSPATH");
+    addIfSet(drillbitSpec, DrillOnYarnConfig.HBASE_CLASSPATH,
+        "DRILL_HBASE_CLASSPATH");
+
+    // Note that there is no equivalent of niceness for YARN: YARN controls
+    // the niceness of its child processes.
+
+    // Drillbit launch script under YARN
+    // Here we can use DRILL_HOME because all env vars are set before
+    // issuing this command.
+
+    drillbitSpec.command = "$DRILL_HOME/bin/yarn-drillbit.sh";
+
+    // Configuration (site directory), if given.
+
+    String siteDirPath = doyConfig.getRemoteSiteDir();
+    if (siteDirPath != null) {
+      drillbitSpec.cmdArgs.add("--site");
+      drillbitSpec.cmdArgs.add(siteDirPath);
+    }
+
+    // Localized resources
+
+    if (resources != null) {
+      drillbitSpec.resources.putAll(resources);
+    }
+
+    // Container definition.
+
+    TaskSpec taskSpec = new TaskSpec();
+    taskSpec.name = "Drillbit";
+    taskSpec.containerSpec = containerSpec;
+    taskSpec.launchSpec = drillbitSpec;
+    taskSpec.maxRetries = config.getInt(DrillOnYarnConfig.DRILLBIT_MAX_RETRIES);
+    return taskSpec;
+  }
+
+  /**
+   * Utility method to create an environment variable in the process launch
+   * specification if a given Drill-on-YARN configuration variable is set,
+   * copying the config value to the environment variable. Blank values (as
+   * judged by {@code DoYUtil.isBlank}) are skipped.
+   *
+   * @param spec launch spec whose environment receives the variable
+   * @param configParam Drill-on-YARN configuration key to read
+   * @param envVar name of the environment variable to set
+   */
+
+  public void addIfSet(LaunchSpec spec, String configParam, String envVar) {
+    String value = config.getString(configParam);
+    if (!DoYUtil.isBlank(value)) {
+      spec.env.put(envVar, value);
+    }
+  }
+
+  /**
+   * Dispatcher add-on that forwards the dispatcher's start/finish callbacks to
+   * a {@link ZKRegistry}.
+   */
+  public static class ZKRegistryAddOn implements DispatcherAddOn {
+    ZKRegistry zkRegistry;
+
+    public ZKRegistryAddOn(ZKRegistry zkRegistry) {
+      this.zkRegistry = zkRegistry;
+    }
+
+    @Override
+    public void start(ClusterController controller) {
+      zkRegistry.start(controller);
+    }
+
+    @Override
+    public void finish(ClusterController controller) {
+      zkRegistry.finish(controller);
+    }
+  }
+
+  /**
+   * Create the Drill-on-YARN version of the ZooKeeper cluster coordinator.
+   * Compared to the Drill version, this one takes its parameters via a builder
+   * pattern in the form of the cluster coordinator driver.
+   *
+   * @param config Drill-on-YARN configuration supplying the ZK and port settings
+   * @param dispatcher dispatcher to which the ZK registry is attached as an
+   *        add-on, lifecycle listener, AM registrar and controller property
+   */
+
+  private void buildZooKeeper(Config config, Dispatcher dispatcher) {
+    String zkConnect = config.getString(DrillOnYarnConfig.ZK_CONNECT);
+    String zkRoot = config.getString(DrillOnYarnConfig.ZK_ROOT);
+    String clusterId = config.getString(DrillOnYarnConfig.CLUSTER_ID);
+    int failureTimeoutMs = config
+        .getInt(DrillOnYarnConfig.ZK_FAILURE_TIMEOUT_MS);
+    int retryCount = config.getInt(DrillOnYarnConfig.ZK_RETRY_COUNT);
+    int retryDelayMs = config.getInt(DrillOnYarnConfig.ZK_RETRY_DELAY_MS);
+    int userPort = config.getInt(DrillOnYarnConfig.DRILLBIT_USER_PORT);
+    int bitPort = config.getInt(DrillOnYarnConfig.DRILLBIT_BIT_PORT);
+    // The third port is derived as bitPort + 1.
+    ZKClusterCoordinatorDriver driver = new ZKClusterCoordinatorDriver()
+        .setConnect(zkConnect, zkRoot, clusterId)
+        .setFailureTimoutMs(failureTimeoutMs)
+        .setRetryCount(retryCount)
+        .setRetryDelayMs(retryDelayMs)
+        .setPorts(userPort, bitPort, bitPort + 1);
+    ZKRegistry zkRegistry = new ZKRegistry(driver);
+    dispatcher.registerAddOn(new ZKRegistryAddOn(zkRegistry));
+
+    // The ZK driver is started and stopped in conjunction with the
+    // controller lifecycle.
+
+    dispatcher.getController().registerLifecycleListener(zkRegistry);
+
+    // The ZK driver also handles registering the AM for the cluster.
+
+    dispatcher.setAMRegistrar(driver);
+
+    // The UI needs access to ZK to report unmanaged drillbits. We use
+    // a property to avoid unnecessary code dependencies.
+
+    dispatcher.getController().setProperty(ZKRegistry.CONTROLLER_PROPERTY,
+        zkRegistry);
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillbitScheduler.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillbitScheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillbitScheduler.java
new file mode 100644
index 0000000..76936b5
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillbitScheduler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * The "basic" Drillbit scheduler: manages one group of Drillbits that share a
+ * single task spec, uses a fixed container request timeout, and caps resize
+ * requests relative to the estimated free YARN capacity.
+ */
+public class DrillbitScheduler extends AbstractDrillbitScheduler {
+  private final int requestTimeoutSecs;
+  private final int maxExtraNodes;
+
+  /**
+   * @param name scheduler (cluster group) name
+   * @param taskSpec launch specification shared by every Drillbit task; the
+   *        reference is kept, not copied
+   * @param quantity requested number of Drillbits, passed to the superclass
+   * @param requestTimeoutSecs value reported by {@link #getRequestTimeoutSec()}
+   * @param maxExtraNodes margin allowed by {@link #resize(int)} above the
+   *        current quantity plus the estimated free node count
+   */
+  public DrillbitScheduler(String name, TaskSpec taskSpec, int quantity,
+      int requestTimeoutSecs, int maxExtraNodes) {
+    super("basic", name, quantity);
+    this.taskSpec = taskSpec;
+    this.requestTimeoutSecs = requestTimeoutSecs;
+    this.maxExtraNodes = maxExtraNodes;
+  }
+
+  /**
+   * Set the number of running tasks to the quantity given.
+   * Limits the quantity to only a small margin above the number
+   * of estimated free YARN nodes. This avoids a common user error
+   * where someone requests 20 nodes on a 5-node cluster.
+   *
+   * @param level requested number of tasks
+   * @return the result of the superclass resize with the capped level
+   */
+
+  @Override
+  public int resize(int level) {
+    int limit = quantity + state.getController().getFreeNodeCount()
+        + maxExtraNodes;
+    return super.resize(Math.min(limit, level));
+  }
+
+  @Override
+  public int getRequestTimeoutSec() {
+    return requestTimeoutSecs;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/EventContext.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/EventContext.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/EventContext.java
new file mode 100644
index 0000000..bec8cf9
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/EventContext.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.drill.yarn.appMaster.Scheduler.TaskManager;
+
+/**
+ * Holds the cluster controller, its YARN facade and, optionally, a task plus
+ * the scheduler group ({@link SchedulerStateImpl}) that the task belongs to.
+ */
+public class EventContext {
+  public final AMYarnFacade yarn;
+  public final ClusterControllerImpl controller;
+  public SchedulerStateImpl group;
+  public Task task;
+
+  /**
+   * Creates a context bound to a controller; the YARN facade is taken from it.
+   *
+   * @param controller the cluster controller
+   */
+  public EventContext(ClusterControllerImpl controller) {
+    this.controller = controller;
+    this.yarn = controller.getYarn();
+  }
+
+  /**
+   * Interface-typed overload. The argument must actually be a
+   * {@link ClusterControllerImpl}; otherwise the cast fails.
+   *
+   * @param controller the cluster controller
+   */
+  public EventContext(ClusterController controller) {
+    this((ClusterControllerImpl) controller);
+  }
+
+  /**
+   * Creates a context for a controller and a specific task; the group is
+   * taken from the task.
+   *
+   * @param controller the cluster controller
+   * @param task the task this context concerns
+   */
+  public EventContext(ClusterControllerImpl controller, Task task) {
+    this(controller);
+    setTask(task);
+  }
+
+  /**
+   * Test-only constructor: no controller and no YARN facade. Unlike
+   * {@link #setTask(Task)}, this does not populate {@code group}.
+   *
+   * @param task the task this context concerns
+   */
+  public EventContext(Task task) {
+    this.yarn = null;
+    this.controller = null;
+    this.task = task;
+  }
+
+  /** Sets the current task and adopts that task's group. */
+  public void setTask(Task task) {
+    this.task = task;
+    this.group = task.getGroup();
+  }
+
+  /** @return the state of the current task */
+  public TaskState getState() {
+    return task.state;
+  }
+
+  /** Sets the group; it must be a {@link SchedulerStateImpl}. */
+  public void setGroup(SchedulerStateActions group) {
+    this.group = (SchedulerStateImpl) group;
+  }
+
+  /** @return the task manager of the current group's scheduler */
+  public TaskManager getTaskManager() {
+    return this.group.getScheduler().getTaskManager();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/NodeInventory.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/NodeInventory.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/NodeInventory.java
new file mode 100644
index 0000000..ec20307
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/NodeInventory.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+
+/**
+ * Creates an AM-side inventory of cluster nodes. Used to track node
+ * reservations (container allocations) to prevent requesting multiple
+ * containers on the same node. Tracks blacklisted nodes that have failed too
+ * often. Since YARN will discard our blacklist if we add to many nodes, tracks
+ * when a container is allocated on a blacklisted node and signals that the
+ * cluster is in a bad state.
+ */
+
+public class NodeInventory {
+ private static final Log LOG = LogFactory.getLog(NodeInventory.class);
+
+ /**
+ * Indicates the case in which we've failed so many nodes that YARN has
+ * cancelled some of our blacklist entries and we've received a container for
+ * a blacklisted node. At this point, we should stop adding new tasks else
+ * we'll get into a nasty loop.
+ */
+ private boolean failed;
+
+ private Map<String, String> nodeMap = new HashMap<>();
+
+ /**
+ * The set of nodes available that YARN reports are available.
+ * Not clear if these are all nodes in the cluster, or just those usable
+ * by the current app (when the app is associated to a queue that
+ * uses node labels.)
+ */
+
+ private Map<String, NodeReport> yarnNodes = new HashMap<>();
+
+ /**
+ * The set of nodes in use by Drill. Includes both nodes on which the AM
+ * has requested to run Drillbits, and those nodes found to be running
+ * "stray" Drillbits started outside of DoY.
+ */
+
+ private Set<String> nodesInUse = new HashSet<>();
+
+ /**
+ * Nodes that have failed (typically due to mis-configuration) and
+ * are to be excluded from future container requests.
+ */
+
+ private Set<String> blacklist = new HashSet<>();
+ private final AMYarnFacade yarn;
+
+ public NodeInventory(AMYarnFacade yarn) throws YarnFacadeException {
+ this.yarn = yarn;
+ buildNodeMap();
+ }
+
+ private void buildNodeMap() throws YarnFacadeException {
+ List<NodeReport> nodes = yarn.getNodeReports();
+ for (NodeReport node : nodes) {
+ String hostName = node.getNodeId().getHost();
+ nodeMap.put(hostName, node.getHttpAddress());
+ yarnNodes.put(hostName, node);
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("YARN Node report");
+ for (NodeReport node : nodes) {
+ LOG.info("Node: " + node.getHttpAddress() + ", Rack: "
+ + node.getRackName() + " has " + node.getCapability().getMemory()
+ + " MB, " + node.getCapability().getVirtualCores()
+ + " vcores, labels: " + node.getNodeLabels());
+ }
+ }
+ }
+
+ public boolean isFailed() {
+ return failed;
+ }
+
+ public void reserve(Container container) {
+ reserve(container.getNodeId().getHost());
+ }
+
+ public void reserve(String hostName) {
+ if (blacklist.contains(hostName)) {
+ LOG.error( "Node to be reserved is in the blacklist: " + hostName );
+ failed = true;
+ }
+ if (nodesInUse.contains(hostName)) {
+ LOG.error( "Node to be reserved is already in use: " + hostName );
+ return;
+ }
+ if (!yarnNodes.containsKey(hostName)) {
+ LOG.warn( "Node to be reserved was not in YARN node inventory: " + hostName );
+ }
+ nodesInUse.add(hostName);
+ yarn.blacklistNode(hostName);
+ }
+
+ public void release(Container container) {
+ release(container.getNodeId().getHost());
+ }
+
+ public void release(String hostName) {
+ if (!yarnNodes.containsKey(hostName)) {
+ return;
+ }
+ nodesInUse.remove(hostName);
+ yarn.removeBlacklist(hostName);
+ }
+
+ public void blacklist(String hostName) {
+ if (!yarnNodes.containsKey(hostName)) {
+ return;
+ }
+ assert !nodesInUse.contains(hostName);
+ blacklist.add(hostName);
+ yarn.blacklistNode(hostName);
+ LOG.info("Node blacklisted: " + hostName);
+ }
+
+ /**
+ * Determine the number of free nodes in the YARN cluster. The free set is the
+ * set of all YARN nodes minus those that are allocated and those that are
+ * blacklisted. Note that a node might be both in use and blacklisted if
+ * DoY blacklists a node, but then the user starts a "stray" Drillbit on
+ * that same node.
+ * <p>
+ * This number is an approximation: the set of nodes managed by YARN can
+ * change any time, and in-flight container requests will consume a node,
+ * but since the request is not yet completed, we don't know which node
+ * will be assigned, so the node does not yet appear in the in-use list.
+ *
+ * @return an approximation of the free node count
+ */
+
+ public int getFreeNodeCount() {
+ Set<String> free = new HashSet<>( );
+ free.addAll( yarnNodes.keySet() );
+ free.removeAll( nodesInUse );
+ free.removeAll( blacklist );
+ return free.size( );
+ }
+
+ /**
+ * Return a copy of the blacklist (list of failed nodes) for use in display
+ * to the user or similar purpose.
+ *
+ * @return a copy of the blacklist.
+ */
+
+ public List<String> getBlacklist() {
+ List<String> copy = new ArrayList<>( );
+ copy.addAll(blacklist);
+ return copy;
+ }
+
+ /**
+ * Report if the given host name is in use.
+ *
+ * @param hostName
+ * @return true if the host is reserved (in use by a container) or
+ * blacklisted (failed.)
+ */
+
+ public boolean isInUse(String hostName) {
+ return blacklist.contains(hostName) || nodesInUse.contains(hostName);
+ }
+}
|