drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ar...@apache.org
Subject [03/12] drill git commit: DRILL-1170: YARN integration for Drill
Date Sun, 04 Mar 2018 17:13:43 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/NameValuePair.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/NameValuePair.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/NameValuePair.java
new file mode 100644
index 0000000..5872ab9
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/NameValuePair.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+/**
+ * Immutable pairing of a name with an arbitrary value, plus a helper that
+ * renders the value for display.
+ */
+public class NameValuePair {
+  private final String name;
+  private final Object value;
+
+  /**
+   * @param name
+   *          the name of the pair
+   * @param value
+   *          the associated value; may be null
+   */
+  public NameValuePair(String name, Object value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  /** @return the name given at construction */
+  public String getName() {
+    return name;
+  }
+
+  /** @return the value given at construction; may be null */
+  public Object getValue() {
+    return value;
+  }
+
+  /**
+   * Formats the value for display.
+   *
+   * @return the literal text {@code <unset>} for a null value, the value
+   *         wrapped in double quotes when it is a String (embedded quotes
+   *         are not escaped), otherwise the value's {@code toString()}
+   */
+  public String getQuotedValue() {
+    if (value == null) {
+      return "<unset>";
+    }
+    if (value instanceof String) {
+      return "\"" + value + "\"";
+    }
+    return value.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnClientException.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnClientException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnClientException.java
new file mode 100644
index 0000000..62dd468
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnClientException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+/**
+ * Checked exception thrown by the Drill-on-YARN client wrappers in this
+ * package when a YARN operation fails.
+ */
+public class YarnClientException extends Exception {
+  private static final long serialVersionUID = -1411110715738266578L;
+
+  /**
+   * @param msg
+   *          description of the failure
+   */
+  public YarnClientException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param msg
+   *          description of the failure
+   * @param e
+   *          the underlying exception, retained as the cause
+   */
+  public YarnClientException(String msg, Exception e) {
+    super(msg, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java
new file mode 100644
index 0000000..8905ce3
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+import java.io.IOException;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+
+/**
+ * YARN resource manager client implementation for Drill. Provides a wrapper
+ * around the YARN client interface to the Resource Manager. Used by the client
+ * app to start the Drill application master.
+ * <p>
+ * Based on
+ * <a href="https://github.com/hortonworks/simple-yarn-app">simple-yarn-app</a>
+ */
+
+public class YarnRMClient {
+  private final YarnConfiguration conf;
+  private final YarnClient yarnClient;
+
+  /**
+   * Application ID. Semantics are such that each session of Drill-on-YARN works
+   * with no more than one application ID.
+   */
+
+  private ApplicationId appId;
+  private YarnClientApplication app;
+
+  /** Creates a client backed by a default {@link YarnConfiguration}. */
+  public YarnRMClient() {
+    this(new YarnConfiguration());
+  }
+
+  /**
+   * Creates a client, with default configuration, bound to an existing
+   * application.
+   *
+   * @param appId
+   *          ID of the application that later calls operate on
+   */
+  public YarnRMClient(ApplicationId appId) {
+    this();
+    this.appId = appId;
+  }
+
+  /**
+   * Creates, initializes and starts the underlying YARN client.
+   *
+   * @param conf
+   *          configuration handed to the YARN client and later used to build
+   *          the application launch context and class path
+   */
+  public YarnRMClient(YarnConfiguration conf) {
+    this.conf = conf;
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(conf);
+    yarnClient.start();
+  }
+
+  /**
+   * Asks YARN to create a new application and remembers its ID as this
+   * client's application ID.
+   *
+   * @return the YARN response: a new application ID along with cluster
+   *         capacity info
+   * @throws YarnClientException
+   *           if the create request fails
+   */
+  public GetNewApplicationResponse createAppMaster()
+      throws YarnClientException {
+    try {
+      app = yarnClient.createApplication();
+    } catch (YarnException | IOException e) {
+      throw new YarnClientException("Create application failed", e);
+    }
+    GetNewApplicationResponse response = app.getNewApplicationResponse();
+    appId = response.getApplicationId();
+    return response;
+  }
+
+  /**
+   * Builds the launch context from the given spec and submits the application
+   * created by {@link #createAppMaster()}.
+   *
+   * @param spec
+   *          source of the application submission context
+   * @throws YarnClientException
+   *           if the launch context cannot be built or the submit fails
+   * @throws IllegalStateException
+   *           if {@link #createAppMaster()} has not been called
+   */
+  public void submitAppMaster(AppSpec spec) throws YarnClientException {
+    if (app == null) {
+      throw new IllegalStateException("call createAppMaster( ) first");
+    }
+
+    ApplicationSubmissionContext appContext;
+    try {
+      appContext = spec.createAppLaunchContext(conf, app);
+    } catch (IOException e) {
+      throw new YarnClientException("Create app launch context failed", e);
+    }
+
+    // Submit application
+    try {
+      yarnClient.submitApplication(appContext);
+    } catch (YarnException | IOException e) {
+      throw new YarnClientException("Submit application failed", e);
+    }
+  }
+
+  /**
+   * @return the application ID set at construction or by
+   *         {@link #createAppMaster()}; null if neither has happened
+   */
+  public ApplicationId getAppId() {
+    return appId;
+  }
+
+  /**
+   * Fetches the current report for this client's application.
+   *
+   * @throws YarnClientException
+   *           if the request fails
+   */
+  public ApplicationReport getAppReport() throws YarnClientException {
+    try {
+      return yarnClient.getApplicationReport(appId);
+    } catch (YarnException | IOException e) {
+      throw new YarnClientException("Get application report failed", e);
+    }
+  }
+
+  /**
+   * Waits for the application to start. This version is somewhat informal, the
+   * intended use is when debugging unmanaged applications. Polls once per
+   * second, printing the state to stdout, until the application leaves the
+   * NEW, NEW_SAVING and SUBMITTED states.
+   *
+   * @return the current application attempt ID
+   * @throws YarnClientException
+   *           if the report cannot be fetched, if the application ends up in
+   *           any state other than ACCEPTED, or if the waiting thread is
+   *           interrupted (the interrupt flag is restored)
+   */
+  public ApplicationAttemptId waitForStart() throws YarnClientException {
+    ApplicationReport appReport;
+    YarnApplicationState appState;
+    ApplicationAttemptId attemptId;
+    for (;;) {
+      appReport = getAppReport();
+      appState = appReport.getYarnApplicationState();
+      attemptId = appReport.getCurrentApplicationAttemptId();
+      if (appState != YarnApplicationState.NEW
+          && appState != YarnApplicationState.NEW_SAVING
+          && appState != YarnApplicationState.SUBMITTED) {
+        break;
+      }
+      System.out.println("App State: " + appState);
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Swallowing the interrupt would make this wait impossible to
+        // cancel; restore the flag and report the failure instead.
+        Thread.currentThread().interrupt();
+        throw new YarnClientException(
+            "Interrupted while waiting for application " + appId
+                + " to start",
+            e);
+      }
+    }
+    if (appState != YarnApplicationState.ACCEPTED) {
+      throw new YarnClientException(
+          "Application start failed with status " + appState);
+    }
+
+    return attemptId;
+  }
+
+  /**
+   * Wait for the application to enter one of the completion states (FINISHED,
+   * KILLED or FAILED), then print the final state to stdout. This is an
+   * informal implementation useful for testing; it polls every 100 ms.
+   *
+   * @throws YarnClientException
+   *           if the report cannot be fetched or the waiting thread is
+   *           interrupted (the interrupt flag is restored)
+   */
+
+  public void waitForCompletion() throws YarnClientException {
+    ApplicationReport appReport;
+    YarnApplicationState appState;
+    for (;;) {
+      appReport = getAppReport();
+      appState = appReport.getYarnApplicationState();
+      if (appState == YarnApplicationState.FINISHED
+          || appState == YarnApplicationState.KILLED
+          || appState == YarnApplicationState.FAILED) {
+        break;
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        // See waitForStart(): honor the interrupt rather than keep polling.
+        Thread.currentThread().interrupt();
+        throw new YarnClientException(
+            "Interrupted while waiting for application " + appId
+                + " to complete",
+            e);
+      }
+    }
+
+    System.out.println("Application " + appId + " finished with" + " state "
+        + appState + " at " + appReport.getFinishTime());
+  }
+
+  /**
+   * Fetches the AM/RM token for this client's application.
+   *
+   * @throws YarnClientException
+   *           if the request fails
+   */
+  public Token<AMRMTokenIdentifier> getAMRMToken() throws YarnClientException {
+    try {
+      return yarnClient.getAMRMToken(appId);
+    } catch (YarnException | IOException e) {
+      throw new YarnClientException("Get AM/RM token failed", e);
+    }
+  }
+
+  /**
+   * Return standard class path entries from the YARN application class path,
+   * falling back to YARN's default class path when the property is not set.
+   */
+
+  public String[] getYarnAppClassPath() {
+    return conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
+  }
+
+  /**
+   * Asks YARN to kill this client's application.
+   *
+   * @throws YarnClientException
+   *           if the kill request fails; the underlying exception is kept as
+   *           the cause
+   */
+  public void killApplication() throws YarnClientException {
+    try {
+      yarnClient.killApplication(appId);
+    } catch (YarnException | IOException e) {
+      // Plain concatenation (not appId.toString()) so that a null ID cannot
+      // raise a NullPointerException that hides the real failure.
+      throw new YarnClientException(
+          "Kill failed for application: " + appId, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/package-info.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/package-info.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/package-info.java
new file mode 100644
index 0000000..aaa0fff
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Holds functionality common to the Drill-on-YARN client and Application Master (AM).
+ * Includes configuration, utilities, and wrappers around various YARN data classes.
+ */
+
+package org.apache.drill.yarn.core;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/package-info.java b/drill-yarn/src/main/java/org/apache/drill/yarn/package-info.java
new file mode 100644
index 0000000..170dfa8
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/package-info.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hosts Apache Drill under Apache Hadoop YARN. Consists of two main
+ * components as required by YARN: a client application which uses YARN to
+ * start the Drill cluster, and an Application Master (AM) which manages
+ * the cluster. The AM in turn starts, manages and stops drillbits.
+ * <p>
+ * Much of the functionality is simply plumbing to get YARN to do what is
+ * needed. The core of the AM is a "cluster controller" which starts,
+ * monitors and stops Drillbits, tracking their state transitions through
+ * the several lifecycle stages that result.
+ * <p>
+ * Note about logs here: Drill-on-YARN is a YARN application and so it
+ * uses the same logging system used by the YARN code. This is different
+ * than that used by Drill.
+ */
+
+package org.apache.drill.yarn;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/AMRegistry.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/AMRegistry.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/AMRegistry.java
new file mode 100644
index 0000000..9cc95e5
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/AMRegistry.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.zk;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+/**
+ * Register this App Master in ZK to prevent duplicates.
+ * <p>
+ * A possible enhancement is to put the registry in some well-known location,
+ * such as /drill-am.
+ */
+public class AMRegistry {
+  private static final String AM_REGISTRY = "/drill-on-yarn";
+
+  private ZKClusterCoordinator zkCoord;
+  @SuppressWarnings("unused")
+  private String amHost;
+  @SuppressWarnings("unused")
+  private int amPort;
+  @SuppressWarnings("unused")
+  private String amAppId;
+
+  private String zkRoot;
+
+  private String clusterId;
+
+  /**
+   * @param zkCoord
+   *          coordinator whose Curator client is used for all ZK operations
+   */
+  public AMRegistry(ZKClusterCoordinator zkCoord) {
+    this.zkCoord = zkCoord;
+  }
+
+  /**
+   * Sets the ZK root and cluster ID used by {@link #register}. The cluster ID
+   * names the registration znode; the root appears only in the error message.
+   * If this is not called first, the registration path is built from a null
+   * cluster ID.
+   */
+  public void useLocalRegistry(String zkRoot, String clusterId) {
+    this.zkRoot = zkRoot;
+    this.clusterId = clusterId;
+  }
+
+  /**
+   * Register this AM as an ephemeral znode in ZK. The structure of ZK is as
+   * follows:
+   *
+   * <pre>
+   * /drill
+   * . &lt;cluster-id>
+   * . . &lt;Drillbit GUID> (Value is Proto-encoded drillbit info)
+   * . drill-on-yarn
+   * . . &lt;cluster-id> (value: amHost:port:appId)
+   * </pre>
+   * <p>
+   * The structure acknowledges that the cluster-id znode may be renamed, and
+   * there may be multiple cluster IDs for a single drill root node. (Odd, but
+   * supported.) To address this, we put the AM registrations in their own
+   * (persistent) znode: drill-on-yarn. Each is keyed by the cluster ID (so we
+   * can find it), and holds the host name, HTTP port and Application ID of the
+   * AM.
+   * <p>
+   * When the AM starts, it atomically checks and sets the AM registration. If
+   * another AM already is running, then this AM will fail, displaying a log
+   * error message with the host, port and (most importantly) app ID so the user
+   * can locate the problem.
+   *
+   * @param amHost
+   *          host name of this AM
+   * @param amPort
+   *          port of this AM
+   * @param amAppId
+   *          YARN application ID of this AM
+   * @throws ZKRuntimeException
+   *           if a registration already exists for the cluster ID, or if any
+   *           ZK operation fails
+   */
+
+  public void register(String amHost, int amPort, String amAppId)
+      throws ZKRuntimeException {
+    this.amHost = amHost;
+    this.amPort = amPort;
+    this.amAppId = amAppId;
+    try {
+
+      // The znode to hold AMs may or may not exist. Create it if missing.
+
+      try {
+        zkCoord.getCurator().create().withMode(CreateMode.PERSISTENT)
+            .forPath(AM_REGISTRY, new byte[0]);
+      } catch (NodeExistsException ignored) {
+        // OK: the parent znode is already there.
+      }
+
+      // Try to create the AM registration.
+
+      String amPath = AM_REGISTRY + "/" + clusterId;
+      String content = amHost + ":" + amPort + ":" + amAppId;
+      try {
+        zkCoord.getCurator().create().withMode(CreateMode.EPHEMERAL)
+            .forPath(amPath, content.getBytes(StandardCharsets.UTF_8));
+      } catch (NodeExistsException e) {
+
+        // ZK says that a node exists, which means that another AM is already
+        // running. Read back its registration so the error can say where.
+
+        byte[] data = zkCoord.getCurator().getData().forPath(amPath);
+
+        // Die with a clear (we hope!) error message.
+
+        throw new ZKRuntimeException(
+            "FAILED! An Application Master already exists for " + zkRoot + "/"
+                + clusterId + " on host: " + describeRegistration(data));
+      }
+    } catch (ZKRuntimeException e) {
+
+      // Our own "already exists" failure: pass it through unwrapped.
+
+      throw e;
+    } catch (Exception e) {
+
+      // Something bad happened with ZK.
+
+      throw new ZKRuntimeException("Failed to create AM registration node", e);
+    }
+  }
+
+  /**
+   * Renders an existing registration payload for the error message, tolerating
+   * missing or badly formatted content.
+   *
+   * @param data
+   *          raw znode content, expected to be UTF-8 "host:port:appId"; may be
+   *          null
+   * @return "Unknown" for null data, the raw text when it has fewer than three
+   *         colon-separated fields, otherwise a labelled host/port/app ID
+   */
+  private static String describeRegistration(byte[] data) {
+    if (data == null) {
+      return "Unknown";
+    }
+    String packed = new String(data, StandardCharsets.UTF_8);
+    String[] unpacked = packed.split(":");
+    if (unpacked.length < 3) {
+      return packed;
+    }
+    return unpacked[0] + ", port: " + unpacked[1]
+        + ", Application ID: " + unpacked[2];
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinator.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinator.java
new file mode 100644
index 0000000..7c5f5f3
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinator.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.zk;
+
+import static com.google.common.base.Throwables.propagate;
+import static com.google.common.collect.Collections2.transform;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.x.discovery.ServiceCache;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.ServiceCacheListener;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.DistributedSemaphore;
+import org.apache.drill.exec.coord.DrillServiceInstanceHelper;
+import org.apache.drill.exec.coord.store.CachingTransientStoreFactory;
+import org.apache.drill.exec.coord.store.TransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.coord.store.TransientStoreFactory;
+import org.apache.drill.exec.coord.zk.ZKRegistrationHandle;
+import org.apache.drill.exec.coord.zk.ZkDistributedSemaphore;
+import org.apache.drill.exec.coord.zk.ZkEphemeralStore;
+import org.apache.drill.exec.coord.zk.ZkTransientStoreFactory;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
+
+import com.google.common.base.Function;
+
+/**
+ * Manages cluster coordination utilizing zookeeper.
+ * <p>
+ * This is a clone of the Drill class
+ * org.apache.drill.exec.coord.zk.ZKClusterCoordinator with a number of
+ * modifications:
+ * <ul>
+ * <li>Removed dependency on the Drill config system. That system uses Google's
+ * Guava library version 18, which conflicts with the earlier versions used by
+ * YARN and Hadoop, which resulted in runtime undefined method exceptions.</li>
+ * <li>Instead of getting config information out of the Drill config, the
+ * parameters are instead passed directly.</li>
+ * <li>Adds support for the drillbits registered event which was neither needed
+ * nor implemented by Drill.</li>
+ * <li>Use the YARN logging system instead of Drill's.</li>
+ * </ul>
+ * <p>
+ * This class should be replaced by the Drill version if/when the Guava
+ * conflicts can be resolved (and when registered Drillbit notifications are
+ * added to the Drill version.)
+ */
+
+public class ZKClusterCoordinator extends ClusterCoordinator {
+
+  protected static final Log logger = LogFactory
+      .getLog(ZKClusterCoordinator.class);
+
+  private CuratorFramework curator;
+  private ServiceDiscovery<DrillbitEndpoint> discovery;
+  // Latest known set of Drillbit endpoints; replaced as a whole by
+  // updateEndpoints() and read without locking via the volatile reference.
+  private volatile Collection<DrillbitEndpoint> endpoints = Collections
+      .emptyList();
+  // Service name used for discovery; set to the cluster ID.
+  private final String serviceName;
+  // Released by InitialConnectionListener on the first CONNECTED event.
+  private final CountDownLatch initialConnection = new CountDownLatch(1);
+  private final TransientStoreFactory factory;
+  // Created in start(); null until then.
+  private ServiceCache<DrillbitEndpoint> serviceCache;
+
+  /**
+   * Builds and starts the Curator client, and creates (but does not start)
+   * the service discovery and the transient store factory. Call
+   * {@link #start(long)} to wait for the connection and begin tracking
+   * Drillbits.
+   *
+   * @param connect
+   *          ZooKeeper connect string
+   * @param zkRoot
+   *          Curator namespace under which all paths are resolved
+   * @param clusterId
+   *          cluster ID, used as the discovery service name
+   * @param retryCount
+   *          number of retries for the retry-N-times policy
+   * @param retryDelayMs
+   *          delay between retries, in milliseconds
+   * @param connectTimeoutMs
+   *          Curator connection timeout, in milliseconds
+   */
+  public ZKClusterCoordinator(String connect, String zkRoot, String clusterId,
+      int retryCount, int retryDelayMs, int connectTimeoutMs)
+      throws IOException {
+    logger.debug("ZK connect: " + connect + ", zkRoot: " + zkRoot
+        + ", clusterId: " + clusterId);
+
+    this.serviceName = clusterId;
+    RetryPolicy rp = new RetryNTimes(retryCount, retryDelayMs);
+    curator = CuratorFrameworkFactory.builder().namespace(zkRoot)
+        .connectionTimeoutMs(connectTimeoutMs).retryPolicy(rp)
+        .connectString(connect).build();
+    // Register the listener before starting the client so that the first
+    // CONNECTED event cannot be missed.
+    curator.getConnectionStateListenable()
+        .addListener(new InitialConnectionListener());
+    curator.start();
+    discovery = newDiscovery();
+    factory = CachingTransientStoreFactory
+        .of(new ZkTransientStoreFactory(curator));
+  }
+
+  /** @return the Curator client created by the constructor */
+  public CuratorFramework getCurator() {
+    return curator;
+  }
+
+  /**
+   * Starts service discovery, waits for the initial ZK connection, then starts
+   * the service cache and loads the initial endpoint set.
+   *
+   * @param millisToWait
+   *          maximum time to wait for the initial connection, in milliseconds;
+   *          0 means wait indefinitely
+   * @throws Exception
+   *           an {@link IOException} if the connection is not established
+   *           within the allotted time; otherwise whatever the discovery or
+   *           cache start-up throws
+   */
+  @Override
+  public void start(long millisToWait) throws Exception {
+    logger.debug("Starting ZKClusterCoordination.");
+    discovery.start();
+
+    if (millisToWait != 0) {
+      boolean success = this.initialConnection.await(millisToWait,
+          TimeUnit.MILLISECONDS);
+      if (!success) {
+        throw new IOException(String.format(
+            "Failure to connect to the zookeeper cluster service within the allotted time of %d milliseconds.",
+            millisToWait));
+      }
+    } else {
+      this.initialConnection.await();
+    }
+
+    serviceCache = discovery.serviceCacheBuilder().name(serviceName).build();
+    serviceCache.addListener(new EndpointListener());
+    serviceCache.start();
+    updateEndpoints();
+  }
+
+  /**
+   * One-shot listener: on the first CONNECTED event it releases the
+   * initial-connection latch and removes itself from the client.
+   */
+  private class InitialConnectionListener implements ConnectionStateListener {
+
+    @Override
+    public void stateChanged(CuratorFramework client,
+        ConnectionState newState) {
+      if (newState == ConnectionState.CONNECTED) {
+        initialConnection.countDown();
+        client.getConnectionStateListenable().removeListener(this);
+      }
+    }
+
+  }
+
+  /**
+   * Refreshes the endpoint set whenever the service cache changes. Connection
+   * state changes are ignored here.
+   */
+  private class EndpointListener implements ServiceCacheListener {
+    @Override
+    public void stateChanged(CuratorFramework client,
+        ConnectionState newState) {
+    }
+
+    @Override
+    public void cacheChanged() {
+      logger.debug("Got cache changed --> updating endpoints");
+      updateEndpoints();
+    }
+  }
+
+  /**
+   * Closes, in order, the service cache, the discovery service, the Curator
+   * client and the transient store factory.
+   */
+  @Override
+  public void close() throws Exception {
+    // discovery attempts to close its caches(ie serviceCache) already. however,
+    // being good citizens we make sure to
+    // explicitly close serviceCache. Not only that we make sure to close
+    // serviceCache before discovery to prevent
+    // double releasing and disallowing jvm to spit bothering warnings. simply
+    // put, we are great!
+    AutoCloseables.close(serviceCache, discovery, curator, factory);
+  }
+
+  /**
+   * Registers the given endpoint as a service instance under this
+   * coordinator's service name.
+   *
+   * @return a handle carrying the generated instance ID and the endpoint
+   * @throws RuntimeException
+   *           any failure, rethrown through Guava's {@code propagate}
+   */
+  @Override
+  public RegistrationHandle register(DrillbitEndpoint data) {
+    try {
+      ServiceInstance<DrillbitEndpoint> serviceInstance = newServiceInstance(
+          data);
+      discovery.registerService(serviceInstance);
+      return new ZKRegistrationHandle(serviceInstance.getId(), data);
+    } catch (Exception e) {
+      throw propagate(e);
+    }
+  }
+
+  /**
+   * Removes the registration identified by the handle and clears all
+   * listeners held by this coordinator.
+   *
+   * @throws UnsupportedOperationException
+   *           if the handle is not a {@link ZKRegistrationHandle}
+   */
+  @Override
+  public void unregister(RegistrationHandle handle) {
+    if (!(handle instanceof ZKRegistrationHandle)) {
+      throw new UnsupportedOperationException(
+          "Unknown handle type: " + handle.getClass().getName());
+    }
+
+    // when Drillbit is unregistered, clean all the listeners registered in CC.
+    // (listeners is not declared in this class; it is inherited.)
+    this.listeners.clear();
+
+    ZKRegistrationHandle h = (ZKRegistrationHandle) handle;
+    try {
+      // Only the ID and service name are taken from the real registration;
+      // address and port are placeholders.
+      ServiceInstance<DrillbitEndpoint> serviceInstance = ServiceInstance
+          .<DrillbitEndpoint> builder().address("").port(0).id(h.id)
+          .name(serviceName).build();
+      discovery.unregisterService(serviceInstance);
+    } catch (Exception e) {
+      // NOTE(review): Guava documents propagate() as always throwing, so the
+      // exception is not swallowed despite the missing "throw" -- confirm
+      // against the Guava version in use.
+      propagate(e);
+    }
+  }
+
+  /** @return the endpoint set captured by the most recent update */
+  @Override
+  public Collection<DrillbitEndpoint> getAvailableEndpoints() {
+    return this.endpoints;
+  }
+
+  /**
+   * Creates a ZK-backed semaphore at the path {@code /semaphore/<name>}.
+   */
+  @Override
+  public DistributedSemaphore getSemaphore(String name, int maximumLeases) {
+    return new ZkDistributedSemaphore(curator, "/semaphore/" + name,
+        maximumLeases);
+  }
+
+  /**
+   * Returns the store for the given config from the caching factory. The
+   * result is cast to {@link ZkEphemeralStore}, so any other store type
+   * returned by the factory fails with a ClassCastException.
+   */
+  @Override
+  public <V> TransientStore<V> getOrCreateTransientStore(
+      final TransientStoreConfig<V> config) {
+    final ZkEphemeralStore<V> store = (ZkEphemeralStore<V>) factory
+        .getOrCreateStore(config);
+    return store;
+  }
+
+  /**
+   * Queries discovery for the current instances, replaces the endpoint set,
+   * and notifies listeners of the Drillbits that disappeared and appeared
+   * since the previous set. Any failure is logged and otherwise ignored, which
+   * leaves the previous endpoint set in place if the query itself failed.
+   */
+  private synchronized void updateEndpoints() {
+    try {
+      Collection<DrillbitEndpoint> newDrillbitSet = transform(
+          discovery.queryForInstances(serviceName),
+          new Function<ServiceInstance<DrillbitEndpoint>, DrillbitEndpoint>() {
+            @Override
+            public DrillbitEndpoint apply(
+                ServiceInstance<DrillbitEndpoint> input) {
+              return input.getPayload();
+            }
+          });
+
+      // set of newly dead bits : original bits - new set of active bits.
+      Set<DrillbitEndpoint> unregisteredBits = new HashSet<>(endpoints);
+      unregisteredBits.removeAll(newDrillbitSet);
+
+      // Set of newly live bits : new set of active bits - original bits.
+      Set<DrillbitEndpoint> registeredBits = new HashSet<>(newDrillbitSet);
+      registeredBits.removeAll(endpoints);
+
+      // Both differences are computed above, before the old set is replaced.
+      endpoints = newDrillbitSet;
+
+      if (logger.isDebugEnabled()) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("Active drillbit set changed.  Now includes ");
+        builder.append(newDrillbitSet.size());
+        builder.append(" total bits.");
+        if (!newDrillbitSet.isEmpty()) {
+          builder.append(" New active drillbits: \n");
+        }
+        for (DrillbitEndpoint bit : newDrillbitSet) {
+          builder.append('\t');
+          builder.append(bit.getAddress());
+          builder.append(':');
+          builder.append(bit.getUserPort());
+          builder.append(':');
+          builder.append(bit.getControlPort());
+          builder.append(':');
+          builder.append(bit.getDataPort());
+          builder.append('\n');
+        }
+        logger.debug(builder.toString());
+      }
+
+      // Notify the drillbit listener for newly unregistered bits.
+      if (!(unregisteredBits.isEmpty())) {
+        drillbitUnregistered(unregisteredBits);
+      }
+      // Notify the drillbit listener for newly registered bits.
+      if (!(registeredBits.isEmpty())) {
+        drillbitRegistered(registeredBits);
+      }
+
+    } catch (Exception e) {
+      logger.error("Failure while update Drillbit service location cache.", e);
+    }
+  }
+
+  /**
+   * Builds a service instance named after this cluster with the endpoint as
+   * its payload.
+   */
+  protected ServiceInstance<DrillbitEndpoint> newServiceInstance(
+      DrillbitEndpoint endpoint) throws Exception {
+    return ServiceInstance.<DrillbitEndpoint> builder().name(serviceName)
+        .payload(endpoint).build();
+  }
+
+  /**
+   * Builds (without starting) a service discovery rooted at "/" on this
+   * coordinator's Curator client, using Drill's endpoint serializer.
+   */
+  protected ServiceDiscovery<DrillbitEndpoint> newDiscovery() {
+    return ServiceDiscoveryBuilder.builder(DrillbitEndpoint.class).basePath("/")
+        .client(curator).serializer(DrillServiceInstanceHelper.SERIALIZER)
+        .build();
+  }
+
+  /**
+   * Not supported by Drill-on-YARN.
+   *
+   * @throws UnsupportedOperationException
+   *           always
+   */
+  @Override
+  public Collection<DrillbitEndpoint> getOnlineEndPoints() {
+
+    // Not used in DoY
+
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Not supported by Drill-on-YARN.
+   *
+   * @throws UnsupportedOperationException
+   *           always
+   */
+  @Override
+  public RegistrationHandle update(RegistrationHandle handle, State state) {
+
+    // Not used in DoY
+
+    throw new UnsupportedOperationException();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinatorDriver.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinatorDriver.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinatorDriver.java
new file mode 100644
index 0000000..3f83ff2
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinatorDriver.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.zk;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
+import org.apache.drill.yarn.appMaster.AMRegistrar;
+
+/**
+ * Driver class for the ZooKeeper cluster coordinator. Provides defaults for
+ * most options, but allows customizing each. Provides a {@link #build()} method
+ * to create <i>and start</i> the ZK service. Obtains the initial set of
+ * Drillbits (which should be empty for a YARN-defined cluster) which can be
+ * retrieved after building.
+ * <p>
+ * Maintains the ZK connection and monitors for disconnect. This class simply
+ * detects a disconnect timeout; it does not send a disconnect event itself to
+ * avoid creating a timer thread just for this purpose. Instead, the caller can
+ * poll {@link #hasFailed()}.
+ * <p>
+ * Defaults match those in Drill. (Actual Drill defaults are not yet used due to
+ * code incompatibility issues.)
+ */
+
+public class ZKClusterCoordinatorDriver implements AMRegistrar {
+  private static final Pattern ZK_COMPLEX_STRING = Pattern
+      .compile("(^.*?)/(.*)/([^/]*)$");
+
+  // Defaults are taken from java-exec's drill-module.conf
+
+  private String connect = "localhost:2181";
+  private String clusterId = "drillbits1";
+  private String zkRoot = "drill";
+  private int retryCount = 7200;
+  private int connectTimeoutMs = 5_000;
+  private int retryDelayMs = 500;
+
+  // Default timeout before we declare that ZK is down: 2 minutes.
+
+  private int failureTimeoutMs = 120_000;
+
+  // Maximum ZK startup wait defaults to 30 seconds. It is only 10 seconds
+  // in the Drill implementation.
+
+  private int maxStartWaitMs = 30_000;
+
+  // Expected ports used to match ZK registries with
+  // containers. ZK lists the ports as part of its key, we have to anticipate
+  // these values in order to match.
+
+  private int userPort = 31010;
+  private int controlPort = 31011;
+  private int dataPort = 31012;
+
+  private List<DrillbitEndpoint> initialEndpoints;
+  private ConnectionStateListener stateListener = new ConnectionStateListener() {
+
+    @Override
+    public void stateChanged(CuratorFramework client,
+        ConnectionState newState) {
+      ZKClusterCoordinatorDriver.this.stateChanged(newState);
+    }
+  };
+
+  private ZKClusterCoordinator zkCoord;
+
+  // Time (ms since epoch) at which the ZK connection was first seen lost, or
+  // 0 while connected. Written from the Curator connection-state callback and
+  // read by whichever thread polls hasFailed(); volatile makes each update
+  // visible across threads and keeps the 64-bit read/write atomic.
+  private volatile long connectionLostTime;
+
+  private AMRegistry amRegistry;
+
+  /**
+   * Creates a driver that uses the default settings declared on the fields.
+   * Use the setters to customize, then call {@link #build()}.
+   */
+  public ZKClusterCoordinatorDriver() {
+    // Nothing to initialize: every option already carries its default.
+  }
+
+  /**
+   * Sets the ZK connection from a single combined string of the form
+   * <code>host:port/zkRoot/clusterId</code>. Everything before the first
+   * slash is the connect string, everything after the last slash is the
+   * cluster ID, and the part in between is the ZK root.
+   *
+   * @param connect
+   *          the combined connect string
+   * @return this driver, for call chaining
+   * @throws ZKConfigException
+   *           if the string does not have the expected form
+   */
+  public ZKClusterCoordinatorDriver setConnect(String connect)
+      throws ZKConfigException {
+    Matcher parts = ZK_COMPLEX_STRING.matcher(connect);
+    if (parts.matches()) {
+      // Split the combined string into its three components.
+      this.connect = parts.group(1);
+      this.zkRoot = parts.group(2);
+      this.clusterId = parts.group(3);
+      return this;
+    }
+    throw new ZKConfigException("Bad connect string: " + connect);
+  }
+
+  /**
+   * Sets the three parts of the ZK connection individually.
+   *
+   * @param connect
+   *          the ZK connect string
+   * @param zkRoot
+   *          the ZK root under which the cluster is registered
+   * @param clusterId
+   *          the Drill cluster ID
+   * @return this driver, for call chaining
+   */
+  public ZKClusterCoordinatorDriver setConnect(String connect, String zkRoot,
+      String clusterId) {
+    this.clusterId = clusterId;
+    this.zkRoot = zkRoot;
+    this.connect = connect;
+    return this;
+  }
+
+  /**
+   * Sets the retry count handed to the cluster coordinator by
+   * {@link #build()}.
+   *
+   * @param n
+   *          the retry count
+   * @return this driver, for call chaining
+   */
+  public ZKClusterCoordinatorDriver setRetryCount(int n) {
+    this.retryCount = n;
+    return this;
+  }
+
+  /**
+   * Sets the connect timeout handed to the cluster coordinator by
+   * {@link #build()}.
+   *
+   * @param ms
+   *          the timeout in milliseconds
+   * @return this driver, for call chaining
+   */
+  public ZKClusterCoordinatorDriver setConnectTimeoutMs(int ms) {
+    this.connectTimeoutMs = ms;
+    return this;
+  }
+
+  /**
+   * Sets the retry delay handed to the cluster coordinator by
+   * {@link #build()}.
+   *
+   * @param ms
+   *          the delay in milliseconds
+   * @return this driver, for call chaining
+   */
+  public ZKClusterCoordinatorDriver setRetryDelayMs(int ms) {
+    this.retryDelayMs = ms;
+    return this;
+  }
+
+  /**
+   * Sets the maximum time {@link #build()} lets the cluster coordinator take
+   * to start.
+   *
+   * @param ms
+   *          the maximum wait in milliseconds
+   * @return this driver, for call chaining
+   */
+  public ZKClusterCoordinatorDriver setMaxStartWaitMs(int ms) {
+    this.maxStartWaitMs = ms;
+    return this;
+  }
+
+  /**
+   * Sets how long the ZK connection may stay lost before
+   * {@link #hasFailed()} reports a failure.
+   *
+   * @param ms
+   *          the failure timeout in milliseconds
+   * @return this driver, for call chaining
+   */
+  public ZKClusterCoordinatorDriver setFailureTimoutMs(int ms) {
+    this.failureTimeoutMs = ms;
+    return this;
+  }
+
+  /**
+   * Sets the Drillbit ports that {@link #toKey(String)} uses when it builds
+   * the key expected for a host.
+   *
+   * @param userPort
+   *          the expected user port
+   * @param controlPort
+   *          the expected control port
+   * @param dataPort
+   *          the expected data port
+   * @return this driver, for call chaining
+   */
+  public ZKClusterCoordinatorDriver setPorts(int userPort, int controlPort,
+      int dataPort) {
+    this.dataPort = dataPort;
+    this.controlPort = controlPort;
+    this.userPort = userPort;
+    return this;
+  }
+
+  /**
+   * Builds and starts the ZooKeeper cluster coordinator, translating any errors
+   * that occur. After this call, the listener will start receiving messages.
+   *
+   * @return
+   * @throws ZKRuntimeException
+   *           if ZK startup fails
+   */
+  public ZKClusterCoordinatorDriver build() throws ZKRuntimeException {
+    try {
+      zkCoord = new ZKClusterCoordinator(connect, zkRoot, clusterId, retryCount,
+          retryDelayMs, connectTimeoutMs);
+    } catch (IOException e) {
+      throw new ZKRuntimeException(
+          "Failed to initialize the ZooKeeper cluster coordination", e);
+    }
+    try {
+      zkCoord.start(maxStartWaitMs);
+    } catch (Exception e) {
+      throw new ZKRuntimeException(
+          "Failed to start the ZooKeeper cluster coordination after "
+              + maxStartWaitMs + " ms.",
+          e);
+    }
+    initialEndpoints = new ArrayList<>(zkCoord.getAvailableEndpoints());
+    zkCoord.getCurator().getConnectionStateListenable()
+        .addListener(stateListener);
+    amRegistry = new AMRegistry(zkCoord);
+    amRegistry.useLocalRegistry(zkRoot, clusterId);
+    return this;
+  }
+
+  /**
+   * Registers a listener for Drillbit status changes with the cluster
+   * coordinator. Requires a prior call to {@link #build()}.
+   *
+   * @param listener
+   *          the listener to add
+   */
+  public void addDrillbitListener(DrillbitStatusListener listener) {
+    this.zkCoord.addDrillbitStatusListener(listener);
+  }
+
+  /**
+   * Removes a previously added Drillbit status listener from the cluster
+   * coordinator. Requires a prior call to {@link #build()}.
+   *
+   * @param listener
+   *          the listener to remove
+   */
+  public void removeDrillbitListener(DrillbitStatusListener listener) {
+    this.zkCoord.removeDrillbitStatusListener(listener);
+  }
+
+  /**
+   * Returns the Drillbits that were registered when {@link #build()} ran.
+   * The list should be empty for a cluster managed by YARN.
+   *
+   * @return the endpoints captured by {@link #build()}; null if
+   *         {@link #build()} has not been called
+   */
+
+  public List<DrillbitEndpoint> getInitialEndpoints() {
+    return this.initialEndpoints;
+  }
+
+  /**
+   * Convenience method to convert a Drillbit to a string. Note that ZK does not
+   * advertise the HTTP port, so it does not appear in the generated string.
+   *
+   * @param bit
+   * @return
+   */
+
+  public static String asString(DrillbitEndpoint bit) {
+    return formatKey(bit.getAddress(), bit.getUserPort(), bit.getControlPort(),
+        bit.getDataPort());
+  }
+
+  /**
+   * Builds the key expected for a Drillbit on the given host, using the
+   * ports configured on this driver.
+   *
+   * @param host
+   *          the host name
+   * @return the <code>host:user:control:data</code> key
+   */
+  public String toKey(String host) {
+    return formatKey(host, this.userPort, this.controlPort, this.dataPort);
+  }
+
+  /**
+   * Joins a host and three ports with colons.
+   *
+   * @param host
+   *          the host name
+   * @param userPort
+   *          the user port
+   * @param controlPort
+   *          the control port
+   * @param dataPort
+   *          the data port
+   * @return a string of the form <code>host:user:control:data</code>
+   */
+  public static String formatKey(String host, int userPort, int controlPort,
+      int dataPort) {
+    return host + ":" + userPort + ":" + controlPort + ":" + dataPort;
+  }
+
+  /**
+   * Folds ZK connection events into a single piece of state: the time of the
+   * first loss that has not yet been followed by a (re)connect, or 0 when
+   * connected.
+   *
+   * @param newState
+   *          the connection state reported by Curator
+   */
+
+  protected void stateChanged(ConnectionState newState) {
+    switch (newState) {
+    case SUSPENDED:
+    case LOST:
+      // Only the first loss in a row starts the clock.
+      if (connectionLostTime != 0) {
+        break;
+      }
+      ZKClusterCoordinator.logger.info("ZK connection lost");
+      connectionLostTime = System.currentTimeMillis();
+      break;
+    case RECONNECTED:
+    case READ_ONLY:
+    case CONNECTED:
+      // Log only if we are actually recovering from a loss.
+      if (connectionLostTime != 0) {
+        ZKClusterCoordinator.logger.info("ZK connection regained");
+      }
+      connectionLostTime = 0;
+      break;
+    }
+  }
+
+  /**
+   * Reports our best guess as to whether ZK has failed. We assume ZK has failed
+   * if we received a connection lost notification without a subsequent connect
+   * notification, and we received the disconnect notification log enough ago
+   * that we assume that a timeout has occurred.
+   *
+   * @return
+   */
+
+  public boolean hasFailed() {
+    if (connectionLostTime == 0) {
+      return false;
+    }
+    return System.currentTimeMillis() - connectionLostTime > failureTimeoutMs;
+  }
+
+  public long getLostConnectionDurationMs() {
+    if (connectionLostTime == 0) {
+      return 0;
+    }
+    return System.currentTimeMillis() - connectionLostTime;
+  }
+
+  /**
+   * Shuts down the cluster coordinator, if one was built. Removes the
+   * connection-state listener first, then closes the coordinator; an error
+   * raised by the close is logged and ignored. Calling this again after a
+   * close is a no-op.
+   */
+  public void close() {
+    if (zkCoord != null) {
+      zkCoord.getCurator()
+          .getConnectionStateListenable()
+          .removeListener(stateListener);
+      try {
+        zkCoord.close();
+      } catch (Exception e) {
+        ZKClusterCoordinator.logger.error("Error occurred on ZK close, ignored",
+            e);
+      }
+      zkCoord = null;
+    }
+  }
+
+  /**
+   * Registers the Application Master through the AM registry, translating a
+   * ZK failure into an {@link AMRegistrationException}.
+   *
+   * @param amHost
+   *          the AM host
+   * @param amPort
+   *          the AM port
+   * @param appId
+   *          the application ID
+   * @throws AMRegistrationException
+   *           if the registry reports a {@link ZKRuntimeException}
+   */
+  @Override
+  public void register(String amHost, int amPort, String appId)
+      throws AMRegistrationException {
+    try {
+      this.amRegistry.register(amHost, amPort, appId);
+    } catch (ZKRuntimeException zkError) {
+      throw new AMRegistrationException(zkError);
+    }
+  }
+
+  /**
+   * Deliberately empty; see the comment in the body.
+   */
+  @Override
+  public void deregister() {
+    // Nothing to do: ZK does it for us.
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKConfigException.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKConfigException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKConfigException.java
new file mode 100644
index 0000000..700d84b
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKConfigException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.zk;
+
+/**
+ * Checked exception carrying a message about a bad ZooKeeper configuration
+ * value.
+ */
+public class ZKConfigException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param message
+   *          description of the configuration problem
+   */
+  public ZKConfigException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRegistry.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRegistry.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRegistry.java
new file mode 100644
index 0000000..0426578
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRegistry.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.zk;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
+import org.apache.drill.yarn.appMaster.AMWrapperException;
+import org.apache.drill.yarn.appMaster.EventContext;
+import org.apache.drill.yarn.appMaster.Pollable;
+import org.apache.drill.yarn.appMaster.RegistryHandler;
+import org.apache.drill.yarn.appMaster.Task;
+import org.apache.drill.yarn.appMaster.TaskLifecycleListener;
+
+/**
+ * AM-specific implementation of a Drillbit registry backed by ZooKeeper.
+ * Listens to ZK events for registering a Drillbit and deregistering. Alerts the
+ * Cluster Controller of these events.
+ * <p>
+ * Locking strategy: Receives events from both ZK and the cluster controller,
+ * both of which must be synchronized. To prevent deadlocks, this class NEVER
+ * calls into the cluster controller while holding a lock. This prevents the
+ * following:
+ * <p>
+ * ClusterController --> ZKRegistry (OK) <br>
+ * ZK --> ZKRegistry (OK) <br>
+ * ZK --> ZKRegistry --> Cluster Controller (bad)
+ * <p>
+ * In the case of registration, ZK calls the registry which must alert the
+ * cluster controller. Cluster controller alerting is handled outside the ZK
+ * update critical section.
+ * <p>
+ * Because ZK events occur relatively infrequently, any deadlock will occur
+ * once in a blue moon, which will make it very hard to reproduce. So, extreme
+ * caution is needed at design time to prevent the problem.
+ */
+
+public class ZKRegistry
+    implements TaskLifecycleListener, DrillbitStatusListener, Pollable {
+  /**
+   * State of each Drillbit that we've discovered through ZK or launched via the
+   * AM. The tracker is where we combine the ZK information with AM to correlate
+   * overall Drillbit health.
+   */
+
+  protected static class DrillbitTracker {
+    /**
+     * A Drillbit can be in one of four states.
+     */
+
+    public enum State {
+
+      /**
+       * An unmanaged Drillbit is one that has announced itself via ZK, but
+       * which the AM didn't launch (or has not yet received confirmation from
+       * YARN that it was launched.) In the normal state, this state either does
+       * not occur (YARN reports the task launch before the Drillbit registers
+       * in ZK) or is transient (if the Drillbit registers in ZK before YARN
+       * gets around to telling the AM that the Drillbit was launched.) A
+       * Drillbit that stays in the unmanaged state is likely one launched
+       * outside the AM: either launched manually or (possibly), one left from a
+       * previous, failed AM run (though YARN is supposed to kill left-over
+       * child processes in that case.)
+       */
+
+      UNMANAGED,
+
+      /**
+       * A new Drillbit is one that the AM has launched, but that has not yet
+       * registered itself with ZK. This is normally a transient state that
+       * occurs as ZK registration catches up with the YARN launch notification.
+       * If a Drillbit stays in this state, then something is seriously wrong
+       * (perhaps a mis-configuration). The cluster controller will patiently
+       * wait a while, then decide bad things are happening and will ask YARN to
+       * kill the Drillbit, then will retry a time or two, after which it will
+       * throw up its hands, blacklist the node, and wait for the admin to sort
+       * things out.
+       */
+
+      NEW,
+
+      /**
+       * Normal operating state: the AM launched the Drillbit, which then
+       * dutifully registered itself in ZK. Nothing to see here, move along.
+       */
+
+      REGISTERED,
+
+      /**
+       * The Drillbit was working just fine, but its registration has dropped
+       * out of ZK for a reason best left to the cluster controller to
+       * determine. Perhaps the controller has decided to kill the Drillbit.
+       * Perhaps the Drillbit became unresponsive (in which case the controller
+       * will kill it and retry) or has died (in which case YARN will alert the
+       * AM that the process exited.)
+       */
+
+      DEREGISTERED
+    }
+
+    /**
+     * The common key used between tasks and ZK registrations. The key is of the
+     * form:<br>
+     *
+     * <pre>
+     * host:port:port:port
+     * </pre>
+     */
+
+    protected final String key;
+
+    /**
+     * ZK tracking state.
+     *
+     * @see {@link State}
+     */
+
+    protected State state;
+
+    /**
+     * For Drillbits started by the AM, the task object for this Drillbit.
+     */
+
+    protected Task task;
+
+    /**
+     * For Drillbits discovered through ZK, the Drill endpoint for the Drillbit.
+     */
+
+    protected DrillbitEndpoint endpoint;
+
+    /**
+     * Creates a tracker for a Drillbit discovered through ZK with no matching
+     * task. The tracker starts in the {@link State#UNMANAGED} state.
+     *
+     * @param key
+     *          the host:port:port:port key
+     * @param endpoint
+     *          the endpoint reported by ZK
+     */
+    public DrillbitTracker(String key, DrillbitEndpoint endpoint) {
+      this.key = key;
+      this.endpoint = endpoint;
+      this.state = State.UNMANAGED;
+    }
+
+    /**
+     * Creates a tracker for a Drillbit task that has not yet been seen in ZK.
+     * The tracker starts in the {@link State#NEW} state.
+     *
+     * @param key
+     *          the host:port:port:port key
+     * @param task
+     *          the task for the Drillbit
+     */
+    public DrillbitTracker(String key, Task task) {
+      this.key = key;
+      this.state = State.NEW;
+      this.task = task;
+    }
+
+    /**
+     * Records that this Drillbit is both known as a task and registered in
+     * ZK, which is the normal, healthy condition. Only the tracker state is
+     * changed here; notifying the task is left to the caller.
+     */
+
+    private void becomeRegistered() {
+      this.state = State.REGISTERED;
+    }
+
+    /**
+     * Records that a registered Drillbit has dropped out of ZK: the state
+     * becomes {@link State#DEREGISTERED} and the stale endpoint is cleared.
+     * Expects the tracker to be in the {@link State#REGISTERED} state
+     * (checked only when assertions are enabled).
+     */
+
+    public void becomeUnregistered() {
+      assert state == DrillbitTracker.State.REGISTERED;
+      this.endpoint = null;
+      this.state = State.DEREGISTERED;
+    }
+  }
+
+  public static final String CONTROLLER_PROPERTY = "zk";
+
+  public static final int UPDATE_PERIOD_MS = 20_000;
+
+  public static final String ENDPOINT_PROPERTY = "endpoint";
+
+  private static final Log LOG = LogFactory.getLog(ZKRegistry.class);
+
+  /**
+   * Map of host:port:port:port keys to tracking objects. Identifies the
+   * Drillbits discovered from ZK, started by the controller, or (ideally) both.
+   */
+
+  private Map<String, DrillbitTracker> registry = new HashMap<>();
+
+  /**
+   * Interface to Drill's cluster coordinator.
+   */
+
+  private ZKClusterCoordinatorDriver zkDriver;
+
+  /**
+   * Drill's cluster coordinator (or, at least, Drill-on-YARN's version of it.)
+   */
+
+  private RegistryHandler registryHandler;
+
+  /**
+   * Last check of ZK status.
+   */
+
+  private long lastUpdateTime;
+
+  /**
+   * Creates a registry that talks to ZK through the given driver. Nothing is
+   * started until {@link #start(RegistryHandler)} is called.
+   *
+   * @param zkDriver
+   *          the (not yet built) ZK driver
+   */
+  public ZKRegistry(ZKClusterCoordinatorDriver zkDriver) {
+    super();
+    this.zkDriver = zkDriver;
+  }
+
+  /**
+   * Called during AM startup to bring up ZK monitoring. Builds the ZK driver,
+   * records every Drillbit that is already registered as "unmanaged" (the AM
+   * could not have started it, since it predates the AM), reserves its host,
+   * and finally subscribes to Drillbit status events.
+   *
+   * @param controller
+   *          the handler to notify of registry events
+   * @throws AMWrapperException
+   *           if the ZK driver fails to start
+   */
+
+  public void start(RegistryHandler controller) {
+    registryHandler = controller;
+    try {
+      zkDriver.build();
+    } catch (ZKRuntimeException e) {
+      LOG.error("Failed to start ZK monitoring", e);
+      throw new AMWrapperException("Failed to start ZK monitoring", e);
+    }
+
+    List<DrillbitEndpoint> preexisting = zkDriver.getInitialEndpoints();
+    for (DrillbitEndpoint endpoint : preexisting) {
+      String host = endpoint.getAddress();
+      String key = toKey(endpoint);
+      registry.put(key, new DrillbitTracker(key, endpoint));
+
+      // Blacklist the host for each unmanaged drillbit.
+
+      controller.reserveHost(host);
+
+      LOG.warn("Host " + host
+          + " already running a Drillbit outside of YARN.");
+    }
+    zkDriver.addDrillbitListener(this);
+  }
+
+  /**
+   * Produces the registry key for a Drillbit endpoint reported by ZK. The
+   * format must agree with the key produced by {@link #toKey(Task)} so that
+   * endpoints and tasks can be matched.
+   *
+   * @param dbe
+   *          the Drillbit endpoint from ZK
+   * @return the string key for the endpoint
+   */
+
+  private String toKey(DrillbitEndpoint dbe) {
+    String key = ZKClusterCoordinatorDriver.asString(dbe);
+    return key;
+  }
+
+  /**
+   * Produces the registry key for a task from the task's host name. The
+   * format must agree with the key produced by
+   * {@link #toKey(DrillbitEndpoint)} so that tasks and endpoints can be
+   * matched.
+   *
+   * @param task
+   *          the task tracked by the cluster controller
+   * @return the string key for the task
+   */
+
+  private String toKey(Task task) {
+    String host = task.getHostName();
+    return zkDriver.toKey(host);
+  }
+
+  // private String toKey(Container container) {
+  // return zkDriver.toKey(container.getNodeId().getHost());
+  // }
+
+  /**
+   * Pairs a task with a Drillbit endpoint so that a change made inside the
+   * registry's critical section can be reported to the registry handler
+   * afterwards. The task is null for a Drillbit that has no matching task.
+   */
+  public static class AckEvent {
+    DrillbitEndpoint endpoint;
+    Task task;
+
+    /**
+     * @param task
+     *          the affected task, or null if there is none
+     * @param endpoint
+     *          the affected Drillbit endpoint
+     */
+    public AckEvent(Task task, DrillbitEndpoint endpoint) {
+      this.endpoint = endpoint;
+      this.task = task;
+    }
+  }
+
+  /**
+   * Callback from ZK: one or more Drillbits have registered. The registry is
+   * updated inside a critical section; the registry handler is then notified
+   * outside of it. A Drillbit with a task gets a start acknowledgement; one
+   * without a task has its host reserved.
+   *
+   * @param registeredDrillbits
+   *          the newly registered endpoints
+   */
+
+  @Override
+  public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) {
+    for (AckEvent ack : registerDrillbits(registeredDrillbits)) {
+      if (ack.task != null) {
+        registryHandler.startAck(ack.task, ENDPOINT_PROPERTY, ack.endpoint);
+      } else {
+        registryHandler.reserveHost(ack.endpoint.getAddress());
+      }
+    }
+  }
+
+  /**
+   * Critical section of registration: applies each registration to the
+   * registry and collects the events that must be reported afterwards.
+   *
+   * @param registeredDrillbits
+   *          the newly registered endpoints
+   * @return the events to report, possibly empty
+   */
+  private synchronized List<AckEvent> registerDrillbits(
+      Set<DrillbitEndpoint> registeredDrillbits) {
+    List<AckEvent> pending = new ArrayList<>();
+    for (DrillbitEndpoint endpoint : registeredDrillbits) {
+      AckEvent ack = drillbitRegistered(endpoint);
+      if (ack == null) {
+        continue;
+      }
+      pending.add(ack);
+    }
+    return pending;
+  }
+
+  /**
+   * Called when a drillbit has become registered. There are three cases:
+   * <ol>
+   * <li>No tracker exists: this is an unmanaged drillbit for which we have no
+   * matching task. A tracker is created and an event without a task is
+   * returned.</li>
+   * <li>A tracker exists but there is nothing new to report: either the
+   * drillbit is already registered, or it is an already-known unmanaged
+   * drillbit announcing itself again. Returns null.</li>
+   * <li>A tracker with a task exists: normal registration of a
+   * previously-started task (or a re-registration after the ZK connection was
+   * lost and regained.)</li>
+   * </ol>
+   *
+   * @param dbe
+   *          the endpoint that registered
+   * @return the event to report, or null if there is nothing to report
+   */
+
+  private AckEvent drillbitRegistered(DrillbitEndpoint dbe) {
+    String key = toKey(dbe);
+    DrillbitTracker tracker = registry.get(key);
+    if (tracker == null) {
+      // Unmanaged drillbit case
+
+      LOG.info("Registration of unmanaged drillbit: " + key);
+      tracker = new DrillbitTracker(key, dbe);
+      registry.put(key, tracker);
+      return new AckEvent(null, dbe);
+    }
+
+    // Managed drillbit case. Might be we lost, then regained
+    // ZK connection.
+
+    if (tracker.state == DrillbitTracker.State.REGISTERED) {
+      LOG.info("Re-registration of known drillbit: " + key);
+      return null;
+    }
+
+    // A tracker without a task is an unmanaged drillbit we already know
+    // about. Its host was reserved when it was first seen, so just refresh
+    // the endpoint. Without this guard the log statement below would throw
+    // a NullPointerException on tracker.task.
+
+    if (tracker.task == null) {
+      LOG.info("Re-registration of unmanaged drillbit: " + key);
+      tracker.endpoint = dbe;
+      return null;
+    }
+
+    // Otherwise, the Drillbit has just registered with ZK.
+    // Or, if the ZK connection was lost and regained, the
+    // state changes from DEREGISTERED --> REGISTERED
+
+    LOG.info("Drillbit registered: " + key + ", task: " + tracker.task.toString() );
+    tracker.endpoint = dbe;
+    tracker.becomeRegistered();
+    return new AckEvent(tracker.task, dbe);
+  }
+
+  /**
+   * Callback from ZK: one or more Drillbits have dropped out of ZK. The
+   * registry is updated inside a critical section; the registry handler then
+   * receives a completion acknowledgement for each affected task outside of
+   * it.
+   *
+   * @param unregisteredDrillbits
+   *          the endpoints that deregistered
+   */
+
+  @Override
+  public void drillbitUnregistered(
+      Set<DrillbitEndpoint> unregisteredDrillbits) {
+    for (AckEvent ack : unregisterDrillbits(unregisteredDrillbits)) {
+      registryHandler.completionAck(ack.task, ENDPOINT_PROPERTY);
+    }
+  }
+
+  /**
+   * Critical section of deregistration: applies each deregistration to the
+   * registry and collects the events that must be reported afterwards.
+   *
+   * @param unregisteredDrillbits
+   *          the endpoints that deregistered
+   * @return the events to report, possibly empty
+   */
+  private synchronized List<AckEvent> unregisterDrillbits(
+      Set<DrillbitEndpoint> unregisteredDrillbits) {
+    List<AckEvent> pending = new ArrayList<>();
+    for (DrillbitEndpoint endpoint : unregisteredDrillbits) {
+      AckEvent ack = drillbitUnregistered(endpoint);
+      if (ack == null) {
+        continue;
+      }
+      pending.add(ack);
+    }
+    return pending;
+  }
+
+  /**
+   * Handle the case that a drillbit becomes unregistered. There are three
+   * cases.
+   * <ol>
+   * <li>The deregistration is for a drillbit that is not in the registry table.
+   * This is unexpected, but has been observed in practice, so it is logged
+   * and otherwise ignored.</li>
+   * <li>The drillbit is unmanaged. This occurs for drillbits started and
+   * stopped outside of YARN. The tracker is dropped and the host is
+   * released.</li>
+   * <li>Normal case of deregistration of a YARN-managed drillbit. Inform the
+   * controller of this event.</li>
+   * </ol>
+   *
+   * @param dbe
+   *          the endpoint that deregistered
+   * @return the event to report to the controller, or null if there is
+   *         nothing to report
+   */
+
+  private AckEvent drillbitUnregistered(DrillbitEndpoint dbe) {
+    String key = toKey(dbe);
+    DrillbitTracker tracker = registry.get(key);
+
+    // No assert here: a missing tracker is a condition that really happens
+    // (see below), so it must be logged, not turned into an AssertionError
+    // when the JVM runs with -ea.
+
+    if (tracker == null) {
+      // Something is terribly wrong.
+      // Have seen this when a user kills the Drillbit just after it starts. Evidently, the
+      // Drillbit registers with ZK just before it is killed, but before DoY hears about
+      // the registration.
+
+      LOG.error("Internal error - Unexpected drillbit unregistration: " + key);
+      return null;
+    }
+    if (tracker.state == DrillbitTracker.State.UNMANAGED) {
+      // Unmanaged drillbit
+
+      assert tracker.task == null;
+      LOG.info("Unmanaged drillbit unregistered: " + key);
+      registry.remove(key);
+
+      // NOTE(review): this calls the registry handler while the caller
+      // (unregisterDrillbits) holds this object's lock; confirm that
+      // releaseHost cannot block on the controller.
+
+      registryHandler.releaseHost(dbe.getAddress());
+      return null;
+    }
+    LOG.info("Drillbit unregistered: " + key + ", task: " + tracker.task.toString() );
+    tracker.becomeUnregistered();
+    return new AckEvent(tracker.task, dbe);
+  }
+
+  /**
+   * Reacts to the YARN task state changes the registry cares about: a task
+   * being allocated and a task ending. All other events are ignored. Called
+   * from within the cluster controller's critical section.
+   *
+   * @param event
+   *          the task lifecycle event
+   * @param context
+   *          the event context, which supplies the affected task
+   */
+
+  @Override
+  public synchronized void stateChange(Event event, EventContext context) {
+    switch (event) {
+    case ENDED:
+      taskEnded(context.task);
+      break;
+    case ALLOCATED:
+      taskCreated(context.task);
+      break;
+    default:
+      // Not of interest to the registry.
+      break;
+    }
+  }
+
+  /**
+   * Records a task that the cluster controller has created and that we expect
+   * to see in ZK. Normally the ZK notification arrives later, so a new
+   * tracker is created. If ZK already reported the Drillbit (latency can
+   * deliver the ZK notification before we learn that the task is alive), the
+   * existing unmanaged tracker is matched with the task and the start is
+   * acknowledged right away. Any other existing state is logged as an error.
+   *
+   * @param task
+   *          the newly created task
+   */
+
+  private void taskCreated(Task task) {
+    String key = toKey(task);
+    DrillbitTracker tracker = registry.get(key);
+
+    // Normal case: no ZK registration yet.
+
+    if (tracker == null) {
+      registry.put(key, new DrillbitTracker(key, task));
+      return;
+    }
+
+    if (tracker.state != DrillbitTracker.State.UNMANAGED) {
+      LOG.error(task.getLabel() + " - Drillbit registry in wrong state "
+          + tracker.state + " for new task: " + key);
+      return;
+    }
+
+    // Unusual case: ZK registration came first.
+
+    LOG.info("Unmanaged drillbit became managed: " + key);
+    tracker.task = task;
+    tracker.becomeRegistered();
+
+    // Note: safe to call this here as we are already in the controller
+    // critical section.
+
+    registryHandler.startAck(task, ENDPOINT_PROPERTY, tracker.endpoint);
+  }
+
+  /**
+   * Tells whether the given task is currently registered in ZK. Used while
+   * waiting for a deregistration event, as a work-around for cases where the
+   * event appears to be lost and the task would otherwise hang in that state.
+   *
+   * @param task
+   *          the task to look up
+   * @return true if a tracker exists for the task and is in the registered
+   *         state
+   */
+
+  public synchronized boolean isRegistered(Task task) {
+    DrillbitTracker tracker = registry.get(toKey(task));
+    return tracker != null
+        && tracker.state == DrillbitTracker.State.REGISTERED;
+  }
+
+  /**
+   * Removes an ended task (YARN container) from the (zk --> task) registry.
+   * The cluster controller does not end a task until its ZK registration has
+   * dropped, so the tracker is expected to exist and to be in the
+   * deregistered state (checked only when assertions are enabled).
+   *
+   * @param task
+   *          the task that ended
+   */
+
+  private void taskEnded(Task task) {
+
+    // A task without a host name was cancelled before a YARN container was
+    // allocated, so it never entered the registry.
+
+    if (task.getHostName() != null) {
+      String key = toKey(task);
+      DrillbitTracker tracker = registry.get(key);
+      assert tracker != null;
+      assert tracker.state == DrillbitTracker.State.DEREGISTERED;
+      registry.remove(key);
+    }
+  }
+
+  /**
+   * Periodically check ZK status. If the ZK connection has timed out, something
+   * is very seriously wrong. Shut the whole Drill cluster down since Drill
+   * cannot operate without ZooKeeper.
+   * <p>
+   * The check runs at most once per {@link #UPDATE_PERIOD_MS}; calls that
+   * arrive sooner return immediately.
+   * <p>
+   * This method should not be synchronized. It checks only the ZK state, not
+   * internal state. Further, if we do reconnect to ZK, then a ZK thread may
+   * attempt to update this registry, which will acquire a synchronization lock.
+   *
+   * @param curTime
+   *          the current time in milliseconds
+   */
+
+  @Override
+  public void tick(long curTime) {
+    // Throttle: skip until a full update period has elapsed since the last
+    // check. (The comparison used to be inverted, which made every call
+    // return here, so the ZK failure check never ran.)
+    if (curTime < lastUpdateTime + UPDATE_PERIOD_MS) {
+      return;
+    }
+
+    lastUpdateTime = curTime;
+    if (zkDriver.hasFailed()) {
+      // Round the outage to the nearest second for the log message.
+      int secs = (int) ((zkDriver.getLostConnectionDurationMs() + 500) / 1000);
+      LOG.error(
+          "ZooKeeper connection lost, failing after " + secs + " seconds.");
+      registryHandler.registryDown();
+    }
+  }
+
+  /**
+   * Stops ZK monitoring: unsubscribes this registry from Drillbit status
+   * events, then closes the ZK driver.
+   *
+   * @param handler
+   *          not used by this implementation
+   */
+  public void finish(RegistryHandler handler) {
+    this.zkDriver.removeDrillbitListener(this);
+    this.zkDriver.close();
+  }
+
+  /**
+   * Lists the keys of all Drillbits currently tracked as unmanaged.
+   *
+   * @return a new list of host:port:port:port keys, possibly empty
+   */
+  public synchronized List<String> listUnmanagedDrillits() {
+    List<String> unmanagedKeys = new ArrayList<>();
+    for (DrillbitTracker tracker : registry.values()) {
+      if (tracker.state != DrillbitTracker.State.UNMANAGED) {
+        continue;
+      }
+      unmanagedKeys.add(tracker.key);
+    }
+    return unmanagedKeys;
+  }
+
+  /**
+   * Exposes the live registry map to tests. The map is returned without
+   * synchronization and without copying, so production code must not use
+   * this: the map may change out from under the caller.
+   *
+   * @return the internal registry map itself
+   */
+
+  protected Map<String, DrillbitTracker> getRegistryForTesting() {
+    return this.registry;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRuntimeException.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRuntimeException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRuntimeException.java
new file mode 100644
index 0000000..4e1b115
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRuntimeException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.zk;
+
+/**
+ * Exception type of the {@code org.apache.drill.yarn.zk} package.
+ * <p>
+ * Note: despite the "Runtime" in its name, this class extends
+ * {@link Exception}, not {@link RuntimeException}, so it is a checked
+ * exception that callers must catch or declare.
+ */
+public class ZKRuntimeException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates an exception that carries only a message.
+   *
+   * @param msg the detail message
+   */
+  public ZKRuntimeException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * Creates an exception that carries a message and the underlying cause.
+   *
+   * @param msg the detail message
+   * @param e the exception that triggered this one; available later via
+   *          {@link #getCause()}
+   */
+  public ZKRuntimeException(String msg, Exception e) {
+    super(msg, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/package-info.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/package-info.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/package-info.java
new file mode 100644
index 0000000..14bb427
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/package-info.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Interface between the Application Master and ZooKeeper. Classes here manage two
+ * registrations: Drillbits and the AM itself.
+ * <p>
+ * Drillbit registration is used to confirm that Drillbits have indeed come online.
+ * If Drillbits fail to come online, then the AM concludes that something went wrong.
+ * If Drillbits drop offline unexpectedly, the AM concludes that the Drillbit is sick
+ * and restarts it.
+ * <p>
+ * The AM registry prevents two AMs from attempting to manage the same cluster.
+ */
+
+package org.apache.drill.yarn.zk;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/config.ftl
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/resources/drill-am/config.ftl b/drill-yarn/src/main/resources/drill-am/config.ftl
new file mode 100644
index 0000000..8405c1f
--- /dev/null
+++ b/drill-yarn/src/main/resources/drill-am/config.ftl
@@ -0,0 +1,41 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+
+<#include "*/generic.ftl">
+<#macro page_head>
+</#macro>
+
+<#macro page_body>
+  <h4>Fully Resolved Configuration Settings</h4>
+  <p>&nbsp;
+
+  <table class="table table-hover" style="width: auto;">
+    <tr>
+      <th>Configuration Key</th>
+      <th>Value</th>
+    </tr>
+    <#list model as pair>
+      <tr>
+        <td>${pair.getName()}</td>
+        <td>${pair.getQuotedValue()}</td>
+      </tr>
+    </#list>
+  </table>
+  <p>
+  To modify these values:
+  <ol>
+  <li>Edit <code>$DRILL_SITE/drill-on-yarn.conf</code> (for the drill.yarn settings),</li>
+  <li>Edit <code>$DRILL_SITE/drill-override.conf</code> (for the drill.exec settings).</li>
+  <li>Restart your Drill cluster using the Drill-on-YARN client.</li>
+  </ol>
+</#macro>
+
+<@page_html/>

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/confirm.ftl
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/resources/drill-am/confirm.ftl b/drill-yarn/src/main/resources/drill-am/confirm.ftl
new file mode 100644
index 0000000..515293d
--- /dev/null
+++ b/drill-yarn/src/main/resources/drill-am/confirm.ftl
@@ -0,0 +1,70 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+
+<#include "*/generic.ftl">
+<#macro page_head>
+</#macro>
+
+<#macro page_body>
+  <h4><#if model.getType( ) == "STOPPED">
+  Stop Drill Cluster
+  <#else>
+  Resize Drill Cluster
+  </#if></h4>
+  <p>&nbsp;
+
+  <#if model.getType( ) == "RESIZED">
+    <div class="alert alert-success">
+      <strong>Success!</strong> Cluster resizing to ${model.getValue( )} nodes.
+    </div>
+  <#elseif model.getType( ) == "CANCELLED">
+    <div class="alert alert-info">
+       <strong>Success!</strong> Drillbit ${model.getValue( )} was cancelled.
+    </div>
+  <#elseif model.getType( ) == "NULL_RESIZE">
+    <div class="alert alert-info">
+      <strong>Note:</strong> The new size of ${model.getValue( )} is the
+      same as the current cluster size.
+    </div>
+  <#elseif model.getType( ) == "INVALID_RESIZE">
+    <div class="alert alert-danger">
+      <strong>Error!</strong> Invalid cluster resize level: ${model.getValue( )}.
+      Please <a href="/manage">try again</a>.
+    </div>
+  <#elseif model.getType( ) == "INVALID_GROW">
+    <div class="alert alert-danger">
+      <strong>Error!</strong> Invalid cluster grow amount: ${model.getValue( )}.
+      Please <a href="/manage">try again</a>.
+    </div>
+  <#elseif model.getType( ) == "INVALID_SHRINK">
+    <div class="alert alert-danger">
+      <strong>Error!</strong> Invalid cluster shrink amount: ${model.getValue( )}.
+      Please <a href="/manage">try again</a>.
+    </div>
+  <#elseif model.getType( ) == "INVALID_TASK">
+    <div class="alert alert-danger">
+      <strong>Error!</strong> Invalid Drillbit ID: ${model.getValue( )}.
+      Perhaps the Drillbit has already stopped.
+    </div>
+  <#elseif model.getType( ) == "STOPPED">
+    <div class="alert alert-success">
+      <strong>Success!</strong> Cluster is shutting down.
+    </div>
+    Pages on this site will be unavailable until the cluster restarts.
+  </#if>
+  <#if model.getType( ) == "CANCELLED">
+    Return to the <a href="/drillbits">Drillbits page</a>.
+  <#elseif model.getType( ) != "STOPPED">
+    Return to the <a href="/manage">Management page</a>.
+  </#if>
+</#macro>
+
+<@page_html/>

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/generic.ftl
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/resources/drill-am/generic.ftl b/drill-yarn/src/main/resources/drill-am/generic.ftl
new file mode 100644
index 0000000..b76a917
--- /dev/null
+++ b/drill-yarn/src/main/resources/drill-am/generic.ftl
@@ -0,0 +1,78 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+
+<#-- Adapted from the Drill generic.ftl, adjusted for use in the AM. -->
+
+<#macro page_head>
+</#macro>
+
+<#macro page_body>
+</#macro>
+
+<#macro page_html>
+  <!DOCTYPE html>
+  <html lang="en">
+    <head>
+      <meta http-equiv="X-UA-Compatible" content="IE=edge">
+
+      <title>Apache Drill - Application Master</title>
+      <link rel="shortcut icon" href="/static/img/drill.ico">
+
+      <link href="/static/css/bootstrap.min.css" rel="stylesheet">
+      <link href="/drill-am/static/css/drill-am.css" rel="stylesheet">
+
+      <script src="/static/js/jquery.min.js"></script>
+      <script src="/static/js/bootstrap.min.js"></script>
+
+      <!-- HTML5 shim and Respond.js IE8 support of HTML5 elements and media queries -->
+      <!--[if lt IE 9]>
+        <script src="/static/js/html5shiv.js"></script>
+        <script src="/static/js/1.4.2/respond.min.js"></script>
+      <![endif]-->
+
+      <@page_head/>
+    </head>
+    <body role="document">
+      <div class="navbar navbar-inverse navbar-fixed-top" role="navigation">
+        <div class="container-fluid">
+          <div class="navbar-header">
+            <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
+              <span class="sr-only">Toggle navigation</span>
+              <span class="icon-bar"></span>
+              <span class="icon-bar"></span>
+              <span class="icon-bar"></span>
+            </button>
+            <a class="navbar-brand" href="/">Apache Drill</a>
+          </div>
+          <div class="navbar-collapse collapse">
+            <ul class="nav navbar-nav">
+              <li><a href="/config">Configuration</a></li>
+              <li><a href="/drillbits">Drillbits</a></li>
+              <li><a href="/manage">Manage</a></li>
+              <li><a href="/history">History</a></li>
+            </ul>
+            <ul class="nav navbar-nav navbar-right">
+              <li><a href="${docsLink}">Documentation</a>
+              <#if showLogin == true >
+              <li><a href="/login">Log In</a>
+              </#if>
+              <#if showLogout == true >
+              <li><a href="/logout">Log Out (${loggedInUserName})</a>
+             </#if>
+            </ul>
+          </div>
+        </div>
+      </div>
+
+      <div class="container-fluid drill-am" role="main">
+        <h3>YARN Application Master &ndash; ${clusterName}</h3>
+        <@page_body/>
+      </div>
+    </body>
+  </html>
+</#macro>

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/history.ftl
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/resources/drill-am/history.ftl b/drill-yarn/src/main/resources/drill-am/history.ftl
new file mode 100644
index 0000000..c588d06
--- /dev/null
+++ b/drill-yarn/src/main/resources/drill-am/history.ftl
@@ -0,0 +1,59 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+
+<#include "*/generic.ftl">
+<#macro page_head>
+  <meta http-equiv="refresh" content="${refreshSecs}" >
+</#macro>
+
+<#macro page_body>
+  <h4>Drillbit History</h4>
+  <p>&nbsp;
+
+  <div class="table-responsive">
+    <table class="table table-hover">
+      <tr>
+        <th>ID</th>
+        <th>Try</th>
+        <th>Pool</th>
+        <th>Host</th>
+        <th>Container</th>
+        <th>Memory (MB)</th>
+        <th>Virtual Cores</th>
+        <th>Start Time</th>
+        <th>End Time</th>
+        <th>Disposition</th>
+      </tr>
+       <#assign count=0>
+       <#list model as task>
+        <#assign count=count+1>
+        <tr>
+          <td><b>${task.getTaskId( )}</b></td>
+          <td>${task.getTryCount( )}</td>
+          <td>${task.getGroupName( )}</td>
+          <td><#if task.hasContainer( )><a href="${task.getNmLink( )}">${task.getHost( )}</a>
+          <#else>&nbsp;</#if></td>
+          <td>${task.getContainerId()}</td>
+          <td>${task.getMemory( )}</td>
+          <td>${task.getVcores( )}</td>
+          <td>${task.getStartTime( )}</td>
+          <td>${task.getEndTime( )}</td>
+          <td>${task.getDisposition( )}</td>
+        </tr>
+      </#list>
+    </table>
+    <#if count == 0>
+    No drillbits have completed.
+    </#if>
+  </div>
+</#macro>
+
+<@page_html/>

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/index.ftl
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/resources/drill-am/index.ftl b/drill-yarn/src/main/resources/drill-am/index.ftl
new file mode 100644
index 0000000..18d6ab5
--- /dev/null
+++ b/drill-yarn/src/main/resources/drill-am/index.ftl
@@ -0,0 +1,128 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+
+<#include "*/generic.ftl">
+<#macro page_head>
+  <meta http-equiv="refresh" content="${model.getRefreshSecs( )}" >
+</#macro>
+
+<#macro page_body>
+  <h4>Drill Cluster Status</h4>
+
+  <table class="table table-hover" style="width: auto;">
+    <tr>
+      <td>YARN Application ID:</td>
+      <td><a href="${model.getRmAppLink( )}" data-toggle="tooltip" title="YARN Resource Manager page for this application">
+      ${model.getAppId( )}</a></td>
+    </tr>
+    <tr>
+      <td>YARN Resource Manager:</td>
+      <td><#if model.getRmLink()?? > <#-- Occurs early in startup before app is fully registered. -->
+      <a href="${model.getRmLink( )}" data-toggle="tooltip" title="YARN Resource Manager page for this container">
+      ${model.getRmHost( )}</a>
+      <#else>Unavailable
+      </#if></td>
+    </tr>
+    <tr>
+      <td>YARN Node Manager for AM:</td>
+      <td><#if model.getNmLink()?? > <#-- Occurs early in startup before app is fully registered. -->
+      <a href="${model.getNmLink( )}" data-toggle="tooltip" title="YARN Node Manager">
+      ${model.getNmHost( )}</a> |
+          <a href="${model.getNmAppLink( )}" data-toggle="tooltip" title="YARN Node Manager page for this application">App info</a>
+      <#else>Unavailable
+      </#if></td>
+    </tr>
+    <tr>
+      <td>ZooKeeper Hosts:</td>
+      <td><span data-toggle="tooltip" title="ZooKeeper connection string.">
+      ${model.getZkConnectionStr( )}</span></td>
+    </tr>
+    <tr>
+      <td>ZooKeeper Root:</td>
+      <td><span data-toggle="tooltip" title="ZooKeeper Drill root.">
+      ${model.getZkRoot( )}</span></td>
+    </tr>
+    <tr>
+      <td>ZooKeeper Cluster ID:</td>
+      <td><span data-toggle="tooltip" title="ZooKeeper Drill cluster-id.">
+      ${model.getZkClusterId( )}</span></td>
+    </tr>
+    <tr>
+      <td>State:</td>
+      <td><span data-toggle="tooltip" title="${model.getStateHint( )}">
+      ${model.getState( )}</span></td>
+    </tr>
+    <tr>
+      <td>Target Drillbit Count:</td>
+      <td>${model.getTargetCount( )}</td>
+    </tr>
+    <tr>
+      <td>Live Drillbit Count:</td>
+      <td>${model.getLiveCount( )}</td>
+    </tr>
+    <#if model.getUnmanagedCount( ) gt 0 >
+      <tr>
+        <td style="color: red">Unmanaged Drillbit Count:</td>
+        <td>${model.getUnmanagedCount( )}</td>
+      </tr>
+    </#if>
+    <#if model.getBlacklistCount( ) gt 0 >
+      <tr>
+        <td style="color: red">Blacklisted Node Count:</td>
+        <td>${model.getBlacklistCount( )}</td>
+      </tr>
+    </#if>
+    <tr>
+      <td>Total Drill Virtual Cores:</td>
+      <td>${model.getDrillTotalVcores( )}</td>
+    </tr>
+    <tr>
+      <td>Total Drill Memory (MB):</td>
+      <td>${model.getDrillTotalMemory( )}</td>
+    </tr>
+    <#if model.supportsDiskResource( ) >
+      <tr>
+        <td>Total Drill Disks:</td>
+        <td>${model.getDrillTotalDisks( )}</td>
+      </tr>
+    </#if>
+  </table>
+  <table class="table table-hover" style="width: auto;">
+    <tr>
+      <th>Group</th>
+      <th>Type</th>
+      <th>Target Drillbit Count</th>
+      <th>Total Drillbits</th>
+      <th>Live Drillbits</th>
+      <th>Memory per Drillbit (MB)</th>
+      <th>VCores per Drillbit</th>
+      <#if model.supportsDiskResource( ) >
+        <th>Disks per Drillbit</th>
+      </#if>
+    </tr>
+    <#list model.getGroups( ) as group>
+      <tr>
+        <td>${group.getName( )}</td>
+        <td>${group.getType( )}</td>
+        <td>${group.getTargetCount( )}</td>
+        <td>${group.getTaskCount( )}</td>
+        <td>${group.getLiveCount( )}</td>
+        <td>${group.getMemory( )}</td>
+        <td>${group.getVcores( )}</td>
+        <#if model.supportsDiskResource( ) >
+          <td>${group.getDisks( )}</td>
+        </#if>
+      </tr>
+    </#list>
+  </table>
+</#macro>
+
+<@page_html/>

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/login.ftl
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/resources/drill-am/login.ftl b/drill-yarn/src/main/resources/drill-am/login.ftl
new file mode 100644
index 0000000..036229e
--- /dev/null
+++ b/drill-yarn/src/main/resources/drill-am/login.ftl
@@ -0,0 +1,35 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+
+<#include "*/generic.ftl">
+<#macro page_head>
+</#macro>
+
+<#macro page_body>
+  <div align="center" class="table-responsive">
+    <form role="form" name="input" action="/j_security_check" method="POST">
+      <fieldset>
+        <div class="form-group">
+          <img src="/drill-am/static/img/apache-drill-logo.png" alt="Apache Drill Logo">
+          <#if model??>
+          <div class="alert alert-danger">
+            <strong>Error</strong> ${model}
+          </div>
+          </#if>
+          <p><input type="text" size="30" name="j_username" placeholder="Username"></p>
+          <p><input type="password" size="30" name="j_password" placeholder="Password"></p>
+          <p><button type="submit" class="btn btn-primary">Log In</button></p>
+        </div>
+      </fieldset>
+    </form>
+  </div>
+</#macro>
+<@page_html/>
\ No newline at end of file


Mime
View raw message