helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [helix] 01/03: Revert "Add async call retry to resolve the transient ZK connection issue. (#970)"
Date Mon, 04 May 2020 21:29:42 GMT
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 370e277966f75a7fba45f5b96f7608c127b2905c
Author: Junkai Xue <jxue@linkedin.com>
AuthorDate: Mon May 4 13:02:13 2020 -0700

    Revert "Add async call retry to resolve the transient ZK connection issue. (#970)"
    
    This reverts commit 96ebb27c23004a7a69dc4799b14586ff82d53c9e.
---
 .../apache/helix/zookeeper/zkclient/ZkClient.java  |  96 ++---
 .../callback/CancellableZkAsyncCallback.java       |   8 -
 .../callback/ZkAsyncCallMonitorContext.java        |  46 ---
 .../zkclient/callback/ZkAsyncCallbacks.java        | 153 +++-----
 .../zkclient/callback/ZkAsyncRetryCallContext.java |  49 ---
 .../zkclient/callback/ZkAsyncRetryThread.java      |  57 ---
 .../apache/helix/zookeeper/impl/ZkTestBase.java    |   2 +-
 .../impl/client/TestZkClientAsyncRetry.java        | 405 ---------------------
 8 files changed, 81 insertions(+), 735 deletions(-)

diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 89f9e32..562143f 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -27,10 +27,7 @@ import javax.management.JMException;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.zkclient.annotation.PreFetch;
-import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallMonitorContext;
 import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
-import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryCallContext;
-import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryThread;
 import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
@@ -99,10 +96,6 @@ public class ZkClient implements Watcher {
   private PathBasedZkSerializer _pathBasedZkSerializer;
   private ZkClientMonitor _monitor;
 
-  // To automatically retry the async operation, we need a separate thread other than the
-  // ZkEventThread. Otherwise the retry request might block the normal event processing.
-  protected final ZkAsyncRetryThread _asyncCallRetryThread;
-
   private class IZkDataListenerEntry {
     final IZkDataListener _dataListener;
     final boolean _prefetchData;
@@ -190,9 +183,6 @@ public class ZkClient implements Watcher {
     _operationRetryTimeoutInMillis = operationRetryTimeout;
     _isNewSessionEventFired = false;
 
-    _asyncCallRetryThread = new ZkAsyncRetryThread(zkConnection.getServers());
-    _asyncCallRetryThread.start();
-
     connect(connectionTimeout, this);
 
     // initiate monitor
@@ -1746,23 +1736,15 @@ public class ZkClient implements Watcher {
       data = (datat == null ? null : serialize(datat, path));
     } catch (ZkMarshallingError e) {
       cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
-          new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
       return;
     }
-    doAsyncCreate(path, data, mode, startT, cb);
-  }
-
-  private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode,
-      final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb) {
     retryUntilConnected(() -> {
       ((ZkConnection) getConnection()).getZookeeper()
-          .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, cb,
-              new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false) {
-                @Override
-                protected void doRetry() {
-                  doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb);
-                }
-              });
+          .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+              // Arrays.asList(DEFAULT_ACL),
+              mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+                  data == null ? 0 : data.length, false));
       return null;
     });
   }
@@ -1776,66 +1758,50 @@ public class ZkClient implements Watcher {
       data = serialize(datat, path);
     } catch (ZkMarshallingError e) {
       cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
-          new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
       return;
     }
-    doAsyncSetData(path, data, version, startT, cb);
-  }
-
-  private void doAsyncSetData(final String path, byte[] data, final int version, final long startT,
-      final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
     retryUntilConnected(() -> {
       ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb,
-          new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT,
-              data == null ? 0 : data.length, false) {
-            @Override
-            protected void doRetry() {
-              doAsyncSetData(path, data, version, System.currentTimeMillis(), cb);
-            }
-          });
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+              data == null ? 0 : data.length, false));
       return null;
     });
   }
 
   public void asyncGetData(final String path, final ZkAsyncCallbacks.GetDataCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    retryUntilConnected(() -> {
-      ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
-          new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) {
-            @Override
-            protected void doRetry() {
-              asyncGetData(path, cb);
-            }
-          });
-      return null;
+    retryUntilConnected(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
+            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
+        return null;
+      }
     });
   }
 
   public void asyncExists(final String path, final ZkAsyncCallbacks.ExistsCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    retryUntilConnected(() -> {
-      ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
-          new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) {
-            @Override
-            protected void doRetry() {
-              asyncExists(path, cb);
-            }
-          });
-      return null;
+    retryUntilConnected(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
+            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
+        return null;
+      }
     });
   }
 
   public void asyncDelete(final String path, final ZkAsyncCallbacks.DeleteCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    retryUntilConnected(() -> {
-      ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
-          new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false) {
-            @Override
-            protected void doRetry() {
-              asyncDelete(path, cb);
-            }
-          });
-      return null;
+    retryUntilConnected(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
+            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false));
+        return null;
+      }
     });
   }
 
