storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [2/8] storm git commit: make disruptor queue wait timeout configurable
Date Thu, 16 Jul 2015 23:32:22 GMT
make disruptor queue wait timeout configurable

Conflicts:
	conf/defaults.yaml


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/449fb8cb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/449fb8cb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/449fb8cb

Branch: refs/heads/0.9.x-branch
Commit: 449fb8cbfdbd5bce14bc3a47f9e0a91b617c49f8
Parents: 979e23c
Author: errordaiwa <xingyusu@outlook.com>
Authored: Fri Jul 10 11:00:28 2015 +0800
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Fri Jul 17 08:18:03 2015 +0900

----------------------------------------------------------------------
 conf/defaults.yaml                                           | 1 +
 storm-core/src/clj/backtype/storm/daemon/executor.clj        | 1 +
 storm-core/src/clj/backtype/storm/daemon/worker.clj          | 2 ++
 storm-core/src/clj/backtype/storm/disruptor.clj              | 4 ++--
 storm-core/src/jvm/backtype/storm/Config.java                | 7 +++++++
 storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java  | 8 ++++++--
 .../test/jvm/backtype/storm/utils/DisruptorQueueTest.java    | 2 +-
 7 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/449fb8cb/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e2b3300..c05fd9e 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -153,5 +153,6 @@ topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSeria
 topology.trident.batch.emit.interval.millis: 500
 topology.classpath: null
 topology.environment: null
+topology.disruptor.wait.timeout.millis: 10
 
 dev.zookeeper.path: "/tmp/dev-storm-zookeeper"

http://git-wip-us.apache.org/repos/asf/storm/blob/449fb8cb/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 8acd459..df8219b 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -208,6 +208,7 @@
         batch-transfer->worker (disruptor/disruptor-queue
                                   (str "executor"  executor-id "-send-queue")
                                   (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
+                                  (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
                                   :claim-strategy :single-threaded
                                   :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
         ]

http://git-wip-us.apache.org/repos/asf/storm/blob/449fb8cb/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index d5569a7..08b9f00 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -140,6 +140,7 @@
        ;; TODO: this depends on the type of executor
        (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
                                                   (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
+                                                  (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
                                                   :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))]))
        (into {})
        ))
@@ -184,6 +185,7 @@
         storm-conf (read-supervisor-storm-conf conf storm-id)
         executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id
port assignment-versions))
         transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
+                                                  (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
                                                   :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
         executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
         

http://git-wip-us.apache.org/repos/asf/storm/blob/449fb8cb/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj b/storm-core/src/clj/backtype/storm/disruptor.clj
index a723601..e96e49d 100644
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ b/storm-core/src/clj/backtype/storm/disruptor.clj
@@ -45,10 +45,10 @@
 ;; wouldn't make it to the acker until the batch timed out and another tuple was played into
the queue,
 ;; unblocking the consumer
 (defnk disruptor-queue
-  [^String queue-name buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
+  [^String queue-name buffer-size timeout :claim-strategy :multi-threaded :wait-strategy
:block]
   (DisruptorQueue. queue-name
                    ((CLAIM-STRATEGY claim-strategy) buffer-size)
-                   (mk-wait-strategy wait-strategy)))
+                   (mk-wait-strategy wait-strategy) timeout))
 
 (defn clojure-handler
   [afn]

http://git-wip-us.apache.org/repos/asf/storm/blob/449fb8cb/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 77e7cbc..8ce9c79 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -859,6 +859,13 @@ public class Config extends HashMap<String, Object> {
     public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
     public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = Map.class;
 
+    /**
+     * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used
to tradeoff latency
+     * vs. CPU usage
+     */
+    public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
+    public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS_SCHEMA = Number.class;
+
     public static void setClasspath(Map conf, String cp) {
         conf.put(Config.TOPOLOGY_CLASSPATH, cp);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/449fb8cb/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 1f2110d..195c2f7 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -60,8 +60,10 @@ public class DisruptorQueue implements IStatefulObject {
     
     private static String PREFIX = "disruptor-";
     private String _queueName = "";
+
+    private long _waitTimeout;
     
-    public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
+    public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait, long
timeout) {
          this._queueName = PREFIX + queueName;
         _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
         _consumer = new Sequence();
@@ -77,6 +79,8 @@ public class DisruptorQueue implements IStatefulObject {
                 throw new RuntimeException("This code should be unreachable!", e);
             }
         }
+
+        _waitTimeout = timeout;
     }
     
     public String getName() {
@@ -94,7 +98,7 @@ public class DisruptorQueue implements IStatefulObject {
     public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
         try {
             final long nextSequence = _consumer.get() + 1;
-            final long availableSequence = _barrier.waitFor(nextSequence, 1000, TimeUnit.MILLISECONDS);
+            final long availableSequence = _barrier.waitFor(nextSequence, _waitTimeout, TimeUnit.MILLISECONDS);
             if(availableSequence >= nextSequence) {
                 consumeBatchToCursor(availableSequence, handler);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/449fb8cb/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
index 653fd33..9ddb37a 100644
--- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
@@ -148,6 +148,6 @@ public class DisruptorQueueTest extends TestCase {
 
     private static DisruptorQueue createQueue(String name, int queueSize) {
         return new DisruptorQueue(name, new MultiThreadedClaimStrategy(
-                queueSize), new BlockingWaitStrategy());
+                queueSize), new BlockingWaitStrategy(), 10L);
     }
 }


Mime
View raw message