helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] helix git commit: Fix infinite loop for Workflow timeout issue
Date Wed, 11 Apr 2018 20:53:15 GMT
Repository: helix
Updated Branches:
  refs/heads/master 4a76f03e6 -> f8fcd0b71


Fix infinite loop for Workflow timeout issue

Fix an infinite loop that occurs when a workflow times out after it has already completed. When
we try to set the workflow state to TIMED_OUT, the WorkflowContext check prevents that transition.
As a result, the pipeline never finishes cleaning up the workflow and repeats the timeout handling again and again.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f8fcd0b7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f8fcd0b7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f8fcd0b7

Branch: refs/heads/master
Commit: f8fcd0b717bfad2e3b351c52187d14aadb0fcaa7
Parents: 4b50445
Author: Junkai Xue <jxue@linkedin.com>
Authored: Tue Apr 3 16:19:39 2018 -0700
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Wed Apr 11 13:52:26 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/task/WorkflowRebalancer.java  |  8 +++++++-
 .../integration/task/TestWorkflowTimeout.java      | 17 +++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f8fcd0b7/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 1590039..04f4b09 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -22,8 +22,10 @@ package org.apache.helix.task;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -79,9 +81,13 @@ public class WorkflowRebalancer extends TaskRebalancer {
       LOG.debug("Workflow context is created for " + workflow);
     }
 
+    Set<TaskState> finalStates = new HashSet<>(Arrays.asList(
+        new TaskState[] { TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED,
+            TaskState.FAILED, TaskState.TIMED_OUT
+        }));
     // Only generic workflow get timeouted and schedule rebalance for timeout. Will skip
the set if
     // the workflow already got timeouted. Job Queue will ignore the setup.
-    if (!workflowCfg.isJobQueue() && !TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState()))
{
+    if (!workflowCfg.isJobQueue() && !finalStates.contains(workflowCtx.getWorkflowState()))
{
       // If timeout point has already been passed, it will not be scheduled
       scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout());
 

http://git-wip-us.apache.org/repos/asf/helix/blob/f8fcd0b7/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
index c1f7060..0229943 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
@@ -12,6 +12,7 @@ import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
+import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -81,4 +82,20 @@ public class TestWorkflowTimeout extends TaskTestBase {
     // Add back the config
     _jobBuilder.setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999"));
   }
+
+  @Test
+  public void testWorkflowTimeoutWhenWorkflowCompleted() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    _jobBuilder.setWorkflow(workflowName);
+    _jobBuilder.setJobCommandConfigMap(Collections.<String, String>emptyMap());
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
+        .setWorkflowConfig(new WorkflowConfig.Builder(workflowName).setTimeout(0).build())
+        .addJob(JOB_NAME, _jobBuilder).setExpiry(2000L);
+
+    _driver.start(workflowBuilder.build());
+    // Pause the queue
+    Thread.sleep(2500);
+    Assert.assertNull(_driver.getWorkflowConfig(workflowName));
+    Assert.assertNull(_driver.getJobContext(workflowName));
+  }
 }


Mime
View raw message