storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [03/13] storm git commit: Extract ES client creator from ES bolts
Date Thu, 27 Aug 2015 22:12:47 GMT
Extract ES client creator from ES bolts


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3400c7db
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3400c7db
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3400c7db

Branch: refs/heads/master
Commit: 3400c7db4b70b6a61750f3d4ad1f948c49e9a34d
Parents: e0bee05
Author: Alex Panov <alex.panov@teradata.com>
Authored: Thu Aug 20 12:28:08 2015 +0200
Committer: Alex Panov <alex.panov@teradata.com>
Committed: Thu Aug 20 12:28:08 2015 +0200

----------------------------------------------------------------------
 .../elasticsearch/bolt/AbstractEsBolt.java      | 50 +++++++-------
 .../elasticsearch/bolt/ElasticSearchClient.java | 57 ++++++++++++++++
 .../elasticsearch/bolt/TransportAddresses.java  | 72 ++++++++++++++++++++
 .../bolt/TransportAddressesTest.java            | 64 +++++++++++++++++
 4 files changed, 216 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3400c7db/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
index ff1b543..a19a660 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -17,27 +17,26 @@
  */
 package org.apache.storm.elasticsearch.bolt;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
+import java.util.Map;
+
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
 
 public abstract class AbstractEsBolt extends BaseRichBolt {
+
     private static final Logger LOG = LoggerFactory.getLogger(AbstractEsBolt.class);
+
     protected static Client client;
+
     protected OutputCollector collector;
     private EsConfig esConfig;
 
@@ -51,19 +50,7 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
             this.collector = outputCollector;
             synchronized (AbstractEsBolt.class) {
                 if (client == null) {
-                    Settings settings =
-                            ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
-                                    .put("client.transport.sniff", "true").build();
-                    List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
-                    for (String node : esConfig.getNodes()) {
-                        String[] hostAndPort = node.split(":");
-                        if(hostAndPort.length != 2){
-                            throw new IllegalArgumentException("incorrect Elasticsearch node
format, should follow {host}:{port} pattern");
-                        }
-                        transportAddressList.add(new InetSocketTransportAddress(hostAndPort[0],
Integer.parseInt(hostAndPort[1])));
-                    }
-                    client = new TransportClient(settings)
-                            .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
+                    client = new ElasticSearchClient(esConfig).construct();
                 }
             }
         } catch (Exception e) {
@@ -72,10 +59,19 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
     }
 
     @Override
-    public void execute(Tuple tuple) {
-    }
+    public abstract void execute(Tuple tuple);
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
     }
+
+    @VisibleForTesting
+    static Client getClient() {
+        return AbstractEsBolt.client;
+    }
+
+    @VisibleForTesting
+    static void replaceClient(Client client) {
+        AbstractEsBolt.client = client;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3400c7db/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/ElasticSearchClient.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/ElasticSearchClient.java
b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/ElasticSearchClient.java
new file mode 100644
index 0000000..0a9f4ea
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/ElasticSearchClient.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import java.io.Serializable;
+
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+final class ElasticSearchClient implements Serializable {
+
+    private final EsConfig esConfig;
+
+    ElasticSearchClient(EsConfig esConfig) {
+        this.esConfig = esConfig;
+    }
+
+    Client construct() {
+        Settings settings = createBasicSettings();
+        TransportClient transportClient = new TransportClient(settings);
+        addTransportAddresses(transportClient);
+        return transportClient;
+    }
+
+    private Settings createBasicSettings() {
+        return ImmutableSettings.settingsBuilder()
+                                .put("cluster.name", esConfig.getClusterName())
+                                .put("client.transport.sniff", "true")
+                                .build();
+    }
+
+    private void addTransportAddresses(TransportClient transportClient) {
+        Iterable<InetSocketTransportAddress> transportAddresses = new TransportAddresses(esConfig.getNodes());
+        for (InetSocketTransportAddress transportAddress : transportAddresses) {
+            transportClient.addTransportAddress(transportAddress);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3400c7db/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/TransportAddresses.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/TransportAddresses.java
b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/TransportAddresses.java
new file mode 100644
index 0000000..f4479df
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/TransportAddresses.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+final class TransportAddresses implements Iterable<InetSocketTransportAddress> {
+
+    static final String DELIMETER = ":";
+
+    private final String[] nodes;
+
+    TransportAddresses(String[] nodes) {
+        if (nodes == null) {
+            throw new IllegalArgumentException("Elasticsearch hosts cannot be null");
+        }
+        if (nodes.length == 0) {
+            throw new IllegalArgumentException("At least one Elasticsearch host must be specified");
+        }
+
+        this.nodes = nodes;
+    }
+
+    @Override
+    public Iterator<InetSocketTransportAddress> iterator() {
+        List<InetSocketTransportAddress> result = new LinkedList<>();
+
+        for (String node : nodes) {
+            InetSocketTransportAddress transportAddress = transformToInetAddress(node);
+            result.add(transportAddress);
+        }
+
+        return result.iterator();
+    }
+
+    private InetSocketTransportAddress transformToInetAddress(String node) {
+        String[] hostAndPort = node.split(DELIMETER);
+        if (hostAndPort.length != 2) {
+            throw new IllegalArgumentException(
+                    "Incorrect Elasticsearch node format, should follow {host}" + DELIMETER
+ "{port} pattern");
+        }
+        String hostname = hostname(hostAndPort[0]);
+        return new InetSocketTransportAddress(hostname, port(hostAndPort[1]));
+    }
+
+    private String hostname(String input) {
+        return input.trim();
+    }
+
+    private int port(String input) {
+        return Integer.parseInt(input.trim());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3400c7db/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/TransportAddressesTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/TransportAddressesTest.java
b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/TransportAddressesTest.java
new file mode 100644
index 0000000..ea07131
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/TransportAddressesTest.java
@@ -0,0 +1,64 @@
+package org.apache.storm.elasticsearch.bolt;
+
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+public class TransportAddressesTest {
+
+    @Test
+    public void readsMultipleHosts() throws Exception {
+        String[] hosts = new String[] {"h1:1000", "h2:10003"};
+        TransportAddresses addresses = new TransportAddresses(hosts);
+        assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000),
+                                                 new InetSocketTransportAddress("h2", 10003)));
+    }
+
+    @Test
+    public void stripsSpaces() throws Exception {
+        String[] hosts = new String[] {"h1:1000", " h2:10003 "};
+        TransportAddresses addresses = new TransportAddresses(hosts);
+        assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000),
+                                                 new InetSocketTransportAddress("h2", 10003)));
+    }
+
+    @Test
+    public void readsOneHost() throws Exception {
+        String[] hosts = new String[] {"h1:1000"};
+        TransportAddresses addresses = new TransportAddresses(hosts);
+        assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000)));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsOnNullHosts() throws Exception {
+        new TransportAddresses(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsOnEmptyArray() throws Exception {
+        new TransportAddresses(new String[] {}).iterator();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsOnInvalidHostAndPortPair() throws Exception {
+        new TransportAddresses(new String[] {"h1:1000", "h2"}).iterator();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsOnInvalidPortValue() throws Exception {
+        new TransportAddresses(new String[] {"h1:-1000"}).iterator();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsOnPortNotANumber() throws Exception {
+        new TransportAddresses(new String[] {"h1:dummy"}).iterator();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsOnInvalidHostAndPortFormat() throws Exception {
+        new TransportAddresses(new String[] {"h1:dummy:231"}).iterator();
+    }
+
+}


Mime
View raw message