helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch master updated: Add async call retry to resolve the transient ZK connection issue. (#970)
Date Mon, 04 May 2020 19:36:21 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 96ebb27  Add async call retry to resolve the transient ZK connection issue. (#970)
96ebb27 is described below

commit 96ebb27c23004a7a69dc4799b14586ff82d53c9e
Author: Jiajun Wang <1803880+jiajunwang@users.noreply.github.com>
AuthorDate: Mon May 4 12:36:13 2020 -0700

    Add async call retry to resolve the transient ZK connection issue. (#970)
    
    With the current design, any exception during an async call fails the operation, which
    may eventually return a partial result.
    This change makes the ZkClient retry the operation when the error is caused by a transient
    ZK connection issue (CONNECTIONLOSS, SESSIONEXPIRED, SESSIONMOVED), giving the async call a
    better chance of completing. Note that if the error is due to business logic, the async call
    still fails and the corresponding return code is sent to the callback handler.
---
 .../apache/helix/zookeeper/zkclient/ZkClient.java  |  96 +++--
 .../callback/CancellableZkAsyncCallback.java       |   8 +
 .../callback/ZkAsyncCallMonitorContext.java        |  46 +++
 .../zkclient/callback/ZkAsyncCallbacks.java        | 153 +++++---
 .../zkclient/callback/ZkAsyncRetryCallContext.java |  49 +++
 .../zkclient/callback/ZkAsyncRetryThread.java      |  57 +++
 .../apache/helix/zookeeper/impl/ZkTestBase.java    |   2 +-
 .../impl/client/TestZkClientAsyncRetry.java        | 405 +++++++++++++++++++++
 8 files changed, 735 insertions(+), 81 deletions(-)

diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 562143f..89f9e32 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -27,7 +27,10 @@ import javax.management.JMException;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.zkclient.annotation.PreFetch;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallMonitorContext;
 import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryCallContext;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryThread;
 import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
@@ -96,6 +99,10 @@ public class ZkClient implements Watcher {
   private PathBasedZkSerializer _pathBasedZkSerializer;
   private ZkClientMonitor _monitor;
 
+  // To automatically retry the async operation, we need a separate thread other than the
+  // ZkEventThread. Otherwise the retry request might block the normal event processing.
+  protected final ZkAsyncRetryThread _asyncCallRetryThread;
+
   private class IZkDataListenerEntry {
     final IZkDataListener _dataListener;
     final boolean _prefetchData;
@@ -183,6 +190,9 @@ public class ZkClient implements Watcher {
     _operationRetryTimeoutInMillis = operationRetryTimeout;
     _isNewSessionEventFired = false;
 
+    _asyncCallRetryThread = new ZkAsyncRetryThread(zkConnection.getServers());
+    _asyncCallRetryThread.start();
+
     connect(connectionTimeout, this);
 
     // initiate monitor
@@ -1736,15 +1746,23 @@ public class ZkClient implements Watcher {
       data = (datat == null ? null : serialize(datat, path));
     } catch (ZkMarshallingError e) {
       cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
-          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
+          new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
       return;
     }
+    doAsyncCreate(path, data, mode, startT, cb);
+  }
+
+  private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode,
+      final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb) {
     retryUntilConnected(() -> {
       ((ZkConnection) getConnection()).getZookeeper()
-          .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
-              // Arrays.asList(DEFAULT_ACL),
-              mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
-                  data == null ? 0 : data.length, false));
+          .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, cb,
+              new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0,
false) {
+                @Override
+                protected void doRetry() {
+                  doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb);
+                }
+              });
       return null;
     });
   }
@@ -1758,50 +1776,66 @@ public class ZkClient implements Watcher {
       data = serialize(datat, path);
     } catch (ZkMarshallingError e) {
       cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
-          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
+          new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
       return;
     }
