helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject helix git commit: Support user defined content store per workflow/job/task layer
Date Fri, 02 Sep 2016 00:09:41 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 947a7d557 -> adfe4dda8


Support user defined content store per workflow/job/task layer

1. Add support for a user-defined key-value content store at the workflow/job/task layer.
2. Add test cases that store key-value pairs at the workflow/job/task layer and verify them.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/adfe4dda
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/adfe4dda
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/adfe4dda

Branch: refs/heads/helix-0.6.x
Commit: adfe4dda8aaef6b1ea088acaf24bd120f6fe250d
Parents: 947a7d5
Author: Junkai Xue <jxue@linkedin.com>
Authored: Thu Aug 18 13:33:31 2016 -0700
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Wed Aug 31 10:54:32 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  |   3 +
 .../org/apache/helix/task/TaskStateModel.java   |   4 +
 .../java/org/apache/helix/task/TaskUtil.java    |  97 ++++++++
 .../org/apache/helix/task/UserContentStore.java | 110 ++++++++++
 .../apache/helix/task/WorkflowRebalancer.java   |   2 +
 .../integration/task/TestUserContentStore.java  | 219 +++++++++++++++++++
 6 files changed, 435 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/adfe4dda/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index e62d15c..bc582e1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -596,6 +596,9 @@ public class TaskDriver {
     _admin.addResource(_clusterName, workflow, 1, TaskConstants.STATE_MODEL_NAME);
 
     IdealState is = buildWorkflowIdealState(workflow);
+    TaskUtil
+        .createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE));
+
     _admin.setResourceIdealState(_clusterName, workflow, is);
 
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/adfe4dda/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index fd07176..a7c58d2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -304,6 +304,10 @@ public class TaskStateModel extends StateModel {
     TaskFactory taskFactory = _taskFactoryRegistry.get(command);
     Task task = taskFactory.createNewTask(callbackContext);
 
+    if (task instanceof UserContentStore) {
+      ((UserContentStore) task).init(_manager, cfg.getWorkflow(), msg.getResourceName(), taskPartition);
+    }
+
     // Submit the task for execution
     _taskRunner =
         new TaskRunner(task, msg.getResourceName(), taskPartition, msg.getTgtName(), _manager,

http://git-wip-us.apache.org/repos/asf/helix/blob/adfe4dda/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 44de175..1d89656 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
@@ -48,6 +49,7 @@ import com.google.common.collect.Maps;
 public class TaskUtil {
   private static final Logger LOG = Logger.getLogger(TaskUtil.class);
   public static final String CONTEXT_NODE = "Context";
+  public static final String USER_CONTENT_NODE = "UserContent";
 
   /**
    * Parses job resource configurations in Helix into a {@link JobConfig} object.
@@ -220,6 +222,101 @@ public class TaskUtil {
   }
 
   /**
+   * Initialize the user content store znode setup
+   * @param propertyStore       zookeeper property store
+   * @param workflowJobResource the name of workflow or job
+   * @param record              the initial data
+   */
+  protected static void createUserContent(HelixPropertyStore propertyStore, String workflowJobResource,
+      ZNRecord record) {
+    propertyStore.create(Joiner.on("/")
+        .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource,
+            TaskUtil.USER_CONTENT_NODE), record, AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Get user defined workflow or job level key-value pair data
+   *
+   * @param manager             a connection to Helix
+   * @param workflowJobResource the name of workflow
+   * @param key                 the key of key-value pair
+   *
+   * @return null if there is no such pair, otherwise return a String
+   */
+  protected static String getWorkflowJobUserContent(HelixManager manager,
+      String workflowJobResource, String key) {
+    ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/")
+            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), null,
+        AccessOption.PERSISTENT);
+    return r != null ? r.getSimpleField(key) : null;
+  }
+
+  /**
+   * Add a user-defined key-value pair to the workflow or job level
+   *
+   * @param manager             a connection to Helix
+   * @param workflowJobResource the name of workflow or job
+   * @param key                 the key of key-value pair
+   * @param value               the value of key-value pair
+   */
+  protected static void addWorkflowJobUserContent(final HelixManager manager,
+      String workflowJobResource, final String key, final String value) {
+    String path = Joiner.on("/")
+        .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE);
+
+    manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() {
+      @Override public ZNRecord update(ZNRecord znRecord) {
+        znRecord.setSimpleField(key, value);
+        return znRecord;
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Get user defined task level key-value pair data
+   *
+   * @param manager      a connection to Helix
+   * @param jobResource  the name of job
+   * @param taskResource the name of the task
+   * @param key          the key of key-value pair
+   *
+   * @return null if there is no such pair, otherwise return a String
+   */
+  protected static String getTaskUserContent(HelixManager manager, String jobResource,
+      String taskResource, String key) {
+    ZNRecord r = manager.getHelixPropertyStore().get(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, USER_CONTENT_NODE),
+        null, AccessOption.PERSISTENT);
+    return r != null ? (r.getMapField(taskResource) != null
+        ? r.getMapField(taskResource).get(key)
+        : null) : null;
+  }
+
+  /**
+   * Add a user-defined key-value pair to the task level
+   *
+   * @param manager       a connection to Helix
+   * @param jobResource   the name of job
+   * @param taskResource  the name of task
+   * @param key           the key of key-value pair
+   * @param value         the value of key-value pair
+   */
+  protected static void addTaskUserContent(final HelixManager manager, String jobResource,
+      final String taskResource, final String key, final String value) {
+    String path =
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, USER_CONTENT_NODE);
+
+    manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() {
+      @Override public ZNRecord update(ZNRecord znRecord) {
+        if (znRecord.getMapField(taskResource) == null) {
+          znRecord.setMapField(taskResource, new HashMap<String, String>());
+        }
+        znRecord.getMapField(taskResource).put(key, value);
+        return znRecord;
+      }
+    }, AccessOption.PERSISTENT);
+  }
+  /**
    * Get a workflow-qualified job name for a single-job workflow
    *
    * @param singleJobWorkflow the name of the single-job workflow

http://git-wip-us.apache.org/repos/asf/helix/blob/adfe4dda/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java b/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
new file mode 100644
index 0000000..b188be7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
@@ -0,0 +1,110 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+
+/**
+ * UserContentStore provides default implementation of user defined key-value pair store per task,
+ * job and workflow level.
+ *
+ * TODO: This class should be merged to Task interface when Helix bump up to Java 8
+ */
+public abstract class UserContentStore {
+
+  protected enum Scope {
+    /**
+     * Define the content store in workflow level
+     */
+    WORKFLOW,
+
+    /**
+     * Define the content store in job level
+     */
+    JOB,
+
+    /**
+     * Define the content store in task level
+     */
+    TASK
+  }
+
+  private HelixManager _manager;
+  private String _workflowName;
+  private String _jobName;
+  private String _taskName;
+
+  /**
+   * Default initialization of user content store
+   * @param manager The Helix manager
+   * @param workflowName The name of workflow that the task belongs to
+   * @param jobName The name of job that the task belongs to
+   * @param taskName The name of current task
+   */
+  public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
+    _manager = manager;
+    _workflowName = workflowName;
+    _jobName = jobName;
+    _taskName = taskName;
+  }
+
+  /**
+   * Default implementation for user defined put key-value pair
+   * @param key The key of key-value pair
+   * @param value The value of key-value pair
+   * @param scope The scope defines which layer to store
+   */
+  public void putUserContent(String key, String value, Scope scope) {
+    switch (scope) {
+    case WORKFLOW:
+      TaskUtil.addWorkflowJobUserContent(_manager, _workflowName, key, value);
+      break;
+    case JOB:
+      TaskUtil.addWorkflowJobUserContent(_manager, _jobName, key, value);
+      break;
+    case TASK:
+      TaskUtil.addTaskUserContent(_manager, _jobName, _taskName, key, value);
+      break;
+    default:
+      throw new HelixException("Invalid scope : " + scope.name());
+    }
+  }
+
+  /**
+   * Default implementation for user defined get key-value pair
+   * @param key The key of key-value pair
+   * @param scope The scope defines which layer that key-value pair stored
+   * @return Null if key-value pair not found or this content store does not exist. Otherwise,
+   *         return a String
+   */
+  public String getUserContent(String key, Scope scope) {
+    switch (scope) {
+    case WORKFLOW:
+      return TaskUtil.getWorkflowJobUserContent(_manager, _workflowName, key);
+    case JOB:
+      return TaskUtil.getWorkflowJobUserContent(_manager, _jobName, key);
+    case TASK:
+      return TaskUtil.getTaskUserContent(_manager, _jobName, _taskName, key);
+    default:
+      throw new HelixException("Invalid scope : " + scope.name());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/adfe4dda/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index b4f25d5..b78ee7f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -164,6 +164,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
     HelixAdmin admin = _manager.getClusterManagmentTool();
 
     IdealState jobIS = admin.getResourceIdealState(_manager.getClusterName(), jobResource);
+    TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource,
+        new ZNRecord(TaskUtil.USER_CONTENT_NODE));
     if (jobIS != null) {
       LOG.info("Job " + jobResource + " idealstate already exists!");
       return;

http://git-wip-us.apache.org/repos/asf/helix/blob/adfe4dda/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
new file mode 100644
index 0000000..b2b27ef
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
@@ -0,0 +1,219 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.UserContentStore;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TestUserContentStore extends TaskTestBase {
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    // Setup cluster and instances
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < _numNodes; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // start dummy participants
+    for (int i = 0; i < _numNodes; i++) {
+      final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+
+      // Set task callbacks
+      Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+
+      taskFactoryReg.put("ContentStoreTask", new TaskFactory() {
+        @Override
+        public Task createNewTask(TaskCallbackContext context) {
+          return new ContentStoreTask();
+        }
+      });
+
+      taskFactoryReg.put("TaskOne", new TaskFactory() {
+        @Override
+        public Task createNewTask(TaskCallbackContext context) {
+          return new TaskOne();
+        }
+      });
+
+      taskFactoryReg.put("TaskTwo", new TaskFactory() {
+        @Override
+        public Task createNewTask(TaskCallbackContext context) {
+          return new TaskTwo();
+        }
+      });
+
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
+          taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // Start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Start an admin connection
+    _manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+  }
+
+  @Test
+  public void testWorkflowAndJobTaskUserContentStore() throws InterruptedException {
+    String jobName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
+    Map<String, String> taskConfigMap = Maps.newHashMap();
+    TaskConfig taskConfig1 = new TaskConfig("ContentStoreTask", taskConfigMap, false);
+    taskConfigs.add(taskConfig1);
+    Map<String, String> jobCommandMap = Maps.newHashMap();
+    jobCommandMap.put("Timeout", "1000");
+
+    JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
+        .addTaskConfigs(taskConfigs)
+        .setJobCommandConfigMap(jobCommandMap);
+    workflowBuilder.addJob(jobName, jobBuilder);
+
+    _driver.start(workflowBuilder.build());
+    _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
+    Assert
+        .assertEquals(_driver.getWorkflowContext(jobName).getWorkflowState(), TaskState.COMPLETED);
+  }
+
+  @Test
+  public void testJobContentPutAndGetWithDependency() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 100);
+
+    List<TaskConfig> taskConfigs1 = Lists.newArrayListWithCapacity(1);
+    List<TaskConfig> taskConfigs2 = Lists.newArrayListWithCapacity(1);
+    Map<String, String> taskConfigMap1 = Maps.newHashMap();
+    Map<String, String> taskConfigMap2 = Maps.newHashMap();
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap1, false);
+    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", taskConfigMap2, false);
+
+    taskConfigs1.add(taskConfig1);
+    taskConfigs2.add(taskConfig2);
+    Map<String, String> jobCommandMap = Maps.newHashMap();
+    jobCommandMap.put("Timeout", "1000");
+
+    JobConfig.Builder jobBuilder1 =
+        new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs1)
+            .setJobCommandConfigMap(jobCommandMap);
+    JobConfig.Builder jobBuilder2 =
+        new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs2)
+            .setJobCommandConfigMap(jobCommandMap);
+
+    queueBuilder.enqueueJob(queueName + 0, jobBuilder1);
+    queueBuilder.enqueueJob(queueName + 1, jobBuilder2);
+
+    _driver.start(queueBuilder.build());
+    _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, queueName + 1),
+        TaskState.COMPLETED);
+    Assert.assertEquals(_driver.getWorkflowContext(queueName)
+        .getJobState(TaskUtil.getNamespacedJobName(queueName, queueName + 1)), TaskState.COMPLETED);
+  }
+
+  private static class ContentStoreTask extends UserContentStore implements Task {
+
+    @Override public TaskResult run() {
+      putUserContent("ContentTest", "Value1", Scope.JOB);
+      putUserContent("ContentTest", "Value2", Scope.WORKFLOW);
+      putUserContent("ContentTest", "Value3", Scope.TASK);
+      if (!getUserContent("ContentTest", Scope.JOB).equals("Value1") || !getUserContent(
+          "ContentTest", Scope.WORKFLOW).equals("Value2") || !getUserContent("ContentTest",
+          Scope.TASK).equals("Value3")) {
+        return new TaskResult(TaskResult.Status.FAILED, null);
+      }
+      return new TaskResult(TaskResult.Status.COMPLETED, null);
+    }
+
+    @Override public void cancel() {
+    }
+  }
+
+
+  private static class TaskOne extends UserContentStore implements Task {
+
+    @Override public TaskResult run() {
+      putUserContent("RaceTest", "RaceValue", Scope.WORKFLOW);
+      return new TaskResult(TaskResult.Status.COMPLETED, null);
+    }
+
+    @Override public void cancel() {
+    }
+  }
+
+  private static class TaskTwo extends UserContentStore implements Task {
+
+    @Override public TaskResult run() {
+      if (!getUserContent("RaceTest", Scope.WORKFLOW).equals("RaceValue")) {
+        return new TaskResult(TaskResult.Status.FAILED, null);
+      }
+      return new TaskResult(TaskResult.Status.COMPLETED, null);
+
+    }
+
+    @Override public void cancel() {
+    }
+  }
+}


Mime
View raw message