storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/2] storm git commit: STORM-3135: Check for thread interrupted status in Utils.asyncLoop, since not all async loops (e.g. BoltExecutor) are guaranteed to call interruptible methods. Remove JCQueue.INTERRUPTED, replace with regular interrupts of the cal
Date Tue, 10 Jul 2018 22:48:53 GMT
Repository: storm
Updated Branches:
  refs/heads/master 07c795af5 -> 6b043d794


STORM-3135: Check for thread interrupted status in Utils.asyncLoop, since not all async loops
(e.g. BoltExecutor) are guaranteed to call interruptible methods. Remove JCQueue.INTERRUPTED,
replace with regular interrupts of the calling threads


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8cb4baf2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8cb4baf2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8cb4baf2

Branch: refs/heads/master
Commit: 8cb4baf251d742e2d06bdb0f7853ea8b6783a567
Parents: 10f01f4
Author: Stig Rohde Døssing <srdo@apache.org>
Authored: Fri Jun 29 22:42:36 2018 +0200
Committer: Stig Rohde Døssing <srdo@apache.org>
Committed: Tue Jul 10 23:41:01 2018 +0200

----------------------------------------------------------------------
 .../apache/storm/daemon/worker/WorkerTransfer.java |  5 +----
 .../jvm/org/apache/storm/executor/Executor.java    |  3 ---
 .../apache/storm/executor/ExecutorShutdown.java    |  2 +-
 .../apache/storm/metric/internal/RateTracker.java  |  4 +++-
 .../src/jvm/org/apache/storm/utils/JCQueue.java    | 17 +++++++++--------
 .../src/jvm/org/apache/storm/utils/Utils.java      |  3 +++
 .../storm/utils/JCQueueBackpressureTest.java       | 10 ++++++----
 .../jvm/org/apache/storm/utils/JCQueueTest.java    | 15 +++++----------
 8 files changed, 28 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8cb4baf2/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
