helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch master updated: Shutdown the TaskStateModelFactory threads created in the tests. (#1140)
Date Tue, 07 Jul 2020 18:06:04 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fc8708  Shutdown the TaskStateModelFactory threads created in the tests. (#1140)
3fc8708 is described below

commit 3fc870815219d910c6e357b6b4ba843b2244c776
Author: Jiajun Wang <jjwang@linkedin.com>
AuthorDate: Tue Jul 7 11:05:56 2020 -0700

    Shutdown the TaskStateModelFactory threads created in the tests. (#1140)
    
    The TaskStateModelFactory initialization creates a thread pool. The expectation is that
the application code closes the thread pool and the threads when the participant instance
is shutting down. In most cases, this means the JVM is going to be shutdown. So this operation
is not a must. However, in the test cases, these thread pools leak thousands of threads.
    This PR adds cleanup logic to shutdown thread pools that are created for the participant
instances. Note that there is still thread leakage when the participants are created separately
instead of using the general methods.
---
 .../apache/helix/task/TaskStateModelFactory.java   | 10 +++++++++
 .../helix/task/TaskSynchronizedTestBase.java       | 26 +++++++++++++++++-----
 2 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index f30dd9f..6d7c80e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.JMException;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.helix.HelixManager;
 import org.apache.helix.monitoring.mbeans.ThreadPoolExecutorMonitor;
 import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -95,6 +96,15 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel>
{
     }
   }
 
+  @VisibleForTesting
+  void shutdownNow() {
+    _taskExecutor.shutdownNow();
+    _timerTaskExecutor.shutdownNow();
+    if (_monitor != null) {
+      _monitor.unregister();
+    }
+  }
+
   public boolean isShutdown() {
     return _taskExecutor.isShutdown();
   }
diff --git a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
index f90dd34..c281b16 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
@@ -35,6 +35,7 @@ import org.apache.helix.integration.task.MockTask;
 import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
@@ -161,6 +162,10 @@ public class TaskSynchronizedTestBase extends ZkTestBase {
   }
 
   protected void startParticipant(String zkAddr, int i) {
+    if (_participants[i] != null) {
+      stopParticipant(i);
+    }
+
     Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
     taskFactoryReg.put(MockTask.TASK_COMMAND, MockTask::new);
     String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
@@ -168,7 +173,7 @@ public class TaskSynchronizedTestBase extends ZkTestBase {
 
     // Register a Task state model factory.
     StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-    stateMachine.registerStateModelFactory("Task",
+    stateMachine.registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
         new TaskStateModelFactory(_participants[i], taskFactoryReg));
     _participants[i].syncStart();
   }
@@ -181,12 +186,23 @@ public class TaskSynchronizedTestBase extends ZkTestBase {
 
   protected void stopParticipant(int i) {
     if (_participants.length <= i) {
-      throw new HelixException(
-          String.format("Can't stop participant %s, only %s participants" + "were set up.",
i,
+      throw new HelixException(String
+          .format("Can't stop participant %s, only %s participants" + "were set up.", i,
               _participants.length));
     }
-    if (_participants[i] != null && _participants[i].isConnected()) {
-      _participants[i].syncStop();
+    if (_participants[i] != null) {
+      if (_participants[i].isConnected()) {
+        _participants[i].syncStop();
+      }
+      // Shutdown the state model factories to close all threads.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      if (stateMachine != null) {
+        StateModelFactory stateModelFactory =
+            stateMachine.getStateModelFactory(TaskConstants.STATE_MODEL_NAME);
+        if (stateModelFactory != null && stateModelFactory instanceof TaskStateModelFactory)
{
+          ((TaskStateModelFactory) stateModelFactory).shutdownNow();
+        }
+      }
     }
   }
 


Mime
View raw message