helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject helix git commit: [HELIX-696] fix workflow state flip-flop issue
Date Fri, 20 Apr 2018 16:57:05 GMT
Repository: helix
Updated Branches:
  refs/heads/master a6bdb3c22 -> 317c300c8


[HELIX-696] fix workflow state flip-flop issue


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/317c300c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/317c300c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/317c300c

Branch: refs/heads/master
Commit: 317c300c8f951c7e8308cb0b24e48f97a1ef32ef
Parents: a6bdb3c
Author: Harry Zhang <zhan849@usc.edu>
Authored: Thu Apr 19 16:13:13 2018 -0700
Committer: Harry Zhang <zhan849@usc.edu>
Committed: Fri Apr 20 09:56:34 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/task/JobRebalancer.java    |   1 +
 .../org/apache/helix/task/TaskRebalancer.java   |  36 ++-
 .../apache/helix/task/WorkflowRebalancer.java   |  66 +++--
 .../integration/task/TestDeleteWorkflow.java    |   2 +-
 .../task/TestWorkflowTermination.java           | 294 +++++++++++++++++++
 .../helix/task/TestJobStateOnCreation.java      |   2 +-
 6 files changed, 366 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/317c300c/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 1620238..fd80229 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -678,6 +678,7 @@ public class JobRebalancer extends TaskRebalancer {
     jobContext.setFinishTime(currentTime);
     if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) {
       workflowContext.setFinishTime(currentTime);
+      updateWorkflowMonitor(workflowContext, workflowConfig);
     }
     scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/317c300c/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index ece1935..d8b8ea9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -65,24 +65,31 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
    *
    * @param ctx Workflow context containing job states
    * @param cfg Workflow config containing set of jobs
