storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2557: A bug in DisruptorQueue causing severe underestimation of queue arrival rates
Date Tue, 20 Jun 2017 12:20:39 GMT
Repository: storm
Updated Branches:
  refs/heads/master 8a71ec5be -> 4461a1d54


STORM-2557: A bug in DisruptorQueue causing severe underestimation of queue arrival rates

* create a method for counting number of tuples
* Closes #2164


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8299b0c9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8299b0c9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8299b0c9

Branch: refs/heads/master
Commit: 8299b0c9e0ca0213055e2199d3a6947cf33e08ba
Parents: 8a71ec5
Author: wendyshusband <tkl449@126.com>
Authored: Sat Jun 17 13:40:17 2017 +0800
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Tue Jun 20 21:11:41 2017 +0900

----------------------------------------------------------------------
 .../org/apache/storm/utils/DisruptorQueue.java  | 24 ++++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8299b0c9/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index 5c423e1..1d45087 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -500,8 +500,25 @@ public class DisruptorQueue implements IStatefulObject {
         return Thread.currentThread().getId();
     }
 
+    private long getTupleCount(Object obj) {
+        //a published object could be an instance of either AddressedTuple, ArrayList<AddressedTuple>, or HashMap<Integer, ArrayList<TaskMessage>>.
+        long tupleCount;
+        if (obj instanceof ArrayList) {
+            tupleCount = ((ArrayList) obj).size();
+        } else if (obj instanceof HashMap) {
+            tupleCount = 0;
+            for (Object value:((HashMap) obj).values()) {
+                tupleCount += ((ArrayList) value).size();
+            }
+        } else {
+            tupleCount = 1;
+        }
+        return tupleCount;
+    }
+
     private void publishDirectSingle(Object obj, boolean block) throws InsufficientCapacityException {
         long at;
+        long numberOfTuples;
         if (block) {
             at = _buffer.next();
         } else {
@@ -510,7 +527,8 @@ public class DisruptorQueue implements IStatefulObject {
         AtomicReference<Object> m = _buffer.get(at);
         m.set(obj);
         _buffer.publish(at);
-        _metrics.notifyArrivals(1);
+        numberOfTuples = getTupleCount(obj);
+        _metrics.notifyArrivals(numberOfTuples);
     }
 
     private void publishDirect(ArrayList<Object> objs, boolean block) throws InsufficientCapacityException {
@@ -524,13 +542,15 @@ public class DisruptorQueue implements IStatefulObject {
             }
             long begin = end - (size - 1);
             long at = begin;
+            long numberOfTuples = 0;
             for (Object obj: objs) {
                 AtomicReference<Object> m = _buffer.get(at);
                 m.set(obj);
                 at++;
+                numberOfTuples += getTupleCount(obj);
             }
             _buffer.publish(begin, end);
-            _metrics.notifyArrivals(size);
+            _metrics.notifyArrivals(numberOfTuples);
         }
     }
 


Mime
View raw message