helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [1/4] helix git commit: Add ABORT state in TaskState
Date Thu, 29 Sep 2016 01:38:11 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 9fc6c540b -> dca1ed05b


Add ABORT state in TaskState

1. Add ABORTED state to TaskState
2. Set IN_PROGRESS jobs to ABORTED when the workflow fails


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/516ff0dc
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/516ff0dc
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/516ff0dc

Branch: refs/heads/helix-0.6.x
Commit: 516ff0dcad13cea578168b41752b62e45d43362d
Parents: 9fc6c54
Author: Junkai Xue <jxue@linkedin.com>
Authored: Thu Sep 22 16:28:58 2016 -0700
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Thu Sep 22 16:28:58 2016 -0700

----------------------------------------------------------------------
 .../org/apache/helix/task/TaskRebalancer.java   |  5 +++
 .../java/org/apache/helix/task/TaskState.java   |  6 +++-
 .../TestGenericTaskAssignmentCalculator.java    | 36 ++++++++++++++++++++
 3 files changed, 46 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/516ff0dc/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 22f91e7..1517c5a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -83,6 +83,11 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         failedJobs ++;
         if (failedJobs > cfg.getFailureThreshold()) {
           ctx.setWorkflowState(TaskState.FAILED);
+          for (String jobToFail : cfg.getJobDag().getAllNodes()) {
+            if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
+              ctx.setJobState(jobToFail, TaskState.ABORTED);
+            }
+          }
           return true;
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/516ff0dc/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index dac5df9..1000a9b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -43,5 +43,9 @@ public enum TaskState {
   /**
    * All the task partitions have completed normally.
    */
-  COMPLETED
+  COMPLETED,
+  /**
+   * The task was aborted because the workflow failed.
+   */
+  ABORTED
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/516ff0dc/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
index 5645009..7cff051 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
@@ -39,6 +39,7 @@ import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.tools.ClusterSetup;
 import org.testng.Assert;
@@ -54,6 +55,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
   private Map<String, Integer> _runCounts = Maps.newHashMap();
   private TaskConfig _taskConfig;
   private Map<String, String> _jobCommandMap;
+  private boolean failTask;
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -110,6 +112,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
 
   @Test
   public void testMultipleJobAssignment() throws InterruptedException {
+    failTask = false;
     String workflowName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
@@ -130,6 +133,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
 
   @Test
   public void testMultipleTaskAssignment() throws InterruptedException {
+    failTask = false;
     String workflowName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
 
@@ -148,6 +152,35 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
     Assert.assertEquals(_runCounts.size(), 5);
   }
 
+  @Test
+  public void testAbortTaskForWorkflowFail()
+      throws InterruptedException {
+    failTask = true;
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
+    taskConfigs.add(_taskConfig);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+            .setJobCommandConfigMap(_jobCommandMap);
+
+    for (int i = 0; i < 5; i++) {
+      workflowBuilder.addJob("JOB" + i, jobBuilder);
+    }
+
+    _driver.start(workflowBuilder.build());
+    _driver.pollForWorkflowState(workflowName, TaskState.FAILED);
+
+    int abortedTask = 0;
+    for (TaskState jobState : _driver.getWorkflowContext(workflowName).getJobStates().values()) {
+      if (jobState == TaskState.ABORTED) {
+        abortedTask++;
+      }
+    }
+
+    Assert.assertEquals(abortedTask, 4);
+  }
+
   private class TaskOne extends MockTask {
     private final String _instanceName;
 
@@ -165,6 +198,9 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
     public TaskResult run() {
       _invokedClasses.add(getClass().getName());
       _runCounts.put(_instanceName, _runCounts.get(_instanceName) + 1);
+      if (failTask) {
+        return new TaskResult(TaskResult.Status.FAILED, "");
+      }
       return new TaskResult(TaskResult.Status.COMPLETED, "");
     }
   }


Mime
View raw message