helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch master updated: Fix the concurrent modification error happens during the HelixManager initHandlers() call (#904)
Date Sat, 21 Mar 2020 01:18:26 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new d109c64  Fix the concurrent modification error happens during the HelixManager initHandlers() call (#904)
d109c64 is described below

commit d109c64834b1b288c937b9f9693b26e74487a244
Author: Jiajun Wang <1803880+jiajunwang@users.noreply.github.com>
AuthorDate: Fri Mar 20 18:18:16 2020 -0700

    Fix the concurrent modification error happens during the HelixManager initHandlers() call (#904)
    
    Fix the concurrent modification error that happens during the HelixManager initHandlers() call.
    Add test case to verify the fix and ensure this error not happen again.
---
 .../apache/helix/manager/zk/ZKHelixManager.java    | 11 +--
 ...andleNewSession.java => TestHandleSession.java} | 94 ++++++++++++++++++++--
 2 files changed, 92 insertions(+), 13 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index b4368f9..532ef44 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -1004,7 +1004,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   void initHandlers(List<CallbackHandler> handlers) {
     synchronized (this) {
       if (handlers != null) {
-        for (CallbackHandler handler : handlers) {
+        // get a copy of the list and iterate over the copy list
+        // in case handler.init() modifies the original handler list
+        List<CallbackHandler> tmpHandlers = new ArrayList<>(handlers);
+        for (CallbackHandler handler : tmpHandlers) {
           handler.init();
           LOG.info("init handler: " + handler.getPath() + ", " + handler.getListener());
         }
@@ -1016,10 +1019,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     synchronized (this) {
       if (_handlers != null) {
         // get a copy of the list and iterate over the copy list
-        // in case handler.reset() modify the original handler list
-        List<CallbackHandler> tmpHandlers = new ArrayList<>();
-        tmpHandlers.addAll(_handlers);
-
+        // in case handler.reset() modifies the original handler list
+        List<CallbackHandler> tmpHandlers = new ArrayList<>(_handlers);
         for (CallbackHandler handler : tmpHandlers) {
           handler.reset(isShutdown);
           LOG.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
similarity index 84%
rename from helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
rename to helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
index 00bedcf..92eb3c4 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
@@ -22,31 +22,37 @@ package org.apache.helix.manager.zk;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
+import org.apache.helix.api.listeners.CurrentStateChangeListener;
+import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
-import org.apache.helix.model.LiveInstance;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 
-public class TestHandleNewSession extends ZkTestBase {
+public class TestHandleSession extends ZkTestBase {
+  private static final String _className = TestHelper.getTestClassName();
+
   @Test
   public void testHandleNewSession() throws Exception {
-    String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
+    String clusterName = _className + "_" + methodName;
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
@@ -191,9 +197,8 @@ public class TestHandleNewSession extends ZkTestBase {
    */
   @Test(timeOut = 5 * 60 * 1000L)
   public void testDiscardExpiredSessions() throws Exception {
-    final String className = TestHelper.getTestClassName();
     final String methodName = TestHelper.getTestMethodName();
-    final String clusterName = className + "_" + methodName;
+    final String clusterName = _className + "_" + methodName;
 
     final ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(ZK_ADDR));
@@ -314,9 +319,8 @@ public class TestHandleNewSession extends ZkTestBase {
    */
   @Test
   public void testSessionExpiredWhenResetHandlers() throws Exception {
-    final String className = TestHelper.getTestClassName();
     final String methodName = TestHelper.getTestMethodName();
-    final String clusterName = className + "_" + methodName;
+    final String clusterName = _className + "_" + methodName;
 
     final ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(ZK_ADDR));
@@ -466,6 +470,78 @@ public class TestHandleNewSession extends ZkTestBase {
     deleteCluster(clusterName);
   }
 
+  class MockLiveInstanceChangeListener implements LiveInstanceChangeListener {
+    private final HelixManager _manager;
+    private final Set<String> _expectedLiveInstances;
+
+    public MockLiveInstanceChangeListener(HelixManager manager,
+        Set<String> expectedLiveInstanceNames) {
+      _manager = manager;
+      _expectedLiveInstances = expectedLiveInstanceNames;
+    }
+
+    @Override
+    public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+        NotificationContext changeContext) {
+      if (changeContext.getType() != NotificationContext.Type.FINALIZE) {
+        for (LiveInstance liveInstance : liveInstances) {
+          if (_expectedLiveInstances.contains(liveInstance.getInstanceName())) {
+            try {
+              _manager.addCurrentStateChangeListener(
+                  (CurrentStateChangeListener) (instanceName, statesInfo, currentStateChangeContext) -> {
+                    // empty callback
+                  }, liveInstance.getInstanceName(), liveInstance.getEphemeralOwner());
+            } catch (Exception e) {
+              throw new HelixException("Unexpected exception in the test method.", e);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testConcurrentInitCallbackHandlers() throws Exception {
+    final String clusterName =
+        CLUSTER_PREFIX + "_" + _className + "_" + TestHelper.getTestMethodName();
+    TestHelper.setupEmptyCluster(_gZkClient, clusterName);
+    final String spectatorName = TestHelper.getTestMethodName() + "Spectator";
+    try {
+      BlockingHandleNewSessionZkHelixManager helixManager =
+          new BlockingHandleNewSessionZkHelixManager(clusterName, spectatorName,
+              InstanceType.SPECTATOR, _gZkClient.getServers());
+      helixManager.connect();
+      // Add two mock listeners that will add more callback handlers while handling INIT or CALLBACK event.
+      // Note that we have to test with 2 separate listeners so one of them has a chance to fail if
+      // there is a concurrent modification exception.
+      helixManager.addLiveInstanceChangeListener(
+          new MockLiveInstanceChangeListener(helixManager, Collections.singleton("localhost_1")));
+      helixManager.addLiveInstanceChangeListener(
+          new MockLiveInstanceChangeListener(helixManager, Collections.singleton("localhost_2")));
+
+      // Session expire will trigger all callbacks to be init. And the injected liveInstance
+      // listener will trigger more callbackhandlers to be registered during the init process.
+      ZkTestHelper.asyncExpireSession(helixManager.getZkClient());
+      // Create mock live instance znodes to trigger the internal callback handling logic which will
+      // modify the handler list.
+      setupLiveInstances(clusterName, new int[] { 1, 2 });
+      // Start new session handling so the manager will call the initHandler() for initializing all
+      // existing handlers.
+      helixManager.proceedNewSessionHandling();
+      // Ensure the new session has been processed.
+      TestHelper.verify(() -> helixManager.getHandleNewSessionEndTime() != 0, 3000);
+      // Verify that both new mock current state callback handlers have been initialized normally.
+      // Note that if there is concurrent modification that cause errors, one of the callback will
+      // not be initialized normally.
+      for (CallbackHandler handler : helixManager.getHandlers()) {
+        Assert.assertTrue(handler.isReady(),
+            "CallbackHandler is not initialized as expected. It might be caused by a ConcurrentModificationException");
+      }
+    } finally {
+      TestHelper.dropCluster(clusterName, _gZkClient);
+    }
+  }
+
   static class BlockingHandleNewSessionZkHelixManager extends ZKHelixManager {
     private final Semaphore newSessionHandlingCount = new Semaphore(1);
     private long handleNewSessionStartTime = 0L;
@@ -485,6 +561,8 @@ public class TestHandleNewSession extends ZkTestBase {
     }
 
     void proceedNewSessionHandling() {
+      handleNewSessionStartTime = 0L;
+      handleNewSessionEndTime = 0L;
       newSessionHandlingCount.release();
     }
 


Mime
View raw message