storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/2] storm git commit: STORM-2724 Shutdown ExecutorService in WaterMarkEventGenerator in shutdown phase
Date Fri, 08 Sep 2017 23:07:28 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch e178990cb -> e462979b3


STORM-2724 Shutdown ExecutorService in WaterMarkEventGenerator in shutdown phase

* also names unnamed executor services in windowing package
* address review comment from @srdo


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a648db54
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a648db54
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a648db54

Branch: refs/heads/1.x-branch
Commit: a648db54b9848aac651dbff88ebe721369a7f477
Parents: fd6185f
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Thu Sep 7 23:36:10 2017 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Sat Sep 9 08:05:43 2017 +0900

----------------------------------------------------------------------
 .../storm/topology/WindowedBoltExecutor.java    |  1 +
 .../storm/windowing/TimeTriggerPolicy.java      |  8 ++++++-
 .../windowing/WaterMarkEventGenerator.java      | 25 +++++++++++++++++++-
 .../windowing/WaterMarkEventGeneratorTest.java  |  6 +++++
 4 files changed, 38 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a648db54/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index 965d11c..26c69a2 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -299,6 +299,7 @@ public class WindowedBoltExecutor implements IRichBolt {
 
     @Override
     public void cleanup() {
+        waterMarkEventGenerator.shutdown();
         windowManager.shutdown();
         bolt.cleanup();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a648db54/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
index f6e0197..882b6be 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.windowing;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.storm.topology.FailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,6 +26,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -46,7 +48,11 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T>
{
     public TimeTriggerPolicy(long millis, TriggerHandler handler, EvictionPolicy<T>
evictionPolicy) {
         this.duration = millis;
         this.handler = handler;
-        this.executor = Executors.newSingleThreadScheduledExecutor();
+        ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("time-trigger-policy-%d")
+                .setDaemon(true)
+                .build();
+        this.executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
         this.evictionPolicy = evictionPolicy;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a648db54/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java b/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
index d4f431f..194c359 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
@@ -25,8 +25,11 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.topology.FailedException;
 import org.slf4j.Logger;
@@ -60,7 +63,13 @@ public class WaterMarkEventGenerator<T> implements Runnable {
                                    int eventTsLagMs, Set<GlobalStreamId> inputStreams)
{
         this.windowManager = windowManager;
         streamToTs = new ConcurrentHashMap<>();
-        executorService = Executors.newSingleThreadScheduledExecutor();
+
+        ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("watermark-event-generator-%d")
+                .setDaemon(true)
+                .build();
+        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+
         this.interval = intervalMs;
         this.eventTsLag = eventTsLagMs;
         this.inputStreams = inputStreams;
@@ -126,4 +135,18 @@ public class WaterMarkEventGenerator<T> implements Runnable {
     public void start() {
         this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval,
TimeUnit.MILLISECONDS);
     }
+
+    public void shutdown() {
+        LOG.debug("Shutting down WaterMarkEventGenerator");
+        executorService.shutdown();
+
+        try {
+            if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) {
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException ie) {
+            executorService.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a648db54/storm-core/test/jvm/org/apache/storm/windowing/WaterMarkEventGeneratorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/windowing/WaterMarkEventGeneratorTest.java
b/storm-core/test/jvm/org/apache/storm/windowing/WaterMarkEventGeneratorTest.java
index 9f4bcf8..e735619 100644
--- a/storm-core/test/jvm/org/apache/storm/windowing/WaterMarkEventGeneratorTest.java
+++ b/storm-core/test/jvm/org/apache/storm/windowing/WaterMarkEventGeneratorTest.java
@@ -18,6 +18,7 @@
 package org.apache.storm.windowing;
 
 import org.apache.storm.generated.GlobalStreamId;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -56,6 +57,11 @@ public class WaterMarkEventGeneratorTest {
         waterMarkEventGenerator.start();
     }
 
+    @After
+    public void tearDown() {
+        waterMarkEventGenerator.shutdown();
+    }
+
     @Test
     public void testTrackSingleStream() throws Exception {
         waterMarkEventGenerator.track(streamId("s1"), 100);


Mime
View raw message