storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [01/11] storm git commit: STORM-845 Storm ElasticSearch connector
Date Wed, 05 Aug 2015 22:58:46 GMT
Repository: storm
Updated Branches:
  refs/heads/master 827ddbd8c -> 124e8468a


STORM-845 Storm ElasticSearch connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/acaab9c3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/acaab9c3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/acaab9c3

Branch: refs/heads/master
Commit: acaab9c3d6c13952a722480bc64393b5b2dc61bc
Parents: 0c00544
Author: SEUNGJIN LEE <sweetest.sj@navercorp.com>
Authored: Thu Jul 23 14:20:14 2015 +0900
Committer: SEUNGJIN LEE <sweetest.sj@navercorp.com>
Committed: Thu Jul 23 14:32:17 2015 +0900

----------------------------------------------------------------------
 external/storm-elasticsearch/README.md          |  4 ++--
 .../storm/elasticsearch/bolt/EsIndexBolt.java   | 14 ++++++++++--
 .../elasticsearch/bolt/EsPercolateBolt.java     | 11 ++++++++-
 .../storm/elasticsearch/common/EsConfig.java    |  7 +++++-
 .../storm/elasticsearch/trident/EsState.java    | 24 ++++++++++++++++++--
 .../elasticsearch/trident/EsStateFactory.java   |  4 ++++
 .../elasticsearch/bolt/EsIndexBoltTest.java     | 23 +++++++++++++++++--
 .../elasticsearch/bolt/EsIndexTopology.java     |  4 ++--
 .../elasticsearch/bolt/EsPercolateBoltTest.java | 12 ++++++++--
 .../storm/elasticsearch/common/EsTestUtil.java  |  6 ++---
 .../trident/TridentEsTopology.java              | 15 +++++-------
 11 files changed, 98 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index 3fa1592..9e1bbb8 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -6,7 +6,7 @@
 ## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
 
 EsIndexBolt streams tuples directly into Elasticsearch. Tuples are indexed in specified index & type combination. 
-User should make sure that there are "index","type", and "source" fields declared in preceding bolts or spout.
+User should make sure that there are "source", "index","type", and "id" fields declared in preceding bolts or spout.
 "index" and "type" fields are used for identifying target index and type.
 "source" is a document in JSON format string that will be indexed in elastic search.
 
@@ -20,7 +20,7 @@ EsIndexBolt indexBolt = new IndexBolt(esConfig);
 ## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
 
 EsPercolateBolt streams tuples directly into Elasticsearch. Tuples are used to send percolate request to specified index & type combination. 