-   * @return returns true if the workflow either completed (all tasks are {@link TaskState#COMPLETED})
-   * or failed (any task is {@link TaskState#FAILED}, false otherwise.
+   * @return returns true if the workflow
+   *            1. completed (all tasks are {@link TaskState#COMPLETED})
+   *            2. failed (any task is {@link TaskState#FAILED}
+   *            3. workflow is {@link TaskState#TIMED_OUT}
+   *         returns false otherwise.
    */
   protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg,
       Map<String, JobConfig> jobConfigMap) {
     boolean incomplete = false;
-    int failedJobs = 0;
+
     TaskState workflowState = ctx.getWorkflowState();
+    if (TaskState.TIMED_OUT.equals(workflowState)) {
+      // We don't update job state here as JobRebalancer will do it
+      return true;
+    }
+
+    // Check if failed job count is beyond threshold and if so, fail the workflow
+    // and abort in-progress jobs
+    int failedJobs = 0;
     for (String job : cfg.getJobDag().getAllNodes()) {
       TaskState jobState = ctx.getJobState(job);
       if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) {
         failedJobs++;
-        if (TaskState.TIMED_OUT.equals(workflowState) || (!cfg.isJobQueue() && failedJobs > cfg
-            .getFailureThreshold())) {
+        if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) {
           ctx.setWorkflowState(TaskState.FAILED);
-          if (_clusterStatusMonitor != null) {
-            _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.FAILED);
-          }
           for (String jobToFail : cfg.getJobDag().getAllNodes()) {
             if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
               ctx.setJobState(jobToFail, TaskState.ABORTED);
@@ -104,10 +111,6 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
 
     if (!incomplete && cfg.isTerminable()) {
       ctx.setWorkflowState(TaskState.COMPLETED);
-      if (_clusterStatusMonitor != null) {
-        _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED,
-            ctx.getFinishTime() - ctx.getStartTime());
-      }
       return true;
     }
 
@@ -254,6 +257,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     }
     if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) {
       workflowContext.setFinishTime(currentTime);
+      updateWorkflowMonitor(workflowContext, workflowConfig);
     }
     scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime);
   }
@@ -270,6 +274,14 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     }
   }
 
+  protected void updateWorkflowMonitor(WorkflowContext context,
+      WorkflowConfig config) {
+    if (_clusterStatusMonitor != null) {
+      _clusterStatusMonitor.updateWorkflowCounters(config, context.getWorkflowState(),
+          context.getFinishTime() - context.getStartTime());
+    }
+  }
+
   /**
    * Check if a workflow is ready to schedule.
    *

http://git-wip-us.apache.org/repos/asf/helix/blob/317c300c/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 709ba65..9233f9b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -54,6 +54,9 @@ import com.google.common.collect.Lists;
  */
 public class WorkflowRebalancer extends TaskRebalancer {
   private static final Logger LOG = LoggerFactory.getLogger(WorkflowRebalancer.class);
+  private static final Set<TaskState> finalStates = new HashSet<>(
+      Arrays.asList(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, TaskState.TIMED_OUT)
+  );
 
   @Override
   public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
@@ -68,40 +71,41 @@ public class WorkflowRebalancer extends TaskRebalancer {
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 
-    WorkflowContext workflowCtx = clusterData.getWorkflowContext(workflow);
-    // Initialize workflow context if needed
-    if (workflowCtx == null) {
-      workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
-      workflowCtx.setStartTime(System.currentTimeMillis());
-      workflowCtx.setName(workflow);
+    WorkflowContext workflowCtx = getOrInitializeWorkflowContext(clusterData, workflow);
+
+    // Step 1: Check for deletion - if so, we don't need to go through further steps
+    // Clean up if workflow marked for deletion
+    TargetState targetState = workflowCfg.getTargetState();
+    if (targetState == TargetState.DELETE) {
+      LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
+      cleanupWorkflow(workflow,  workflowCfg);
+      return buildEmptyAssignment(workflow, currStateOutput);
     }
 
-    Set<TaskState> finalStates = new HashSet<>(Arrays.asList(
-        new TaskState[] { TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED,
-            TaskState.FAILED, TaskState.TIMED_OUT
-        }));
+    // Step 2: handle timeout, which should have higher priority than STOP
    // Only generic workflow get timeouted and schedule rebalance for timeout. Will skip the set if
    // the workflow already got timeouted. Job Queue will ignore the setup.
    if (!workflowCfg.isJobQueue() && !finalStates.contains(workflowCtx.getWorkflowState())) {
       // If timeout point has already been passed, it will not be scheduled
       scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout());
 
-      if (isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
+      if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState()) && isTimeout(
+          workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
         workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
         clusterData.updateWorkflowContext(workflow, workflowCtx, _manager.getHelixDataAccessor());
-        return buildEmptyAssignment(workflow, currStateOutput);
       }
-    }
 
-    // Clean up if workflow marked for deletion
-    TargetState targetState = workflowCfg.getTargetState();
-    if (targetState == TargetState.DELETE) {
-      LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
-      cleanupWorkflow(workflow,  workflowCfg);
-      return buildEmptyAssignment(workflow, currStateOutput);
+      // We should not return after setting timeout, as in case the workflow is stopped already
+      // marking it timeout will not trigger rebalance pipeline as we are not listening on
+      // PropertyStore change, nor will we schedule rebalance for timeout as at this point,
+      // workflow is already timed-out. We should let the code proceed and wait for schedule
+      // future cleanup work
     }
 
-    if (targetState == TargetState.STOP) {
+    // Step 3: handle workflow that should STOP
+    // For workflows that already reached final states, STOP should not take into effect
+    if (!finalStates.contains(workflowCtx.getWorkflowState()) && TargetState.STOP
+        .equals(targetState)) {
       LOG.info("Workflow " + workflow + "is marked as stopped.");
       if (isWorkflowStopped(workflowCtx, workflowCfg)) {
         workflowCtx.setWorkflowState(TaskState.STOPPED);
@@ -111,13 +115,21 @@ public class WorkflowRebalancer extends TaskRebalancer {
     }
 
     long currentTime = System.currentTimeMillis();
-    // Check if workflow has been finished and mark it if it is.
+
+    // Step 4: Check and process finished workflow context (confusing,
+    // but its inside isWorkflowFinished())
+    // Check if workflow has been finished and mark it if it is. Also update cluster status
+    // monitor if provided
+    // Note that COMPLETE and FAILED will be marked in markJobComplete / markJobFailed
+    // This is to handle TIMED_OUT only
     if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED
        && isWorkflowFinished(workflowCtx, workflowCfg, clusterData.getJobConfigMap())) {
       workflowCtx.setFinishTime(currentTime);
+      updateWorkflowMonitor(workflowCtx, workflowCfg);
       clusterData.updateWorkflowContext(workflow, workflowCtx, _manager.getHelixDataAccessor());
     }
 
+    // Step 5: Handle finished workflows
     if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
       LOG.info("Workflow " + workflow + " is finished.");
       long expiryTime = workflowCfg.getExpiry();
@@ -160,6 +172,18 @@ public class WorkflowRebalancer extends TaskRebalancer {
     return buildEmptyAssignment(workflow, currStateOutput);
   }
 
+  private WorkflowContext getOrInitializeWorkflowContext(ClusterDataCache clusterData, String workflowName) {
+    WorkflowContext workflowCtx = clusterData.getWorkflowContext(workflowName);
+    if (workflowCtx == null) {
+      WorkflowConfig config = clusterData.getWorkflowConfig(workflowName);
+      workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
+      workflowCtx.setStartTime(System.currentTimeMillis());
+      workflowCtx.setName(workflowName);
+      LOG.debug("Workflow context is created for " + workflowName);
+    }
+    return workflowCtx;
+  }
+
   /**
    * Figure out whether the jobs in the workflow should be run,
    * and if it's ready, then just schedule it

http://git-wip-us.apache.org/repos/asf/helix/blob/317c300c/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
index a151827..381bdeb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
@@ -18,7 +18,7 @@ import org.testng.annotations.Test;
 
 
 public class TestDeleteWorkflow extends TaskTestBase  {
-  private static final int DELETE_DELAY = 1000;
+  private static final int DELETE_DELAY = 2000;
 
   private HelixAdmin admin;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/317c300c/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
new file mode 100644
index 0000000..71d494b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
@@ -0,0 +1,294 @@
+package org.apache.helix.integration.task;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import org.apache.helix.TestHelper;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * This test contains cases when a workflow finish
+ */
+public class TestWorkflowTermination extends TaskTestBase {
+  private final static String JOB_NAME = "TestJob";
+  private final static String WORKFLOW_TYPE = "TestWorkflow";
+  private static final MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numDbs = 1;
+    _numNodes = 3;
+    _numParitions = 5;
+    _numReplicas = 3;
+    super.beforeClass();
+  }
+
+  private JobConfig.Builder createJobConfigBuilder(String workflow, boolean shouldJobFail, long timeoutMs) {
+    String taskState = shouldJobFail ? TaskState.FAILED.name() : TaskState.COMPLETED.name();
+    return new JobConfig.Builder()
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+        .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+        .setWorkflow(workflow)
+        .setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(
+            ImmutableMap.of(
+                MockTask.TIMEOUT_CONFIG, Long.toString(timeoutMs),
+                MockTask.TASK_RESULT_STATUS, taskState
+            )
+        );
+  }
+
+  @Test
+  public void testWorkflowSucceed() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    long workflowExpiry = 2000;
+    long timeout = 2000;
+    JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 50);
+    jobBuilder.setWorkflow(workflowName);
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
+        .setWorkflowConfig(
+            new WorkflowConfig.Builder(workflowName)
+                .setTimeout(timeout)
+                .setWorkFlowType(WORKFLOW_TYPE)
+                .build()
+        )
+        .addJob(JOB_NAME, jobBuilder)
+        .setExpiry(workflowExpiry);
+    _driver.start(workflowBuilder.build());
+
+    // Timeout is longer than job finish so workflow status should be COMPLETED
+    _driver.pollForWorkflowState(workflowName, 5000L, TaskState.COMPLETED);
+    WorkflowContext context = _driver.getWorkflowContext(workflowName);
+    Assert.assertTrue(context.getFinishTime() - context.getStartTime() < timeout);
+
+    // Workflow should be cleaned up after expiry
+    Thread.sleep(workflowExpiry + 200);
+    verifyWorkflowCleanup(workflowName, getJobNameToPoll(workflowName, JOB_NAME));
+
+    ObjectName objectName = getWorkflowMBeanObjectName(workflowName);
+    Assert.assertEquals((long) beanServer.getAttribute(objectName, "SuccessfulWorkflowCount"), 1);
+    Assert.assertTrue((long) beanServer.getAttribute(objectName, "MaximumWorkflowLatencyGauge") > 0);
+    Assert.assertTrue((long) beanServer.getAttribute(objectName, "WorkflowLatencyCount") > 0);
+
+  }
+
+  @Test
+  public void testWorkflowRunningTimeout() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    String notStartedJobName = JOB_NAME + "-NotStarted";
+    long workflowExpiry = 2000; // 2sec expiry time
+    long timeout = 50;
+    JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 5000);
+    jobBuilder.setWorkflow(workflowName);
+
+    // Create a workflow where job2 depends on job1. Workflow would timeout before job1 finishes
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
+        .setWorkflowConfig(
+            new WorkflowConfig.Builder(workflowName)
+                .setTimeout(timeout)
+                .setWorkFlowType(WORKFLOW_TYPE)
+                .build()
+        )
+        .addJob(JOB_NAME, jobBuilder)
+        .addJob(notStartedJobName, jobBuilder)
+        .addParentChildDependency(JOB_NAME, notStartedJobName)
+        .setExpiry(workflowExpiry);
+
+    _driver.start(workflowBuilder.build());
+
+    _driver.pollForWorkflowState(workflowName, 10000L, TaskState.TIMED_OUT);
+
+    // Running job should be marked as timeout
+    // and job not started should be marked as NOT_STARTED
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, JOB_NAME), 10000L, TaskState.TIMED_OUT);
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, notStartedJobName), 10000L,
+        TaskState.NOT_STARTED);
+
+    WorkflowContext context = _driver.getWorkflowContext(workflowName);
+    Assert.assertTrue(context.getFinishTime() - context.getStartTime() >= timeout);
+
+    Thread.sleep(workflowExpiry + 200);
+
+    verifyWorkflowCleanup(workflowName, getJobNameToPoll(workflowName, JOB_NAME),
+        getJobNameToPoll(workflowName, notStartedJobName));
+  }
+
+  @Test
+  public void testWorkflowPausedTimeout() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    long workflowExpiry = 2000; // 2sec expiry time
+    long timeout = 2000;
+    String notStartedJobName = JOB_NAME + "-NotStarted";
+
+    JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 100);
+    jobBuilder.setWorkflow(workflowName);
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
+        .setWorkflowConfig(
+            new WorkflowConfig.Builder(workflowName)
+                .setTimeout(timeout)
+                .setWorkFlowType(WORKFLOW_TYPE)
+                .build()
+        )
+        .addJob(JOB_NAME, jobBuilder)
+        .addJob(notStartedJobName, jobBuilder)
+        .addParentChildDependency(JOB_NAME, notStartedJobName)
+        .setExpiry(workflowExpiry);
+
+    _driver.start(workflowBuilder.build());
+
+    // Wait a bit for the job to get scheduled. Job runs for 100ms so this will very likely
+    // to trigger a job stopped
+    Thread.sleep(40);
+
+    // Pause the queue
+    _driver.waitToStop(workflowName, 10000L);
+
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, JOB_NAME), 10000L,
+        TaskState.STOPPED);
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, notStartedJobName), 10000L,
+        TaskState.NOT_STARTED);
+
+    _driver.pollForWorkflowState(workflowName, 10000L, TaskState.TIMED_OUT);
+
+    WorkflowContext context = _driver.getWorkflowContext(workflowName);
+    Assert.assertTrue(context.getFinishTime() - context.getStartTime() >= timeout);
+
+    Thread.sleep(workflowExpiry + 200);
+    verifyWorkflowCleanup(workflowName, getJobNameToPoll(workflowName, JOB_NAME),
+        getJobNameToPoll(workflowName, notStartedJobName));
+
+  }
+
+  @Test
+  public void testJobQueueNotApplyTimeout() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+    long timeout = 1000;
+    // Make jobs run success
+    JobConfig.Builder jobBuilder = createJobConfigBuilder(queueName, false, 10);
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(queueName);
+    jobQueue.setWorkflowConfig(
+        new WorkflowConfig.Builder(queueName)
+            .setTimeout(timeout)
+            .setWorkFlowType(WORKFLOW_TYPE)
+            .build()
+    )
+        .enqueueJob(JOB_NAME, jobBuilder)
+        .enqueueJob(JOB_NAME + 1, jobBuilder);
+
+    _driver.start(jobQueue.build());
+
+    _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, JOB_NAME),
+        TaskState.COMPLETED);
+    _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, JOB_NAME + 1),
+        TaskState.COMPLETED);
+
+    Thread.sleep(timeout);
+
+    // Verify that job queue is still in progress
+    _driver.pollForWorkflowState(queueName, 10000L, TaskState.IN_PROGRESS);
+  }
+
+  @Test
+  public void testWorkflowJobFail() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    String job1 = JOB_NAME + "1";
+    String job2 = JOB_NAME + "2";
+    String job3 = JOB_NAME + "3";
+    String job4 = JOB_NAME + "4";
+    long workflowExpiry = 2000;
+    long timeout = 5000;
+
+    JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 50);
+    JobConfig.Builder failedJobBuilder = createJobConfigBuilder(workflowName, true, 10);
+
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
+        .setWorkflowConfig(
+            new WorkflowConfig.Builder(workflowName)
+                .setWorkFlowType(WORKFLOW_TYPE)
+                .setTimeout(timeout)
+                .setFailureThreshold(1)
+                .build()
+        )
+        .addJob(job1, jobBuilder)
+        .addJob(job2, jobBuilder)
+        .addJob(job3, failedJobBuilder)
+        .addJob(job4, jobBuilder)
+        .addParentChildDependency(job1, job2)
+        .addParentChildDependency(job1, job3)
+        .addParentChildDependency(job2, job4)
+        .addParentChildDependency(job3, job4)
+        .setExpiry(workflowExpiry);
+
+    _driver.start(workflowBuilder.build());
+
+    _driver.pollForWorkflowState(workflowName, 5000L, TaskState.FAILED);
+
+    // Timeout is longer than fail time, so the failover should occur earlier
+    WorkflowContext context = _driver.getWorkflowContext(workflowName);
+    Assert.assertTrue(context.getFinishTime() - context.getStartTime() < timeout);
+
+    // job1 will complete
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job1), 5000L,
+        TaskState.COMPLETED);
+
+    // Possible race between 2 and 3 so it's likely for job2 to stay in either COMPLETED or ABORTED
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job2), 5000L,
+        TaskState.COMPLETED, TaskState.ABORTED);
+
+    // job3 meant to fail
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job3), 5000L,
+        TaskState.FAILED);
+
+    // because job4 has dependency over job3, it will fail as well
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, job4), 5000L,
+        TaskState.FAILED);
+
+    // Check MBean is updated
+    ObjectName objectName = getWorkflowMBeanObjectName(workflowName);
+    Assert.assertEquals((long) beanServer.getAttribute(objectName, "FailedWorkflowCount"), 1);
+
+    // For a failed workflow, after timing out, it will be purged
+    Thread.sleep(workflowExpiry + 200);
+    verifyWorkflowCleanup(
+        workflowName,
+        getJobNameToPoll(workflowName, job1),
+        getJobNameToPoll(workflowName, job2),
+        getJobNameToPoll(workflowName, job3),
+        getJobNameToPoll(workflowName, job4)
+    );
+  }
+
+  private void verifyWorkflowCleanup(String workflowName, String... jobNames) {
+    Assert.assertNull(_driver.getWorkflowConfig(workflowName));
+    Assert.assertNull(_driver.getWorkflowContext(workflowName));
+    for (String job : jobNames) {
+      Assert.assertNull(_driver.getJobConfig(job));
+      Assert.assertNull(_driver.getJobContext(job));
+    }
+  }
+
+  private static String getJobNameToPoll(String workflowName, String jobName) {
+    return String.format("%s_%s", workflowName, jobName);
+  }
+
+  private ObjectName getWorkflowMBeanObjectName(String workflowName)
+      throws MalformedObjectNameException {
+    return new ObjectName(String
+        .format("%s:%s=%s, %s=%s", MonitorDomainNames.ClusterStatus.name(), "cluster",
+            CLUSTER_NAME, "workflowType", WORKFLOW_TYPE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/317c300c/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java b/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java
index 4b5b309..c454879 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestJobStateOnCreation.java
@@ -86,4 +86,4 @@ public class TestJobStateOnCreation extends TaskSynchronizedTestBase {
       Assert.assertEquals(jobStates.get(job), TaskState.NOT_STARTED);
     }
   }
-}
\ No newline at end of file
+}


Mime
View raw message