helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [2/5] helix git commit: Add new config name for batch callback handling in CallbackHandler.
Date Fri, 06 Apr 2018 00:04:05 GMT
Add new config name for batch callback handling in CallbackHandler.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/788680d4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/788680d4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/788680d4

Branch: refs/heads/master
Commit: 788680d4bbb913303eb0420343b41e677102e51a
Parents: 1bf6d60
Author: Lei Xia <lxia@linkedin.com>
Authored: Sat Mar 24 19:59:49 2018 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Thu Apr 5 16:41:20 2018 -0700

----------------------------------------------------------------------
 .../helix/manager/zk/CallbackHandler.java       | 80 ++++++++++++--------
 .../helix/TestListenerCallbackBatchMode.java    | 15 +++-
 2 files changed, 61 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/788680d4/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index f6ec59c..958f989 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -99,6 +99,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   private boolean _batchModeEnabled = false;
   private boolean _preFetchEnabled = true;
   private HelixCallbackMonitor _monitor;
+  private Thread _batchProcessThread;  // TODO: change this to use DedupEventProcessor - Lei.
 
   /**
    * maintain the expected notification types
@@ -142,9 +143,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     logger.info("isAsyncBatchModeEnabled: " + _batchModeEnabled);
     logger.info("isPreFetchEnabled: " + _preFetchEnabled);
 
-    if (_batchModeEnabled) {
-      new Thread(new CallbackInvoker(this)).start();
-    }
     init();
   }
 
@@ -153,7 +151,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     BatchMode batchMode = _listener.getClass().getAnnotation(BatchMode.class);
     PreFetch preFetch = _listener.getClass().getAnnotation(PreFetch.class);
 
-    String asyncBatchModeEnabled = System.getProperty("isAsyncBatchModeEnabled");
+    String asyncBatchModeEnabled = System.getProperty("helix.callbackhandler.isAsyncBatchModeEnabled");
+    if (asyncBatchModeEnabled == null) {
+      // for backcompatible, the old property name is deprecated.
+      asyncBatchModeEnabled = System.getProperty("isAsyncBatchModeEnabled");
+    }
+
     if (asyncBatchModeEnabled != null) {
       _batchModeEnabled = Boolean.parseBoolean(asyncBatchModeEnabled);
       logger.info("isAsyncBatchModeEnabled by default: " + _batchModeEnabled);
@@ -233,15 +236,19 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     return _path;
   }
 
-  class CallbackInvoker implements Runnable {
-    private CallbackHandler handler;
+  class CallbackProcessor extends Thread {
+    private CallbackHandler _handler;
 
-    CallbackInvoker(CallbackHandler handler) {
-      this.handler = handler;
+    CallbackProcessor(CallbackHandler handler) {
+      super("CallbackHandler-batchprocess");
+      _handler = handler;
     }
 
     public void run() {
-      while (true) {
+      logger.info(
+          "start batch callback handle thread for path:" + _handler.getPath() + ", listener: "
+              + _listener);
+      while (!Thread.interrupted()) {
         try {
           NotificationContext notificationToProcess = _queue.take();
           int mergedCallbacks = 0;
@@ -257,18 +264,22 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
           }
           try {
             logger.info(
-                "Num callbacks merged for path:" + handler.getPath() + " : " + mergedCallbacks);
-            handler.invoke(notificationToProcess);
+                "Num callbacks merged for path:" + _handler.getPath() + " : " + mergedCallbacks);
+            _handler.invoke(notificationToProcess);
           } catch (Exception e) {
             logger.warn("Exception in callback processing thread. Skipping callback", e);
           }
         } catch (InterruptedException e) {
           logger.warn(
-              "Interrupted exception in callback processing thread. Exiting thread, new callbacks will not be processed",
-              e);
+              "Interrupted exception in callback processing thread. Exiting thread for listener "
+                  + _listener + ", new callbacks will not be processed", e);
           break;
         }
       }
+
+      logger.warn(
+          "Exiting batch callback processing thread for listener : " + _listener + ", path: "
+              + _path);
     }
   }
 
@@ -287,31 +298,30 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   }
 
   public void invoke(NotificationContext changeContext) throws Exception {
+    Type type = changeContext.getType();
+    long start = System.currentTimeMillis();
+
     // This allows the listener to work with one change at a time
     synchronized (_manager) {
-      Type type = changeContext.getType();
-      if (!_expectTypes.contains(type)) {
-        logger.warn("Skip processing callbacks for listener: " + _listener + ", path: " + _path
-            + ", expected types: " + _expectTypes + " but was " + type);
-
-        return;
-      }
-      _expectTypes = nextNotificationType.get(type);
-
-      // Builder keyBuilder = _accessor.keyBuilder();
-      long start = System.currentTimeMillis();
       if (logger.isInfoEnabled()) {
         logger.info(
             Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:" + _listener
-                .getClass().getCanonicalName());
+                + " type: " + type);
       }
 
+      if (!_expectTypes.contains(type)) {
+        logger.warn(
+            "Callback handler received event in wrong order. Listener: " + _listener + ", path: "
+                + _path + ", expected types: " + _expectTypes + " but was " + type);
+        return;
+      }
+      _expectTypes = nextNotificationType.get(type);
+
       if (_changeType == IDEAL_STATE) {
         IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
         subscribeForChanges(changeContext, _path, true);
         List<IdealState> idealStates = preFetch(_propertyKey);
         idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
-
       } else if (_changeType == ChangeType.INSTANCE_CONFIG) {
         subscribeForChanges(changeContext, _path, true);
         if (_listener instanceof ConfigChangeListener) {
@@ -380,15 +390,16 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
         ControllerChangeListener controllerChangelistener = (ControllerChangeListener) _listener;
         subscribeForChanges(changeContext, _path, false);
         controllerChangelistener.onControllerChange(changeContext);
+      } else {
+        logger.warn("Unknown change type: " + _changeType);
       }
 
       long end = System.currentTimeMillis();
       if (logger.isInfoEnabled()) {
         logger.info(
             Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:" + _listener
-                .getClass().getCanonicalName() + " Took: " + (end - start) + "ms");
+                + " type: " + type + " Took: " + (end - start) + "ms");
       }
-
       if (_monitor != null) {
         _monitor.increaseCallbackCounters(end - start);
       }
@@ -424,7 +435,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
          _manager.getInstanceName() + " subscribe data-change. path: " + path + ", listener: "
               + _listener);
       _zkClient.subscribeDataChanges(path, this);
-
     } else if (type == NotificationContext.Type.FINALIZE) {
       logger.info(
          _manager.getInstanceName() + " unsubscribe data-change. path: " + path + ", listener: "
@@ -462,9 +472,9 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
           case CURRENT_STATE:
           case IDEAL_STATE:
           case EXTERNAL_VIEW:
-            case TARGET_EXTERNAL_VIEW:{
+          case TARGET_EXTERNAL_VIEW: {
             // check if bucketized
-            BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+            BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
             List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
             for (ZNRecord record : records) {
               HelixProperty property = new HelixProperty(record);
@@ -522,6 +532,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
    * exists
    */
   public void init() {
+    if (_batchModeEnabled) {
+      _batchProcessThread = new CallbackProcessor(this);
+      _batchProcessThread.start();
+    }
+
     updateNotificationTime(System.nanoTime());
     try {
       NotificationContext changeContext = new NotificationContext(_manager);
@@ -621,6 +636,9 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
       changeContext.setType(NotificationContext.Type.FINALIZE);
       changeContext.setChangeType(_changeType);
       enqueueTask(changeContext);
+      if (_batchProcessThread != null) {
+        _batchProcessThread.interrupt();
+      }
     } catch (Exception e) {
       String msg = "Exception while resetting the listener:" + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);

http://git-wip-us.apache.org/repos/asf/helix/blob/788680d4/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java b/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
index 8cc3b0f..8c6d8a0 100644
--- a/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
+++ b/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
@@ -145,7 +145,7 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
 
     System.setProperty("isAsyncBatchModeEnabled", "true");
 
-    final Listener listener = new Listener();
+    Listener listener = new Listener();
     addListeners(listener);
     updateConfigs();
     verifyBatchedListeners(listener);
@@ -153,6 +153,16 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
     System.setProperty("isAsyncBatchModeEnabled", "false");
     removeListeners(listener);
 
+    System.setProperty("helix.callbackhandler.isAsyncBatchModeEnabled", "true");
+
+    listener = new Listener();
+    addListeners(listener);
+    updateConfigs();
+    verifyBatchedListeners(listener);
+
+    System.setProperty("helix.callbackhandler.isAsyncBatchModeEnabled", "false");
+    removeListeners(listener);
+
     System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
   }
 
@@ -221,8 +231,7 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
     Thread.sleep(50);
     Assert.assertTrue(result,
         "instance callbacks: " + listener._instanceConfigChangedCount + ", idealstate callbacks "
-            + listener._idealStateChangedCount + "\ninstance count: " + _numNode
-            + ", idealstate counts: " + _numResource);
+            + listener._idealStateChangedCount + "\ninstance count: " + _numNode + ", idealstate counts: " + _numResource);
   }
 
   private void verifyBatchedListeners(Listener batchListener) throws InterruptedException {


Mime
View raw message