helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [helix] branch master updated: Fix the watcher leakage issue (#688)
Date Fri, 07 Feb 2020 23:56:44 GMT
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 85f246a  Fix the watcher leakage issue (#688)
85f246a is described below

commit 85f246a65348914eee4cd1097e492cf6e50d12af
Author: Yi Wang <i3.wangyi@gmail.com>
AuthorDate: Fri Feb 7 15:56:34 2020 -0800

    Fix the watcher leakage issue (#688)
    
    Fix the ZooKeeper watcher leak that occurs when a znode is deleted
        - Use the event type (NodeDeleted) to determine whether the path has been deleted
        - If the path has been deleted, do not re-install the watcher on it
---
 .../helix/manager/zk/zookeeper/ZkClient.java       |  38 ++---
 .../controller/TestWatcherLeakageOnController.java | 102 ++++++++++++++
 .../org/apache/helix/manager/zk/TestZKWatch.java   | 153 +++++++++++++++++++++
 .../helix/manager/zk/client/TestHelixZkClient.java |   2 +
 4 files changed, 278 insertions(+), 17 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index a28ea83..453c8c3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -226,6 +226,12 @@ public class ZkClient implements Watcher {
     }
   }
 
+  /**
+   * Subscribe the path and the listener will handle data events of the path
+   * WARNING: if the path is created after deletion, users need to re-subscribe the path
+   * @param path The zookeeper path
+   * @param listener Instance of {@link IZkDataListener}
+   */
   public void subscribeDataChanges(String path, IZkDataListener listener) {
     Set<IZkDataListenerEntry> listenerEntries;
     synchronized (_dataListener) {
@@ -903,12 +909,8 @@ public class ZkClient implements Watcher {
         || event.getType() == Event.EventType.NodeDeleted
         || event.getType() == Event.EventType.NodeCreated
         || event.getType() == Event.EventType.NodeChildrenChanged;
-
-    if (event.getType() == Event.EventType.NodeDeleted) {
-      if (LOG.isDebugEnabled()) {
-        String path = event.getPath();
-        LOG.debug(path);
-      }
+    if (event.getType() == EventType.NodeDeleted) {
+      LOG.debug("Path {} is deleted", event.getPath());
     }
 
     getEventLock().lock();
@@ -959,11 +961,12 @@ public class ZkClient implements Watcher {
   }
 
   private void fireAllEvents() {
+    //TODO: During handling new session, if the path is deleted, watcher leakage could still
happen
     for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet())
{
-      fireChildChangedEvents(entry.getKey(), entry.getValue());
+      fireChildChangedEvents(entry.getKey(), entry.getValue(), true);
     }
     for (Entry<String, Set<IZkDataListenerEntry>> entry : _dataListener.entrySet())
{
-      fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty());
+      fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty(), true);
     }
   }
 
@@ -1248,6 +1251,7 @@ public class ZkClient implements Watcher {
 
   private void processDataOrChildChange(WatchedEvent event, long notificationTime) {
     final String path = event.getPath();
+    final boolean pathExists = event.getType() != EventType.NodeDeleted;
 
     if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated
         || event.getType() == EventType.NodeDeleted) {
@@ -1255,7 +1259,7 @@ public class ZkClient implements Watcher {
       if (childListeners != null && !childListeners.isEmpty()) {
         // TODO recording child changed event propagation latency as well. Note this change
will
         // introduce additional ZK access.
-        fireChildChangedEvents(path, childListeners);
+        fireChildChangedEvents(path, childListeners, pathExists);
       }
     }
 
@@ -1263,13 +1267,13 @@ public class ZkClient implements Watcher {
         || event.getType() == EventType.NodeCreated) {
       Set<IZkDataListenerEntry> listeners = _dataListener.get(path);
       if (listeners != null && !listeners.isEmpty()) {
-        fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime));
+        fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime),
pathExists);
       }
     }
   }
 
   private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners,
