helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [helix] branch master updated: Add system property options to config write size limit for ZNRecord Serializer (#809)
Date Sat, 29 Feb 2020 17:19:40 GMT
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f3c64b  Add system property options to config write size limit for ZNRecord Serializer
(#809)
0f3c64b is described below

commit 0f3c64be152d07db272d8560a50fcdcedff2e5b6
Author: Huizhi Lu <ihuizhi.lu@gmail.com>
AuthorDate: Sat Feb 29 09:19:31 2020 -0800

    Add system property options to config write size limit for ZNRecord Serializer (#809)
    
    With the default 1 MB ZNRecord size limit in ZNRecord serializers, serialized data may
    still fail to be written to Zookeeper. This commit adds system property options to configure
    ZNRecord's write size limit and enable auto compression in ZNRecord serializers.
---
 .../helix/manager/zk/TestZNRecordSizeLimit.java    | 323 ++++++++++++++++++++-
 .../zookeeper/constant/ZkSystemPropertyKeys.java   |  53 ++++
 .../apache/helix/zookeeper/datamodel/ZNRecord.java |  12 +
 .../datamodel/serializer/ZNRecordSerializer.java   |  37 ++-
 .../serializer/ZNRecordStreamingSerializer.java    |  26 +-
 .../apache/helix/zookeeper/util/ZNRecordUtil.java  |  65 +++++
 zookeeper-api/src/test/conf/testng.xml             |   2 +-
 .../TestZNRecordSerializeWriteSizeLimit.java       | 200 +++++++++++++
 8 files changed, 693 insertions(+), 25 deletions(-)

diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
index 447478c..30d6349 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
@@ -25,9 +25,12 @@ import java.util.Date;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper;
+import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
@@ -160,7 +163,7 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
         + new Date(System.currentTimeMillis()));
   }
 
-  @Test
+  @Test(dependsOnMethods = "testZNRecordSizeLimitUseZNRecordSerializer")
   public void testZNRecordSizeLimitUseZNRecordStreamingSerializer() {
     String className = getShortClassName();
     System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at " +
new Date(
@@ -290,4 +293,322 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
     System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + new
Date(
         System.currentTimeMillis()));
   }
+
+  /*
+   * Tests ZNRecordSerializer threshold.
+   * Two cases using ZkClient and ZkDataAccessor:
+   * 1. serialized data size is less than threshold and could be written to ZK.
+   * 2. serialized data size is greater than threshold, so ZkClientException is thrown.
+   */
+  @Test(dependsOnMethods = "testZNRecordSizeLimitUseZNRecordStreamingSerializer")
+  public void testZNRecordSerializerWriteSizeLimit() throws Exception {
+    // Backup properties for later resetting.
+    final String thresholdProperty =
+        System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+
+    ZNRecordSerializer serializer = new ZNRecordSerializer();
+
+    String root = getShortClassName();
+
+    byte[] buf = new byte[1024];
+    for (int i = 0; i < 1024; i++) {
+      buf[i] = 'a';
+    }
+    String bufStr = new String(buf);
+
+    // 1. legal-sized data gets written to zk
+    // write a znode of size less than writeSizeLimit
+    int rawZnRecordSize = 700;
+    int writeSizeLimitKb = 800;
+    int writeSizeLimit = writeSizeLimitKb * 1024;
+    System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+        String.valueOf(writeSizeLimit));
+
+    final ZNRecord normalSizeRecord = new ZNRecord("normal-size");
+    for (int i = 0; i < rawZnRecordSize; i++) {
+      normalSizeRecord.setSimpleField(Integer.toString(i), bufStr);
+    }
+
+    String path = "/" + root + "/normal";
+    _gZkClient.createPersistent(path, true);
+    _gZkClient.writeData(path, normalSizeRecord);
+
+    ZNRecord record = _gZkClient.readData(path);
+
+    // Successfully reads the same data.
+    Assert.assertEquals(normalSizeRecord, record);
+
+    int length = serializer.serialize(record).length;
+
+    // Less than writeSizeLimit so it is written to ZK.
+    Assert.assertTrue(length < writeSizeLimit);
+
+    // 2. Large size data is not allowed to write to ZK
+    // Set raw record size to be large enough so its serialized data exceeds the writeSizeLimit.
+    rawZnRecordSize = 2000;
+    // Set the writeSizeLimit to very small so serialized data size exceeds the writeSizeLimit.
+    writeSizeLimitKb = 1;
+    writeSizeLimit = writeSizeLimitKb * 1024;
+    System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+        String.valueOf(writeSizeLimit));
+
+    final ZNRecord largeRecord = new ZNRecord("large-size");
+    for (int i = 0; i < rawZnRecordSize; i++) {
+      largeRecord.setSimpleField(Integer.toString(i), bufStr);
+    }
+
+    path = "/" + root + "/large";
+    _gZkClient.createPersistent(path, true);
+
+    try {
+      _gZkClient.writeData(path, largeRecord);
+      Assert.fail("Data should not be written to ZK because data size exceeds writeSizeLimit!");
+    } catch (ZkClientException expected) {
+      Assert.assertTrue(
+          expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes"));
+    }
+
+    // test ZkDataAccessor
+    ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
+    admin.addCluster(root, true);
+    InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
+    admin.addInstance(root, instanceConfig);
+
+    // Set the writeSizeLimit to 10KB so serialized data size does not exceed writeSizeLimit.
+    writeSizeLimitKb = 10;
+    writeSizeLimit = writeSizeLimitKb * 1024;
+    System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+        String.valueOf(writeSizeLimit));
+
+    // oversized data should not create any new data on zk
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(root, new ZkBaseDataAccessor<>(ZK_ADDR));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    IdealState idealState = new IdealState("currentState");
+    idealState.setStateModelDefRef("MasterSlave");
+    idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+    idealState.setNumPartitions(10);
+
+    for (int i = 0; i < 1024; i++) {
+      idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+    }
+    boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+    Assert.assertTrue(succeed);
+    HelixProperty property = accessor.getProperty(
+        keyBuilder.stateTransitionStatus("localhost_12918", "session_1", "partition_1"));
+    Assert.assertNull(property);
+
+    // legal sized data gets written to zk
+    idealState.getRecord().getSimpleFields().clear();
+    idealState.setStateModelDefRef("MasterSlave");
+    idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+    idealState.setNumPartitions(10);
+
+    for (int i = 0; i < 900; i++) {
+      idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+    }
+    succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
+    Assert.assertTrue(succeed);
+    record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
+    Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit);
+
+    // Set small write size limit so writing does not succeed.
+    writeSizeLimitKb = 1;
+    writeSizeLimit = writeSizeLimitKb * 1024;
+    System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+        String.valueOf(writeSizeLimit));
+
+    // oversized data should not update existing data on zk
+    idealState.setStateModelDefRef("MasterSlave");
+    idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+    idealState.setNumPartitions(10);
+    for (int i = 900; i < 1024; i++) {
+      idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+    }
+
+    succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
+    Assert.assertFalse(succeed,
+        "Update property should not succeed because data exceeds znode write limit!");
+
+    // Delete the nodes.
+    deletePath(_gZkClient, "/" + root);
+
+    // Reset: add the properties back to system properties if they were originally available.
+    if (thresholdProperty != null) {
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+          thresholdProperty);
+    } else {
+      System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+    }
+  }
+
+  /*
+   * Tests ZNRecordStreamingSerializer threshold.
+   * Two cases using ZkClient and ZkDataAccessor:
+   * 1. serialized data size is less than threshold and could be written to ZK.
+   * 2. serialized data size is greater than threshold, so ZkClientException is thrown.
+   */
+  @Test(dependsOnMethods = "testZNRecordSerializerWriteSizeLimit")
+  public void testZNRecordStreamingSerializerWriteSizeLimit() throws Exception {
+    // Backup properties for later resetting.
+    final String thresholdProperty =
+        System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+
+    ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
+
+    try {
+      zkClient.setZkSerializer(serializer);
+
+      String root = getShortClassName();
+
+      byte[] buf = new byte[1024];
+      for (int i = 0; i < 1024; i++) {
+        buf[i] = 'a';
+      }
+      String bufStr = new String(buf);
+
+      // 1. legal-sized data gets written to zk
+      // write a znode of size less than writeSizeLimit
+      int rawZnRecordSize = 700;
+      int writeSizeLimitKb = 800;
+      int writeSizeLimit = writeSizeLimitKb * 1024;
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+          String.valueOf(writeSizeLimit));
+
+      final ZNRecord normalSizeRecord = new ZNRecord("normal-size");
+      for (int i = 0; i < rawZnRecordSize; i++) {
+        normalSizeRecord.setSimpleField(Integer.toString(i), bufStr);
+      }
+
+      String path = "/" + root + "/normal";
+      zkClient.createPersistent(path, true);
+      zkClient.writeData(path, normalSizeRecord);
+
+      ZNRecord record = zkClient.readData(path);
+
+      // Successfully reads the same data.
+      Assert.assertEquals(normalSizeRecord, record);
+
+      int length = serializer.serialize(record).length;
+
+      // Less than writeSizeLimit so it is written to ZK.
+      Assert.assertTrue(length < writeSizeLimit);
+
+      // 2. Large size data is not allowed to write to ZK
+      // Set raw record size to be large enough so its serialized data exceeds the writeSizeLimit.
+      rawZnRecordSize = 2000;
+      // Set the writeSizeLimit to very small so serialized data size exceeds the writeSizeLimit.
+      writeSizeLimitKb = 1;
+      writeSizeLimit = writeSizeLimitKb * 1024;
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+          String.valueOf(writeSizeLimit));
+
+      final ZNRecord largeRecord = new ZNRecord("large-size");
+      for (int i = 0; i < rawZnRecordSize; i++) {
+        largeRecord.setSimpleField(Integer.toString(i), bufStr);
+      }
+
+      path = "/" + root + "/large";
+      zkClient.createPersistent(path, true);
+
+      try {
+        zkClient.writeData(path, largeRecord);
+        Assert.fail("Data should not written to ZK because data size exceeds writeSizeLimit!");
+      } catch (ZkClientException expected) {
+        Assert.assertTrue(
+            expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes"));
+      }
+
+      // test ZkDataAccessor
+      ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
+      admin.addCluster(root, true);
+      InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
+      admin.addInstance(root, instanceConfig);
+
+      // Set the writeSizeLimit to 10KB so serialized data size does not exceed writeSizeLimit.
+      writeSizeLimitKb = 10;
+      writeSizeLimit = writeSizeLimitKb * 1024;
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+          String.valueOf(writeSizeLimit));
+
+      // oversize data should not create any new data on zk
+      ZKHelixDataAccessor accessor =
+          new ZKHelixDataAccessor(root, new ZkBaseDataAccessor<>(ZK_ADDR));
+      Builder keyBuilder = accessor.keyBuilder();
+
+      IdealState idealState = new IdealState("currentState");
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(10);
+
+      for (int i = 0; i < 1024; i++) {
+        idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+      }
+      boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+      Assert.assertTrue(succeed);
+      HelixProperty property = accessor.getProperty(
+          keyBuilder.stateTransitionStatus("localhost_12918", "session_1", "partition_1"));
+      Assert.assertNull(property);
+
+      // legal sized data gets written to zk
+      idealState.getRecord().getSimpleFields().clear();
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(10);
+
+      for (int i = 0; i < 900; i++) {
+        idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+      }
+      succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
+      Assert.assertTrue(succeed);
+      record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
+      Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit);
+
+      // Set small write size limit so writing does not succeed.
+      writeSizeLimitKb = 1;
+      writeSizeLimit = writeSizeLimitKb * 1024;
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+          String.valueOf(writeSizeLimit));
+
+      // oversize data should not update existing data on zk
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(10);
+      for (int i = 900; i < 1024; i++) {
+        idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+      }
+
+      succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
+      Assert.assertFalse(succeed,
+          "Update property should not succeed because data exceeds znode write limit!");
+
+      // Delete the nodes.
+      deletePath(zkClient, "/" + root);
+    } finally {
+      zkClient.close();
+    }
+
+    // Reset: add the properties back to system properties if they were originally available.
+    if (thresholdProperty != null) {
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+          thresholdProperty);
+    } else {
+      System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+    }
+  }
+
+  private void deletePath(final HelixZkClient zkClient, final String path) throws Exception
{
+    Assert.assertTrue(TestHelper.verify(() -> {
+      do {
+        try {
+          zkClient.deleteRecursively(path);
+        } catch (ZkClientException ex) {
+          // ignore
+        }
+      } while (zkClient.exists(path));
+      return true;
+    }, TestHelper.WAIT_DURATION));
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
new file mode 100644
index 0000000..bd88c61
--- /dev/null
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
@@ -0,0 +1,53 @@
+package org.apache.helix.zookeeper.constant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * This class contains various ZK system property keys.
+ */
+public class ZkSystemPropertyKeys {
+
+  /**
+   * Setting this property to true in system properties enables auto compression in ZK serializer.
+   * The data will be automatically compressed by
+   * {@link org.apache.helix.zookeeper.util.GZipCompressionUtil} when being written to Zookeeper
+   * if the size of the serialized data exceeds the write size limit, which is 1 MB by default
+   * and can be set by {@value ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES}.
+   * <p>
+   * The default value is "true" (enabled).
+   */
+  public static final String ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED =
+      "zk.serializer.znrecord.auto-compress.enabled";
+
+  /**
+   * This is property that defines the maximum write size in bytes for ZKRecord's two serializers
+   * before serialized data is ready to be written to ZK. This property applies to
+   * 1. {@link org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer}
+   * 2. {@link org.apache.helix.zookeeper.datamodel.serializer.ZNRecordStreamingSerializer}.
+   * <p>
+   * If the size of serialized data (no matter whether it is compressed or not) exceeds this
+   * configured limit, the data will NOT be written to Zookeeper.
+   * <p>
+   * Default value is 1 MB. If the configured limit is less than or equal to 0 byte,
+   * the default value will be used.
+   */
+  public static final String ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES =
+      "zk.serializer.znrecord.write.size.limit.bytes";
+}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/ZNRecord.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/ZNRecord.java
index 6070414..38e4788 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/ZNRecord.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/ZNRecord.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
 import org.apache.helix.zookeeper.datamodel.serializer.JacksonPayloadSerializer;
 import org.apache.helix.zookeeper.datamodel.serializer.PayloadSerializer;
 import org.codehaus.jackson.annotate.JsonCreator;
@@ -49,6 +50,17 @@ public class ZNRecord {
   @JsonIgnore(true)
   public static final String LIST_FIELD_BOUND = "listField.bound";
 
+  /** A field name in ZNRecord's simple fields to enable compression in ZNRecord serializers.
*/
+  @JsonIgnore
+  public static final String ENABLE_COMPRESSION_BOOLEAN_FIELD = "enableCompression";
+
+  /**
+   * Default value for system property
+   * {@link ZkSystemPropertyKeys#ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED}
+   */
+  @JsonIgnore
+  public static final String ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_DEFAULT = "true";
+
   @JsonIgnore(true)
   public static final int SIZE_LIMIT = 1000 * 1024; // leave a margin out of 1M
 
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
index 89850b0..bf2c4bf 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.util.GZipCompressionUtil;
+import org.apache.helix.zookeeper.util.ZNRecordUtil;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
 
 
 public class ZNRecordSerializer implements ZkSerializer {
-  private static Logger logger = LoggerFactory.getLogger(ZNRecordSerializer.class);
+  private static Logger LOG = LoggerFactory.getLogger(ZNRecordSerializer.class);
 
   private static int getListFieldBound(ZNRecord record) {
     int max = Integer.MAX_VALUE;
@@ -45,7 +46,7 @@ public class ZNRecordSerializer implements ZkSerializer {
       try {
         max = Integer.parseInt(maxStr);
       } catch (Exception e) {
-        logger.error("IllegalNumberFormat for list field bound: " + maxStr);
+        LOG.error("IllegalNumberFormat for list field bound: " + maxStr);
       }
     }
     return max;
@@ -55,7 +56,7 @@ public class ZNRecordSerializer implements ZkSerializer {
   public byte[] serialize(Object data) {
     if (!(data instanceof ZNRecord)) {
       // null is NOT an instance of any class
-      logger.error("Input object must be of type ZNRecord but it is " + data
+      LOG.error("Input object must be of type ZNRecord but it is " + data
           + ". Will not write to zk");
       throw new ZkClientException("Input object is not of type ZNRecord (was " + data + ")");
     }
@@ -82,24 +83,33 @@ public class ZNRecordSerializer implements ZkSerializer {
     serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     byte[] serializedBytes;
+    boolean isCompressed = false;
+
     try {
       mapper.writeValue(baos, data);
       serializedBytes = baos.toByteArray();
       // apply compression if needed
-      if (record.getBooleanField("enableCompression", false) || serializedBytes.length >
ZNRecord.SIZE_LIMIT) {
+      if (ZNRecordUtil.shouldCompress(record, serializedBytes.length)) {
         serializedBytes = GZipCompressionUtil.compress(serializedBytes);
+        isCompressed = true;
       }
     } catch (Exception e) {
-      logger.error("Exception during data serialization. Will not write to zk. Data (first
1k): "
-          + new String(baos.toByteArray()).substring(0, 1024), e);
+      LOG.error(
+          "Exception during data serialization. ZNRecord ID: {} will not be written to zk.",
+          record.getId(), e);
       throw new ZkClientException(e);
     }
-    if (serializedBytes.length > ZNRecord.SIZE_LIMIT) {
-      logger.error("Data size larger than 1M, ZNRecord.id: " + record.getId()
-          + ". Will not write to zk. Data (first 1k): "
-          + new String(serializedBytes).substring(0, 1024));
-      throw new ZkClientException("Data size larger than 1M, ZNRecord.id: " + record.getId());
+
+    int writeSizeLimit = ZNRecordUtil.getSerializerWriteSizeLimit();
+    if (serializedBytes.length > writeSizeLimit) {
+      LOG.error("Data size: {} is greater than {} bytes, is compressed: {}, ZNRecord.id:
{}."
+              + " Data will not be written to Zookeeper.", serializedBytes.length, writeSizeLimit,
+          isCompressed, record.getId());
+      throw new ZkClientException(
+          "Data size: " + serializedBytes.length + " is greater than " + writeSizeLimit
+              + " bytes, is compressed: " + isCompressed + ", ZNRecord.id: " + record.getId());
     }
+
     return serializedBytes;
   }
 
@@ -123,11 +133,10 @@ public class ZNRecordSerializer implements ZkSerializer {
         byte[] uncompressedBytes = GZipCompressionUtil.uncompress(bais);
         bais = new ByteArrayInputStream(uncompressedBytes);
       }
-      ZNRecord zn = mapper.readValue(bais, ZNRecord.class);
 
-      return zn;
+      return mapper.readValue(bais, ZNRecord.class);
     } catch (Exception e) {
-      logger.error("Exception during deserialization of bytes: " + new String(bytes), e);
+      LOG.error("Exception during deserialization of bytes: {}", new String(bytes), e);
       return null;
     }
   }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordStreamingSerializer.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordStreamingSerializer.java
index c5acdb0..5c8579c 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordStreamingSerializer.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordStreamingSerializer.java
@@ -31,6 +31,7 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.util.GZipCompressionUtil;
+import org.apache.helix.zookeeper.util.ZNRecordUtil;
 import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.codehaus.jackson.JsonFactory;
@@ -79,7 +80,9 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
       }
     }
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    byte[] serializedBytes = null;
+    byte[] serializedBytes;
+    boolean isCompressed = false;
+
     try {
       JsonFactory f = new JsonFactory();
       JsonGenerator g = f.createJsonGenerator(baos);
@@ -154,20 +157,25 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
       g.close();
       serializedBytes = baos.toByteArray();
       // apply compression if needed
-      if (record.getBooleanField("enableCompression", false) || serializedBytes.length >
ZNRecord.SIZE_LIMIT) {
+      if (ZNRecordUtil.shouldCompress(record, serializedBytes.length)) {
         serializedBytes = GZipCompressionUtil.compress(serializedBytes);
+        isCompressed = true;
       }
     } catch (Exception e) {
-      LOG.error("Exception during data serialization. Will not write to zk. Data (first 1k):
"
-          + new String(baos.toByteArray()).substring(0, 1024), e);
+      LOG.error(
+          "Exception during data serialization. ZNRecord ID: {} will not be written to zk.",
+          record.getId(), e);
       throw new ZkClientException(e);
     }
     // check size
-    if (serializedBytes.length > ZNRecord.SIZE_LIMIT) {
-      LOG.error("Data size larger than 1M, ZNRecord.id: " + record.getId()
-          + ". Will not write to zk. Data (first 1k): "
-          + new String(serializedBytes).substring(0, 1024));
-      throw new ZkClientException("Data size larger than 1M, ZNRecord.id: " + record.getId());
+    int writeSizeLimit = ZNRecordUtil.getSerializerWriteSizeLimit();
+    if (serializedBytes.length > writeSizeLimit) {
+      LOG.error("Data size: {} is greater than {} bytes, is compressed: {}, ZNRecord.id:
{}."
+              + " Data will not be written to Zookeeper.", serializedBytes.length, writeSizeLimit,
+          isCompressed, record.getId());
+      throw new ZkClientException(
+          "Data size: " + serializedBytes.length + " is greater than " + writeSizeLimit
+              + " bytes, is compressed: " + isCompressed + ", ZNRecord.id: " + record.getId());
     }
 
     return serializedBytes;
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/ZNRecordUtil.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/ZNRecordUtil.java
new file mode 100644
index 0000000..e81bef1
--- /dev/null
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/ZNRecordUtil.java
@@ -0,0 +1,65 @@
+package org.apache.helix.zookeeper.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+
+
+/**
+ * This utility class contains various methods for manipulating ZNRecord.
+ */
+public class ZNRecordUtil {
+
+  /**
+   * Checks whether serialized ZNRecord bytes should be compressed before being written to
+   * Zookeeper.
+   *
+   * @param record raw ZNRecord before being serialized
+   * @param serializedLength length of the serialized bytes array
+   * @return true if the serialized bytes should be compressed before the write, false otherwise
+   */
+  public static boolean shouldCompress(ZNRecord record, int serializedLength) {
+    if (record.getBooleanField(ZNRecord.ENABLE_COMPRESSION_BOOLEAN_FIELD, false)) {
+      return true;
+    }
+
+    boolean autoCompressEnabled = Boolean.parseBoolean(System
+        .getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED,
+            ZNRecord.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_DEFAULT));
+
+    return autoCompressEnabled && serializedLength > getSerializerWriteSizeLimit();
+  }
+
+  /**
+   * Returns the ZNRecord serializer write size limit in bytes. If the configured limit is less
+   * than or equal to 0, the default value will be used instead.
+   */
+  public static int getSerializerWriteSizeLimit() {
+    Integer writeSizeLimit =
+        Integer.getInteger(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+
+    if (writeSizeLimit == null || writeSizeLimit <= 0) {
+      return ZNRecord.SIZE_LIMIT;
+    }
+
+    return writeSizeLimit;
+  }
+}
diff --git a/zookeeper-api/src/test/conf/testng.xml b/zookeeper-api/src/test/conf/testng.xml
index 6c78c76a4..0847f47 100644
--- a/zookeeper-api/src/test/conf/testng.xml
+++ b/zookeeper-api/src/test/conf/testng.xml
@@ -21,7 +21,7 @@ under the License.
 <suite name="Suite" parallel="false">
   <test name="Test" preserve-order="true">
     <packages>
-      <package name="org.apache.helix.zookeeper.api.*"/>
+      <package name="org.apache.helix.zookeeper.*"/>
     </packages>
   </test>
 </suite>
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/datamodel/serializer/TestZNRecordSerializeWriteSizeLimit.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/datamodel/serializer/TestZNRecordSerializeWriteSizeLimit.java
new file mode 100644
index 0000000..12f98ac
--- /dev/null
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/datamodel/serializer/TestZNRecordSerializeWriteSizeLimit.java
@@ -0,0 +1,200 @@
+package org.apache.helix.zookeeper.datamodel.serializer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.exception.ZkClientException;
+import org.apache.helix.zookeeper.util.GZipCompressionUtil;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZNRecordSerializeWriteSizeLimit {
+
+  /*
+   * Tests data serializing when auto compression is disabled. If the system property for
+   * auto compression is set to "false", auto compression is disabled.
+   */
+  @Test
+  public void testAutoCompressionDisabled() {
+    // Backup properties for later resetting.
+    final String compressionEnabledProperty =
+        System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED);
+    final String compressionThresholdProperty =
+        System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+
+    // Prepare system properties to disable auto compression.
+    final int writeSizeLimit = 200 * 1024;
+    System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+        String.valueOf(writeSizeLimit));
+
+    // 2. Set the auto compression enabled property to false so auto compression is disabled.
+    System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED,
"false");
+
+    // Verify auto compression is disabled.
+    Assert.assertFalse(
+        Boolean.getBoolean(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED));
+    // Data size 300 KB > size limit 200 KB: exception expected.
+    verifyAutoCompression(300, writeSizeLimit, true, false, true);
+
+    // Data size 100 KB < size limit 200 KB: pass
+    verifyAutoCompression(100, writeSizeLimit, false, false, false);
+
+    // Reset: add the properties back to system properties if they were originally available.
+    if (compressionEnabledProperty != null) {
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED,
+          compressionEnabledProperty);
+    } else {
+      System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED);
+    }
+    if (compressionThresholdProperty != null) {
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+          compressionThresholdProperty);
+    } else {
+      System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+    }
+  }
+
+  /*
+   * Tests data serializing when write size limit is set.
+   * Two cases:
+   * 1. limit is not set
+   * --> default size is used.
+   * 2. limit is set
+   * --> serialized data is checked by the limit: pass or throw ZkClientException.
+   */
+  @Test
+  public void testZNRecordSerializerWriteSizeLimit() {
+    // Backup properties for later resetting.
+    final String writeSizeLimitProperty =
+        System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+
+    // Unset write size limit property so default limit is used.
+    System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+
+    Assert.assertNull(
+        System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES));
+
+    verifyAutoCompression(500, ZNRecord.SIZE_LIMIT, false, false, false);
+
+    // 2. Set size limit so serialized data is greater than the size limit but compressed
data
+    // is smaller than the size limit.
+    // Set it to 2000 bytes
+    int writeSizeLimit = 2000;
+    System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+        String.valueOf(writeSizeLimit));
+
+    // Verify auto compression is done.
+    verifyAutoCompression(200, writeSizeLimit, true, true, false);
+
+    // 3. Set size limit to be be less than default value. The default value will be used
for write
+    // size limit.
+    writeSizeLimit = 2000;
+    System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+        String.valueOf(writeSizeLimit));
+
+    // Verify ZkClientException is thrown because compressed data is larger than size limit.
+    verifyAutoCompression(1000, writeSizeLimit, true, true, true);
+
+    // Reset: add the properties back to system properties if they were originally available.
+    if (writeSizeLimitProperty != null) {
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+          writeSizeLimitProperty);
+    } else {
+      System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+    }
+  }
+
+  private void verifyAutoCompression(int recordSize, int limit, boolean greaterThanThreshold,
+      boolean compressionExpected, boolean exceptionExpected) {
+    ZNRecord record = createZNRecord(recordSize);
+
+    // Makes sure the length of serialized bytes is greater than limit to
+    // satisfy the condition: serialized bytes' length exceeds the limit.
+    byte[] preCompressedBytes = serialize(record);
+
+    Assert.assertEquals(preCompressedBytes.length > limit, greaterThanThreshold);
+
+    ZkSerializer zkSerializer = new ZNRecordSerializer();
+
+    byte[] bytes;
+    try {
+      bytes = zkSerializer.serialize(record);
+
+      Assert.assertEquals(bytes.length >= limit, exceptionExpected);
+      Assert.assertFalse(exceptionExpected);
+    } catch (ZkClientException ex) {
+      Assert.assertTrue(exceptionExpected, "Should not throw ZkClientException.");
+      Assert.assertTrue(ex.getMessage().contains(" is greater than " + limit + " bytes"));
+      // No need to verify following asserts as bytes data is not returned.
+      return;
+    }
+
+    // Verify whether serialized data is compressed or not.
+    Assert.assertEquals(GZipCompressionUtil.isCompressed(bytes), compressionExpected);
+    Assert.assertEquals(preCompressedBytes.length != bytes.length, compressionExpected);
+
+    // Verify serialized bytes could correctly deserialize.
+    Assert.assertEquals(zkSerializer.deserialize(bytes), record);
+  }
+
+  private ZNRecord createZNRecord(final int recordSizeKb) {
+    byte[] buf = new byte[1024];
+    for (int i = 0; i < 1024; i++) {
+      buf[i] = 'a';
+    }
+    String bufStr = new String(buf);
+
+    ZNRecord record = new ZNRecord("record");
+    for (int i = 0; i < recordSizeKb; i++) {
+      record.setSimpleField(Integer.toString(i), bufStr);
+    }
+
+    return record;
+  }
+
+  // Simulates serializing so we can check the size of serialized bytes.
+  // Returns raw serialized bytes before being compressed.
+  private byte[] serialize(Object data) {
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    byte[] serializedBytes = new byte[0];
+
+    try {
+      mapper.writeValue(baos, data);
+      serializedBytes = baos.toByteArray();
+    } catch (IOException e) {
+      Assert.fail("Can not serialize data.", e);
+    }
+
+    return serializedBytes;
+  }
+}


Mime
View raw message