helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] 01/01: Async write operation should not throw Exception for serializing error (#845) (#999)
Date Thu, 07 May 2020 17:14:53 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch helix-0.9.x
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 35cfaebe69bb5ffaed3365c191dce4a83245a747
Author: Jiajun Wang <jjwang@linkedin.com>
AuthorDate: Thu May 7 10:08:10 2020 -0700

    Async write operation should not throw Exception for serializing error (#845) (#999)
    
    This change makes the async write operations return errors through the async callback
    instead of throwing exceptions. It fixes batch write/create failures caused by a
    serialization failure on a single node.
    In addition, per the serializer interface definition, change ZK-related serializers
    to throw ZkMarshallingError instead of ZkClientException.
---
 .../helix/manager/zk/ZNRecordSerializer.java       |   8 +-
 .../manager/zk/ZNRecordStreamingSerializer.java    |   7 +-
 .../helix/manager/zk/zookeeper/ZkClient.java       |  51 +++---
 .../apache/helix/manager/zk/TestRawZkClient.java   |  68 ++++++++
 .../zk/TestZNRecordSerializeWriteSizeLimit.java    |   3 +-
 .../helix/manager/zk/TestZNRecordSizeLimit.java    | 182 +++++++++++----------
 6 files changed, 200 insertions(+), 119 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
index 416388b..1eb16b7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
@@ -24,8 +24,8 @@ import java.io.ByteArrayOutputStream;
 import java.util.List;
 import java.util.Map;
 
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.util.GZipCompressionUtil;
 import org.apache.helix.util.ZNRecordUtil;
@@ -57,7 +57,7 @@ public class ZNRecordSerializer implements ZkSerializer {
       // null is NOT an instance of any class
       LOG.error("Input object must be of type ZNRecord but it is " + data
           + ". Will not write to zk");
-      throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
+      throw new ZkMarshallingError("Input object is not of type ZNRecord (was " + data + ")");
     }
 
     ZNRecord record = (ZNRecord) data;
@@ -96,7 +96,7 @@ public class ZNRecordSerializer implements ZkSerializer {
       LOG.error(
           "Exception during data serialization. ZNRecord ID: {} will not be written to zk.",
           record.getId(), e);
-      throw new HelixException(e);
+      throw new ZkMarshallingError(e);
     }
 
     int writeSizeLimit = ZNRecordUtil.getSerializerWriteSizeLimit();
@@ -104,7 +104,7 @@ public class ZNRecordSerializer implements ZkSerializer {
       LOG.error("Data size: {} is greater than {} bytes, is compressed: {}, ZNRecord.id: {}."
               + " Data will not be written to Zookeeper.", serializedBytes.length, writeSizeLimit,
           isCompressed, record.getId());
-      throw new HelixException(
+      throw new ZkMarshallingError(
           "Data size: " + serializedBytes.length + " is greater than " + writeSizeLimit
               + " bytes, is compressed: " + isCompressed + ", ZNRecord.id: " + record.getId());
     }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
index 0c0af09..efe9e64 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
@@ -29,7 +29,6 @@ import java.util.TreeMap;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.commons.codec.binary.Base64;
-import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.util.GZipCompressionUtil;
 import org.apache.helix.util.ZNRecordUtil;
@@ -64,7 +63,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
       // null is NOT an instance of any class
       LOG.error("Input object must be of type ZNRecord but it is " + data
           + ". Will not write to zk");
-      throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
+      throw new ZkMarshallingError("Input object is not of type ZNRecord (was " + data + ")");
     }
 
     // apply retention policy on list field
@@ -165,7 +164,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
       LOG.error(
           "Exception during data serialization. ZNRecord ID: {} will not be written to zk.",
           record.getId(), e);
-      throw new HelixException(e);
+      throw new ZkMarshallingError(e);
     }
     // check size
     int writeSizeLimit = ZNRecordUtil.getSerializerWriteSizeLimit();
@@ -173,7 +172,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
       LOG.error("Data size: {} is greater than {} bytes, is compressed: {}, ZNRecord.id: {}."
               + " Data will not be written to Zookeeper.", serializedBytes.length, writeSizeLimit,
           isCompressed, record.getId());
-      throw new HelixException(
+      throw new ZkMarshallingError(
           "Data size: " + serializedBytes.length + " is greater than " + writeSizeLimit
               + " bytes, is compressed: " + isCompressed + ", ZNRecord.id: " + record.getId());
     }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 66f26d6..1b4d1ca 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -34,6 +34,7 @@ import org.I0Itec.zkclient.ZkLock;
 import org.I0Itec.zkclient.exception.ZkBadVersionException;
 import org.I0Itec.zkclient.exception.ZkException;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.I0Itec.zkclient.exception.ZkTimeoutException;
@@ -1400,17 +1401,22 @@ public class ZkClient implements Watcher {
   public void asyncCreate(final String path, Object datat, final CreateMode mode,
       final ZkAsyncCallbacks.CreateCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    final byte[] data = (datat == null ? null : serialize(datat, path));
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        ((ZkConnection) getConnection()).getZookeeper().create(path, data,
-            ZooDefs.Ids.OPEN_ACL_UNSAFE,
-            // Arrays.asList(DEFAULT_ACL),
-            mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
-                data == null ? 0 : data.length, false));
-        return null;
-      }
+    byte[] data = null;
+    try {
+      data = (datat == null ? null : serialize(datat, path));
+    } catch (ZkMarshallingError e) {
+      cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
+      return;
+    }
+    final byte[] finalData = data;
+    retryUntilConnected(() -> {
+      ((ZkConnection) getConnection()).getZookeeper()
+          .create(path, finalData, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+              // Arrays.asList(DEFAULT_ACL),
+              mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+                  finalData == null ? 0 : finalData.length, false));
+      return null;
     });
   }
 
