helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch master updated: fix TestRawZkClient unstableness (#1295)
Date Fri, 02 Oct 2020 00:28:37 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new f2e68c8  fix TestRawZkClient unstableness (#1295)
f2e68c8 is described below

commit f2e68c83175de01bb5613e7742d498ccd1264ebc
Author: kaisun2000 <52840222+kaisun2000@users.noreply.github.com>
AuthorDate: Thu Oct 1 17:28:28 2020 -0700

    fix TestRawZkClient unstableness (#1295)
    
    ZkClient connects to ZooKeeper before monitor bean initialization and registration. This
causes a race condition that fails the metrics test. Fix this issue by properly constructing the object.
    
    Co-authored-by: Kai Sun <ksun@ksun-mn1.linkedin.biz>
---
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 29 ++++++++++-----
 .../zookeeper/zkclient/metric/ZkClientMonitor.java | 43 +++++++++++++++++-----
 .../zkclient/metric/ZkClientPathMonitor.java       |  1 -
 .../apache/helix/zookeeper/impl/TestHelper.java    |  2 +-
 .../zookeeper/impl/client/TestRawZkClient.java     | 38 +++++++++++--------
 5 files changed, 77 insertions(+), 36 deletions(-)

diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 180fcdd..2f26f8b 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -222,20 +222,22 @@ public class ZkClient implements Watcher {
     _asyncCallRetryThread.start();
     LOG.debug("ZkClient created with uid {}, _asyncCallRetryThread id {}", _uid, _asyncCallRetryThread.getId());
 
+    if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null
&& !monitorType
+        .isEmpty()) {
+      _monitor =
+          new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, monitorRootPathOnly,
+              _eventThread);
+    } else {
+      LOG.info("ZkClient monitor key or type is not provided. Skip monitoring.");
+    }
+
     connect(connectionTimeout, this);
 
-    // initiate monitor
     try {
-      if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null
&& !monitorType
-          .isEmpty()) {
-        _monitor =
-            new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, monitorRootPathOnly,
-                _eventThread);
+      if (_monitor != null) {
         _monitor.register();
-      } else {
-        LOG.info("ZkClient monitor key or type is not provided. Skip monitoring.");
       }
-    } catch (JMException e) {
+    } catch (JMException e){
       LOG.error("Error in creating ZkClientMonitor", e);
     }
   }
@@ -1284,6 +1286,7 @@ public class ZkClient implements Watcher {
         });
   }
 
