storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2387 Handle tick tuples properly for Bolts in external modules
Date Thu, 09 Mar 2017 06:05:03 GMT
Repository: storm
Updated Branches:
  refs/heads/master e44498559 -> e4f05c219


STORM-2387 Handle tick tuples properly for Bolts in external modules

* introduce BaseTickTupleAwareRichBolt which makes logic branches for tick tuple and non-tick
tuple
* apply BaseTickTupleAwareRichBolt to Bolts in external modules
* also remove ack for tick tuples since it's not necessary


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/26cc6381
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/26cc6381
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/26cc6381

Branch: refs/heads/master
Commit: 26cc63813f35f4378934a5e5142f44858228c7fc
Parents: f0bfe0d
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Tue Feb 28 17:19:58 2017 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Thu Mar 2 17:06:27 2017 +0900

----------------------------------------------------------------------
 .../storm/cassandra/bolt/BaseCassandraBolt.java | 24 ++------
 .../bolt/BatchCassandraWriterBolt.java          |  2 +-
 .../cassandra/bolt/CassandraWriterBolt.java     |  7 ---
 .../apache/storm/druid/bolt/DruidBeamBolt.java  | 47 +++++++--------
 .../elasticsearch/bolt/AbstractEsBolt.java      |  7 +--
 .../storm/elasticsearch/bolt/EsIndexBolt.java   |  3 +-
 .../storm/elasticsearch/bolt/EsLookupBolt.java  |  3 +-
 .../elasticsearch/bolt/EsPercolateBolt.java     |  3 +-
 .../storm/eventhubs/bolt/EventHubBolt.java      |  6 +-
 .../eventhubs/samples/bolt/GlobalCountBolt.java |  5 ++
 .../samples/bolt/PartialCountBolt.java          |  5 ++
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |  5 +-
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  |  3 +-
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  |  3 +-
 .../java/org/apache/storm/jms/bolt/JmsBolt.java |  6 +-
 .../org/apache/storm/kafka/bolt/KafkaBolt.java  |  9 +--
 .../org/apache/storm/kafka/bolt/KafkaBolt.java  |  9 +--
 .../storm/mongodb/bolt/MongoLookupBolt.java     |  5 ++
 .../storm/mongodb/bolt/MongoUpdateBolt.java     |  5 ++
 .../org/apache/storm/mqtt/bolt/MqttBolt.java    | 26 ++++-----
 .../apache/storm/pmml/PMMLPredictorBolt.java    |  6 +-
 .../storm/redis/bolt/AbstractRedisBolt.java     |  5 +-
 .../storm/redis/bolt/RedisFilterBolt.java       |  2 +-
 .../storm/redis/bolt/RedisLookupBolt.java       |  2 +-
 .../apache/storm/redis/bolt/RedisStoreBolt.java |  2 +-
 .../apache/storm/solr/bolt/SolrUpdateBolt.java  | 11 ++--
 .../base/BaseTickTupleAwareRichBolt.java        | 60 ++++++++++++++++++++
 27 files changed, 168 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
index a1169dc..ea7750f 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
@@ -36,6 +36,7 @@ import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.TupleUtils;
@@ -49,9 +50,9 @@ import java.util.Map;
 /**
  * A base cassandra bolt.
  *
- * Default {@link org.apache.storm.topology.base.BaseRichBolt}
+ * Default {@link org.apache.storm.topology.base.BaseTickTupleAwareRichBolt}
  */