@@ -1418,15 +1424,20 @@ public class ZkClient implements Watcher {
   public void asyncSetData(final String path, Object datat, final int version,
       final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    final byte[] data = serialize(datat, path);
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb,
-            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
-                data == null ? 0 : data.length, false));
-        return null;
-      }
+    byte[] data = null;
+    try {
+      data = serialize(datat, path);
+    } catch (ZkMarshallingError e) {
+      cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
+      return;
+    }
+    final byte[] finalData = data;
+    retryUntilConnected(() -> {
+      ((ZkConnection) getConnection()).getZookeeper().setData(path, finalData, version, cb,
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+              finalData == null ? 0 : finalData.length, false));
+      return null;
     });
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index 95a55c4..ce109b7 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -22,6 +22,7 @@ package org.apache.helix.manager.zk;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -32,7 +33,9 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.manager.zk.zookeeper.ZkConnection;
 import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
@@ -40,6 +43,7 @@ import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
 import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -344,4 +348,68 @@ public class TestRawZkClient extends ZkUnitTestBase {
       zkServer.shutdown();
     }
   }
+
+  @Test
+  public void testAsyncWriteOperations() {
+    ZkClient zkClient = new ZkClient(ZK_ADDR);
+    String originSizeLimit =
+        System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+    System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, "2000");
+    try {
+      zkClient.setZkSerializer(new ZNRecordSerializer());
+
+      ZNRecord oversizeZNRecord = new ZNRecord("Oversize");
+      StringBuilder sb = new StringBuilder(1204);
+      Random ran = new Random();
+      for (int i = 0; i < 1024; i++) {
+        sb.append(ran.nextInt(26) + 'a');
+      }
+      String buf = sb.toString();
+      for (int i = 0; i < 1024; i++) {
+        oversizeZNRecord.setSimpleField(Integer.toString(i), buf);
+      }
+
+      // ensure /tmp exists for the test
+      if (!zkClient.exists("/tmp")) {
+        zkClient.create("/tmp", null, CreateMode.PERSISTENT);
+      }
+
+      ZkAsyncCallbacks.CreateCallbackHandler
+          createCallback = new ZkAsyncCallbacks.CreateCallbackHandler();
+      zkClient.asyncCreate("/tmp/async", null, CreateMode.PERSISTENT, createCallback);
+      createCallback.waitForSuccess();
+      Assert.assertEquals(createCallback.getRc(), 0);
+      Assert.assertTrue(zkClient.exists("/tmp/async"));
+
+      // try to create oversize node, should fail
+      zkClient.asyncCreate("/tmp/asyncOversize", oversizeZNRecord, CreateMode.PERSISTENT,
+          createCallback);
+      createCallback.waitForSuccess();
+      Assert.assertEquals(createCallback.getRc(), KeeperException.Code.MarshallingError);
+      Assert.assertFalse(zkClient.exists("/tmp/asyncOversize"));
+
+      ZNRecord normalZNRecord = new ZNRecord("normal");
+      normalZNRecord.setSimpleField("key", buf);
+
+      ZkAsyncCallbacks.SetDataCallbackHandler
+          setDataCallbackHandler = new ZkAsyncCallbacks.SetDataCallbackHandler();
+      zkClient.asyncSetData("/tmp/async", normalZNRecord, -1, setDataCallbackHandler);
+      setDataCallbackHandler.waitForSuccess();
+      Assert.assertEquals(setDataCallbackHandler.getRc(), 0);
+
+      zkClient.asyncSetData("/tmp/async", oversizeZNRecord, -1, setDataCallbackHandler);
+      setDataCallbackHandler.waitForSuccess();
+      Assert.assertEquals(setDataCallbackHandler.getRc(), KeeperException.Code.MarshallingError);
+      Assert.assertEquals(zkClient.readData("/tmp/async"), normalZNRecord);
+    } finally {
+      if (originSizeLimit == null) {
+        System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+      } else {
+        System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+            originSizeLimit);
+      }
+      zkClient.delete("/tmp/async");
+      zkClient.delete("/tmp/asyncOversize");
+    }
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
index 7dca363..671aba2 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
@@ -22,6 +22,7 @@ package org.apache.helix.manager.zk;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.HelixException;
 import org.apache.helix.SystemPropertyKeys;
