storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [07/11] storm git commit: STORM-845 Storm ElasticSearch connector
Date Wed, 05 Aug 2015 22:58:52 GMT
STORM-845 Storm ElasticSearch connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/77986c62
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/77986c62
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/77986c62

Branch: refs/heads/master
Commit: 77986c62870d72cf712fc835e1f19c6844a89ed8
Parents: acaab9c
Author: SEUNGJIN LEE <sweetest.sj@navercorp.com>
Authored: Mon Jul 27 14:23:10 2015 +0900
Committer: SEUNGJIN LEE <sweetest.sj@navercorp.com>
Committed: Mon Jul 27 14:23:10 2015 +0900

----------------------------------------------------------------------
 external/storm-elasticsearch/README.md                      | 9 ++++++---
 .../org/apache/storm/elasticsearch/bolt/EsIndexBolt.java    | 3 ---
 .../apache/storm/elasticsearch/bolt/EsPercolateBolt.java    | 7 ++-----
 .../apache/storm/elasticsearch/trident/EsStateFactory.java  | 1 -
 .../storm/elasticsearch/bolt/EsPercolateBoltTest.java       | 4 +++-
 5 files changed, 11 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/77986c62/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index 9e1bbb8..c0b9b57 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -1,7 +1,7 @@
 # Storm Elasticsearch Bolt & Trident State
 
   EsIndexBolt, EsPercolateBolt and EsState allows users to stream data from storm into Elasticsearch
directly.
-  For detailed description, please refer to the following.   
+  For detailed description, please refer to the following.
 
 ## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
 
@@ -31,7 +31,10 @@ esConfig.setNodes(new String[]{"localhost:9300"});
 EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig);
 ```
 
-### EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
+If there exists non-empty percolate response, EsPercolateBolt will emit tuple with original
source and Percolate.Match
+for each Percolate.Match in PercolateResponse.
+
+## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
   
 Two bolts above takes in EsConfig as a constructor arg.
 
@@ -41,7 +44,7 @@ Two bolts above takes in EsConfig as a constructor arg.
    esConfig.setNodes(new String[]{"localhost:9300"});
   ```
 
-EsConfig params
+### EsConfig params
 
 |Arg  |Description | Type
 |---	|--- |---

http://git-wip-us.apache.org/repos/asf/storm/blob/77986c62/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
index c1d7daa..0d5cff8 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 
 public class EsIndexBolt extends AbstractEsBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(EsIndexBolt.class);
 
     /**
      * EsIndexBolt constructor
@@ -58,8 +57,6 @@ public class EsIndexBolt extends AbstractEsBolt {
             client.prepareIndex(index, type, id).setSource(source).execute().actionGet();
             collector.ack(tuple);
         } catch (Exception e) {
-            e.printStackTrace();
-            System.out.println(e);
             collector.reportError(e);
             collector.fail(tuple);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/77986c62/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
index 7ee6835..394462e 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 
 public class EsPercolateBolt extends AbstractEsBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(EsPercolateBolt.class);
 
     /**
      * EsPercolateBolt constructor
@@ -62,13 +61,11 @@ public class EsPercolateBolt extends AbstractEsBolt {
                     .setPercolateDoc(PercolateSourceBuilder.docBuilder().setDoc(source)).execute().actionGet();
             if (response.getCount() > 0) {
                 for (PercolateResponse.Match match : response) {
-                    String id = match.getId().toString();
-                    collector.emit(new Values(id));
+                    collector.emit(new Values(source, match));
                 }
             }
             collector.ack(tuple);
         } catch (Exception e) {
-            e.printStackTrace();
             collector.reportError(e);
             collector.fail(tuple);
         }
@@ -76,6 +73,6 @@ public class EsPercolateBolt extends AbstractEsBolt {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-        outputFieldsDeclarer.declare(new Fields("id"));
+        outputFieldsDeclarer.declare(new Fields("source", "match"));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/77986c62/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index b1eaa04..c3a2e6c 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -27,7 +27,6 @@ import storm.trident.state.StateFactory;
 import java.util.Map;
 
 public class EsStateFactory implements StateFactory {
-    private static final Logger LOG = LoggerFactory.getLogger(EsStateFactory.class);
     private EsConfig esConfig;
 
     public EsStateFactory(){

http://git-wip-us.apache.org/repos/asf/storm/blob/77986c62/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index ea0504b..fd4fa4f 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -22,6 +22,7 @@ import backtype.storm.tuple.Values;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
 import org.elasticsearch.action.count.CountResponse;
+import org.elasticsearch.action.percolate.PercolateResponse;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.junit.Assert;
 import org.junit.Test;
@@ -29,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.mockito.Mockito.verify;
+import static org.mockito.Matchers.any;
 
 public class EsPercolateBoltTest extends AbstractEsBoltTest {
     private static final Logger LOG = LoggerFactory.getLogger(EsIndexBoltTest.class);
@@ -56,7 +58,7 @@ public class EsPercolateBoltTest extends AbstractEsBoltTest {
         bolt.execute(tuple);
 
         verify(collector).ack(tuple);
-        verify(collector).emit(new Values("1"));
+        verify(collector).emit(new Values(source, any(PercolateResponse.Match.class)));
 
         bolt.cleanup();
     }


Mime
View raw message