-public abstract class BaseCassandraBolt<T> extends BaseRichBolt {
+public abstract class BaseCassandraBolt<T> extends BaseTickTupleAwareRichBolt {
 
     private static final Logger LOG = LoggerFactory.getLogger(BaseCassandraBolt.class);
 
@@ -171,27 +172,10 @@ public abstract class BaseCassandraBolt<T> extends BaseRichBolt
{
     @Override
     public final void execute(Tuple input) {
         getAsyncHandler().flush(outputCollector);
-        if (TupleUtils.isTick(input)) {
-            onTickTuple();
-            outputCollector.ack(input);
-        } else {
-            process(input);
-        }
+        super.execute(input);
     }
 
     /**
-     * Process a single tuple of input.
-     *
-     * @param input The input tuple to be processed.
-     */
-    abstract protected void process(Tuple input);
-
-    /**
-     * Calls by an input tick tuple.
-     */
-    abstract protected void onTickTuple();
-
-    /**
      * {@inheritDoc}
      */
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
index e575655..e3a7e53 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
@@ -106,7 +106,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>>
{
      * {@inheritDoc}
      */
     @Override
-    protected void onTickTuple() {
+    protected void onTickTuple(Tuple tuple) {
         prepareAndExecuteStatement();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
index 3d1229e..f2eef9c 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
@@ -59,13 +59,6 @@ public class CassandraWriterBolt extends BaseCassandraBolt<Tuple>
{
         if (statements.size() == 1) getAsyncExecutor().execAsync(statements.get(0), input);
         else getAsyncExecutor().execAsync(statements, input);
     }
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    protected void onTickTuple() {
-        /** do nothing **/
-    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
index cd87aff..86b724f 100644
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
@@ -26,9 +26,11 @@ import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +49,7 @@ import java.util.Map;
  * <p/>
  *
  */
-public class DruidBeamBolt<E> extends BaseRichBolt {
+public class DruidBeamBolt<E> extends BaseTickTupleAwareRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
 
     private volatile  OutputCollector collector;
@@ -75,31 +77,30 @@ public class DruidBeamBolt<E> extends BaseRichBolt {
     }
 
     @Override
-    public void execute(final Tuple tuple) {
-      Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
-        LOG.debug("Sent tuple : [{}]" , tuple);
+    protected void process(final Tuple tuple) {
+        Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
+        LOG.debug("Sent tuple : [{}]", tuple);
 
         future.addEventListener(new FutureEventListener() {
-          @Override
-          public void onFailure(Throwable cause) {
-              if (cause instanceof MessageDroppedException) {
-                  collector.ack(tuple);
-                  LOG.debug("Tuple Dropped due to MessageDroppedException : [{}]" , tuple);
-                  if (druidConfig.getDiscardStreamId() != null)
-                      collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple,
System.currentTimeMillis()));
-              }
-              else {
-                  collector.fail(tuple);
-                  LOG.debug("Tuple Processing Failed : [{}]" , tuple);
-              }
-          }
+            @Override
+            public void onFailure(Throwable cause) {
+                if (cause instanceof MessageDroppedException) {
+                    collector.ack(tuple);
+                    LOG.debug("Tuple Dropped due to MessageDroppedException : [{}]", tuple);
+                    if (druidConfig.getDiscardStreamId() != null)
+                        collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple,
System.currentTimeMillis()));
+                } else {
+                    collector.fail(tuple);
+                    LOG.debug("Tuple Processing Failed : [{}]", tuple);
+                }
+            }
 
-          @Override
-          public void onSuccess(Object value) {
-              collector.ack(tuple);
-              LOG.debug("Tuple Processing Success : [{}]" , tuple);
-          }
-      });
+            @Override
+            public void onSuccess(Object value) {
+                collector.ack(tuple);
+                LOG.debug("Tuple Processing Success : [{}]", tuple);
+            }
+        });
 
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
index 1ea80ad..b53e183 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -21,6 +21,8 @@ import java.util.Map;
 
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.apache.storm.utils.TupleUtils;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -34,7 +36,7 @@ import org.apache.storm.tuple.Tuple;
 
 import static org.elasticsearch.common.base.Preconditions.checkNotNull;
 
-public abstract class AbstractEsBolt extends BaseRichBolt {
+public abstract class AbstractEsBolt extends BaseTickTupleAwareRichBolt {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractEsBolt.class);
 
@@ -63,9 +65,6 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
     }
 
     @Override
-    public abstract void execute(Tuple tuple);
-
-    @Override
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
index f79d38d..5d66eb9 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -23,6 +23,7 @@ import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTupleMapper;
+import org.apache.storm.utils.TupleUtils;
 
 import java.util.Map;
 
@@ -54,7 +55,7 @@ public class EsIndexBolt extends AbstractEsBolt {
      * Tuple should have relevant fields (source, index, type, id) for tupleMapper to extract
ES document.
      */
     @Override
-    public void execute(Tuple tuple) {
+    public void process(Tuple tuple) {
         try {
             String source = tupleMapper.getSource(tuple);
             String index = tupleMapper.getIndex(tuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
index 0cc2c79..895e30b 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import org.apache.storm.elasticsearch.ElasticsearchGetRequest;
 import org.apache.storm.elasticsearch.EsLookupResultOutput;
 import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.utils.TupleUtils;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
 
@@ -51,7 +52,7 @@ public class EsLookupBolt extends AbstractEsBolt {
     }
 
     @Override
-    public void execute(Tuple tuple) {
+    public void process(Tuple tuple) {
         try {
             Collection<Values> values = lookupValuesInEs(tuple);
             tryEmitAndAck(values, tuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
index ad8f3f0..0b96dfc 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -25,6 +25,7 @@ import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTupleMapper;
+import org.apache.storm.utils.TupleUtils;
 import org.elasticsearch.action.percolate.PercolateResponse;
 import org.elasticsearch.action.percolate.PercolateSourceBuilder;
 
@@ -61,7 +62,7 @@ public class EsPercolateBolt extends AbstractEsBolt {
      * and Percolate.Match for each Percolate.Match in PercolateResponse.
      */
     @Override
-    public void execute(Tuple tuple) {
+    public void process(Tuple tuple) {
         try {
             String source = tupleMapper.getSource(tuple);
             String index = tupleMapper.getIndex(tuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
index 3d64cc5..604c62d 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -19,6 +19,8 @@ package org.apache.storm.eventhubs.bolt;
 
 import java.util.Map;
 
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +37,7 @@ import org.apache.storm.tuple.Tuple;
 /**
  * A bolt that writes event message to EventHub.
  */
-public class EventHubBolt extends BaseRichBolt {
+public class EventHubBolt extends BaseTickTupleAwareRichBolt {
 	private static final long serialVersionUID = 1L;
 	private static final Logger logger = LoggerFactory
 			.getLogger(EventHubBolt.class);
@@ -82,7 +84,7 @@ public class EventHubBolt extends BaseRichBolt {
 	}
 
 	@Override
-	public void execute(Tuple tuple) {
+	protected void process(Tuple tuple) {
 		try {
 			sender.send(boltConfig.getEventDataFormat().serialize(tuple));
 			collector.ack(tuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
index 6a34788..bc9219e 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
@@ -20,6 +20,7 @@ package org.apache.storm.eventhubs.samples.bolt;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +67,10 @@ public class GlobalCountBolt extends BaseBasicBolt {
 
   @Override
   public void execute(Tuple tuple, BasicOutputCollector collector) {
+    if (TupleUtils.isTick(tuple)) {
+      return;
+    }
+
     int partial = (Integer)tuple.getValueByField("partial_count");
     globalCount += partial;
     globalCountDiff += partial;

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
index 215f8f7..eaf2b65 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
@@ -19,6 +19,7 @@ package org.apache.storm.eventhubs.samples.bolt;
 
 import java.util.Map;
 
+import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +49,10 @@ public class PartialCountBolt extends BaseBasicBolt {
   
   @Override
   public void execute(Tuple tuple, BasicOutputCollector collector) {
+    if (TupleUtils.isTick(tuple)) {
+      return;
+    }
+
     partialCount++;
     if(partialCount == PartialCountBatchSize) {
       collector.emit(new Values(PartialCountBatchSize));

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index a09c73c..9cfbb4e 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -24,12 +24,15 @@ import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.commons.lang.Validate;
 import org.apache.storm.jdbc.common.ConnectionProvider;
 import org.apache.storm.jdbc.common.JdbcClient;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
-public abstract class AbstractJdbcBolt extends BaseRichBolt {
+public abstract class AbstractJdbcBolt extends BaseTickTupleAwareRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcBolt.class);
 
     protected OutputCollector collector;

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
index b93a1eb..dafef21 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.storm.jdbc.common.Column;
 import org.apache.storm.jdbc.common.ConnectionProvider;
 import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,7 +82,7 @@ public class JdbcInsertBolt extends AbstractJdbcBolt {
     }
 
     @Override
-    public void execute(Tuple tuple) {
+    protected void process(Tuple tuple) {
         try {
             List<Column> columns = jdbcMapper.getColumns(tuple);
             List<List<Column>> columnLists = new ArrayList<List<Column>>();

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
index 7224786..9589edd 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.Validate;
 import org.apache.storm.jdbc.common.Column;
 import org.apache.storm.jdbc.common.ConnectionProvider;
 import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +56,7 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
     }
 
     @Override
-    public void execute(Tuple tuple) {
+    protected void process(Tuple tuple) {
         try {
             List<Column> columns = jdbcLookupMapper.getColumns(tuple);
             List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
b/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
index 791b11f..d691e75 100644
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
+++ b/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
@@ -30,6 +30,8 @@ import javax.jms.Session;
 import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.jms.JmsMessageProducer;
 import org.apache.storm.jms.JmsProvider;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +62,7 @@ import org.apache.storm.tuple.Tuple;
  * The JmsBolt is typically an endpoint in a topology -- in other words
  * it does not emit any tuples.
  */
-public class JmsBolt extends BaseRichBolt {
+public class JmsBolt extends BaseTickTupleAwareRichBolt {
     private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class);
 
     private boolean autoAck = true;
@@ -147,7 +149,7 @@ public class JmsBolt extends BaseRichBolt {
      * If JMS sending fails, the tuple will be failed.
      */
     @Override
-    public void execute(Tuple input) {
+    protected void process(Tuple input) {
         // write the tuple to a JMS destination...
         LOG.debug("Tuple received. Sending JMS message.");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
index 84d3334..30f97a0 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -21,6 +21,7 @@ import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.TupleUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -51,7 +52,7 @@ import java.util.Properties;
  * <p/>
  * respectively.
  */
-public class KafkaBolt<K, V> extends BaseRichBolt {
+public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
     private static final long serialVersionUID = -5205886631877033478L;
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
@@ -130,11 +131,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     }
 
     @Override
-    public void execute(final Tuple input) {
-        if (TupleUtils.isTick(input)) {
-          collector.ack(input);
-          return; // Do not try to send ticks to Kafka
-        }
+    protected void process(final Tuple input) {
         K key = null;
         V message = null;
         String topic = null;

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
index b5bb124..0fff78a 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -21,6 +21,7 @@ import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.TupleUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -54,7 +55,7 @@ import java.util.Properties;
  * @deprecated Please use the KafkaBolt in storm-kafka-client
  */
 @Deprecated
-public class KafkaBolt<K, V> extends BaseRichBolt {
+public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
 
@@ -112,11 +113,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     }
 
     @Override
-    public void execute(final Tuple input) {
-        if (TupleUtils.isTick(input)) {
-          collector.ack(input);
-          return; // Do not try to send ticks to Kafka
-        }
+    protected void process(final Tuple input) {
         K key = null;
         V message = null;
         String topic = null;

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java
index 909826d..d8f73dc 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java
@@ -23,6 +23,7 @@ import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 
@@ -51,6 +52,10 @@ public class MongoLookupBolt extends AbstractMongoBolt {
 
     @Override
     public void execute(Tuple tuple) {
+        if (TupleUtils.isTick(tuple)) {
+            return;
+        }
+
         try{
             //get query filter
             Bson filter = queryCreator.createFilter(tuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
index 9fb6c56..3c67604 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
@@ -22,6 +22,7 @@ import org.apache.storm.mongodb.common.QueryFilterCreator;
 import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 
@@ -51,6 +52,10 @@ public class MongoUpdateBolt extends AbstractMongoBolt {
 
     @Override
     public void execute(Tuple tuple) {
+        if (TupleUtils.isTick(tuple)) {
+            return;
+        }
+
         try{
             //get document
             Document doc = mapper.toDocument(tuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
index 98324ee..f6ca1bf 100644
--- a/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
+++ b/external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
@@ -28,6 +28,7 @@ import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 
 
-public class MqttBolt extends BaseRichBolt {
+public class MqttBolt extends BaseTickTupleAwareRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(MqttBolt.class);
     private MqttTupleMapper mapper;
     private transient MqttPublisher publisher;
@@ -84,19 +85,16 @@ public class MqttBolt extends BaseRichBolt {
     }
 
     @Override
-    public void execute(Tuple input) {
-        //ignore tick tuples
-        if(!TupleUtils.isTick(input)){
-            MqttMessage message = this.mapper.toMessage(input);
-            try {
-                this.publisher.publish(message);
-                this.collector.ack(input);
-            } catch (Exception e) {
-                LOG.warn("Error publishing MQTT message. Failing tuple.", e);
-                // should we fail the tuple or kill the worker?
-                collector.reportError(e);
-                collector.fail(input);
-            }
+    protected void process(Tuple input) {
+        MqttMessage message = this.mapper.toMessage(input);
+        try {
+            this.publisher.publish(message);
+            this.collector.ack(input);
+        } catch (Exception e) {
+            LOG.warn("Error publishing MQTT message. Failing tuple.", e);
+            // should we fail the tuple or kill the worker?
+            collector.reportError(e);
+            collector.fail(input);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java
b/external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java
index 551cabf..2c681b4 100644
--- a/external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java
+++ b/external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java
@@ -25,15 +25,17 @@ import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
 
-public class PMMLPredictorBolt extends BaseRichBolt {
+public class PMMLPredictorBolt extends BaseTickTupleAwareRichBolt {
     protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class);
 
     private final ModelOutputs outputs;
@@ -65,7 +67,7 @@ public class PMMLPredictorBolt extends BaseRichBolt {
     }
 
     @Override
-    public void execute(Tuple input) {
+    protected void process(Tuple input) {
         try {
             final Map<String, List<Object>> scoresPerStream = runner.scoredTuplePerStream(input);
             LOG.debug("Input tuple [{}] generated predicted scores [{}]", input, scoresPerStream);

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
index e569ac9..8c9310c 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
@@ -24,6 +24,9 @@ import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder;
 import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
 import redis.clients.jedis.JedisCommands;
 
 import java.util.Map;
@@ -48,7 +51,7 @@ import java.util.Map;
  *
  */
 // TODO: Separate Jedis / JedisCluster to provide full operations for each environment to
users
-public abstract class AbstractRedisBolt extends BaseRichBolt {
+public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt {
     protected OutputCollector collector;
 
     private transient JedisCommandsInstanceContainer container;

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
index 4c858bc..c7ba705 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
@@ -87,7 +87,7 @@ public class RedisFilterBolt extends AbstractRedisBolt {
      * {@inheritDoc}
      */
     @Override
-    public void execute(Tuple input) {
+    public void process(Tuple input) {
         String key = filterMapper.getKeyFromTuple(input);
 
         boolean found;

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
index 0652923..0a4fde1 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
@@ -72,7 +72,7 @@ public class RedisLookupBolt extends AbstractRedisBolt {
      * {@inheritDoc}
      */
     @Override
-    public void execute(Tuple input) {
+    public void process(Tuple input) {
         String key = lookupMapper.getKeyFromTuple(input);
         Object lookupValue;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
index 022f834..7a910ce 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
@@ -67,7 +67,7 @@ public class RedisStoreBolt extends AbstractRedisBolt {
      * {@inheritDoc}
      */
     @Override
-    public void execute(Tuple input) {
+    public void process(Tuple input) {
         String key = storeMapper.getKeyFromTuple(input);
         String value = storeMapper.getValueFromTuple(input);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
index ff0a96e..4feaaf5 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
@@ -22,6 +22,7 @@ import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.TupleUtils;
 import org.apache.solr.client.solrj.SolrClient;
@@ -40,7 +41,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public class SolrUpdateBolt extends BaseRichBolt {
+public class SolrUpdateBolt extends BaseTickTupleAwareRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(SolrUpdateBolt.class);
 
     /**
@@ -85,12 +86,10 @@ public class SolrUpdateBolt extends BaseRichBolt {
     }
 
     @Override
-    public void execute(Tuple tuple) {
+    protected void process(Tuple tuple) {
         try {
-            if (!TupleUtils.isTick(tuple)) {    // Don't add tick tuples to the SolrRequest
-                SolrRequest request = solrMapper.toSolrRequest(tuple);
-                solrClient.request(request, solrMapper.getCollection());
-            }
+            SolrRequest request = solrMapper.toSolrRequest(tuple);
+            solrClient.request(request, solrMapper.getCollection());
             ack(tuple);
         } catch (Exception e) {
             fail(tuple, e);

http://git-wip-us.apache.org/repos/asf/storm/blob/26cc6381/storm-core/src/jvm/org/apache/storm/topology/base/BaseTickTupleAwareRichBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/base/BaseTickTupleAwareRichBolt.java
b/storm-core/src/jvm/org/apache/storm/topology/base/BaseTickTupleAwareRichBolt.java
new file mode 100644
index 0000000..be6a6fc
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/topology/base/BaseTickTupleAwareRichBolt.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology.base;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+
+/**
+ * This class is based on BaseRichBolt, but is aware of tick tuple.
+ */
+public abstract class BaseTickTupleAwareRichBolt extends BaseRichBolt {
+    /**
+     * {@inheritDoc}
+     *
+     * @param tuple the tuple to process.
+     */
+    @Override
+    public void execute(final Tuple tuple) {
+        if (TupleUtils.isTick(tuple)) {
+            onTickTuple(tuple);
+        } else {
+            process(tuple);
+        }
+    }
+
+    /**
+     * Process a single tick tuple of input. Tick tuple doesn't need to be acked.
+     * It provides default "DO NOTHING" implementation for convenient. Override this method
if needed.
+     *
+     * More details on {@link org.apache.storm.task.IBolt#execute(Tuple)}.
+     *
+     * @param tuple The input tuple to be processed.
+     */
+    protected void onTickTuple(final Tuple tuple) {
+    }
+
+    /**
+     * Process a single non-tick tuple of input. Implementation needs to handle ack manually.
+     * More details on {@link org.apache.storm.task.IBolt#execute(Tuple)}.
+     *
+     * @param tuple The input tuple to be processed.
+     */
+    protected abstract void process(final Tuple tuple);
+}


Mime
View raw message