index ad1d27d..e48c9e7 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
@@ -88,9 +88,6 @@ class WorkerTransfer implements JCQueue.Consumer {
 
     @Override
     public void accept(Object tuple) {
-        if (tuple == JCQueue.INTERRUPT) {
-            throw new RuntimeException(new InterruptedException("Worker Transfer Thread interrupted"));
-        }
         TaskMessage tm = (TaskMessage) tuple;
         drainer.add(tm);
     }
@@ -138,7 +135,7 @@ class WorkerTransfer implements JCQueue.Consumer {
 
 
     public void haltTransferThd() {
-        transferQueue.haltWithInterrupt();
+        transferQueue.close();
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/8cb4baf2/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 741ad38..e3de2e1 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -259,9 +259,6 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
 
     @Override
     public void accept(Object event) {
-        if (event == JCQueue.INTERRUPT) {
-            throw new RuntimeException(new InterruptedException("JCQ processing interrupted"));
-        }
         AddressedTuple addressedTuple = (AddressedTuple) event;
         int taskId = addressedTuple.getDest();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/8cb4baf2/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 2b29642..37ea683 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -90,7 +90,7 @@ public class ExecutorShutdown implements Shutdownable, IRunningExecutor
{
     public void shutdown() {
         try {
             LOG.info("Shutting down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
-            executor.getReceiveQueue().haltWithInterrupt();
+            executor.getReceiveQueue().close();
             for (Utils.SmartThread t : threads) {
                 t.interrupt();
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/8cb4baf2/storm-client/src/jvm/org/apache/storm/metric/internal/RateTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/metric/internal/RateTracker.java b/storm-client/src/jvm/org/apache/storm/metric/internal/RateTracker.java
index 4c0d241..30a8d02 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/internal/RateTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/internal/RateTracker.java
@@ -12,13 +12,14 @@
 
 package org.apache.storm.metric.internal;
 
+import java.io.Closeable;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This class is a utility to track the rate of something.
  */
-public class RateTracker {
+public class RateTracker implements Closeable {
     private final int _bucketSizeMillis;
     //Old Buckets and their length are only touched when rotating or gathering the metrics,
which should not be that frequent
     // As such all access to them should be protected by synchronizing with the RateTracker
instance
@@ -94,6 +95,7 @@ public class RateTracker {
         return events * 1000.0 / duration;
     }
 
+    @Override
     public void close() {
         if (_task != null) {
             _task.cancel();

http://git-wip-us.apache.org/repos/asf/storm/blob/8cb4baf2/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
index 207f1e0..08e26f7 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.utils;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -36,8 +37,7 @@ import org.apache.storm.shade.org.jctools.queues.MpscUnboundedArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JCQueue implements IStatefulObject {
-    public static final Object INTERRUPT = new Object();
+public class JCQueue implements IStatefulObject, Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
     private static final String PREFIX = "jc-";
     private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR =
@@ -57,7 +57,7 @@ public class JCQueue implements IStatefulObject {
     private final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<BatchInserter>();
// ensure 1 instance per producer thd.
     private final JCQueue.QueueMetrics metrics;
     private final IWaitStrategy backPressureWaitStrategy;
-    private String queueName;
+    private final String queueName;
 
     public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy
backPressureWaitStrategy,
                    String topologyId, String componentId, Integer taskId, int port) {
@@ -87,11 +87,11 @@ public class JCQueue implements IStatefulObject {
         return queueName;
     }
 
-    public boolean haltWithInterrupt() {
-        boolean res = tryPublishInternal(INTERRUPT);
-        metrics.close();
+    @Override
+    public void close() {
+        //No need to block, the task run by the executor is safe to run even after metrics
are closed
         METRICS_REPORTER_EXECUTOR.shutdown();
-        return res;
+        metrics.close();
     }
 
     /**
@@ -425,7 +425,7 @@ public class JCQueue implements IStatefulObject {
     /**
      * This inner class provides methods to access the metrics of the JCQueue.
      */
-    public class QueueMetrics {
+    public class QueueMetrics implements Closeable {
         private final RateTracker arrivalsTracker = new RateTracker(10000, 10);
         private final RateTracker insertFailuresTracker = new RateTracker(10000, 10);
         private final AtomicLong droppedMessages = new AtomicLong(0);
@@ -477,6 +477,7 @@ public class JCQueue implements IStatefulObject {
             droppedMessages.incrementAndGet();
         }
 
+        @Override
         public void close() {
             arrivalsTracker.close();
             insertFailuresTracker.close();

http://git-wip-us.apache.org/repos/asf/storm/blob/8cb4baf2/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 34ab137..b200fac 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -347,6 +347,9 @@ public class Utils {
                 try {
                     final Callable<Long> fn = isFactory ? (Callable<Long>) afn.call()
: afn;
                     while (true) {
+                        if (Thread.interrupted()) {
+                            throw new InterruptedException();
+                        }
                         final Long s = fn.call();
                         if (s == null) { // then stop running it
                             break;

http://git-wip-us.apache.org/repos/asf/storm/blob/8cb4baf2/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
index 8fefe8f..46390c0 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
@@ -12,15 +12,14 @@
 
 package org.apache.storm.utils;
 
-import junit.framework.TestCase;
 import org.apache.storm.policy.WaitStrategyPark;
 import org.apache.storm.utils.JCQueue.Consumer;
 import org.junit.Assert;
 import org.junit.Test;
 
 
-public class JCQueueBackpressureTest extends TestCase {
-
+public class JCQueueBackpressureTest {
+    
     private static JCQueue createQueue(String name, int queueSize) {
         return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test",
1000, 1000);
     }
@@ -41,7 +40,8 @@ public class JCQueueBackpressureTest extends TestCase {
         TestConsumer consumer = new TestConsumer();
         queue.consume(consumer);
         Assert.assertEquals(MESSAGES, consumer.lastMsg);
-
+        
+        queue.close();
     }
 
     // check that tryPublish() & tryOverflowPublish() work as expected
@@ -69,6 +69,8 @@ public class JCQueueBackpressureTest extends TestCase {
         queue.consume(new TestConsumer(), () -> consumeCount.increment() <= 1);
         Assert.assertEquals(CAPACITY - 1, queue.size());
         Assert.assertTrue(queue.tryPublish(0));
+        
+        queue.close();
     }
 
     private static class TestConsumer implements Consumer {

http://git-wip-us.apache.org/repos/asf/storm/blob/8cb4baf2/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
index 2956b2c..09a5dd1 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
@@ -20,7 +20,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 public class JCQueueTest {
 
@@ -43,7 +42,7 @@ public class JCQueueTest {
 
                 @Override
                 public void accept(Object event) {
-                    if (event == JCQueue.INTERRUPT) {
+                    if (Thread.currentThread().isInterrupted()) {
                         throw new RuntimeException(new InterruptedException("ConsumerThd
interrupted"));
                     }
                     if (head) {
@@ -147,8 +146,8 @@ public class JCQueueTest {
             assertFalse("producer " + i + " is still alive", producerThreads[i].isAlive());
         }
 
-        Thread.sleep(sleepMs);
-        assertTrue("Unable to send halt interrupt", queue.haltWithInterrupt());
+        queue.close();
+        consumerThread.interrupt();
         consumerThread.join(TIMEOUT);
         assertFalse("consumer is still alive", consumerThread.isAlive());
     }
@@ -193,12 +192,8 @@ public class JCQueueTest {
 
         @Override
         public void run() {
-            try {
-                while (true) {
-                    queue.consume(handler);
-                }
-            } catch (RuntimeException e) {
-                //break
+            while (!Thread.currentThread().isInterrupted()) {
+                queue.consume(handler);
             }
         }
     }


Mime
View raw message