-User should make sure that there are "index","type", and "source" fields declared in preceding bolts or spout.
+User should make sure that there are "source", "index", and "type" fields declared in preceding bolts or spout.
 "index" and "type" fields are used for identifying target index and type.
 "source" is a document in JSON format string that will be sent in percolate request to elastic search.
 

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
index 9d7522c..c1d7daa 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -30,6 +30,10 @@ import java.util.Map;
 public class EsIndexBolt extends AbstractEsBolt {
     private static final Logger LOG = LoggerFactory.getLogger(EsIndexBolt.class);
 
+    /**
+     * EsIndexBolt constructor
+     * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+     */
     public EsIndexBolt(EsConfig esConfig) {
         super(esConfig);
     }
@@ -39,13 +43,19 @@ public class EsIndexBolt extends AbstractEsBolt {
         super.prepare(map, topologyContext, outputCollector);
     }
 
+    /**
+     * Executes index request for given tuple.
+     * @param tuple should contain string values of 4 declared fields: "source", "index", "type", "id"
+     */
     @Override
     public void execute(Tuple tuple) {
         try {
+            String source = tuple.getStringByField("source");
             String index = tuple.getStringByField("index");
             String type = tuple.getStringByField("type");
-            String source = tuple.getStringByField("source");
-            client.prepareIndex(index, type).setSource(source).execute().actionGet();
+            String id = tuple.getStringByField("id");
+
+            client.prepareIndex(index, type, id).setSource(source).execute().actionGet();
             collector.ack(tuple);
         } catch (Exception e) {
             e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
index 3142fc1..7ee6835 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -34,6 +34,10 @@ import java.util.Map;
 public class EsPercolateBolt extends AbstractEsBolt {
     private static final Logger LOG = LoggerFactory.getLogger(EsPercolateBolt.class);
 
+    /**
+     * EsPercolateBolt constructor
+     * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+     */
     public EsPercolateBolt(EsConfig esConfig) {
         super(esConfig);
     }
@@ -43,12 +47,17 @@ public class EsPercolateBolt extends AbstractEsBolt {
         super.prepare(map, topologyContext, outputCollector);
     }
 
+    /**
+     * Executes percolate request for given tuple.
+     * @param tuple should contain string values of 3 declared fields: "source", "index", "type"
+     */
     @Override
     public void execute(Tuple tuple) {
         try {
+            String source = tuple.getStringByField("source");
             String index = tuple.getStringByField("index");
             String type = tuple.getStringByField("type");
-            String source = tuple.getStringByField("source");
+
             PercolateResponse response = client.preparePercolate().setIndices(index).setDocumentType(type)
                     .setPercolateDoc(PercolateSourceBuilder.docBuilder().setDoc(source)).execute().actionGet();
             if (response.getCount() > 0) {

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index c97d77f..0b57788 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -26,7 +26,12 @@ public class EsConfig implements Serializable{
     public EsConfig() {
     }
 
-    public EsConfig(String clusterName, String[] nodes, int port) {
+    /**
+     * EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory
+     * @param clusterName Elasticsearch cluster name
+     * @param nodes Elasticsearch addresses in host:port pattern string array
+     */
+    public EsConfig(String clusterName, String[] nodes) {
         this.clusterName = clusterName;
         this.nodes = nodes;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index 58de6cc..e804084 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -42,15 +42,33 @@ public class EsState implements State {
     private static Client client;
     private EsConfig esConfig;
 
+    /**
+     * EsState constructor
+     * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+     */
     public EsState(EsConfig esConfig) {
         this.esConfig = esConfig;
     }
 
+    /**
+     * @param txid
+     *
+     * Elasticsearch index requests with same id will result in update operation
+     * which means if same tuple replays, only one record will be stored in elasticsearch for same document
+     * without control with txid
+     */
     @Override
     public void beginCommit(Long txid) {
 
     }
 
+    /**
+     * @param txid
+     *
+     * Elasticsearch index requests with same id will result in update operation
+     * which means if same tuple replays, only one record will be stored in elasticsearch for same document
+     * without control with txid
+     */
     @Override
     public void commit(Long txid) {
 
@@ -83,10 +101,12 @@ public class EsState implements State {
     public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
         BulkRequestBuilder bulkRequest = client.prepareBulk();
         for (TridentTuple tuple : tuples) {
+            String source = tuple.getStringByField("source");
             String index = tuple.getStringByField("index");
             String type = tuple.getStringByField("type");
-            String source = tuple.getStringByField("source");
-            bulkRequest.add(client.prepareIndex(index, type).setSource(source));
+            String id = tuple.getStringByField("id");
+
+            bulkRequest.add(client.prepareIndex(index, type, id).setSource(source));
         }
         BulkResponse bulkResponse = bulkRequest.execute().actionGet();
         if (bulkResponse.hasFailures()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index d7f4330..b1eaa04 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -34,6 +34,10 @@ public class EsStateFactory implements StateFactory {
 
     }
 
+    /**
+     * EsStateFactory constructor
+     * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+     */
     public EsStateFactory(EsConfig esConfig){
         this.esConfig = esConfig;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index 28b8bf7..dd4b088 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -20,6 +20,12 @@ package org.apache.storm.elasticsearch.bolt;
 import backtype.storm.tuple.Tuple;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.elasticsearch.action.count.CountRequest;
+import org.elasticsearch.action.count.CountRequestBuilder;
+import org.elasticsearch.action.count.CountResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,14 +42,27 @@ public class EsIndexBoltTest extends AbstractEsBoltTest{
         EsConfig esConfig = new EsConfig();
         esConfig.setClusterName("test-cluster");
         esConfig.setNodes(new String[]{"127.0.0.1:9300"});
+
         bolt = new EsIndexBolt(esConfig);
         bolt.prepare(config, null, collector);
+
+        String source = "{\"user\":\"user1\"}";
         String index = "index1";
         String type = "type1";
-        String source = "{\"user\":\"user1\"}";
-        Tuple tuple = EsTestUtil.generateTestTuple(index, type, source);
+        String id = "docId";
+        Tuple tuple = EsTestUtil.generateTestTuple(source, index, type, id);
+
         bolt.execute(tuple);
+
         verify(collector).ack(tuple);
+
+        node.client().admin().indices().prepareRefresh(index).execute().actionGet();
+        CountResponse resp = node.client().prepareCount(index)
+                .setQuery(new TermQueryBuilder("_type", type))
+                .execute().actionGet();
+
+        Assert.assertEquals(1, resp.getCount());
+
         bolt.cleanup();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index f5e868a..fc9c178 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -80,7 +80,7 @@ public class EsIndexTopology {
         private String typeName = "type1";
 
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("index", "type", "source"));
+            declarer.declare(new Fields("source", "index", "type", "id"));
         }
 
         public void open(Map config, TopologyContext context,
@@ -91,8 +91,8 @@ public class EsIndexTopology {
 
         public void nextTuple() {
             String source = sources[index];
-            Values values = new Values(indexName, typeName, source);
             UUID msgId = UUID.randomUUID();
+            Values values = new Values(source, indexName, typeName, msgId);
             this.pending.put(msgId, values);
             this.collector.emit(values, msgId);
             index++;

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index 4520389..ea0504b 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -21,6 +21,9 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.elasticsearch.action.count.CountResponse;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,17 +42,22 @@ public class EsPercolateBoltTest extends AbstractEsBoltTest {
         esConfig.setNodes(new String[]{"localhost:9300"});
         bolt = new EsPercolateBolt(esConfig);
         bolt.prepare(config, null, collector);
+
+        String source = "{\"user\":\"user1\"}";
         String index = "index1";
         String type = ".percolator";
-        String source = "{\"user\":\"user1\"}";
+
         node.client().prepareIndex("index1",".percolator")
                 .setId("1")
                 .setSource("{\"query\":{\"match\":{\"user\":\"user1\"}}}").
                 execute().actionGet();
-        Tuple tuple = EsTestUtil.generateTestTuple(index, type, source);
+        Tuple tuple = EsTestUtil.generateTestTuple(source, index, type, null);
+
         bolt.execute(tuple);
+
         verify(collector).ack(tuple);
         verify(collector).emit(new Values("1"));
+
         bolt.cleanup();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
index 2c0026d..3b20383 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -34,16 +34,16 @@ import org.elasticsearch.node.NodeBuilder;
 import java.util.HashMap;
 
 public class EsTestUtil {
-    public static Tuple generateTestTuple(String index, String type, String source) {
+    public static Tuple generateTestTuple(String source, String index, String type, String id) {
         TopologyBuilder builder = new TopologyBuilder();
         GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
                 new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
             @Override
             public Fields getComponentOutputFields(String componentId, String streamId) {
-                return new Fields("index", "type", "source");
+                return new Fields("source", "index", "type", "id");
             }
         };
-        return new TupleImpl(topologyContext, new Values(index, type, source), 1, "");
+        return new TupleImpl(topologyContext, new Values(source, index, type, id), 1, "");
     }
 
     public static Node startEsNode(){

http://git-wip-us.apache.org/repos/asf/storm/blob/acaab9c3/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index aed06f6..2c951f8 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -32,10 +32,7 @@ import storm.trident.operation.TridentCollector;
 import storm.trident.spout.IBatchSpout;
 import storm.trident.state.StateFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class TridentEsTopology {
 
@@ -72,10 +69,10 @@ public class TridentEsTopology {
         int maxBatchSize;
         HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
         private Values[] outputs = {
-                new Values("index1", "type1", "{\"user\":\"user1\"}"),
-                new Values("index1", "type2", "{\"user\":\"user2\"}"),
-                new Values("index2", "type1", "{\"user\":\"user3\"}"),
-                new Values("index2", "type2", "{\"user\":\"user4\"}")
+                new Values("{\"user\":\"user1\"}", "index1", "type1", UUID.randomUUID().toString()),
+                new Values("{\"user\":\"user2\"}", "index1", "type2", UUID.randomUUID().toString()),
+                new Values("{\"user\":\"user3\"}", "index2", "type1", UUID.randomUUID().toString()),
+                new Values("{\"user\":\"user4\"}", "index2", "type2", UUID.randomUUID().toString())
         };
         private int index = 0;
         boolean cycle = false;
@@ -90,7 +87,7 @@ public class TridentEsTopology {
 
         @Override
         public Fields getOutputFields() {
-            return new Fields("index", "type", "source");
+            return new Fields("source", "index", "type", "id");
         }
 
         @Override


Mime
View raw message