This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 370e277966f75a7fba45f5b96f7608c127b2905c
Author: Junkai Xue <jxue@linkedin.com>
AuthorDate: Mon May 4 13:02:13 2020 -0700
Revert "Add async call retry to resolve the transient ZK connection issue. (#970)"
This reverts commit 96ebb27c23004a7a69dc4799b14586ff82d53c9e.
---
.../apache/helix/zookeeper/zkclient/ZkClient.java | 96 ++---
.../callback/CancellableZkAsyncCallback.java | 8 -
.../callback/ZkAsyncCallMonitorContext.java | 46 ---
.../zkclient/callback/ZkAsyncCallbacks.java | 153 +++-----
.../zkclient/callback/ZkAsyncRetryCallContext.java | 49 ---
.../zkclient/callback/ZkAsyncRetryThread.java | 57 ---
.../apache/helix/zookeeper/impl/ZkTestBase.java | 2 +-
.../impl/client/TestZkClientAsyncRetry.java | 405 ---------------------
8 files changed, 81 insertions(+), 735 deletions(-)
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 89f9e32..562143f 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -27,10 +27,7 @@ import javax.management.JMException;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.exception.ZkClientException;
import org.apache.helix.zookeeper.zkclient.annotation.PreFetch;
-import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallMonitorContext;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
-import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryCallContext;
-import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryThread;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
@@ -99,10 +96,6 @@ public class ZkClient implements Watcher {
private PathBasedZkSerializer _pathBasedZkSerializer;
private ZkClientMonitor _monitor;
- // To automatically retry the async operation, we need a separate thread other than the
- // ZkEventThread. Otherwise the retry request might block the normal event processing.
- protected final ZkAsyncRetryThread _asyncCallRetryThread;
-
private class IZkDataListenerEntry {
final IZkDataListener _dataListener;
final boolean _prefetchData;
@@ -190,9 +183,6 @@ public class ZkClient implements Watcher {
_operationRetryTimeoutInMillis = operationRetryTimeout;
_isNewSessionEventFired = false;
- _asyncCallRetryThread = new ZkAsyncRetryThread(zkConnection.getServers());
- _asyncCallRetryThread.start();
-
connect(connectionTimeout, this);
// initiate monitor
@@ -1746,23 +1736,15 @@ public class ZkClient implements Watcher {
data = (datat == null ? null : serialize(datat, path));
} catch (ZkMarshallingError e) {
cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
- new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
+ new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
return;
}
- doAsyncCreate(path, data, mode, startT, cb);
- }
-
- private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode,
- final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb) {
retryUntilConnected(() -> {
((ZkConnection) getConnection()).getZookeeper()
- .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, cb,
- new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false) {
- @Override
- protected void doRetry() {
- doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb);
- }
- });
+ .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ // Arrays.asList(DEFAULT_ACL),
+ mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+ data == null ? 0 : data.length, false));
return null;
});
}
@@ -1776,66 +1758,50 @@ public class ZkClient implements Watcher {
data = serialize(datat, path);
} catch (ZkMarshallingError e) {
cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
- new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
+ new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
return;
}
- doAsyncSetData(path, data, version, startT, cb);
- }
-
- private void doAsyncSetData(final String path, byte[] data, final int version, final long startT,
- final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
retryUntilConnected(() -> {
((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb,
- new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT,
- data == null ? 0 : data.length, false) {
- @Override
- protected void doRetry() {
- doAsyncSetData(path, data, version, System.currentTimeMillis(), cb);
- }
- });
+ new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+ data == null ? 0 : data.length, false));
return null;
});
}
public void asyncGetData(final String path, final ZkAsyncCallbacks.GetDataCallbackHandler cb) {
final long startT = System.currentTimeMillis();
- retryUntilConnected(() -> {
- ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
- new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) {
- @Override
- protected void doRetry() {
- asyncGetData(path, cb);
- }
- });
- return null;
+ retryUntilConnected(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
+ new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
+ return null;
+ }
});
}
public void asyncExists(final String path, final ZkAsyncCallbacks.ExistsCallbackHandler cb) {
final long startT = System.currentTimeMillis();
- retryUntilConnected(() -> {
- ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
- new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) {
- @Override
- protected void doRetry() {
- asyncExists(path, cb);
- }
- });
- return null;
+ retryUntilConnected(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
+ new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
+ return null;
+ }
});
}
public void asyncDelete(final String path, final ZkAsyncCallbacks.DeleteCallbackHandler cb) {
final long startT = System.currentTimeMillis();
- retryUntilConnected(() -> {
- ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
- new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false) {
- @Override
- protected void doRetry() {
- asyncDelete(path, cb);
- }
- });
- return null;
+ retryUntilConnected(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
+ new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false));
+ return null;
+ }
});
}
@@ -1989,10 +1955,6 @@ public class ZkClient implements Watcher {
return;
}
setShutdownTrigger(true);
- if (_asyncCallRetryThread != null) {
- _asyncCallRetryThread.interrupt();
- _asyncCallRetryThread.join(2000);
- }
_eventThread.interrupt();
_eventThread.join(2000);
if (isManagingZkConnection()) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/CancellableZkAsyncCallback.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/CancellableZkAsyncCallback.java
deleted file mode 100644
index 27d92e8..0000000
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/CancellableZkAsyncCallback.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.helix.zookeeper.zkclient.callback;
-
-public interface CancellableZkAsyncCallback {
- /**
- * Notify all the callers that are waiting for the callback to cancel the wait.
- */
- void notifyCallers();
-}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
deleted file mode 100644
index bf2fd44..0000000
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package org.apache.helix.zookeeper.zkclient.callback;
-
-import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
-
-public class ZkAsyncCallMonitorContext {
- private final long _startTimeMilliSec;
- private final ZkClientMonitor _monitor;
- private final boolean _isRead;
- private int _bytes;
-
- /**
- * @param monitor ZkClient monitor for update the operation result.
- * @param startTimeMilliSec Operation initialization time.
- * @param bytes The data size in bytes that is involved in the operation.
- * @param isRead True if the operation is readonly.
- */
- public ZkAsyncCallMonitorContext(final ZkClientMonitor monitor, long startTimeMilliSec, int bytes,
- boolean isRead) {
- _monitor = monitor;
- _startTimeMilliSec = startTimeMilliSec;
- _bytes = bytes;
- _isRead = isRead;
- }
-
- /**
- * Update the operated data size in bytes.
- * @param bytes
- */
- void setBytes(int bytes) {
- _bytes = bytes;
- }
-
- /**
- * Record the operation result into the specified ZkClient monitor.
- * @param path
- */
- void recordAccess(String path) {
- if (_monitor != null) {
- if (_isRead) {
- _monitor.record(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.READ);
- } else {
- _monitor.record(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.WRITE);
- }
- }
- }
-}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index 70dbab4..04c4058 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -31,35 +31,41 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class ZkAsyncCallbacks {
private static Logger LOG = LoggerFactory.getLogger(ZkAsyncCallbacks.class);
- public static final int UNKNOWN_RET_CODE = 255;
public static class GetDataCallbackHandler extends DefaultCallback implements DataCallback {
public byte[] _data;
public Stat _stat;
@Override
+ public void handle() {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if (rc == 0) {
_data = data;
_stat = stat;
// update ctx with data size
- if (_data != null && ctx != null && ctx instanceof ZkAsyncCallMonitorContext) {
- ((ZkAsyncCallMonitorContext) ctx).setBytes(_data.length);
+ if (_data != null && ctx != null && ctx instanceof ZkAsyncCallContext) {
+ ZkAsyncCallContext zkCtx = (ZkAsyncCallContext) ctx;
+ zkCtx._bytes = _data.length;
}
}
callback(rc, path, ctx);
}
+ }
+
+ public static class SetDataCallbackHandler extends DefaultCallback implements StatCallback {
+ Stat _stat;
@Override
public void handle() {
// TODO Auto-generated method stub
}
- }
-
- public static class SetDataCallbackHandler extends DefaultCallback implements StatCallback {
- Stat _stat;
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
@@ -72,15 +78,15 @@ public class ZkAsyncCallbacks {
public Stat getStat() {
return _stat;
}
+ }
+
+ public static class ExistsCallbackHandler extends DefaultCallback implements StatCallback {
+ public Stat _stat;
@Override
public void handle() {
// TODO Auto-generated method stub
}
- }
-
- public static class ExistsCallbackHandler extends DefaultCallback implements StatCallback {
- public Stat _stat;
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
@@ -89,11 +95,6 @@ public class ZkAsyncCallbacks {
}
callback(rc, path, ctx);
}
-
- @Override
- public void handle() {
- // TODO Auto-generated method stub
- }
}
public static class CreateCallbackHandler extends DefaultCallback implements StringCallback {
@@ -121,66 +122,44 @@ public class ZkAsyncCallbacks {
}
/**
- * Default callback for zookeeper async api.
+ * Default callback for zookeeper async api
*/
- public static abstract class DefaultCallback implements CancellableZkAsyncCallback {
- AtomicBoolean _isOperationDone = new AtomicBoolean(false);
- int _rc = UNKNOWN_RET_CODE;
+ public static abstract class DefaultCallback {
+ AtomicBoolean _lock = new AtomicBoolean(false);
+ int _rc = -1;
public void callback(int rc, String path, Object ctx) {
if (rc != 0 && LOG.isDebugEnabled()) {
LOG.debug(this + ", rc:" + Code.get(rc) + ", path: " + path);
}
- if (ctx != null && ctx instanceof ZkAsyncCallMonitorContext) {
- ((ZkAsyncCallMonitorContext) ctx).recordAccess(path);
- }
-
- _rc = rc;
-
- // If retry is requested by passing the retry callback context, do retry if necessary.
- if (needRetry(rc)) {
- if (ctx != null && ctx instanceof ZkAsyncRetryCallContext) {
- try {
- if (((ZkAsyncRetryCallContext) ctx).requestRetry()) {
- // The retry operation will be done asynchronously. Once it is done, the same callback
- // handler object shall be triggered to ensure the result is notified to the right
- // caller(s).
- return;
- } else {
- LOG.warn(
- "Cannot request to retry the operation. The retry request thread may have been stopped.");
- }
- } catch (Throwable t) {
- LOG.error("Failed to request to retry the operation.", t);
+ if (ctx != null && ctx instanceof ZkAsyncCallContext) {
+ ZkAsyncCallContext zkCtx = (ZkAsyncCallContext) ctx;
+ if (zkCtx._monitor != null) {
+ if (zkCtx._isRead) {
+ zkCtx._monitor.record(path, zkCtx._bytes, zkCtx._startTimeMilliSec,
+ ZkClientMonitor.AccessType.READ);
+ } else {
+ zkCtx._monitor.record(path, zkCtx._bytes, zkCtx._startTimeMilliSec,
+ ZkClientMonitor.AccessType.WRITE);
}
- } else {
- LOG.warn(
- "The provided callback context {} is not ZkAsyncRetryCallContext. Skip retrying.",
- ctx.getClass().getName());
}
}
- // If operation is done successfully or no retry needed, notify the caller(s).
- try {
- handle();
- } finally {
- markOperationDone();
- }
- }
+ _rc = rc;
+ handle();
- public boolean isOperationDone() {
- return _isOperationDone.get();
+ synchronized (_lock) {
+ _lock.set(true);
+ _lock.notify();
+ }
}
- /**
- * The blocking call that return true once the operation has been completed without retrying.
- */
public boolean waitForSuccess() {
try {
- synchronized (_isOperationDone) {
- while (!_isOperationDone.get()) {
- _isOperationDone.wait();
+ synchronized (_lock) {
+ while (!_lock.get()) {
+ _lock.wait();
}
}
} catch (InterruptedException e) {
@@ -193,52 +172,22 @@ public class ZkAsyncCallbacks {
return _rc;
}
- @Override
- public void notifyCallers() {
- LOG.warn("The callback {} has been cancelled.", this);
- markOperationDone();
- }
-
- /**
- * Additional callback handling.
- */
abstract public void handle();
+ }
- private void markOperationDone() {
- synchronized (_isOperationDone) {
- _isOperationDone.set(true);
- _isOperationDone.notifyAll();
- }
- }
+ public static class ZkAsyncCallContext {
+ private long _startTimeMilliSec;
+ private int _bytes;
+ private ZkClientMonitor _monitor;
+ private boolean _isRead;
- /**
- * @param rc the return code
- * @return true if the error is transient and the operation may succeed when being retried.
- */
- private boolean needRetry(int rc) {
- try {
- switch (Code.get(rc)) {
- /** Connection to the server has been lost */
- case CONNECTIONLOSS:
- /** The session has been expired by the server */
- case SESSIONEXPIRED:
- /** Session moved to another server, so operation is ignored */
- case SESSIONMOVED:
- return true;
- default:
- return false;
- }
- } catch (ClassCastException | NullPointerException ex) {
- LOG.error("Failed to handle unknown return code {}. Skip retrying.", rc, ex);
- return false;
- }
+ public ZkAsyncCallContext(final ZkClientMonitor monitor, long startTimeMilliSec, int bytes,
+ boolean isRead) {
+ _monitor = monitor;
+ _startTimeMilliSec = startTimeMilliSec;
+ _bytes = bytes;
+ _isRead = isRead;
}
}
- @Deprecated
- public static class ZkAsyncCallContext extends ZkAsyncCallMonitorContext {
- ZkAsyncCallContext(ZkClientMonitor monitor, long startTimeMilliSec, int bytes, boolean isRead) {
- super(monitor, startTimeMilliSec, bytes, isRead);
- }
- }
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java
deleted file mode 100644
index 4a9402f..0000000
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.helix.zookeeper.zkclient.callback;
-
-import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class ZkAsyncRetryCallContext extends ZkAsyncCallMonitorContext {
- private static Logger LOG = LoggerFactory.getLogger(ZkAsyncRetryCallContext.class);
- private final ZkAsyncRetryThread _retryThread;
- private final CancellableZkAsyncCallback _cancellableCallback;
-
- /**
- * @param retryThread The thread that executes the retry operation.
- * Note that retry in the ZkEventThread is not allowed to avoid dead lock.
- * @param callback Cancellable asynchronous callback to notify when the retry is cancelled.
- * @param monitor ZkClient monitor for update the operation result.
- * @param startTimeMilliSec Operation initialization time.
- * @param bytes The data size in bytes that is involved in the operation.
- * @param isRead True if the operation is readonly.
- */
- public ZkAsyncRetryCallContext(final ZkAsyncRetryThread retryThread,
- final CancellableZkAsyncCallback callback, final ZkClientMonitor monitor,
- long startTimeMilliSec, int bytes, boolean isRead) {
- super(monitor, startTimeMilliSec, bytes, isRead);
- _retryThread = retryThread;
- _cancellableCallback = callback;
- }
-
- /**
- * Request a retry.
- *
- * @return True if the request was sent successfully.
- */
- boolean requestRetry() {
- return _retryThread.sendRetryRequest(this);
- }
-
- /**
- * Notify the pending callback that retry has been cancelled.
- */
- void cancel() {
- _cancellableCallback.notifyCallers();
- }
-
- /**
- * The actual retry operation logic.
- */
- protected abstract void doRetry() throws Exception;
-}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryThread.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryThread.java
deleted file mode 100644
index c59d423..0000000
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryThread.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.helix.zookeeper.zkclient.callback;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZkAsyncRetryThread extends Thread {
- private static Logger LOG = LoggerFactory.getLogger(ZkAsyncRetryThread.class);
- private BlockingQueue<ZkAsyncRetryCallContext> _retryContexts = new LinkedBlockingQueue<>();
- private volatile boolean _isReady = true;
-
- public ZkAsyncRetryThread(String name) {
- setDaemon(true);
- setName("ZkClient-AsyncCallback-Retry-" + getId() + "-" + name);
- }
-
- @Override
- public void run() {
- LOG.info("Starting ZkClient AsyncCallback retry thread.");
- try {
- while (!isInterrupted()) {
- ZkAsyncRetryCallContext context = _retryContexts.take();
- try {
- context.doRetry();
- } catch (InterruptedException | ZkInterruptedException e) {
- // if interrupted, stop retrying and interrupt the thread.
- context.cancel();
- interrupt();
- } catch (Throwable e) {
- LOG.error("Error retrying callback " + context, e);
- }
- }
- } catch (InterruptedException e) {
- LOG.info("ZkClient AsyncCallback retry thread is interrupted.");
- }
- synchronized (this) {
- // Mark ready to be false, so no new requests will be sent.
- _isReady = false;
- // Notify to all the callers waiting for the result.
- for (ZkAsyncRetryCallContext context : _retryContexts) {
- context.cancel();
- }
- }
- LOG.info("Terminate ZkClient AsyncCallback retry thread.");
- }
-
- synchronized boolean sendRetryRequest(ZkAsyncRetryCallContext context) {
- if (_isReady) {
- _retryContexts.add(context);
- return true;
- }
- return false;
- }
-}
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
index 2b8b1b3..51eda80 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
@@ -58,7 +58,7 @@ public class ZkTestBase {
* Multiple ZK references
*/
// The following maps hold ZK connect string as keys
- protected static final Map<String, ZkServer> _zkServerMap = new HashMap<>();
+ protected final Map<String, ZkServer> _zkServerMap = new HashMap<>();
protected static int _numZk = 1; // Initial value
@BeforeSuite
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
deleted file mode 100644
index 4e5b06f..0000000
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
+++ /dev/null
@@ -1,405 +0,0 @@
-package org.apache.helix.zookeeper.impl.client;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
-import org.apache.helix.zookeeper.impl.ZkTestBase;
-import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
-import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryCallContext;
-import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.UNKNOWN_RET_CODE;
-import static org.apache.zookeeper.KeeperException.Code.CONNECTIONLOSS;
-
-/**
- * Note this is a whitebox test to test the async operation callback/context.
- * We don't have a good way to simulate an async ZK operation failure in the server side yet.
- */
-public class TestZkClientAsyncRetry extends ZkTestBase {
- private final String TEST_ROOT = String.format("/%s", getClass().getSimpleName());
- private final String NODE_PATH = TEST_ROOT + "/async";
-
- private org.apache.helix.zookeeper.zkclient.ZkClient _zkClient;
- private String _zkServerAddress;
-
- @BeforeClass
- public void beforeClass() {
- _zkClient = _zkServerMap.values().iterator().next().getZkClient();
- _zkServerAddress = _zkClient.getServers();
- _zkClient.createPersistent(TEST_ROOT);
- }
-
- @AfterClass
- public void afterClass() {
- _zkClient.deleteRecursively(TEST_ROOT);
- _zkClient.close();
- }
-
- private boolean waitAsyncOperation(ZkAsyncCallbacks.DefaultCallback callback, long timeout) {
- final boolean[] ret = { false };
- Thread waitThread = new Thread(() -> ret[0] = callback.waitForSuccess());
- waitThread.start();
- try {
- waitThread.join(timeout);
- waitThread.interrupt();
- return ret[0];
- } catch (InterruptedException e) {
- return false;
- }
- }
-
- @Test
- public void testAsyncRetryCategories() {
- MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
- try {
- ZNRecord tmpRecord = new ZNRecord("tmpRecord");
- tmpRecord.setSimpleField("foo", "bar");
- // Loop all possible error codes to test async create.
- // Only connectivity issues will be retried, the other issues will be return error immediately.
- for (KeeperException.Code code : KeeperException.Code.values()) {
- if (code == KeeperException.Code.OK) {
- continue;
- }
- ZkAsyncCallbacks.CreateCallbackHandler createCallback =
- new ZkAsyncCallbacks.CreateCallbackHandler();
- Assert.assertEquals(createCallback.getRc(), UNKNOWN_RET_CODE);
- testZkClient.setAsyncCallRC(code.intValue());
- if (code == CONNECTIONLOSS || code == KeeperException.Code.SESSIONEXPIRED
- || code == KeeperException.Code.SESSIONMOVED) {
- // Async create will be pending due to the mock error rc is retryable.
- testZkClient.asyncCreate(NODE_PATH, null, CreateMode.PERSISTENT, createCallback);
- Assert.assertFalse(createCallback.isOperationDone());
- Assert.assertEquals(createCallback.getRc(), code.intValue());
- // Change the mock response
- testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
- // Async retry will succeed now. Wait until the operation is successfully done and verify.
- Assert.assertTrue(waitAsyncOperation(createCallback, 1000));
- Assert.assertEquals(createCallback.getRc(), KeeperException.Code.OK.intValue());
- Assert.assertTrue(testZkClient.exists(NODE_PATH));
- Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
- } else {
- // Async create will fail due to the mock error rc is not recoverable.
- testZkClient.asyncCreate(NODE_PATH, null, CreateMode.PERSISTENT, createCallback);
- Assert.assertTrue(waitAsyncOperation(createCallback, 1000));
- Assert.assertEquals(createCallback.getRc(), code.intValue());
- Assert.assertEquals(testZkClient.getAndResetRetryCount(), 0);
- }
- testZkClient.delete(NODE_PATH);
- Assert.assertFalse(testZkClient.exists(NODE_PATH));
- }
- } finally {
- testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
- testZkClient.close();
- _zkClient.delete(NODE_PATH);
- }
- }
-
- @Test(dependsOnMethods = "testAsyncRetryCategories")
- public void testAsyncWriteRetry() {
- MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
- try {
- ZNRecord tmpRecord = new ZNRecord("tmpRecord");
- tmpRecord.setSimpleField("foo", "bar");
- testZkClient.createPersistent(NODE_PATH, tmpRecord);
-
- // 1. Test async set retry
- ZkAsyncCallbacks.SetDataCallbackHandler setCallback =
- new ZkAsyncCallbacks.SetDataCallbackHandler();
- Assert.assertEquals(setCallback.getRc(), UNKNOWN_RET_CODE);
-
- tmpRecord.setSimpleField("test", "data");
- testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
- // Async set will be pending due to the mock error rc is retryable.
- testZkClient.asyncSetData(NODE_PATH, tmpRecord, -1, setCallback);
- Assert.assertFalse(setCallback.isOperationDone());
- Assert.assertEquals(setCallback.getRc(), CONNECTIONLOSS.intValue());
- // Change the mock return code.
- testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
- // Async retry will succeed now. Wait until the operation is successfully done and verify.
- Assert.assertTrue(waitAsyncOperation(setCallback, 1000));
- Assert.assertEquals(setCallback.getRc(), KeeperException.Code.OK.intValue());
- Assert.assertEquals(((ZNRecord) testZkClient.readData(NODE_PATH)).getSimpleField("test"),
- "data");
- Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
-
- // 2. Test async delete
- ZkAsyncCallbacks.DeleteCallbackHandler deleteCallback =
- new ZkAsyncCallbacks.DeleteCallbackHandler();
- Assert.assertEquals(deleteCallback.getRc(), UNKNOWN_RET_CODE);
-
- testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
- // Async delete will be pending due to the mock error rc is retryable.
- testZkClient.asyncDelete(NODE_PATH, deleteCallback);
- Assert.assertFalse(deleteCallback.isOperationDone());
- Assert.assertEquals(deleteCallback.getRc(), CONNECTIONLOSS.intValue());
- // Change the mock return code.
- testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
- // Async retry will succeed now. Wait until the operation is successfully done and verify.
- Assert.assertTrue(waitAsyncOperation(deleteCallback, 1000));
- Assert.assertEquals(deleteCallback.getRc(), KeeperException.Code.OK.intValue());
- Assert.assertFalse(testZkClient.exists(NODE_PATH));
- Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
- } finally {
- testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
- testZkClient.close();
- _zkClient.delete(NODE_PATH);
- }
- }
-
- @Test(dependsOnMethods = "testAsyncWriteRetry")
- public void testAsyncReadRetry() {
- MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
- try {
- ZNRecord tmpRecord = new ZNRecord("tmpRecord");
- tmpRecord.setSimpleField("foo", "bar");
- testZkClient.createPersistent(NODE_PATH, tmpRecord);
-
- // 1. Test async exist check
- ZkAsyncCallbacks.ExistsCallbackHandler existsCallback =
- new ZkAsyncCallbacks.ExistsCallbackHandler();
- Assert.assertEquals(existsCallback.getRc(), UNKNOWN_RET_CODE);
-
- testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
- // Async exist check will be pending due to the mock error rc is retryable.
- testZkClient.asyncExists(NODE_PATH, existsCallback);
- Assert.assertFalse(existsCallback.isOperationDone());
- Assert.assertEquals(existsCallback.getRc(), CONNECTIONLOSS.intValue());
- // Change the mock return code.
- testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
- // Async retry will succeed now. Wait until the operation is successfully done and verify.
- Assert.assertTrue(waitAsyncOperation(existsCallback, 1000));
- Assert.assertEquals(existsCallback.getRc(), KeeperException.Code.OK.intValue());
- Assert.assertTrue(existsCallback._stat != null);
- Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
-
- // 2. Test async get
- ZkAsyncCallbacks.GetDataCallbackHandler getCallback =
- new ZkAsyncCallbacks.GetDataCallbackHandler();
- Assert.assertEquals(getCallback.getRc(), UNKNOWN_RET_CODE);
-
- testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
- // Async get will be pending due to the mock error rc is retryable.
- testZkClient.asyncGetData(NODE_PATH, getCallback);
- Assert.assertFalse(getCallback.isOperationDone());
- Assert.assertEquals(getCallback.getRc(), CONNECTIONLOSS.intValue());
- // Change the mock return code.
- testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
- // Async retry will succeed now. Wait until the operation is successfully done and verify.
- Assert.assertTrue(waitAsyncOperation(getCallback, 1000));
- Assert.assertEquals(getCallback.getRc(), KeeperException.Code.OK.intValue());
- ZNRecord record = testZkClient.deserialize(getCallback._data, NODE_PATH);
- Assert.assertEquals(record.getSimpleField("foo"), "bar");
- Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
- } finally {
- testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
- testZkClient.close();
- _zkClient.delete(NODE_PATH);
- }
- }
-
- @Test(dependsOnMethods = "testAsyncReadRetry")
- public void testAsyncRequestCleanup() {
- int cbCount = 10;
- MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
- try {
- ZNRecord tmpRecord = new ZNRecord("tmpRecord");
- tmpRecord.setSimpleField("foo", "bar");
- testZkClient.createPersistent(NODE_PATH, tmpRecord);
-
- // Create 10 async exists check requests
- ZkAsyncCallbacks.ExistsCallbackHandler[] existsCallbacks =
- new ZkAsyncCallbacks.ExistsCallbackHandler[cbCount];
- for (int i = 0; i < cbCount; i++) {
- existsCallbacks[i] = new ZkAsyncCallbacks.ExistsCallbackHandler();
- }
- testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
- // All async exist check calls will be pending due to the mock error rc is retryable.
- for (ZkAsyncCallbacks.ExistsCallbackHandler cb : existsCallbacks) {
- testZkClient.asyncExists(NODE_PATH, cb);
- Assert.assertEquals(cb.getRc(), CONNECTIONLOSS.intValue());
- }
- // Wait for a while, no callback finishes
- Assert.assertFalse(waitAsyncOperation(existsCallbacks[0], 1000));
- for (ZkAsyncCallbacks.ExistsCallbackHandler cb : existsCallbacks) {
- Assert.assertEquals(cb.getRc(), CONNECTIONLOSS.intValue());
- Assert.assertFalse(cb.isOperationDone());
- }
- testZkClient.close();
- // All callback retry will be cancelled because the zkclient is closed.
- for (ZkAsyncCallbacks.ExistsCallbackHandler cb : existsCallbacks) {
- Assert.assertTrue(waitAsyncOperation(cb, 1000));
- Assert.assertEquals(cb.getRc(), CONNECTIONLOSS.intValue());
- }
- Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
- } finally {
- testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
- testZkClient.close();
- _zkClient.delete(NODE_PATH);
- }
- }
-
- /**
- * Mock client to whitebox test async functionality.
- */
- class MockAsyncZkClient extends ZkClient {
- private static final long RETRY_INTERVAL_MS = 500;
- private long _retryCount = 0;
-
- /**
- * If the specified return code is OK, call the real function.
- * Otherwise, trigger the callback with the specified RC without triggering the real ZK call.
- */
- private int _asyncCallRetCode = KeeperException.Code.OK.intValue();
-
- public MockAsyncZkClient(String zkAddress) {
- super(zkAddress);
- setZkSerializer(new ZNRecordSerializer());
- }
-
- public void setAsyncCallRC(int rc) {
- _asyncCallRetCode = rc;
- }
-
- public long getAndResetRetryCount() {
- long tmpCount = _retryCount;
- _retryCount = 0;
- return tmpCount;
- }
-
- @Override
- public void asyncCreate(String path, Object datat, CreateMode mode,
- ZkAsyncCallbacks.CreateCallbackHandler cb) {
- if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
- super.asyncCreate(path, datat, mode, cb);
- return;
- } else {
- cb.processResult(_asyncCallRetCode, path,
- new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, false) {
- @Override
- protected void doRetry() {
- _retryCount++;
- try {
- Thread.sleep(RETRY_INTERVAL_MS);
- } catch (InterruptedException e) {
- throw new ZkInterruptedException(e);
- }
- asyncCreate(path, datat, mode, cb);
- }
- }, null);
- }
- }
-
- @Override
- public void asyncSetData(String path, Object datat, int version,
- ZkAsyncCallbacks.SetDataCallbackHandler cb) {
- if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
- super.asyncSetData(path, datat, version, cb);
- return;
- } else {
- cb.processResult(_asyncCallRetCode, path,
- new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, false) {
- @Override
- protected void doRetry() {
- _retryCount++;
- try {
- Thread.sleep(RETRY_INTERVAL_MS);
- } catch (InterruptedException e) {
- throw new ZkInterruptedException(e);
- }
- asyncSetData(path, datat, version, cb);
- }
- }, null);
- }
- }
-
- @Override
- public void asyncGetData(String path, ZkAsyncCallbacks.GetDataCallbackHandler cb) {
- if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
- super.asyncGetData(path, cb);
- return;
- } else {
- cb.processResult(_asyncCallRetCode, path,
- new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, true) {
- @Override
- protected void doRetry() {
- _retryCount++;
- try {
- Thread.sleep(RETRY_INTERVAL_MS);
- } catch (InterruptedException e) {
- throw new ZkInterruptedException(e);
- }
- asyncGetData(path, cb);
- }
- }, null, null);
- }
- }
-
- @Override
- public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler cb) {
- if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
- super.asyncExists(path, cb);
- return;
- } else {
- cb.processResult(_asyncCallRetCode, path,
- new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, true) {
- @Override
- protected void doRetry() {
- _retryCount++;
- try {
- Thread.sleep(RETRY_INTERVAL_MS);
- } catch (InterruptedException e) {
- throw new ZkInterruptedException(e);
- }
- asyncExists(path, cb);
- }
- }, null);
- }
- }
-
- @Override
- public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler cb) {
- if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
- super.asyncDelete(path, cb);
- return;
- } else {
- cb.processResult(_asyncCallRetCode, path,
- new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, false) {
- @Override
- protected void doRetry() {
- _retryCount++;
- try {
- Thread.sleep(RETRY_INTERVAL_MS);
- } catch (InterruptedException e) {
- throw new ZkInterruptedException(e);
- }
- asyncDelete(path, cb);
- }
- });
- }
- }
- }
-}
|