-      final OptionalLong notificationTime) {
+      final OptionalLong notificationTime, boolean pathExists) {
     try {
       final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
       // Trigger listener callbacks
@@ -1278,12 +1282,11 @@ public class ZkClient implements Watcher {
             + listener.getDataListener() + " prefetch data: " + listener.isPrefetchData())
{
           @Override
           public void run() throws Exception {
-            // Reinstall watch before listener callbacks to check the znode status
             if (!pathStatRecord.pathChecked()) {
-              pathStatRecord.recordPathStat(getStat(path, true), notificationTime);
+              // getStat will re-install watcher only when the path exists
+              pathStatRecord.recordPathStat(getStat(path, pathExists), notificationTime);
             }
             if (!pathStatRecord.pathExists()) {
-              // no znode found at the path, trigger data deleted handler.
               listener.getDataListener().handleDataDeleted(path);
             } else {
               Object data = null;
@@ -1292,6 +1295,7 @@ public class ZkClient implements Watcher {
                   LOG.debug("Prefetch data for path: {}", path);
                 }
                 try {
+                  // TODO: the data is redundantly read multiple times when multiple listeners
exist
                   data = readData(path, null, true);
                 } catch (ZkNoNodeException e) {
                   LOG.warn("Prefetch data for path: {} failed.", path, e);
@@ -1309,21 +1313,21 @@ public class ZkClient implements Watcher {
     }
   }
 
-  private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners)
{
+  private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners,
boolean pathExists) {
     try {
       final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
       for (final IZkChildListener listener : childListeners) {
         _eventThread.send(new ZkEvent("Children of " + path + " changed sent to " + listener)
{
           @Override
           public void run() throws Exception {
-            // Reinstall watch before listener callbacks to check the znode status
             if (!pathStatRecord.pathChecked()) {
-              pathStatRecord.recordPathStat(getStat(path, hasListeners(path)),
+              pathStatRecord.recordPathStat(getStat(path, hasListeners(path) && pathExists),
                   OptionalLong.empty());
             }
             List<String> children = null;
             if (pathStatRecord.pathExists()) {
               try {
+                //TODO: duplicate reads when multiple child listener exists
                 children = getChildren(path);
               } catch (ZkNoNodeException e) {
                 LOG.warn("Get children under path: {} failed.", path, e);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestWatcherLeakageOnController.java
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestWatcherLeakageOnController.java
new file mode 100644
index 0000000..5fdca52
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestWatcherLeakageOnController.java
@@ -0,0 +1,102 @@
+package org.apache.helix.integration.controller;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestWatcherLeakageOnController extends ZkTestBase {
+  private final String CLASS_NAME = getShortClassName();
+  private final String TEST_RESOURCE = "TestResource";
+  private final String CLUSTER_NAME = "TestCluster-" + CLASS_NAME;
+  private ZkHelixClusterVerifier _clusterVerifier;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, "TestInstance");
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_RESOURCE, 10, "MasterSlave");
+  }
+
+  @AfterClass
+  public void afterClass() {
+    deleteCluster(CLUSTER_NAME);
+  }
+
+  @Test
+  public void testWatcherOnResourceDeletion() throws Exception {
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "TestController");
+    controller.syncStart();
+    Map<String, List<String>> zkWatches = ZkTestHelper.getZkWatch(controller.getZkClient());
+
+    List<String> dataWatchesBefore = zkWatches.get("dataWatches");
+
+    _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, TEST_RESOURCE);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    zkWatches = ZkTestHelper.getZkWatch(controller.getZkClient());
+    List<String> dataWatchesAfter = zkWatches.get("dataWatches");
+
+    Assert.assertEquals(dataWatchesBefore.size() - dataWatchesAfter.size(), 1);
+    dataWatchesBefore.removeAll(dataWatchesAfter);
+    // The data watch on [/TestCluster-TestWatcherLeakageOnController/IDEALSTATES/TestResource]
should be removed
+    Assert.assertTrue(dataWatchesBefore.get(0).contains(TEST_RESOURCE));
+
+    controller.syncStop();
+  }
+
+  @Test
+  public void testWatcherOnResourceAddition() throws Exception {
+    String tmpResource = "tmpResource";
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "TestController");
+    controller.syncStart();
+    Map<String, List<String>> zkWatches = ZkTestHelper.getZkWatch(controller.getZkClient());
+
+    List<String> dataWatchesBefore = zkWatches.get("dataWatches");
+
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, tmpResource, 10, "MasterSlave");
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    zkWatches = ZkTestHelper.getZkWatch(controller.getZkClient());
+    List<String> dataWatchesAfter = zkWatches.get("dataWatches");
+
+    Assert.assertEquals(dataWatchesAfter.size() - dataWatchesBefore.size(), 1);
+    dataWatchesAfter.removeAll(dataWatchesBefore);
+    // The data watch on [/TestCluster-TestWatcherLeakageOnController/IDEALSTATES/tmpResource]
should be added
+    Assert.assertTrue(dataWatchesAfter.get(0).contains(tmpResource));
+
+    controller.syncStop();
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKWatch.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKWatch.java
new file mode 100644
index 0000000..9d247d4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKWatch.java
@@ -0,0 +1,153 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZKWatch extends ZkUnitTestBase {
+  private ZkClient _zkClient;
+
+  @BeforeClass
+  public void beforeClass() {
+    _zkClient = new ZkClient(ZK_ADDR);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _zkClient.close();
+  }
+
+  @Test
+  public void testSubscribeDataChange() throws Exception {
+    String existPath = "/existPath";
+    _zkClient.createPersistent(existPath);
+    final CountDownLatch deleteCondition = new CountDownLatch(1);
+    final IZkDataListener dataListener = new IZkDataListener() {
+      @Override
+      public void handleDataChange(String s, Object o) throws Exception {
+
+      }
+
+      @Override
+      public void handleDataDeleted(String path) throws Exception {
+        deleteCondition.countDown();
+        _zkClient.unsubscribeDataChanges(path, this);
+      }
+    };
+    _zkClient.subscribeDataChanges(existPath, dataListener);
+
+    Assert.assertEquals(_zkClient.numberOfListeners(), 1);
+    Map<String, List<String>> zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+    Assert.assertEquals(zkWatch.get("dataWatches").size(), 1);
+    Assert.assertEquals(zkWatch.get("existWatches").size(), 0);
+    Assert.assertEquals(zkWatch.get("childWatches").size(), 0);
+    // remove the zk node, the NodeDeleted event will be processed
+    _zkClient.delete(existPath);
+    deleteCondition.await();
+    zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+    Assert.assertEquals(zkWatch.get("dataWatches").size(), 0);
+    Assert.assertEquals(zkWatch.get("existWatches").size(), 0);
+    Assert.assertEquals(zkWatch.get("childWatches").size(), 0);
+
+    Assert.assertEquals(_zkClient.numberOfListeners(), 0);
+  }
+
+  @Test(dependsOnMethods = "testSubscribeDataChange")
+  public void testSubscribeChildChange() throws Exception {
+    String parentPath = "/tmp";
+    String childPath = parentPath + "/childNode";
+    _zkClient.createPersistent(childPath, true);
+    final CountDownLatch deleteCondition = new CountDownLatch(1);
+
+    IZkChildListener childListener = new IZkChildListener() {
+      @Override
+      public void handleChildChange(String parentPath, List<String> childrenPaths)
throws Exception {
+        deleteCondition.countDown();
+        _zkClient.unsubscribeChildChanges(parentPath, this);
+      }
+    };
+    _zkClient.subscribeChildChanges(parentPath, childListener);
+    Map<String, List<String>> zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+    Assert.assertEquals(zkWatch.get("dataWatches").size(), 1);
+    Assert.assertEquals(zkWatch.get("dataWatches").get(0), parentPath);
+    Assert.assertEquals(zkWatch.get("existWatches").size(), 0);
+    Assert.assertEquals(zkWatch.get("childWatches").size(), 1);
+    Assert.assertEquals(zkWatch.get("childWatches").get(0), parentPath);
+
+    // Delete the child node
+    _zkClient.delete(childPath);
+
+    deleteCondition.await();
+    zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+    // Expectation: the child listener should still exist
+    Assert.assertEquals(zkWatch.get("dataWatches").size(), 1);
+    Assert.assertEquals(zkWatch.get("dataWatches").get(0), parentPath);
+    Assert.assertEquals(zkWatch.get("existWatches").size(), 0);
+    Assert.assertEquals(zkWatch.get("childWatches").size(), 1);
+    Assert.assertEquals(zkWatch.get("childWatches").get(0), parentPath);
+    Assert.assertEquals(_zkClient.numberOfListeners(), 0);
+
+    // delete the parent path
+    _zkClient.delete(parentPath);
+    zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+    Assert.assertEquals(zkWatch.get("dataWatches").size(), 0);
+    Assert.assertEquals(zkWatch.get("existWatches").size(), 0);
+    Assert.assertEquals(zkWatch.get("childWatches").size(), 0);
+  }
+
+  @Test(dependsOnMethods = "testSubscribeChildChange")
+  public void testSubscribeDataChangeOnNonExistPath() throws Exception {
+    String nonExistPath = "/nonExistPath";
+    IZkDataListener dataListener = new IZkDataListener() {
+      @Override
+      public void handleDataChange(String s, Object o) throws Exception {
+
+      }
+
+      @Override
+      public void handleDataDeleted(String s) throws Exception {
+
+      }
+    };
+    _zkClient.subscribeDataChanges(nonExistPath, dataListener);
+    Map<String, List<String>> zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+    Assert.assertEquals(zkWatch.get("dataWatches").size(), 0);
+    Assert.assertEquals(zkWatch.get("existWatches").size(), 1);
+    Assert.assertEquals(zkWatch.get("childWatches").size(), 0);
+    // cleanup (unsubscribe will not clean up the watcher on ZK server
+    _zkClient.unsubscribeDataChanges(nonExistPath, dataListener);
+    zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+    Assert.assertEquals(zkWatch.get("dataWatches").size(), 0);
+    Assert.assertEquals(zkWatch.get("existWatches").size(), 1);
+    Assert.assertEquals(zkWatch.get("childWatches").size(), 0);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
index f9c8acb..796d393 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
@@ -163,6 +163,8 @@ public class TestHelixZkClient extends ZkUnitTestBase {
       // expected to be here.
     }
 
+    // client B needs to re-install the data watch
+    sharedZkClientB.watchForData(TEST_PATH);
     // Now modify using client B, and client A won't get notification.
     sharedZkClientB.createPersistent(TEST_PATH, true);
     Assert.assertTrue(TestHelper.verify(() -> notificationCountB[0] == 2, 1000));


Mime
View raw message