This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch helix-0.9.x
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-0.9.x by this push:
new d4c991a Async write operation should not throw Exception for serializing error (#845)
(#999)
d4c991a is described below
commit d4c991a7797797450fc6917cf90db3e82d4218c2
Author: Jiajun Wang <1803880+jiajunwang@users.noreply.github.com>
AuthorDate: Wed May 6 16:36:53 2020 -0700
Async write operation should not throw Exception for serializing error (#845) (#999)
This change will make the async write operations return error through the async callback
instead of throwing exceptions. This change will fix the batch write/create failure due to
one single node serializing failure.
In addition, according to the serializer interface definition, change ZK related serializers
to throw ZkMarshallingError instead of ZkClientException.
---
.../helix/manager/zk/ZNRecordSerializer.java | 8 +-
.../manager/zk/ZNRecordStreamingSerializer.java | 7 +-
.../helix/manager/zk/zookeeper/ZkClient.java | 51 +++---
.../apache/helix/manager/zk/TestRawZkClient.java | 68 ++++++++
.../helix/manager/zk/TestZNRecordSizeLimit.java | 182 +++++++++++----------
5 files changed, 198 insertions(+), 118 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
index 416388b..1eb16b7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
@@ -24,8 +24,8 @@ import java.io.ByteArrayOutputStream;
import java.util.List;
import java.util.Map;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
import org.apache.helix.util.GZipCompressionUtil;
import org.apache.helix.util.ZNRecordUtil;
@@ -57,7 +57,7 @@ public class ZNRecordSerializer implements ZkSerializer {
// null is NOT an instance of any class
LOG.error("Input object must be of type ZNRecord but it is " + data
+ ". Will not write to zk");
- throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
+ throw new ZkMarshallingError("Input object is not of type ZNRecord (was " + data +
")");
}
ZNRecord record = (ZNRecord) data;
@@ -96,7 +96,7 @@ public class ZNRecordSerializer implements ZkSerializer {
LOG.error(
"Exception during data serialization. ZNRecord ID: {} will not be written to zk.",
record.getId(), e);
- throw new HelixException(e);
+ throw new ZkMarshallingError(e);
}
int writeSizeLimit = ZNRecordUtil.getSerializerWriteSizeLimit();
@@ -104,7 +104,7 @@ public class ZNRecordSerializer implements ZkSerializer {
LOG.error("Data size: {} is greater than {} bytes, is compressed: {}, ZNRecord.id:
{}."
+ " Data will not be written to Zookeeper.", serializedBytes.length, writeSizeLimit,
isCompressed, record.getId());
- throw new HelixException(
+ throw new ZkMarshallingError(
"Data size: " + serializedBytes.length + " is greater than " + writeSizeLimit
+ " bytes, is compressed: " + isCompressed + ", ZNRecord.id: " + record.getId());
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
index 0c0af09..efe9e64 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
@@ -29,7 +29,6 @@ import java.util.TreeMap;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.codec.binary.Base64;
-import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
import org.apache.helix.util.GZipCompressionUtil;
import org.apache.helix.util.ZNRecordUtil;
@@ -64,7 +63,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
// null is NOT an instance of any class
LOG.error("Input object must be of type ZNRecord but it is " + data
+ ". Will not write to zk");
- throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
+ throw new ZkMarshallingError("Input object is not of type ZNRecord (was " + data +
")");
}
// apply retention policy on list field
@@ -165,7 +164,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
LOG.error(
"Exception during data serialization. ZNRecord ID: {} will not be written to zk.",
record.getId(), e);
- throw new HelixException(e);
+ throw new ZkMarshallingError(e);
}
// check size
int writeSizeLimit = ZNRecordUtil.getSerializerWriteSizeLimit();
@@ -173,7 +172,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
LOG.error("Data size: {} is greater than {} bytes, is compressed: {}, ZNRecord.id:
{}."
+ " Data will not be written to Zookeeper.", serializedBytes.length, writeSizeLimit,
isCompressed, record.getId());
- throw new HelixException(
+ throw new ZkMarshallingError(
"Data size: " + serializedBytes.length + " is greater than " + writeSizeLimit
+ " bytes, is compressed: " + isCompressed + ", ZNRecord.id: " + record.getId());
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 66f26d6..1b4d1ca 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -34,6 +34,7 @@ import org.I0Itec.zkclient.ZkLock;
import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
@@ -1400,17 +1401,22 @@ public class ZkClient implements Watcher {
public void asyncCreate(final String path, Object datat, final CreateMode mode,
final ZkAsyncCallbacks.CreateCallbackHandler cb) {
final long startT = System.currentTimeMillis();
- final byte[] data = (datat == null ? null : serialize(datat, path));
- retryUntilConnected(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- ((ZkConnection) getConnection()).getZookeeper().create(path, data,
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- // Arrays.asList(DEFAULT_ACL),
- mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
- data == null ? 0 : data.length, false));
- return null;
- }
+ byte[] data = null;
+ try {
+ data = (datat == null ? null : serialize(datat, path));
+ } catch (ZkMarshallingError e) {
+ cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+ new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
+ return;
+ }
+ final byte[] finalData = data;
+ retryUntilConnected(() -> {
+ ((ZkConnection) getConnection()).getZookeeper()
+ .create(path, finalData, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ // Arrays.asList(DEFAULT_ACL),
+ mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+ finalData == null ? 0 : finalData.length, false));
+ return null;
});
}
@@ -1418,15 +1424,20 @@ public class ZkClient implements Watcher {
public void asyncSetData(final String path, Object datat, final int version,
final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
final long startT = System.currentTimeMillis();
- final byte[] data = serialize(datat, path);
- retryUntilConnected(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb,
- new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
- data == null ? 0 : data.length, false));
- return null;
- }
+ byte[] data = null;
+ try {
+ data = serialize(datat, path);
+ } catch (ZkMarshallingError e) {
+ cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+ new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
+ return;
+ }
+ final byte[] finalData = data;
+ retryUntilConnected(() -> {
+ ((ZkConnection) getConnection()).getZookeeper().setData(path, finalData, version, cb,
+ new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+ finalData == null ? 0 : finalData.length, false));
+ return null;
});
}
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index 95a55c4..ce109b7 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -22,6 +22,7 @@ package org.apache.helix.manager.zk;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
+import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -32,7 +33,9 @@ import java.util.concurrent.locks.ReentrantLock;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.manager.zk.zookeeper.ZkConnection;
import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
@@ -40,6 +43,7 @@ import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -344,4 +348,68 @@ public class TestRawZkClient extends ZkUnitTestBase {
zkServer.shutdown();
}
}
+
+ @Test
+ public void testAsyncWriteOperations() {
+ ZkClient zkClient = new ZkClient(ZK_ADDR);
+ String originSizeLimit =
+ System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+ System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
"2000");
+ try {
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+
+ ZNRecord oversizeZNRecord = new ZNRecord("Oversize");
+ StringBuilder sb = new StringBuilder(1204);
+ Random ran = new Random();
+ for (int i = 0; i < 1024; i++) {
+ sb.append(ran.nextInt(26) + 'a');
+ }
+ String buf = sb.toString();
+ for (int i = 0; i < 1024; i++) {
+ oversizeZNRecord.setSimpleField(Integer.toString(i), buf);
+ }
+
+ // ensure /tmp exists for the test
+ if (!zkClient.exists("/tmp")) {
+ zkClient.create("/tmp", null, CreateMode.PERSISTENT);
+ }
+
+ ZkAsyncCallbacks.CreateCallbackHandler
+ createCallback = new ZkAsyncCallbacks.CreateCallbackHandler();
+ zkClient.asyncCreate("/tmp/async", null, CreateMode.PERSISTENT, createCallback);
+ createCallback.waitForSuccess();
+ Assert.assertEquals(createCallback.getRc(), 0);
+ Assert.assertTrue(zkClient.exists("/tmp/async"));
+
+ // try to create oversize node, should fail
+ zkClient.asyncCreate("/tmp/asyncOversize", oversizeZNRecord, CreateMode.PERSISTENT,
+ createCallback);
+ createCallback.waitForSuccess();
+ Assert.assertEquals(createCallback.getRc(), KeeperException.Code.MarshallingError);
+ Assert.assertFalse(zkClient.exists("/tmp/asyncOversize"));
+
+ ZNRecord normalZNRecord = new ZNRecord("normal");
+ normalZNRecord.setSimpleField("key", buf);
+
+ ZkAsyncCallbacks.SetDataCallbackHandler
+ setDataCallbackHandler = new ZkAsyncCallbacks.SetDataCallbackHandler();
+ zkClient.asyncSetData("/tmp/async", normalZNRecord, -1, setDataCallbackHandler);
+ setDataCallbackHandler.waitForSuccess();
+ Assert.assertEquals(setDataCallbackHandler.getRc(), 0);
+
+ zkClient.asyncSetData("/tmp/async", oversizeZNRecord, -1, setDataCallbackHandler);
+ setDataCallbackHandler.waitForSuccess();
+ Assert.assertEquals(setDataCallbackHandler.getRc(), KeeperException.Code.MarshallingError);
+ Assert.assertEquals(zkClient.readData("/tmp/async"), normalZNRecord);
+ } finally {
+ if (originSizeLimit == null) {
+ System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+ } else {
+ System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ originSizeLimit);
+ }
+ zkClient.delete("/tmp/async");
+ zkClient.delete("/tmp/asyncOversize");
+ }
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
index bc440b1..abb13b2 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
@@ -21,6 +21,8 @@ package org.apache.helix.manager.zk;
import java.util.Arrays;
import java.util.Date;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey.Builder;
@@ -304,15 +306,16 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
final String thresholdProperty =
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
- ZNRecordSerializer serializer = new ZNRecordSerializer();
+ try {
+ ZNRecordSerializer serializer = new ZNRecordSerializer();
- String root = getShortClassName();
+ String root = getShortClassName();
- byte[] buf = new byte[1024];
- for (int i = 0; i < 1024; i++) {
- buf[i] = 'a';
- }
- String bufStr = new String(buf);
+ byte[] buf = new byte[1024];
+ for (int i = 0; i < 1024; i++) {
+ buf[i] = 'a';
+ }
+ String bufStr = new String(buf);
// 1. legal-sized data gets written to zk
// write a znode of size less than writeSizeLimit
@@ -322,24 +325,24 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
- final ZNRecord normalSizeRecord = new ZNRecord("normal-size");
- for (int i = 0; i < rawZnRecordSize; i++) {
- normalSizeRecord.setSimpleField(Integer.toString(i), bufStr);
- }
+ final ZNRecord normalSizeRecord = new ZNRecord("normal-size");
+ for (int i = 0; i < rawZnRecordSize; i++) {
+ normalSizeRecord.setSimpleField(Integer.toString(i), bufStr);
+ }
- String path = "/" + root + "/normal";
- _gZkClient.createPersistent(path, true);
- _gZkClient.writeData(path, normalSizeRecord);
+ String path = "/" + root + "/normal";
+ _gZkClient.createPersistent(path, true);
+ _gZkClient.writeData(path, normalSizeRecord);
- ZNRecord record = _gZkClient.readData(path);
+ ZNRecord record = _gZkClient.readData(path);
- // Successfully reads the same data.
- Assert.assertEquals(normalSizeRecord, record);
+ // Successfully reads the same data.
+ Assert.assertEquals(normalSizeRecord, record);
- int length = serializer.serialize(record).length;
+ int length = serializer.serialize(record).length;
- // Less than writeSizeLimit so it is written to ZK.
- Assert.assertTrue(length < writeSizeLimit);
+ // Less than writeSizeLimit so it is written to ZK.
+ Assert.assertTrue(length < writeSizeLimit);
// 2. Large size data is not allowed to write to ZK
// Set raw record size to be large enough so its serialized data exceeds the writeSizeLimit.
@@ -350,27 +353,27 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
- final ZNRecord largeRecord = new ZNRecord("large-size");
- for (int i = 0; i < rawZnRecordSize; i++) {
- largeRecord.setSimpleField(Integer.toString(i), bufStr);
- }
+ final ZNRecord largeRecord = new ZNRecord("large-size");
+ for (int i = 0; i < rawZnRecordSize; i++) {
+ largeRecord.setSimpleField(Integer.toString(i), bufStr);
+ }
- path = "/" + root + "/large";
- _gZkClient.createPersistent(path, true);
+ path = "/" + root + "/large";
+ _gZkClient.createPersistent(path, true);
- try {
- _gZkClient.writeData(path, largeRecord);
- Assert.fail("Data should not be written to ZK because data size exceeds writeSizeLimit!");
- } catch (HelixException expected) {
- Assert.assertTrue(
- expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes"));
- }
+ try {
+ _gZkClient.writeData(path, largeRecord);
+ Assert.fail("Data should not be written to ZK because data size exceeds writeSizeLimit!");
+ } catch (ZkMarshallingError expected) {
+ Assert.assertTrue(
+ expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes"));
+ }
- // test ZkDataAccessor
- ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
- admin.addCluster(root, true);
- InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
- admin.addInstance(root, instanceConfig);
+ // test ZkDataAccessor
+ ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
+ admin.addCluster(root, true);
+ InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
+ admin.addInstance(root, instanceConfig);
// Set the writeSizeLimit to 10KB so serialized data size does not exceed writeSizeLimit.
writeSizeLimitKb = 10;
@@ -383,33 +386,32 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
new ZKHelixDataAccessor(root, new ZkBaseDataAccessor<>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
- IdealState idealState = new IdealState("currentState");
- idealState.setStateModelDefRef("MasterSlave");
- idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
- idealState.setNumPartitions(10);
+ IdealState idealState = new IdealState("currentState");
+ idealState.setStateModelDefRef("MasterSlave");
+ idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ idealState.setNumPartitions(10);
- for (int i = 0; i < 1024; i++) {
- idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
- }
- boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
- Assert.assertTrue(succeed);
- HelixProperty property = accessor.getProperty(
- keyBuilder.stateTransitionStatus("localhost_12918", "session_1", "partition_1"));
- Assert.assertNull(property);
+ for (int i = 0; i < 1024; i++) {
+ idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+ }
+ boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+ Assert.assertTrue(succeed);
+ HelixProperty property = accessor.getProperty(keyBuilder.stateTransitionStatus("localhost_12918",
"session_1", "partition_1"));
+ Assert.assertNull(property);
- // legal sized data gets written to zk
- idealState.getRecord().getSimpleFields().clear();
- idealState.setStateModelDefRef("MasterSlave");
- idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
- idealState.setNumPartitions(10);
+ // legal sized data gets written to zk
+ idealState.getRecord().getSimpleFields().clear();
+ idealState.setStateModelDefRef("MasterSlave");
+ idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ idealState.setNumPartitions(10);
- for (int i = 0; i < 900; i++) {
- idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
- }
- succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
- Assert.assertTrue(succeed);
- record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
- Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit);
+ for (int i = 0; i < 900; i++) {
+ idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+ }
+ succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
+ Assert.assertTrue(succeed);
+ record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
+ Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit);
// Set small write size limit so writing does not succeed.
writeSizeLimitKb = 1;
@@ -417,27 +419,28 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
- // oversized data should not update existing data on zk
- idealState.setStateModelDefRef("MasterSlave");
- idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
- idealState.setNumPartitions(10);
- for (int i = 900; i < 1024; i++) {
- idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
- }
-
- succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
- Assert.assertFalse(succeed,
- "Update property should not succeed because data exceeds znode write limit!");
+ // oversized data should not update existing data on zk
+ idealState.setStateModelDefRef("MasterSlave");
+ idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ idealState.setNumPartitions(10);
+ for (int i = 900; i < 1024; i++) {
+ idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+ }
- // Delete the nodes.
- deletePath(_gZkClient, "/" + root);
+ succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
+ Assert.assertFalse(succeed,
+ "Update property should not succeed because data exceeds znode write limit!");
- // Reset: add the properties back to system properties if they were originally available.
- if (thresholdProperty != null) {
- System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
- thresholdProperty);
- } else {
- System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+ // Delete the nodes.
+ deletePath(_gZkClient, "/" + root);
+ } finally {
+ // Reset: add the properties back to system properties if they were originally available.
+ if (thresholdProperty != null) {
+ System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ thresholdProperty);
+ } else {
+ System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+ }
}
}
@@ -515,7 +518,7 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
try {
zkClient.writeData(path, largeRecord);
Assert.fail("Data should not written to ZK because data size exceeds writeSizeLimit!");
- } catch (HelixException expected) {
+ } catch (ZkMarshallingError expected) {
Assert.assertTrue(
expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes"));
}
@@ -587,14 +590,13 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
deletePath(zkClient, "/" + root);
} finally {
zkClient.close();
- }
-
- // Reset: add the properties back to system properties if they were originally available.
- if (thresholdProperty != null) {
- System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
- thresholdProperty);
- } else {
- System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+ // Reset: add the properties back to system properties if they were originally available.
+ if (thresholdProperty != null) {
+ System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ thresholdProperty);
+ } else {
+ System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+ }
}
}
|