helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject helix git commit: Fix race-condition issue that could block ZkClient event thread in CallbackHandler.
Date Thu, 19 Apr 2018 18:39:06 GMT
Repository: helix
Updated Branches:
  refs/heads/master f99d9477f -> 82cfd15b5


Fix a race condition that could block the ZkClient event thread in CallbackHandler.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/82cfd15b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/82cfd15b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/82cfd15b

Branch: refs/heads/master
Commit: 82cfd15b5b324674f4f5c3a50e156378460228c5
Parents: f99d947
Author: Lei Xia <lxia@linkedin.com>
Authored: Mon Apr 16 11:38:26 2018 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Thu Apr 19 11:36:17 2018 -0700

----------------------------------------------------------------------
 .../helix/common/DedupEventProcessor.java       |   4 +-
 .../helix/manager/zk/CallbackHandler.java       | 205 ++++++++++---------
 .../manager/zk/zookeeper/ZkEventThread.java     |   2 +
 3 files changed, 109 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/82cfd15b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
index b656364..7f3525b 100644
--- a/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
+++ b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
@@ -52,7 +52,7 @@ public abstract class DedupEventProcessor<T, E> extends Thread {
         logger.error(_processorName + " thread failed while running the controller pipeline",
t);
       }
     }
-    logger.info("END " + _processorName + " thread");
+    logger.info("END " + _processorName + " thread for cluster " + _clusterName);
   }
 
   protected abstract void handleEvent(E event);
@@ -62,7 +62,7 @@ public abstract class DedupEventProcessor<T, E> extends Thread {
   }
 
   public void shutdown() {
-    _eventQueue.clear();
     this.interrupt();
+    _eventQueue.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/82cfd15b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 22a1a46..5890fb8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -27,8 +27,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
@@ -87,6 +85,9 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     nextNotificationType.put(Type.FINALIZE, Arrays.asList(Type.INIT));
   }
 
+  // processor to handle async zk event resubscription.
+  private static DedupEventProcessor SubscribeChangeEventProcessor;
+
   private final String _path;
   private final Object _listener;
   private final Set<EventType> _eventTypes;
@@ -96,24 +97,36 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   private final AtomicLong _lastNotificationTimeStamp;
   private final HelixManager _manager;
   private final PropertyKey _propertyKey;
-  BlockingQueue<NotificationContext> _queue = new LinkedBlockingQueue<>(1000);
   private boolean _batchModeEnabled = false;
   private boolean _preFetchEnabled = true;
   private HelixCallbackMonitor _monitor;
-  private Thread _batchProcessThread;  // TODO: change this to use DedupEventProcessor -
Lei.
+
+  // TODO: make this be per _manager or per _listener instaed of per callbackHandler -- Lei
+  private CallbackProcessor _batchCallbackProcessor;
   private boolean _watchChild = true;  // Whether we should subscribe to the child znode's
