helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch master updated: Async write operation should not throw Exception for serializing error (#845)
Date Tue, 03 Mar 2020 01:26:15 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 496d573  Async write operation should not throw Exception for serializing error (#845)
496d573 is described below

commit 496d573811a8ffddac31b8a3081d504cfaf134d1
Author: Jiajun Wang <1803880+jiajunwang@users.noreply.github.com>
AuthorDate: Mon Mar 2 17:25:25 2020 -0800

    Async write operation should not throw Exception for serializing error (#845)
    
    This change will make the async write operations return error through the async callback
    instead of throwing exceptions. This change will fix the batch write/create failure due to
    one single node serializing failure.
    In addition, according to the serializer interface definition, change ZK related serializers
    to throw ZkMarshallingError instead of ZkClientException.
---
 .../apache/helix/manager/zk/TestRawZkClient.java   |  68 ++++++
 .../helix/manager/zk/TestZNRecordSizeLimit.java    | 233 ++++++++++-----------
 .../serializer/ZNRecordJacksonSerializer.java      |   7 +-
 .../datamodel/serializer/ZNRecordSerializer.java   |   8 +-
 .../serializer/ZNRecordStreamingSerializer.java    |   7 +-
 .../apache/helix/zookeeper/zkclient/ZkClient.java  |  61 +++---
 6 files changed, 229 insertions(+), 155 deletions(-)

diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index a713996..bf1a82c 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -22,6 +22,7 @@ package org.apache.helix.manager.zk;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -41,6 +42,9 @@ import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
 import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
+import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
@@ -745,4 +749,68 @@ public class TestRawZkClient extends ZkUnitTestBase {
     // Recover zk server for later tests.
     _zkServer.start();
   }
+
+  @Test
+  public void testAsyncWriteOperations() {
+    ZkClient zkClient = new ZkClient(ZK_ADDR);
+    String originSizeLimit =
+        System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+    System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+        "2000");
+    try {
+      zkClient.setZkSerializer(new ZNRecordSerializer());
+
+      ZNRecord oversizeZNRecord = new ZNRecord("Oversize");
+      StringBuilder sb = new StringBuilder(1204);
+      Random ran = new Random();
+      for (int i = 0; i < 1024; i++) {
+        sb.append(ran.nextInt(26) + 'a');
+      }
+      String buf = sb.toString();
+      for (int i = 0; i < 1024; i++) {
+        oversizeZNRecord.setSimpleField(Integer.toString(i), buf);
+      }
+
+      // ensure /tmp exists for the test
+      if (!zkClient.exists("/tmp")) {
+        zkClient.create("/tmp", null, CreateMode.PERSISTENT);
+      }
+
+      org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.CreateCallbackHandler
+          createCallback = new org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.CreateCallbackHandler();
+      zkClient.asyncCreate("/tmp/async", null, CreateMode.PERSISTENT, createCallback);
+      createCallback.waitForSuccess();
+      Assert.assertEquals(createCallback.getRc(), 0);
+      Assert.assertTrue(zkClient.exists("/tmp/async"));
+
+      // try to create oversize node, should fail
+      zkClient.asyncCreate("/tmp/asyncOversize", oversizeZNRecord, CreateMode.PERSISTENT,
+          createCallback);
+      createCallback.waitForSuccess();
+      Assert.assertEquals(createCallback.getRc(), KeeperException.Code.MarshallingError);
+      Assert.assertFalse(zkClient.exists("/tmp/asyncOversize"));
+
+      ZNRecord normalZNRecord = new ZNRecord("normal");
+      normalZNRecord.setSimpleField("key", buf);
+
+      org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.SetDataCallbackHandler
+          setDataCallbackHandler = new org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.SetDataCallbackHandler();
+      zkClient.asyncSetData("/tmp/async", normalZNRecord, -1, setDataCallbackHandler);
+      setDataCallbackHandler.waitForSuccess();
+      Assert.assertEquals(setDataCallbackHandler.getRc(), 0);
+
+      zkClient.asyncSetData("/tmp/async", oversizeZNRecord, -1, setDataCallbackHandler);
+      setDataCallbackHandler.waitForSuccess();
+      Assert.assertEquals(setDataCallbackHandler.getRc(), KeeperException.Code.MarshallingError);
+      Assert.assertEquals(zkClient.readData("/tmp/async"), normalZNRecord);
+    } finally {
+      if (originSizeLimit == null) {
+        System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+      } else {
+        System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+            originSizeLimit);
+      }
+      zkClient.delete("/tmp/async");
+      zkClient.delete("/tmp/asyncOversize");
+    }
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
index 30d6349..8c8d649 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
@@ -35,6 +35,7 @@ import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -306,139 +307,136 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
     final String thresholdProperty =
         System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
 
-    ZNRecordSerializer serializer = new ZNRecordSerializer();
+    try {
+      ZNRecordSerializer serializer = new ZNRecordSerializer();
 
-    String root = getShortClassName();
+      String root = getShortClassName();
 
-    byte[] buf = new byte[1024];
-    for (int i = 0; i < 1024; i++) {
-      buf[i] = 'a';
-    }
-    String bufStr = new String(buf);
+      byte[] buf = new byte[1024];
+      for (int i = 0; i < 1024; i++) {
+        buf[i] = 'a';
+      }
+      String bufStr = new String(buf);
 
-    // 1. legal-sized data gets written to zk
-    // write a znode of size less than writeSizeLimit
-    int rawZnRecordSize = 700;
-    int writeSizeLimitKb = 800;
-    int writeSizeLimit = writeSizeLimitKb * 1024;
-    System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
-        String.valueOf(writeSizeLimit));
-
-    final ZNRecord normalSizeRecord = new ZNRecord("normal-size");
-    for (int i = 0; i < rawZnRecordSize; i++) {
-      normalSizeRecord.setSimpleField(Integer.toString(i), bufStr);
-    }
+      // 1. legal-sized data gets written to zk
+      // write a znode of size less than writeSizeLimit
+      int rawZnRecordSize = 700;
+      int writeSizeLimitKb = 800;
+      int writeSizeLimit = writeSizeLimitKb * 1024;
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+          String.valueOf(writeSizeLimit));
 
-    String path = "/" + root + "/normal";
-    _gZkClient.createPersistent(path, true);
-    _gZkClient.writeData(path, normalSizeRecord);
+      final ZNRecord normalSizeRecord = new ZNRecord("normal-size");
+      for (int i = 0; i < rawZnRecordSize; i++) {
+        normalSizeRecord.setSimpleField(Integer.toString(i), bufStr);
+      }
 
-    ZNRecord record = _gZkClient.readData(path);
+      String path = "/" + root + "/normal";
+      _gZkClient.createPersistent(path, true);
+      _gZkClient.writeData(path, normalSizeRecord);
 
-    // Successfully reads the same data.
-    Assert.assertEquals(normalSizeRecord, record);
+      ZNRecord record = _gZkClient.readData(path);
 
-    int length = serializer.serialize(record).length;
+      // Successfully reads the same data.
+      Assert.assertEquals(normalSizeRecord, record);
 
-    // Less than writeSizeLimit so it is written to ZK.
-    Assert.assertTrue(length < writeSizeLimit);
+      int length = serializer.serialize(record).length;
 
-    // 2. Large size data is not allowed to write to ZK
-    // Set raw record size to be large enough so its serialized data exceeds the writeSizeLimit.
-    rawZnRecordSize = 2000;
-    // Set the writeSizeLimit to very small so serialized data size exceeds the writeSizeLimit.
-    writeSizeLimitKb = 1;
-    writeSizeLimit = writeSizeLimitKb * 1024;
-    System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
-        String.valueOf(writeSizeLimit));
+      // Less than writeSizeLimit so it is written to ZK.
+      Assert.assertTrue(length < writeSizeLimit);
 
