storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [5/8] storm git commit: Merge branch 'STORM-929' into STORM-935
Date Thu, 16 Jul 2015 23:32:25 GMT
Merge branch 'STORM-929' into STORM-935

Conflicts:
	conf/defaults.yaml


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fb2e9690
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fb2e9690
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fb2e9690

Branch: refs/heads/0.9.x-branch
Commit: fb2e9690c718b7448ef59e05a9e8bfb6afa703de
Parents: b3c5dc9
Author: errordaiwa <xingyusu@outlook.com>
Authored: Tue Jul 14 12:40:45 2015 +0800
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Fri Jul 17 08:23:56 2015 +0900

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/Config.java               | 7 +++++++
 storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java | 7 +++++--
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fb2e9690/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 993052d..ff8b595 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -866,6 +866,13 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
     public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS_SCHEMA = ConfigValidation.IntegerValidator;
 
+    /**
+     * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency
+     * vs. CPU usage
+     */
+    public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
+    public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
+
     public static void setClasspath(Map conf, String cp) {
         conf.put(Config.TOPOLOGY_CLASSPATH, cp);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/fb2e9690/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index c4c936a..4d3f18b 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -98,8 +98,11 @@ public class DisruptorQueue implements IStatefulObject {
     public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
         try {
             final long nextSequence = _consumer.get() + 1;
-            final long availableSequence = _barrier.waitFor(nextSequence);
-            if(availableSequence >= nextSequence) {
+            final long availableSequence =
+                    _waitTimeout == 0L ? _barrier.waitFor(nextSequence) : _barrier.waitFor(nextSequence, _waitTimeout,
+                            TimeUnit.MILLISECONDS);
+
+            if (availableSequence >= nextSequence) {
                 consumeBatchToCursor(availableSequence, handler);
             }
         } catch (AlertException e) {


Mime
View raw message