knox-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject knox git commit: KNOX-1041 - High Availability Support For Apache SOLR, HBase & Kafka (Rick Kellogg via Sandeep More)
Date Fri, 20 Oct 2017 14:38:42 GMT
Repository: knox
Updated Branches:
  refs/heads/master aa62fa2db -> a08aaf742


KNOX-1041 - High Availability Support For Apache SOLR, HBase & Kafka (Rick Kellogg via Sandeep More)


Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/a08aaf74
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/a08aaf74
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/a08aaf74

Branch: refs/heads/master
Commit: a08aaf742a97a3c35c94e28406fc4b6ef3184005
Parents: aa62fa2
Author: Sandeep More <more@apache.org>
Authored: Fri Oct 20 10:38:34 2017 -0400
Committer: Sandeep More <more@apache.org>
Committed: Fri Oct 20 10:38:34 2017 -0400

----------------------------------------------------------------------
 .../provider/impl/BaseZookeeperURLManager.java  | 195 +++++++++++++++++++
 .../provider/impl/HBaseZookeeperURLManager.java | 138 +++++++++++++
 .../provider/impl/KafkaZookeeperURLManager.java | 152 +++++++++++++++
 .../provider/impl/SOLRZookeeperURLManager.java  | 118 +++++++++++
 .../ha/provider/impl/StringResponseHandler.java |  49 +++++
 ...apache.hadoop.gateway.ha.provider.URLManager |   5 +-
 .../impl/HBaseZookeeperURLManagerTest.java      |  72 +++++++
 .../impl/KafkaZookeeperURLManagerTest.java      |  71 +++++++
 .../impl/SOLRZookeeperURLManagerTest.java       | 110 +++++++++++
 9 files changed, 909 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/BaseZookeeperURLManager.java
