Repository: helix
Updated Branches:
refs/heads/master bfaa83995 -> 71e4b6a66
[HELIX-780] add get/add user content for workflow rest api
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/71e4b6a6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/71e4b6a6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/71e4b6a6
Branch: refs/heads/master
Commit: 71e4b6a66af1ae56a3667d5f6f5ca7ac63080997
Parents: bfaa839
Author: Harry Zhang <hrzhang@linkedin.com>
Authored: Tue Oct 2 18:01:30 2018 -0700
Committer: Harry Zhang <hrzhang@linkedin.com>
Committed: Thu Nov 1 11:57:03 2018 -0700
----------------------------------------------------------------------
.../resources/helix/WorkflowAccessor.java | 66 +++++++++++++++++++-
.../helix/rest/server/AbstractTestClass.java | 23 +++++++
.../helix/rest/server/TestWorkflowAccessor.java | 38 +++++++++++
3 files changed, 124 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/71e4b6a6/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
index 9a9a62b..ac6a53c 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
@@ -21,6 +21,7 @@ package org.apache.helix.rest.server.resources.helix;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -34,24 +35,26 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
-
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.UserContentStore;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.type.TypeFactory;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.JsonNodeFactory;
import org.codehaus.jackson.node.ObjectNode;
import org.codehaus.jackson.node.TextNode;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Path("/clusters/{clusterId}/workflows")
public class WorkflowAccessor extends AbstractHelixResource {
@@ -263,6 +266,63 @@ public class WorkflowAccessor extends AbstractHelixResource {
}
+  /**
+   * Returns the user content map stored for a workflow.
+   *
+   * @param clusterId  cluster that owns the workflow
+   * @param workflowId workflow whose user content is requested
+   * @return the content map rendered through {@code JSONRepresentation}; an empty map is
+   *         rendered when the driver returns {@code null}; a not-found response when the
+   *         lookup raises {@code ZkNoNodeException}; a server error for any other exception
+   */
@GET
+  @Path("{workflowId}/userContent")
+  public Response getWorkflowUserContent(
+      @PathParam("clusterId") String clusterId,
+      @PathParam("workflowId") String workflowId
+  ) {
+    // Kept outside the try block: an exception from the driver lookup itself is not
+    // mapped to a response by the handlers below.
+    final TaskDriver driver = getTaskDriver(clusterId);
+    try {
+      final Map<String, String> userContent = driver.getWorkflowUserContentMap(workflowId);
+      if (userContent != null) {
+        return JSONRepresentation(userContent);
+      }
+      // Nothing stored: answer with an empty map rather than a null payload.
+      return JSONRepresentation(Collections.emptyMap());
+    } catch (ZkNoNodeException noNode) {
+      return notFound("Unable to find content store");
+    } catch (Exception failure) {
+      return serverError(failure);
+    }
+  }
+
+  /**
+   * Adds or updates entries in a workflow's user content store.
+   *
+   * @param clusterId  cluster that owns the workflow
+   * @param workflowId workflow whose user content is modified
+   * @param commandStr value of the {@code command} query parameter; when it is absent the
+   *                   request is treated as {@code update}
+   * @param content    request body, expected to be a JSON object mapping strings to strings
+   * @return OK when the update succeeds; a bad-request response when the body is missing or
+   *         cannot be deserialized, or when the command is unknown or unsupported; a server
+   *         error when the update itself throws
+   */
+  @POST
+  @Path("{workflowId}/userContent")
+  public Response updateWorkflowUserContent(
+      @PathParam("clusterId") String clusterId,
+      @PathParam("workflowId") String workflowId,
+      @QueryParam("command") String commandStr,
+      String content
+  ) {
+    // Reject a missing body explicitly. Previously a null body surfaced as a
+    // NullPointerException that was caught and misread as "no command supplied", which
+    // silently turned the request into an update with an empty map.
+    if (content == null) {
+      return badRequest("Request body is missing; expected a JSON map of String to String");
+    }
+
+    Map<String, String> contentMap;
+    try {
+      contentMap = OBJECT_MAPPER.readValue(content,
+          new TypeReference<Map<String, String>>() {});
+    } catch (IOException e) {
+      return badRequest(String.format(
+          "Content %s cannot be deserialized to Map<String, String>. Err: %s",
+          content, e.getMessage()));
+    }
+
+    // An absent command defaults to update; checked up front instead of relying on the
+    // NullPointerException that Command.valueOf(null) would throw.
+    Command cmd = Command.update;
+    if (commandStr != null) {
+      try {
+        cmd = Command.valueOf(commandStr);
+      } catch (IllegalArgumentException ie) {
+        return badRequest(
+            String.format("Invalid command: %s. Err: %s", commandStr, ie.getMessage()));
+      }
+    }
+
+    TaskDriver driver = getTaskDriver(clusterId);
+    try {
+      switch (cmd) {
+      case update:
+        driver.addOrUpdateWorkflowUserContentMap(workflowId, contentMap);
+        return OK();
+      default:
+        return badRequest(String.format("Command \"%s\" is not supported!", cmd));
+      }
+    } catch (Exception e) {
+      _logger.error("Failed to update user content store", e);
+      return serverError(e);
+    }
+  }
+
+ @GET
@Path("{workflowId}/context")
public Response getWorkflowContext(@PathParam("clusterId") String clusterId,
@PathParam("workflowId") String workflowId) {
http://git-wip-us.apache.org/repos/asf/helix/blob/71e4b6a6/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index 2596f30..f59db1a 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.ZkServer;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
+import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
@@ -39,12 +40,15 @@ import org.apache.helix.rest.server.auditlog.AuditLog;
import org.apache.helix.rest.server.auditlog.AuditLogger;
import org.apache.helix.rest.server.filters.AuditLogFilter;
import org.apache.helix.rest.server.resources.AbstractResource;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;
@@ -61,6 +65,7 @@ import org.testng.Assert;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
+import com.google.common.base.Joiner;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
@@ -337,6 +342,9 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest
{
protected Map<String, Workflow> createWorkflows(String cluster, int numWorkflows)
{
Map<String, Workflow> workflows = new HashMap<>();
+ HelixPropertyStore<ZNRecord> propertyStore = new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>)
_baseAccessor,
+ PropertyPathBuilder.propertyStore(cluster), null);
+
for (int i = 0; i < numWorkflows; i++) {
Workflow.Builder workflow = new Workflow.Builder(WORKFLOW_PREFIX + i);
int j = 0;
@@ -352,11 +360,20 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest
{
TaskConstants.REBALANCER_CONTEXT_ROOT, WORKFLOW_PREFIX + i, TaskConstants.CONTEXT_NODE),
workflowContext.getRecord(), AccessOption.PERSISTENT);
_configAccessor.setResourceConfig(cluster, WORKFLOW_PREFIX + i, workflow.getWorkflowConfig());
+
+ // Add workflow user content
+ propertyStore.create(Joiner.on("/")
+ .join(TaskConstants.REBALANCER_CONTEXT_ROOT, WORKFLOW_PREFIX + i,
+ TaskUtil.USER_CONTENT_NODE), new ZNRecord(TaskUtil.USER_CONTENT_NODE),
+ AccessOption.PERSISTENT);
}
return workflows;
}
protected Set<JobConfig.Builder> createJobs(String cluster, String workflowName,
int numJobs) {
+ HelixPropertyStore<ZNRecord> propertyStore =
+ new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) _baseAccessor,
+ PropertyPathBuilder.propertyStore(cluster), null);
Set<JobConfig.Builder> jobCfgs = new HashSet<>();
for (int i = 0; i < numJobs; i++) {
JobConfig.Builder job =
@@ -370,6 +387,12 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest
{
TaskConstants.REBALANCER_CONTEXT_ROOT, workflowName + "_" + JOB_PREFIX + i,
TaskConstants.CONTEXT_NODE), jobContext.getRecord(), AccessOption.PERSISTENT);
_configAccessor.setResourceConfig(cluster, workflowName + "_" + JOB_PREFIX + i, job.build());
+
+ // add job content stores
+ propertyStore.create(Joiner.on("/")
+ .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowName + "_" + JOB_PREFIX
+ i,
+ TaskUtil.USER_CONTENT_NODE), new ZNRecord(TaskUtil.USER_CONTENT_NODE),
+ AccessOption.PERSISTENT);
}
return jobCfgs;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/71e4b6a6/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
index 3e3b8ae..d622066 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
@@ -2,6 +2,8 @@ package org.apache.helix.rest.server;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
@@ -15,6 +17,7 @@ import org.apache.helix.task.TaskExecutionInfo;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.type.TypeReference;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -137,6 +140,41 @@ public class TestWorkflowAccessor extends AbstractTestClass {
TargetState.START);
}
+  /**
+   * Exercises the workflow userContent endpoint: reads the initial store, checks that a
+   * "delete" command is answered with BAD_REQUEST, then adds an entry and later overwrites
+   * and extends it, reading the store back after each successful update.
+   */
+  @Test(dependsOnMethods = "testCreateWorkflow")
+  public void testGetAndUpdateWorkflowContentStore() throws IOException, InterruptedException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String workflowName = "Workflow_0";
+    TaskDriver driver = getTaskDriver(CLUSTER_NAME);
+    // Wait for workflow to start processing
+    driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS, TaskState.COMPLETED,
+        TaskState.FAILED);
+    // Built from workflowName so the polled workflow and the queried one cannot drift apart.
+    String uri = "clusters/" + CLUSTER_NAME + "/workflows/" + workflowName + "/userContent";
+
+    // The store starts out empty
+    String body = get(uri, Response.Status.OK.getStatusCode(), true);
+    Map<String, String> contentStore =
+        OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, String>>() {});
+    Assert.assertTrue(contentStore.isEmpty());
+
+    Map<String, String> map1 = new HashMap<>();
+    map1.put("k1", "v1");
+    Entity<String> entity =
+        Entity.entity(OBJECT_MAPPER.writeValueAsString(map1), MediaType.APPLICATION_JSON_TYPE);
+    // A command other than update must be rejected
+    post(uri, ImmutableMap.of("command", "delete"), entity,
+        Response.Status.BAD_REQUEST.getStatusCode());
+    // update (add items) workflow content store
+    post(uri, ImmutableMap.of("command", "update"), entity, Response.Status.OK.getStatusCode());
+
+    body = get(uri, Response.Status.OK.getStatusCode(), true);
+    contentStore = OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, String>>() {});
+    Assert.assertEquals(contentStore, map1);
+
+    // modify map1 and verify
+    map1.put("k1", "v2");
+    map1.put("k2", "v2");
+    entity =
+        Entity.entity(OBJECT_MAPPER.writeValueAsString(map1), MediaType.APPLICATION_JSON_TYPE);
+    post(uri, ImmutableMap.of("command", "update"), entity, Response.Status.OK.getStatusCode());
+    body = get(uri, Response.Status.OK.getStatusCode(), true);
+    contentStore = OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, String>>() {});
+    Assert.assertEquals(contentStore, map1);
+  }
+
@Test(dependsOnMethods = "testUpdateWorkflow")
public void testDeleteWorkflow() throws InterruptedException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
|