-    final ZNRecord largeRecord = new ZNRecord("large-size");
-    for (int i = 0; i < rawZnRecordSize; i++) {
-      largeRecord.setSimpleField(Integer.toString(i), bufStr);
-    }
+      // 2. Large size data is not allowed to write to ZK
+      // Set raw record size to be large enough so its serialized data exceeds the writeSizeLimit.
+      rawZnRecordSize = 2000;
+      // Set the writeSizeLimit to very small so serialized data size exceeds the writeSizeLimit.
+      writeSizeLimitKb = 1;
+      writeSizeLimit = writeSizeLimitKb * 1024;
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+          String.valueOf(writeSizeLimit));
 
-    path = "/" + root + "/large";
-    _gZkClient.createPersistent(path, true);
+      final ZNRecord largeRecord = new ZNRecord("large-size");
+      for (int i = 0; i < rawZnRecordSize; i++) {
+        largeRecord.setSimpleField(Integer.toString(i), bufStr);
+      }
 
-    try {
-      _gZkClient.writeData(path, largeRecord);
-      Assert.fail("Data should not be written to ZK because data size exceeds writeSizeLimit!");
-    } catch (ZkClientException expected) {
-      Assert.assertTrue(
-          expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes"));
-    }
+      path = "/" + root + "/large";
+      _gZkClient.createPersistent(path, true);
 
-    // test ZkDataAccessor
-    ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
-    admin.addCluster(root, true);
-    InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
-    admin.addInstance(root, instanceConfig);
+      try {
+        _gZkClient.writeData(path, largeRecord);
+        Assert.fail("Data should not be written to ZK because data size exceeds writeSizeLimit!");
+      } catch (ZkMarshallingError expected) {
+        Assert.assertTrue(
+            expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes"));
+      }
 
-    // Set the writeSizeLimit to 10KB so serialized data size does not exceed writeSizeLimit.
-    writeSizeLimitKb = 10;
-    writeSizeLimit = writeSizeLimitKb * 1024;
-    System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
-        String.valueOf(writeSizeLimit));
+      // test ZkDataAccessor
+      ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
+      admin.addCluster(root, true);
+      InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
+      admin.addInstance(root, instanceConfig);
 
-    // oversized data should not create any new data on zk
-    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(root, new ZkBaseDataAccessor<>(ZK_ADDR));
-    Builder keyBuilder = accessor.keyBuilder();
+      // Set the writeSizeLimit to 10KB so serialized data size does not exceed writeSizeLimit.
+      writeSizeLimitKb = 10;
+      writeSizeLimit = writeSizeLimitKb * 1024;
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+          String.valueOf(writeSizeLimit));
 
