helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [04/33] helix git commit: Support changes of workflow configuration.
Date Wed, 17 Aug 2016 04:27:00 GMT
Support changes of workflow configuration.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7a470703
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7a470703
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7a470703

Branch: refs/heads/helix-0.6.x
Commit: 7a47070362c536126e945138f10fb077215f706e
Parents: 3781929
Author: Lei Xia <lxia@linkedin.com>
Authored: Tue Jan 26 09:57:43 2016 -0800
Committer: Lei Xia <lxia@linkedin.com>
Committed: Tue Jul 5 14:34:59 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixException.java   |   4 +
 .../main/java/org/apache/helix/task/JobDag.java |   4 +-
 .../java/org/apache/helix/task/TaskDriver.java  | 133 ++++++++---
 .../org/apache/helix/task/WorkflowConfig.java   |  24 +-
 .../helix/integration/task/TaskTestUtil.java    |  36 +--
 .../task/TestRunJobsWithMissingTarget.java      |   9 +-
 .../integration/task/TestUpdateWorkflow.java    | 220 +++++++++++++++++++
 7 files changed, 382 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/main/java/org/apache/helix/HelixException.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixException.java b/helix-core/src/main/java/org/apache/helix/HelixException.java
index 8693026..26585ed 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixException.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixException.java
@@ -33,4 +33,8 @@ public class HelixException extends RuntimeException {
   public HelixException(Throwable cause) {
     super(cause);
   }
+
+  public HelixException(String message, Throwable cause) {
+    super(message, cause);
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
index f708e91..73a5e58 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -19,12 +19,14 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -141,7 +143,7 @@ public class JobDag {
     return ret;
   }
 
-  public String toJson() throws Exception {
+  public String toJson() throws IOException {
     return new ObjectMapper().writeValueAsString(this);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index c4986ee..193526f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -42,6 +42,7 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.HelixProperty;
@@ -183,8 +184,11 @@ public class TaskDriver {
     helixMgr.disconnect();
   }
 
-  /** Schedules a new workflow */
-  public void start(Workflow flow) throws Exception {
+  /** Schedules a new workflow
+   *
+   * @param flow
+   */
+  public void start(Workflow flow) {
     // TODO: check that namespace for workflow is available
     LOG.info("Starting workflow " + flow.getName());
     flow.validate();
@@ -206,14 +210,65 @@ public class TaskDriver {
     addWorkflowResource(flow.getName());
   }
 
-  /** Creates a new named job queue (workflow) */
-  public void createQueue(JobQueue queue) throws Exception {
+  /**
+   * Update the configuration of a non-terminable workflow (queue).
+   * The terminable workflow's configuration is not allowed
+   * to change once created.
+   * Note:
+   * For recurrent workflow, the current running schedule will not be effected,
+   * the new configuration will be applied to the next scheduled runs of the workflow.
+   * For non-recurrent workflow, the new configuration may (or may not) be applied
+   * on the current running jobs, but it will be applied on the following unscheduled jobs.
+   *
+   * Example:
+   *
+   * WorkflowConfig currentWorkflowConfig = TaskUtil.getWorkflowCfg(_manager, workflow);
+   * WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(currentWorkflowConfig);
+
+   * // make needed changes to the config here
+   * configBuilder.setXXX();
+   *
+   * // update workflow configuration
+   * _driver.updateWorkflow(workflow, configBuilder.build());
+   *
+   * @param workflow
+   * @param newWorkflowConfig
+   */
+  public void updateWorkflow(String workflow, WorkflowConfig newWorkflowConfig) {
+    WorkflowConfig currentConfig =
+        TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, workflow);
+    if (currentConfig == null) {
+      throw new HelixException("Workflow " + workflow + " does not exist!");
+    }
+
+    if (currentConfig.isTerminable()) {
+      throw new HelixException(
+          "Workflow " + workflow + " is terminable, not allow to change its configuration!");
+    }
+
+    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, workflow),
+        newWorkflowConfig.getResourceConfigMap());
+
+    TaskUtil.invokeRebalance(_accessor, workflow);
+  }
+
+  /**
+   * Creates a new named job queue (workflow)
+   *
+   * @param queue
+   */
+  public void createQueue(JobQueue queue) {
     start(queue);
   }
 
-  /** Flushes a named job queue */
+  /**
+   * Flushes a named job queue
+   *
+   * @param queueName
+   * @throws Exception
+   */
   // TODO: need to make sure the queue is stopped or completed before flush the queue.
-  public void flushQueue(String queueName) throws Exception {
+  public void flushQueue(String queueName) {
     WorkflowConfig config =
         TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
     if (config == null) {
@@ -275,7 +330,13 @@ public class TaskDriver {
     _propertyStore.update(path, updater, AccessOption.PERSISTENT);
   }
 
-  /** Delete a job from an existing named queue, the queue has to be stopped prior to this call */
+  /**
+   * Delete a job from an existing named queue,
+   * the queue has to be stopped prior to this call
+   *
+   * @param queueName
+   * @param jobName
+   */
   public void deleteJob(final String queueName, final String jobName) {
     WorkflowConfig workflowCfg =
         TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
@@ -315,8 +376,12 @@ public class TaskDriver {
     }
   }
 
-
-  /** delete a job from a scheduled (non-recurrent) queue.*/
+  /**
+   * delete a job from a scheduled (non-recurrent) queue.
+   *
+   * @param queueName
+   * @param jobName
+   */
   private void deleteJobFromScheduledQueue(final String queueName, final String jobName) {
     WorkflowConfig workflowCfg =
         TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
@@ -354,9 +419,7 @@ public class TaskDriver {
     _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
   }
 
-  /**
-   * Remove the job name from the DAG from the queue configuration
-   */
+  /** Remove the job name from the DAG from the queue configuration */
   private void removeJobFromDag(final String queueName, final String jobName) {
     final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
 
@@ -410,8 +473,7 @@ public class TaskDriver {
     }
   }
 
-  /** update queue's property to remove job from JOB_STATES if it is already started.
-   */
+  /** update queue's property to remove job from JOB_STATES if it is already started. */
   private void removeJobStateFromQueue(final String queueName, final String jobName) {
     final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
     String queuePropertyPath =
@@ -435,9 +497,16 @@ public class TaskDriver {
     }
   }
 
-  /** Adds a new job to the end an existing named queue */
-  public void enqueueJob(final String queueName, final String jobName, JobConfig.Builder jobBuilder)
-      throws Exception {
+  /**
+   * Adds a new job to the end an existing named queue.
+   *
+   * @param queueName
+   * @param jobName
+   * @param jobBuilder
+   * @throws Exception
+   */
+  public void enqueueJob(final String queueName, final String jobName,
+      JobConfig.Builder jobBuilder) {
     // Get the job queue config and capacity
     HelixProperty workflowConfig =
         _accessor.getProperty(_accessor.keyBuilder().resourceConfig(queueName));
@@ -468,12 +537,12 @@ public class TaskDriver {
         JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
         Set<String> allNodes = jobDag.getAllNodes();
         if (allNodes.size() >= capacity) {
-          throw new IllegalStateException("Queue " + queueName + " is at capacity, will not add "
-              + jobName);
+          throw new IllegalStateException(
+              "Queue " + queueName + " is at capacity, will not add " + jobName);
         }
         if (allNodes.contains(namespacedJobName)) {
-          throw new IllegalStateException("Could not add to queue " + queueName + ", job "
-              + jobName + " already exists");
+          throw new IllegalStateException(
+              "Could not add to queue " + queueName + ", job " + jobName + " already exists");
         }
         jobDag.addNode(namespacedJobName);
 
@@ -493,8 +562,8 @@ public class TaskDriver {
         try {
           currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
         } catch (Exception e) {
-          throw new IllegalStateException(
-              "Could not add job " + jobName + " to queue " + queueName, e);
+          throw new IllegalStateException("Could not add job " + jobName + " to queue " + queueName,
+              e);
         }
         return currentData;
       }
@@ -550,17 +619,29 @@ public class TaskDriver {
     }
   }
 
-  /** Public method to resume a workflow/queue */
+  /**
+   * Public method to resume a workflow/queue.
+   *
+   * @param workflow
+   */
   public void resume(String workflow) {
     setWorkflowTargetState(workflow, TargetState.START);
   }
 
-  /** Public method to stop a workflow/queue */
+  /**
+   * Public method to stop a workflow/queue.
+   *
+   * @param workflow
+   */
   public void stop(String workflow) {
     setWorkflowTargetState(workflow, TargetState.STOP);
   }
 
-  /** Public method to delete a workflow/queue */
+  /**
+   * Public method to delete a workflow/queue.
+   *
+   * @param workflow
+   */
   public void delete(String workflow) {
     setWorkflowTargetState(workflow, TargetState.DELETE);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 56fba58..4c81654 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -18,13 +18,14 @@ package org.apache.helix.task;
  * specific language governing permissions and limitations
  * under the License.
  */
-
+import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
+import org.apache.helix.HelixException;
 
 /**
  * Provides a typed interface to workflow level configurations. Validates the configurations.
@@ -44,7 +45,9 @@ public class WorkflowConfig {
   public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
 
   /* Member variables */
+  // TODO: jobDag should not be in the workflowConfig.
   private final JobDag _jobDag;
+
   // _parallelJobs would kind of break the job dependency,
   // e.g: if job1 -> job2, but _parallelJobs == 2,
   // then job1 and job2 could be scheduled at the same time
@@ -114,9 +117,13 @@ public class WorkflowConfig {
     return _scheduleConfig.getStartTime();
   }
 
-  public Map<String, String> getResourceConfigMap() throws Exception {
+  public Map<String, String> getResourceConfigMap() {
     Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson());
+    try {
+      cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson());
+    } catch (IOException ex) {
+      throw new HelixException("Invalid job dag configuration!", ex);
+    }
     cfgMap.put(WorkflowConfig.PARALLEL_JOBS, String.valueOf(getParallelJobs()));
     cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(getExpiry()));
     cfgMap.put(WorkflowConfig.TARGET_STATE, getTargetState().name());
@@ -153,6 +160,17 @@ public class WorkflowConfig {
       return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _isTerminable, _scheduleConfig);
     }
 
+    public Builder() {}
+
+    public Builder(WorkflowConfig workflowConfig) {
+      _taskDag = workflowConfig.getJobDag();
+      _parallelJobs = workflowConfig.getParallelJobs();
+      _targetState = workflowConfig.getTargetState();
+      _expiry = workflowConfig.getExpiry();
+      _isTerminable = workflowConfig.isTerminable();
+      _scheduleConfig = workflowConfig.getScheduleConfig();
+    }
+
     public Builder setJobDag(JobDag v) {
       _taskDag = v;
       return this;

http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 11677b8..06b9751 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -49,22 +49,26 @@ public class TaskTestUtil {
    * Polls {@link org.apache.helix.task.JobContext} for given task resource until a timeout is
    * reached.
    * If the task has not reached target state by then, an error is thrown
+   *
    * @param workflowResource Resource to poll for completeness
    * @throws InterruptedException
    */
   public static void pollForWorkflowState(HelixManager manager, String workflowResource,
-      TaskState state) throws InterruptedException {
+      TaskState... targetStates) throws InterruptedException {
     // Wait for completion.
     long st = System.currentTimeMillis();
     WorkflowContext ctx;
+    Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
     do {
       Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-    } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state)
-        && System.currentTimeMillis() < st + _default_timeout);
+    } while ((ctx == null || ctx.getWorkflowState() == null || !allowedStates
+        .contains(ctx.getWorkflowState())) && System.currentTimeMillis() < st + _default_timeout);
 
     Assert.assertNotNull(ctx);
-    Assert.assertEquals(ctx.getWorkflowState(), state);
+    TaskState workflowState = ctx.getWorkflowState();
+    Assert.assertTrue(allowedStates.contains(workflowState),
+        "expect workflow states: " + allowedStates + " actual workflow state: " + workflowState);
   }
 
   /**
@@ -101,10 +105,13 @@ public class TaskTestUtil {
       Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
     }
-    while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains(ctx.getJobState(jobName)))
+    while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains(
+        ctx.getJobState(jobName)))
         && System.currentTimeMillis() < st + _default_timeout);
-    Assert.assertNotNull(ctx);
-    Assert.assertTrue(allowedStates.contains(ctx.getJobState(jobName)));
+    Assert.assertNotNull(ctx, "Empty job context");
+    TaskState jobState = ctx.getJobState(jobName);
+    Assert.assertTrue(allowedStates.contains(jobState),
+        "expect job states: " + allowedStates + " actual job state: " + jobState);
   }
 
   public static void pollForEmptyJobState(final HelixManager manager, final String workflowName,
@@ -127,8 +134,8 @@ public class TaskTestUtil {
     long st = System.currentTimeMillis();
     WorkflowContext ctx;
     do {
-      Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+      Thread.sleep(100);
     } while (ctx == null && System.currentTimeMillis() < st + _default_timeout);
     Assert.assertNotNull(ctx);
     return ctx;
@@ -228,9 +235,14 @@ public class TaskTestUtil {
   }
 
   public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) {
+    return buildRecurrentJobQueue(jobQueueName, delayStart, 60);
+  }
+
+  public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart,
+      int recurrenInSeconds) {
     Map<String, String> cfgMap = new HashMap<String, String>();
     cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
-    cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
+    cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(recurrenInSeconds));
     cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
     Calendar cal = Calendar.getInstance();
     cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
@@ -238,8 +250,6 @@ public class TaskTestUtil {
     cal.set(Calendar.MILLISECOND, 0);
     cfgMap.put(WorkflowConfig.START_TIME,
         WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
-    //cfgMap.put(WorkflowConfig.START_TIME,
-    //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
     return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
   }
 
@@ -262,8 +272,4 @@ public class TaskTestUtil {
   public static JobQueue.Builder buildJobQueue(String jobQueueName) {
     return buildJobQueue(jobQueueName, 0);
   }
-
-  public static boolean pollForParticipantParallelState() {
-    return false;
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
index 31e4325..9fd7735 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
@@ -68,7 +68,8 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
   private HelixManager _manager;
   private TaskDriver _driver;
 
-  @BeforeClass public void beforeClass() throws Exception {
+  @BeforeClass
+  public void beforeClass() throws Exception {
     String namespace = "/" + CLUSTER_NAME;
     if (_gZkClient.exists(namespace)) {
       _gZkClient.deleteRecursive(namespace);
@@ -141,7 +142,8 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
     }
   }
 
-  @Test public void testJobFailsWithMissingTarget() throws Exception {
+  @Test
+  public void testJobFailsWithMissingTarget() throws Exception {
     String queueName = TestHelper.getTestMethodName();
 
     // Create a queue
@@ -166,7 +168,8 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
     TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED);
   }
 
-  @Test public void testJobFailsWithMissingTargetInRunning() throws Exception {
+  @Test
+  public void testJobFailsWithMissingTargetInRunning() throws Exception {
     String queueName = TestHelper.getTestMethodName();
 
     // Create a queue

http://git-wip-us.apache.org/repos/asf/helix/blob/7a470703/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
new file mode 100644
index 0000000..fc93392
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
@@ -0,0 +1,220 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.ScheduleConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class TestUpdateWorkflow extends ZkIntegrationTestBase {
+  private static final Logger LOG = Logger.getLogger(TestUpdateWorkflow.class);
+  private static final int n = 5;
+  private static final int START_PORT = 12918;
+  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final String TIMEOUT_CONFIG = "Timeout";
+  private static final String TGT_DB = "TestDB";
+  private static final int NUM_PARTITIONS = 20;
+  private static final int NUM_REPLICAS = 3;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
+  private ClusterControllerManager _controller;
+
+  private HelixManager _manager;
+  private TaskDriver _driver;
+  private ZKHelixDataAccessor _accessor;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    _accessor =
+        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < n; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // Set up target db
+    setupTool.addResourceToCluster(CLUSTER_NAME, TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
+    setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new MockTask(context);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < n; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
+          taskFactoryReg));
+
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
+    _manager.connect();
+
+    _driver = new TaskDriver(_manager);
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
+            ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _manager.disconnect();
+    _controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      _participants[i].syncStop();
+    }
+  }
+
+  @Test
+  public void testUpdateQueueConfig() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 600000);
+    // Create and Enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i <= 1; i++) {
+      String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
+
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+      String jobName = targetPartition.toLowerCase() + "Job" + i;
+      queueBuild.enqueueJob(jobName, jobConfig);
+      currentJobNames.add(jobName);
+    }
+
+    _driver.start(queueBuild.build());
+
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
+
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(_manager, queueName);
+    WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowConfig);
+
+    Calendar startTime = Calendar.getInstance();
+    startTime.set(Calendar.SECOND, startTime.get(Calendar.SECOND) + 1);
+
+    ScheduleConfig scheduleConfig =
+        ScheduleConfig.recurringFromDate(startTime.getTime(), TimeUnit.MINUTES, 2);
+
+    configBuilder.setScheduleConfig(scheduleConfig);
+
+    // ensure current schedule is started
+    String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.IN_PROGRESS);
+
+    _driver.updateWorkflow(queueName, configBuilder.build());
+
+    // ensure current schedule is completed
+    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.COMPLETED);
+
+    Thread.sleep(1000);
+
+    wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
+    scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+    WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, scheduledQueue);
+
+    Calendar configStartTime = Calendar.getInstance();
+    configStartTime.setTime(wCfg.getStartTime());
+
+    Assert.assertTrue(
+        (startTime.get(Calendar.HOUR_OF_DAY) == configStartTime.get(Calendar.HOUR_OF_DAY) &&
+            startTime.get(Calendar.MINUTE) == configStartTime.get(Calendar.MINUTE) &&
+            startTime.get(Calendar.SECOND) == configStartTime.get(Calendar.SECOND)));
+  }
+}
+


Mime
View raw message