helix-commits mailing list archives

From: j...@apache.org
Subject: helix git commit: [HELIX-780] add get/add user content for workflow rest api
Date: Thu, 01 Nov 2018 19:06:08 GMT
Repository: helix
Updated Branches:
  refs/heads/master bfaa83995 -> 71e4b6a66


[HELIX-780] add get/add user content for workflow rest api


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/71e4b6a6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/71e4b6a6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/71e4b6a6

Branch: refs/heads/master
Commit: 71e4b6a66af1ae56a3667d5f6f5ca7ac63080997
Parents: bfaa839
Author: Harry Zhang <hrzhang@linkedin.com>
Authored: Tue Oct 2 18:01:30 2018 -0700
Committer: Harry Zhang <hrzhang@linkedin.com>
Committed: Thu Nov 1 11:57:03 2018 -0700

----------------------------------------------------------------------
 .../resources/helix/WorkflowAccessor.java       | 66 +++++++++++++++++++-
 .../helix/rest/server/AbstractTestClass.java    | 23 +++++++
 .../helix/rest/server/TestWorkflowAccessor.java | 38 +++++++++++
 3 files changed, 124 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
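
The endpoints added by this commit live at /clusters/{clusterId}/workflows/{workflowId}/userContent: GET returns the workflow's user content map as JSON (an empty map if nothing has been stored yet), and POST takes ?command=update with a JSON Map<String, String> body and merges it into the store. A minimal client-side sketch using the JAX-RS client API follows; the base URL, cluster name, and workflow name are illustrative assumptions, not part of the commit.

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

public class WorkflowUserContentClientExample {
  public static void main(String[] args) {
    // Assumed base URL and names, for illustration only.
    String uri = "http://localhost:8100/admin/v2/clusters/MyCluster/workflows/Workflow_0/userContent";
    Client client = ClientBuilder.newClient();

    // GET returns the stored user content as a JSON map (empty map if none yet).
    String current = client.target(uri).request(MediaType.APPLICATION_JSON).get(String.class);
    System.out.println("current user content: " + current);

    // POST with command=update merges the JSON body into the stored map.
    Response updated = client.target(uri)
        .queryParam("command", "update")
        .request()
        .post(Entity.entity("{\"k1\":\"v1\"}", MediaType.APPLICATION_JSON_TYPE));
    System.out.println("update status: " + updated.getStatus());

    client.close();
  }
}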


http://git-wip-us.apache.org/repos/asf/helix/blob/71e4b6a6/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
index 9a9a62b..ac6a53c 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
@@ -21,6 +21,7 @@ package org.apache.helix.rest.server.resources.helix;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -34,24 +35,26 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Response;
-
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.UserContentStore;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.type.TypeFactory;
 import org.codehaus.jackson.node.ArrayNode;
 import org.codehaus.jackson.node.JsonNodeFactory;
 import org.codehaus.jackson.node.ObjectNode;
 import org.codehaus.jackson.node.TextNode;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Path("/clusters/{clusterId}/workflows")
 public class WorkflowAccessor extends AbstractHelixResource {
@@ -263,6 +266,63 @@ public class WorkflowAccessor extends AbstractHelixResource {
   }
 
   @GET
+  @Path("{workflowId}/userContent")
+  public Response getWorkflowUserContent(
+      @PathParam("clusterId") String clusterId,
+      @PathParam("workflowId") String workflowId
+  ) {
+    TaskDriver taskDriver = getTaskDriver(clusterId);
+    try {
+      Map<String, String> contentStore =
+          taskDriver.getWorkflowUserContentMap(workflowId);
+      if (contentStore == null) {
+        return JSONRepresentation(Collections.emptyMap());
+      }
+      return JSONRepresentation(contentStore);
+    } catch (ZkNoNodeException e) {
+      return notFound("Unable to find content store");
+    } catch (Exception e) {
+      return serverError(e);
+    }
+  }
+
+  @POST
+  @Path("{workflowId}/userContent")
+  public Response updateWorkflowUserContent(
+      @PathParam("clusterId") String clusterId,
+      @PathParam("workflowId") String workflowId,
+      @QueryParam("command") String commandStr,
+      String content
+  ) {
+    Command cmd;
+    Map<String, String> contentMap = Collections.emptyMap();
+    try {
+      contentMap = OBJECT_MAPPER.readValue(content, new TypeReference<Map<String, String>>() {});
+      cmd = Command.valueOf(commandStr);
+    } catch (IOException e) {
+      return badRequest(String.format("Content %s cannot be deserialized to Map<String, String>. Err: %s", content, e.getMessage()));
+    } catch (IllegalArgumentException ie) {
+      return badRequest(String.format("Invalid command: %s. Err: %s", commandStr, ie.getMessage()));
+    } catch (NullPointerException npe) {
+      cmd = Command.update;
+    }
+
+    TaskDriver driver = getTaskDriver(clusterId);
+    try {
+      switch (cmd) {
+      case update:
+        driver.addOrUpdateWorkflowUserContentMap(workflowId, contentMap);
+        return OK();
+      default:
+        return badRequest(String.format("Command \"%s\" is not supported!", cmd));
+      }
+    } catch (Exception e) {
+      _logger.error("Failed to update user content store", e);
+      return serverError(e);
+    }
+  }
+
+  @GET
   @Path("{workflowId}/context")
   public Response getWorkflowContext(@PathParam("clusterId") String clusterId,
       @PathParam("workflowId") String workflowId) {

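Both endpoints delegate to TaskDriver; getWorkflowUserContentMap and addOrUpdateWorkflowUserContentMap are the only driver calls the new resource methods use. A minimal sketch of the equivalent direct usage, assuming a connected HelixManager (the cluster name and ZooKeeper address below are illustrative):

import java.util.Collections;
import java.util.Map;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.task.TaskDriver;

public class WorkflowUserContentDriverExample {
  public static void main(String[] args) throws Exception {
    // Assumed cluster and ZooKeeper address, for illustration only.
    HelixManager manager = HelixManagerFactory.getZKHelixManager(
        "MyCluster", "adminClient", InstanceType.ADMINISTRATOR, "localhost:2181");
    manager.connect();
    try {
      TaskDriver driver = new TaskDriver(manager);

      // Merge entries into the workflow-level user content store (what POST ?command=update does).
      driver.addOrUpdateWorkflowUserContentMap("Workflow_0",
          Collections.singletonMap("k1", "v1"));

      // Read the map back; the GET endpoint serializes this same map to JSON.
      Map<String, String> userContent = driver.getWorkflowUserContentMap("Workflow_0");
      System.out.println(userContent);
    } finally {
      manager.disconnect();
    }
  }
}
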
http://git-wip-us.apache.org/repos/asf/helix/blob/71e4b6a6/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index 2596f30..f59db1a 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
+import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
@@ -39,12 +40,15 @@ import org.apache.helix.rest.server.auditlog.AuditLog;
 import org.apache.helix.rest.server.auditlog.AuditLogger;
 import org.apache.helix.rest.server.filters.AuditLogFilter;
 import org.apache.helix.rest.server.resources.AbstractResource;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
@@ -61,6 +65,7 @@ import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 
+import com.google.common.base.Joiner;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.Application;
@@ -337,6 +342,9 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
 
  protected Map<String, Workflow> createWorkflows(String cluster, int numWorkflows) {
     Map<String, Workflow> workflows = new HashMap<>();
+    HelixPropertyStore<ZNRecord> propertyStore = new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) _baseAccessor,
+        PropertyPathBuilder.propertyStore(cluster), null);
+
     for (int i = 0; i < numWorkflows; i++) {
       Workflow.Builder workflow = new Workflow.Builder(WORKFLOW_PREFIX + i);
       int j = 0;
@@ -352,11 +360,20 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
           TaskConstants.REBALANCER_CONTEXT_ROOT, WORKFLOW_PREFIX + i, TaskConstants.CONTEXT_NODE),
           workflowContext.getRecord(), AccessOption.PERSISTENT);
       _configAccessor.setResourceConfig(cluster, WORKFLOW_PREFIX + i, workflow.getWorkflowConfig());
+
+      // Add workflow user content
+      propertyStore.create(Joiner.on("/")
+              .join(TaskConstants.REBALANCER_CONTEXT_ROOT, WORKFLOW_PREFIX + i,
+                  TaskUtil.USER_CONTENT_NODE), new ZNRecord(TaskUtil.USER_CONTENT_NODE),
+          AccessOption.PERSISTENT);
     }
     return workflows;
   }
 
  protected Set<JobConfig.Builder> createJobs(String cluster, String workflowName, int numJobs) {
+    HelixPropertyStore<ZNRecord> propertyStore =
+        new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) _baseAccessor,
+            PropertyPathBuilder.propertyStore(cluster), null);
     Set<JobConfig.Builder> jobCfgs = new HashSet<>();
     for (int i = 0; i < numJobs; i++) {
       JobConfig.Builder job =
@@ -370,6 +387,12 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
           TaskConstants.REBALANCER_CONTEXT_ROOT, workflowName + "_" + JOB_PREFIX + i,
           TaskConstants.CONTEXT_NODE), jobContext.getRecord(), AccessOption.PERSISTENT);
       _configAccessor.setResourceConfig(cluster, workflowName + "_" + JOB_PREFIX + i, job.build());
+
+      // add job content stores
+      propertyStore.create(Joiner.on("/")
+              .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowName + "_" + JOB_PREFIX + i,
+                  TaskUtil.USER_CONTENT_NODE), new ZNRecord(TaskUtil.USER_CONTENT_NODE),
+          AccessOption.PERSISTENT);
     }
     return jobCfgs;
   }

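For orientation, the test helpers above seed an empty UserContent ZNRecord per workflow and per job directly in the cluster's property store, at REBALANCER_CONTEXT_ROOT/<name>/USER_CONTENT_NODE. A small sketch of the path construction; the workflow and job names are examples, and the printed values depend on the constants' definitions, which are not shown in this commit.

import com.google.common.base.Joiner;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskUtil;

public class UserContentPathExample {
  public static void main(String[] args) {
    // Relative paths under the cluster's property store, as built in AbstractTestClass.
    String workflowPath = Joiner.on("/").join(
        TaskConstants.REBALANCER_CONTEXT_ROOT, "Workflow_0", TaskUtil.USER_CONTENT_NODE);
    String jobPath = Joiner.on("/").join(
        TaskConstants.REBALANCER_CONTEXT_ROOT, "Workflow_0_Job_0", TaskUtil.USER_CONTENT_NODE);
    System.out.println(workflowPath);
    System.out.println(jobPath);
  }
}
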
http://git-wip-us.apache.org/repos/asf/helix/blob/71e4b6a6/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
index 3e3b8ae..d622066 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
@@ -2,6 +2,8 @@ package org.apache.helix.rest.server;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
@@ -15,6 +17,7 @@ import org.apache.helix.task.TaskExecutionInfo;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.WorkflowConfig;
 import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.type.TypeReference;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -137,6 +140,41 @@ public class TestWorkflowAccessor extends AbstractTestClass {
         TargetState.START);
   }
 
+  @Test(dependsOnMethods = "testCreateWorkflow")
+  public void testGetAndUpdateWorkflowContentStore() throws IOException, InterruptedException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String workflowName = "Workflow_0";
+    TaskDriver driver = getTaskDriver(CLUSTER_NAME);
+    // Wait for workflow to start processing
+    driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS, TaskState.COMPLETED, TaskState.FAILED);
+    String uri = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/userContent";
+
+    String body =
+        get(uri, Response.Status.OK.getStatusCode(), true);
+    Map<String, String> contentStore = OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, String>>() {});
+    Assert.assertTrue(contentStore.isEmpty());
+
+    Map<String, String> map1 = new HashMap<>();
+    map1.put("k1", "v1");
+    Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(map1), MediaType.APPLICATION_JSON_TYPE);
+    post(uri, ImmutableMap.of("command", "delete"), entity, Response.Status.BAD_REQUEST.getStatusCode());
+    post(uri, ImmutableMap.of("command", "update"), entity, Response.Status.OK.getStatusCode());
+
+    // update (add items) workflow content store
+    body = get(uri, Response.Status.OK.getStatusCode(), true);
+    contentStore = OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, String>>() {});
+    Assert.assertEquals(contentStore, map1);
+
+    // modify map1 and verify
+    map1.put("k1", "v2");
+    map1.put("k2", "v2");
+    entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(map1), MediaType.APPLICATION_JSON_TYPE);
+    post(uri, ImmutableMap.of("command", "update"), entity, Response.Status.OK.getStatusCode());
+    body = get(uri, Response.Status.OK.getStatusCode(), true);
+    contentStore = OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, String>>() {});
+    Assert.assertEquals(contentStore, map1);
+  }
+
   @Test(dependsOnMethods = "testUpdateWorkflow")
   public void testDeleteWorkflow() throws InterruptedException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());

