http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/NameValuePair.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/NameValuePair.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/NameValuePair.java
new file mode 100644
index 0000000..5872ab9
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/NameValuePair.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+/**
+ * Pairs a name with an arbitrary (possibly {@code null}) value. Both parts
+ * are fixed at construction time.
+ */
+public class NameValuePair {
+  private final String name;
+  private final Object value;
+
+  /**
+   * @param name the name (key) of the pair
+   * @param value the associated value; {@code null} means "unset"
+   */
+  public NameValuePair(String name, Object value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  /** @return the name given to the constructor */
+  public String getName() {
+    return name;
+  }
+
+  /** @return the value given to the constructor; may be {@code null} */
+  public Object getValue() {
+    return value;
+  }
+
+  /**
+   * Renders the value for display: {@code <unset>} for a {@code null} value,
+   * the value wrapped in double quotes for a {@link String}, and
+   * {@link Object#toString()} for anything else. Quotes embedded in a string
+   * value are not escaped.
+   *
+   * @return the display form of the value
+   */
+  public String getQuotedValue() {
+    if (value == null) {
+      return "<unset>";
+    }
+    if (value instanceof String) {
+      return "\"" + value + "\"";
+    }
+    return value.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnClientException.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnClientException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnClientException.java
new file mode 100644
index 0000000..62dd468
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnClientException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+/**
+ * Checked exception used by the Drill-on-YARN client wrappers (for example
+ * {@code YarnRMClient}) to report a failed YARN operation.
+ */
+public class YarnClientException extends Exception {
+  private static final long serialVersionUID = -1411110715738266578L;
+
+  /**
+   * Creates an exception that wraps the failure that caused it.
+   *
+   * @param msg description of the operation that failed
+   * @param e the underlying cause, kept for stack traces
+   */
+  public YarnClientException(String msg, Exception e) {
+    super(msg, e);
+  }
+
+  /**
+   * Creates an exception that has a message but no cause.
+   *
+   * @param msg description of the failure
+   */
+  public YarnClientException(String msg) {
+    super(msg);
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java
new file mode 100644
index 0000000..8905ce3
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+import java.io.IOException;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+
+/**
+ * YARN resource manager client implementation for Drill. Provides a wrapper
+ * around the YARN client interface to the Resource Manager. Used by the client
+ * app to start the Drill application master.
+ * <p>
+ * Based on
+ * <a href="https://github.com/hortonworks/simple-yarn-app">simple-yarn-app</a>
+ */
+
+public class YarnRMClient {
+  private final YarnConfiguration conf;
+  private final YarnClient yarnClient;
+
+  /**
+   * Application ID. Semantics are such that each session of Drill-on-YARN works
+   * with no more than one application ID. Set either by the
+   * {@link #YarnRMClient(ApplicationId)} constructor or by
+   * {@link #createAppMaster()}.
+   */
+  private ApplicationId appId;
+
+  /** Handle returned by {@link #createAppMaster()}; null until it is called. */
+  private YarnClientApplication app;
+
+  /**
+   * Creates a client that uses a default {@link YarnConfiguration}.
+   */
+  public YarnRMClient() {
+    this(new YarnConfiguration());
+  }
+
+  /**
+   * Creates a client (with a default configuration) bound to an existing
+   * application ID, for use with the report, token and kill methods.
+   *
+   * @param appId ID of an existing YARN application
+   */
+  public YarnRMClient(ApplicationId appId) {
+    this();
+    this.appId = appId;
+  }
+
+  /**
+   * Creates a client with the given configuration and immediately initializes
+   * and starts the underlying {@link YarnClient}.
+   * <p>
+   * NOTE(review): nothing in this class stops the YARN client; consider adding
+   * a close method if instances are short-lived.
+   *
+   * @param conf YARN configuration used to reach the Resource Manager
+   */
+  public YarnRMClient(YarnConfiguration conf) {
+    this.conf = conf;
+    this.yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(conf);
+    yarnClient.start();
+  }
+
+  /**
+   * Asks the Resource Manager to create a new application. The response is a
+   * new application ID (which this client remembers) along with cluster
+   * capacity info.
+   *
+   * @return the Resource Manager's new-application response
+   * @throws YarnClientException if the application could not be created
+   */
+  public GetNewApplicationResponse createAppMaster()
+      throws YarnClientException {
+    try {
+      app = yarnClient.createApplication();
+    } catch (YarnException | IOException e) {
+      throw new YarnClientException("Create application failed", e);
+    }
+    GetNewApplicationResponse response = app.getNewApplicationResponse();
+    appId = response.getApplicationId();
+    return response;
+  }
+
+  /**
+   * Submits the application created by {@link #createAppMaster()}, using the
+   * launch context built by the given spec.
+   *
+   * @param spec builds the application submission context
+   * @throws IllegalStateException if {@link #createAppMaster()} was not called
+   *         first
+   * @throws YarnClientException if the launch context cannot be built or the
+   *         submission fails
+   */
+  public void submitAppMaster(AppSpec spec) throws YarnClientException {
+    if (app == null) {
+      throw new IllegalStateException("call createAppMaster( ) first");
+    }
+
+    ApplicationSubmissionContext appContext;
+    try {
+      appContext = spec.createAppLaunchContext(conf, app);
+    } catch (IOException e) {
+      throw new YarnClientException("Create app launch context failed", e);
+    }
+
+    try {
+      yarnClient.submitApplication(appContext);
+    } catch (YarnException | IOException e) {
+      throw new YarnClientException("Submit application failed", e);
+    }
+  }
+
+  /** @return the application ID this client works with; may be null */
+  public ApplicationId getAppId() {
+    return appId;
+  }
+
+  /**
+   * @return the Resource Manager's current report for the application
+   * @throws YarnClientException if the report cannot be retrieved
+   */
+  public ApplicationReport getAppReport() throws YarnClientException {
+    try {
+      return yarnClient.getApplicationReport(appId);
+    } catch (YarnException | IOException e) {
+      throw new YarnClientException("Get application report failed", e);
+    }
+  }
+
+  /**
+   * Waits for the application to start by polling the Resource Manager once
+   * per second while the application is NEW, NEW_SAVING or SUBMITTED, and
+   * printing the state on each poll. This version is somewhat informal; the
+   * intended use is when debugging unmanaged applications.
+   *
+   * @return the current application attempt ID once the application is
+   *         ACCEPTED
+   * @throws YarnClientException if the application leaves the pre-start
+   *         states in any state other than ACCEPTED (including RUNNING), if a
+   *         report cannot be obtained, or if the thread is interrupted while
+   *         waiting (the interrupt flag is restored)
+   */
+  public ApplicationAttemptId waitForStart() throws YarnClientException {
+    ApplicationReport appReport;
+    YarnApplicationState appState;
+    ApplicationAttemptId attemptId;
+    for (;;) {
+      appReport = getAppReport();
+      appState = appReport.getYarnApplicationState();
+      attemptId = appReport.getCurrentApplicationAttemptId();
+      if (appState != YarnApplicationState.NEW
+          && appState != YarnApplicationState.NEW_SAVING
+          && appState != YarnApplicationState.SUBMITTED) {
+        break;
+      }
+      System.out.println("App State: " + appState);
+      sleepOrFail(1000, "Interrupted while waiting for application start");
+    }
+    if (appState != YarnApplicationState.ACCEPTED) {
+      throw new YarnClientException(
+          "Application start failed with status " + appState);
+    }
+
+    return attemptId;
+  }
+
+  /**
+   * Waits for the application to enter one of the completion states
+   * (FINISHED, KILLED or FAILED), polling every 100 ms, then prints the final
+   * state and finish time. This is an informal implementation useful for
+   * testing.
+   *
+   * @throws YarnClientException if a report cannot be obtained, or if the
+   *         thread is interrupted while waiting (the interrupt flag is
+   *         restored)
+   */
+  public void waitForCompletion() throws YarnClientException {
+    ApplicationReport appReport;
+    YarnApplicationState appState;
+    for (;;) {
+      appReport = getAppReport();
+      appState = appReport.getYarnApplicationState();
+      if (appState == YarnApplicationState.FINISHED
+          || appState == YarnApplicationState.KILLED
+          || appState == YarnApplicationState.FAILED) {
+        break;
+      }
+      sleepOrFail(100, "Interrupted while waiting for application completion");
+    }
+
+    System.out.println("Application " + appId + " finished with" + " state "
+        + appState + " at " + appReport.getFinishTime());
+  }
+
+  /**
+   * Sleeps between polls. Swallowing an interrupt here would make the polling
+   * loops impossible to cancel, so the interrupt flag is restored and the
+   * interruption is reported to the caller instead.
+   */
+  private static void sleepOrFail(long millis, String msg)
+      throws YarnClientException {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new YarnClientException(msg, e);
+    }
+  }
+
+  /**
+   * @return the AM/RM token for the application
+   * @throws YarnClientException if the token cannot be retrieved
+   */
+  public Token<AMRMTokenIdentifier> getAMRMToken() throws YarnClientException {
+    try {
+      return yarnClient.getAMRMToken(appId);
+    } catch (YarnException | IOException e) {
+      throw new YarnClientException("Get AM/RM token failed", e);
+    }
+  }
+
+  /**
+   * Returns the entries of the YARN application class path from the
+   * configuration, falling back to YARN's default list when it is not set.
+   */
+  public String[] getYarnAppClassPath() {
+    return conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
+  }
+
+  /**
+   * Asks the Resource Manager to kill the application.
+   *
+   * @throws YarnClientException if the kill request fails; the YARN failure
+   *         is attached as the cause
+   */
+  public void killApplication() throws YarnClientException {
+    try {
+      yarnClient.killApplication(appId);
+    } catch (YarnException | IOException e) {
+      throw new YarnClientException(
+          "Kill failed for application: " + appId.toString(), e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/package-info.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/package-info.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/package-info.java
new file mode 100644
index 0000000..aaa0fff
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Holds functionality common to the Drill-on-YARN client and Application Master (AM).
+ * Includes configuration, utilities, and wrappers around various YARN data classes.
+ */
+
+package org.apache.drill.yarn.core;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/package-info.java b/drill-yarn/src/main/java/org/apache/drill/yarn/package-info.java
new file mode 100644
index 0000000..170dfa8
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/package-info.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hosts Apache Drill under Apache Hadoop YARN. Consists of two main
+ * components as required by YARN: a client application which uses YARN to
+ * start the Drill cluster, and an Application Master (AM) which manages
+ * the cluster. The AM in turn starts, manages and stops drillbits.
+ * <p>
+ * Much of the functionality is simply plumbing to get YARN to do what is
+ * needed. The core of the AM is a "cluster controller" which starts,
+ * monitors and stops Drillbits, tracking their state transitions through
+ * the several lifecycle stages that result.
+ * <p>
+ * Note about logs here: Drill-on-YARN is a YARN application and so it
+ * uses the same logging system as the YARN code, which differs from the
+ * logging system used by Drill.
+ */
+
+package org.apache.drill.yarn;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/AMRegistry.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/AMRegistry.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/AMRegistry.java
new file mode 100644
index 0000000..9cc95e5
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/AMRegistry.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.zk;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+/**
+ * Registers this Application Master in ZK to prevent duplicates.
+ * <p>
+ * Possible enhancement is to put the registry in some well-known location, such
+ * as /drill-am.
+ */
+public class AMRegistry {
+  private static final String AM_REGISTRY = "/drill-on-yarn";
+
+  private final ZKClusterCoordinator zkCoord;
+  @SuppressWarnings("unused")
+  private String amHost;
+  @SuppressWarnings("unused")
+  private int amPort;
+  @SuppressWarnings("unused")
+  private String amAppId;
+
+  private String zkRoot;
+
+  private String clusterId;
+
+  /**
+   * @param zkCoord coordinator whose Curator client is used to create and
+   *        read the registration znodes
+   */
+  public AMRegistry(ZKClusterCoordinator zkCoord) {
+    this.zkCoord = zkCoord;
+  }
+
+  /**
+   * Sets the ZK root (used only in error messages) and the cluster ID that
+   * names this AM's registration znode. Call this before
+   * {@link #register(String, int, String)}; otherwise the registration path
+   * ends in "null".
+   */
+  public void useLocalRegistry(String zkRoot, String clusterId) {
+    this.zkRoot = zkRoot;
+    this.clusterId = clusterId;
+  }
+
+  /**
+   * Register this AM as an ephemeral znode in ZK. The structure of ZK is as
+   * follows:
+   *
+   * <pre>
+   * /drill
+   * . &lt;cluster-id&gt;
+   * . . &lt;Drillbit GUID&gt; (Value is Proto-encoded drillbit info)
+   * . drill-on-yarn
+   * . . &lt;cluster-id&gt; (value: amHost:amPort:amAppId)
+   * </pre>
+   * <p>
+   * The structure acknowledges that the cluster-id znode may be renamed, and
+   * there may be multiple cluster IDs for a single drill root node. (Odd, but
+   * supported.) To address this, we put the AM registrations in their own
+   * (persistent) znode: drill-on-yarn. Each is keyed by the cluster ID (so we
+   * can find it), and holds the host name, HTTP port and Application ID of the
+   * AM.
+   * <p>
+   * When the AM starts, it atomically checks and sets the AM registration. If
+   * another AM is already running, this AM fails with an error message giving
+   * that AM's host, port and (most importantly) app ID so the user can locate
+   * the problem.
+   *
+   * @param amHost host name of this AM
+   * @param amPort port of this AM
+   * @param amAppId YARN application ID of this AM
+   * @throws ZKRuntimeException if another AM is already registered for this
+   *         cluster ID, or if any ZK operation fails
+   */
+  public void register(String amHost, int amPort, String amAppId)
+      throws ZKRuntimeException {
+    this.amHost = amHost;
+    this.amPort = amPort;
+    this.amAppId = amAppId;
+    try {
+
+      // The znode to hold AMs may or may not exist. Create it if missing.
+
+      try {
+        zkCoord.getCurator().create().withMode(CreateMode.PERSISTENT)
+            .forPath(AM_REGISTRY, new byte[0]);
+      } catch (NodeExistsException ignored) {
+        // OK: the parent znode already exists.
+      }
+
+      // Try to create the AM registration. The ephemeral create is the atomic
+      // check-and-set: it fails if any other AM holds the same path.
+
+      String amPath = AM_REGISTRY + "/" + clusterId;
+      String content = amHost + ":" + Integer.toString(amPort) + ":" + amAppId;
+      try {
+        zkCoord.getCurator().create().withMode(CreateMode.EPHEMERAL)
+            .forPath(amPath, content.getBytes(StandardCharsets.UTF_8));
+      } catch (NodeExistsException e) {
+
+        // ZK says that a node exists, which means that another AM is already
+        // running. Die with a clear (we hope!) error message.
+
+        throw new ZKRuntimeException(
+            "FAILED! An Application Master already exists for " + zkRoot + "/"
+                + clusterId + " on host: " + describeExistingAm(amPath));
+      }
+    } catch (ZKRuntimeException e) {
+
+      // Already a meaningful error; pass it through unchanged.
+
+      throw e;
+    } catch (Exception e) {
+
+      // Something bad happened with ZK.
+
+      throw new ZKRuntimeException("Failed to create AM registration node", e);
+    }
+  }
+
+  /**
+   * Formats the registration of the AM that currently holds {@code amPath}
+   * for an error message. Returns "Unknown" if the znode has no data, or if it
+   * vanished (the other AM went away) between the failed create and this
+   * read. If the value is not in host:port:appId form, it is returned as-is.
+   */
+  private String describeExistingAm(String amPath) throws Exception {
+    byte[] data;
+    try {
+      data = zkCoord.getCurator().getData().forPath(amPath);
+    } catch (NoNodeException e) {
+      data = null;
+    }
+    if (data == null) {
+      return "Unknown";
+    }
+    String packed = new String(data, StandardCharsets.UTF_8);
+    String[] unpacked = packed.split(":");
+    if (unpacked.length < 3) {
+      return packed;
+    }
+    return unpacked[0] + ", port: " + unpacked[1]
+        + ", Application ID: " + unpacked[2];
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinator.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinator.java
new file mode 100644
index 0000000..7c5f5f3
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinator.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.zk;
+
+import static com.google.common.base.Throwables.propagate;
+import static com.google.common.collect.Collections2.transform;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.x.discovery.ServiceCache;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.ServiceCacheListener;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.DistributedSemaphore;
+import org.apache.drill.exec.coord.DrillServiceInstanceHelper;
+import org.apache.drill.exec.coord.store.CachingTransientStoreFactory;
+import org.apache.drill.exec.coord.store.TransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.coord.store.TransientStoreFactory;
+import org.apache.drill.exec.coord.zk.ZKRegistrationHandle;
+import org.apache.drill.exec.coord.zk.ZkDistributedSemaphore;
+import org.apache.drill.exec.coord.zk.ZkEphemeralStore;
+import org.apache.drill.exec.coord.zk.ZkTransientStoreFactory;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
+
+import com.google.common.base.Function;
+
+/**
+ * Manages cluster coordination utilizing zookeeper.
+ * <p>
+ * This is a clone of the Drill class
+ * org.apache.drill.exec.coord.zk.ZKClusterCoordinator with a number of
+ * modifications:
+ * <ul>
+ * <li>Removed dependency on the Drill config system. That system uses Google's
+ * Guava library version 18, which conflicts with the earlier versions used by
+ * YARN and Hadoop; the conflict caused runtime "undefined method" errors.</li>
+ * <li>Instead of getting config information out of the Drill config, the
+ * parameters are instead passed directly.</li>
+ * <li>Adds support for the drillbits registered event which was neither needed
+ * nor implemented by Drill.</li>
+ * <li>Use the YARN logging system instead of Drill's.</li>
+ * </ul>
+ * <p>
+ * This class should be replaced by the Drill version if/when the Guava
+ * conflicts can be resolved (and when registered Drillbit notifications are
+ * added to the Drill version.)
+ */
+
+public class ZKClusterCoordinator extends ClusterCoordinator {
+
+ protected static final Log logger = LogFactory
+ .getLog(ZKClusterCoordinator.class);
+
+ private CuratorFramework curator;
+ private ServiceDiscovery<DrillbitEndpoint> discovery;
+ private volatile Collection<DrillbitEndpoint> endpoints = Collections
+ .emptyList();
+ private final String serviceName;
+ private final CountDownLatch initialConnection = new CountDownLatch(1);
+ private final TransientStoreFactory factory;
+ private ServiceCache<DrillbitEndpoint> serviceCache;
+
+ public ZKClusterCoordinator(String connect, String zkRoot, String clusterId,
+ int retryCount, int retryDelayMs, int connectTimeoutMs)
+ throws IOException {
+ logger.debug("ZK connect: " + connect + ", zkRoot: " + zkRoot
+ + ", clusterId: " + clusterId);
+
+ this.serviceName = clusterId;
+ RetryPolicy rp = new RetryNTimes(retryCount, retryDelayMs);
+ curator = CuratorFrameworkFactory.builder().namespace(zkRoot)
+ .connectionTimeoutMs(connectTimeoutMs).retryPolicy(rp)
+ .connectString(connect).build();
+ curator.getConnectionStateListenable()
+ .addListener(new InitialConnectionListener());
+ curator.start();
+ discovery = newDiscovery();
+ factory = CachingTransientStoreFactory
+ .of(new ZkTransientStoreFactory(curator));
+ }
+
+ public CuratorFramework getCurator() {
+ return curator;
+ }
+
+ @Override
+ public void start(long millisToWait) throws Exception {
+ logger.debug("Starting ZKClusterCoordination.");
+ discovery.start();
+
+ if (millisToWait != 0) {
+ boolean success = this.initialConnection.await(millisToWait,
+ TimeUnit.MILLISECONDS);
+ if (!success) {
+ throw new IOException(String.format(
+ "Failure to connect to the zookeeper cluster service within the allotted time of %d milliseconds.",
+ millisToWait));
+ }
+ } else {
+ this.initialConnection.await();
+ }
+
+ serviceCache = discovery.serviceCacheBuilder().name(serviceName).build();
+ serviceCache.addListener(new EndpointListener());
+ serviceCache.start();
+ updateEndpoints();
+ }
+
+ private class InitialConnectionListener implements ConnectionStateListener {
+
+ @Override
+ public void stateChanged(CuratorFramework client,
+ ConnectionState newState) {
+ if (newState == ConnectionState.CONNECTED) {
+ initialConnection.countDown();
+ client.getConnectionStateListenable().removeListener(this);
+ }
+ }
+
+ }
+
+ private class EndpointListener implements ServiceCacheListener {
+ @Override
+ public void stateChanged(CuratorFramework client,
+ ConnectionState newState) {
+ }
+
+ @Override
+ public void cacheChanged() {
+ logger.debug("Got cache changed --> updating endpoints");
+ updateEndpoints();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ // discovery attempts to close its caches(ie serviceCache) already. however,
+ // being good citizens we make sure to
+ // explicitly close serviceCache. Not only that we make sure to close
+ // serviceCache before discovery to prevent
+ // double releasing and disallowing jvm to spit bothering warnings. simply
+ // put, we are great!
+ AutoCloseables.close(serviceCache, discovery, curator, factory);
+ }
+
+ @Override
+ public RegistrationHandle register(DrillbitEndpoint data) {
+ try {
+ ServiceInstance<DrillbitEndpoint> serviceInstance = newServiceInstance(
+ data);
+ discovery.registerService(serviceInstance);
+ return new ZKRegistrationHandle(serviceInstance.getId(), data);
+ } catch (Exception e) {
+ throw propagate(e);
+ }
+ }
+
+ @Override
+ public void unregister(RegistrationHandle handle) {
+ if (!(handle instanceof ZKRegistrationHandle)) {
+ throw new UnsupportedOperationException(
+ "Unknown handle type: " + handle.getClass().getName());
+ }
+
+ // when Drillbit is unregistered, clean all the listeners registered in CC.
+ this.listeners.clear();
+
+ ZKRegistrationHandle h = (ZKRegistrationHandle) handle;
+ try {
+ ServiceInstance<DrillbitEndpoint> serviceInstance = ServiceInstance
+ .<DrillbitEndpoint> builder().address("").port(0).id(h.id)
+ .name(serviceName).build();
+ discovery.unregisterService(serviceInstance);
+ } catch (Exception e) {
+ propagate(e);
+ }
+ }
+
+ @Override
+ public Collection<DrillbitEndpoint> getAvailableEndpoints() {
+ return this.endpoints;
+ }
+
+ @Override
+ public DistributedSemaphore getSemaphore(String name, int maximumLeases) {
+ return new ZkDistributedSemaphore(curator, "/semaphore/" + name,
+ maximumLeases);
+ }
+
+ @Override
+ public <V> TransientStore<V> getOrCreateTransientStore(
+ final TransientStoreConfig<V> config) {
+ final ZkEphemeralStore<V> store = (ZkEphemeralStore<V>) factory
+ .getOrCreateStore(config);
+ return store;
+ }
+
+ private synchronized void updateEndpoints() {
+ try {
+ Collection<DrillbitEndpoint> newDrillbitSet = transform(
+ discovery.queryForInstances(serviceName),
+ new Function<ServiceInstance<DrillbitEndpoint>, DrillbitEndpoint>() {
+ @Override
+ public DrillbitEndpoint apply(
+ ServiceInstance<DrillbitEndpoint> input) {
+ return input.getPayload();
+ }
+ });
+
+ // set of newly dead bits : original bits - new set of active bits.
+ Set<DrillbitEndpoint> unregisteredBits = new HashSet<>(endpoints);
+ unregisteredBits.removeAll(newDrillbitSet);
+
+ // Set of newly live bits : new set of active bits - original bits.
+ Set<DrillbitEndpoint> registeredBits = new HashSet<>(newDrillbitSet);
+ registeredBits.removeAll(endpoints);
+
+ endpoints = newDrillbitSet;
+
+ if (logger.isDebugEnabled()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("Active drillbit set changed. Now includes ");
+ builder.append(newDrillbitSet.size());
+ builder.append(" total bits.");
+ if (!newDrillbitSet.isEmpty()) {
+ builder.append(" New active drillbits: \n");
+ }
+ for (DrillbitEndpoint bit : newDrillbitSet) {
+ builder.append('\t');
+ builder.append(bit.getAddress());
+ builder.append(':');
+ builder.append(bit.getUserPort());
+ builder.append(':');
+ builder.append(bit.getControlPort());
+ builder.append(':');
+ builder.append(bit.getDataPort());
+ builder.append('\n');
+ }
+ logger.debug(builder.toString());
+ }
+
+ // Notify the drillbit listener for newly unregistered bits.
+ if (!(unregisteredBits.isEmpty())) {
+ drillbitUnregistered(unregisteredBits);
+ }
+ // Notify the drillbit listener for newly registered bits.
+ if (!(registeredBits.isEmpty())) {
+ drillbitRegistered(registeredBits);
+ }
+
+ } catch (Exception e) {
+ logger.error("Failure while update Drillbit service location cache.", e);
+ }
+ }
+
+ protected ServiceInstance<DrillbitEndpoint> newServiceInstance(
+ DrillbitEndpoint endpoint) throws Exception {
+ return ServiceInstance.<DrillbitEndpoint> builder().name(serviceName)
+ .payload(endpoint).build();
+ }
+
+ protected ServiceDiscovery<DrillbitEndpoint> newDiscovery() {
+ return ServiceDiscoveryBuilder.builder(DrillbitEndpoint.class).basePath("/")
+ .client(curator).serializer(DrillServiceInstanceHelper.SERIALIZER)
+ .build();
+ }
+
+ @Override
+ public Collection<DrillbitEndpoint> getOnlineEndPoints() {
+
+ // Not used in DoY
+
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RegistrationHandle update(RegistrationHandle handle, State state) {
+
+ // Not used in DoY
+
+ throw new UnsupportedOperationException();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinatorDriver.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinatorDriver.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinatorDriver.java
new file mode 100644
index 0000000..3f83ff2
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKClusterCoordinatorDriver.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.zk;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
+import org.apache.drill.yarn.appMaster.AMRegistrar;
+
+/**
+ * Driver class for the ZooKeeper cluster coordinator. Provides defaults for
+ * most options, but allows customizing each. Provides a {@link #build()} method
+ * to create <i>and start</i> the ZK service. Obtains the initial set of
+ * Drillbits (which should be empty for a YARN-defined cluster) which can be
+ * retrieved after building.
+ * <p>
+ * Maintains the ZK connection and monitors for disconnect. This class simply
+ * detects a disconnect timeout; it does not send a disconnect event itself, to
+ * avoid creating a timer thread just for this purpose. Instead, the caller can
+ * poll {@link #hasFailed()}.
+ * <p>
+ * Threading: connection-state changes arrive through a Curator
+ * {@link ConnectionStateListener}, while {@link #hasFailed()} and
+ * {@link #getLostConnectionDurationMs()} are polled by the caller, possibly on
+ * a different thread. The connection-lost timestamp is therefore
+ * {@code volatile} and is read exactly once per query.
+ * <p>
+ * Defaults match those in Drill. (Actual Drill defaults are not yet used due to
+ * code incompatibility issues.)
+ */
+
+public class ZKClusterCoordinatorDriver implements AMRegistrar {
+  private static final Pattern ZK_COMPLEX_STRING = Pattern
+      .compile("(^.*?)/(.*)/([^/]*)$");
+
+  // Defaults are taken from java-exec's drill-module.conf
+
+  private String connect = "localhost:2181";
+  private String clusterId = "drillbits1";
+  private String zkRoot = "drill";
+  private int retryCount = 7200;
+  private int connectTimeoutMs = 5_000;
+  private int retryDelayMs = 500;
+
+  // Default timeout before we declare that ZK is down: 2 minutes.
+
+  private int failureTimeoutMs = 120_000;
+
+  // Maximum ZK startup wait defaults to 30 seconds. It is only 10 seconds
+  // in the Drill implementation.
+
+  private int maxStartWaitMs = 30_000;
+
+  // Expected ports used to match ZK registries with
+  // containers. ZK lists the ports as part of its key, we have to anticipate
+  // these values in order to match.
+
+  private int userPort = 31010;
+  private int controlPort = 31011;
+  private int dataPort = 31012;
+
+  private List<DrillbitEndpoint> initialEndpoints;
+  private ConnectionStateListener stateListener = new ConnectionStateListener() {
+
+    @Override
+    public void stateChanged(CuratorFramework client,
+        ConnectionState newState) {
+      ZKClusterCoordinatorDriver.this.stateChanged(newState);
+    }
+  };
+
+  private ZKClusterCoordinator zkCoord;
+
+  /**
+   * Wall-clock time (ms) of the first connection loss not yet followed by a
+   * reconnect, or 0 while connected. Written from the connection-state
+   * listener and read by pollers, hence {@code volatile}.
+   */
+
+  private volatile long connectionLostTime;
+
+  private AMRegistry amRegistry;
+
+  public ZKClusterCoordinatorDriver() {
+  }
+
+  /**
+   * Specify the combined connect string in the form
+   * <tt>connect/zkRoot/clusterId</tt>, e.g. <tt>localhost:2181/drill/drillbits1</tt>.
+   * Everything before the first '/' becomes the ZK connect string, the last
+   * path component becomes the cluster ID, and everything in between becomes
+   * the ZK root.
+   *
+   * @param connect
+   *          the combined connect string
+   * @return this driver, for chaining
+   * @throws ZKConfigException
+   *           if the string does not contain at least two '/' separators
+   */
+  public ZKClusterCoordinatorDriver setConnect(String connect)
+      throws ZKConfigException {
+
+    // check if this is a complex zk string. If so, parse into components.
+    Matcher m = ZK_COMPLEX_STRING.matcher(connect);
+    if (!m.matches()) {
+      throw new ZKConfigException("Bad connect string: " + connect);
+    }
+    this.connect = m.group(1);
+    zkRoot = m.group(2);
+    clusterId = m.group(3);
+    return this;
+  }
+
+  public ZKClusterCoordinatorDriver setConnect(String connect, String zkRoot,
+      String clusterId) {
+    this.connect = connect;
+    this.zkRoot = zkRoot;
+    this.clusterId = clusterId;
+    return this;
+  }
+
+  public ZKClusterCoordinatorDriver setRetryCount(int n) {
+    retryCount = n;
+    return this;
+  }
+
+  public ZKClusterCoordinatorDriver setConnectTimeoutMs(int ms) {
+    connectTimeoutMs = ms;
+    return this;
+  }
+
+  public ZKClusterCoordinatorDriver setRetryDelayMs(int ms) {
+    retryDelayMs = ms;
+    return this;
+  }
+
+  public ZKClusterCoordinatorDriver setMaxStartWaitMs(int ms) {
+    maxStartWaitMs = ms;
+    return this;
+  }
+
+  /**
+   * Sets how long the ZK connection may stay lost before {@link #hasFailed()}
+   * reports a failure.
+   *
+   * @param ms
+   *          failure timeout in milliseconds
+   * @return this driver, for chaining
+   */
+  public ZKClusterCoordinatorDriver setFailureTimeoutMs(int ms) {
+    failureTimeoutMs = ms;
+    return this;
+  }
+
+  /**
+   * @deprecated misspelled name kept for compatibility; use
+   *             {@link #setFailureTimeoutMs(int)}.
+   */
+  @Deprecated
+  public ZKClusterCoordinatorDriver setFailureTimoutMs(int ms) {
+    return setFailureTimeoutMs(ms);
+  }
+
+  public ZKClusterCoordinatorDriver setPorts(int userPort, int controlPort,
+      int dataPort) {
+    this.userPort = userPort;
+    this.controlPort = controlPort;
+    this.dataPort = dataPort;
+    return this;
+  }
+
+  /**
+   * Builds and starts the ZooKeeper cluster coordinator, translating any errors
+   * that occur. After this call, the listener will start receiving messages.
+   *
+   * @return this driver, for chaining
+   * @throws ZKRuntimeException
+   *           if ZK startup fails
+   */
+  public ZKClusterCoordinatorDriver build() throws ZKRuntimeException {
+    try {
+      zkCoord = new ZKClusterCoordinator(connect, zkRoot, clusterId, retryCount,
+          retryDelayMs, connectTimeoutMs);
+    } catch (IOException e) {
+      throw new ZKRuntimeException(
+          "Failed to initialize the ZooKeeper cluster coordination", e);
+    }
+    try {
+      zkCoord.start(maxStartWaitMs);
+    } catch (Exception e) {
+      throw new ZKRuntimeException(
+          "Failed to start the ZooKeeper cluster coordination after "
+              + maxStartWaitMs + " ms.",
+          e);
+    }
+    initialEndpoints = new ArrayList<>(zkCoord.getAvailableEndpoints());
+    zkCoord.getCurator().getConnectionStateListenable()
+        .addListener(stateListener);
+    amRegistry = new AMRegistry(zkCoord);
+    amRegistry.useLocalRegistry(zkRoot, clusterId);
+    return this;
+  }
+
+  public void addDrillbitListener(DrillbitStatusListener listener) {
+    zkCoord.addDrillbitStatusListener(listener);
+  }
+
+  public void removeDrillbitListener(DrillbitStatusListener listener) {
+    zkCoord.removeDrillbitStatusListener(listener);
+  }
+
+  /**
+   * Returns the set of Drillbits registered at the time of the {@link #build()}
+   * call. Should be empty for a cluster managed by YARN.
+   *
+   * @return the endpoints captured by {@link #build()}, or null before build
+   */
+
+  public List<DrillbitEndpoint> getInitialEndpoints() {
+    return initialEndpoints;
+  }
+
+  /**
+   * Convenience method to convert a Drillbit to a string. Note that ZK does not
+   * advertise the HTTP port, so it does not appear in the generated string.
+   *
+   * @param bit
+   *          the Drillbit endpoint
+   * @return key of the form host:userPort:controlPort:dataPort
+   */
+
+  public static String asString(DrillbitEndpoint bit) {
+    return formatKey(bit.getAddress(), bit.getUserPort(), bit.getControlPort(),
+        bit.getDataPort());
+  }
+
+  public String toKey(String host) {
+    return formatKey(host, userPort, controlPort, dataPort);
+  }
+
+  public static String formatKey(String host, int userPort, int controlPort,
+      int dataPort) {
+    StringBuilder buf = new StringBuilder();
+    buf.append(host).append(":").append(userPort).append(':')
+        .append(controlPort).append(':').append(dataPort);
+    return buf.toString();
+  }
+
+  /**
+   * Translate ZK connection events into a connected/disconnected state along
+   * with the time of the first disconnect not followed by a connect.
+   *
+   * @param newState
+   *          the new Curator connection state
+   */
+
+  protected void stateChanged(ConnectionState newState) {
+    switch (newState) {
+    case CONNECTED:
+    case READ_ONLY:
+    case RECONNECTED:
+      if (connectionLostTime != 0) {
+        ZKClusterCoordinator.logger.info("ZK connection regained");
+      }
+      connectionLostTime = 0;
+      break;
+    case LOST:
+    case SUSPENDED:
+      if (connectionLostTime == 0) {
+        ZKClusterCoordinator.logger.info("ZK connection lost");
+        connectionLostTime = System.currentTimeMillis();
+      }
+      break;
+    }
+  }
+
+  /**
+   * Reports our best guess as to whether ZK has failed. We assume ZK has failed
+   * if we received a connection lost notification without a subsequent connect
+   * notification, and we received the disconnect notification long enough ago
+   * that we assume that a timeout has occurred.
+   *
+   * @return true if the connection has been lost for longer than the failure
+   *         timeout
+   */
+
+  public boolean hasFailed() {
+    // Read once: a reconnect on another thread may reset the field to 0
+    // between a check and the subtraction, which would otherwise yield a
+    // huge bogus duration.
+    long lostTime = connectionLostTime;
+    if (lostTime == 0) {
+      return false;
+    }
+    return System.currentTimeMillis() - lostTime > failureTimeoutMs;
+  }
+
+  /**
+   * @return milliseconds since the connection was lost, or 0 if connected
+   */
+  public long getLostConnectionDurationMs() {
+    long lostTime = connectionLostTime; // single read; see hasFailed()
+    if (lostTime == 0) {
+      return 0;
+    }
+    return System.currentTimeMillis() - lostTime;
+  }
+
+  public void close() {
+    if (zkCoord == null) {
+      return;
+    }
+    zkCoord.getCurator().getConnectionStateListenable()
+        .removeListener(stateListener);
+    try {
+      zkCoord.close();
+    } catch (Exception e) {
+      ZKClusterCoordinator.logger.error("Error occurred on ZK close, ignored",
+          e);
+    }
+    zkCoord = null;
+  }
+
+  @Override
+  public void register(String amHost, int amPort, String appId)
+      throws AMRegistrationException {
+    try {
+      amRegistry.register(amHost, amPort, appId);
+    } catch (ZKRuntimeException e) {
+      throw new AMRegistrationException(e);
+    }
+  }
+
+  @Override
+  public void deregister() {
+    // Nothing to do: ZK does it for us.
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKConfigException.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKConfigException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKConfigException.java
new file mode 100644
index 0000000..700d84b
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKConfigException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.zk;
+
+/**
+ * Checked exception signalling an invalid ZooKeeper configuration, such as a
+ * connect string that cannot be parsed.
+ */
+public class ZKConfigException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param msg
+   *          description of the configuration problem
+   */
+  public ZKConfigException(String msg) {
+    super(msg);
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRegistry.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRegistry.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRegistry.java
new file mode 100644
index 0000000..0426578
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRegistry.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.zk;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
+import org.apache.drill.yarn.appMaster.AMWrapperException;
+import org.apache.drill.yarn.appMaster.EventContext;
+import org.apache.drill.yarn.appMaster.Pollable;
+import org.apache.drill.yarn.appMaster.RegistryHandler;
+import org.apache.drill.yarn.appMaster.Task;
+import org.apache.drill.yarn.appMaster.TaskLifecycleListener;
+
+/**
+ * AM-specific implementation of a Drillbit registry backed by ZooKeeper.
+ * Listens to ZK events for registering a Drillbit and deregistering. Alerts the
+ * Cluster Controller of these events.
+ * <p>
+ * Locking strategy: Receives events from both ZK and the cluster controller,
+ * both of which must be synchronized. To prevent deadlocks, this class NEVER
+ * calls into the cluster controller while holding a lock on behalf of a ZK
+ * event. This prevents the following:
+ * <p>
+ * ClusterController --> ZKRegistry (OK) <br>
+ * ZK --> ZKRegistry (OK) <br>
+ * ZK --> ZKRegistry --> Cluster Controller (bad)
+ * <p>
+ * In the case of registration and deregistration, ZK calls the registry which
+ * must alert the cluster controller. Cluster controller alerting is handled
+ * outside the ZK update critical section.
+ * <p>
+ * Because ZK events occur relatively infrequently, any deadlock will occur
+ * once in a blue moon, which will make it very hard to reproduce. So, extreme
+ * caution is needed at design time to prevent the problem.
+ */
+
+public class ZKRegistry
+    implements TaskLifecycleListener, DrillbitStatusListener, Pollable {
+  /**
+   * State of each Drillbit that we've discovered through ZK or launched via the
+   * AM. The tracker is where we combine the ZK information with AM to correlate
+   * overall Drillbit health.
+   */
+
+  protected static class DrillbitTracker {
+    /**
+     * A Drillbit can be in one of four states.
+     */
+
+    public enum State {
+
+      /**
+       * An unmanaged Drillbit is one that has announced itself via ZK, but
+       * which the AM didn't launch (or has not yet received confirmation from
+       * YARN that it was launched.) In the normal state, this state either does
+       * not occur (YARN reports the task launch before the Drillbit registers
+       * in ZK) or is transient (if the Drillbit registers in ZK before YARN
+       * gets around to telling the AM that the Drillbit was launched.) A
+       * Drillbit that stays in the unmanaged state is likely one launched
+       * outside the AM: either launched manually or (possibly), one left from a
+       * previous, failed AM run (though YARN is supposed to kill left-over
+       * child processes in that case.)
+       */
+
+      UNMANAGED,
+
+      /**
+       * A new Drillbit is one that the AM has launched, but that has not yet
+       * registered itself with ZK. This is normally a transient state that
+       * occurs as ZK registration catches up with the YARN launch notification.
+       * If a Drillbit stays in this state, then something is seriously wrong
+       * (perhaps a mis-configuration). The cluster controller will patiently
+       * wait a while, then decide bad things are happening and will ask YARN to
+       * kill the Drillbit, then will retry a time or two, after which it will
+       * throw up its hands, blacklist the node, and wait for the admin to sort
+       * things out.
+       */
+
+      NEW,
+
+      /**
+       * Normal operating state: the AM launched the Drillbit, which then
+       * dutifully registered itself in ZK. Nothing to see here, move along.
+       */
+
+      REGISTERED,
+
+      /**
+       * The Drillbit was working just fine, but its registration has dropped
+       * out of ZK for a reason best left to the cluster controller to
+       * determine. Perhaps the controller has decided to kill the Drillbit.
+       * Perhaps the Drillbit became unresponsive (in which case the controller
+       * will kill it and retry) or has died (in which case YARN will alert the
+       * AM that the process exited.)
+       */
+
+      DEREGISTERED
+    }
+
+    /**
+     * The common key used between tasks and ZK registrations. The key is of the
+     * form:<br>
+     *
+     * <pre>
+     * host:port:port:port
+     * </pre>
+     */
+
+    protected final String key;
+
+    /**
+     * ZK tracking state.
+     *
+     * @see State
+     */
+
+    protected State state;
+
+    /**
+     * For Drillbits started by the AM, the task object for this Drillbit. Null
+     * for unmanaged Drillbits.
+     */
+
+    protected Task task;
+
+    /**
+     * For Drillbits discovered through ZK, the Drill endpoint for the Drillbit.
+     */
+
+    protected DrillbitEndpoint endpoint;
+
+    public DrillbitTracker(String key, DrillbitEndpoint endpoint) {
+      this.key = key;
+      this.state = DrillbitTracker.State.UNMANAGED;
+      this.endpoint = endpoint;
+    }
+
+    public DrillbitTracker(String key, Task task) {
+      this.key = key;
+      this.task = task;
+      state = DrillbitTracker.State.NEW;
+    }
+
+    /**
+     * Mark that a YARN-managed task has become registered in ZK. This indicates
+     * that the task has come online and is a normal, healthy task.
+     */
+
+    private void becomeRegistered() {
+      state = DrillbitTracker.State.REGISTERED;
+    }
+
+    /**
+     * Mark that a YARN-managed Drillbit has dropped out of ZK and forget its
+     * endpoint.
+     */
+
+    public void becomeUnregistered() {
+      assert state == DrillbitTracker.State.REGISTERED;
+      state = DrillbitTracker.State.DEREGISTERED;
+      endpoint = null;
+    }
+  }
+
+  public static final String CONTROLLER_PROPERTY = "zk";
+
+  public static final int UPDATE_PERIOD_MS = 20_000;
+
+  public static final String ENDPOINT_PROPERTY = "endpoint";
+
+  private static final Log LOG = LogFactory.getLog(ZKRegistry.class);
+
+  /**
+   * Map of host:port:port:port keys to tracking objects. Identifies the
+   * Drillbits discovered from ZK, started by the controller, or (ideally) both.
+   */
+
+  private Map<String, DrillbitTracker> registry = new HashMap<>();
+
+  /**
+   * Interface to Drill's cluster coordinator.
+   */
+
+  private ZKClusterCoordinatorDriver zkDriver;
+
+  /**
+   * Handler (the cluster controller) alerted of registry events.
+   */
+
+  private RegistryHandler registryHandler;
+
+  /**
+   * Time of the last check of ZK status, as passed to {@link #tick(long)}.
+   */
+
+  private long lastUpdateTime;
+
+  public ZKRegistry(ZKClusterCoordinatorDriver zkDriver) {
+    this.zkDriver = zkDriver;
+  }
+
+  /**
+   * Called during AM startup to initialize ZK. Checks if any Drillbits are
+   * already running. These are "unmanaged" because the AM could not have
+   * started them (since they predate the AM.)
+   */
+
+  public void start(RegistryHandler controller) {
+    this.registryHandler = controller;
+    try {
+      zkDriver.build();
+    } catch (ZKRuntimeException e) {
+      LOG.error("Failed to start ZK monitoring", e);
+      throw new AMWrapperException("Failed to start ZK monitoring", e);
+    }
+    for (DrillbitEndpoint dbe : zkDriver.getInitialEndpoints()) {
+      String key = toKey(dbe);
+      registry.put(key, new DrillbitTracker(key, dbe));
+
+      // Blacklist the host for each unmanaged drillbit.
+
+      controller.reserveHost(dbe.getAddress());
+
+      LOG.warn("Host " + dbe.getAddress()
+          + " already running a Drillbit outside of YARN.");
+    }
+    zkDriver.addDrillbitListener(this);
+  }
+
+  /**
+   * Convert a Drillbit endpoint to a string key used in the (zk-->task) map.
+   * Note that the string format here must match the one used in
+   * {@link #toKey(Task)} to map a task to string key.
+   *
+   * @param dbe
+   *          the Drillbit endpoint from ZK
+   * @return a string key for this object
+   */
+
+  private String toKey(DrillbitEndpoint dbe) {
+    return ZKClusterCoordinatorDriver.asString(dbe);
+  }
+
+  /**
+   * Convert a task to a string key used in the (zk-->task) map. Note that the
+   * string format here must match the one used in
+   * {@link #toKey(DrillbitEndpoint)} to map a drillbit endpoint to string key.
+   *
+   * @param task
+   *          the task tracked by the cluster controller
+   * @return a string key for this object
+   */
+
+  private String toKey(Task task) {
+    return zkDriver.toKey(task.getHostName());
+  }
+
+  /**
+   * A deferred notification for the cluster controller, produced inside the
+   * registry lock and delivered after the lock is released. A null task
+   * denotes an unmanaged Drillbit.
+   */
+  public static class AckEvent {
+    Task task;
+    DrillbitEndpoint endpoint;
+
+    public AckEvent(Task task, DrillbitEndpoint endpoint) {
+      this.task = task;
+      this.endpoint = endpoint;
+    }
+  }
+
+  /**
+   * Callback from ZK to indicate that one or more drillbits have become
+   * registered. We handle registrations in a critical section, then alert the
+   * cluster controller outside the critical section.
+   */
+
+  @Override
+  public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) {
+    List<AckEvent> updates = registerDrillbits(registeredDrillbits);
+    for (AckEvent event : updates) {
+      if (event.task == null) {
+        registryHandler.reserveHost(event.endpoint.getAddress());
+      } else {
+        registryHandler.startAck(event.task, ENDPOINT_PROPERTY, event.endpoint);
+      }
+    }
+  }
+
+  private synchronized List<AckEvent> registerDrillbits(
+      Set<DrillbitEndpoint> registeredDrillbits) {
+    List<AckEvent> events = new ArrayList<>();
+    for (DrillbitEndpoint dbe : registeredDrillbits) {
+      AckEvent event = drillbitRegistered(dbe);
+      if (event != null) {
+        events.add(event);
+      }
+    }
+    return events;
+  }
+
+  /**
+   * Called when a drillbit has become registered. Either this is a normal
+   * registration of a previously-started task, a repeat registration (e.g.
+   * after a ZK reconnect), or an unmanaged drillbit for which we have no
+   * matching task. Must be called while holding the registry lock.
+   *
+   * @return the event to deliver to the controller, or null if none
+   */
+
+  private AckEvent drillbitRegistered(DrillbitEndpoint dbe) {
+    String key = toKey(dbe);
+    DrillbitTracker tracker = registry.get(key);
+    if (tracker == null) {
+      // Unmanaged drillbit case
+
+      LOG.info("Registration of unmanaged drillbit: " + key);
+      tracker = new DrillbitTracker(key, dbe);
+      registry.put(key, tracker);
+      return new AckEvent(null, dbe);
+    }
+
+    // Managed drillbit case. Might be we lost, then regained
+    // ZK connection.
+
+    if (tracker.state == DrillbitTracker.State.REGISTERED) {
+      LOG.info("Re-registration of known drillbit: " + key);
+      return null;
+    }
+
+    // An unmanaged drillbit registering again has no task (dereferencing it
+    // would throw) and its host was already reserved when first seen.
+
+    if (tracker.state == DrillbitTracker.State.UNMANAGED) {
+      LOG.info("Re-registration of unmanaged drillbit: " + key);
+      return null;
+    }
+
+    // Otherwise, the Drillbit has just registered with ZK.
+    // Or, if the ZK connection was lost and regained, the
+    // state changes from DEREGISTERED --> REGISTERED
+
+    LOG.info("Drillbit registered: " + key + ", task: " + tracker.task.toString() );
+    tracker.endpoint = dbe;
+    tracker.becomeRegistered();
+    return new AckEvent(tracker.task, dbe);
+  }
+
+  /**
+   * Callback from ZK to indicate that one or more drillbits have become
+   * deregistered from ZK. We handle the deregistrations in a critical section,
+   * but updates to the cluster controller outside of a critical section.
+   */
+
+  @Override
+  public void drillbitUnregistered(
+      Set<DrillbitEndpoint> unregisteredDrillbits) {
+    List<AckEvent> updates = unregisterDrillbits(unregisteredDrillbits);
+    for (AckEvent event : updates) {
+      if (event.task == null) {
+        // Unmanaged drillbit went away: free its host, outside the lock.
+        registryHandler.releaseHost(event.endpoint.getAddress());
+      } else {
+        registryHandler.completionAck(event.task, ENDPOINT_PROPERTY);
+      }
+    }
+  }
+
+  private synchronized List<AckEvent> unregisterDrillbits(
+      Set<DrillbitEndpoint> unregisteredDrillbits) {
+    List<AckEvent> events = new ArrayList<>();
+    for (DrillbitEndpoint dbe : unregisteredDrillbits) {
+      AckEvent event = drillbitUnregistered(dbe);
+      if (event != null) {
+        events.add(event);
+      }
+    }
+    return events;
+  }
+
+  /**
+   * Handle the case that a drillbit becomes unregistered. Must be called while
+   * holding the registry lock; it never calls the controller itself. There are
+   * three cases.
+   * <ol>
+   * <li>The deregistration is for a drillbit that is not in the registry table.
+   * Logged and ignored.</li>
+   * <li>The drillbit is unmanaged. This occurs for drillbits started and
+   * stopped outside of YARN. Returns an event with a null task so the caller
+   * releases the host.</li>
+   * <li>Normal case of deregistration of a YARN-managed drillbit. Returns an
+   * event so the caller informs the controller.</li>
+   * </ol>
+   *
+   * @param dbe
+   *          the endpoint that dropped out of ZK
+   * @return the event to deliver to the controller, or null if none
+   */
+
+  private AckEvent drillbitUnregistered(DrillbitEndpoint dbe) {
+    String key = toKey(dbe);
+    DrillbitTracker tracker = registry.get(key);
+    if (tracker == null) {
+      // Something is terribly wrong.
+      // Have seen this when a user kills the Drillbit just after it starts. Evidently, the
+      // Drillbit registers with ZK just before it is killed, but before DoY hears about
+      // the registration. Since it happens in practice, do not assert here.
+
+      LOG.error("Internal error - Unexpected drillbit unregistration: " + key);
+      return null;
+    }
+    if (tracker.state == DrillbitTracker.State.UNMANAGED) {
+      // Unmanaged drillbit
+
+      assert tracker.task == null;
+      LOG.info("Unmanaged drillbit unregistered: " + key);
+      registry.remove(key);
+      return new AckEvent(null, dbe);
+    }
+    LOG.info("Drillbit unregistered: " + key + ", task: " + tracker.task.toString() );
+    tracker.becomeUnregistered();
+    return new AckEvent(tracker.task, dbe);
+  }
+
+  /**
+   * Listen for selected YARN task state changes. Called from within the cluster
+   * controller's critical section.
+   */
+
+  @Override
+  public synchronized void stateChange(Event event, EventContext context) {
+    switch (event) {
+    case ALLOCATED:
+      taskCreated(context.task);
+      break;
+    case ENDED:
+      taskEnded(context.task);
+      break;
+    default:
+      break;
+    }
+  }
+
+  /**
+   * Indicates that the cluster controller has created a task that we expect to
+   * be monitored by ZK. We handle two cases: the normal case in which we later
+   * receive a ZK notification. And, the unusual case in which we've already
+   * received the ZK notification and we now match that notification with this
+   * task. (The second case could occur if latency causes us to receive the ZK
+   * notification before we learn from the NM that the task is alive.)
+   *
+   * @param task
+   *          the newly allocated task
+   */
+
+  private void taskCreated(Task task) {
+    String key = toKey(task);
+    DrillbitTracker tracker = registry.get(key);
+    if (tracker == null) {
+      // Normal case: no ZK registration yet.
+
+      registry.put(key, new DrillbitTracker(key, task));
+    } else if (tracker.state == DrillbitTracker.State.UNMANAGED) {
+      // Unusual case: ZK registration came first.
+
+      LOG.info("Unmanaged drillbit became managed: " + key);
+      tracker.task = task;
+      tracker.becomeRegistered();
+
+      // Note: safe to call this here as we are already in the controller
+      // critical section.
+
+      registryHandler.startAck(task, ENDPOINT_PROPERTY, tracker.endpoint);
+    } else {
+      LOG.error(task.getLabel() + " - Drillbit registry in wrong state "
+          + tracker.state + " for new task: " + key);
+    }
+  }
+
+  /**
+   * Report whether the given task is still registered in ZK. Called while
+   * waiting for a deregistration event to catch possible cases where the
+   * message is lost. The message should never be lost, but we've seen
+   * cases where tasks hang in this state. This is a potential work-around.
+   *
+   * @param task
+   *          the task to check
+   * @return true if the task's tracker is in the REGISTERED state
+   */
+
+  public synchronized boolean isRegistered(Task task) {
+    String key = toKey(task);
+    DrillbitTracker tracker = registry.get(key);
+    if (tracker==null) {
+      return false;
+    }
+    return tracker.state == DrillbitTracker.State.REGISTERED;
+  }
+
+  /**
+   * Mark that a task (YARN container) has ended. Updates the (zk --> task)
+   * registry by removing the task. The cluster controller state machine
+   * monitors ZK and does not end the task until the ZK registration for that
+   * task drops. As a result, the entry here should be in the deregistered state
+   * or something is seriously wrong.
+   *
+   * @param task
+   *          the task that ended
+   */
+
+  private void taskEnded(Task task) {
+
+    // If the task has no host name then the task is being cancelled before
+    // a YARN container was allocated. Just ignore such a case.
+
+    if (task.getHostName() == null) {
+      return;
+    }
+    String key = toKey(task);
+    DrillbitTracker tracker = registry.get(key);
+    assert tracker != null;
+    assert tracker.state == DrillbitTracker.State.DEREGISTERED;
+    registry.remove(key);
+  }
+
+  /**
+   * Periodically check ZK status, at most once every {@link #UPDATE_PERIOD_MS}
+   * milliseconds. If the ZK connection has timed out, something is very
+   * seriously wrong. Shut the whole Drill cluster down since Drill cannot
+   * operate without ZooKeeper.
+   * <p>
+   * This method should not be synchronized. It checks only the ZK state, not
+   * internal state. Further, if we do reconnect to ZK, then a ZK thread may
+   * attempt to update this registry, which will acquire a synchronization lock.
+   */
+
+  @Override
+  public void tick(long curTime) {
+    // Skip until a full period has elapsed since the previous check. (The
+    // previous test was inverted and skipped every check once the period had
+    // passed, so ZK failure was never detected.)
+    if (curTime - lastUpdateTime < UPDATE_PERIOD_MS) {
+      return;
+    }
+
+    lastUpdateTime = curTime;
+    if (zkDriver.hasFailed()) {
+      int secs = (int) ((zkDriver.getLostConnectionDurationMs() + 500) / 1000);
+      LOG.error(
+          "ZooKeeper connection lost, failing after " + secs + " seconds.");
+      registryHandler.registryDown();
+    }
+  }
+
+  public void finish(RegistryHandler handler) {
+    zkDriver.removeDrillbitListener(this);
+    zkDriver.close();
+  }
+
+  /**
+   * @return keys of all Drillbits currently tracked as unmanaged
+   */
+  public synchronized List<String> listUnmanagedDrillits() {
+    List<String> drillbits = new ArrayList<>();
+    for (DrillbitTracker item : registry.values()) {
+      if (item.state == DrillbitTracker.State.UNMANAGED) {
+        drillbits.add(item.key);
+      }
+    }
+    return drillbits;
+  }
+
+  /**
+   * Get the current registry for testing. Why for testing? Because this is
+   * unsynchronized. In production code, the map may change out from under you.
+   *
+   * @return the live (not copied) registry map
+   */
+
+  protected Map<String, DrillbitTracker> getRegistryForTesting() {
+    return registry;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRuntimeException.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRuntimeException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRuntimeException.java
new file mode 100644
index 0000000..4e1b115
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/ZKRuntimeException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.zk;
+
+/**
+ * Checked exception reporting a failure while initializing, starting or using
+ * the ZooKeeper coordination service.
+ */
+public class ZKRuntimeException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param msg
+   *          description of the failure
+   */
+  public ZKRuntimeException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param msg
+   *          description of the failure
+   * @param e
+   *          the underlying cause, preserved for diagnostics
+   */
+  public ZKRuntimeException(String msg, Exception e) {
+    super(msg, e);
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/zk/package-info.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/zk/package-info.java b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/package-info.java
new file mode 100644
index 0000000..14bb427
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/zk/package-info.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Interface between the Application Master and ZooKeeper. Classes here manage two
+ * registrations: Drillbits and the AM itself.
+ * <p>
+ * Drillbit registration is used to confirm that Drillbits have indeed come online.
+ * If Drillbits fail to come online, then the AM concludes that something went wrong.
+ * If Drillbits drop offline unexpectedly, the AM concludes that the Drillbit is sick
+ * and restarts it.
+ * <p>
+ * The AM registry prevents two AMs from attempting to manage the same cluster.
+ */
+
+package org.apache.drill.yarn.zk;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/config.ftl
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/resources/drill-am/config.ftl b/drill-yarn/src/main/resources/drill-am/config.ftl
new file mode 100644
index 0000000..8405c1f
--- /dev/null
+++ b/drill-yarn/src/main/resources/drill-am/config.ftl
@@ -0,0 +1,51 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<#include "*/generic.ftl">
+
+<#-- The configuration page adds nothing to the <head> section. -->
+<#macro page_head>
+</#macro>
+
+<#-- Renders the fully resolved AM configuration as a two-column table.
+     "model" is a list of entries exposing getName() and getQuotedValue(),
+     presumably org.apache.drill.yarn.core.NameValuePair (confirm against
+     the controller that renders this page).
+     NOTE(review): values are interpolated without explicit HTML escaping.
+     Unless the FreeMarker configuration auto-escapes, the "<unset>" text
+     NameValuePair returns for null values would be parsed by the browser
+     as an unknown tag and render as an empty cell. -->
+<#macro page_body>
+  <h4>Fully Resolved Configuration Settings</h4>
+  <p>
+
+  <table class="table table-hover" style="width: auto;">
+    <tr>
+      <th>Configuration Key</th>
+      <th>Value</th>
+    </tr>
+    <#list model as pair>
+      <tr>
+        <td>${pair.getName()}</td>
+        <td>${pair.getQuotedValue()}</td>
+      </tr>
+    </#list>
+  </table>
+  <p>
+  To modify these values:
+  <ol>
+    <li>Edit <code>$DRILL_SITE/drill-on-yarn.conf</code> (for the drill.yarn settings),</li>
+    <li>Edit <code>$DRILL_SITE/drill-override.conf</code> (for the drill.exec settings).</li>
+    <li>Restart your Drill cluster using the Drill-on-YARN client.</li>
+  </ol>
+</#macro>
+
+<@page_html/>
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/confirm.ftl
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/resources/drill-am/confirm.ftl b/drill-yarn/src/main/resources/drill-am/confirm.ftl
new file mode 100644
index 0000000..515293d
--- /dev/null
+++ b/drill-yarn/src/main/resources/drill-am/confirm.ftl
@@ -0,0 +1,79 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<#include "*/generic.ftl">
+
+<#-- The confirmation page adds nothing to the <head> section. -->
+<#macro page_head>
+</#macro>
+
+<#-- Reports the outcome of a cluster management request.
+     model.getType( ) identifies the outcome; the values handled here are
+     RESIZED, CANCELLED, NULL_RESIZE, INVALID_RESIZE, INVALID_GROW,
+     INVALID_SHRINK, INVALID_TASK and STOPPED. model.getValue( ) supplies the
+     detail shown in the message (a size, an amount, or a Drillbit ID). -->
+<#macro page_body>
+  <#-- Read the outcome once rather than re-evaluating it for every test. -->
+  <#local result = model.getType( )>
+  <h4><#if result == "STOPPED">
+    Stop Drill Cluster
+  <#else>
+    Resize Drill Cluster
+  </#if></h4>
+  <p>
+
+  <#if result == "RESIZED">
+    <div class="alert alert-success">
+      <strong>Success!</strong> Cluster resizing to ${model.getValue( )} nodes.
+    </div>
+  <#elseif result == "CANCELLED">
+    <div class="alert alert-info">
+      <strong>Success!</strong> Drillbit ${model.getValue( )} was cancelled.
+    </div>
+  <#elseif result == "NULL_RESIZE">
+    <div class="alert alert-info">
+      <strong>Note:</strong> The new size of ${model.getValue( )} is the
+      same as the current cluster size.
+    </div>
+  <#elseif result == "INVALID_RESIZE">
+    <div class="alert alert-danger">
+      <strong>Error!</strong> Invalid cluster resize level: ${model.getValue( )}.
+      Please <a href="/manage">try again</a>.
+    </div>
+  <#elseif result == "INVALID_GROW">
+    <div class="alert alert-danger">
+      <strong>Error!</strong> Invalid cluster grow amount: ${model.getValue( )}.
+      Please <a href="/manage">try again</a>.
+    </div>
+  <#elseif result == "INVALID_SHRINK">
+    <div class="alert alert-danger">
+      <strong>Error!</strong> Invalid cluster shrink amount: ${model.getValue( )}.
+      Please <a href="/manage">try again</a>.
+    </div>
+  <#elseif result == "INVALID_TASK">
+    <div class="alert alert-danger">
+      <strong>Error!</strong> Invalid Drillbit ID: ${model.getValue( )}.
+      Perhaps the Drillbit has already stopped.
+    </div>
+  <#elseif result == "STOPPED">
+    <div class="alert alert-success">
+      <strong>Success!</strong> Cluster is shutting down.
+    </div>
+    Pages on this site will be unavailable until the cluster restarts.
+  </#if>
+  <#if result == "CANCELLED">
+    Return to the <a href="/drillbits">Drillbits page</a>.
+  <#elseif result != "STOPPED">
+    Return to the <a href="/manage">Management page</a>.
+  </#if>
+</#macro>
+
+<@page_html/>
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/generic.ftl
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/resources/drill-am/generic.ftl b/drill-yarn/src/main/resources/drill-am/generic.ftl
new file mode 100644
index 0000000..b76a917
--- /dev/null
+++ b/drill-yarn/src/main/resources/drill-am/generic.ftl
@@ -0,0 +1,91 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<#-- Adapted from the Drill generic.ftl, adjusted for use in the AM. -->
+
+<#-- Placeholder versions of the page hooks. The page templates that include
+     this file (config.ftl, history.ftl, ...) define their own page_head and
+     page_body. -->
+<#macro page_head>
+</#macro>
+
+<#macro page_body>
+</#macro>
+
+<#-- Emits the complete HTML page shared by all AM pages: document head,
+     navigation bar, and a main container holding the page's page_body.
+     Reads these data-model variables (presumably set by the AM web server
+     for every page): docsLink, showLogin, showLogout, loggedInUserName and
+     clusterName. -->
+<#macro page_html>
+  <#-- The doctype keeps browsers in standards mode, which Bootstrap requires;
+       the <html> and <head> openers pair with the closing tags below. -->
+  <!DOCTYPE html>
+  <html lang="en">
+  <head>
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+
+    <title>Apache Drill - Application Master</title>
+    <link rel="shortcut icon" href="/static/img/drill.ico">
+
+    <link href="/static/css/bootstrap.min.css" rel="stylesheet">
+    <link href="/drill-am/static/css/drill-am.css" rel="stylesheet">
+
+    <script src="/static/js/jquery.min.js"></script>
+    <script src="/static/js/bootstrap.min.js"></script>
+
+    <!-- HTML5 shim and Respond.js IE8 support of HTML5 elements and media queries -->
+    <!--[if lt IE 9]>
+      <script src="/static/js/html5shiv.js"></script>
+      <script src="/static/js/1.4.2/respond.min.js"></script>
+    <![endif]-->
+
+    <@page_head/>
+  </head>
+  <body role="document">
+    <div class="navbar navbar-inverse navbar-fixed-top" role="navigation">
+      <div class="container-fluid">
+        <div class="navbar-header">
+          <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
+            <span class="sr-only">Toggle navigation</span>
+            <span class="icon-bar"></span>
+            <span class="icon-bar"></span>
+            <span class="icon-bar"></span>
+          </button>
+          <a class="navbar-brand" href="/">Apache Drill</a>
+        </div>
+        <div class="navbar-collapse collapse">
+          <ul class="nav navbar-nav">
+            <li><a href="/config">Configuration</a></li>
+            <li><a href="/drillbits">Drillbits</a></li>
+            <li><a href="/manage">Manage</a></li>
+            <li><a href="/history">History</a></li>
+          </ul>
+          <ul class="nav navbar-nav navbar-right">
+            <li><a href="${docsLink}">Documentation</a></li>
+            <#if showLogin>
+            <li><a href="/login">Log In</a></li>
+            </#if>
+            <#if showLogout>
+            <li><a href="/logout">Log Out (${loggedInUserName})</a></li>
+            </#if>
+          </ul>
+        </div>
+      </div>
+    </div>
+
+    <div class="container-fluid drill-am" role="main">
+      <h3>YARN Application Master – ${clusterName}</h3>
+      <@page_body/>
+    </div>
+  </body>
+  </html>
+</#macro>
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/history.ftl
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/resources/drill-am/history.ftl b/drill-yarn/src/main/resources/drill-am/history.ftl
new file mode 100644
index 0000000..c588d06
--- /dev/null
+++ b/drill-yarn/src/main/resources/drill-am/history.ftl
@@ -0,0 +1,66 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<#include "*/generic.ftl">
+
+<#-- Reloads the page every refreshSecs seconds. -->
+<#macro page_head>
+  <meta http-equiv="refresh" content="${refreshSecs}" >
+</#macro>
+
+<#-- Drillbit history table: one row per entry in "model" (presumably the
+     completed Drillbit tasks, per the empty-state text below), showing its
+     container, resources, start/end times and disposition. -->
+<#macro page_body>
+  <h4>Drillbit History</h4>
+  <p>
+
+  <div class="table-responsive">
+    <table class="table table-hover">
+      <tr>
+        <th>ID</th>
+        <th>Try</th>
+        <th>Pool</th>
+        <th>Host</th>
+        <th>Container</th>
+        <th>Memory (MB)</th>
+        <th>Virtual Cores</th>
+        <th>Start Time</th>
+        <th>End Time</th>
+        <th>Disposition</th>
+      </tr>
+      <#-- Count rows so a note can be shown when there are none. -->
+      <#local count = 0>
+      <#list model as task>
+        <#local count = count + 1>
+        <tr>
+          <td><b>${task.getTaskId( )}</b></td>
+          <td>${task.getTryCount( )}</td>
+          <td>${task.getGroupName( )}</td>
+          <#-- The host links to its Node Manager only when the task has a container. -->
+          <td><#if task.hasContainer( )><a href="${task.getNmLink( )}">${task.getHost( )}</a>
+            <#else> </#if></td>
+          <td>${task.getContainerId()}</td>
+          <td>${task.getMemory( )}</td>
+          <td>${task.getVcores( )}</td>
+          <td>${task.getStartTime( )}</td>
+          <td>${task.getEndTime( )}</td>
+          <td>${task.getDisposition( )}</td>
+        </tr>
+      </#list>
+    </table>
+    <#if count == 0>
+      No drillbits have completed.
+    </#if>
+  </div>
+</#macro>
+
+<@page_html/>
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/index.ftl
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/resources/drill-am/index.ftl b/drill-yarn/src/main/resources/drill-am/index.ftl
new file mode 100644
index 0000000..18d6ab5
--- /dev/null
+++ b/drill-yarn/src/main/resources/drill-am/index.ftl
@@ -0,0 +1,128 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<#include "*/generic.ftl">
+<#macro page_head> <#-- Reloads the page every model.getRefreshSecs( ) seconds. -->
+ <meta http-equiv="refresh" content="${model.getRefreshSecs( )}" >
+</#macro>
+
+<#macro page_body> <#-- Drill cluster status: a summary table, then one row per Drillbit group from model.getGroups( ). -->
+ <h4>Drill Cluster Status</h4>
+
+ <table class="table table-hover" style="width: auto;">
+ <tr>
+ <td>YARN Application ID:</td>
+ <td><a href="${model.getRmAppLink( )}" data-toggle="tooltip" title="YARN Resource Manager page for this application"><#-- NOTE(review): unlike the RM and NM links below, getRmAppLink( ) is not null-guarded; confirm it is always set, even early in startup. -->
+ ${model.getAppId( )}</a></td>
+ </tr>
+ <tr>
+ <td>YARN Resource Manager:</td>
+ <td><#if model.getRmLink()?? > <#-- Occurs early in startup before app is fully registered. -->
+ <a href="${model.getRmLink( )}" data-toggle="tooltip" title="YARN Resource Manager page for this container">
+ ${model.getRmHost( )}</a>
+ <#else>Unavailable
+ </#if></td>
+ </tr>
+ <tr>
+ <td>YARN Node Manager for AM:</td>
+ <td><#if model.getNmLink()?? > <#-- Occurs early in startup before app is fully registered. -->
+ <a href="${model.getNmLink( )}" data-toggle="tooltip" title="YARN Node Manager">
+ ${model.getNmHost( )}</a> |
+ <a href="${model.getNmAppLink( )}" data-toggle="tooltip" title="YARN Node Manager page for this application">App info</a>
+ <#else>Unavailable
+ </#if></td>
+ </tr>
+ <tr>
+ <td>ZooKeeper Hosts:</td>
+ <td><span data-toggle="tooltip" title="ZooKeeper connection string.">
+ ${model.getZkConnectionStr( )}</span></td>
+ </tr>
+ <tr>
+ <td>ZooKeeper Root:</td>
+ <td><span data-toggle="tooltip" title="ZooKeeper Drill root.">
+ ${model.getZkRoot( )}</span></td>
+ </tr>
+ <tr>
+ <td>ZooKeeper Cluster ID:</td>
+ <td><span data-toggle="tooltip" title="ZooKeeper Drill cluster-id.">
+ ${model.getZkClusterId( )}</span></td>
+ </tr>
+ <tr>
+ <td>State:</td>
+ <td><span data-toggle="tooltip" title="${model.getStateHint( )}">
+ ${model.getState( )}</span></td>
+ </tr>
+ <tr>
+ <td>Target Drillbit Count:</td>
+ <td>${model.getTargetCount( )}</td>
+ </tr>
+ <tr>
+ <td>Live Drillbit Count:</td>
+ <td>${model.getLiveCount( )}</td>
+ </tr>
+ <#if model.getUnmanagedCount( ) gt 0 > <#-- The unmanaged and blacklisted rows (in red) appear only when their counts are non-zero. -->
+ <tr>
+ <td style="color: red">Unmanaged Drillbit Count:</td>
+ <td>${model.getUnmanagedCount( )}</td>
+ </tr>
+ </#if>
+ <#if model.getBlacklistCount( ) gt 0 >
+ <tr>
+ <td style="color: red">Blacklisted Node Count:</td>
+ <td>${model.getBlacklistCount( )}</td>
+ </tr>
+ </#if>
+ <tr>
+ <td>Total Drill Virtual Cores:</td>
+ <td>${model.getDrillTotalVcores( )}</td>
+ </tr>
+ <tr>
+ <td>Total Drill Memory (MB):</td>
+ <td>${model.getDrillTotalMemory( )}</td>
+ </tr>
+ <#if model.supportsDiskResource( ) > <#-- Disk figures appear only when the model reports disk-resource support. -->
+ <tr>
+ <td>Total Drill Disks:</td>
+ <td>${model.getDrillTotalDisks( )}</td>
+ </tr>
+ </#if>
+ </table>
+ <table class="table table-hover" style="width: auto;">
+ <tr>
+ <th>Group</th>
+ <th>Type</th>
+ <th>Target Drillbit Count</th>
+ <th>Total Drillbits</th>
+ <th>Live Drillbits</th>
+ <th>Memory per Drillbit (MB)</th>
+ <th>VCores per Drillbit</th>
+ <#if model.supportsDiskResource( ) > <#-- Must stay in sync with the matching cell condition in the row loop below. -->
+ <th>Disks per Drillbit</th>
+ </#if>
+ </tr>
+ <#list model.getGroups( ) as group> <#-- One row per Drillbit group. -->
+ <tr>
+ <td>${group.getName( )}</td>
+ <td>${group.getType( )}</td>
+ <td>${group.getTargetCount( )}</td>
+ <td>${group.getTaskCount( )}</td>
+ <td>${group.getLiveCount( )}</td>
+ <td>${group.getMemory( )}</td>
+ <td>${group.getVcores( )}</td>
+ <#if model.supportsDiskResource( ) >
+ <td>${group.getDisks( )}</td>
+ </#if>
+ </tr>
+ </#list>
+ </table>
+</#macro>
+
+<@page_html/>
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/resources/drill-am/login.ftl
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/resources/drill-am/login.ftl b/drill-yarn/src/main/resources/drill-am/login.ftl
new file mode 100644
index 0000000..036229e
--- /dev/null
+++ b/drill-yarn/src/main/resources/drill-am/login.ftl
@@ -0,0 +1,35 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<#include "*/generic.ftl">
+<#macro page_head> <#-- The login page adds nothing to the <head> section. -->
+</#macro>
+
+<#macro page_body> <#-- Login form that POSTs j_username/j_password to /j_security_check (servlet form-based authentication). -->
+ <div align="center" class="table-responsive">
+ <form role="form" name="input" action="/j_security_check" method="POST">
+ <fieldset>
+ <div class="form-group">
+ <img src="/drill-am/static/img/apache-drill-logo.png" alt="Apache Drill Logo">
+ <#if model??> <#-- When set, model is shown as an error message (presumably from a failed login). NOTE(review): it is not explicitly HTML-escaped. -->
+ <div class="alert alert-danger">
+ <strong>Error</strong> ${model}
+ </div>
+ </#if>
+ <p><input type="text" size="30" name="j_username" placeholder="Username"></p>
+ <p><input type="password" size="30" name="j_password" placeholder="Password"></p>
+ <p><button type="submit" class="btn btn-primary">Log In</button></p>
+ </div>
+ </fieldset>
+ </form>
+ </div>
+</#macro>
+<@page_html/>
\ No newline at end of file
|