helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [4/7] helix git commit: Be able to stop workflow when no job is running.
Date Wed, 04 Oct 2017 01:49:45 GMT
Be able to stop workflow when no job is running.

Currently, to stop a workflow, the target state of the workflow is set to STOP, then when
each job(as a resource in ideal state) was processed in job rebalancer, it will check whether
all the jobs in the workflow is done(not in IN_PROGRESS or STOPPING) and set the workflow
state to be STOP.
However, if all the jobs are already done, there’s no job in ideal state to process, so
the workflow state never gets a chance to be set to STOP.

This commit adds a check in workflow rebalancer to set the state when all jobs are already
done.

A test is added to test specifically this case.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/408082a3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/408082a3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/408082a3

Branch: refs/heads/master
Commit: 408082a33d91f84556c3da31232fb6d4097b4371
Parents: 94f3961
Author: Weihan Kong <wkong@linkedin.com>
Authored: Mon Feb 13 13:52:16 2017 -0800
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Tue Oct 3 15:08:33 2017 -0700

----------------------------------------------------------------------
 .../apache/helix/task/WorkflowRebalancer.java   |  4 ++
 .../integration/task/TestStopWorkflow.java      | 45 ++++++++++++++++++++
 2 files changed, 49 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/408082a3/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 830f93a..8e72f7a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -76,6 +76,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
     if (targetState == TargetState.STOP) {
       LOG.info("Workflow " + workflow + "is marked as stopped.");
+      if (isWorkflowStopped(workflowCtx, workflowCfg)) {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+        TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
+      }
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/408082a3/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
new file mode 100644
index 0000000..b641698
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
@@ -0,0 +1,45 @@
+package org.apache.helix.integration.task;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestStopWorkflow extends TaskTestBase {
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numParitions = 1;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testStopWorkflow() throws InterruptedException {
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1)
+        .setWorkflow(jobQueueName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("job1_will_succeed", jobBuilder);
+    jobQueue.enqueueJob("job2_will_fail", jobBuilder);
+    _driver.start(jobQueue.build());
+
+    // job1 should succeed and job2 should fail, wait until that happens
+    _driver.pollForJobState(jobQueueName,
+        TaskUtil.getNamespacedJobName(jobQueueName, "job2_will_fail"), TaskState.FAILED);
+
+    Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.IN_PROGRESS));
+
+    // Now stop the workflow, and it should be stopped because all jobs have completed or
failed.
+    _driver.waitToStop(jobQueueName, 4000);
+
+    Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.STOPPED));
+  }
+}


Mime
View raw message