+    doAsyncSetData(path, data, version, startT, cb);
+  }
+
+  private void doAsyncSetData(final String path, byte[] data, final int version, final long
startT,
+      final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
     retryUntilConnected(() -> {
       ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb,
-          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
-              data == null ? 0 : data.length, false));
+          new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT,
+              data == null ? 0 : data.length, false) {
+            @Override
+            protected void doRetry() {
+              doAsyncSetData(path, data, version, System.currentTimeMillis(), cb);
+            }
+          });
       return null;
     });
   }
 
   public void asyncGetData(final String path, final ZkAsyncCallbacks.GetDataCallbackHandler
cb) {
     final long startT = System.currentTimeMillis();
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
-            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
-        return null;
-      }
+    retryUntilConnected(() -> {
+      ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
+          new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true)
{
+            @Override
+            protected void doRetry() {
+              asyncGetData(path, cb);
+            }
+          });
+      return null;
     });
   }
 
   public void asyncExists(final String path, final ZkAsyncCallbacks.ExistsCallbackHandler
cb) {
     final long startT = System.currentTimeMillis();
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
-            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
-        return null;
-      }
+    retryUntilConnected(() -> {
+      ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
+          new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true)
{
+            @Override
+            protected void doRetry() {
+              asyncExists(path, cb);
+            }
+          });
+      return null;
     });
   }
 
   public void asyncDelete(final String path, final ZkAsyncCallbacks.DeleteCallbackHandler
cb) {
     final long startT = System.currentTimeMillis();
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
-            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false));
-        return null;
-      }
+    retryUntilConnected(() -> {
+      ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
+          new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false)
{
+            @Override
+            protected void doRetry() {
+              asyncDelete(path, cb);
+            }
+          });
+      return null;
     });
   }
 
@@ -1955,6 +1989,10 @@ public class ZkClient implements Watcher {
         return;
       }
       setShutdownTrigger(true);
+      if (_asyncCallRetryThread != null) {
+        _asyncCallRetryThread.interrupt();
+        _asyncCallRetryThread.join(2000);
+      }
       _eventThread.interrupt();
       _eventThread.join(2000);
       if (isManagingZkConnection()) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/CancellableZkAsyncCallback.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/CancellableZkAsyncCallback.java
