Repository: helix
Updated Branches:
refs/heads/master 6775cd3ff -> d57882b9b
Add protective check for ZooKeeper writing data that is bigger than 1MB
The ZooKeeper server drops the connection for any request that tries to write data bigger than
1 MB, without returning an error code. When a Helix user attempts such a write, the request
times out without giving a reason.
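ZkClient in Helix is a wrapper around the I0Itec ZkClient (org.I0Itec.zkclient.ZkClient). Add a
check in the Helix ZkClient wrapper that rejects the write up front with a HelixException stating
the exact reason when the serialized data is bigger than 1 MB, instead of letting it time out.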
Add unit test.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f9f554e6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f9f554e6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f9f554e6
Branch: refs/heads/master
Commit: f9f554e68bfbffdfd8f87db76d546f7202f1541b
Parents: 6775cd3
Author: Weihan Kong <wkong@linkedin.com>
Authored: Mon Jan 23 17:18:00 2017 -0800
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Tue Oct 3 14:49:05 2017 -0700
----------------------------------------------------------------------
.../org/apache/helix/manager/zk/ZkClient.java | 24 +++-
.../org/apache/helix/TestZkClientWrapper.java | 116 ------------------
.../apache/helix/manager/zk/TestZkClient.java | 122 +++++++++++++++++++
3 files changed, 140 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/f9f554e6/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 0a61e82..8f11eb3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -30,6 +30,8 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
@@ -287,7 +289,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
long startT = System.nanoTime();
try {
final byte[] data = serialize(datat, path);
-
+ checkDataSizeLimit(data);
retryUntilConnected(new Callable<Object>() {
@Override
@@ -308,12 +310,13 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
throws InterruptedException {
long start = System.nanoTime();
try {
- final byte[] bytes = _zkSerializer.serialize(datat, path);
+ final byte[] data = _zkSerializer.serialize(datat, path);
+ checkDataSizeLimit(data);
return retryUntilConnected(new Callable<Stat>() {
@Override
public Stat call() throws Exception {
- return ((ZkConnection) _connection).getZookeeper().setData(path, bytes, expectedVersion);
+ return ((ZkConnection) _connection).getZookeeper().setData(path, data, expectedVersion);
}
});
} finally {
@@ -325,7 +328,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
}
@Override
- public String create(final String path, Object data, final CreateMode mode)
+ public String create(final String path, Object datat, final CreateMode mode)
throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
{
if (path == null) {
throw new NullPointerException("path must not be null.");
@@ -333,13 +336,14 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
long startT = System.nanoTime();
try {
- final byte[] bytes = data == null ? null : serialize(data, path);
+ final byte[] data = datat == null ? null : serialize(datat, path);
+ checkDataSizeLimit(data);
return retryUntilConnected(new Callable<String>() {
@Override
public String call() throws Exception {
- return _connection.create(path, bytes, mode);
+ return _connection.create(path, data, mode);
}
});
} finally {
@@ -451,4 +455,12 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
}
});
}
+
+ private void checkDataSizeLimit(byte[] data) {
+ if (data != null && data.length > ZNRecord.SIZE_LIMIT) {
+ LOG.error("Data size larger than 1M, will not write to zk. Data (first 1k): "
+ + new String(data).substring(0, 1024));
+ throw new HelixException("Data size larger than 1M");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f9f554e6/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java b/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
deleted file mode 100644
index bc3d266..0000000
--- a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.helix;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestZkClientWrapper extends ZkUnitTestBase {
- private static Logger LOG = Logger.getLogger(TestZkClientWrapper.class);
-
- ZkClient _zkClient;
-
- @BeforeClass
- public void beforeClass() {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- }
-
- @AfterClass
- public void afterClass() {
- _zkClient.close();
- }
-
- @Test()
- void testGetStat() {
- String path = "/tmp/getStatTest";
- _zkClient.deleteRecursive(path);
-
- Stat stat, newStat;
- stat = _zkClient.getStat(path);
- AssertJUnit.assertNull(stat);
- _zkClient.createPersistent(path, true);
-
- stat = _zkClient.getStat(path);
- AssertJUnit.assertNotNull(stat);
-
- newStat = _zkClient.getStat(path);
- AssertJUnit.assertEquals(stat, newStat);
-
- _zkClient.writeData(path, new ZNRecord("Test"));
- newStat = _zkClient.getStat(path);
- AssertJUnit.assertNotSame(stat, newStat);
- }
-
- @Test()
- void testSessioExpire() throws Exception {
- IZkStateListener listener = new IZkStateListener() {
-
- @Override
- public void handleStateChanged(KeeperState state) throws Exception {
- System.out.println("In Old connection New state " + state);
- }
-
- @Override
- public void handleNewSession() throws Exception {
- System.out.println("In Old connection New session");
- }
-
- @Override
- public void handleSessionEstablishmentError(Throwable var1) throws Exception {
- }
- };
-
- _zkClient.subscribeStateChanges(listener);
- ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
- ZooKeeper zookeeper = connection.getZookeeper();
- System.out.println("old sessionId= " + zookeeper.getSessionId());
- Watcher watcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- System.out.println("In New connection In process event:" + event);
- }
- };
- ZooKeeper newZookeeper =
- new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
- zookeeper.getSessionId(), zookeeper.getSessionPasswd());
- Thread.sleep(3000);
- System.out.println("New sessionId= " + newZookeeper.getSessionId());
- Thread.sleep(3000);
- newZookeeper.close();
- Thread.sleep(10000);
- connection = ((ZkConnection) _zkClient.getConnection());
- zookeeper = connection.getZookeeper();
- System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/f9f554e6/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
new file mode 100644
index 0000000..0019d40
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
@@ -0,0 +1,122 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestZkClient extends ZkUnitTestBase {
+ private static Logger LOG = Logger.getLogger(TestZkClient.class);
+
+ ZkClient _zkClient;
+
+ @BeforeClass
+ public void beforeClass() {
+ _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ }
+
+ @AfterClass
+ public void afterClass() {
+ _zkClient.close();
+ }
+
+ @Test()
+ void testGetStat() {
+ String path = "/tmp/getStatTest";
+ _zkClient.deleteRecursive(path);
+
+ Stat stat, newStat;
+ stat = _zkClient.getStat(path);
+ AssertJUnit.assertNull(stat);
+ _zkClient.createPersistent(path, true);
+
+ stat = _zkClient.getStat(path);
+ AssertJUnit.assertNotNull(stat);
+
+ newStat = _zkClient.getStat(path);
+ AssertJUnit.assertEquals(stat, newStat);
+
+ _zkClient.writeData(path, new ZNRecord("Test"));
+ newStat = _zkClient.getStat(path);
+ AssertJUnit.assertNotSame(stat, newStat);
+ }
+
+ @Test()
+ void testSessioExpire() throws Exception {
+ IZkStateListener listener = new IZkStateListener() {
+
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception {
+ System.out.println("In Old connection New state " + state);
+ }
+
+ @Override
+ public void handleNewSession() throws Exception {
+ System.out.println("In Old connection New session");
+ }
+
+ @Override
+ public void handleSessionEstablishmentError(Throwable var1) throws Exception {
+ }
+ };
+
+ _zkClient.subscribeStateChanges(listener);
+ ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
+ ZooKeeper zookeeper = connection.getZookeeper();
+ System.out.println("old sessionId= " + zookeeper.getSessionId());
+ Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ System.out.println("In New connection In process event:" + event);
+ }
+ };
+ ZooKeeper newZookeeper =
+ new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
+ zookeeper.getSessionId(), zookeeper.getSessionPasswd());
+ Thread.sleep(3000);
+ System.out.println("New sessionId= " + newZookeeper.getSessionId());
+ Thread.sleep(3000);
+ newZookeeper.close();
+ Thread.sleep(10000);
+ connection = ((ZkConnection) _zkClient.getConnection());
+ zookeeper = connection.getZookeeper();
+ System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
+ }
+
+ @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Data size larger than 1M.*")
+ void testDataSizeLimit() {
+ ZNRecord data = new ZNRecord(new String(new char[1024*1024]));
+ _zkClient.writeData("/test", data, -1);
+ }
+}
|