storm-commits mailing list archives

From kabh...@apache.org
Subject [1/7] storm git commit: STORM-974 Introduces Tuple -> ES document mapper to get rid of constant field mapping
Date Thu, 27 Aug 2015 22:32:26 GMT
Repository: storm
Updated Branches:
  refs/heads/master 4faee0e29 -> 07e0ff2e2


STORM-974 Introduces Tuple -> ES document mapper to get rid of constant field mapping

* EsTupleMapper defines how to extract source, index, type, and id from tuple for ElasticSearch.
* Applies EsTupleMapper to completely get rid of constant field mapping.
* Also modifying README.md to show how to use.
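
As a rough illustration of what the mapper makes possible, the sketch below pins index and type to fixed values and reads only the document body and id from the tuple. It is not part of this commit: `FieldTuple` is a hypothetical stand-in for `backtype.storm.tuple.ITuple`, `TupleMapperSketch` mirrors the four methods of `EsTupleMapper`, and the field names "body" and "uuid" are assumptions chosen for the example.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for backtype.storm.tuple.ITuple, reduced to the one
// method the mapper needs so the sketch compiles without Storm on the classpath.
interface FieldTuple {
    String getStringByField(String field);
}

// Mirrors the four methods of the EsTupleMapper interface added in this commit,
// but typed against the stand-in tuple above.
interface TupleMapperSketch extends Serializable {
    String getSource(FieldTuple tuple);
    String getIndex(FieldTuple tuple);
    String getType(FieldTuple tuple);
    String getId(FieldTuple tuple);
}

// Mapper that returns a fixed index and type and reads only the document
// body and id from the tuple. The field names "body" and "uuid" are
// illustrative assumptions, not names used by storm-elasticsearch.
class FixedIndexMapper implements TupleMapperSketch {
    private static final long serialVersionUID = 1L;
    private final String index;
    private final String type;

    FixedIndexMapper(String index, String type) {
        this.index = index;
        this.type = type;
    }

    @Override public String getSource(FieldTuple tuple) { return tuple.getStringByField("body"); }
    @Override public String getIndex(FieldTuple tuple) { return index; }
    @Override public String getType(FieldTuple tuple) { return type; }
    @Override public String getId(FieldTuple tuple) { return tuple.getStringByField("uuid"); }
}

// Map-backed tuple used only to exercise the mapper in this sketch.
class MapTuple implements FieldTuple {
    private final Map<String, String> fields = new HashMap<>();

    MapTuple with(String field, String value) {
        fields.put(field, value);
        return this;
    }

    @Override public String getStringByField(String field) { return fields.get(field); }
}
```

With the real interface, the same idea would be written against `ITuple` and passed to `EsIndexBolt`, `EsPercolateBolt`, or `EsStateFactory` alongside `EsConfig`, so upstream components no longer need to declare fields named "index" and "type".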


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1f93a3f0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1f93a3f0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1f93a3f0

Branch: refs/heads/master
Commit: 1f93a3f0f8194715c35869766dec5fc623066d3e
Parents: b0baa63
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Wed Aug 12 22:46:13 2015 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Wed Aug 12 22:47:29 2015 +0900

----------------------------------------------------------------------
 external/storm-elasticsearch/README.md          | 69 ++++++++++++++------
 .../storm/elasticsearch/bolt/EsIndexBolt.java   | 23 ++++---
 .../elasticsearch/bolt/EsPercolateBolt.java     | 22 +++++--
 .../elasticsearch/common/ESTupleMapper.java     | 55 ++++++++++++++++
 .../storm/elasticsearch/trident/EsState.java    | 24 +++++--
 .../elasticsearch/trident/EsStateFactory.java   | 11 +++-
 .../storm/elasticsearch/trident/EsUpdater.java  |  4 ++
 .../elasticsearch/bolt/EsIndexBoltTest.java     |  5 +-
 .../elasticsearch/bolt/EsIndexTopology.java     |  4 +-
 .../elasticsearch/bolt/EsPercolateBoltTest.java |  4 +-
 .../storm/elasticsearch/common/EsTestUtil.java  | 25 +++++++
 .../trident/TridentEsTopology.java              |  6 +-
 12 files changed, 205 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1f93a3f0/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index 4fa1098..12a7c67 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -6,37 +6,75 @@
 ## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
 
 EsIndexBolt streams tuples directly into Elasticsearch. Tuples are indexed in specified index & type combination. 
-User should make sure that there are "source", "index","type", and "id" fields declared in preceding bolts or spout.
-"index" and "type" fields are used for identifying target index and type.
+Users should make sure that ```EsTupleMapper``` can extract "source", "index", "type", and "id" from input tuple.
+"index" and "type" are used for identifying target index and type.
 "source" is a document in JSON format string that will be indexed in Elasticsearch.
 
 ```java
+class SampleEsTupleMapper implements EsTupleMapper {
+    @Override
+    public String getSource(ITuple tuple) {
+        return tuple.getStringByField("source");
+    }
+
+    @Override
+    public String getIndex(ITuple tuple) {
+        return tuple.getStringByField("index");
+    }
+
+    @Override
+    public String getType(ITuple tuple) {
+        return tuple.getStringByField("type");
+    }
+
+    @Override
+    public String getId(ITuple tuple) {
+        return tuple.getStringByField("id");
+    }
+}
+
 EsConfig esConfig = new EsConfig();
 esConfig.setClusterName(clusterName);
 esConfig.setNodes(new String[]{"localhost:9300"});
-EsIndexBolt indexBolt = new EsIndexBolt(esConfig);
+EsTupleMapper tupleMapper = new SampleEsTupleMapper();
+EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
 ```
 
 ## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
 
 EsPercolateBolt streams tuples directly into Elasticsearch. Tuples are used to send percolate request to specified index & type combination. 
-User should make sure that there are "source", "index", and "type" fields declared in preceding bolts or spout.
-"index" and "type" fields are used for identifying target index and type.
+User should make sure ```EsTupleMapper``` can extract "source", "index", "type" from input tuple.
+"index" and "type" are used for identifying target index and type.
 "source" is a document in JSON format string that will be sent in percolate request to Elasticsearch.
 
 ```java
 EsConfig esConfig = new EsConfig();
 esConfig.setClusterName(clusterName);
 esConfig.setNodes(new String[]{"localhost:9300"});
-EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig);
+EsTupleMapper tupleMapper = new SampleEsTupleMapper();
+EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper);
 ```
 
 If there exists non-empty percolate response, EsPercolateBolt will emit tuple with original source and Percolate.Match
 for each Percolate.Match in PercolateResponse.
 
+## EsState (org.apache.storm.elasticsearch.trident.EsState)
+
+Elasticsearch Trident state also follows similar pattern to EsBolts. It takes in EsConfig and EsTupleMapper as an arg.
+
+```code
+   EsConfig esConfig = new EsConfig();
+   esConfig.setClusterName(clusterName);
+   esConfig.setNodes(new String[]{"localhost:9300"});
+   EsTupleMapper tupleMapper = new SampleEsTupleMapper();
+
+   StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
+   TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
+ ```
+
 ## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
   
-Two bolts above takes in EsConfig as a constructor arg.
+Provided components (Bolt, State) takes in EsConfig as a constructor arg.
 
   ```java
    EsConfig esConfig = new EsConfig();
@@ -51,20 +89,11 @@ Two bolts above takes in EsConfig as a constructor arg.
 |clusterName | Elasticsearch cluster name | String (required) |
 |nodes | Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
 
+## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)
 
- 
-## EsState (org.apache.storm.elasticsearch.trident.EsState)
-
-Elasticsearch Trident state also follows similar pattern to EsBolts. It takes in EsConfig as an arg.
-
-```code
-   EsConfig esConfig = new EsConfig();
-   esConfig.setClusterName(clusterName);
-   esConfig.setNodes(new String[]{"localhost:9300"});
-
-   StateFactory factory = new EsStateFactory(esConfig);
-   TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
- ```
+For storing tuple to Elasticsearch or percolating tuple from Elasticsearch, we need to define which fields are used for.
+Users need to define your own by implementing ```EsTupleMapper```. 
+You can refer ```SampleEsTupleMapper``` above to see how to implement your own.
   
 ## Committer Sponsors
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1f93a3f0/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
index 0d5cff8..36b9d28 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -22,19 +22,24 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 import org.apache.storm.elasticsearch.common.EsConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
 
 import java.util.Map;
 
+/**
+ * Basic bolt for storing tuple to ES document.
+ */
 public class EsIndexBolt extends AbstractEsBolt {
+    private final EsTupleMapper tupleMapper;
 
     /**
      * EsIndexBolt constructor
      * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+     * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
      */
-    public EsIndexBolt(EsConfig esConfig) {
+    public EsIndexBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
         super(esConfig);
+        this.tupleMapper = tupleMapper;
     }
 
     @Override
@@ -43,16 +48,16 @@ public class EsIndexBolt extends AbstractEsBolt {
     }
 
     /**
-     * Executes index request for given tuple.
-     * @param tuple should contain string values of 4 declared fields: "source", "index", "type", "id"
+     * {@inheritDoc}
+     * Tuple should have relevant fields (source, index, type, id) for tupleMapper to extract ES document.
      */
     @Override
     public void execute(Tuple tuple) {
         try {
-            String source = tuple.getStringByField("source");
-            String index = tuple.getStringByField("index");
-            String type = tuple.getStringByField("type");
-            String id = tuple.getStringByField("id");
+            String source = tupleMapper.getSource(tuple);
+            String index = tupleMapper.getIndex(tuple);
+            String type = tupleMapper.getType(tuple);
+            String id = tupleMapper.getId(tuple);
 
             client.prepareIndex(index, type, id).setSource(source).execute().actionGet();
             collector.ack(tuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/1f93a3f0/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
index 394462e..f6c5ef9 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -24,6 +24,7 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.elasticsearch.action.percolate.PercolateResponse;
 import org.elasticsearch.action.percolate.PercolateSourceBuilder;
 import org.slf4j.Logger;
@@ -31,14 +32,21 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+/**
+ * Basic bolt for retrieve matched percolate queries.
+ */
 public class EsPercolateBolt extends AbstractEsBolt {
 
+    private final EsTupleMapper tupleMapper;
+
     /**
      * EsPercolateBolt constructor
      * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+     * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
      */
-    public EsPercolateBolt(EsConfig esConfig) {
+    public EsPercolateBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
         super(esConfig);
+        this.tupleMapper = tupleMapper;
     }
 
     @Override
@@ -47,15 +55,17 @@ public class EsPercolateBolt extends AbstractEsBolt {
     }
 
     /**
-     * Executes percolate request for given tuple.
-     * @param tuple should contain string values of 3 declared fields: "source", "index", "type"
+     * {@inheritDoc}
+     * Tuple should have relevant fields (source, index, type) for storeMapper to extract ES document.<br/>
+     * If there exists non-empty percolate response, EsPercolateBolt will emit tuple with original source
+     * and Percolate.Match for each Percolate.Match in PercolateResponse.
      */
     @Override
     public void execute(Tuple tuple) {
         try {
-            String source = tuple.getStringByField("source");
-            String index = tuple.getStringByField("index");
-            String type = tuple.getStringByField("type");
+            String source = tupleMapper.getSource(tuple);
+            String index = tupleMapper.getIndex(tuple);
+            String type = tupleMapper.getType(tuple);
 
             PercolateResponse response = client.preparePercolate().setIndices(index).setDocumentType(type)
                     .setPercolateDoc(PercolateSourceBuilder.docBuilder().setDoc(source)).execute().actionGet();

http://git-wip-us.apache.org/repos/asf/storm/blob/1f93a3f0/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/ESTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/ESTupleMapper.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/ESTupleMapper.java
new file mode 100644
index 0000000..f8a66bd
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/ESTupleMapper.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import backtype.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * TupleMapper defines how to extract source, index, type, and id from tuple for ElasticSearch.
+ */
+public interface EsTupleMapper extends Serializable {
+    /**
+     * Extracts source from tuple.
+     * @param tuple source tuple
+     * @return source
+     */
+    String getSource(ITuple tuple);
+
+    /**
+     * Extracts index from tuple.
+     * @param tuple source tuple
+     * @return index
+     */
+    String getIndex(ITuple tuple);
+
+    /**
+     * Extracts type from tuple.
+     * @param tuple source tuple
+     * @return type
+     */
+    String getType(ITuple tuple);
+
+    /**
+     * Extracts id from tuple.
+     * @param tuple source tuple
+     * @return id
+     */
+    String getId(ITuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1f93a3f0/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index e804084..e3865e5 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -20,6 +20,7 @@ package org.apache.storm.elasticsearch.trident;
 import backtype.storm.task.IMetricsContext;
 import backtype.storm.topology.FailedException;
 import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.client.Client;
@@ -37,17 +38,23 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Trident State for storing tuple to ES document.
+ */
 public class EsState implements State {
     private static final Logger LOG = LoggerFactory.getLogger(EsState.class);
     private static Client client;
     private EsConfig esConfig;
+    private EsTupleMapper tupleMapper;
 
     /**
      * EsState constructor
      * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+     * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
      */
-    public EsState(EsConfig esConfig) {
+    public EsState(EsConfig esConfig, EsTupleMapper tupleMapper) {
         this.esConfig = esConfig;
+        this.tupleMapper = tupleMapper;
     }
 
     /**
@@ -98,13 +105,20 @@ public class EsState implements State {
         }
     }
 
+    /**
+     * Store current state to ElasticSearch.
+     *
+     * @param tuples list of tuples for storing to ES.
+     *               Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
+     * @param collector
+     */
     public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
         BulkRequestBuilder bulkRequest = client.prepareBulk();
         for (TridentTuple tuple : tuples) {
-            String source = tuple.getStringByField("source");
-            String index = tuple.getStringByField("index");
-            String type = tuple.getStringByField("type");
-            String id = tuple.getStringByField("id");
+            String source = tupleMapper.getSource(tuple);
+            String index = tupleMapper.getIndex(tuple);
+            String type = tupleMapper.getType(tuple);
+            String id = tupleMapper.getId(tuple);
 
             bulkRequest.add(client.prepareIndex(index, type, id).setSource(source));
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/1f93a3f0/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index c3a2e6c..2d3fa60 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -19,6 +19,7 @@ package org.apache.storm.elasticsearch.trident;
 
 import backtype.storm.task.IMetricsContext;
 import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.trident.state.State;
@@ -26,8 +27,12 @@ import storm.trident.state.StateFactory;
 
 import java.util.Map;
 
+/**
+ * StateFactory for providing EsState.
+ */
 public class EsStateFactory implements StateFactory {
     private EsConfig esConfig;
+    private EsTupleMapper tupleMapper;
 
     public EsStateFactory(){
 
@@ -36,14 +41,16 @@ public class EsStateFactory implements StateFactory {
     /**
      * EsStateFactory constructor
      * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+     * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
      */
-    public EsStateFactory(EsConfig esConfig){
+    public EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper){
         this.esConfig = esConfig;
+        this.tupleMapper = tupleMapper;
     }
 
     @Override
     public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-        EsState esState = new EsState(esConfig);
+        EsState esState = new EsState(esConfig, tupleMapper);
         esState.prepare(conf, metrics, partitionIndex, numPartitions);
         return esState;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/1f93a3f0/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
index 6fa42f3..935c92e 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
@@ -24,6 +24,10 @@ import storm.trident.tuple.TridentTuple;
 import java.util.List;
 
 public class EsUpdater extends BaseStateUpdater<EsState> {
+    /**
+     * {@inheritDoc}
+     * Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
+     */
     @Override
     public void updateState(EsState state, List<TridentTuple> tuples, TridentCollector collector) {
         state.updateState(tuples, collector);

http://git-wip-us.apache.org/repos/asf/storm/blob/1f93a3f0/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index dd4b088..8176a4b 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -20,6 +20,7 @@ package org.apache.storm.elasticsearch.bolt;
 import backtype.storm.tuple.Tuple;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.elasticsearch.action.count.CountRequest;
 import org.elasticsearch.action.count.CountRequestBuilder;
 import org.elasticsearch.action.count.CountResponse;
@@ -43,7 +44,9 @@ public class EsIndexBoltTest extends AbstractEsBoltTest{
         esConfig.setClusterName("test-cluster");
         esConfig.setNodes(new String[]{"127.0.0.1:9300"});
 
-        bolt = new EsIndexBolt(esConfig);
+        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+
+        bolt = new EsIndexBolt(esConfig, tupleMapper);
         bolt.prepare(config, null, collector);
 
         String source = "{\"user\":\"user1\"}";

http://git-wip-us.apache.org/repos/asf/storm/blob/1f93a3f0/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index fc9c178..1f0118b 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -29,6 +29,7 @@ import backtype.storm.tuple.Values;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsConstants;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
 
 import java.util.Map;
 import java.util.UUID;
@@ -49,7 +50,8 @@ public class EsIndexTopology {
         EsConfig esConfig = new EsConfig();
         esConfig.setClusterName(EsConstants.clusterName);
         esConfig.setNodes(new String[]{"localhost:9300"});
-        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig), 1).shuffleGrouping(SPOUT_ID);
+        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);
 
         EsTestUtil.startEsNode();
         EsTestUtil.waitForSeconds(5);

http://git-wip-us.apache.org/repos/asf/storm/blob/1f93a3f0/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index fd4fa4f..1733286 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -21,6 +21,7 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.percolate.PercolateResponse;
 import org.elasticsearch.index.query.TermQueryBuilder;
@@ -42,7 +43,8 @@ public class EsPercolateBoltTest extends AbstractEsBoltTest {
         EsConfig esConfig = new EsConfig();
         esConfig.setClusterName("test-cluster");
         esConfig.setNodes(new String[]{"localhost:9300"});
-        bolt = new EsPercolateBolt(esConfig);
+        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+        bolt = new EsPercolateBolt(esConfig, tupleMapper);
         bolt.prepare(config, null, collector);
 
         String source = "{\"user\":\"user1\"}";

http://git-wip-us.apache.org/repos/asf/storm/blob/1f93a3f0/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
index 3b20383..572f03b 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -21,6 +21,7 @@ import backtype.storm.Config;
 import backtype.storm.task.GeneralTopologyContext;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.TupleImpl;
 import backtype.storm.tuple.Values;
@@ -46,6 +47,30 @@ public class EsTestUtil {
         return new TupleImpl(topologyContext, new Values(source, index, type, id), 1, "");
     }
 
+    public static EsTupleMapper generateDefaultTupleMapper() {
+        return new EsTupleMapper() {
+            @Override
+            public String getSource(ITuple tuple) {
+                return tuple.getStringByField("source");
+            }
+
+            @Override
+            public String getIndex(ITuple tuple) {
+                return tuple.getStringByField("index");
+            }
+
+            @Override
+            public String getType(ITuple tuple) {
+                return tuple.getStringByField("type");
+            }
+
+            @Override
+            public String getId(ITuple tuple) {
+                return tuple.getStringByField("id");
+            }
+        };
+    }
+
     public static Node startEsNode(){
         Node node = NodeBuilder.nodeBuilder().data(true).settings(
                 ImmutableSettings.builder()

http://git-wip-us.apache.org/repos/asf/storm/blob/1f93a3f0/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index 2c951f8..ee5e607 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -25,6 +25,7 @@ import backtype.storm.tuple.Values;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsConstants;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
 import storm.trident.Stream;
 import storm.trident.TridentState;
 import storm.trident.TridentTopology;
@@ -48,8 +49,9 @@ public class TridentEsTopology {
         EsConfig esConfig = new EsConfig();
         esConfig.setClusterName(EsConstants.clusterName);
         esConfig.setNodes(new String[]{"localhost:9300"});
-        Fields esFields = new Fields("index", "type", "source");
-        StateFactory factory = new EsStateFactory(esConfig);
+        Fields esFields = new Fields("index", "type", "source", "id");
+        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+        StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
         TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
 
         EsTestUtil.startEsNode();

