storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/2] storm git commit: MINOR: Update DruidBeamBolt logs
Date Wed, 06 Sep 2017 13:59:15 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch fd6185f8f -> e178990cb


MINOR: Update DruidBeamBolt logs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/14043be9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/14043be9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/14043be9

Branch: refs/heads/1.x-branch
Commit: 14043be91ab246c043221262aaf132b0193e9201
Parents: fd6185f
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
Authored: Tue Aug 29 14:01:00 2017 +0530
Committer: Manikumar Reddy O <manikumar.reddy@gmail.com>
Committed: Tue Sep 5 21:27:55 2017 +0530

----------------------------------------------------------------------
 .../org/apache/storm/druid/bolt/DruidBeamBolt.java     | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/14043be9/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
index 86b724f..fdd9449 100644
--- a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
+++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
@@ -25,12 +25,10 @@ import com.twitter.util.FutureEventListener;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,27 +76,28 @@ public class DruidBeamBolt<E> extends BaseTickTupleAwareRichBolt {
 
     @Override
     protected void process(final Tuple tuple) {
-        Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
-        LOG.debug("Sent tuple : [{}]", tuple);
+        final E mappedEvent = druidEventMapper.getEvent(tuple);
+        Future future = tranquilizer.send(mappedEvent);
+        LOG.debug("Sent tuple : [{}]", mappedEvent);
 
         future.addEventListener(new FutureEventListener() {
             @Override
             public void onFailure(Throwable cause) {
                 if (cause instanceof MessageDroppedException) {
                     collector.ack(tuple);
-                    LOG.debug("Tuple Dropped due to MessageDroppedException : [{}]", tuple);
+                    LOG.debug("Tuple Dropped due to MessageDroppedException {} : [{}]", cause.getMessage(), mappedEvent);
                     if (druidConfig.getDiscardStreamId() != null)
                         collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple, System.currentTimeMillis()));
                 } else {
                     collector.fail(tuple);
-                    LOG.debug("Tuple Processing Failed : [{}]", tuple);
+                    LOG.error("Tuple Processing Failed : [{}]", mappedEvent, cause);
                 }
             }
 
             @Override
             public void onSuccess(Object value) {
                 collector.ack(tuple);
-                LOG.debug("Tuple Processing Success : [{}]", tuple);
+                LOG.debug("Tuple Processing Success : [{}]", mappedEvent);
             }
         });
 


Mime
View raw message