-    IdealState idealState = new IdealState("currentState");
-    idealState.setStateModelDefRef("MasterSlave");
-    idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
-    idealState.setNumPartitions(10);
+      // oversized data should not create any new data on zk
+      ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(root, new ZkBaseDataAccessor<>(ZK_ADDR));
+      Builder keyBuilder = accessor.keyBuilder();
 
-    for (int i = 0; i < 1024; i++) {
-      idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
-    }
-    boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
-    Assert.assertTrue(succeed);
-    HelixProperty property = accessor.getProperty(
-        keyBuilder.stateTransitionStatus("localhost_12918", "session_1", "partition_1"));
-    Assert.assertNull(property);
+      IdealState idealState = new IdealState("currentState");
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(10);
 
-    // legal sized data gets written to zk
-    idealState.getRecord().getSimpleFields().clear();
-    idealState.setStateModelDefRef("MasterSlave");
-    idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
-    idealState.setNumPartitions(10);
+      for (int i = 0; i < 1024; i++) {
+        idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+      }
+      boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+      Assert.assertTrue(succeed);
+      HelixProperty property = accessor.getProperty(keyBuilder.stateTransitionStatus("localhost_12918",
+          "session_1", "partition_1"));
+      Assert.assertNull(property);
 
-    for (int i = 0; i < 900; i++) {
-      idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
-    }
-    succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
-    Assert.assertTrue(succeed);
-    record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
-    Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit);
+      // legal sized data gets written to zk
+      idealState.getRecord().getSimpleFields().clear();
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(10);
 
-    // Set small write size limit so writing does not succeed.
-    writeSizeLimitKb = 1;
-    writeSizeLimit = writeSizeLimitKb * 1024;
-    System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
-        String.valueOf(writeSizeLimit));
+      for (int i = 0; i < 900; i++) {
+        idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+      }
+      succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
+      Assert.assertTrue(succeed);
+      record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
+      Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit);
 
-    // oversized data should not update existing data on zk
-    idealState.setStateModelDefRef("MasterSlave");
-    idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
-    idealState.setNumPartitions(10);
-    for (int i = 900; i < 1024; i++) {
-      idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
-    }
+      // Set small write size limit so writing does not succeed.
+      writeSizeLimitKb = 1;
+      writeSizeLimit = writeSizeLimitKb * 1024;
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+          String.valueOf(writeSizeLimit));
 