@@ -146,7 +147,7 @@ public class TestZNRecordSerializeWriteSizeLimit {
 
       Assert.assertEquals(bytes.length >= limit, exceptionExpected);
       Assert.assertFalse(exceptionExpected);
-    } catch (HelixException ex) {
+    } catch (ZkMarshallingError ex) {
       Assert.assertTrue(exceptionExpected, "Should not throw ZkClientException.");
       Assert.assertTrue(ex.getMessage().contains(" is greater than " + limit + " bytes"));
       // No need to verify following asserts as bytes data is not returned.
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
index bc440b1..abb13b2 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
@@ -21,6 +21,8 @@ package org.apache.helix.manager.zk;
 
 import java.util.Arrays;
 import java.util.Date;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey.Builder;
@@ -304,15 +306,16 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
     final String thresholdProperty =
         System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
 
-    ZNRecordSerializer serializer = new ZNRecordSerializer();
+    try {
+      ZNRecordSerializer serializer = new ZNRecordSerializer();
 
-    String root = getShortClassName();
+      String root = getShortClassName();
 
-    byte[] buf = new byte[1024];
-    for (int i = 0; i < 1024; i++) {
-      buf[i] = 'a';
-    }
-    String bufStr = new String(buf);
+      byte[] buf = new byte[1024];
+      for (int i = 0; i < 1024; i++) {
+        buf[i] = 'a';
+      }
+      String bufStr = new String(buf);
 
     // 1. legal-sized data gets written to zk
     // write a znode of size less than writeSizeLimit
@@ -322,24 +325,24 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
     System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
         String.valueOf(writeSizeLimit));
 
-    final ZNRecord normalSizeRecord = new ZNRecord("normal-size");
-    for (int i = 0; i < rawZnRecordSize; i++) {
-      normalSizeRecord.setSimpleField(Integer.toString(i), bufStr);
-    }
+      final ZNRecord normalSizeRecord = new ZNRecord("normal-size");
+      for (int i = 0; i < rawZnRecordSize; i++) {
+        normalSizeRecord.setSimpleField(Integer.toString(i), bufStr);
+      }
 
-    String path = "/" + root + "/normal";
-    _gZkClient.createPersistent(path, true);
-    _gZkClient.writeData(path, normalSizeRecord);
+      String path = "/" + root + "/normal";
+      _gZkClient.createPersistent(path, true);
+      _gZkClient.writeData(path, normalSizeRecord);
 
-    ZNRecord record = _gZkClient.readData(path);
+      ZNRecord record = _gZkClient.readData(path);
 
-    // Successfully reads the same data.
-    Assert.assertEquals(normalSizeRecord, record);
+      // Successfully reads the same data.
+      Assert.assertEquals(normalSizeRecord, record);
 
-    int length = serializer.serialize(record).length;
+      int length = serializer.serialize(record).length;
 
-    // Less than writeSizeLimit so it is written to ZK.
-    Assert.assertTrue(length < writeSizeLimit);
+      // Less than writeSizeLimit so it is written to ZK.
+      Assert.assertTrue(length < writeSizeLimit);
 
     // 2. Large size data is not allowed to write to ZK
     // Set raw record size to be large enough so its serialized data exceeds the writeSizeLimit.
@@ -350,27 +353,27 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
     System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
         String.valueOf(writeSizeLimit));
 
-    final ZNRecord largeRecord = new ZNRecord("large-size");
-    for (int i = 0; i < rawZnRecordSize; i++) {
-      largeRecord.setSimpleField(Integer.toString(i), bufStr);
-    }
+      final ZNRecord largeRecord = new ZNRecord("large-size");
+      for (int i = 0; i < rawZnRecordSize; i++) {
+        largeRecord.setSimpleField(Integer.toString(i), bufStr);
+      }
 
-    path = "/" + root + "/large";
-    _gZkClient.createPersistent(path, true);
+      path = "/" + root + "/large";
+      _gZkClient.createPersistent(path, true);
 