data change.
-  private static DedupEventProcessor SubscribeChangeEventProcessor;
+
+  // indicated whether this CallbackHandler is ready to serve event callback from ZkClient.
+  private boolean _ready = false;
 
   static {
     SubscribeChangeEventProcessor =
-        new DedupEventProcessor<CallbackHandler, SubscribeChangeEvent>("",
+        new DedupEventProcessor<CallbackHandler, SubscribeChangeEvent>("Singleton",
             "CallbackHanlder-AsycSubscribe") {
-          @Override protected void handleEvent(SubscribeChangeEvent event) {
-            logger.info("Resubscribe change to " + event.path + " for listener " + event.listener);
+          @Override
+          protected void handleEvent(SubscribeChangeEvent event) {
+            logger.info("Resubscribe change listener to path: " + event.path + ", for listener:
"
+                + event.listener + ", watchChild: " + event.watchChild);
             try {
-              event.handler.subscribeForChanges(event.callbackType, event.path, event.watchChild);
+              if (event.handler.isReady()) {
+                event.handler.subscribeForChanges(event.callbackType, event.path, event.watchChild);
+              } else {
+                logger.info(
+                    "CallbackHandler is not ready, stop subscribing changes listener to path:
"
+                        + event.path + ", for listener: " + event.listener + ", watchChild:
"
+                        + event.watchChild);
+              }
             } catch (Exception e) {
-              logger.error("Failed to resubscribe change to " + event.path + " for listener
"
+              logger.error("Failed to resubscribe change to path: " + event.path + " for
listener "
                   + event.listener, e);
             }
           }
@@ -139,6 +152,25 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     }
   }
 
+  class CallbackProcessor
+      extends DedupEventProcessor<NotificationContext.Type, NotificationContext> {
+    private CallbackHandler _handler;
+
+    public CallbackProcessor(CallbackHandler handler) {
+      super(_manager.getClusterName(), "CallbackProcessor");
+      _handler = handler;
+    }
+
+    @Override
+    protected void handleEvent(NotificationContext event) {
+      try {
+        _handler.invoke(event);
+      } catch (Exception e) {
+        logger.warn("Exception in callback processing thread. Skipping callback", e);
+      }
+    }
+  }
+
   /**
    * maintain the expected notification types
    * this is fix for HELIX-195: race condition between FINALIZE callbacks and Zk callbacks
@@ -172,7 +204,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     _eventTypes = new HashSet<>(Arrays.asList(eventTypes));
     _changeType = changeType;
     _lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
-    _queue = new LinkedBlockingQueue<>(1000);
     _monitor = monitor;
 
     if (_changeType == MESSAGE || _changeType == MESSAGES_CONTROLLER || _changeType == CONTROLLER)
{
@@ -183,9 +214,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
 
     parseListenerProperties();
 
-    logger.info("isAsyncBatchModeEnabled: " + _batchModeEnabled);
-    logger.info("isPreFetchEnabled: " + _preFetchEnabled);
-
     init();
   }
 
@@ -194,9 +222,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     BatchMode batchMode = _listener.getClass().getAnnotation(BatchMode.class);
     PreFetch preFetch = _listener.getClass().getAnnotation(PreFetch.class);
 
-    String asyncBatchModeEnabled = System.getProperty("helix.callbackhandler.isAsyncBatchModeEnabled");
+    String asyncBatchModeEnabled =
+        System.getProperty("helix.callbackhandler.isAsyncBatchModeEnabled");
     if (asyncBatchModeEnabled == null) {
-      // for backcompatible, the old property name is deprecated.
+      // for back-compatible, the old property name is deprecated.
       asyncBatchModeEnabled = System.getProperty("isAsyncBatchModeEnabled");
     }
 
@@ -279,62 +308,22 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     return _path;
   }
 
-  class CallbackProcessor extends Thread {
-    private CallbackHandler _handler;
-
-    CallbackProcessor(CallbackHandler handler) {
-      super("CallbackHandler-batchprocess");
-      _handler = handler;
-    }
-
-    public void run() {
-      logger.info(
-          "start batch callback handle thread for path:" + _handler.getPath() + ", listener:
"
-              + _listener);
-      while (!Thread.interrupted()) {
-        try {
-          NotificationContext notificationToProcess = _queue.take();
-          int mergedCallbacks = 0;
-          // remove all elements in the queue that have the same type
-          while (true) {
-            NotificationContext nextItem = _queue.peek();
-            if (nextItem != null && notificationToProcess.getType() == nextItem.getType())
{
-              notificationToProcess = _queue.take();
-              mergedCallbacks++;
-            } else {
-              break;
-            }
-          }
-          try {
-            logger.info(
-                "Num callbacks merged for path:" + _handler.getPath() + " : " + mergedCallbacks);
-            _handler.invoke(notificationToProcess);
-          } catch (Exception e) {
-            logger.warn("Exception in callback processing thread. Skipping callback", e);
-          }
-        } catch (InterruptedException e) {
-          logger.warn(
-              "Interrupted exception in callback processing thread. Exiting thread for listener
"
-                  + _listener + ", new callbacks will not be processed", e);
-          break;
-        }
-      }
-
-      logger.warn(
-          "Exiting batch callback processing thread for listener : " + _listener + ", path:
"
-              + _path);
-    }
-  }
-
   public void enqueueTask(NotificationContext changeContext)
       throws Exception {
     //async mode only applicable to CALLBACK from ZK, During INIT and FINALIZE invoke the
callback's immediately.
     if (_batchModeEnabled && changeContext.getType() == NotificationContext.Type.CALLBACK)
{
       logger.debug("Enqueuing callback");
-      _queue.put(changeContext);
+      if (!isReady()) {
+        logger.info(
+            "CallbackHandler is not ready, ignore change callback from path: "
+                + _path + ", for listener: " + _listener);
+      } else {
+        _batchCallbackProcessor.queueEvent(changeContext.getType(), changeContext);
+      }
     } else {
       invoke(changeContext);
     }
+
     if (_monitor != null) {
       _monitor.increaseCallbackUnbatchedCounters();
     }
@@ -367,7 +356,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
         // put SubscribeForChange run in async thread to reduce the latency of zk callback
handling.
         subscribeForChangesAsyn(changeContext.getType(), _path, _watchChild);
       }
-      _expectTypes = nextNotificationType.get(type);
 
       if (_changeType == IDEAL_STATE) {
         IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
@@ -464,11 +452,9 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
       }
       _zkClient.subscribeChildChanges(path, this);
     } else if (callbackType == NotificationContext.Type.FINALIZE) {
-      if (logger.isDebugEnabled()) {
-        logger.debug(
-            _manager.getInstanceName() + " unsubscribe child-change. path: " + path + ",
listener: "
-                + _listener);
-      }
+      logger.info(
+          _manager.getInstanceName() + " unsubscribe child-change. path: " + path + ", listener:
" + _listener);
+
       _zkClient.unsubscribeChildChanges(path, this);
     }
   }
@@ -483,11 +469,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
       }
       _zkClient.subscribeDataChanges(path, this);
     } else if (callbackType == NotificationContext.Type.FINALIZE) {
-      if (logger.isDebugEnabled()) {
-        logger.debug(
-            _manager.getInstanceName() + " unsubscribe data-change. path: " + path + ", listener:
"
-                + _listener);
-      }
+      logger.info(
+          _manager.getInstanceName() + " unsubscribe data-change. path: " + path + ", listener:
"
+              + _listener);
+
       _zkClient.unsubscribeDataChanges(path, this);
     }
   }
@@ -501,21 +486,23 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
 
   private void subscribeForChanges(NotificationContext.Type callbackType, String path,
       boolean watchChild) {
+    logger.info(
+        "Subscribing changes listener to path: " + path + ", type: " + callbackType + ",
listener: "
+            + _listener);
+
     long start = System.currentTimeMillis();
     if (_eventTypes.contains(EventType.NodeDataChanged) || _eventTypes
         .contains(EventType.NodeCreated) || _eventTypes.contains(EventType.NodeDeleted))
{
-      logger.info("Subscribing data change listener to path:" + path + " for listener: "
+ _listener);
+      logger.info("Subscribing data change listener to path: " + path);
       subscribeDataChange(path, callbackType);
     }
 
     if (_eventTypes.contains(EventType.NodeChildrenChanged)) {
-      logger.info(
-          "Subscribing child change listener to path:" + path + " for listener: " + _listener);
+      logger.info("Subscribing child change listener to path:" + path);
       subscribeChildChange(path, callbackType);
       if (watchChild) {
-        logger.info(
-            "Subscribing data change listener to all children for path:" + path + " for listener:
"
-                + _listener);
+        logger.info("Subscribing data change listener to all children for path:" + path);
+
         try {
           switch (_changeType) {
           case CURRENT_STATE:
@@ -581,9 +568,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
    * exists
    */
   public void init() {
+    logger.info("initializing CallbackHandler: " + this.toString() + " content: " + getContent());
+
     if (_batchModeEnabled) {
-      _batchProcessThread = new CallbackProcessor(this);
-      _batchProcessThread.start();
+      if (_batchCallbackProcessor != null) {
+        _batchCallbackProcessor.shutdown();
+      }
+      _batchCallbackProcessor = new CallbackProcessor(this);
+      _batchCallbackProcessor.start();
     }
 
     updateNotificationTime(System.nanoTime());
@@ -591,7 +583,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.INIT);
       changeContext.setChangeType(_changeType);
-      enqueueTask(changeContext);
+      _ready = true;
+      invoke(changeContext);
     } catch (Exception e) {
       String msg = "Exception while invoking init callback for listener:" + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);
@@ -628,18 +621,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     try {
       updateNotificationTime(System.nanoTime());
       if (dataPath != null && dataPath.startsWith(_path)) {
-        if (logger.isDebugEnabled()) {
-          logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath
-              + ", listener: " + _listener);
-        }
+        logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath
+            + ", listener: " + _listener);
         _zkClient.unsubscribeDataChanges(dataPath, this);
 
         // only needed for bucketized parent, but OK if we don't have child-change
         // watch on the bucketized parent path
-        if (logger.isDebugEnabled()) {
-          logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " +
dataPath
-              + ", listener: " + _listener);
-        }
+        logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath
+            + ", listener: " + _listener);
         _zkClient.unsubscribeChildChanges(dataPath, this);
         // No need to invoke() since this event will handled by child-change on parent-node
       }