new file mode 100644
index 0000000..27d92e8
--- /dev/null
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/CancellableZkAsyncCallback.java
@@ -0,0 +1,8 @@
+package org.apache.helix.zookeeper.zkclient.callback;
+
+public interface CancellableZkAsyncCallback {
+  /**
+   * Notify all the callers that are waiting for the callback to cancel the wait.
+   */
+  void notifyCallers();
+}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
new file mode 100644
index 0000000..bf2fd44
--- /dev/null
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
@@ -0,0 +1,46 @@
+package org.apache.helix.zookeeper.zkclient.callback;
+
+import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
+
+public class ZkAsyncCallMonitorContext {
+  private final long _startTimeMilliSec;
+  private final ZkClientMonitor _monitor;
+  private final boolean _isRead;
+  private int _bytes;
+
+  /**
+   * @param monitor           ZkClient monitor for update the operation result.
+   * @param startTimeMilliSec Operation initialization time.
+   * @param bytes             The data size in bytes that is involved in the operation.
+   * @param isRead            True if the operation is readonly.
+   */
+  public ZkAsyncCallMonitorContext(final ZkClientMonitor monitor, long startTimeMilliSec,
int bytes,
+      boolean isRead) {
+    _monitor = monitor;
+    _startTimeMilliSec = startTimeMilliSec;
+    _bytes = bytes;
+    _isRead = isRead;
+  }
+
+  /**
+   * Update the operated data size in bytes.
+   * @param bytes
+   */
+  void setBytes(int bytes) {
+    _bytes = bytes;
+  }
+
+  /**
+   * Record the operation result into the specified ZkClient monitor.
+   * @param path
+   */
+  void recordAccess(String path) {
+    if (_monitor != null) {
+      if (_isRead) {
+        _monitor.record(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.READ);
+      } else {
+        _monitor.record(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.WRITE);
+      }
+    }
+  }
+}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index 04c4058..70dbab4 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -31,41 +31,35 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class ZkAsyncCallbacks {
   private static Logger LOG = LoggerFactory.getLogger(ZkAsyncCallbacks.class);
+  public static final int UNKNOWN_RET_CODE = 255;
 
   public static class GetDataCallbackHandler extends DefaultCallback implements DataCallback
{
     public byte[] _data;
     public Stat _stat;
 
     @Override
-    public void handle() {
-      // TODO Auto-generated method stub
-    }
-
-    @Override
     public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
       if (rc == 0) {
         _data = data;
         _stat = stat;
         // update ctx with data size
-        if (_data != null && ctx != null && ctx instanceof ZkAsyncCallContext)
{
-          ZkAsyncCallContext zkCtx = (ZkAsyncCallContext) ctx;
-          zkCtx._bytes = _data.length;
+        if (_data != null && ctx != null && ctx instanceof ZkAsyncCallMonitorContext)
{
+          ((ZkAsyncCallMonitorContext) ctx).setBytes(_data.length);
         }
       }
       callback(rc, path, ctx);
     }
-  }
-
-  public static class SetDataCallbackHandler extends DefaultCallback implements StatCallback
{
-    Stat _stat;
 
     @Override
     public void handle() {
       // TODO Auto-generated method stub
     }
+  }
+
+  public static class SetDataCallbackHandler extends DefaultCallback implements StatCallback
{
+    Stat _stat;
 
     @Override
     public void processResult(int rc, String path, Object ctx, Stat stat) {
@@ -78,15 +72,15 @@ public class ZkAsyncCallbacks {
     public Stat getStat() {
       return _stat;
     }
-  }
-
-  public static class ExistsCallbackHandler extends DefaultCallback implements StatCallback
{
-    public Stat _stat;
 
     @Override
     public void handle() {
       // TODO Auto-generated method stub
     }
+  }
+
+  public static class ExistsCallbackHandler extends DefaultCallback implements StatCallback
{
+    public Stat _stat;
 
     @Override
     public void processResult(int rc, String path, Object ctx, Stat stat) {
@@ -95,6 +89,11 @@ public class ZkAsyncCallbacks {
       }
       callback(rc, path, ctx);
     }
+
+    @Override
+    public void handle() {
+      // TODO Auto-generated method stub
+    }
   }
 
   public static class CreateCallbackHandler extends DefaultCallback implements StringCallback
{
@@ -122,44 +121,66 @@ public class ZkAsyncCallbacks {
   }
 
   /**
-   * Default callback for zookeeper async api
+   * Default callback for zookeeper async api.
    */
-  public static abstract class DefaultCallback {
-    AtomicBoolean _lock = new AtomicBoolean(false);
-    int _rc = -1;
+  public static abstract class DefaultCallback implements CancellableZkAsyncCallback {
+    AtomicBoolean _isOperationDone = new AtomicBoolean(false);
+    int _rc = UNKNOWN_RET_CODE;
 
     public void callback(int rc, String path, Object ctx) {
       if (rc != 0 && LOG.isDebugEnabled()) {
         LOG.debug(this + ", rc:" + Code.get(rc) + ", path: " + path);
       }
 
-      if (ctx != null && ctx instanceof ZkAsyncCallContext) {
-        ZkAsyncCallContext zkCtx = (ZkAsyncCallContext) ctx;
-        if (zkCtx._monitor != null) {
-          if (zkCtx._isRead) {
-            zkCtx._monitor.record(path, zkCtx._bytes, zkCtx._startTimeMilliSec,
-                ZkClientMonitor.AccessType.READ);
-          } else {
-            zkCtx._monitor.record(path, zkCtx._bytes, zkCtx._startTimeMilliSec,
-                ZkClientMonitor.AccessType.WRITE);
-          }
-        }
+      if (ctx != null && ctx instanceof ZkAsyncCallMonitorContext) {
+        ((ZkAsyncCallMonitorContext) ctx).recordAccess(path);
       }
 
       _rc = rc;
-      handle();
 
-      synchronized (_lock) {
-        _lock.set(true);
-        _lock.notify();
+      // If retry is requested by passing the retry callback context, do retry if necessary.
+      if (needRetry(rc)) {
+        if (ctx != null && ctx instanceof ZkAsyncRetryCallContext) {
+          try {
+            if (((ZkAsyncRetryCallContext) ctx).requestRetry()) {
+              // The retry operation will be done asynchronously. Once it is done, the same
callback
+              // handler object shall be triggered to ensure the result is notified to the
right
+              // caller(s).
+              return;
+            } else {
+              LOG.warn(
+                  "Cannot request to retry the operation. The retry request thread may have
been stopped.");
+            }
+          } catch (Throwable t) {
+            LOG.error("Failed to request to retry the operation.", t);
+          }
+        } else {
+          LOG.warn(
+              "The provided callback context {} is not ZkAsyncRetryCallContext. Skip retrying.",
+              ctx.getClass().getName());
+        }
+      }
+
+      // If operation is done successfully or no retry needed, notify the caller(s).
+      try {
+        handle();
+      } finally {
+        markOperationDone();
       }
     }
 
+    public boolean isOperationDone() {
+      return _isOperationDone.get();
+    }
+
+    /**
+     * The blocking call that return true once the operation has been completed without retrying.
+     */
     public boolean waitForSuccess() {
       try {
-        synchronized (_lock) {
-          while (!_lock.get()) {
-            _lock.wait();
+        synchronized (_isOperationDone) {
+          while (!_isOperationDone.get()) {
+            _isOperationDone.wait();
           }
         }
       } catch (InterruptedException e) {
@@ -172,22 +193,52 @@ public class ZkAsyncCallbacks {
       return _rc;
     }
 
+    @Override
+    public void notifyCallers() {
+      LOG.warn("The callback {} has been cancelled.", this);
+      markOperationDone();
+    }
+
+    /**
+     * Additional callback handling.
+     */
     abstract public void handle();
-  }
 
-  public static class ZkAsyncCallContext {
-    private long _startTimeMilliSec;
-    private int _bytes;
-    private ZkClientMonitor _monitor;
-    private boolean _isRead;
+    private void markOperationDone() {
+      synchronized (_isOperationDone) {
+        _isOperationDone.set(true);
+        _isOperationDone.notifyAll();
+      }
+    }
 
-    public ZkAsyncCallContext(final ZkClientMonitor monitor, long startTimeMilliSec, int
bytes,
-        boolean isRead) {
-      _monitor = monitor;
-      _startTimeMilliSec = startTimeMilliSec;
-      _bytes = bytes;
-      _isRead = isRead;
+    /**
+     * @param rc the return code
+     * @return true if the error is transient and the operation may succeed when being retried.
+     */
+    private boolean needRetry(int rc) {
+      try {
+        switch (Code.get(rc)) {
+        /** Connection to the server has been lost */
+        case CONNECTIONLOSS:
+          /** The session has been expired by the server */
+        case SESSIONEXPIRED:
+          /** Session moved to another server, so operation is ignored */
+        case SESSIONMOVED:
+          return true;
+        default:
+          return false;
+        }
+      } catch (ClassCastException | NullPointerException ex) {
+        LOG.error("Failed to handle unknown return code {}. Skip retrying.", rc, ex);
+        return false;
+      }
     }
   }
 
+  @Deprecated
+  public static class ZkAsyncCallContext extends ZkAsyncCallMonitorContext {
+    ZkAsyncCallContext(ZkClientMonitor monitor, long startTimeMilliSec, int bytes, boolean
isRead) {
+      super(monitor, startTimeMilliSec, bytes, isRead);
+    }
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java
new file mode 100644
index 0000000..4a9402f
--- /dev/null
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java
@@ -0,0 +1,49 @@
+package org.apache.helix.zookeeper.zkclient.callback;
+
+import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ZkAsyncRetryCallContext extends ZkAsyncCallMonitorContext {
+  private static Logger LOG = LoggerFactory.getLogger(ZkAsyncRetryCallContext.class);
+  private final ZkAsyncRetryThread _retryThread;
+  private final CancellableZkAsyncCallback _cancellableCallback;
+
+  /**
+   * @param retryThread       The thread that executes the retry operation.
+   *                          Note that retry in the ZkEventThread is not allowed to avoid
dead lock.
+   * @param callback          Cancellable asynchronous callback to notify when the retry
is cancelled.
+   * @param monitor           ZkClient monitor for update the operation result.
+   * @param startTimeMilliSec Operation initialization time.
+   * @param bytes             The data size in bytes that is involved in the operation.
+   * @param isRead            True if the operation is readonly.
+   */
+  public ZkAsyncRetryCallContext(final ZkAsyncRetryThread retryThread,
+      final CancellableZkAsyncCallback callback, final ZkClientMonitor monitor,
+      long startTimeMilliSec, int bytes, boolean isRead) {
+    super(monitor, startTimeMilliSec, bytes, isRead);
+    _retryThread = retryThread;
+    _cancellableCallback = callback;
+  }
+
+  /**
+   * Request a retry.
+   *
+   * @return True if the request was sent successfully.
+   */
+  boolean requestRetry() {
+    return _retryThread.sendRetryRequest(this);
+  }
+
+  /**
+   * Notify the pending callback that retry has been cancelled.
+   */
+  void cancel() {
+    _cancellableCallback.notifyCallers();
+  }
+
+  /**
+   * The actual retry operation logic.
+   */
+  protected abstract void doRetry() throws Exception;
+}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryThread.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryThread.java
new file mode 100644
index 0000000..c59d423
--- /dev/null
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryThread.java
@@ -0,0 +1,57 @@
+package org.apache.helix.zookeeper.zkclient.callback;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZkAsyncRetryThread extends Thread {
+  private static Logger LOG = LoggerFactory.getLogger(ZkAsyncRetryThread.class);
+  private BlockingQueue<ZkAsyncRetryCallContext> _retryContexts = new LinkedBlockingQueue<>();
+  private volatile boolean _isReady = true;
+
+  public ZkAsyncRetryThread(String name) {
+    setDaemon(true);
+    setName("ZkClient-AsyncCallback-Retry-" + getId() + "-" + name);
+  }
+
+  @Override
+  public void run() {
+    LOG.info("Starting ZkClient AsyncCallback retry thread.");
+    try {
+      while (!isInterrupted()) {
+        ZkAsyncRetryCallContext context = _retryContexts.take();
+        try {
+          context.doRetry();
+        } catch (InterruptedException | ZkInterruptedException e) {
+          // if interrupted, stop retrying and interrupt the thread.
+          context.cancel();
+          interrupt();
+        } catch (Throwable e) {
+          LOG.error("Error retrying callback " + context, e);
+        }
+      }
+    } catch (InterruptedException e) {
+      LOG.info("ZkClient AsyncCallback retry thread is interrupted.");
+    }
+    synchronized (this) {
+      // Mark ready to be false, so no new requests will be sent.
+      _isReady = false;
+      // Notify to all the callers waiting for the result.
+      for (ZkAsyncRetryCallContext context : _retryContexts) {
+        context.cancel();
+      }
+    }
+    LOG.info("Terminate ZkClient AsyncCallback retry thread.");
+  }
+
+  synchronized boolean sendRetryRequest(ZkAsyncRetryCallContext context) {
+    if (_isReady) {
+      _retryContexts.add(context);
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
index 51eda80..2b8b1b3 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
@@ -58,7 +58,7 @@ public class ZkTestBase {
    * Multiple ZK references
    */
   // The following maps hold ZK connect string as keys
-  protected final Map<String, ZkServer> _zkServerMap = new HashMap<>();
+  protected static final Map<String, ZkServer> _zkServerMap = new HashMap<>();
   protected static int _numZk = 1; // Initial value
 
   @BeforeSuite
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
new file mode 100644
index 0000000..4e5b06f
--- /dev/null
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
@@ -0,0 +1,405 @@
+package org.apache.helix.zookeeper.impl.client;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.ZkTestBase;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryCallContext;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.UNKNOWN_RET_CODE;
+import static org.apache.zookeeper.KeeperException.Code.CONNECTIONLOSS;
+
+/**
+ * Note this is a whitebox test to test the async operation callback/context.
+ * We don't have a good way to simulate an async ZK operation failure in the server side
yet.
+ */
+public class TestZkClientAsyncRetry extends ZkTestBase {
+  private final String TEST_ROOT = String.format("/%s", getClass().getSimpleName());
+  private final String NODE_PATH = TEST_ROOT + "/async";
+
+  private org.apache.helix.zookeeper.zkclient.ZkClient _zkClient;
+  private String _zkServerAddress;
+
+  @BeforeClass
+  public void beforeClass() {
+    _zkClient = _zkServerMap.values().iterator().next().getZkClient();
+    _zkServerAddress = _zkClient.getServers();
+    _zkClient.createPersistent(TEST_ROOT);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _zkClient.deleteRecursively(TEST_ROOT);
+    _zkClient.close();
+  }
+
+  private boolean waitAsyncOperation(ZkAsyncCallbacks.DefaultCallback callback, long timeout)
{
+    final boolean[] ret = { false };
+    Thread waitThread = new Thread(() -> ret[0] = callback.waitForSuccess());
+    waitThread.start();
+    try {
+      waitThread.join(timeout);
+      waitThread.interrupt();
+      return ret[0];
+    } catch (InterruptedException e) {
+      return false;
+    }
+  }
+
+  @Test
+  public void testAsyncRetryCategories() {
+    MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
+    try {
+      ZNRecord tmpRecord = new ZNRecord("tmpRecord");
+      tmpRecord.setSimpleField("foo", "bar");
+      // Loop all possible error codes to test async create.
+      // Only connectivity issues will be retried, the other issues will be return error
immediately.
+      for (KeeperException.Code code : KeeperException.Code.values()) {
+        if (code == KeeperException.Code.OK) {
+          continue;
+        }
+        ZkAsyncCallbacks.CreateCallbackHandler createCallback =
+            new ZkAsyncCallbacks.CreateCallbackHandler();
+        Assert.assertEquals(createCallback.getRc(), UNKNOWN_RET_CODE);
+        testZkClient.setAsyncCallRC(code.intValue());
+        if (code == CONNECTIONLOSS || code == KeeperException.Code.SESSIONEXPIRED
+            || code == KeeperException.Code.SESSIONMOVED) {
+          // Async create will be pending due to the mock error rc is retryable.
+          testZkClient.asyncCreate(NODE_PATH, null, CreateMode.PERSISTENT, createCallback);
+          Assert.assertFalse(createCallback.isOperationDone());
+          Assert.assertEquals(createCallback.getRc(), code.intValue());
+          // Change the mock response
+          testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+          // Async retry will succeed now. Wait until the operation is successfully done
and verify.
+          Assert.assertTrue(waitAsyncOperation(createCallback, 1000));
+          Assert.assertEquals(createCallback.getRc(), KeeperException.Code.OK.intValue());
+          Assert.assertTrue(testZkClient.exists(NODE_PATH));
+          Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+        } else {
+          // Async create will fail due to the mock error rc is not recoverable.
+          testZkClient.asyncCreate(NODE_PATH, null, CreateMode.PERSISTENT, createCallback);
+          Assert.assertTrue(waitAsyncOperation(createCallback, 1000));
+          Assert.assertEquals(createCallback.getRc(), code.intValue());
+          Assert.assertEquals(testZkClient.getAndResetRetryCount(), 0);
+        }
+        testZkClient.delete(NODE_PATH);
+        Assert.assertFalse(testZkClient.exists(NODE_PATH));
+      }
+    } finally {
+      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+      testZkClient.close();
+      _zkClient.delete(NODE_PATH);
+    }
+  }
+
+  @Test(dependsOnMethods = "testAsyncRetryCategories")
+  public void testAsyncWriteRetry() {
+    MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
+    try {
+      ZNRecord tmpRecord = new ZNRecord("tmpRecord");
+      tmpRecord.setSimpleField("foo", "bar");
+      testZkClient.createPersistent(NODE_PATH, tmpRecord);
+
+      // 1. Test async set retry
+      ZkAsyncCallbacks.SetDataCallbackHandler setCallback =
+          new ZkAsyncCallbacks.SetDataCallbackHandler();
+      Assert.assertEquals(setCallback.getRc(), UNKNOWN_RET_CODE);
+
+      tmpRecord.setSimpleField("test", "data");
+      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
+      // Async set will be pending due to the mock error rc is retryable.
+      testZkClient.asyncSetData(NODE_PATH, tmpRecord, -1, setCallback);
+      Assert.assertFalse(setCallback.isOperationDone());
+      Assert.assertEquals(setCallback.getRc(), CONNECTIONLOSS.intValue());
+      // Change the mock return code.
+      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+      // Async retry will succeed now. Wait until the operation is successfully done and
verify.
+      Assert.assertTrue(waitAsyncOperation(setCallback, 1000));
+      Assert.assertEquals(setCallback.getRc(), KeeperException.Code.OK.intValue());
+      Assert.assertEquals(((ZNRecord) testZkClient.readData(NODE_PATH)).getSimpleField("test"),
+          "data");
+      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+
+      // 2. Test async delete
+      ZkAsyncCallbacks.DeleteCallbackHandler deleteCallback =
+          new ZkAsyncCallbacks.DeleteCallbackHandler();
+      Assert.assertEquals(deleteCallback.getRc(), UNKNOWN_RET_CODE);
+
+      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
+      // Async delete will be pending due to the mock error rc is retryable.
+      testZkClient.asyncDelete(NODE_PATH, deleteCallback);
+      Assert.assertFalse(deleteCallback.isOperationDone());
+      Assert.assertEquals(deleteCallback.getRc(), CONNECTIONLOSS.intValue());
+      // Change the mock return code.
+      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+      // Async retry will succeed now. Wait until the operation is successfully done and
verify.
+      Assert.assertTrue(waitAsyncOperation(deleteCallback, 1000));
+      Assert.assertEquals(deleteCallback.getRc(), KeeperException.Code.OK.intValue());
+      Assert.assertFalse(testZkClient.exists(NODE_PATH));
+      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+    } finally {
+      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+      testZkClient.close();
+      _zkClient.delete(NODE_PATH);
+    }
+  }
+
+  @Test(dependsOnMethods = "testAsyncWriteRetry")
+  public void testAsyncReadRetry() {
+    MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
+    try {
+      ZNRecord tmpRecord = new ZNRecord("tmpRecord");
+      tmpRecord.setSimpleField("foo", "bar");
+      testZkClient.createPersistent(NODE_PATH, tmpRecord);
+
+      // 1. Test async exist check
+      ZkAsyncCallbacks.ExistsCallbackHandler existsCallback =
+          new ZkAsyncCallbacks.ExistsCallbackHandler();
+      Assert.assertEquals(existsCallback.getRc(), UNKNOWN_RET_CODE);
+
+      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
+      // Async exist check will be pending due to the mock error rc is retryable.
+      testZkClient.asyncExists(NODE_PATH, existsCallback);
+      Assert.assertFalse(existsCallback.isOperationDone());
+      Assert.assertEquals(existsCallback.getRc(), CONNECTIONLOSS.intValue());
+      // Change the mock return code.
+      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+      // Async retry will succeed now. Wait until the operation is successfully done and
verify.
+      Assert.assertTrue(waitAsyncOperation(existsCallback, 1000));
+      Assert.assertEquals(existsCallback.getRc(), KeeperException.Code.OK.intValue());
+      Assert.assertTrue(existsCallback._stat != null);
+      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+
+      // 2. Test async get
+      ZkAsyncCallbacks.GetDataCallbackHandler getCallback =
+          new ZkAsyncCallbacks.GetDataCallbackHandler();
+      Assert.assertEquals(getCallback.getRc(), UNKNOWN_RET_CODE);
+
+      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
+      // Async get will be pending due to the mock error rc is retryable.
+      testZkClient.asyncGetData(NODE_PATH, getCallback);
+      Assert.assertFalse(getCallback.isOperationDone());
+      Assert.assertEquals(getCallback.getRc(), CONNECTIONLOSS.intValue());
+      // Change the mock return code.
+      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+      // Async retry will succeed now. Wait until the operation is successfully done and
verify.
+      Assert.assertTrue(waitAsyncOperation(getCallback, 1000));
+      Assert.assertEquals(getCallback.getRc(), KeeperException.Code.OK.intValue());
+      ZNRecord record = testZkClient.deserialize(getCallback._data, NODE_PATH);
+      Assert.assertEquals(record.getSimpleField("foo"), "bar");
+      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+    } finally {
+      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+      testZkClient.close();
+      _zkClient.delete(NODE_PATH);
+    }
+  }
+
+  @Test(dependsOnMethods = "testAsyncReadRetry")
+  public void testAsyncRequestCleanup() {
+    int cbCount = 10;
+    MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
+    try {
+      ZNRecord tmpRecord = new ZNRecord("tmpRecord");
+      tmpRecord.setSimpleField("foo", "bar");
+      testZkClient.createPersistent(NODE_PATH, tmpRecord);
+
+      // Create 10 async exists check requests
+      ZkAsyncCallbacks.ExistsCallbackHandler[] existsCallbacks =
+          new ZkAsyncCallbacks.ExistsCallbackHandler[cbCount];
+      for (int i = 0; i < cbCount; i++) {
+        existsCallbacks[i] = new ZkAsyncCallbacks.ExistsCallbackHandler();
+      }
+      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
+      // All async exist check calls will be pending due to the mock error rc is retryable.
+      for (ZkAsyncCallbacks.ExistsCallbackHandler cb : existsCallbacks) {
+        testZkClient.asyncExists(NODE_PATH, cb);
+        Assert.assertEquals(cb.getRc(), CONNECTIONLOSS.intValue());
+      }
+      // Wait for a while, no callback finishes
+      Assert.assertFalse(waitAsyncOperation(existsCallbacks[0], 1000));
+      for (ZkAsyncCallbacks.ExistsCallbackHandler cb : existsCallbacks) {
+        Assert.assertEquals(cb.getRc(), CONNECTIONLOSS.intValue());
+        Assert.assertFalse(cb.isOperationDone());
+      }
+      testZkClient.close();
+      // All callback retry will be cancelled because the zkclient is closed.
+      for (ZkAsyncCallbacks.ExistsCallbackHandler cb : existsCallbacks) {
+        Assert.assertTrue(waitAsyncOperation(cb, 1000));
+        Assert.assertEquals(cb.getRc(), CONNECTIONLOSS.intValue());
+      }
+      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+    } finally {
+      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+      testZkClient.close();
+      _zkClient.delete(NODE_PATH);
+    }
+  }
+
+  /**
+   * Mock client to whitebox test async functionality.
+   */
+  class MockAsyncZkClient extends ZkClient {
+    private static final long RETRY_INTERVAL_MS = 500;
+    private long _retryCount = 0;
+
+    /**
+     * If the specified return code is OK, call the real function.
+     * Otherwise, trigger the callback with the specified RC without triggering the real
ZK call.
+     */
+    private int _asyncCallRetCode = KeeperException.Code.OK.intValue();
+
+    public MockAsyncZkClient(String zkAddress) {
+      super(zkAddress);
+      setZkSerializer(new ZNRecordSerializer());
+    }
+
+    public void setAsyncCallRC(int rc) {
+      _asyncCallRetCode = rc;
+    }
+
+    public long getAndResetRetryCount() {
+      long tmpCount = _retryCount;
+      _retryCount = 0;
+      return tmpCount;
+    }
+
+    @Override
+    public void asyncCreate(String path, Object datat, CreateMode mode,
+        ZkAsyncCallbacks.CreateCallbackHandler cb) {
+      if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
+        super.asyncCreate(path, datat, mode, cb);
+        return;
+      } else {
+        cb.processResult(_asyncCallRetCode, path,
+            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, false) {
+              @Override
+              protected void doRetry() {
+                _retryCount++;
+                try {
+                  Thread.sleep(RETRY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                  throw new ZkInterruptedException(e);
+                }
+                asyncCreate(path, datat, mode, cb);
+              }
+            }, null);
+      }
+    }
+
+    @Override
+    public void asyncSetData(String path, Object datat, int version,
+        ZkAsyncCallbacks.SetDataCallbackHandler cb) {
+      if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
+        super.asyncSetData(path, datat, version, cb);
+        return;
+      } else {
+        cb.processResult(_asyncCallRetCode, path,
+            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, false) {
+              @Override
+              protected void doRetry() {
+                _retryCount++;
+                try {
+                  Thread.sleep(RETRY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                  throw new ZkInterruptedException(e);
+                }
+                asyncSetData(path, datat, version, cb);
+              }
+            }, null);
+      }
+    }
+
+    @Override
+    public void asyncGetData(String path, ZkAsyncCallbacks.GetDataCallbackHandler cb) {
+      if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
+        super.asyncGetData(path, cb);
+        return;
+      } else {
+        cb.processResult(_asyncCallRetCode, path,
+            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, true) {
+              @Override
+              protected void doRetry() {
+                _retryCount++;
+                try {
+                  Thread.sleep(RETRY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                  throw new ZkInterruptedException(e);
+                }
+                asyncGetData(path, cb);
+              }
+            }, null, null);
+      }
+    }
+
+    @Override
+    public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler cb) {
+      if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
+        super.asyncExists(path, cb);
+        return;
+      } else {
+        cb.processResult(_asyncCallRetCode, path,
+            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, true) {
+              @Override
+              protected void doRetry() {
+                _retryCount++;
+                try {
+                  Thread.sleep(RETRY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                  throw new ZkInterruptedException(e);
+                }
+                asyncExists(path, cb);
+              }
+            }, null);
+      }
+    }
+
+    @Override
+    public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler cb) {
+      if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
+        super.asyncDelete(path, cb);
+        return;
+      } else {
+        cb.processResult(_asyncCallRetCode, path,
+            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, false) {
+              @Override
+              protected void doRetry() {
+                _retryCount++;
+                try {
+                  Thread.sleep(RETRY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                  throw new ZkInterruptedException(e);
+                }
+                asyncDelete(path, cb);
+              }
+            });
+      }
+    }
+  }
+}


Mime
View raw message