helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject helix git commit: [HELIX-681] change controller msg purge timeout to larger number
Date Wed, 25 Apr 2018 01:04:10 GMT
Repository: helix
Updated Branches:
  refs/heads/master 6916b4899 -> ba86a3f55


[HELIX-681] change controller msg purge timeout to larger number


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ba86a3f5
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ba86a3f5
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ba86a3f5

Branch: refs/heads/master
Commit: ba86a3f554d600a2f781f0975335f3bad431a3ba
Parents: 6916b48
Author: Harry Zhang <zhan849@usc.edu>
Authored: Tue Apr 24 18:02:39 2018 -0700
Committer: Harry Zhang <zhan849@usc.edu>
Committed: Tue Apr 24 18:02:39 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/SystemPropertyKeys.java  |  3 +++
 .../helix/controller/stages/MessageGenerationPhase.java |  8 ++++++--
 .../src/test/java/org/apache/helix/ZkUnitTestBase.java  |  1 +
 .../helix/controller/stages/TestRebalancePipeline.java  | 12 ++++++------
 4 files changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ba86a3f5/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
index 2277258..7af9635 100644
--- a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -27,4 +27,7 @@ public class SystemPropertyKeys {
   public static final String ASYNC_BATCH_MODE_ENABLED = "helix.callbackhandler.isAsyncBatchModeEnabled";
 
   public static final String LEGACY_ASYNC_BATCH_MODE_ENABLED = "isAsyncBatchModeEnabled";
+
+  // Controller
+  public static final String CONTROLLER_MESSAGE_PURGE_DELAY = "helix.controller.stages.MessageGenerationPhase.messagePurgeDelay";
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ba86a3f5/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index de466ac..86bacea 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
@@ -42,6 +43,7 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,10 +55,12 @@ public class MessageGenerationPhase extends AbstractBaseStage {
 
   // If we see there is any invalid pending message leaving on host, i.e. message
   // tells participant to change from SLAVE to MASTER, and the participant is already
-  // at MASTER state, we wait for 3 sec and if the message is still not cleaned up by
+  // at MASTER state, we wait for timeout and if the message is still not cleaned up by
   // participant, controller will cleanup them proactively to unblock further state
   // transition
-  private final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = 3 * 1000;
+  public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
+      .getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 * 1000);
+
   private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class);
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/ba86a3f5/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
index 4437899..b29375b 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
@@ -75,6 +75,7 @@ public class ZkUnitTestBase {
   public void beforeSuite() throws Exception {
     // Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk commends
     System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+    System.setProperty(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, "3000");
 
     _zkServer = TestHelper.startZkServer(ZK_ADDR);
     AssertJUnit.assertTrue(_zkServer != null);

http://git-wip-us.apache.org/repos/asf/helix/blob/ba86a3f5/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 9d4790d..d38907d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -129,8 +129,6 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    HelixManager manager = new DummyClusterManager(clusterName, accessor);
-    ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
     refreshClusterConfig(clusterName, accessor);
     final String resourceName = "testResource_dup";
     String[] resourceGroups = new String[] {
@@ -152,6 +150,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
         0, 1
     });
 
+    long msgPurgeDelay = MessageGenerationPhase.DEFAULT_OBSELETE_MSG_PURGE_DELAY;
+
     ClusterControllerManager controller =
         new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
@@ -173,13 +173,13 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", "session_1",
         "SLAVE", true);
 
-    // Controller has 3s timeout, so after 1s, controller should not have GCed message
+    // Controller has timeout > 1sec, so after 1s, controller should not have GCed message
     Thread.sleep(1000);
     Assert.assertEquals(accessor.getChildValues(keyBuilder.messages("localhost_0")).size(), 1);
     Assert.assertEquals(accessor.getChildValues(keyBuilder.messages("localhost_1")).size(), 1);
 
-    // After another 2 second, controller should cleanup messages and continue to rebalance
-    Thread.sleep(3000);
+    // After another purge delay, controller should cleanup messages and continue to rebalance
+    Thread.sleep(msgPurgeDelay);
     // Manually trigger another rebalance by touching current state
     setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
         "SLAVE");
@@ -197,7 +197,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     // controller will clean it up
     setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
         "MASTER", true);
-    Thread.sleep(3500);
+    Thread.sleep(msgPurgeDelay);
     // touch current state to trigger rebalance
     setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
         "MASTER", false);


Mime
View raw message