storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/8] git commit: use latest version of disruptor 3.2.1
Date Thu, 31 Jul 2014 00:23:16 GMT
Repository: incubator-storm
Updated Branches:
  refs/heads/master 9ac011129 -> c8a6e716a


use latest version of disruptor 3.2.1


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/db16c1df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/db16c1df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/db16c1df

Branch: refs/heads/master
Commit: db16c1dfd0cf41a6d6e7f105ef0b121f5bec0642
Parents: dd1d213
Author: Boris Aksenov <aksenov@corp.finam.ru>
Authored: Sun Jun 8 02:55:03 2014 +0400
Committer: Boris Aksenov <aksenov@corp.finam.ru>
Committed: Sun Jun 8 02:55:03 2014 +0400

----------------------------------------------------------------------
 pom.xml                                         |  4 +-
 storm-core/pom.xml                              |  2 +-
 storm-core/src/clj/backtype/storm/disruptor.clj | 22 +++----
 .../backtype/storm/utils/DisruptorQueue.java    | 67 ++++++++++----------
 4 files changed, 46 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/db16c1df/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2c0cbc1..3235954 100644
--- a/pom.xml
+++ b/pom.xml
@@ -167,7 +167,7 @@
         <snakeyaml.version>1.11</snakeyaml.version>
         <httpclient.version>4.3.3</httpclient.version>
         <clojure.tools.cli.version>0.2.4</clojure.tools.cli.version>
-        <disruptor.version>2.10.1</disruptor.version>
+        <disruptor.version>3.2.1</disruptor.version>
         <jgrapht.version>0.9.0</jgrapht.version>
         <guava.version>13.0</guava.version>
         <logback-classic.version>1.0.6</logback-classic.version>
@@ -386,7 +386,7 @@
                 <version>${clojure.tools.cli.version}</version>
             </dependency>
             <dependency>
-                <groupId>com.googlecode.disruptor</groupId>
+                <groupId>com.lmax</groupId>
                 <artifactId>disruptor</artifactId>
                 <version>${disruptor.version}</version>
             </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/db16c1df/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index fec6218..ae35821 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -142,7 +142,7 @@
         </dependency>
 
         <dependency>
-            <groupId>com.googlecode.disruptor</groupId>
+            <groupId>com.lmax</groupId>
             <artifactId>disruptor</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/db16c1df/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj b/storm-core/src/clj/backtype/storm/disruptor.clj
