storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [03/11] storm git commit: STORM-845 Storm ElasticSearch connector
Date Wed, 05 Aug 2015 22:58:48 GMT
STORM-845 Storm ElasticSearch connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6a446cea
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6a446cea
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6a446cea

Branch: refs/heads/master
Commit: 6a446ceae184c0fe13cc993c7966ee73bb63394f
Parents: 9b2fd72
Author: SEUNGJIN LEE <sweetest.sj@navercorp.com>
Authored: Wed Jun 3 17:33:57 2015 +0900
Committer: SEUNGJIN LEE <sweetest.sj@navercorp.com>
Committed: Thu Jul 23 14:32:17 2015 +0900

----------------------------------------------------------------------
 README.markdown                                 |   1 +
 external/storm-elasticsearch/README.md          |  71 ++++++++++
 external/storm-elasticsearch/pom.xml            |  95 +++++++++++++
 .../elasticsearch/bolt/AbstractEsBolt.java      |  78 +++++++++++
 .../storm/elasticsearch/bolt/EsIndexBolt.java   |  61 ++++++++
 .../elasticsearch/bolt/EsPercolateBolt.java     |  72 ++++++++++
 .../storm/elasticsearch/common/EsConfig.java    |  59 ++++++++
 .../storm/elasticsearch/trident/EsState.java    |  90 ++++++++++++
 .../elasticsearch/trident/EsStateFactory.java   |  47 +++++++
 .../storm/elasticsearch/trident/EsUpdater.java  |  31 +++++
 .../elasticsearch/bolt/AbstractEsBoltTest.java  |  78 +++++++++++
 .../elasticsearch/bolt/EsIndexBoltTest.java     |  50 +++++++
 .../elasticsearch/bolt/EsIndexTopology.java     | 121 ++++++++++++++++
 .../elasticsearch/bolt/EsPercolateBoltTest.java |  56 ++++++++
 .../storm/elasticsearch/common/EsConstants.java |  22 +++
 .../storm/elasticsearch/common/EsTestUtil.java  |  70 ++++++++++
 .../trident/TridentEsTopology.java              | 139 +++++++++++++++++++
 pom.xml                                         |   1 +
 storm-dist/binary/src/main/assembly/binary.xml  |  14 ++
 19 files changed, 1156 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index 502e89f..4d758c7 100644
