helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] helix git commit: CallbackHandler to use either java config property or class annotation to enable batch callback handling.
Date Thu, 08 Mar 2018 19:20:57 GMT
Repository: helix
Updated Branches:
  refs/heads/master d602da9dd -> cdc3b8d60


CallbackHandler to use either java config property or class annotation to enable batch callback
handling.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2dbd88ff
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2dbd88ff
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2dbd88ff

Branch: refs/heads/master
Commit: 2dbd88ffed2eb84697225a58e8831b740ecd5919
Parents: d602da9
Author: Lei Xia <lxia@linkedin.com>
Authored: Fri Mar 2 14:20:14 2018 -0800
Committer: Lei Xia <lxia@linkedin.com>
Committed: Fri Mar 2 14:51:44 2018 -0800

----------------------------------------------------------------------
 .../helix/manager/zk/CallbackHandler.java       |   7 ++
 .../helix/manager/zk/ZkCallbackCache.java       |   2 +-
 .../helix/TestListenerCallbackBatchMode.java    | 110 ++++++++++++++-----
 3 files changed, 92 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/2dbd88ff/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 701cf9f..42adade 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -152,6 +153,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
{
     BatchMode batchMode = _listener.getClass().getAnnotation(BatchMode.class);
     PreFetch preFetch = _listener.getClass().getAnnotation(PreFetch.class);
 
+    String asyncBatchModeEnabled = System.getProperty("isAsyncBatchModeEnabled");
+    if (asyncBatchModeEnabled != null) {
+      _batchModeEnabled = Boolean.parseBoolean(asyncBatchModeEnabled);
+      logger.info("isAsyncBatchModeEnabled by default: " + _batchModeEnabled);
+    }
+
     if (batchMode != null) {
       _batchModeEnabled = batchMode.enabled();
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/2dbd88ff/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
index 530d9d9..5b82242 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
@@ -57,7 +57,7 @@ public class ZkCallbackCache<T> extends Cache<T> implements
IZkChildListener, IZ
     _accessor = accessor;
     _chrootPath = chrootPath;
 
-    _listener = new ConcurrentHashMap<String, Set<HelixPropertyListener>>();
+    _listener = new ConcurrentHashMap<>();
     _eventThread = eventThread;
 
     // init cache

http://git-wip-us.apache.org/repos/asf/helix/blob/2dbd88ff/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
b/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
index 328098c..8cc3b0f 100644
--- a/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
+++ b/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
@@ -49,7 +49,8 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
       }
     }
 
-    @Override public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
+    @Override
+    public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
         NotificationContext context) {
       if (context.getType().equals(NotificationContext.Type.CALLBACK)) {
         _instanceConfigChangedCount++;
@@ -80,6 +81,14 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
     }
   }
 
+  @BatchMode (enabled = false)
+  class BatchDisableddListener extends Listener {
+    @Override
+    public void onIdealStateChange(List<IdealState> idealState, NotificationContext
changeContext) {
+      super.onIdealStateChange(idealState, changeContext);
+    }
+  }
+
 
   private HelixManager _manager;
   private int _numNode = 8;
@@ -122,21 +131,45 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
 
     final Listener listener = new Listener();
     addListeners(listener);
+    updateConfigs();
+    verifyNonbatchedListeners(listener);
+    removeListeners(listener);
 
+    System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test (dependsOnMethods = {"testNonBatchedListener", "testBatchedListener", "testMixedListener"})
+  public void testEnableBatchedListenerByJavaProperty() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + methodName + " at " + new Date(System.currentTimeMillis()));
+
+    System.setProperty("isAsyncBatchModeEnabled", "true");
+
+    final Listener listener = new Listener();
+    addListeners(listener);
     updateConfigs();
+    verifyBatchedListeners(listener);
 
-    Boolean result = TestHelper.verify(new TestHelper.Verifier() {
-      @Override public boolean verify() {
-        return (listener._instanceConfigChangedCount == _numNode) && (
-            listener._idealStateChangedCount == _numResource);
-      }
-    }, 12000);
+    System.setProperty("isAsyncBatchModeEnabled", "false");
+    removeListeners(listener);
 
-    Thread.sleep(50);
+    System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
 