-    succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
-    Assert.assertFalse(succeed,
-        "Update property should not succeed because data exceeds znode write limit!");
+      // oversized data should not update existing data on zk
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(10);
+      for (int i = 900; i < 1024; i++) {
+        idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+      }
 
-    // Delete the nodes.
-    deletePath(_gZkClient, "/" + root);
+      succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
+      Assert.assertFalse(succeed,
+          "Update property should not succeed because data exceeds znode write limit!");
 
-    // Reset: add the properties back to system properties if they were originally available.
-    if (thresholdProperty != null) {
-      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
-          thresholdProperty);
-    } else {
-      System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+      // Delete the nodes.
+      deletePath(_gZkClient, "/" + root);
+    } finally {
+      // Reset: add the properties back to system properties if they were originally available.
+      if (thresholdProperty != null) {
+        System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+            thresholdProperty);
+      } else {
+        System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+      }
     }
   }
 
@@ -516,7 +514,7 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
       try {
         zkClient.writeData(path, largeRecord);
         Assert.fail("Data should not written to ZK because data size exceeds writeSizeLimit!");
-      } catch (ZkClientException expected) {
+      } catch (ZkMarshallingError expected) {
         Assert.assertTrue(
             expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes"));
       }
@@ -588,14 +586,13 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
       deletePath(zkClient, "/" + root);
     } finally {
       zkClient.close();
-    }
-
-    // Reset: add the properties back to system properties if they were originally available.
-    if (thresholdProperty != null) {
-      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
-          thresholdProperty);
-    } else {
-      System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+      // Reset: add the properties back to system properties if they were originally available.
+      if (thresholdProperty != null) {
+        System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+            thresholdProperty);
+      } else {
+        System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+      }
     }
   }
 
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordJacksonSerializer.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordJacksonSerializer.java
index a30829a..73b0d88 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordJacksonSerializer.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordJacksonSerializer.java
@@ -22,7 +22,6 @@ package org.apache.helix.zookeeper.datamodel.serializer;
 import java.io.IOException;
 
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -39,14 +38,14 @@ public class ZNRecordJacksonSerializer implements ZkSerializer {
   public byte[] serialize(Object record) throws ZkMarshallingError {
     if (!(record instanceof ZNRecord)) {
       // null is NOT an instance of any class
-      throw new ZkClientException("Input object is not of type ZNRecord (was " + record + ")");
+      throw new ZkMarshallingError("Input object is not of type ZNRecord (was " + record + ")");
     }
     ZNRecord znRecord = (ZNRecord) record;
 
     try {
       return OBJECT_MAPPER.writeValueAsBytes(znRecord);
     } catch (IOException e) {
-      throw new ZkClientException(
+      throw new ZkMarshallingError(
           String.format("Exception during serialization. ZNRecord id: %s", znRecord.getId()), e);
     }
   }
@@ -62,7 +61,7 @@ public class ZNRecordJacksonSerializer implements ZkSerializer {
     try {
       record = OBJECT_MAPPER.readValue(bytes, ZNRecord.class);
     } catch (IOException e) {
-      throw new ZkClientException("Exception during deserialization!", e);
+      throw new ZkMarshallingError("Exception during deserialization!", e);
     }
     return record;
   }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
index bf2c4bf..7be3ace 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
@@ -25,9 +25,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.util.GZipCompressionUtil;
 import org.apache.helix.zookeeper.util.ZNRecordUtil;
+import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -58,7 +58,7 @@ public class ZNRecordSerializer implements ZkSerializer {
       // null is NOT an instance of any class
       LOG.error("Input object must be of type ZNRecord but it is " + data
           + ". Will not write to zk");
-      throw new ZkClientException("Input object is not of type ZNRecord (was " + data + ")");
+      throw new ZkMarshallingError("Input object is not of type ZNRecord (was " + data + ")");
     }
 
     ZNRecord record = (ZNRecord) data;
@@ -97,7 +97,7 @@ public class ZNRecordSerializer implements ZkSerializer {
       LOG.error(
           "Exception during data serialization. ZNRecord ID: {} will not be written to zk.",
           record.getId(), e);
-      throw new ZkClientException(e);
+      throw new ZkMarshallingError(e);
     }
 
     int writeSizeLimit = ZNRecordUtil.getSerializerWriteSizeLimit();
@@ -105,7 +105,7 @@ public class ZNRecordSerializer implements ZkSerializer {
       LOG.error("Data size: {} is greater than {} bytes, is compressed: {}, ZNRecord.id: {}."
               + " Data will not be written to Zookeeper.", serializedBytes.length, writeSizeLimit,
           isCompressed, record.getId());
-      throw new ZkClientException(
+      throw new ZkMarshallingError(
           "Data size: " + serializedBytes.length + " is greater than " + writeSizeLimit
               + " bytes, is compressed: " + isCompressed + ", ZNRecord.id: " + record.getId());
     }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordStreamingSerializer.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordStreamingSerializer.java
index 5c8579c..604ca88 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordStreamingSerializer.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordStreamingSerializer.java
@@ -29,7 +29,6 @@ import java.util.TreeMap;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.util.GZipCompressionUtil;
 import org.apache.helix.zookeeper.util.ZNRecordUtil;
 import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
@@ -64,7 +63,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
       // null is NOT an instance of any class
       LOG.error("Input object must be of type ZNRecord but it is " + data
           + ". Will not write to zk");
-      throw new ZkClientException("Input object is not of type ZNRecord (was " + data + ")");
+      throw new ZkMarshallingError("Input object is not of type ZNRecord (was " + data + ")");
     }
 
     // apply retention policy on list field
@@ -165,7 +164,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
       LOG.error(
           "Exception during data serialization. ZNRecord ID: {} will not be written to zk.",
           record.getId(), e);
-      throw new ZkClientException(e);
+      throw new ZkMarshallingError(e);
     }
     // check size
     int writeSizeLimit = ZNRecordUtil.getSerializerWriteSizeLimit();
@@ -173,7 +172,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
       LOG.error("Data size: {} is greater than {} bytes, is compressed: {}, ZNRecord.id: {}."
               + " Data will not be written to Zookeeper.", serializedBytes.length, writeSizeLimit,
           isCompressed, record.getId());
-      throw new ZkClientException(
+      throw new ZkMarshallingError(
           "Data size: " + serializedBytes.length + " is greater than " + writeSizeLimit
               + " bytes, is compressed: " + isCompressed + ", ZNRecord.id: " + record.getId());
     }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 0507c3f..3424f06 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -26,19 +26,20 @@ import javax.management.JMException;
 
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.exception.ZkClientException;
+import org.apache.helix.zookeeper.zkclient.annotation.PreFetch;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
 import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
-import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
-import org.apache.helix.zookeeper.zkclient.annotation.PreFetch;
+import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
 import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
-import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
-import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
-import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.helix.zookeeper.zkclient.util.ExponentialBackoffStrategy;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -1720,17 +1721,22 @@ public class ZkClient implements Watcher {
   public void asyncCreate(final String path, Object datat, final CreateMode mode,
       final ZkAsyncCallbacks.CreateCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    final byte[] data = (datat == null ? null : serialize(datat, path));
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        ((ZkConnection) getConnection()).getZookeeper()
-            .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                // Arrays.asList(DEFAULT_ACL),
-                mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
-                    data == null ? 0 : data.length, false));
-        return null;
-      }
+    byte[] data = null;
+    try {
+      data = (datat == null ? null : serialize(datat, path));
+    } catch (ZkMarshallingError e) {
+      cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
+      return;
+    }
+    final byte[] finalData = data;
+    retryUntilConnected(() -> {
+      ((ZkConnection) getConnection()).getZookeeper()
+          .create(path, finalData, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+              // Arrays.asList(DEFAULT_ACL),
+              mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+                  finalData == null ? 0 : finalData.length, false));
+      return null;
     });
   }
 
@@ -1738,15 +1744,20 @@ public class ZkClient implements Watcher {
   public void asyncSetData(final String path, Object datat, final int version,
       final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    final byte[] data = serialize(datat, path);
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb,
-            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
-                data == null ? 0 : data.length, false));
-        return null;
-      }
+    byte[] data = null;
+    try {
+      data = serialize(datat, path);
+    } catch (ZkMarshallingError e) {
+      cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
+      return;
+    }
+    final byte[] finalData = data;
+    retryUntilConnected(() -> {
+      ((ZkConnection) getConnection()).getZookeeper().setData(path, finalData, version, cb,
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+              finalData == null ? 0 : finalData.length, false));
+      return null;
     });
   }
 


Mime
View raw message