+
   /*
    *  Note, issueSync takes a ZooKeeper (client) object and pass it to doAsyncSync().
    *  The reason we do this is that we want to ensure each new session event is preceded
with exactly
@@ -2157,6 +2160,14 @@ public class ZkClient implements Watcher {
 
       IZkConnection zkConnection = getConnection();
       _eventThread = new ZkEventThread(zkConnection.getServers());
+
+      if (_monitor != null) {
+        boolean result = _monitor.setAndInitZkEventThreadMonitor(_eventThread);
+        if (!result) {
+          LOG.error("register _eventThread monitor failed due to an existing one");
+        }
+      }
+
       _eventThread.start();
 
       LOG.debug("ZkClient {},  _eventThread {}", _uid, _eventThread.getId());
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
index 6717472..6bfc747 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
@@ -38,9 +38,12 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 import org.apache.helix.monitoring.mbeans.exception.MetricException;
 import org.apache.helix.zookeeper.zkclient.ZkEventThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class ZkClientMonitor extends DynamicMBeanProvider {
+
   public static final String MONITOR_TYPE = "Type";
   public static final String MONITOR_KEY = "Key";
   protected static final String MBEAN_DESCRIPTION = "Helix Zookeeper Client Monitor";
@@ -83,8 +86,22 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
     _expiredSessionCounter = new SimpleDynamicMetric("ExpiredSessionCounter", 0l);
     _dataChangeEventCounter = new SimpleDynamicMetric("DataChangeEventCounter", 0l);
     _outstandingRequestGauge = new SimpleDynamicMetric("OutstandingRequestGauge", 0l);
+
     if (zkEventThread != null) {
-      _zkEventThreadMetric = new ZkThreadMetric(zkEventThread);
+      boolean result = setAndInitZkEventThreadMonitor(zkEventThread);
+      if (!result) {
+        _logger.error("register zkEventThreadMonitor failed due to an existing one.");
+      }
+    }
+
+    for (ZkClientPathMonitor.PredefinedPath path : ZkClientPathMonitor.PredefinedPath.values())
{
+      // If monitor root path only, check if the current path is Root.
+      // Otherwise, add monitors for every path.
+      if (!_monitorRootOnly || path.equals(ZkClientPathMonitor.PredefinedPath.Root)) {
+        _zkClientPathMonitorMap.put(path,
+            new ZkClientPathMonitor(path, _monitorType, _monitorKey, _monitorInstanceName)
+        );
+      }
     }
   }
 
@@ -96,6 +113,14 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
             (monitorKey + (monitorInstanceName == null ? "" : "." + monitorInstanceName)));
   }
 
+  public synchronized boolean setAndInitZkEventThreadMonitor(ZkEventThread zkEventThread)
{
+    if (_zkEventThreadMetric == null) {
+      _zkEventThreadMetric = new ZkThreadMetric(zkEventThread);
+      return true;
+    }
+    return false;
+  }
+
   @Override
   public DynamicMBeanProvider register() throws JMException {
     List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
@@ -108,15 +133,15 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
     }
     doRegister(attributeList, MBEAN_DESCRIPTION,
         getObjectName(_monitorType, _monitorKey, _monitorInstanceName));
-    for (ZkClientPathMonitor.PredefinedPath path : ZkClientPathMonitor.PredefinedPath.values())
{
-      // If monitor root path only, check if the current path is Root.
-      // Otherwise, add monitors for every path.
-      if (!_monitorRootOnly || path.equals(ZkClientPathMonitor.PredefinedPath.Root)) {
-        _zkClientPathMonitorMap.put(path,
-            new ZkClientPathMonitor(path, _monitorType, _monitorKey, _monitorInstanceName)
-                .register());
+    _zkClientPathMonitorMap.values().stream().forEach( monitor -> {
+      if (monitor != null) {
+        try {
+          monitor.register();
+        } catch (JMException e) {
+           _logger.error(" {} failed registration", monitor, e);
+        }
       }
-    }
+    });
     return this;
   }
 
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientPathMonitor.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientPathMonitor.java
index db3de03..1ab2653 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientPathMonitor.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientPathMonitor.java
@@ -33,7 +33,6 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
-
 public class ZkClientPathMonitor extends DynamicMBeanProvider {
   public static final String MONITOR_PATH = "PATH";
   private final String _sensorName;
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestHelper.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestHelper.java
index 9a6ac02..515da6d 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestHelper.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestHelper.java
@@ -32,7 +32,7 @@ import org.testng.Assert;
 
 public class TestHelper {
   private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
-  public static final long WAIT_DURATION = 20 * 1000L; // 20 seconds
+  public static final long WAIT_DURATION = 60 * 1000L; // 60 seconds
 
   /**
    * Returns a unused random port.
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
index a21b714..535d760 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
@@ -285,20 +285,26 @@ public class TestRawZkClient extends ZkTestBase {
     Assert.assertTrue(beanServer.isRegistered(idealStatename));
 
     Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(name, "StateChangeEventCounter"),
0);
     Assert.assertEquals((long) beanServer.getAttribute(name, "ExpiredSessionCounter"), 0);
     Assert.assertEquals((long) beanServer.getAttribute(name, "OutstandingRequestGauge"),
0);
-    // account for doAsyncSync()
-    Assert.assertEquals((long) beanServer.getAttribute(name, "TotalCallbackCounter"), 1);
 
-    // Test exists
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter"),
0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max"),
0);
+    boolean verifyResult = TestHelper.verify(()->{
+      return (long) beanServer.getAttribute(rootname, "ReadCounter") == 1;
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(verifyResult, " did not see first sync() read");
+
+    Assert.assertEquals((long) beanServer.getAttribute(name, "StateChangeEventCounter"),
1);
+
+    long firstLatencyCounter =  (long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter");
+    long firstReadLatencyGauge = (long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max");
+    Assert.assertTrue(firstLatencyCounter >= 0);
+    Assert.assertTrue(firstReadLatencyGauge >= 0);
     zkClient.exists(TEST_ROOT);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter")
>= 0);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max") >=
0);
+
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadCounter") == 2);
+
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter")
>= firstLatencyCounter);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max") >=
firstReadLatencyGauge);
 
     // Test create
     Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 0);
@@ -327,7 +333,7 @@ public class TestRawZkClient extends ZkTestBase {
     Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max")
>= 0);
 
     // Test read
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 2);
     Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), 0);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 0);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
0);
@@ -338,7 +344,7 @@ public class TestRawZkClient extends ZkTestBase {
     Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max"),
0);
     zkClient.readData(TEST_PATH, new Stat());
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 2);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 3);
     Assert
         .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 1);
@@ -350,27 +356,27 @@ public class TestRawZkClient extends ZkTestBase {
         >= origIdealStatesReadTotalLatencyCounter);
     Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max")
>= 0);
     zkClient.getChildren(TEST_PATH);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 3);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 4);
     Assert
         .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 2);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
         TEST_DATA_SIZE);
     zkClient.getStat(TEST_PATH);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 4);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 5);
     Assert
         .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 3);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
         TEST_DATA_SIZE);
     zkClient.readDataAndStat(TEST_PATH, new Stat(), true);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 5);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 6);
 
     ZkAsyncCallbacks.ExistsCallbackHandler callbackHandler =
         new ZkAsyncCallbacks.ExistsCallbackHandler();
     zkClient.asyncExists(TEST_PATH, callbackHandler);
     callbackHandler.waitForSuccess();
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 6);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 7);
 
     // Test write
     zkClient.writeData(TEST_PATH, TEST_DATA);


Mime
View raw message