----------------------------------------------------------------------
diff --git a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/BaseZookeeperURLManager.java b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/BaseZookeeperURLManager.java
new file mode 100644
index 0000000..0b16144
--- /dev/null
+++ b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/BaseZookeeperURLManager.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.gateway.ha.provider.HaServiceConfig;
+import org.apache.hadoop.gateway.ha.provider.URLManager;
+import org.apache.hadoop.gateway.ha.provider.impl.i18n.HaMessages;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Base implementation of URLManager intended for query of Zookeeper active hosts. In
+ * the event of a failure via markFailed, Zookeeper is queried again for active
+ * host information.
+ * 
+ * When configuring the HAProvider in the topology, the zookeeperEnsemble attribute must be set to a
+ * comma delimited list of the host and port number, i.e. host1:2181,host2:2181. 
+ */
+public abstract class BaseZookeeperURLManager implements URLManager {
+	protected static final HaMessages LOG = MessagesFactory.get(HaMessages.class);
+	/**
+	 * Host Ping Timeout
+	 */
+	private static final int TIMEOUT = 2000;
+
+	private String zooKeeperEnsemble;
+	private ConcurrentLinkedQueue<String> urls = new ConcurrentLinkedQueue<String>();
+
+	// -------------------------------------------------------------------------------------
+	// URLManager interface methods
+	// -------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsConfig(HaServiceConfig config) {
+		if (!config.getServiceName().equalsIgnoreCase(getServiceName())) {
+			return false;
+		}
+		
+		String zookeeperEnsemble = config.getZookeeperEnsemble();
+		if (zookeeperEnsemble != null && zookeeperEnsemble.trim().length() > 0) {
+			return true;
+		}
+		
+		return false;
+	}
+
+	@Override
+	public void setConfig(HaServiceConfig config) {
+		zooKeeperEnsemble = config.getZookeeperEnsemble();
+		setURLs(lookupURLs());
+	}
+
+	@Override
+	public synchronized String getActiveURL() {
+		// None available so refresh
+		if (urls.isEmpty()) {
+			setURLs(lookupURLs());
+		}
+
+		return this.urls.peek();
+	}
+
+	@Override
+	public synchronized void setActiveURL(String url) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public synchronized List<String> getURLs() {
+		return Lists.newArrayList(this.urls.iterator());
+	}
+
+	@Override
+	public synchronized void markFailed(String url) {
+		// Capture complete URL of active host
+		String topURL = getActiveURL();
+
+		// Refresh URLs from ZooKeeper
+		setURLs(lookupURLs());
+
+		// Show failed URL and new URL
+		LOG.markedFailedUrl(topURL, getActiveURL());
+	}
+
+	@Override
+	public synchronized void setURLs(List<String> urls) {
+		if ((urls != null) && (!(urls.isEmpty()))) {
+			this.urls.clear();
+			this.urls.addAll(urls);
+		}
+	}
+
+	// -------------------------------------------------------------------------------------
+	// Abstract methods
+	// -------------------------------------------------------------------------------------
+
+	/**
+	 * Look within Zookeeper under the /live_nodes branch for active hosts
+	 * 
+	 * @return A List of URLs (never null)
+	 */
+	protected abstract List<String> lookupURLs();
+
+	/**
+	 * @return The name of the Knox Topology Service to support
+	 */
+	protected abstract String getServiceName();
+
+	// -------------------------------------------------------------------------------------
+	// Protected methods
+	// -------------------------------------------------------------------------------------
+
+	protected String getZookeeperEnsemble() {
+		return zooKeeperEnsemble;
+	}
+	
+	/**
+	 * Validate access to hosts using simple light weight ping style REST call.
+	 * 
+	 * @param hosts List of hosts to evaluate (required)
+	 * @param suffix Text to append to host (required) 
+	 * @param acceptHeader Used for Accept header (optional)
+	 * 
+	 * @return Hosts with successful access
+	 */
+	protected List<String> validateHosts(List<String> hosts, String suffix, String acceptHeader) {
+		List<String> result = new ArrayList<String>();
+		
+		CloseableHttpClient client = null;
+		
+		try {
+			// Construct a HttpClient with short term timeout
+			RequestConfig.Builder requestBuilder = RequestConfig.custom()
+					.setConnectTimeout(TIMEOUT)
+					.setSocketTimeout(TIMEOUT)
+					.setConnectionRequestTimeout(TIMEOUT);
+
+			client = HttpClientBuilder.create()
+					.setDefaultRequestConfig(requestBuilder.build())
+					.build();
+			
+			for(String host: hosts) {
+				try	{
+					HttpGet get = new HttpGet(host + suffix);
+					
+					if (acceptHeader != null) {
+						get.setHeader("Accept", acceptHeader);
+					}
+					
+					String response = client.execute(get, new StringResponseHandler());
+					
+					if (response != null) {
+						result.add(host);
+					}
+				}
+				catch (Exception ex) {
+					// ignore host
+				}
+			}
+		}
+		catch (Exception ex) {
+			// Ignore errors
+		}
+		finally	{
+			IOUtils.closeQuietly(client);
+		}
+		
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManager.java
----------------------------------------------------------------------
diff --git a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManager.java b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManager.java
new file mode 100644
index 0000000..8a414c7
--- /dev/null
+++ b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManager.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Implementation of URLManager intended for query of Zookeeper for active HBase RegionServer hosts.
+ *  
+ * The assumption is that the HBase REST Server will be installed on the same host.  For safety
+ * reasons, the REST Server is pinged for access before inclusion in the list of returned hosts.
+ * 
+ * In the event of a failure via markFailed, Zookeeper is queried again for active
+ * host information.
+ * 
+ * When configuring the HAProvider in the topology, the zookeeperEnsemble
+ * attribute must be set to a comma delimited list of the host and port number,
+ * i.e. host1:2181,host2:2181.
+ */
+public class HBaseZookeeperURLManager extends BaseZookeeperURLManager {
+	/**
+	 * Default Port Number for HBase REST Server
+	 */
+	private static final int PORT_NUMBER = 8080;
+	
+	private String zookeeperNamespace = "hbase-unsecure";
+	
+	// -------------------------------------------------------------------------------------
+	// Abstract methods
+	// -------------------------------------------------------------------------------------
+
+	/**
+	 * Look within Zookeeper under the /hbase-unsecure/rs branch for active HBase RegionServer hosts
+	 * 
+	 * @return A List of URLs (never null)
+	 */
+	@Override
+	protected List<String> lookupURLs() {
+		// Retrieve list of potential hosts from ZooKeeper
+		List<String> hosts = retrieveHosts();
+		
+		// Validate access to hosts using cheap ping style operation
+		List<String> validatedHosts = validateHosts(hosts,"/","text/xml");
+
+		// Randomize the hosts list for simple load balancing
+		if (!validatedHosts.isEmpty()) {
+			Collections.shuffle(validatedHosts);
+		}
+
+		return validatedHosts;
+	}
+
+	protected String getServiceName() {
+		return "WEBHBASE";
+	};
+
+	// -------------------------------------------------------------------------------------
+	// Private methods
+	// -------------------------------------------------------------------------------------
+
+	/**
+	 * @return Retrieve lists of hosts from ZooKeeper
+	 */
+	private List<String> retrieveHosts()
+	{
+		List<String> serverHosts = new ArrayList<>();
+		
+		CuratorFramework zooKeeperClient = CuratorFrameworkFactory.builder()
+				.connectString(getZookeeperEnsemble())
+				.retryPolicy(new ExponentialBackoffRetry(1000, 3))
+				.build();
+		
+		try {
+			zooKeeperClient.start();
+			
+			// Retrieve list of all region server hosts
+			List<String> serverNodes = zooKeeperClient.getChildren().forPath("/" + zookeeperNamespace + "/rs");
+			
+			for (String serverNode : serverNodes) {
+				String serverURL = constructURL(serverNode);
+				serverHosts.add(serverURL);
+			}
+		} catch (Exception e) {
+			LOG.failedToGetZookeeperUrls(e);
+			throw new RuntimeException(e);
+		} finally {
+			// Close the client connection with ZooKeeper
+			if (zooKeeperClient != null) {
+				zooKeeperClient.close();
+			}
+		}
+		
+		return serverHosts;
+	}
+	
+	/**
+	 * Given a String of the format "host,number,number" convert to a URL of the format
+	 * "http://host:port".
+	 * 
+	 * @param serverInfo Server Info from Zookeeper (required)
+	 * 
+	 * @return URL to HBASE
+	 */
+	private String constructURL(String serverInfo) {
+		String scheme = "http";
+
+		StringBuffer buffer = new StringBuffer();
+		buffer.append(scheme);
+		buffer.append("://");
+		// Strip off the host name 
+		buffer.append(serverInfo.substring(0,serverInfo.indexOf(",")));
+		buffer.append(":");
+		buffer.append(PORT_NUMBER);
+		
+		return buffer.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManager.java
----------------------------------------------------------------------
diff --git a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManager.java b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManager.java
new file mode 100644
index 0000000..c68c107
--- /dev/null
+++ b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManager.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import net.minidev.json.JSONObject;
+import net.minidev.json.parser.JSONParser;
+import net.minidev.json.parser.ParseException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Implementation of URLManager intended for query of Zookeeper for active Kafka hosts. 
+ * 
+ * The assumption is that the Confluent REST Proxy will be installed on the same host.  For safety
+ * reasons, the REST Server is pinged for access before inclusion in the list of returned hosts.
+ * 
+ * In the event of a failure via markFailed, Zookeeper is queried again for active
+ * host information.
+ * 
+ * When configuring the HAProvider in the topology, the zookeeperEnsemble
+ * attribute must be set to a comma delimited list of the host and port number,
+ * i.e. host1:2181,host2:2181.
+ */
+public class KafkaZookeeperURLManager extends BaseZookeeperURLManager {
+	/**
+	 * Default Port Number for Confluent Kafka REST Server
+	 */
+	private static final int PORT_NUMBER = 8082;
+	/**
+	 * Base path for retrieval from Zookeeper
+	 */
+	private static final String BASE_PATH = "/brokers/ids";
+	
+	// -------------------------------------------------------------------------------------
+	// Abstract methods
+	// -------------------------------------------------------------------------------------
+
+	/**
+	 * Look within Zookeeper under the /broker/ids branch for active Kafka hosts
+	 * 
+	 * @return A List of URLs (never null)
+	 */
+	@Override
+	protected List<String> lookupURLs() {
+		// Retrieve list of potential hosts from ZooKeeper
+		List<String> hosts = retrieveHosts();
+		
+		// Validate access to hosts using cheap ping style operation
+		List<String> validatedHosts = validateHosts(hosts,"/topics","application/vnd.kafka.v2+json");
+
+		// Randomize the hosts list for simple load balancing
+		if (!validatedHosts.isEmpty()) {
+			Collections.shuffle(validatedHosts);
+		}
+
+		return validatedHosts;
+	}
+
+	protected String getServiceName() {
+		return "KAFKA";
+	};
+
+	// -------------------------------------------------------------------------------------
+	// Private methods
+	// -------------------------------------------------------------------------------------
+
+	/**
+	 * @return Retrieve lists of hosts from ZooKeeper
+	 */
+	private List<String> retrieveHosts()
+	{
+		List<String> serverHosts = new ArrayList<>();
+		
+		CuratorFramework zooKeeperClient = CuratorFrameworkFactory.builder()
+				.connectString(getZookeeperEnsemble())
+				.retryPolicy(new ExponentialBackoffRetry(1000, 3))
+				.build();
+		
+		try {
+			zooKeeperClient.start();
+
+			// Retrieve list of host URLs from ZooKeeper
+			List<String> brokers = zooKeeperClient.getChildren().forPath(BASE_PATH);
+
+			for (String broker : brokers) {
+				String serverInfo = new String(zooKeeperClient.getData().forPath(BASE_PATH + "/" + broker), Charset.forName("UTF-8"));
+				
+				String serverURL = constructURL(serverInfo);
+				serverHosts.add(serverURL);
+			}
+		} catch (Exception e) {
+			LOG.failedToGetZookeeperUrls(e);
+			throw new RuntimeException(e);
+		} finally {
+			// Close the client connection with ZooKeeper
+			if (zooKeeperClient != null) {
+				zooKeeperClient.close();
+			}
+		}
+		
+		return serverHosts;
+	}
+	
+	/**
+	 * Given a String of the format "{"jmx_port":-1,"timestamp":"1505763958072","endpoints":["PLAINTEXT://host:6667"],"host":"host","version":3,"port":6667}"
+	 * convert to a URL of the format "http://host:port".
+	 * 
+	 * @param serverInfo Server Info in JSON Format from Zookeeper (required)
+	 * 
+	 * @return URL to Kafka
+	 * @throws ParseException 
+	 */
+	private String constructURL(String serverInfo) throws ParseException {
+		String scheme = "http";
+
+		StringBuffer buffer = new StringBuffer();
+		
+		buffer.append(scheme);
+		buffer.append("://");
+		
+		JSONParser parser = new JSONParser(JSONParser.DEFAULT_PERMISSIVE_MODE);
+		JSONObject obj = (JSONObject) parser.parse(serverInfo);
+		buffer.append(obj.get("host"));
+		
+		buffer.append(":");
+		buffer.append(PORT_NUMBER);
+
+		return buffer.toString();
+	}	
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManager.java
----------------------------------------------------------------------
diff --git a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManager.java b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManager.java
new file mode 100644
index 0000000..f612e9b
--- /dev/null
+++ b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManager.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Implementation of URLManager intended for query of Zookeeper for active SOLR Cloud hosts.
+ * In the event of a failure via markFailed, Zookeeper is queried again for active
+ * host information.
+ * 
+ * When configuring the HAProvider in the topology, the zookeeperEnsemble
+ * attribute must be set to a comma delimited list of the host and port number,
+ * i.e. host1:2181,host2:2181.
+ */
+public class SOLRZookeeperURLManager extends BaseZookeeperURLManager {
+
+	// -------------------------------------------------------------------------------------
+	// Abstract methods
+	// -------------------------------------------------------------------------------------
+
+	/**
+	 * Look within Zookeeper under the /live_nodes branch for active SOLR hosts
+	 * 
+	 * @return A List of URLs (never null)
+	 */
+	@Override
+	protected List<String> lookupURLs() {
+		// Retrieve list of potential hosts from ZooKeeper
+		List<String> hosts = retrieveHosts();
+		
+		// Randomize the hosts list for simple load balancing
+		if (!hosts.isEmpty()) {
+			Collections.shuffle(hosts);
+		}
+
+		return hosts;
+	}
+
+	protected String getServiceName() {
+		return "SOLR";
+	};
+
+	// -------------------------------------------------------------------------------------
+	// Private methods
+	// -------------------------------------------------------------------------------------
+
+	/**
+	 * @return Retrieve lists of hosts from ZooKeeper
+	 */
+	private List<String> retrieveHosts()
+	{
+		List<String> serverHosts = new ArrayList<>();
+		
+		CuratorFramework zooKeeperClient = CuratorFrameworkFactory.builder()
+				.connectString(getZookeeperEnsemble())
+				.retryPolicy(new ExponentialBackoffRetry(1000, 3))
+				.build();
+		
+		try {
+			zooKeeperClient.start();
+			List<String> serverNodes = zooKeeperClient.getChildren().forPath("/live_nodes");
+			for (String serverNode : serverNodes) {
+				String serverURL = constructURL(serverNode);
+				serverHosts.add(serverURL);
+			}
+		} catch (Exception e) {
+			LOG.failedToGetZookeeperUrls(e);
+			throw new RuntimeException(e);
+		} finally {
+			// Close the client connection with ZooKeeper
+			if (zooKeeperClient != null) {
+				zooKeeperClient.close();
+			}
+		}
+
+		return serverHosts;
+	}
+	
+	/**
+	 * Given a String of the format "host:port_solr" convert to a URL of the format
+	 * "http://host:port/solr".
+	 * 
+	 * @param serverInfo Server Info from Zookeeper (required)
+	 * 
+	 * @return URL to SOLR
+	 */
+	private String constructURL(String serverInfo) {
+		String scheme = "http";
+
+		StringBuffer buffer = new StringBuffer();
+		buffer.append(scheme);
+		buffer.append("://");
+		buffer.append(serverInfo.replace("_", "/"));
+		return buffer.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/StringResponseHandler.java
----------------------------------------------------------------------
diff --git a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/StringResponseHandler.java b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/StringResponseHandler.java
new file mode 100644
index 0000000..68b68c6
--- /dev/null
+++ b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/StringResponseHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import java.io.IOException;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * Apache HttpClient ResponseHandler for String HttpResponse
+ */
+public class StringResponseHandler implements ResponseHandler<String>
+{
+	@Override
+	public String handleResponse(HttpResponse response)
+	throws ClientProtocolException, IOException 
+	{
+		int status = response.getStatusLine().getStatusCode();
+		
+		if (status >= 200 && status < 300)
+		{
+			HttpEntity entity = response.getEntity();
+			return entity != null ?EntityUtils.toString(entity) : null;
+		}
+		else
+		{
+			throw new ClientProtocolException("Unexcepted response status: " + status);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager
----------------------------------------------------------------------
diff --git a/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager b/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager
index d1ec0b9..7530ac6 100644
--- a/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager
+++ b/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager
@@ -16,4 +16,7 @@
 # limitations under the License.
 ##########################################################################
 
-org.apache.hadoop.gateway.ha.provider.impl.HS2ZookeeperURLManager
\ No newline at end of file
+org.apache.hadoop.gateway.ha.provider.impl.HS2ZookeeperURLManager
+org.apache.hadoop.gateway.ha.provider.impl.SOLRZookeeperURLManager
+org.apache.hadoop.gateway.ha.provider.impl.KafkaZookeeperURLManager
+org.apache.hadoop.gateway.ha.provider.impl.HBaseZookeeperURLManager
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManagerTest.java
----------------------------------------------------------------------
diff --git a/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManagerTest.java b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManagerTest.java
new file mode 100644
index 0000000..087651e
--- /dev/null
+++ b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManagerTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.gateway.ha.provider.HaServiceConfig;
+import org.apache.hadoop.gateway.ha.provider.URLManager;
+import org.apache.hadoop.gateway.ha.provider.URLManagerLoader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Simple unit tests for HBaseZookeeperURLManager.
+ * 
+ * @see HBaseZookeeperURLManager
+ */
+public class HBaseZookeeperURLManagerTest {
+	
+  private TestingCluster cluster;
+
+  @Before
+  public void setup() throws Exception {
+    cluster = new TestingCluster(3);
+    cluster.start();
+
+    CuratorFramework zooKeeperClient =
+        CuratorFrameworkFactory.builder().connectString(cluster.getConnectString())
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+
+    zooKeeperClient.start();
+    zooKeeperClient.create().forPath("/hbase-unsecure");
+    zooKeeperClient.create().forPath("/hbase-unsecure/rs");
+    zooKeeperClient.close();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    cluster.stop();
+  }
+
+  @Test
+  public void testHBaseZookeeperURLManagerLoading() {
+    HaServiceConfig config = new DefaultHaServiceConfig("WEBHBASE");
+    config.setEnabled(true);
+    config.setZookeeperEnsemble(cluster.getConnectString());
+    URLManager manager = URLManagerLoader.loadURLManager(config);
+    Assert.assertNotNull(manager);
+    Assert.assertTrue(manager instanceof HBaseZookeeperURLManager);
+  }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManagerTest.java
----------------------------------------------------------------------
diff --git a/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManagerTest.java b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManagerTest.java
new file mode 100644
index 0000000..50dedbf
--- /dev/null
+++ b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManagerTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.gateway.ha.provider.HaServiceConfig;
+import org.apache.hadoop.gateway.ha.provider.URLManager;
+import org.apache.hadoop.gateway.ha.provider.URLManagerLoader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Simple unit tests for {@link KafkaZookeeperURLManager}.
+ */
+public class KafkaZookeeperURLManagerTest {
+  private TestingCluster cluster;
+
+  /** Starts the ensemble and creates /brokers/ids; the client is always closed. */
+  @Before
+  public void setup() throws Exception {
+    cluster = new TestingCluster(3);
+    cluster.start();
+
+    try (CuratorFramework zooKeeperClient =
+        CuratorFrameworkFactory.builder().connectString(cluster.getConnectString())
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build()) {
+      zooKeeperClient.start();
+      zooKeeperClient.create().forPath("/brokers");
+      zooKeeperClient.create().forPath("/brokers/ids");
+    }
+  }
+
+  /** Shuts the ensemble down; close(), unlike stop(), also frees its resources. */
+  @After
+  public void teardown() throws IOException {
+    cluster.close();
+  }
+
+  /** The loader must resolve the KAFKA service role to KafkaZookeeperURLManager. */
+  @Test
+  public void testKafkaZookeeperURLManagerLoading() {
+    HaServiceConfig config = new DefaultHaServiceConfig("KAFKA");
+    config.setEnabled(true);
+    config.setZookeeperEnsemble(cluster.getConnectString());
+    URLManager manager = URLManagerLoader.loadURLManager(config);
+    Assert.assertNotNull(manager);
+    Assert.assertTrue(manager instanceof KafkaZookeeperURLManager);
+  }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManagerTest.java
----------------------------------------------------------------------
diff --git a/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManagerTest.java b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManagerTest.java
new file mode 100644
index 0000000..6cc6fa7
--- /dev/null
+++ b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManagerTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.gateway.ha.provider.HaServiceConfig;
+import org.apache.hadoop.gateway.ha.provider.URLManager;
+import org.apache.hadoop.gateway.ha.provider.URLManagerLoader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Simple unit tests for {@link SOLRZookeeperURLManager}.
+ */
+public class SOLRZookeeperURLManagerTest {
+
+  private TestingCluster cluster;
+  private SOLRZookeeperURLManager manager;
+
+  /** Starts the ensemble, seeds /live_nodes and configures the manager under test. */
+  @Before
+  public void setup() throws Exception {
+    cluster = new TestingCluster(3);
+    cluster.start();
+
+    try (CuratorFramework zooKeeperClient =
+        CuratorFrameworkFactory.builder().connectString(cluster.getConnectString())
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build()) {
+      zooKeeperClient.start();
+      zooKeeperClient.create().forPath("/live_nodes");
+      zooKeeperClient.create().forPath("/live_nodes/host1:8983_solr");
+      zooKeeperClient.create().forPath("/live_nodes/host2:8983_solr");
+      zooKeeperClient.create().forPath("/live_nodes/host3:8983_solr");
+    }
+
+    manager = new SOLRZookeeperURLManager();
+    HaServiceConfig config = new DefaultHaServiceConfig("SOLR");
+    config.setEnabled(true);
+    config.setZookeeperEnsemble(cluster.getConnectString());
+    manager.setConfig(config);
+  }
+
+  /** Shuts the ensemble down; close(), unlike stop(), also frees its resources. */
+  @After
+  public void teardown() throws IOException {
+    cluster.close();
+  }
+
+  /** Each live node must be reported exactly once as an http://host:port/solr URL. */
+  @Test
+  public void testURLs() throws Exception {
+    List<String> urls = manager.getURLs();
+    Assert.assertNotNull(urls);
+
+    // Order of URLs is not deterministic out of Zookeeper,
+    // so we just check for the expected values.
+    TreeSet<String> expected = new TreeSet<String>();
+    expected.add("http://host1:8983/solr");
+    expected.add("http://host2:8983/solr");
+    expected.add("http://host3:8983/solr");
+
+    for (String url : urls) {
+      // remove() returns false for an unknown URL or one already seen (duplicate).
+      assertTrue("Unexpected URL: " + url, expected.remove(url));
+    }
+
+    assertEquals("Missing URLs: " + expected, 0, expected.size());
+
+    // Unable to test markFailed because the SOLRZookeeperURLManager always does a
+    // refresh on Zookeeper contents.
+  }
+
+  /** The loader must resolve the SOLR service role to SOLRZookeeperURLManager. */
+  @Test
+  public void testSOLRZookeeperURLManagerLoading() {
+    HaServiceConfig config = new DefaultHaServiceConfig("SOLR");
+    config.setEnabled(true);
+    config.setZookeeperEnsemble(cluster.getConnectString());
+    URLManager loaded = URLManagerLoader.loadURLManager(config);
+    Assert.assertNotNull(loaded);
+    Assert.assertTrue(loaded instanceof SOLRZookeeperURLManager);
+  }
+}


Mime
View raw message