helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] 01/01: Async write operation should not throw Exception for serializing error.
Date Mon, 02 Mar 2020 19:57:35 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch async
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 9685c1705832d1fe99d8f6b53b4da465c07f8a0d
Author: Jiajun Wang <jjwang@linkedin.com>
AuthorDate: Tue Feb 25 14:56:01 2020 -0800

    Async write operation should not throw Exception for serializing error.
    
    This change makes the async write operations report serialization errors through the
    async callback instead of throwing exceptions. This fixes batch write/create failures
    caused by a single node failing to serialize.
---
 .../apache/helix/manager/zk/TestRawZkClient.java   | 52 ++++++++++++++++++
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 61 +++++++++++++---------
 2 files changed, 88 insertions(+), 25 deletions(-)

diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index a713996..5d60af0 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -22,6 +22,7 @@ package org.apache.helix.manager.zk;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -41,6 +42,8 @@ import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
 import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
@@ -745,4 +748,53 @@ public class TestRawZkClient extends ZkUnitTestBase {
     // Recover zk server for later tests.
     _zkServer.start();
   }
+
+  @Test
+  public void testAsyncWriteOperations() {
+    ZkClient zkClient = new ZkClient(ZK_ADDR);
+    // TODO disable the compress here
+    zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    ZNRecord oversizeZNRecord = new ZNRecord("Oversize");
+    StringBuilder sb = new StringBuilder(1204);
+    Random ran = new Random();
+    for (int i = 0; i < 1024; i++) {
+      sb.append(ran.nextInt(26) + 'a');
+    }
+    String buf = sb.toString();
+    for (int i = 0; i < 1024; i++) {
+      oversizeZNRecord.setSimpleField(Integer.toString(i), buf);
+    }
+
+    // ensure /tmp exists for the test
+    if (!zkClient.exists("/tmp")) {
+      zkClient.create("/tmp", null, CreateMode.PERSISTENT);
+    }
+
+    org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.CreateCallbackHandler
+        createCallback =
+        new org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.CreateCallbackHandler();
+    zkClient.asyncCreate( "/tmp/async", null, CreateMode.PERSISTENT, createCallback);
+    createCallback.waitForSuccess();
+    Assert.assertEquals(createCallback.getRc(), 0);
+
+    // try to create oversize node, should fail
+    zkClient.asyncCreate( "/tmp/asyncOversize", oversizeZNRecord, CreateMode.PERSISTENT, createCallback);
+    createCallback.waitForSuccess();
+    Assert.assertEquals(createCallback.getRc(), KeeperException.Code.MarshallingError);
+
+    ZNRecord normalZNRecord = new ZNRecord("normal");
+    normalZNRecord.setSimpleField("key", buf);
+
+    org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.SetDataCallbackHandler
+        setDataCallbackHandler =
+        new org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.SetDataCallbackHandler();
+    zkClient.asyncSetData( "/tmp/async", normalZNRecord, -1, setDataCallbackHandler);
+    createCallback.waitForSuccess();
+    Assert.assertEquals(setDataCallbackHandler.getRc(), 0);
+
+    zkClient.asyncSetData( "/tmp/async", oversizeZNRecord, -1, setDataCallbackHandler);
+    createCallback.waitForSuccess();
+    Assert.assertEquals(setDataCallbackHandler.getRc(), KeeperException.Code.MarshallingError);
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 0507c3f..3424f06 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -26,19 +26,20 @@ import javax.management.JMException;
 
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.exception.ZkClientException;
+import org.apache.helix.zookeeper.zkclient.annotation.PreFetch;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
 import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
-import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
-import org.apache.helix.zookeeper.zkclient.annotation.PreFetch;
+import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
 import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
-import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
-import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
-import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.helix.zookeeper.zkclient.util.ExponentialBackoffStrategy;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -1720,17 +1721,22 @@ public class ZkClient implements Watcher {
   public void asyncCreate(final String path, Object datat, final CreateMode mode,
       final ZkAsyncCallbacks.CreateCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    final byte[] data = (datat == null ? null : serialize(datat, path));
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        ((ZkConnection) getConnection()).getZookeeper()
-            .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                // Arrays.asList(DEFAULT_ACL),
-                mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
-                    data == null ? 0 : data.length, false));
-        return null;
-      }
+    byte[] data = null;
+    try {
+      data = (datat == null ? null : serialize(datat, path));
+    } catch (ZkMarshallingError e) {
+      cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
+      return;
+    }
+    final byte[] finalData = data;
+    retryUntilConnected(() -> {
+      ((ZkConnection) getConnection()).getZookeeper()
+          .create(path, finalData, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+              // Arrays.asList(DEFAULT_ACL),
+              mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+                  finalData == null ? 0 : finalData.length, false));
+      return null;
     });
   }
 
@@ -1738,15 +1744,20 @@ public class ZkClient implements Watcher {
   public void asyncSetData(final String path, Object datat, final int version,
       final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    final byte[] data = serialize(datat, path);
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb,
-            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
-                data == null ? 0 : data.length, false));
-        return null;
-      }
+    byte[] data = null;
+    try {
+      data = serialize(datat, path);
+    } catch (ZkMarshallingError e) {
+      cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
+      return;
+    }
+    final byte[] finalData = data;
+    retryUntilConnected(() -> {
+      ((ZkConnection) getConnection()).getZookeeper().setData(path, finalData, version, cb,
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+              finalData == null ? 0 : finalData.length, false));
+      return null;
     });
   }
 


Mime
View raw message