helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] branch master updated: Implement the propertyStore read endpoint (#516)
Date Mon, 02 Dec 2019 18:52:52 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 80a8cdb  Implement the propertyStore read endpoint (#516)
80a8cdb is described below

commit 80a8cdb56a3294beddf22d2b3933838c6027f37b
Author: Yi Wang <i3.wangyi@gmail.com>
AuthorDate: Mon Dec 2 10:52:41 2019 -0800

    Implement the propertyStore read endpoint (#516)
    
    Implement the propertyStore read endpoint
    Example: http://xxxx/clusters/TestCluster/propertyStore/*<PATH>*
    - The read method accepts a simple path parameter called "path" of the format "abc/abc/abc"
    - The path is validated using regex
    - The read operation parses the byte array content from a static bytearray zk base data
accessor, if it's ZNRecord format, return ZnRecord; Otherwise, return {"content": <binary
payload>}
---
 .../apache/helix/rest/server/ServerContext.java    |  37 ++++++-
 .../resources/helix/AbstractHelixResource.java     |  13 ++-
 .../resources/helix/PropertyStoreAccessor.java     |  94 +++++++++++++++++
 .../rest/server/TestPropertyStoreAccessor.java     | 117 +++++++++++++++++++++
 .../rest/server/util/JerseyUriRequestBuilder.java  |   4 +
 5 files changed, 257 insertions(+), 8 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index a9ab882..cfb4737 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -23,6 +23,8 @@ package org.apache.helix.rest.server;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
@@ -38,16 +40,17 @@ import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.tools.ClusterSetup;
 
+
 public class ServerContext {
   private final String _zkAddr;
   private HelixZkClient _zkClient;
   private ZKHelixAdmin _zkHelixAdmin;
   private ClusterSetup _clusterSetup;
   private ConfigAccessor _configAccessor;
-
+  // The lazy initialized base data accessor that reads/writes byte array to ZK
+  private ZkBaseDataAccessor<byte[]> _byteArrayBaseDataAccessor;
   // 1 Cluster name will correspond to 1 helix data accessor
   private final Map<String, HelixDataAccessor> _helixDataAccessorPool;
-
   // 1 Cluster name will correspond to 1 task driver
   private final Map<String, TaskDriver> _taskDriverPool;
 
@@ -66,8 +69,8 @@ public class ServerContext {
     if (_zkClient == null) {
       HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
       clientConfig.setZkSerializer(new ZNRecordSerializer());
-      _zkClient = SharedZkClientFactory
-          .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig);
+      _zkClient = SharedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig);
     }
     return _zkClient;
   }
@@ -110,7 +113,8 @@ public class ServerContext {
   public HelixDataAccessor getDataAccssor(String clusterName) {
     synchronized (_helixDataAccessorPool) {
       if (!_helixDataAccessorPool.containsKey(clusterName)) {
-        ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(getHelixZkClient());
+        ZkBaseDataAccessor<ZNRecord> baseDataAccessor =
+            new ZkBaseDataAccessor<>(getHelixZkClient());
         _helixDataAccessorPool.put(clusterName,
             new ZKHelixDataAccessor(clusterName, InstanceType.ADMINISTRATOR, baseDataAccessor));
       }
@@ -118,6 +122,29 @@ public class ServerContext {
     }
   }
 
+  public ZkBaseDataAccessor<byte[]> getByteArrayBaseDataAccessor() {
+    if (_byteArrayBaseDataAccessor == null) {
+      synchronized (this) {
+        if (_byteArrayBaseDataAccessor == null) {
+          _byteArrayBaseDataAccessor = new ZkBaseDataAccessor<>(_zkAddr, new ZkSerializer()
{
+            @Override
+            public byte[] serialize(Object o)
+                throws ZkMarshallingError {
+              throw new UnsupportedOperationException("Serialize is not supported yet!");
+            }
+
+            @Override
+            public Object deserialize(byte[] bytes)
+                throws ZkMarshallingError {
+              return bytes;
+            }
+          });
+        }
+      }
+    }
+    return _byteArrayBaseDataAccessor;
+  }
+
   public void close() {
     if (_zkClient != null) {
       _zkClient.close();
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
index e5694d0..7b00a1d 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
@@ -25,6 +25,7 @@ import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.rest.common.ContextPropertyKeys;
@@ -39,7 +40,7 @@ import org.apache.helix.tools.ClusterSetup;
  * such as cluster, instance, job, resource, workflow, etc in
  * metadata store.
  */
-public class AbstractHelixResource extends AbstractResource{
+public class AbstractHelixResource extends AbstractResource {
 
   public HelixZkClient getHelixZkClient() {
     ServerContext serverContext = getServerContext();
@@ -76,11 +77,17 @@ public class AbstractHelixResource extends AbstractResource{
     return serverContext.getDataAccssor(clusterName);
   }
 
-  protected static ZNRecord toZNRecord(String data) throws IOException {
+  protected ZkBaseDataAccessor<byte[]> getByteArrayDataAccessor() {
+    return getServerContext().getByteArrayBaseDataAccessor();
+  }
+
+  protected static ZNRecord toZNRecord(String data)
+      throws IOException {
     return OBJECT_MAPPER.reader(ZNRecord.class).readValue(data);
   }
 
   private ServerContext getServerContext() {
-    return (ServerContext) _application.getProperties().get(ContextPropertyKeys.SERVER_CONTEXT.name());
+    return (ServerContext) _application.getProperties()
+        .get(ContextPropertyKeys.SERVER_CONTEXT.name());
   }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
new file mode 100644
index 0000000..1928388
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
@@ -0,0 +1,94 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.codehaus.jackson.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Path("/clusters/{clusterId}/propertyStore")
+public class PropertyStoreAccessor extends AbstractHelixResource {
+  private static Logger LOG = LoggerFactory.getLogger(PropertyStoreAccessor.class);
+  private static final String CONTENT_KEY = "content";
+  private static final ZNRecordSerializer ZN_RECORD_SERIALIZER = new ZNRecordSerializer();
+
+  /**
+   * Sample HTTP URLs:
+   *  http://<HOST>/clusters/{clusterId}/propertyStore/<PATH>
+   * It refers to the /PROPERTYSTORE/<PATH> in Helix metadata store
+   * @param clusterId The cluster Id
+   * @param path path parameter is like "abc/abc/abc" in the URL
+   * @return If the payload is ZNRecord format, return ZnRecord json response;
+   *         Otherwise, return json object {<PATH>: raw string}
+   */
+  @GET
+  @Path("{path: .+}")
+  public Response getPropertyByPath(@PathParam("clusterId") String clusterId,
+      @PathParam("path") String path) {
+    path = "/" + path;
+    if (!isPathValid(path)) {
+      LOG.info("The propertyStore path {} is invalid for cluster {}", path, clusterId);
+      return badRequest(
+          "Invalid path string. Valid path strings use slash as the directory separator and
names the location of ZNode");
+    }
+    final String recordPath = PropertyPathBuilder.propertyStore(clusterId) + path;
+    ZkBaseDataAccessor<byte[]> propertyStoreDataAccessor = getByteArrayDataAccessor();
+    if (propertyStoreDataAccessor.exists(recordPath, AccessOption.PERSISTENT)) {
+      byte[] bytes = propertyStoreDataAccessor.get(recordPath, null, AccessOption.PERSISTENT);
+      ZNRecord znRecord = (ZNRecord) ZN_RECORD_SERIALIZER.deserialize(bytes);
+      // The ZNRecordSerializer returns null when exception occurs in deserialization method
+      if (znRecord == null) {
+        ObjectNode jsonNode = OBJECT_MAPPER.createObjectNode();
+        jsonNode.put(CONTENT_KEY, new String(bytes));
+        return JSONRepresentation(jsonNode);
+      }
+      return JSONRepresentation(znRecord);
+    } else {
+      throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
+          .entity(String.format("The property store path %s doesn't exist", recordPath)).build());
+    }
+  }
+
+  /**
+   * Valid matches:
+   * /
+   * /abc
+   * /abc/abc/abc/abc
+   * Invalid matches:
+   * null or empty string
+   * /abc/
+   * /abc/abc/abc/abc/
+   **/
+  private static boolean isPathValid(String path) {
+    return path.matches("^/|(/[\\w-]+)+$");
+  }
+}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
new file mode 100644
index 0000000..0bff205
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
@@ -0,0 +1,117 @@
+package org.apache.helix.rest.server;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import javax.ws.rs.core.Response;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
+import org.apache.http.HttpStatus;
+import org.codehaus.jackson.JsonNode;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestPropertyStoreAccessor extends AbstractTestClass {
+  private static final String TEST_CLUSTER = "TestCluster_0";
+  private static final String ZNRECORD_PATH =
+      PropertyPathBuilder.propertyStore(TEST_CLUSTER) + "/ZnRecord";
+  private static final ZNRecord TEST_ZNRECORD = new ZNRecord("TestContent");
+  private static final String CUSTOM_PATH =
+      PropertyPathBuilder.propertyStore(TEST_CLUSTER) + "/NonZnRecord";
+  private static final String TEST_CONTENT = "TestContent";
+  private static final String CONTENT_KEY = "content";
+
+  private ZkBaseDataAccessor<String> _customDataAccessor;
+
+  @BeforeClass
+  public void init() {
+    _customDataAccessor = new ZkBaseDataAccessor<>(ZK_ADDR, new ZkSerializer() {
+      @Override
+      public byte[] serialize(Object o)
+          throws ZkMarshallingError {
+        return o.toString().getBytes();
+      }
+
+      @Override
+      public Object deserialize(byte[] bytes)
+          throws ZkMarshallingError {
+        return new String(bytes);
+      }
+    });
+    // initially prepare the datas in different paths
+    Assert
+        .assertTrue(_customDataAccessor.create(CUSTOM_PATH, TEST_CONTENT, AccessOption.PERSISTENT));
+    Assert.assertTrue(_baseAccessor.create(ZNRECORD_PATH, TEST_ZNRECORD, AccessOption.PERSISTENT));
+  }
+
+  @AfterClass
+  public void close() {
+    if (_customDataAccessor != null) {
+      _customDataAccessor.close();
+    }
+  }
+
+  @Test
+  public void testGetPropertyStoreWithZNRecordData()
+      throws IOException {
+    String data =
+        new JerseyUriRequestBuilder("clusters/{}/propertyStore/ZnRecord").format(TEST_CLUSTER)
+            .isBodyReturnExpected(true).get(this);
+    ZNRecord record = OBJECT_MAPPER.reader(ZNRecord.class).readValue(data);
+    Assert.assertEquals(record.getId(), TEST_ZNRECORD.getId());
+  }
+
+  @Test
+  public void testGetPropertyStoreWithTestStringData() throws IOException {
+    String actual =
+        new JerseyUriRequestBuilder("clusters/{}/propertyStore/NonZnRecord").format(TEST_CLUSTER)
+            .isBodyReturnExpected(true).get(this);
+    JsonNode jsonNode = OBJECT_MAPPER.readTree(actual);
+    String payLoad = jsonNode.get(CONTENT_KEY).getValueAsText();
+
+    Assert.assertEquals(TEST_CONTENT, payLoad);
+  }
+
+  @Test
+  public void testGetPropertyStoreWithEmptyDataPath() {
+    Response response =
+        new JerseyUriRequestBuilder("clusters/{}/propertyStore/EmptyPath").format(TEST_CLUSTER)
+            .isBodyReturnExpected(true).getResponse(this);
+    Assert.assertEquals(response.getStatus(), HttpStatus.SC_NOT_FOUND);
+  }
+
+  @Test
+  public void testGetPropertyStoreWithInValidPath() {
+    String path = "/context/";
+    Response response =
+        new JerseyUriRequestBuilder("clusters/{}/propertyStore" + path).format(TEST_CLUSTER)
+            .getResponse(this);
+    Assert.assertEquals(response.getStatus(), HttpStatus.SC_BAD_REQUEST);
+  }
+}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
index c97710b..359999e 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
@@ -90,6 +90,10 @@ public class JerseyUriRequestBuilder {
     return body;
   }
 
+  public Response getResponse(JerseyTestNg.ContainerPerClassTest container) {
+    return buildWebTarget(container).request().get();
+  }
+
   /**
    * Execute put request
    * @param container


Mime
View raw message