--- a/README.markdown
+++ b/README.markdown
@@ -210,6 +210,7 @@ under the License.
 * Charles Chan ([@charleswhchan](https://github.com/charleswhchan))
 * Chuanlei Ni ([@chuanlei](https://github.com/chuanlei))
 * Xingyu Su ([@errordaiwa](https://github.com/errordaiwa))
+* Adrian Seungjin Lee ([@sweetest](https://github.com/sweetest))
 
 ## Acknowledgements
 

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
new file mode 100644
index 0000000..562fd6d
--- /dev/null
+++ b/external/storm-elasticsearch/README.md
@@ -0,0 +1,71 @@
+# Storm ElasticSearch Bolt & Trident State
+
+  EsIndexBolt, EsPercolateBolt and EsState allows users to stream data from storm into ElasticSearch directly.
+  For detailed description, please refer to the following.   
+
+## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
+
+EsIndexBolt streams tuples directly into ElasticSearch. Tuples are indexed in specified index & type combination. 
+User should make sure that there are "index","type", and "source" fields declared in preceding bolts or spout.
+"index" and "type" fields are used for identifying target index and type.
+"source" is a document in JSON format string that will be indexed in elastic search.
+
+```java
+EsConfig esConfig = new EsConfig();
+esConfig.setClusterName(clusterName);
+esConfig.setHost(new String[]{"localhost"});
+esConfig.setPort(9300);
+EsIndexBolt indexBolt = new IndexBolt(esConfig);
+```
+
+## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
+
+EsPercolateBolt streams tuples directly into ElasticSearch. Tuples are used to send percolate request to specified index & type combination. 
+User should make sure that there are "index","type", and "source" fields declared in preceding bolts or spout.
+"index" and "type" fields are used for identifying target index and type.
+"source" is a document in JSON format string that will be sent in percolate request to elastic search.
+
+```java
+EsConfig esConfig = new EsConfig();
+esConfig.setClusterName(clusterName);
+esConfig.setHost(new String[]{"localhost"});
+esConfig.setPort(9300);
+EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig);
+```
+
+### EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
+  
+Two bolts above takes in EsConfig as a constructor arg.
+
+  ```java
+   EsConfig esConfig = new EsConfig();
+   esConfig.setClusterName(clusterName);
+   esConfig.setHost(new String[]{"localhost"});
+   esConfig.setPort(9300);
+  ```
+
+EsConfig params
+
+|Arg  |Description | Type
+|---	|--- |---
+|clusterName | ElasticSearch cluster name | String (required) |
+|host | ElasticSearch host | String array (required) |
+|port | ElasticSearch port | int (required) |
+
+
+ 
+## EsState (org.apache.storm.elasticsearch.trident.EsState)
+
+ElasticSearch Trident state also follows similar pattern to EsBolts. It takes in EsConfig as an arg.
+
+```code
+   EsConfig esConfig = new EsConfig();
+   esConfig.setClusterName(clusterName);
+   esConfig.setHost(new String[]{"localhost"});
+   esConfig.setPort(9300);
+                	     		
+   StateFactory factory = new EsStateFactory(esConfig);
+   TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
+ ```
+  
+## Committer Sponsors

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
new file mode 100644
index 0000000..8e5db5d
--- /dev/null
+++ b/external/storm-elasticsearch/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.10.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-elasticsearch</artifactId>
+
+    <developers>
+        <developer>
+            <id>sweetest</id>
+            <name>Adrian Seungjin Lee</name>
+            <email>sweetest.sj@navercorp.com</email>
+        </developer>
+    </developers>
+
+    <properties>
+        <elasticsearch.version>1.5.2</elasticsearch.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>0.9.4</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>1.5.0</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.9.0</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-clean-plugin</artifactId>
+                <version>2.5</version>
+                <executions>
+                    <execution>
+                        <id>cleanup</id>
+                        <phase>test-compile</phase>
+                        <goals>
+                            <goal>clean</goal>
+                        </goals>
+                        <configuration>
+                            <excludeDefaultDirectories>true</excludeDefaultDirectories>
+                            <filesets>
+                                <fileset>
+                                    <directory>./data/</directory>
+                                </fileset>
+                            </filesets>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
new file mode 100644
index 0000000..cd7fc81
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractEsBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractEsBolt.class);
+    protected OutputCollector collector;
+    protected static Client client;
+    private EsConfig esConfig;
+
+    public AbstractEsBolt(EsConfig esConfig) {
+        this.esConfig = esConfig;
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        System.out.println(this.getClass().getName());
+        try {
+            this.collector = outputCollector;
+            synchronized (AbstractEsBolt.class) {
+                if (client == null) {
+                    Settings settings =
+                            ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
+                                    .put("client.transport.sniff", "false").build();
+                    List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
+                    for (String host : esConfig.getHost()) {
+                        transportAddressList.add(new InetSocketTransportAddress(host, esConfig.getPort()));
+                    }
+                    client = new TransportClient(settings)
+                            .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("unable to initialize EsBolt ", e);
+        }
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
new file mode 100644
index 0000000..9d7522c
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class EsIndexBolt extends AbstractEsBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(EsIndexBolt.class);
+
+    public EsIndexBolt(EsConfig esConfig) {
+        super(esConfig);
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        super.prepare(map, topologyContext, outputCollector);
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            String index = tuple.getStringByField("index");
+            String type = tuple.getStringByField("type");
+            String source = tuple.getStringByField("source");
+            client.prepareIndex(index, type).setSource(source).execute().actionGet();
+            collector.ack(tuple);
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println(e);
+            collector.reportError(e);
+            collector.fail(tuple);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
new file mode 100644
index 0000000..3142fc1
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.percolate.PercolateSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class EsPercolateBolt extends AbstractEsBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(EsPercolateBolt.class);
+
+    public EsPercolateBolt(EsConfig esConfig) {
+        super(esConfig);
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        super.prepare(map, topologyContext, outputCollector);
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            String index = tuple.getStringByField("index");
+            String type = tuple.getStringByField("type");
+            String source = tuple.getStringByField("source");
+            PercolateResponse response = client.preparePercolate().setIndices(index).setDocumentType(type)
+                    .setPercolateDoc(PercolateSourceBuilder.docBuilder().setDoc(source)).execute().actionGet();
+            if (response.getCount() > 0) {
+                for (PercolateResponse.Match match : response) {
+                    String id = match.getId().toString();
+                    collector.emit(new Values(id));
+                }
+            }
+            collector.ack(tuple);
+        } catch (Exception e) {
+            e.printStackTrace();
+            collector.reportError(e);
+            collector.fail(tuple);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("id"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
new file mode 100644
index 0000000..f2aa48f
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import java.io.Serializable;
+
+public class EsConfig implements Serializable{
+    private String clusterName;
+    private String[] host;
+    private int port;
+
+    public EsConfig() {
+    }
+
+    public EsConfig(String clusterName, String[] host, int port) {
+        this.clusterName = clusterName;
+        this.host = host;
+        this.port = port;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String[] getHost() {
+        return host;
+    }
+
+    public void setHost(String[] host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
new file mode 100644
index 0000000..e753119
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.trident;
+
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.topology.FailedException;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class EsState implements State {
+    private EsConfig esConfig;
+    private static Client client;
+    private static final Logger LOG = LoggerFactory.getLogger(EsState.class);
+
+    public EsState(EsConfig esConfig) {
+        this.esConfig = esConfig;
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+
+    }
+
+    @Override
+    public void commit(Long txid) {
+
+    }
+
+    public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        synchronized (EsState.class) {
+            if (client == null) {
+                Settings settings =
+                        ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
+                                .put("client.transport.sniff", "true").build();
+                List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
+                for (String host : esConfig.getHost()) {
+                    transportAddressList.add(new InetSocketTransportAddress(host, esConfig.getPort()));
+                }
+                client = new TransportClient(settings)
+                        .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
+            }
+        }
+
+    }
+
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        BulkRequestBuilder bulkRequest = client.prepareBulk();
+        for (TridentTuple tuple : tuples) {
+            String index = tuple.getStringByField("index");
+            String type = tuple.getStringByField("type");
+            String source = tuple.getStringByField("source");
+            bulkRequest.add(client.prepareIndex(index, type).setSource(source));
+        }
+        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+        if (bulkResponse.hasFailures()) {
+            LOG.warn("failed processing bulk index requests " + bulkResponse.buildFailureMessage());
+            throw new FailedException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
new file mode 100644
index 0000000..d7f4330
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.trident;
+
+import backtype.storm.task.IMetricsContext;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+public class EsStateFactory implements StateFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(EsStateFactory.class);
+    private EsConfig esConfig;
+
+    public EsStateFactory(){
+
+    }
+
+    public EsStateFactory(EsConfig esConfig){
+        this.esConfig = esConfig;
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        EsState esState = new EsState(esConfig);
+        esState.prepare(conf, metrics, partitionIndex, numPartitions);
+        return esState;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
new file mode 100644
index 0000000..6fa42f3
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.trident;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class EsUpdater extends BaseStateUpdater<EsState> {
+    @Override
+    public void updateState(EsState state, List<TridentTuple> tuples, TridentCollector collector) {
+        state.updateState(tuples, collector);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
new file mode 100644
index 0000000..fdf7cd4
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.junit.After;
+import org.junit.Before;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class AbstractEsBoltTest {
+    protected Config config = new Config();
+    protected OutputCollector collector = mock(OutputCollector.class);
+    protected Node node;
+
+    @Before
+    public void setup() throws Exception {
+        System.out.println("setup");
+        node = NodeBuilder.nodeBuilder().data(true).settings(
+                ImmutableSettings.builder()
+                        .put(ClusterName.SETTING, "test-cluster")
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                        .put(EsExecutors.PROCESSORS, 1)
+                        .put("http.enabled", false)
+                        .put("index.percolator.map_unmapped_fields_as_string", true)
+                        .put("index.store.type", "memory")
+        ).build();
+        node.start();
+        ensureEsGreen(node);
+        ClusterHealthResponse chr = node.client().admin().cluster()
+                .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
+        System.out.println(chr.getStatus());
+        Thread.sleep(1000);
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        System.out.println("cleanup");
+        node.stop();
+        node.close();
+    }
+
+    private void ensureEsGreen(Node node) {
+        ClusterHealthResponse chr = node.client().admin().cluster()
+                .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
+        assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
new file mode 100644
index 0000000..e66da19
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.mockito.Mockito.verify;
+
+public class EsIndexBoltTest extends AbstractEsBoltTest{
+    private static final Logger LOG = LoggerFactory.getLogger(EsIndexBoltTest.class);
+    private EsIndexBolt bolt;
+
+    @Test
+    public void testEsIndexBolt()
+            throws Exception {
+        EsConfig esConfig = new EsConfig();
+        esConfig.setClusterName("test-cluster");
+        esConfig.setHost(new String[]{"127.0.0.1"});
+        esConfig.setPort(9300);
+        bolt = new EsIndexBolt(esConfig);
+        bolt.prepare(config, null, collector);
+        String index = "index1";
+        String type = "type1";
+        String source = "{\"user\":\"user1\"}";
+        Tuple tuple = EsTestUtil.generateTestTuple(index, type, source);
+        bolt.execute(tuple);
+        verify(collector).ack(tuple);
+        bolt.cleanup();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
new file mode 100644
index 0000000..4a82c63
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsConstants;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class EsIndexTopology {
+
+    static final String SPOUT_ID = "spout";
+    static final String BOLT_ID = "bolt";
+    static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+        config.setNumWorkers(1);
+        TopologyBuilder builder = new TopologyBuilder();
+        UserDataSpout spout = new UserDataSpout();
+        builder.setSpout(SPOUT_ID, spout, 1);
+        EsConfig esConfig = new EsConfig();
+        esConfig.setClusterName(EsConstants.clusterName);
+        esConfig.setHost(new String[]{"localhost"});
+        esConfig.setPort(9300);
+        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig), 1).shuffleGrouping(SPOUT_ID);
+
+        EsTestUtil.startEsNode();
+        EsTestUtil.waitForSeconds(5);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
+        EsTestUtil.waitForSeconds(20);
+        cluster.killTopology(TOPOLOGY_NAME);
+        System.out.println("cluster begin to shutdown");
+        cluster.shutdown();
+        System.out.println("cluster shutdown");
+        System.exit(0);
+    }
+
+    public static class UserDataSpout extends BaseRichSpout {
+        private ConcurrentHashMap<UUID, Values> pending;
+        private SpoutOutputCollector collector;
+        private String[] sources = {
+                "{\"user\":\"user1\"}",
+                "{\"user\":\"user2\"}",
+                "{\"user\":\"user3\"}",
+                "{\"user\":\"user4\"}"
+        };
+        private int index = 0;
+        private int count = 0;
+        private long total = 0L;
+        private String indexName = "index1";
+        private String typeName = "type1";
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("index", "type", "source"));
+        }
+
+        public void open(Map config, TopologyContext context,
+                         SpoutOutputCollector collector) {
+            this.collector = collector;
+            this.pending = new ConcurrentHashMap<UUID, Values>();
+        }
+
+        public void nextTuple() {
+            String source = sources[index];
+            Values values = new Values(indexName, typeName, source);
+            UUID msgId = UUID.randomUUID();
+            this.pending.put(msgId, values);
+            this.collector.emit(values, msgId);
+            index++;
+            if (index >= sources.length) {
+                index = 0;
+            }
+            count++;
+            total++;
+            if (count > 1000) {
+                count = 0;
+                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
+            }
+            Thread.yield();
+        }
+
+        public void ack(Object msgId) {
+            this.pending.remove(msgId);
+        }
+
+        public void fail(Object msgId) {
+            System.out.println("**** RESENDING FAILED TUPLE");
+            this.collector.emit(this.pending.get(msgId), msgId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
new file mode 100644
index 0000000..1bd338f
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.mockito.Mockito.verify;
+
+public class EsPercolateBoltTest extends AbstractEsBoltTest {
+    private static final Logger LOG = LoggerFactory.getLogger(EsIndexBoltTest.class);
+    private EsPercolateBolt bolt;
+
+    @Test
+    public void testEsPercolateBolt()
+            throws Exception {
+        EsConfig esConfig = new EsConfig();
+        esConfig.setClusterName("test-cluster");
+        esConfig.setHost(new String[]{"127.0.0.1"});
+        esConfig.setPort(9300);
+        bolt = new EsPercolateBolt(esConfig);
+        bolt.prepare(config, null, collector);
+        String index = "index1";
+        String type = ".percolator";
+        String source = "{\"user\":\"user1\"}";
+        node.client().prepareIndex("index1",".percolator")
+                .setId("1")
+                .setSource("{\"query\":{\"match\":{\"user\":\"user1\"}}}").
+                execute().actionGet();
+        Tuple tuple = EsTestUtil.generateTestTuple(index, type, source);
+        bolt.execute(tuple);
+        verify(collector).ack(tuple);
+        verify(collector).emit(new Values("1"));
+        bolt.cleanup();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConstants.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConstants.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConstants.java
new file mode 100644
index 0000000..98bb71d
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConstants.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+public class EsConstants {
+    public static String clusterName = "test-cluster";
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
new file mode 100644
index 0000000..2c0026d
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import backtype.storm.Config;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+
+import java.util.HashMap;
+
+public class EsTestUtil {
+    public static Tuple generateTestTuple(String index, String type, String source) {
+        TopologyBuilder builder = new TopologyBuilder();
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
+                new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+            @Override
+            public Fields getComponentOutputFields(String componentId, String streamId) {
+                return new Fields("index", "type", "source");
+            }
+        };
+        return new TupleImpl(topologyContext, new Values(index, type, source), 1, "");
+    }
+
+    public static Node startEsNode(){
+        Node node = NodeBuilder.nodeBuilder().data(true).settings(
+                ImmutableSettings.builder()
+                        .put(ClusterName.SETTING, EsConstants.clusterName)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                        .put(EsExecutors.PROCESSORS, 1)
+                        .put("http.enabled", false)
+                        .put("index.percolator.map_unmapped_fields_as_string", true)
+                        .put("index.store.type", "memory")
+        ).build();
+        node.start();
+        return node;
+    }
+
+    public static void waitForSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
new file mode 100644
index 0000000..b1e62ff
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.trident;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsConstants;
+import org.apache.storm.elasticsearch.common.EsTestUtil;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+import storm.trident.state.StateFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TridentEsTopology {
+
+    static final String TOPOLOGY_NAME = "elasticsearch-test-topology2";
+
+    public static void main(String[] args) {
+        int batchSize = 100;
+        FixedBatchSpout spout = new FixedBatchSpout(batchSize);
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout", spout);
+        EsConfig esConfig = new EsConfig();
+        esConfig.setClusterName(EsConstants.clusterName);
+        esConfig.setHost(new String[]{"localhost"});
+        esConfig.setPort(9300);
+        Fields esFields = new Fields("index", "type", "source");
+        StateFactory factory = new EsStateFactory(esConfig);
+        TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
+
+        EsTestUtil.startEsNode();
+        EsTestUtil.waitForSeconds(5);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(TOPOLOGY_NAME, null, topology.build());
+        EsTestUtil.waitForSeconds(20);
+        cluster.killTopology(TOPOLOGY_NAME);
+        System.out.println("cluster begin to shutdown");
+        cluster.shutdown();
+        System.out.println("cluster shutdown");
+        System.exit(0);
+    }
+
+    public static class FixedBatchSpout implements IBatchSpout {
+        int maxBatchSize;
+        HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+        private Values[] outputs = {
+                new Values("index1", "type1", "{\"user\":\"user1\"}"),
+                new Values("index1", "type2", "{\"user\":\"user2\"}"),
+                new Values("index2", "type1", "{\"user\":\"user3\"}"),
+                new Values("index2", "type2", "{\"user\":\"user4\"}")
+        };
+        private int index = 0;
+        boolean cycle = false;
+
+        public FixedBatchSpout(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+        }
+
+        public void setCycle(boolean cycle) {
+            this.cycle = cycle;
+        }
+
+        @Override
+        public Fields getOutputFields() {
+            return new Fields("index", "type", "source");
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context) {
+            index = 0;
+        }
+
+        @Override
+        public void emitBatch(long batchId, TridentCollector collector) {
+            List<List<Object>> batch = this.batches.get(batchId);
+            if (batch == null) {
+                batch = new ArrayList<List<Object>>();
+                if (index >= outputs.length && cycle) {
+                    index = 0;
+                }
+                for (int i = 0; i < maxBatchSize; index++, i++) {
+                    if (index == outputs.length) {
+                        index = 0;
+                    }
+                    batch.add(outputs[index]);
+                }
+                this.batches.put(batchId, batch);
+            }
+            for (List<Object> list : batch) {
+                collector.emit(list);
+            }
+        }
+
+        @Override
+        public void ack(long batchId) {
+            this.batches.remove(batchId);
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public Map getComponentConfiguration() {
+            Config conf = new Config();
+            conf.setMaxTaskParallelism(1);
+            return conf;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bd65f04..cb00783 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,7 @@
         <module>external/storm-redis</module>
         <module>external/storm-eventhubs</module>
         <module>external/flux</module>
+        <module>external/storm-elasticsearch</module>
     </modules>
 
     <scm>

http://git-wip-us.apache.org/repos/asf/storm/blob/6a446cea/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 21d802a..bb77a0a 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -190,6 +190,20 @@
             </includes>
             <fileMode>0644</fileMode>
         </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-elasticsearch/target</directory>
+            <outputDirectory>external/storm-elasticsearch</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-elasticsearch</directory>
+            <outputDirectory>external/storm-elasticsearch</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
 
         <fileSet>
             <directory>${project.basedir}/../../external/flux/flux-core/target</directory>


Mime
View raw message