-    try {
-      _gZkClient.writeData(path, largeRecord);
-      Assert.fail("Data should not be written to ZK because data size exceeds writeSizeLimit!");
-    } catch (HelixException expected) {
-      Assert.assertTrue(
-          expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes"));
-    }
+      try {
+        _gZkClient.writeData(path, largeRecord);
+        Assert.fail("Data should not be written to ZK because data size exceeds writeSizeLimit!");
+      } catch (ZkMarshallingError expected) {
+        Assert.assertTrue(
+            expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes"));
+      }
 
-    // test ZkDataAccessor
-    ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
-    admin.addCluster(root, true);
-    InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
-    admin.addInstance(root, instanceConfig);
+      // test ZkDataAccessor
+      ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
+      admin.addCluster(root, true);
+      InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
+      admin.addInstance(root, instanceConfig);
 
     // Set the writeSizeLimit to 10KB so serialized data size does not exceed writeSizeLimit.
     writeSizeLimitKb = 10;
@@ -383,33 +386,32 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
         new ZKHelixDataAccessor(root, new ZkBaseDataAccessor<>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
-    IdealState idealState = new IdealState("currentState");
-    idealState.setStateModelDefRef("MasterSlave");
-    idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
-    idealState.setNumPartitions(10);
+      IdealState idealState = new IdealState("currentState");
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(10);
 
-    for (int i = 0; i < 1024; i++) {
-      idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
-    }
-    boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
-    Assert.assertTrue(succeed);
-    HelixProperty property = accessor.getProperty(
-        keyBuilder.stateTransitionStatus("localhost_12918", "session_1", "partition_1"));
-    Assert.assertNull(property);
+      for (int i = 0; i < 1024; i++) {
+        idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+      }
+      boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+      Assert.assertTrue(succeed);
+      HelixProperty property = accessor.getProperty(keyBuilder.stateTransitionStatus("localhost_12918", "session_1", "partition_1"));
+      Assert.assertNull(property);
 
-    // legal sized data gets written to zk
-    idealState.getRecord().getSimpleFields().clear();
-    idealState.setStateModelDefRef("MasterSlave");
-    idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
-    idealState.setNumPartitions(10);
+      // legal sized data gets written to zk
+      idealState.getRecord().getSimpleFields().clear();
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(10);
 
-    for (int i = 0; i < 900; i++) {
-      idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
-    }
-    succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
-    Assert.assertTrue(succeed);
-    record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
-    Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit);
+      for (int i = 0; i < 900; i++) {
+        idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+      }
+      succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
+      Assert.assertTrue(succeed);
+      record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
+      Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit);
 
     // Set small write size limit so writing does not succeed.
     writeSizeLimitKb = 1;
@@ -417,27 +419,28 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
     System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
         String.valueOf(writeSizeLimit));
 
-    // oversized data should not update existing data on zk
-    idealState.setStateModelDefRef("MasterSlave");
-    idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
-    idealState.setNumPartitions(10);
-    for (int i = 900; i < 1024; i++) {
-      idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
-    }
-
-    succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
-    Assert.assertFalse(succeed,
-        "Update property should not succeed because data exceeds znode write limit!");
+      // oversized data should not update existing data on zk
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(10);
+      for (int i = 900; i < 1024; i++) {
+        idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+      }
 
-    // Delete the nodes.
-    deletePath(_gZkClient, "/" + root);
+      succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
+      Assert.assertFalse(succeed,
+          "Update property should not succeed because data exceeds znode write limit!");
 
-    // Reset: add the properties back to system properties if they were originally available.
-    if (thresholdProperty != null) {
-      System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
-          thresholdProperty);
-    } else {
-      System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+      // Delete the nodes.
+      deletePath(_gZkClient, "/" + root);
+    } finally {
+      // Reset: add the properties back to system properties if they were originally available.
+      if (thresholdProperty != null) {
+        System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+            thresholdProperty);
+      } else {
+        System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+      }
     }
   }
 
@@ -515,7 +518,7 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
       try {
         zkClient.writeData(path, largeRecord);
         Assert.fail("Data should not written to ZK because data size exceeds writeSizeLimit!");
-      } catch (HelixException expected) {
+      } catch (ZkMarshallingError expected) {
         Assert.assertTrue(
             expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes"));
       }
@@ -587,14 +590,13 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
       deletePath(zkClient, "/" + root);
     } finally {
       zkClient.close();
-    }
-
-    // Reset: add the properties back to system properties if they were originally available.
-    if (thresholdProperty != null) {
-      System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
-          thresholdProperty);
-    } else {
-      System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+      // Reset: add the properties back to system properties if they were originally available.
+      if (thresholdProperty != null) {
+        System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+            thresholdProperty);
+      } else {
+        System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+      }
     }
   }
 


Mime
View raw message