storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [2/8] git commit: Merge branch 'master'
Date Thu, 31 Jul 2014 00:23:17 GMT
Merge branch 'master'


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/89109b05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/89109b05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/89109b05

Branch: refs/heads/master
Commit: 89109b058c19f5f254a0de9dcb1f40fef875f3bf
Parents: db16c1d 1a57fcf
Author: Boris Aksenov <aksenov@corp.finam.ru>
Authored: Mon Jun 9 20:36:55 2014 +0400
Committer: Boris Aksenov <aksenov@corp.finam.ru>
Committed: Mon Jun 9 20:36:55 2014 +0400

----------------------------------------------------------------------
 CHANGELOG.md                                    |   1 +
 conf/defaults.yaml                              |   9 +
 .../src/clj/backtype/storm/daemon/executor.clj  |   1 +
 .../src/clj/backtype/storm/daemon/worker.clj    |  67 ++--
 storm-core/src/clj/backtype/storm/disruptor.clj |  16 +-
 .../src/clj/backtype/storm/messaging/loader.clj |  81 ++--
 .../src/clj/backtype/storm/messaging/local.clj  |  20 +-
 storm-core/src/clj/backtype/storm/timer.clj     |   5 +-
 storm-core/src/jvm/backtype/storm/Config.java   |  19 +
 .../backtype/storm/messaging/IConnection.java   |  15 +-
 .../backtype/storm/messaging/netty/Client.java  | 388 +++++++++++--------
 .../backtype/storm/messaging/netty/Context.java |  41 +-
 .../storm/messaging/netty/ControlMessage.java   |   6 +-
 .../storm/messaging/netty/MessageBatch.java     |   5 +
 .../storm/messaging/netty/MessageDecoder.java   | 108 ++++--
 .../netty/NettyRenameThreadFactory.java         |  35 ++
 .../backtype/storm/messaging/netty/Server.java  | 145 ++++++-
 .../netty/StormClientErrorHandler.java          |  41 ++
 .../messaging/netty/StormClientHandler.java     |  87 -----
 .../netty/StormClientPipelineFactory.java       |   2 +-
 .../messaging/netty/StormServerHandler.java     |  40 +-
 .../storm/testing/TestEventLogSpout.java        | 139 +++++++
 .../storm/testing/TestEventOrderCheckBolt.java  |  76 ++++
 .../backtype/storm/utils/DisruptorQueue.java    |  10 +-
 .../backtype/storm/utils/TransferDrainer.java   | 113 ++++++
 .../src/jvm/backtype/storm/utils/Utils.java     |  46 ++-
 .../storm/messaging/netty_unit_test.clj         |  46 ++-
 .../test/clj/backtype/storm/messaging_test.clj  |  35 +-
 28 files changed, 1149 insertions(+), 448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/89109b05/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/disruptor.clj
index d5cb972,a199055..72bf0a7
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ b/storm-core/src/clj/backtype/storm/disruptor.clj
@@@ -46,11 -47,11 +46,12 @@@
  ;; This would manifest itself in Trident when doing 1 batch at a time processing, and the
ack_init message
  ;; wouldn't make it to the acker until the batch timed out and another tuple was played
into the queue, 
  ;; unblocking the consumer
- (defnk disruptor-queue [buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
-   (DisruptorQueue. (CLAIM-STRATEGY claim-strategy)
-     buffer-size
-     (mk-wait-strategy wait-strategy)
-     ))
+ (defnk disruptor-queue [^String queue-name buffer-size :claim-strategy :multi-threaded :wait-strategy
:block]
+   (DisruptorQueue. queue-name
 -                   ((CLAIM-STRATEGY claim-strategy) buffer-size)
++                   (CLAIM-STRATEGY claim-strategy)
++                   buffer-size
+                    (mk-wait-strategy wait-strategy)
+                    ))
  
  (defn clojure-handler [afn]
    (reify com.lmax.disruptor.EventHandler

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/89109b05/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index aaa4b34,8c5b466..df2545f
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@@ -37,27 -41,34 +37,35 @@@ import backtype.storm.metric.api.IState
   * the ability to catch up to the producer by processing tuples in batches.
   */
  public class DisruptorQueue implements IStatefulObject {
 -    static final Object FLUSH_CACHE = new Object();
 -    static final Object INTERRUPT = new Object();
 -    
 -    RingBuffer<MutableObject> _buffer;
 -    Sequence _consumer;
 -    SequenceBarrier _barrier;
 -    
 +    private static final Object FLUSH_CACHE = new Object();
 +    private static final Object INTERRUPT = new Object();
++    private static final String PREFIX = "disruptor-";
 +
 +    private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
 +    private final HashMap<String, Object> state = new HashMap<String, Object>(4);
 +
++    private final String _queueName;
 +    private final RingBuffer<MutableObject> _buffer;
 +    private final Sequence _consumer;
 +    private final SequenceBarrier _barrier;
 +
      // TODO: consider having a threadlocal cache of this variable to speed up reads?
      volatile boolean consumerStartedFlag = false;
 -    ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
 -    private static String PREFIX = "disruptor-";
 -    private String _queueName = "";
 -    
 -    public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
 -         this._queueName = PREFIX + queueName;
 -        _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
 +
-     public DisruptorQueue(ProducerType producerType, int bufferSize, WaitStrategy wait)
{
++    public DisruptorQueue(String queueName, ProducerType producerType, int bufferSize, WaitStrategy
wait) {
 +        _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize,
wait);
++        _queueName = PREFIX + queueName;
          _consumer = new Sequence();
          _barrier = _buffer.newBarrier();
 -        _buffer.setGatingSequences(_consumer);
 -        if(claim instanceof SingleThreadedClaimStrategy) {
 -            consumerStartedFlag = true;
 -        }
 +        _buffer.addGatingSequences(_consumer);
 +        consumerStartedFlag = producerType == ProducerType.SINGLE;
      }
 -    
 +
+     public String getName() {
 -      return _queueName;
++        return _queueName;
+     }
 -    
++
++
      public void consumeBatch(EventHandler<Object> handler) {
          consumeBatchToCursor(_barrier.getCursor(), handler);
      }


Mime
View raw message