@@ -1989,10 +1955,6 @@ public class ZkClient implements Watcher {
         return;
       }
       setShutdownTrigger(true);
-      if (_asyncCallRetryThread != null) {
-        _asyncCallRetryThread.interrupt();
-        _asyncCallRetryThread.join(2000);
-      }
       _eventThread.interrupt();
       _eventThread.join(2000);
       if (isManagingZkConnection()) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/CancellableZkAsyncCallback.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/CancellableZkAsyncCallback.java
deleted file mode 100644
index 27d92e8..0000000
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/CancellableZkAsyncCallback.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.helix.zookeeper.zkclient.callback;
-
-public interface CancellableZkAsyncCallback {
-  /**
-   * Notify all the callers that are waiting for the callback to cancel the wait.
-   */
-  void notifyCallers();
-}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
deleted file mode 100644
index bf2fd44..0000000
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package org.apache.helix.zookeeper.zkclient.callback;
-
-import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
-
-public class ZkAsyncCallMonitorContext {
-  private final long _startTimeMilliSec;
-  private final ZkClientMonitor _monitor;
-  private final boolean _isRead;
-  private int _bytes;
-
-  /**
-   * @param monitor           ZkClient monitor for update the operation result.
-   * @param startTimeMilliSec Operation initialization time.
-   * @param bytes             The data size in bytes that is involved in the operation.
-   * @param isRead            True if the operation is readonly.
-   */
-  public ZkAsyncCallMonitorContext(final ZkClientMonitor monitor, long startTimeMilliSec, int bytes,
-      boolean isRead) {
-    _monitor = monitor;
-    _startTimeMilliSec = startTimeMilliSec;
-    _bytes = bytes;
-    _isRead = isRead;
-  }
-
-  /**
-   * Update the operated data size in bytes.
-   * @param bytes
-   */
-  void setBytes(int bytes) {
-    _bytes = bytes;
-  }
-
-  /**
-   * Record the operation result into the specified ZkClient monitor.
-   * @param path
-   */
-  void recordAccess(String path) {
-    if (_monitor != null) {
-      if (_isRead) {
-        _monitor.record(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.READ);
-      } else {
-        _monitor.record(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.WRITE);
-      }
-    }
-  }
-}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index 70dbab4..04c4058 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -31,35 +31,41 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class ZkAsyncCallbacks {
   private static Logger LOG = LoggerFactory.getLogger(ZkAsyncCallbacks.class);
-  public static final int UNKNOWN_RET_CODE = 255;
 
   public static class GetDataCallbackHandler extends DefaultCallback implements DataCallback {
     public byte[] _data;
     public Stat _stat;
 
     @Override
+    public void handle() {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
     public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
       if (rc == 0) {
         _data = data;
         _stat = stat;
         // update ctx with data size
-        if (_data != null && ctx != null && ctx instanceof ZkAsyncCallMonitorContext) {
-          ((ZkAsyncCallMonitorContext) ctx).setBytes(_data.length);
+        if (_data != null && ctx != null && ctx instanceof ZkAsyncCallContext) {
+          ZkAsyncCallContext zkCtx = (ZkAsyncCallContext) ctx;
+          zkCtx._bytes = _data.length;
         }
       }
       callback(rc, path, ctx);
     }
+  }
+
+  public static class SetDataCallbackHandler extends DefaultCallback implements StatCallback {
+    Stat _stat;
 
     @Override
     public void handle() {
       // TODO Auto-generated method stub
     }
-  }
-
-  public static class SetDataCallbackHandler extends DefaultCallback implements StatCallback {
-    Stat _stat;
 
     @Override
     public void processResult(int rc, String path, Object ctx, Stat stat) {
@@ -72,15 +78,15 @@ public class ZkAsyncCallbacks {
     public Stat getStat() {
       return _stat;
     }
+  }
+
+  public static class ExistsCallbackHandler extends DefaultCallback implements StatCallback {
+    public Stat _stat;
 
     @Override
     public void handle() {
       // TODO Auto-generated method stub
     }
-  }
-
-  public static class ExistsCallbackHandler extends DefaultCallback implements StatCallback {
-    public Stat _stat;
 
     @Override
     public void processResult(int rc, String path, Object ctx, Stat stat) {
@@ -89,11 +95,6 @@ public class ZkAsyncCallbacks {
       }
       callback(rc, path, ctx);
     }
-
-    @Override
-    public void handle() {
-      // TODO Auto-generated method stub
-    }
   }
 
   public static class CreateCallbackHandler extends DefaultCallback implements StringCallback {
@@ -121,66 +122,44 @@ public class ZkAsyncCallbacks {
   }
 
   /**
-   * Default callback for zookeeper async api.
+   * Default callback for zookeeper async api
    */
-  public static abstract class DefaultCallback implements CancellableZkAsyncCallback {
-    AtomicBoolean _isOperationDone = new AtomicBoolean(false);
-    int _rc = UNKNOWN_RET_CODE;
+  public static abstract class DefaultCallback {
+    AtomicBoolean _lock = new AtomicBoolean(false);
+    int _rc = -1;
 
     public void callback(int rc, String path, Object ctx) {
       if (rc != 0 && LOG.isDebugEnabled()) {
         LOG.debug(this + ", rc:" + Code.get(rc) + ", path: " + path);
       }
 
-      if (ctx != null && ctx instanceof ZkAsyncCallMonitorContext) {
-        ((ZkAsyncCallMonitorContext) ctx).recordAccess(path);
-      }
-
-      _rc = rc;
-
-      // If retry is requested by passing the retry callback context, do retry if necessary.
-      if (needRetry(rc)) {
-        if (ctx != null && ctx instanceof ZkAsyncRetryCallContext) {
-          try {
-            if (((ZkAsyncRetryCallContext) ctx).requestRetry()) {
-              // The retry operation will be done asynchronously. Once it is done, the same callback
-              // handler object shall be triggered to ensure the result is notified to the right
-              // caller(s).
-              return;
-            } else {
-              LOG.warn(
-                  "Cannot request to retry the operation. The retry request thread may have been stopped.");
-            }
-          } catch (Throwable t) {
-            LOG.error("Failed to request to retry the operation.", t);
+      if (ctx != null && ctx instanceof ZkAsyncCallContext) {
+        ZkAsyncCallContext zkCtx = (ZkAsyncCallContext) ctx;
+        if (zkCtx._monitor != null) {
+          if (zkCtx._isRead) {
+            zkCtx._monitor.record(path, zkCtx._bytes, zkCtx._startTimeMilliSec,
+                ZkClientMonitor.AccessType.READ);
+          } else {
+            zkCtx._monitor.record(path, zkCtx._bytes, zkCtx._startTimeMilliSec,
+                ZkClientMonitor.AccessType.WRITE);
           }
-        } else {
-          LOG.warn(
-              "The provided callback context {} is not ZkAsyncRetryCallContext. Skip retrying.",
-              ctx.getClass().getName());
         }
       }
 
-      // If operation is done successfully or no retry needed, notify the caller(s).
-      try {
-        handle();
-      } finally {
-        markOperationDone();
-      }
-    }
+      _rc = rc;
+      handle();
 
-    public boolean isOperationDone() {
-      return _isOperationDone.get();
+      synchronized (_lock) {
+        _lock.set(true);
+        _lock.notify();
+      }
     }
 
-    /**
-     * The blocking call that return true once the operation has been completed without retrying.
-     */
     public boolean waitForSuccess() {
       try {
-        synchronized (_isOperationDone) {
-          while (!_isOperationDone.get()) {
-            _isOperationDone.wait();
+        synchronized (_lock) {
+          while (!_lock.get()) {
+            _lock.wait();
           }
         }
       } catch (InterruptedException e) {
@@ -193,52 +172,22 @@ public class ZkAsyncCallbacks {
       return _rc;
     }
 
-    @Override
-    public void notifyCallers() {
-      LOG.warn("The callback {} has been cancelled.", this);
-      markOperationDone();
-    }
-
-    /**
-     * Additional callback handling.
-     */
     abstract public void handle();
+  }
 
-    private void markOperationDone() {
-      synchronized (_isOperationDone) {
-        _isOperationDone.set(true);
-        _isOperationDone.notifyAll();
-      }
-    }
+  public static class ZkAsyncCallContext {
+    private long _startTimeMilliSec;
+    private int _bytes;
+    private ZkClientMonitor _monitor;
+    private boolean _isRead;
 
-    /**
-     * @param rc the return code
-     * @return true if the error is transient and the operation may succeed when being retried.
-     */
-    private boolean needRetry(int rc) {
-      try {
-        switch (Code.get(rc)) {
-        /** Connection to the server has been lost */
-        case CONNECTIONLOSS:
-          /** The session has been expired by the server */
-        case SESSIONEXPIRED:
-          /** Session moved to another server, so operation is ignored */
-        case SESSIONMOVED:
-          return true;
-        default:
-          return false;
-        }
-      } catch (ClassCastException | NullPointerException ex) {
-        LOG.error("Failed to handle unknown return code {}. Skip retrying.", rc, ex);
-        return false;
-      }
+    public ZkAsyncCallContext(final ZkClientMonitor monitor, long startTimeMilliSec, int bytes,
+        boolean isRead) {
+      _monitor = monitor;
+      _startTimeMilliSec = startTimeMilliSec;
+      _bytes = bytes;
+      _isRead = isRead;
     }
   }
 
-  @Deprecated
-  public static class ZkAsyncCallContext extends ZkAsyncCallMonitorContext {
-    ZkAsyncCallContext(ZkClientMonitor monitor, long startTimeMilliSec, int bytes, boolean isRead) {
-      super(monitor, startTimeMilliSec, bytes, isRead);
-    }
-  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java
deleted file mode 100644
index 4a9402f..0000000
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.helix.zookeeper.zkclient.callback;
-
-import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class ZkAsyncRetryCallContext extends ZkAsyncCallMonitorContext {
-  private static Logger LOG = LoggerFactory.getLogger(ZkAsyncRetryCallContext.class);
-  private final ZkAsyncRetryThread _retryThread;
-  private final CancellableZkAsyncCallback _cancellableCallback;
-
-  /**
-   * @param retryThread       The thread that executes the retry operation.
-   *                          Note that retry in the ZkEventThread is not allowed to avoid dead lock.
-   * @param callback          Cancellable asynchronous callback to notify when the retry is cancelled.
-   * @param monitor           ZkClient monitor for update the operation result.
-   * @param startTimeMilliSec Operation initialization time.
-   * @param bytes             The data size in bytes that is involved in the operation.
-   * @param isRead            True if the operation is readonly.
-   */
-  public ZkAsyncRetryCallContext(final ZkAsyncRetryThread retryThread,
-      final CancellableZkAsyncCallback callback, final ZkClientMonitor monitor,
-      long startTimeMilliSec, int bytes, boolean isRead) {
-    super(monitor, startTimeMilliSec, bytes, isRead);
-    _retryThread = retryThread;
-    _cancellableCallback = callback;
-  }
-
-  /**
-   * Request a retry.
-   *
-   * @return True if the request was sent successfully.
-   */
-  boolean requestRetry() {
-    return _retryThread.sendRetryRequest(this);
-  }
-
-  /**
-   * Notify the pending callback that retry has been cancelled.
-   */
-  void cancel() {
-    _cancellableCallback.notifyCallers();
-  }
-
-  /**
-   * The actual retry operation logic.
-   */
-  protected abstract void doRetry() throws Exception;
-}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryThread.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryThread.java
deleted file mode 100644
index c59d423..0000000
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryThread.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.helix.zookeeper.zkclient.callback;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZkAsyncRetryThread extends Thread {
-  private static Logger LOG = LoggerFactory.getLogger(ZkAsyncRetryThread.class);
-  private BlockingQueue<ZkAsyncRetryCallContext> _retryContexts = new LinkedBlockingQueue<>();
-  private volatile boolean _isReady = true;
-
-  public ZkAsyncRetryThread(String name) {
-    setDaemon(true);
-    setName("ZkClient-AsyncCallback-Retry-" + getId() + "-" + name);
-  }
-
-  @Override
-  public void run() {
-    LOG.info("Starting ZkClient AsyncCallback retry thread.");
-    try {
-      while (!isInterrupted()) {
-        ZkAsyncRetryCallContext context = _retryContexts.take();
-        try {
-          context.doRetry();
-        } catch (InterruptedException | ZkInterruptedException e) {
-          // if interrupted, stop retrying and interrupt the thread.
-          context.cancel();
-          interrupt();
-        } catch (Throwable e) {
-          LOG.error("Error retrying callback " + context, e);
-        }
-      }
-    } catch (InterruptedException e) {
-      LOG.info("ZkClient AsyncCallback retry thread is interrupted.");
-    }
-    synchronized (this) {
-      // Mark ready to be false, so no new requests will be sent.
-      _isReady = false;
-      // Notify to all the callers waiting for the result.
-      for (ZkAsyncRetryCallContext context : _retryContexts) {
-        context.cancel();
-      }
-    }
-    LOG.info("Terminate ZkClient AsyncCallback retry thread.");
-  }
-
-  synchronized boolean sendRetryRequest(ZkAsyncRetryCallContext context) {
-    if (_isReady) {
-      _retryContexts.add(context);
-      return true;
-    }
-    return false;
-  }
-}
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
index 2b8b1b3..51eda80 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
@@ -58,7 +58,7 @@ public class ZkTestBase {
    * Multiple ZK references
    */
   // The following maps hold ZK connect string as keys
-  protected static final Map<String, ZkServer> _zkServerMap = new HashMap<>();
+  protected final Map<String, ZkServer> _zkServerMap = new HashMap<>();
   protected static int _numZk = 1; // Initial value
 
   @BeforeSuite
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
deleted file mode 100644
index 4e5b06f..0000000
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
+++ /dev/null
@@ -1,405 +0,0 @@
-package org.apache.helix.zookeeper.impl.client;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
-import org.apache.helix.zookeeper.impl.ZkTestBase;
-import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
-import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryCallContext;
-import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.UNKNOWN_RET_CODE;
-import static org.apache.zookeeper.KeeperException.Code.CONNECTIONLOSS;
-
-/**
- * Note this is a whitebox test to test the async operation callback/context.
- * We don't have a good way to simulate an async ZK operation failure in the server side yet.
- */
-public class TestZkClientAsyncRetry extends ZkTestBase {
-  private final String TEST_ROOT = String.format("/%s", getClass().getSimpleName());
-  private final String NODE_PATH = TEST_ROOT + "/async";
-
-  private org.apache.helix.zookeeper.zkclient.ZkClient _zkClient;
-  private String _zkServerAddress;
-
-  @BeforeClass
-  public void beforeClass() {
-    _zkClient = _zkServerMap.values().iterator().next().getZkClient();
-    _zkServerAddress = _zkClient.getServers();
-    _zkClient.createPersistent(TEST_ROOT);
-  }
-
-  @AfterClass
-  public void afterClass() {
-    _zkClient.deleteRecursively(TEST_ROOT);
-    _zkClient.close();
-  }
-
-  private boolean waitAsyncOperation(ZkAsyncCallbacks.DefaultCallback callback, long timeout) {
-    final boolean[] ret = { false };
-    Thread waitThread = new Thread(() -> ret[0] = callback.waitForSuccess());
-    waitThread.start();
-    try {
-      waitThread.join(timeout);
-      waitThread.interrupt();
-      return ret[0];
-    } catch (InterruptedException e) {
-      return false;
-    }
-  }
-
-  @Test
-  public void testAsyncRetryCategories() {
-    MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
-    try {
-      ZNRecord tmpRecord = new ZNRecord("tmpRecord");
-      tmpRecord.setSimpleField("foo", "bar");
-      // Loop all possible error codes to test async create.
-      // Only connectivity issues will be retried, the other issues will be return error immediately.
-      for (KeeperException.Code code : KeeperException.Code.values()) {
-        if (code == KeeperException.Code.OK) {
-          continue;
-        }
-        ZkAsyncCallbacks.CreateCallbackHandler createCallback =
-            new ZkAsyncCallbacks.CreateCallbackHandler();
-        Assert.assertEquals(createCallback.getRc(), UNKNOWN_RET_CODE);
-        testZkClient.setAsyncCallRC(code.intValue());
-        if (code == CONNECTIONLOSS || code == KeeperException.Code.SESSIONEXPIRED
-            || code == KeeperException.Code.SESSIONMOVED) {
-          // Async create will be pending due to the mock error rc is retryable.
-          testZkClient.asyncCreate(NODE_PATH, null, CreateMode.PERSISTENT, createCallback);
-          Assert.assertFalse(createCallback.isOperationDone());
-          Assert.assertEquals(createCallback.getRc(), code.intValue());
-          // Change the mock response
-          testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
-          // Async retry will succeed now. Wait until the operation is successfully done and verify.
-          Assert.assertTrue(waitAsyncOperation(createCallback, 1000));
-          Assert.assertEquals(createCallback.getRc(), KeeperException.Code.OK.intValue());
-          Assert.assertTrue(testZkClient.exists(NODE_PATH));
-          Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
-        } else {
-          // Async create will fail due to the mock error rc is not recoverable.
-          testZkClient.asyncCreate(NODE_PATH, null, CreateMode.PERSISTENT, createCallback);
-          Assert.assertTrue(waitAsyncOperation(createCallback, 1000));
-          Assert.assertEquals(createCallback.getRc(), code.intValue());
-          Assert.assertEquals(testZkClient.getAndResetRetryCount(), 0);
-        }
-        testZkClient.delete(NODE_PATH);
-        Assert.assertFalse(testZkClient.exists(NODE_PATH));
-      }
-    } finally {
-      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
-      testZkClient.close();
-      _zkClient.delete(NODE_PATH);
-    }
-  }
-
-  @Test(dependsOnMethods = "testAsyncRetryCategories")
-  public void testAsyncWriteRetry() {
-    MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
-    try {
-      ZNRecord tmpRecord = new ZNRecord("tmpRecord");
-      tmpRecord.setSimpleField("foo", "bar");
-      testZkClient.createPersistent(NODE_PATH, tmpRecord);
-
-      // 1. Test async set retry
-      ZkAsyncCallbacks.SetDataCallbackHandler setCallback =
-          new ZkAsyncCallbacks.SetDataCallbackHandler();
-      Assert.assertEquals(setCallback.getRc(), UNKNOWN_RET_CODE);
-
-      tmpRecord.setSimpleField("test", "data");
-      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
-      // Async set will be pending due to the mock error rc is retryable.
-      testZkClient.asyncSetData(NODE_PATH, tmpRecord, -1, setCallback);
-      Assert.assertFalse(setCallback.isOperationDone());
-      Assert.assertEquals(setCallback.getRc(), CONNECTIONLOSS.intValue());
-      // Change the mock return code.
-      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
-      // Async retry will succeed now. Wait until the operation is successfully done and verify.
-      Assert.assertTrue(waitAsyncOperation(setCallback, 1000));
-      Assert.assertEquals(setCallback.getRc(), KeeperException.Code.OK.intValue());
-      Assert.assertEquals(((ZNRecord) testZkClient.readData(NODE_PATH)).getSimpleField("test"),
-          "data");
-      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
-
-      // 2. Test async delete
-      ZkAsyncCallbacks.DeleteCallbackHandler deleteCallback =
-          new ZkAsyncCallbacks.DeleteCallbackHandler();
-      Assert.assertEquals(deleteCallback.getRc(), UNKNOWN_RET_CODE);
-
-      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
-      // Async delete will be pending due to the mock error rc is retryable.
-      testZkClient.asyncDelete(NODE_PATH, deleteCallback);
-      Assert.assertFalse(deleteCallback.isOperationDone());
-      Assert.assertEquals(deleteCallback.getRc(), CONNECTIONLOSS.intValue());
-      // Change the mock return code.
-      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
-      // Async retry will succeed now. Wait until the operation is successfully done and verify.
-      Assert.assertTrue(waitAsyncOperation(deleteCallback, 1000));
-      Assert.assertEquals(deleteCallback.getRc(), KeeperException.Code.OK.intValue());
-      Assert.assertFalse(testZkClient.exists(NODE_PATH));
-      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
-    } finally {
-      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
-      testZkClient.close();
-      _zkClient.delete(NODE_PATH);
-    }
-  }
-
-  @Test(dependsOnMethods = "testAsyncWriteRetry")
-  public void testAsyncReadRetry() {
-    MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
-    try {
-      ZNRecord tmpRecord = new ZNRecord("tmpRecord");
-      tmpRecord.setSimpleField("foo", "bar");
-      testZkClient.createPersistent(NODE_PATH, tmpRecord);
-
-      // 1. Test async exist check
-      ZkAsyncCallbacks.ExistsCallbackHandler existsCallback =
-          new ZkAsyncCallbacks.ExistsCallbackHandler();
-      Assert.assertEquals(existsCallback.getRc(), UNKNOWN_RET_CODE);
-
-      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
-      // Async exist check will be pending due to the mock error rc is retryable.
-      testZkClient.asyncExists(NODE_PATH, existsCallback);
-      Assert.assertFalse(existsCallback.isOperationDone());
-      Assert.assertEquals(existsCallback.getRc(), CONNECTIONLOSS.intValue());
-      // Change the mock return code.
-      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
-      // Async retry will succeed now. Wait until the operation is successfully done and verify.
-      Assert.assertTrue(waitAsyncOperation(existsCallback, 1000));
-      Assert.assertEquals(existsCallback.getRc(), KeeperException.Code.OK.intValue());
-      Assert.assertTrue(existsCallback._stat != null);
-      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
-
-      // 2. Test async get
-      ZkAsyncCallbacks.GetDataCallbackHandler getCallback =
-          new ZkAsyncCallbacks.GetDataCallbackHandler();
-      Assert.assertEquals(getCallback.getRc(), UNKNOWN_RET_CODE);
-
-      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
-      // Async get will be pending due to the mock error rc is retryable.
-      testZkClient.asyncGetData(NODE_PATH, getCallback);
-      Assert.assertFalse(getCallback.isOperationDone());
-      Assert.assertEquals(getCallback.getRc(), CONNECTIONLOSS.intValue());
-      // Change the mock return code.
-      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
-      // Async retry will succeed now. Wait until the operation is successfully done and verify.
-      Assert.assertTrue(waitAsyncOperation(getCallback, 1000));
-      Assert.assertEquals(getCallback.getRc(), KeeperException.Code.OK.intValue());
-      ZNRecord record = testZkClient.deserialize(getCallback._data, NODE_PATH);
-      Assert.assertEquals(record.getSimpleField("foo"), "bar");
-      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
-    } finally {
-      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
-      testZkClient.close();
-      _zkClient.delete(NODE_PATH);
-    }
-  }
-
-  @Test(dependsOnMethods = "testAsyncReadRetry")
-  public void testAsyncRequestCleanup() {
-    int cbCount = 10;
-    MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
-    try {
-      ZNRecord tmpRecord = new ZNRecord("tmpRecord");
-      tmpRecord.setSimpleField("foo", "bar");
-      testZkClient.createPersistent(NODE_PATH, tmpRecord);
-
-      // Create 10 async exists check requests
-      ZkAsyncCallbacks.ExistsCallbackHandler[] existsCallbacks =
-          new ZkAsyncCallbacks.ExistsCallbackHandler[cbCount];
-      for (int i = 0; i < cbCount; i++) {
-        existsCallbacks[i] = new ZkAsyncCallbacks.ExistsCallbackHandler();
-      }
-      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
-      // All async exist check calls will be pending due to the mock error rc is retryable.
-      for (ZkAsyncCallbacks.ExistsCallbackHandler cb : existsCallbacks) {
-        testZkClient.asyncExists(NODE_PATH, cb);
-        Assert.assertEquals(cb.getRc(), CONNECTIONLOSS.intValue());
-      }
-      // Wait for a while, no callback finishes
-      Assert.assertFalse(waitAsyncOperation(existsCallbacks[0], 1000));
-      for (ZkAsyncCallbacks.ExistsCallbackHandler cb : existsCallbacks) {
-        Assert.assertEquals(cb.getRc(), CONNECTIONLOSS.intValue());
-        Assert.assertFalse(cb.isOperationDone());
-      }
-      testZkClient.close();
-      // All callback retry will be cancelled because the zkclient is closed.
-      for (ZkAsyncCallbacks.ExistsCallbackHandler cb : existsCallbacks) {
-        Assert.assertTrue(waitAsyncOperation(cb, 1000));
-        Assert.assertEquals(cb.getRc(), CONNECTIONLOSS.intValue());
-      }
-      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
-    } finally {
-      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
-      testZkClient.close();
-      _zkClient.delete(NODE_PATH);
-    }
-  }
-
-  /**
-   * Mock client to whitebox test async functionality.
-   */
-  class MockAsyncZkClient extends ZkClient {
-    private static final long RETRY_INTERVAL_MS = 500;
-    private long _retryCount = 0;
-
-    /**
-     * If the specified return code is OK, call the real function.
-     * Otherwise, trigger the callback with the specified RC without triggering the real ZK call.
-     */
-    private int _asyncCallRetCode = KeeperException.Code.OK.intValue();
-
-    public MockAsyncZkClient(String zkAddress) {
-      super(zkAddress);
-      setZkSerializer(new ZNRecordSerializer());
-    }
-
-    public void setAsyncCallRC(int rc) {
-      _asyncCallRetCode = rc;
-    }
-
-    public long getAndResetRetryCount() {
-      long tmpCount = _retryCount;
-      _retryCount = 0;
-      return tmpCount;
-    }
-
-    @Override
-    public void asyncCreate(String path, Object datat, CreateMode mode,
-        ZkAsyncCallbacks.CreateCallbackHandler cb) {
-      if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
-        super.asyncCreate(path, datat, mode, cb);
-        return;
-      } else {
-        cb.processResult(_asyncCallRetCode, path,
-            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, false) {
-              @Override
-              protected void doRetry() {
-                _retryCount++;
-                try {
-                  Thread.sleep(RETRY_INTERVAL_MS);
-                } catch (InterruptedException e) {
-                  throw new ZkInterruptedException(e);
-                }
-                asyncCreate(path, datat, mode, cb);
-              }
-            }, null);
-      }
-    }
-
-    @Override
-    public void asyncSetData(String path, Object datat, int version,
-        ZkAsyncCallbacks.SetDataCallbackHandler cb) {
-      if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
-        super.asyncSetData(path, datat, version, cb);
-        return;
-      } else {
-        cb.processResult(_asyncCallRetCode, path,
-            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, false) {
-              @Override
-              protected void doRetry() {
-                _retryCount++;
-                try {
-                  Thread.sleep(RETRY_INTERVAL_MS);
-                } catch (InterruptedException e) {
-                  throw new ZkInterruptedException(e);
-                }
-                asyncSetData(path, datat, version, cb);
-              }
-            }, null);
-      }
-    }
-
-    @Override
-    public void asyncGetData(String path, ZkAsyncCallbacks.GetDataCallbackHandler cb) {
-      if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
-        super.asyncGetData(path, cb);
-        return;
-      } else {
-        cb.processResult(_asyncCallRetCode, path,
-            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, true) {
-              @Override
-              protected void doRetry() {
-                _retryCount++;
-                try {
-                  Thread.sleep(RETRY_INTERVAL_MS);
-                } catch (InterruptedException e) {
-                  throw new ZkInterruptedException(e);
-                }
-                asyncGetData(path, cb);
-              }
-            }, null, null);
-      }
-    }
-
-    @Override
-    public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler cb) {
-      if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
-        super.asyncExists(path, cb);
-        return;
-      } else {
-        cb.processResult(_asyncCallRetCode, path,
-            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, true) {
-              @Override
-              protected void doRetry() {
-                _retryCount++;
-                try {
-                  Thread.sleep(RETRY_INTERVAL_MS);
-                } catch (InterruptedException e) {
-                  throw new ZkInterruptedException(e);
-                }
-                asyncExists(path, cb);
-              }
-            }, null);
-      }
-    }
-
-    @Override
-    public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler cb) {
-      if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
-        super.asyncDelete(path, cb);
-        return;
-      } else {
-        cb.processResult(_asyncCallRetCode, path,
-            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, false) {
-              @Override
-              protected void doRetry() {
-                _retryCount++;
-                try {
-                  Thread.sleep(RETRY_INTERVAL_MS);
-                } catch (InterruptedException e) {
-                  throw new ZkInterruptedException(e);
-                }
-                asyncDelete(path, cb);
-              }
-            });
-      }
-    }
-  }
-}


Mime
View raw message