@@ -654,10 +643,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   public void handleChildChange(String parentPath, List<String> currentChilds) {
     if (logger.isDebugEnabled()) {
       logger.debug(
-          "Data change callback: child changed, path: " + parentPath + ", current child count:
" + (
-              currentChilds != null
-                  ? currentChilds.size()
-                  : 0));
+          "Data change callback: child changed, path: " + parentPath + ", current child count:
"
+              + currentChilds.size());
     }
 
     try {
@@ -687,14 +674,16 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
    * Invoke the listener for the last time so that the listener could clean up resources
    */
   public void reset() {
+    logger.info("Resetting CallbackHandler: " + this.toString());
     try {
+      _ready = false;
+      if (_batchCallbackProcessor != null) {
+        _batchCallbackProcessor.shutdown();
+      }
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.FINALIZE);
       changeContext.setChangeType(_changeType);
-      enqueueTask(changeContext);
-      if (_batchProcessThread != null) {
-        _batchProcessThread.interrupt();
-      }
+      invoke(changeContext);
     } catch (Exception e) {
       String msg = "Exception while resetting the listener:" + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);
@@ -713,4 +702,20 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     }
   }
 
+  public boolean isReady() {
+    return _ready;
+  }
+
+  public String getContent() {
+    return "CallbackHandler{" +
+        "_watchChild=" + _watchChild +
+        ", _preFetchEnabled=" + _preFetchEnabled +
+        ", _batchModeEnabled=" + _batchModeEnabled +
+        ", _path='" + _path + '\'' +
+        ", _listener=" + _listener +
+        ", _changeType=" + _changeType +
+        ", _manager=" + _manager +
+        ", _zkClient=" + _zkClient +
+        '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/82cfd15b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
index a16f27c..8572191 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
@@ -82,6 +82,8 @@ public class ZkEventThread extends Thread {
     } catch (InterruptedException e) {
       LOG.info("Terminate ZkClient event thread.");
     }
+
+    LOG.info("Terminate ZkClient event thread.");
   }
 
   public void send(ZkEvent event) {


Mime
View raw message