-    Assert.assertTrue(result,
-        "non batched: instance: " + listener._instanceConfigChangedCount + ", idealstate:
"
-            + listener._idealStateChangedCount + "\nbatched: instance: ");
+  @Test (dependsOnMethods = {"testNonBatchedListener", "testBatchedListener", "testMixedListener"})
+  public void testDisableBatchedListenerByAnnotation() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + methodName + " at " + new Date(System.currentTimeMillis()));
+
+    System.setProperty("isAsyncBatchModeEnabled", "true");
+
+    final Listener listener = new BatchDisableddListener();
+    addListeners(listener);
+    updateConfigs();
+    verifyNonbatchedListeners(listener);
+
+    System.setProperty("isAsyncBatchModeEnabled", "false");
+    removeListeners(listener);
 
     System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
   }
@@ -148,17 +181,9 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
 
     final BatchedListener batchListener = new BatchedListener();
     addListeners(batchListener);
-
     updateConfigs();
-
-    Thread.sleep(4000);
-
-    boolean result = (batchListener._instanceConfigChangedCount < _numNode/2) &&
(
-        batchListener._idealStateChangedCount < _numResource/2);
-
-    Assert.assertTrue(result,
-        "batched: instance: " + batchListener._instanceConfigChangedCount + ", idealstate:
"
-            + batchListener._idealStateChangedCount);
+    verifyBatchedListeners(batchListener);
+    removeListeners(batchListener);
 
     System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
   }
@@ -170,26 +195,59 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
 
     final MixedListener mixedListener = new MixedListener();
     addListeners(mixedListener);
-
     updateConfigs();
 
     Thread.sleep(4000);
-
     boolean result = (mixedListener._instanceConfigChangedCount == _numNode) && (
         mixedListener._idealStateChangedCount < _numResource/2);
 
-    Assert.assertTrue(result,
-        "Mixed: instance: " + mixedListener._instanceConfigChangedCount + ", idealstate:
"
-            + mixedListener._idealStateChangedCount);
+    Assert.assertTrue(result, "instance callbacks: " + mixedListener._instanceConfigChangedCount
+        + ", idealstate callbacks " + mixedListener._idealStateChangedCount + "\ninstance
count: "
+        + _numNode + ", idealstate counts: " + _numResource);
+
+    removeListeners(mixedListener);
 
     System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
   }
 
+  private void verifyNonbatchedListeners(final Listener listener) throws Exception {
+    Boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override public boolean verify() {
+        return (listener._instanceConfigChangedCount == _numNode) && (
+            listener._idealStateChangedCount == _numResource);
+      }
+    }, 12000);
+
+    Thread.sleep(50);
+    Assert.assertTrue(result,
+        "instance callbacks: " + listener._instanceConfigChangedCount + ", idealstate callbacks
"
+            + listener._idealStateChangedCount + "\ninstance count: " + _numNode
+            + ", idealstate counts: " + _numResource);
+  }
+
+  private void verifyBatchedListeners(Listener batchListener) throws InterruptedException
{
+    Thread.sleep(3000);
+    boolean result = (batchListener._instanceConfigChangedCount < _numNode / 2) &&
(
+        batchListener._idealStateChangedCount < _numResource / 2);
+
+    Assert.assertTrue(result, "instance callbacks: " + batchListener._instanceConfigChangedCount
+        + ", idealstate callbacks " + batchListener._idealStateChangedCount + "\ninstance
count: "
+        + _numNode + ", idealstate counts: " + _numResource);
+
+  }
+
   private void addListeners(Listener listener) throws Exception {
     _manager.addInstanceConfigChangeListener(listener);
     _manager.addIdealStateChangeListener(listener);
   }
 
+  private void removeListeners(Listener listener) throws Exception {
+    _manager.removeListener(new PropertyKey.Builder(_manager.getClusterName()).instanceConfigs(),
+        listener);
+    _manager
+        .removeListener(new PropertyKey.Builder(_manager.getClusterName()).idealStates(),
listener);
+  }
+
   private void updateConfigs() throws InterruptedException {
     final Random r = new Random(System.currentTimeMillis());
     // test change content


Mime
View raw message