index 9456d1a..d5cb972 100644
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ b/storm-core/src/clj/backtype/storm/disruptor.clj
@@ -15,9 +15,8 @@
 ;; limitations under the License.
 (ns backtype.storm.disruptor
   (:import [backtype.storm.utils DisruptorQueue])
-  (:import [com.lmax.disruptor MultiThreadedClaimStrategy SingleThreadedClaimStrategy
-              BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy
-              BusySpinWaitStrategy])
+  (:import [com.lmax.disruptor BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy
BusySpinWaitStrategy])
+  (:import [com.lmax.disruptor.dsl ProducerType])
   (:require [clojure [string :as str]])
   (:require [clojure [set :as set]])
   (:use [clojure walk])
@@ -25,16 +24,16 @@
   )
 
 (def CLAIM-STRATEGY
-  {:multi-threaded (fn [size] (MultiThreadedClaimStrategy. (int size)))
-   :single-threaded (fn [size] (SingleThreadedClaimStrategy. (int size)))
-    })
-    
+  {:multi-threaded (ProducerType/MULTI)
+   :single-threaded (ProducerType/SINGLE)
+   })
+
 (def WAIT-STRATEGY
   {:block (fn [] (BlockingWaitStrategy.))
    :yield (fn [] (YieldingWaitStrategy.))
    :sleep (fn [] (SleepingWaitStrategy.))
    :spin (fn [] (BusySpinWaitStrategy.))
-    })
+   })
 
 
 (defn- mk-wait-strategy [spec]
@@ -48,9 +47,10 @@
 ;; wouldn't make it to the acker until the batch timed out and another tuple was played into
the queue, 
 ;; unblocking the consumer
 (defnk disruptor-queue [buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
-  (DisruptorQueue. ((CLAIM-STRATEGY claim-strategy) buffer-size)
-                   (mk-wait-strategy wait-strategy)
-                   ))
+  (DisruptorQueue. (CLAIM-STRATEGY claim-strategy)
+    buffer-size
+    (mk-wait-strategy wait-strategy)
+    ))
 
 (defn clojure-handler [afn]
   (reify com.lmax.disruptor.EventHandler

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/db16c1df/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 56751c6..aaa4b34 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -18,22 +18,18 @@
 package backtype.storm.utils;
 
 import com.lmax.disruptor.AlertException;
-import com.lmax.disruptor.ClaimStrategy;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.InsufficientCapacityException;
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.Sequence;
 import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.SingleThreadedClaimStrategy;
+import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.dsl.ProducerType;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
-import java.util.Map;
 import backtype.storm.metric.api.IStatefulObject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 /**
  *
@@ -41,39 +37,39 @@ import java.util.logging.Logger;
  * the ability to catch up to the producer by processing tuples in batches.
  */
 public class DisruptorQueue implements IStatefulObject {
-    static final Object FLUSH_CACHE = new Object();
-    static final Object INTERRUPT = new Object();
-    
-    RingBuffer<MutableObject> _buffer;
-    Sequence _consumer;
-    SequenceBarrier _barrier;
-    
+    private static final Object FLUSH_CACHE = new Object();
+    private static final Object INTERRUPT = new Object();
+
+    private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
+    private final HashMap<String, Object> state = new HashMap<String, Object>(4);
+
+    private final RingBuffer<MutableObject> _buffer;
+    private final Sequence _consumer;
+    private final SequenceBarrier _barrier;
+
     // TODO: consider having a threadlocal cache of this variable to speed up reads?
     volatile boolean consumerStartedFlag = false;
-    ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
-    
-    public DisruptorQueue(ClaimStrategy claim, WaitStrategy wait) {
-        _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
+
+    public DisruptorQueue(ProducerType producerType, int bufferSize, WaitStrategy wait) {
+        _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
         _consumer = new Sequence();
         _barrier = _buffer.newBarrier();
-        _buffer.setGatingSequences(_consumer);
-        if(claim instanceof SingleThreadedClaimStrategy) {
-            consumerStartedFlag = true;
-        }
+        _buffer.addGatingSequences(_consumer);
+        consumerStartedFlag = producerType == ProducerType.SINGLE;
     }
-    
+
     public void consumeBatch(EventHandler<Object> handler) {
         consumeBatchToCursor(_barrier.getCursor(), handler);
     }
-    
+
     public void haltWithInterrupt() {
         publish(INTERRUPT);
     }
-    
+
     public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
         try {
             final long nextSequence = _consumer.get() + 1;
-            final long availableSequence = _barrier.waitFor(nextSequence, 10, TimeUnit.MILLISECONDS);
+            final long availableSequence = _barrier.waitFor(nextSequence);
             if(availableSequence >= nextSequence) {
                 consumeBatchToCursor(availableSequence, handler);
             }
@@ -81,10 +77,12 @@ public class DisruptorQueue implements IStatefulObject {
             throw new RuntimeException(e);
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
+        } catch (TimeoutException e) {
+            throw new RuntimeException(e);
         }
     }
-    
-    
+
+
     private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
         for(long curr = _consumer.get() + 1; curr <= cursor; curr++) {
             try {
@@ -93,7 +91,7 @@ public class DisruptorQueue implements IStatefulObject {
                 mo.setObject(null);
                 if(o==FLUSH_CACHE) {
                     Object c = null;
-                    while(true) {                        
+                    while(true) {
                         c = _cache.poll();
                         if(c==null) break;
                         else handler.onEvent(c, curr, true);
@@ -110,7 +108,7 @@ public class DisruptorQueue implements IStatefulObject {
         //TODO: only set this if the consumer cursor has changed?
         _consumer.set(cursor);
     }
-    
+
     /*
      * Caches until consumerStarted is called, upon which the cache is flushed to the consumer
      */
@@ -121,11 +119,11 @@ public class DisruptorQueue implements IStatefulObject {
             throw new RuntimeException("This code should be unreachable!");
         }
     }
-    
+
     public void tryPublish(Object obj) throws InsufficientCapacityException {
         publish(obj, false);
     }
-    
+
     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
         if(consumerStartedFlag) {
             final long id;
@@ -142,14 +140,14 @@ public class DisruptorQueue implements IStatefulObject {
             if(consumerStartedFlag) flushCache();
         }
     }
-    
+
     public void consumerStarted() {
         if(!consumerStartedFlag) {
             consumerStartedFlag = true;
             flushCache();
         }
     }
-    
+
     private void flushCache() {
         publish(FLUSH_CACHE);
     }
@@ -162,7 +160,6 @@ public class DisruptorQueue implements IStatefulObject {
 
     @Override
     public Object getState() {
-        Map state = new HashMap<String, Object>();
         // get readPos then writePos so it's never an under-estimate
         long rp = readPos();
         long wp = writePos();
@@ -177,6 +174,6 @@ public class DisruptorQueue implements IStatefulObject {
         @Override
         public MutableObject newInstance() {
             return new MutableObject();
-        }        
+        }
     }
 }


Mime
View raw message