This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 85f246a Fix the watcher leakage issue (#688)
85f246a is described below
commit 85f246a65348914eee4cd1097e492cf6e50d12af
Author: Yi Wang <i3.wangyi@gmail.com>
AuthorDate: Fri Feb 7 15:56:34 2020 -0800
Fix the watcher leakage issue (#688)
Fix the zk watcher leakage issue when znode is deleted
- Leverage the EventType (NodeDeleted) to verify if the path is deleted
- If it's deleted, no need to re-install the watcher
---
.../helix/manager/zk/zookeeper/ZkClient.java | 38 ++---
.../controller/TestWatcherLeakageOnController.java | 102 ++++++++++++++
.../org/apache/helix/manager/zk/TestZKWatch.java | 153 +++++++++++++++++++++
.../helix/manager/zk/client/TestHelixZkClient.java | 2 +
4 files changed, 278 insertions(+), 17 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index a28ea83..453c8c3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -226,6 +226,12 @@ public class ZkClient implements Watcher {
}
}
+ /**
+ * Subscribe the path and the listener will handle data events of the path
+ * WARNING: if the path is created after deletion, users need to re-subscribe the path
+ * @param path The zookeeper path
+ * @param listener Instance of {@link IZkDataListener}
+ */
public void subscribeDataChanges(String path, IZkDataListener listener) {
Set<IZkDataListenerEntry> listenerEntries;
synchronized (_dataListener) {
@@ -903,12 +909,8 @@ public class ZkClient implements Watcher {
|| event.getType() == Event.EventType.NodeDeleted
|| event.getType() == Event.EventType.NodeCreated
|| event.getType() == Event.EventType.NodeChildrenChanged;
-
- if (event.getType() == Event.EventType.NodeDeleted) {
- if (LOG.isDebugEnabled()) {
- String path = event.getPath();
- LOG.debug(path);
- }
+ if (event.getType() == EventType.NodeDeleted) {
+ LOG.debug("Path {} is deleted", event.getPath());
}
getEventLock().lock();
@@ -959,11 +961,12 @@ public class ZkClient implements Watcher {
}
private void fireAllEvents() {
+ //TODO: During handling new session, if the path is deleted, watcher leakage could still happen
for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet())
{
- fireChildChangedEvents(entry.getKey(), entry.getValue());
+ fireChildChangedEvents(entry.getKey(), entry.getValue(), true);
}
for (Entry<String, Set<IZkDataListenerEntry>> entry : _dataListener.entrySet())
{
- fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty());
+ fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty(), true);
}
}
@@ -1248,6 +1251,7 @@ public class ZkClient implements Watcher {
private void processDataOrChildChange(WatchedEvent event, long notificationTime) {
final String path = event.getPath();
+ final boolean pathExists = event.getType() != EventType.NodeDeleted;
if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated
|| event.getType() == EventType.NodeDeleted) {
@@ -1255,7 +1259,7 @@ public class ZkClient implements Watcher {
if (childListeners != null && !childListeners.isEmpty()) {
// TODO recording child changed event propagation latency as well. Note this change will
// introduce additional ZK access.
- fireChildChangedEvents(path, childListeners);
+ fireChildChangedEvents(path, childListeners, pathExists);
}
}
@@ -1263,13 +1267,13 @@ public class ZkClient implements Watcher {
|| event.getType() == EventType.NodeCreated) {
Set<IZkDataListenerEntry> listeners = _dataListener.get(path);
if (listeners != null && !listeners.isEmpty()) {
- fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime));
+ fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime),
pathExists);
}
}
}
private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners,
- final OptionalLong notificationTime) {
+ final OptionalLong notificationTime, boolean pathExists) {
try {
final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
// Trigger listener callbacks
@@ -1278,12 +1282,11 @@ public class ZkClient implements Watcher {
+ listener.getDataListener() + " prefetch data: " + listener.isPrefetchData())
{
@Override
public void run() throws Exception {
- // Reinstall watch before listener callbacks to check the znode status
if (!pathStatRecord.pathChecked()) {
- pathStatRecord.recordPathStat(getStat(path, true), notificationTime);
+ // getStat will re-install watcher only when the path exists
+ pathStatRecord.recordPathStat(getStat(path, pathExists), notificationTime);
}
if (!pathStatRecord.pathExists()) {
- // no znode found at the path, trigger data deleted handler.
listener.getDataListener().handleDataDeleted(path);
} else {
Object data = null;
@@ -1292,6 +1295,7 @@ public class ZkClient implements Watcher {
LOG.debug("Prefetch data for path: {}", path);
}
try {
+ // TODO: the data is redundantly read multiple times when multiple listeners exist
data = readData(path, null, true);
} catch (ZkNoNodeException e) {
LOG.warn("Prefetch data for path: {} failed.", path, e);
@@ -1309,21 +1313,21 @@ public class ZkClient implements Watcher {
}
}
- private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners)
{
+ private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners,
boolean pathExists) {
try {
final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
for (final IZkChildListener listener : childListeners) {
_eventThread.send(new ZkEvent("Children of " + path + " changed sent to " + listener)
{
@Override
public void run() throws Exception {
- // Reinstall watch before listener callbacks to check the znode status
if (!pathStatRecord.pathChecked()) {
- pathStatRecord.recordPathStat(getStat(path, hasListeners(path)),
+ pathStatRecord.recordPathStat(getStat(path, hasListeners(path) && pathExists),
OptionalLong.empty());
}
List<String> children = null;
if (pathStatRecord.pathExists()) {
try {
+ //TODO: duplicate reads when multiple child listeners exist
children = getChildren(path);
} catch (ZkNoNodeException e) {
LOG.warn("Get children under path: {} failed.", path, e);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestWatcherLeakageOnController.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestWatcherLeakageOnController.java
new file mode 100644
index 0000000..5fdca52
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestWatcherLeakageOnController.java
@@ -0,0 +1,102 @@
+package org.apache.helix.integration.controller;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestWatcherLeakageOnController extends ZkTestBase {
+ private final String CLASS_NAME = getShortClassName();
+ private final String TEST_RESOURCE = "TestResource";
+ private final String CLUSTER_NAME = "TestCluster-" + CLASS_NAME;
+ private ZkHelixClusterVerifier _clusterVerifier;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+ _clusterVerifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, "TestInstance");
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_RESOURCE, 10, "MasterSlave");
+ }
+
+ @AfterClass
+ public void afterClass() {
+ deleteCluster(CLUSTER_NAME);
+ }
+
+ @Test
+ public void testWatcherOnResourceDeletion() throws Exception {
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "TestController");
+ controller.syncStart();
+ Map<String, List<String>> zkWatches = ZkTestHelper.getZkWatch(controller.getZkClient());
+
+ List<String> dataWatchesBefore = zkWatches.get("dataWatches");
+
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, TEST_RESOURCE);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ zkWatches = ZkTestHelper.getZkWatch(controller.getZkClient());
+ List<String> dataWatchesAfter = zkWatches.get("dataWatches");
+
+ Assert.assertEquals(dataWatchesBefore.size() - dataWatchesAfter.size(), 1);
+ dataWatchesBefore.removeAll(dataWatchesAfter);
+ // The data watch on [/TestCluster-TestWatcherLeakageOnController/IDEALSTATES/TestResource] should be removed
+ Assert.assertTrue(dataWatchesBefore.get(0).contains(TEST_RESOURCE));
+
+ controller.syncStop();
+ }
+
+ @Test
+ public void testWatcherOnResourceAddition() throws Exception {
+ String tmpResource = "tmpResource";
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "TestController");
+ controller.syncStart();
+ Map<String, List<String>> zkWatches = ZkTestHelper.getZkWatch(controller.getZkClient());
+
+ List<String> dataWatchesBefore = zkWatches.get("dataWatches");
+
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, tmpResource, 10, "MasterSlave");
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ zkWatches = ZkTestHelper.getZkWatch(controller.getZkClient());
+ List<String> dataWatchesAfter = zkWatches.get("dataWatches");
+
+ Assert.assertEquals(dataWatchesAfter.size() - dataWatchesBefore.size(), 1);
+ dataWatchesAfter.removeAll(dataWatchesBefore);
+ // The data watch on [/TestCluster-TestWatcherLeakageOnController/IDEALSTATES/tmpResource] should be added
+ Assert.assertTrue(dataWatchesAfter.get(0).contains(tmpResource));
+
+ controller.syncStop();
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKWatch.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKWatch.java
new file mode 100644
index 0000000..9d247d4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKWatch.java
@@ -0,0 +1,153 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZKWatch extends ZkUnitTestBase {
+ private ZkClient _zkClient;
+
+ @BeforeClass
+ public void beforeClass() {
+ _zkClient = new ZkClient(ZK_ADDR);
+ }
+
+ @AfterClass
+ public void afterClass() {
+ _zkClient.close();
+ }
+
+ @Test
+ public void testSubscribeDataChange() throws Exception {
+ String existPath = "/existPath";
+ _zkClient.createPersistent(existPath);
+ final CountDownLatch deleteCondition = new CountDownLatch(1);
+ final IZkDataListener dataListener = new IZkDataListener() {
+ @Override
+ public void handleDataChange(String s, Object o) throws Exception {
+
+ }
+
+ @Override
+ public void handleDataDeleted(String path) throws Exception {
+ deleteCondition.countDown();
+ _zkClient.unsubscribeDataChanges(path, this);
+ }
+ };
+ _zkClient.subscribeDataChanges(existPath, dataListener);
+
+ Assert.assertEquals(_zkClient.numberOfListeners(), 1);
+ Map<String, List<String>> zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+ Assert.assertEquals(zkWatch.get("dataWatches").size(), 1);
+ Assert.assertEquals(zkWatch.get("existWatches").size(), 0);
+ Assert.assertEquals(zkWatch.get("childWatches").size(), 0);
+ // remove the zk node, the NodeDeleted event will be processed
+ _zkClient.delete(existPath);
+ deleteCondition.await();
+ zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+ Assert.assertEquals(zkWatch.get("dataWatches").size(), 0);
+ Assert.assertEquals(zkWatch.get("existWatches").size(), 0);
+ Assert.assertEquals(zkWatch.get("childWatches").size(), 0);
+
+ Assert.assertEquals(_zkClient.numberOfListeners(), 0);
+ }
+
+ @Test(dependsOnMethods = "testSubscribeDataChange")
+ public void testSubscribeChildChange() throws Exception {
+ String parentPath = "/tmp";
+ String childPath = parentPath + "/childNode";
+ _zkClient.createPersistent(childPath, true);
+ final CountDownLatch deleteCondition = new CountDownLatch(1);
+
+ IZkChildListener childListener = new IZkChildListener() {
+ @Override
+ public void handleChildChange(String parentPath, List<String> childrenPaths)
throws Exception {
+ deleteCondition.countDown();
+ _zkClient.unsubscribeChildChanges(parentPath, this);
+ }
+ };
+ _zkClient.subscribeChildChanges(parentPath, childListener);
+ Map<String, List<String>> zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+ Assert.assertEquals(zkWatch.get("dataWatches").size(), 1);
+ Assert.assertEquals(zkWatch.get("dataWatches").get(0), parentPath);
+ Assert.assertEquals(zkWatch.get("existWatches").size(), 0);
+ Assert.assertEquals(zkWatch.get("childWatches").size(), 1);
+ Assert.assertEquals(zkWatch.get("childWatches").get(0), parentPath);
+
+ // Delete the child node
+ _zkClient.delete(childPath);
+
+ deleteCondition.await();
+ zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+ // Expectation: the child listener should still exist
+ Assert.assertEquals(zkWatch.get("dataWatches").size(), 1);
+ Assert.assertEquals(zkWatch.get("dataWatches").get(0), parentPath);
+ Assert.assertEquals(zkWatch.get("existWatches").size(), 0);
+ Assert.assertEquals(zkWatch.get("childWatches").size(), 1);
+ Assert.assertEquals(zkWatch.get("childWatches").get(0), parentPath);
+ Assert.assertEquals(_zkClient.numberOfListeners(), 0);
+
+ // delete the parent path
+ _zkClient.delete(parentPath);
+ zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+ Assert.assertEquals(zkWatch.get("dataWatches").size(), 0);
+ Assert.assertEquals(zkWatch.get("existWatches").size(), 0);
+ Assert.assertEquals(zkWatch.get("childWatches").size(), 0);
+ }
+
+ @Test(dependsOnMethods = "testSubscribeChildChange")
+ public void testSubscribeDataChangeOnNonExistPath() throws Exception {
+ String nonExistPath = "/nonExistPath";
+ IZkDataListener dataListener = new IZkDataListener() {
+ @Override
+ public void handleDataChange(String s, Object o) throws Exception {
+
+ }
+
+ @Override
+ public void handleDataDeleted(String s) throws Exception {
+
+ }
+ };
+ _zkClient.subscribeDataChanges(nonExistPath, dataListener);
+ Map<String, List<String>> zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+ Assert.assertEquals(zkWatch.get("dataWatches").size(), 0);
+ Assert.assertEquals(zkWatch.get("existWatches").size(), 1);
+ Assert.assertEquals(zkWatch.get("childWatches").size(), 0);
+ // cleanup (unsubscribe will not clean up the watcher on ZK server)
+ _zkClient.unsubscribeDataChanges(nonExistPath, dataListener);
+ zkWatch = ZkTestHelper.getZkWatch(_zkClient);
+ Assert.assertEquals(zkWatch.get("dataWatches").size(), 0);
+ Assert.assertEquals(zkWatch.get("existWatches").size(), 1);
+ Assert.assertEquals(zkWatch.get("childWatches").size(), 0);
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
index f9c8acb..796d393 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
@@ -163,6 +163,8 @@ public class TestHelixZkClient extends ZkUnitTestBase {
// expected to be here.
}
+ // client B needs to re-install the data watch
+ sharedZkClientB.watchForData(TEST_PATH);
// Now modify using client B, and client A won't get notification.
sharedZkClientB.createPersistent(TEST_PATH, true);
Assert.assertTrue(TestHelper.verify(() -> notificationCountB[0] == 2, 1000));
|