helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [3/8] helix git commit: Change all Helix default created ZkClients to use ZnRecordSerializer.
Date Thu, 01 Nov 2018 23:02:17 GMT
Change all Helix default created ZkClients to use ZnRecordSerializer.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b549cda9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b549cda9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b549cda9

Branch: refs/heads/master
Commit: b549cda95cb114da78efc4b0458058862bcc6d02
Parents: f9bc9f8
Author: Lei Xia <lxia@linkedin.com>
Authored: Wed Sep 19 16:34:46 2018 -0700
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Thu Nov 1 14:38:31 2018 -0700

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixManager.java |   2 +-
 .../helix/manager/zk/ZNRecordSerializer.java    |  17 +--
 .../manager/zk/TestZNRecordSerializer.java      | 151 +++++++++++++++++++
 .../zk/TestZNRecordStreamingSerializer.java     |  19 +++
 4 files changed, 177 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b549cda9/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index c673f51..c4275df 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -594,7 +594,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener
{
 
   void createClient() throws Exception {
     PathBasedZkSerializer zkSerializer =
-        ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
+        ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
 
     HelixZkClient.ZkConnectionConfig connectionConfig = new HelixZkClient.ZkConnectionConfig(_zkAddress);
     connectionConfig.setSessionTimeout(_sessionTimeout);

http://git-wip-us.apache.org/repos/asf/helix/blob/b549cda9/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
index 95ebc06..0c92224 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
@@ -21,12 +21,8 @@ package org.apache.helix.manager.zk;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.StringWriter;
 import java.util.List;
 import java.util.Map;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
 
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.HelixException;
@@ -40,6 +36,7 @@ import org.codehaus.jackson.map.SerializationConfig;
 
 public class ZNRecordSerializer implements ZkSerializer {
   private static Logger logger = LoggerFactory.getLogger(ZNRecordSerializer.class);
+  private final ObjectMapper _mapper = new ObjectMapper();
 
   private static int getListFieldBound(ZNRecord record) {
     int max = Integer.MAX_VALUE;
@@ -78,15 +75,14 @@ public class ZNRecordSerializer implements ZkSerializer {
     }
 
     // do serialization
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    SerializationConfig serializationConfig = _mapper.getSerializationConfig();
     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
     serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
     serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    byte[] serializedBytes = null;
+    byte[] serializedBytes;
     try {
-      mapper.writeValue(baos, data);
+      _mapper.writeValue(baos, data);
       serializedBytes = baos.toByteArray();
       // apply compression if needed
       if (record.getBooleanField("enableCompression", false) || serializedBytes.length > ZNRecord.SIZE_LIMIT) {
@@ -113,10 +109,9 @@ public class ZNRecordSerializer implements ZkSerializer {
       return null;
     }
 
-    ObjectMapper mapper = new ObjectMapper();
     ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
 
-    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+    DeserializationConfig deserializationConfig = _mapper.getDeserializationConfig();
     deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
     deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
     deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
@@ -126,7 +121,7 @@ public class ZNRecordSerializer implements ZkSerializer {
         byte[] uncompressedBytes = GZipCompressionUtil.uncompress(bais);
         bais = new ByteArrayInputStream(uncompressedBytes);
       }
-      ZNRecord zn = mapper.readValue(bais, ZNRecord.class);
+      ZNRecord zn = _mapper.readValue(bais, ZNRecord.class);
 
       return zn;
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/helix/blob/b549cda9/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java
index 05df1cd..e46eb4d 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java
@@ -2,9 +2,19 @@ package org.apache.helix.manager.zk;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -46,6 +56,147 @@ public class TestZNRecordSerializer {
     Assert.assertEquals(result, record);
   }
 
+
+  @Test
+  public void testNullFields() {
+    ZNRecord record = new ZNRecord("testId");
+    record.setMapField("K1", null);
+    record.setListField("k2", null);
+    record.setSimpleField("k3", null);
+    ZNRecordSerializer serializer = new ZNRecordSerializer();
+    byte [] data = serializer.serialize(record);
+    ZNRecord result = (ZNRecord) serializer.deserialize(data);
+
+    Assert.assertEquals(result, record);
+    Assert.assertNull(result.getMapField("K1"));
+    Assert.assertNull(result.getListField("K2"));
+    Assert.assertNull(result.getSimpleField("K3"));
+    Assert.assertNull(result.getListField("K4"));
+  }
+
+
+  @Test (enabled = false)
+  public void testPerformance() {
+    ZNRecord record = createZnRecord();
+
+    ZNRecordSerializer serializer1 = new ZNRecordSerializer();
+    ZNRecordStreamingSerializer serializer2 = new ZNRecordStreamingSerializer();
+
+    int loop = 100000;
+
+    long start = System.currentTimeMillis();
+    for (int i = 0; i < loop; i++) {
+      serializer1.serialize(record);
+    }
+    System.out.println("ZNRecordSerializer serialize took " + (System.currentTimeMillis() - start) + " ms");
+
+    byte[] data = serializer1.serialize(record);
+    start = System.currentTimeMillis();
+    for (int i = 0; i < loop; i++) {
+      serializer1.deserialize(data);
+    }
+    System.out.println("ZNRecordSerializer deserialize took " + (System.currentTimeMillis() - start) + " ms");
+
+
+    start = System.currentTimeMillis();
+    for (int i = 0; i < loop; i++) {
+      data = serializer2.serialize(record);
+    }
+    System.out.println("ZNRecordStreamingSerializer serialize took " + (System.currentTimeMillis() - start) + " ms");
+
+    start = System.currentTimeMillis();
+    for (int i = 0; i < loop; i++) {
+      ZNRecord result = (ZNRecord) serializer2.deserialize(data);
+    }
+    System.out.println("ZNRecordStreamingSerializer deserialize took " + (System.currentTimeMillis() - start) + " ms");
+  }
+
+
+  ZNRecord createZnRecord() {
+    ZNRecord record = new ZNRecord("testId");
+    for (int i = 0; i < 400; i++) {
+      Map<String, String> map = new HashMap<>();
+      map.put("localhost_" + i, "Master");
+      map.put("localhost_" + (i+1), "Slave");
+      map.put("localhost_" + (i+2), "Slave");
+
+      record.setMapField("partition_" + i, map);
+      record.setListField("partition_" + i, Lists.<String>newArrayList(map.keySet()));
+      record.setSimpleField("partition_" + i,  UUID.randomUUID().toString());
+    }
+
+    return record;
+  }
+
+
+  @Test (enabled = false)
+  public void testParallelPerformance() throws ExecutionException, InterruptedException {
+    final ZNRecord record = createZnRecord();
+
+    final ZNRecordSerializer serializer1 = new ZNRecordSerializer();
+    final ZNRecordStreamingSerializer serializer2 = new ZNRecordStreamingSerializer();
+
+    int loop = 100000;
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10000);
+
+    long start = System.currentTimeMillis();
+    batchSerialize(serializer1, executorService, loop, record);
+    System.out.println("ZNRecordSerializer serialize took " + (System.currentTimeMillis() - start) + " ms");
+
+    byte[] data = serializer1.serialize(record);
+    start = System.currentTimeMillis();
+    batchSerialize(serializer2, executorService, loop, record);
+    System.out.println("ZNRecordSerializer deserialize took " + (System.currentTimeMillis() - start) + " ms");
+
+
+    start = System.currentTimeMillis();
+    for (int i = 0; i < loop; i++) {
+      data = serializer2.serialize(record);
+    }
+    System.out.println("ZNRecordStreamingSerializer serialize took " + (System.currentTimeMillis() - start) + " ms");
+
+    start = System.currentTimeMillis();
+    for (int i = 0; i < loop; i++) {
+      ZNRecord result = (ZNRecord) serializer2.deserialize(data);
+    }
+    System.out.println("ZNRecordStreamingSerializer deserialize took " + (System.currentTimeMillis() - start) + " ms");
+  }
+
+
+  private void batchSerialize(final ZkSerializer serializer, ExecutorService executorService, int repeatTime, final ZNRecord record)
+      throws ExecutionException, InterruptedException {
+    List<Future> futures = new ArrayList<>();
+    for (int i = 0; i < repeatTime; i++) {
+      Future f = executorService.submit(new Runnable() {
+        @Override public void run() {
+          serializer.serialize(record);
+        }
+      });
+      futures.add(f);
+    }
+    for (Future f : futures) {
+      f.get();
+    }
+  }
+
+
+  private void batchDeSerialize(final ZkSerializer serializer, ExecutorService executorService, int repeatTime, final byte [] data)
+      throws ExecutionException, InterruptedException {
+    List<Future> futures = new ArrayList<>();
+    for (int i = 0; i < repeatTime; i++) {
+      Future f = executorService.submit(new Runnable() {
+        @Override public void run() {
+          serializer.deserialize(data);
+        }
+      });
+      futures.add(f);
+    }
+    for (Future f : futures) {
+      f.get();
+    }
+  }
+
   /**
    * Test that simple, list, and map fields are initialized as empty even when not in json
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/b549cda9/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java
index 567d842..2aea3da 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java
@@ -46,6 +46,25 @@ public class TestZNRecordStreamingSerializer {
     Assert.assertEquals(result, record);
   }
 
+
+  // TODO: need to fix ZnRecordStreamingSerializer before enabling this test.
+  @Test (enabled = false)
+  public void testNullFields() {
+    ZNRecord record = new ZNRecord("testId");
+    record.setMapField("K1", null);
+    record.setListField("k2", null);
+    record.setSimpleField("k3", null);
+    ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
+    byte [] data = serializer.serialize(record);
+    ZNRecord result = (ZNRecord) serializer.deserialize(data);
+
+    Assert.assertEquals(result, record);
+    Assert.assertNull(result.getMapField("K1"));
+    Assert.assertNull(result.getListField("K2"));
+    Assert.assertNull(result.getSimpleField("K3"));
+    Assert.assertNull(result.getListField("K4"));
+  }
+
   /**
    * Check that the ZNRecord is not constructed if there is no id in the json
    */


Mime
View raw message