storm-commits mailing list archives

From kabh...@apache.org
Subject [1/3] storm git commit: STORM-1919 Introduce FilterBolt on storm-redis
Date Fri, 08 Jul 2016 08:12:04 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 89d590a64 -> f8e57535d


STORM-1919 Introduce FilterBolt on storm-redis

* introduce RedisFilterBolt, and relevant class (RedisFilterMapper)
* add example topology: WhitelistWordCount
* update how to use to README.md
* also correct some javadocs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/29f54fa8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/29f54fa8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/29f54fa8

Branch: refs/heads/1.x-branch
Commit: 29f54fa8b5a829076ef49f9921bcc094fc242a61
Parents: 89d590a
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Fri Jun 24 20:50:49 2016 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Fri Jul 8 17:11:08 2016 +0900

----------------------------------------------------------------------
 external/storm-redis/README.md                  |  57 ++++++-
 .../storm/redis/bolt/RedisFilterBolt.java       | 148 ++++++++++++++++++
 .../storm/redis/bolt/RedisLookupBolt.java       |   2 +-
 .../apache/storm/redis/bolt/RedisStoreBolt.java |   2 +-
 .../redis/common/mapper/RedisFilterMapper.java  |  32 ++++
 .../redis/common/mapper/RedisLookupMapper.java  |   2 +-
 .../redis/topology/WhitelistWordCount.java      | 155 +++++++++++++++++++
 7 files changed, 390 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/29f54fa8/external/storm-redis/README.md
----------------------------------------------------------------------
diff --git a/external/storm-redis/README.md b/external/storm-redis/README.md
index f22a833..2c6c950 100644
--- a/external/storm-redis/README.md
+++ b/external/storm-redis/README.md
@@ -21,13 +21,16 @@ use it as a maven dependency:
 
 ### For normal Bolt
 
-Storm-redis provides basic Bolt implementations, ```RedisLookupBolt``` and ```RedisStoreBolt```.
+Storm-redis provides basic Bolt implementations, ```RedisLookupBolt``` and ```RedisStoreBolt```, and ```RedisFilterBolt```.
 
-As name represents its usage, ```RedisLookupBolt``` retrieves value from Redis using key, and ```RedisStoreBolt``` stores key / value to Redis. One tuple will be matched to one key / value pair, and you can define match pattern to ```TupleMapper```.
+As name represents its usage, ```RedisLookupBolt``` retrieves value from Redis using key, and ```RedisStoreBolt``` stores key / value to Redis, and ```RedisFilterBolt``` filters out tuple which key or field doesn't exist on Redis.
 
-You can also choose data type from ```RedisDataTypeDescription``` to use. Please refer ```RedisDataTypeDescription.RedisDataType``` to see what data types are supported. In some data types (hash and sorted set), it requires additional key and converted key from tuple becomes element.
+One tuple will be matched to one key / value pair, and you can define match pattern to ```TupleMapper```.
 
-These interfaces are combined with ```RedisLookupMapper``` and ```RedisStoreMapper``` which fit ```RedisLookupBolt``` and ```RedisStoreBolt``` respectively.
+You can also choose data type from ```RedisDataTypeDescription``` to use. Please refer ```RedisDataTypeDescription.RedisDataType``` to see what data types are supported. In some data types (hash and sorted set, and set if only RedisFilterBolt), it requires additional key and converted key from tuple becomes element.
+
+These interfaces are combined with ```RedisLookupMapper``` and ```RedisStoreMapper``` and ```RedisFilterMapper``` which fit ```RedisLookupBolt``` and ```RedisStoreBolt```, and ```RedisFilterBolt``` respectively.
+(When you want to implement RedisFilterMapper, be sure to set declareOutputFields() to declare same fields to input stream, since FilterBolt forwards input tuples when they exist on Redis.)
  
 
 #### RedisLookupBolt example
 
@@ -81,6 +84,50 @@ RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
 RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
 ```
 
+#### RedisFilterBolt example
+
+```java
+
+class BlacklistWordFilterMapper implements RedisFilterMapper {
+    private RedisDataTypeDescription description;
+    private final String setKey = "blacklist";
+
+    public BlacklistWordFilterMapper() {
+        description = new RedisDataTypeDescription(
+                RedisDataTypeDescription.RedisDataType.SET, setKey);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "count"));
+    }
+
+    @Override
+    public RedisDataTypeDescription getDataTypeDescription() {
+        return description;
+    }
+
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+        return tuple.getStringByField("word");
+    }
+
+    @Override
+    public String getValueFromTuple(ITuple tuple) {
+        return null;
+    }
+}
+
+```
+
+```java
+
+JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+        .setHost(host).setPort(port).build();
+RedisFilterMapper filterMapper = new BlacklistWordFilterMapper();
+RedisFilterBolt filterBolt = new RedisFilterBolt(poolConfig, filterMapper);
+```
+
 #### RedisStoreBolt example
 
 ```java
@@ -121,7 +168,7 @@ RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
 
 ### For non-simple Bolt
 
-If your scenario doesn't fit ```RedisStoreBolt``` and ```RedisLookupBolt```, storm-redis also provides ```AbstractRedisBolt``` to let you extend and apply your business logic.
+If your scenario doesn't fit ```RedisStoreBolt``` and ```RedisLookupBolt``` and ```RedisFilterBolt```, storm-redis also provides ```AbstractRedisBolt``` to let you extend and apply your business logic.
 
 ```java
 

http://git-wip-us.apache.org/repos/asf/storm/blob/29f54fa8/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
new file mode 100644
index 0000000..4c858bc
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.bolt;
+
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisFilterMapper;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import redis.clients.jedis.GeoCoordinate;
+import redis.clients.jedis.JedisCommands;
+
+import java.util.List;
+
+/**
+ * Basic bolt for querying from Redis and filters out if key/field doesn't exist.
+ * If key/field exists on Redis, this bolt just forwards input tuple to default stream.
+ * <p/>
+ * Supported data types: STRING, HASH, SET, SORTED_SET, HYPER_LOG_LOG, GEO.
+ * <p/>
+ * Note: For STRING it checks such key exists on the key space.
+ * For HASH and SORTED_SET and GEO, it checks such field exists on that data structure.
+ * For SET and HYPER_LOG_LOG, it check such value exists on that data structure.
+ * (Note that it still refers key from tuple via RedisFilterMapper#getKeyFromTuple())
+ * In order to apply checking this to SET, you need to input additional key this case.
+ * <p/>
+ * Note2: If you want to just query about existence of key regardless of actual data type,
+ * specify STRING to data type of RedisFilterMapper.
+ */
+public class RedisFilterBolt extends AbstractRedisBolt {
+    private final RedisFilterMapper filterMapper;
+    private final RedisDataTypeDescription.RedisDataType dataType;
+    private final String additionalKey;
+
+    /**
+     * Constructor for single Redis environment (JedisPool)
+     * @param config configuration for initializing JedisPool
+     * @param filterMapper mapper containing which datatype, query key that Bolt uses
+     */
+    public RedisFilterBolt(JedisPoolConfig config, RedisFilterMapper filterMapper) {
+        super(config);
+
+        this.filterMapper = filterMapper;
+
+        RedisDataTypeDescription dataTypeDescription = filterMapper.getDataTypeDescription();
+        this.dataType = dataTypeDescription.getDataType();
+        this.additionalKey = dataTypeDescription.getAdditionalKey();
+
+        if (dataType == RedisDataTypeDescription.RedisDataType.SET &&
+            additionalKey == null) {
+            throw new IllegalArgumentException("additionalKey should be defined");
+        }
+    }
+
+    /**
+     * Constructor for Redis Cluster environment (JedisCluster)
+     * @param config configuration for initializing JedisCluster
+     * @param filterMapper mapper containing which datatype, query key that Bolt uses
+     */
+    public RedisFilterBolt(JedisClusterConfig config, RedisFilterMapper filterMapper) {
+        super(config);
+
+        this.filterMapper = filterMapper;
+
+        RedisDataTypeDescription dataTypeDescription = filterMapper.getDataTypeDescription();
+        this.dataType = dataTypeDescription.getDataType();
+        this.additionalKey = dataTypeDescription.getAdditionalKey();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void execute(Tuple input) {
+        String key = filterMapper.getKeyFromTuple(input);
+
+        boolean found;
+        JedisCommands jedisCommand = null;
+        try {
+            jedisCommand = getInstance();
+
+            switch (dataType) {
+                case STRING:
+                    found = jedisCommand.exists(key);
+                    break;
+
+                case SET:
+                    found = jedisCommand.sismember(additionalKey, key);
+                    break;
+
+                case HASH:
+                    found = jedisCommand.hexists(additionalKey, key);
+                    break;
+
+                case SORTED_SET:
+                    found = jedisCommand.zrank(additionalKey, key) != null;
+                    break;
+
+                case HYPER_LOG_LOG:
+                    found = jedisCommand.pfcount(key) > 0;
+                    break;
+
+                case GEO:
+                    List<GeoCoordinate> geopos = jedisCommand.geopos(additionalKey, key);
+                    found = (geopos != null && geopos.size() > 0);
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
+            }
+
+            if (found) {
+                collector.emit(input, input.getValues());
+            }
+
+            collector.ack(input);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(input);
+        } finally {
+            returnInstance(jedisCommand);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        filterMapper.declareOutputFields(declarer);
+    }
+}
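
The per-data-type check in `execute()` above can be tried out without a Redis server. The following is a minimal, hypothetical model and not storm-redis code: `FilterDecisionSketch` and its methods are made-up names, and plain Java collections stand in for the `exists`, `sismember`, and `hexists` calls. Only STRING, SET, and HASH are modelled.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for the Redis checks used by the filter bolt.
final class FilterDecisionSketch {
    enum DataType { STRING, SET, HASH }

    private final Set<String> keySpace = new HashSet<>();                    // models EXISTS
    private final Map<String, Set<String>> sets = new HashMap<>();           // models SISMEMBER
    private final Map<String, Map<String, String>> hashes = new HashMap<>(); // models HEXISTS

    void sadd(String setKey, String member) {
        sets.computeIfAbsent(setKey, k -> new HashSet<>()).add(member);
        keySpace.add(setKey);
    }

    void hset(String hashKey, String field, String value) {
        hashes.computeIfAbsent(hashKey, k -> new HashMap<>()).put(field, value);
        keySpace.add(hashKey);
    }

    // Mirrors the switch in execute(): true means the input tuple would be forwarded.
    boolean shouldForward(DataType type, String additionalKey, String key) {
        switch (type) {
            case STRING:
                // Key existence on the key space, regardless of the stored type.
                return keySpace.contains(key);
            case SET:
                return sets.getOrDefault(additionalKey, Collections.emptySet()).contains(key);
            case HASH:
                return hashes.getOrDefault(additionalKey, Collections.emptyMap()).containsKey(key);
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + type);
        }
    }
}
```

With a `whitelist` set containing `apple`, a tuple whose key is `apple` is forwarded and one whose key is `banana` is dropped; in both cases the real bolt still acks the input.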

http://git-wip-us.apache.org/repos/asf/storm/blob/29f54fa8/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
index 968ade0..0652923 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
@@ -31,7 +31,7 @@ import java.util.List;
 /**
  * Basic bolt for querying from Redis and emits response as tuple.
  * <p/>
- * Various data types are supported: STRING, LIST, HASH, SET, SORTED_SET, HYPER_LOG_LOG
+ * Various data types are supported: STRING, LIST, HASH, SET, SORTED_SET, HYPER_LOG_LOG, GEO
  */
 public class RedisLookupBolt extends AbstractRedisBolt {
     private final RedisLookupMapper lookupMapper;

http://git-wip-us.apache.org/repos/asf/storm/blob/29f54fa8/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
index 00ff218..022f834 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
@@ -28,7 +28,7 @@ import redis.clients.jedis.JedisCommands;
 /**
  * Basic bolt for writing to Redis
  * <p/>
- * Various data types are supported: STRING, LIST, HASH, SET, SORTED_SET, HYPER_LOG_LOG
+ * Various data types are supported: STRING, LIST, HASH, SET, SORTED_SET, HYPER_LOG_LOG, GEO
  */
 public class RedisStoreBolt extends AbstractRedisBolt {
     private final RedisStoreMapper storeMapper;

http://git-wip-us.apache.org/repos/asf/storm/blob/29f54fa8/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisFilterMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisFilterMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisFilterMapper.java
new file mode 100644
index 0000000..b112046
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisFilterMapper.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+import org.apache.storm.topology.OutputFieldsDeclarer;
+
+/**
+ * RedisFilterMapper is for defining spec. which is used for querying value from Redis and filtering.
+ */
+public interface RedisFilterMapper extends TupleMapper, RedisMapper {
+
+    /**
+     * declare what are the fields that this code will output.
+     * @param declarer OutputFieldsDeclarer
+     */
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/29f54fa8/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
index fe464f5..8c9ebf4 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
@@ -24,7 +24,7 @@ import org.apache.storm.tuple.Values;
 import java.util.List;
 
 /**
- * RedisStoreMapper is for defining spec. which is used for querying value from Redis and converting response to tuple.
+ * RedisLookupMapper is for defining spec. which is used for querying value from Redis and converting response to tuple.
  */
 public interface RedisLookupMapper extends TupleMapper, RedisMapper {
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/29f54fa8/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WhitelistWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WhitelistWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WhitelistWordCount.java
new file mode 100644
index 0000000..bcb2e0b
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WhitelistWordCount.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.redis.bolt.RedisFilterBolt;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisFilterMapper;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Random;
+
+public class WhitelistWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String WHITELIST_BOLT = "WHITELIST_BOLT";
+    private static final String COUNT_BOLT = "COUNT_BOLT";
+    private static final String PRINT_BOLT = "PRINT_BOLT";
+
+    private static final String TEST_REDIS_HOST = "127.0.0.1";
+    private static final int TEST_REDIS_PORT = 6379;
+
+    public static class PrintWordTotalCountBolt extends BaseRichBolt {
+        private static final Logger LOG = LoggerFactory.getLogger(PrintWordTotalCountBolt.class);
+        private static final Random RANDOM = new Random();
+        private OutputCollector collector;
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            String wordName = input.getStringByField("word");
+            String countStr = input.getStringByField("count");
+
+            // print lookup result with low probability
+            if(RANDOM.nextInt(1000) > 995) {
+                int count = 0;
+                if (countStr != null) {
+                    count = Integer.parseInt(countStr);
+                }
+                LOG.info("Count result - word : " + wordName + " / count : " + count);
+            }
+
+            collector.ack(input);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+
+        String host = TEST_REDIS_HOST;
+        int port = TEST_REDIS_PORT;
+
+        if (args.length >= 2) {
+            host = args[0];
+            port = Integer.parseInt(args[1]);
+        }
+
+        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+                .setHost(host).setPort(port).build();
+
+        WordSpout spout = new WordSpout();
+        RedisFilterMapper filterMapper = setupWhitelistMapper();
+        RedisFilterBolt whitelistBolt = new RedisFilterBolt(poolConfig, filterMapper);
+        WordCounter wordCounterBolt = new WordCounter();
+        PrintWordTotalCountBolt printBolt = new PrintWordTotalCountBolt();
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(WHITELIST_BOLT, whitelistBolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(COUNT_BOLT, wordCounterBolt, 1).fieldsGrouping(WHITELIST_BOLT, new Fields("word"));
+        builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(COUNT_BOLT);
+
+        if (args.length == 2) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 3) {
+            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
+        } else{
+            System.out.println("Usage: WhitelistWordCount <redis host> <redis port> (topology name)");
+        }
+    }
+
+    private static RedisFilterMapper setupWhitelistMapper() {
+        return new WhitelistWordFilterMapper();
+    }
+
+    private static class WhitelistWordFilterMapper implements RedisFilterMapper {
+        private RedisDataTypeDescription description;
+        private final String setKey = "whitelist";
+
+        public WhitelistWordFilterMapper() {
+            description = new RedisDataTypeDescription(
+                    RedisDataTypeDescription.RedisDataType.SET, setKey);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+        @Override
+        public RedisDataTypeDescription getDataTypeDescription() {
+            return description;
+        }
+
+        @Override
+        public String getKeyFromTuple(ITuple tuple) {
+            return tuple.getStringByField("word");
+        }
+
+        @Override
+        public String getValueFromTuple(ITuple tuple) {
+            return null;
+        }
+    }
+}
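
The README note in this commit asks that a `RedisFilterMapper` declare the same fields as the input stream, because the bolt emits `input.getValues()` unchanged. The following is a small sketch of that rule. `OutputFieldsCheckSketch` is a hypothetical helper, not part of Storm or storm-redis, and field lists are modelled as plain string lists.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating the "declare the same fields as the input" rule.
final class OutputFieldsCheckSketch {
    // True when the declared output fields match the input fields in number and order.
    static boolean matchesInput(List<String> inputFields, List<String> declaredFields) {
        return inputFields.equals(declaredFields);
    }

    public static void main(String[] args) {
        // Assumption: the upstream component emits a single "word" field.
        List<String> input = Arrays.asList("word");
        System.out.println(matchesInput(input, Arrays.asList("word")));
        System.out.println(matchesInput(input, Arrays.asList("word", "count")));
    }
}
```

Under that assumption, the `WhitelistWordFilterMapper` declaration of `Fields("word")` lines up with the forwarded values, while a mapper declaring `("word", "count")` would only fit an input stream that carries both fields.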

