helix-commits mailing list archives

From hu...@apache.org
Subject [helix] 10/10: TASK2.0: Job scheduling core pipeline fixes
Date Thu, 28 Mar 2019 19:31:54 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 0ad8af404908a54f7b98ee945bf2dda8e83f002f
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Thu Mar 28 12:31:25 2019 -0700

    TASK2.0: Job scheduling core pipeline fixes
    
    Task Framework 2.0 had stability issues and race conditions that were not being
    handled correctly, and its integration with RuntimeJobDag had loopholes that needed
    to be fixed. This diff contains those fixes and improvements; together they realize
    the intended performance gains and cut down on redundant computation.
    Changelist:
    1. Race condition when a job is enqueued: only the new JobConfig is updated, not the parent DAG
        Added a two-way selective update that ensures consistency between JobConfigs and parent DAGs (a sketch follows this list)
    2. Moved where getNextJob() is called in scheduleJobs() in WorkflowDispatcher
        This ensures that once a RuntimeJobDag is rebuilt, the update for its jobs happens in a single pipeline run, which removes any extra delay
    3. Race condition where the job returned by getNextJob() is not schedulable
        This is caused by deleting and re-enqueuing a job of the same name: the RuntimeJobDag still holds the old job name, which conflicts with the dependency in the new DAG
        Fixed the test TestTaskRebalancerStopResume so that it does not enqueue a job of the same name
    4. JobRebalancer was throwing an NPE when calling processJobStatusUpdateAndAssignment()
        This sometimes made the Controller hang
        Added a null check for JobConfig (the job could have been deleted or purged)
    5. Fixed a bug in isWorkflowStopped()
        The TargetState comparison was inverted
        This fixes TestRecurringJobQueue's testDeletingRecurrentQueueWithHistory()
        Contexts sometimes do not get deleted cleanly, but this does not affect correctness
    6. Added TestEnqueueJobs
    7. Fixed unstable TestGetLastScheduledTaskExecInfo
    8. Other minor style fixes
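    
    As a condensed illustration of fix 1, the consistency check added to TaskDataCache
    boils down to the loop below (a simplified sketch of the hunk further down; the map
    and variable names follow the diff, and the surrounding cache-refresh code is elided):
    
        // When a job is enqueued, the client (TaskDriver) creates the JobConfig ZNode
        // first and updates the parent JobDag afterwards, so the controller can cache a
        // new JobConfig while the RuntimeJobDag is still stale.
        for (String jobName : newJobConfigs.keySet()) {
          JobConfig jobConfig = newJobConfigs.get(jobName);
          WorkflowConfig workflowConfig = _workflowConfigMap.get(jobConfig.getWorkflow());
          if (workflowConfig != null
              && (workflowConfig.isJobQueue() || !workflowConfig.isTerminable())
              && !_runtimeJobDagMap.get(workflowConfig.getWorkflowId()).getAllNodes()
                  .contains(jobName)) {
            // Inconsistency found: mark the workflow so its RuntimeJobDag is rebuilt
            workflowsUpdated.add(jobConfig.getWorkflow());
          }
        }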
---
 .../apache/helix/common/caches/TaskDataCache.java  | 19 ++++-
 .../stages/task/TaskSchedulingStage.java           |  5 +-
 .../apache/helix/task/AbstractTaskDispatcher.java  |  2 +-
 .../java/org/apache/helix/task/JobRebalancer.java  | 12 ++-
 .../java/org/apache/helix/task/RuntimeJobDag.java  | 12 +--
 .../main/java/org/apache/helix/task/TaskUtil.java  | 11 ++-
 .../org/apache/helix/task/WorkflowDispatcher.java  |  8 +-
 .../org/apache/helix/task/WorkflowRebalancer.java  |  2 +-
 .../helix/integration/task/TaskTestUtil.java       |  4 +-
 .../helix/integration/task/TestEnqueueJobs.java    | 99 ++++++++++++++++++++++
 .../integration/task/TestRecurringJobQueue.java    |  5 +-
 .../task/TestTaskRebalancerStopResume.java         | 16 ++--
 .../task/TestGetLastScheduledTaskExecInfo.java     |  2 +-
 13 files changed, 161 insertions(+), 36 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 31319ca..5c29124 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -19,7 +19,6 @@ package org.apache.helix.common.caches;
  * under the License.
  */
 
-import com.google.common.base.Joiner;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -39,7 +38,6 @@ import org.apache.helix.task.AssignableInstanceManager;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.RuntimeJobDag;
-import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
@@ -128,6 +126,23 @@ public class TaskDataCache extends AbstractDataCache {
       if (!_jobConfigMap.containsKey(jobName) && newJobConfigs.get(jobName).getWorkflow() != null) {
         workflowsUpdated.add(newJobConfigs.get(jobName).getWorkflow());
       }
+
+      // Only for JobQueues when a new job is enqueued, there exists a race condition where only
+      // JobConfig is updated and the RuntimeJobDag does not get updated because when the client
+      // (TaskDriver) submits, it creates JobConfig ZNode first and modifies its parent JobDag next.
+      // To ensure that they are both properly updated, check that workflow's DAG and existing
+      // JobConfigs are consistent for JobQueues
+      JobConfig jobConfig = newJobConfigs.get(jobName);
+      if (_workflowConfigMap.containsKey(jobConfig.getWorkflow())) {
+        WorkflowConfig workflowConfig = _workflowConfigMap.get(jobConfig.getWorkflow());
+        // Check that the job's parent workflow's DAG contains this job
+        if ((workflowConfig.isJobQueue() || !workflowConfig.isTerminable()) && !_runtimeJobDagMap
+            .get(workflowConfig.getWorkflowId()).getAllNodes().contains(jobName)) {
+          // Inconsistency between JobConfigs and DAGs found. Add the workflow to workflowsUpdated
+          // to rebuild the RuntimeJobDag
+          workflowsUpdated.add(jobConfig.getWorkflow());
+        }
+      }
     }
 
     // Removed jobs
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
index bbc2a2f..94af50d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
@@ -259,9 +259,8 @@ public class TaskSchedulingStage extends AbstractBaseStage {
             String quotaType = getQuotaType(cache.getWorkflowConfig(workflowId));
             restOfResources.remove(workflowId);
             if (assignableInstanceManager.hasGlobalCapacity(quotaType)) {
-              _workflowDispatcher
-                  .assignWorkflow(workflowId, cache.getWorkflowConfig(workflowId), context,
-                      currentStateOutput, bestPossibleOutput, resourceMap);
+              _workflowDispatcher.assignWorkflow(workflowId, cache.getWorkflowConfig(workflowId),
+                  context, currentStateOutput, bestPossibleOutput);
             } else {
               LogUtil.logInfo(logger, _eventId, String.format(
                   "Fail to schedule new jobs assignment for Workflow %s due to quota %s is
full",
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 4de8112..78a7419 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -1065,7 +1065,7 @@ public abstract class AbstractTaskDispatcher {
    */
   protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
     if (cfg.isRecurring()) {
-      return cfg.getTargetState() == TargetState.START;
+      return cfg.getTargetState() == TargetState.STOP;
     }
 
     for (String job : cfg.getJobDag().getAllNodes()) {
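
A minimal standalone illustration of the one-line fix above (the enum stub exists only
for this sketch; the real TargetState lives in org.apache.helix.task):

    enum TargetState { START, STOP, DELETE }

    // Recurring workflows only carry a workflow-level target state. Before this commit
    // the comparison was inverted (== TargetState.START), so a running recurring
    // workflow was reported as stopped and a stopped one as still running.
    static boolean isRecurringWorkflowStopped(TargetState targetState) {
      return targetState == TargetState.STOP;
    }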
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 0acf825..6d80229 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -40,6 +40,13 @@ public class JobRebalancer extends TaskRebalancer {
       CurrentStateOutput currStateOutput) {
     long startTime = System.currentTimeMillis();
     final String jobName = resource.getResourceName();
+    JobConfig jobConfig = clusterData.getJobConfig(jobName);
+    if (jobConfig == null) {
+      LOG.error(
+          "Job {}'s JobConfig is missing. This job might have been deleted or purged. Skipping
status update and assignment!",
+          jobName);
+      return buildEmptyAssignment(jobName, currStateOutput);
+    }
     LOG.debug("Computer Best Partition for job: " + jobName);
     if (_jobDispatcher == null) {
       _jobDispatcher = new JobDispatcher();
@@ -47,9 +54,8 @@ public class JobRebalancer extends TaskRebalancer {
     _jobDispatcher.init(_manager);
     _jobDispatcher.updateCache(clusterData);
     _jobDispatcher.setClusterStatusMonitor(_clusterStatusMonitor);
-    ResourceAssignment resourceAssignment = _jobDispatcher
-        .processJobStatusUpdateAndAssignment(jobName, currStateOutput,
-            clusterData.getWorkflowContext(clusterData.getJobConfig(jobName).getWorkflow()));
+    ResourceAssignment resourceAssignment = _jobDispatcher.processJobStatusUpdateAndAssignment(
+        jobName, currStateOutput, clusterData.getWorkflowContext(jobConfig.getWorkflow()));
     LOG.debug(String.format("JobRebalancer computation takes %d ms for Job %s",
         System.currentTimeMillis() - startTime, jobName));
     return resourceAssignment;
diff --git a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
index e63e3b4..c223a29 100644
--- a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
@@ -20,26 +20,19 @@ package org.apache.helix.task;
  */
 
 import java.util.ArrayDeque;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Queue;
 import java.util.Set;
 import java.util.HashSet;
 
-import org.apache.helix.HelixException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * RuntimeJobDag is a job DAG that provides the job iterator functionality at runtime (when jobs are
  * actually being assigned per job category). This is to support assignment of jobs based on their
  * categories and quotas. RuntimeJobDag uses the list scheduling algorithm using ready-list and
  * inflight-list to return jobs available for scheduling.
- *
  * NOTE: RuntimeJobDag is not thread-safe.
  */
 public class RuntimeJobDag extends JobDag {
@@ -125,11 +118,15 @@ public class RuntimeJobDag extends JobDag {
     }
     // If list is empty, return null
     if (_readyJobList.isEmpty()) {
+
       return null;
     }
     String nextJob = _readyJobList.poll();
     _inflightJobList.add(nextJob);
     _lastJob = nextJob;
+
+
+
     return nextJob;
   }
 
@@ -212,5 +209,4 @@ public class RuntimeJobDag extends JobDag {
   public Set<String> getInflightJobList() {
     return new HashSet<>(_inflightJobList);
   }
-
 }
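
For context on the getNextJob() hunk above, here is a stripped-down model of the
ready-list/inflight-list iterator described in the class javadoc (a hypothetical
standalone class, not the real RuntimeJobDag, which additionally rebuilds its
ready-list from the DAG and tracks a last-job pointer):

    import java.util.ArrayDeque;
    import java.util.HashSet;
    import java.util.Queue;
    import java.util.Set;

    class ReadyListIteratorSketch {
      private final Queue<String> _readyJobList = new ArrayDeque<>();
      private final Set<String> _inflightJobList = new HashSet<>();

      // Called when all of a job's parents have finished.
      void markReady(String job) {
        _readyJobList.offer(job);
      }

      // Mirrors getNextJob(): return null when nothing is ready; otherwise move
      // the job from the ready-list to the inflight-list and hand it out.
      String getNextJob() {
        if (_readyJobList.isEmpty()) {
          return null;
        }
        String nextJob = _readyJobList.poll();
        _inflightJobList.add(nextJob);
        return nextJob;
      }
    }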
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index d15cf8f..5da9fc5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -654,7 +654,7 @@ public class TaskUtil {
    * @return True if remove success, otherwise false
    */
   protected static boolean removeWorkflow(final HelixDataAccessor accessor,
-      final HelixPropertyStore propertyStore, String workflow, Set<String> jobs) {
+      final HelixPropertyStore<ZNRecord> propertyStore, String workflow, Set<String> jobs) {
     // clean up all jobs
     for (String job : jobs) {
       if (!removeJob(accessor, propertyStore, job)) {
@@ -724,9 +724,9 @@ public class TaskUtil {
    * @return
    */
   protected static Set<String> getExpiredJobs(HelixDataAccessor dataAccessor,
-      HelixPropertyStore propertyStore, WorkflowConfig workflowConfig,
+      HelixPropertyStore<ZNRecord> propertyStore, WorkflowConfig workflowConfig,
       WorkflowContext workflowContext) {
-    Set<String> expiredJobs = new HashSet<String>();
+    Set<String> expiredJobs = new HashSet<>();
 
     if (workflowContext != null) {
       Map<String, TaskState> jobStates = workflowContext.getJobStates();
@@ -742,7 +742,7 @@ public class TaskUtil {
           continue;
         }
         long expiry = jobConfig.getExpiry();
-        if (expiry == workflowConfig.DEFAULT_EXPIRY || expiry < 0) {
+        if (expiry == WorkflowConfig.DEFAULT_EXPIRY || expiry < 0) {
           expiry = workflowConfig.getExpiry();
         }
         if (jobContext != null && jobStates.get(job) == TaskState.COMPLETED) {
@@ -822,7 +822,7 @@ public class TaskUtil {
   /**
    * update workflow's property to remove jobs from JOB_STATES if there are already started.
    */
-  protected static boolean removeJobsState(final HelixPropertyStore propertyStore,
+  protected static boolean removeJobsState(final HelixPropertyStore<ZNRecord> propertyStore,
       final String workflow, final Set<String> jobs) {
     String contextPath =
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, TaskUtil.CONTEXT_NODE);
@@ -983,7 +983,6 @@ public class TaskUtil {
    * @param workflowConfig
    * @param workflowContext
    */
-
   public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
       WorkflowContext workflowContext, HelixManager manager,
       RebalanceScheduler rebalanceScheduler) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 80d9afb..51b21eb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -174,7 +174,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
 
   public void assignWorkflow(String workflow, WorkflowConfig workflowCfg,
       WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput,
-      BestPossibleStateOutput bestPossibleOutput, Map<String, Resource> resourceMap) {
+      BestPossibleStateOutput bestPossibleOutput) {
     // Fetch workflow configuration and context
     if (workflowCfg == null) {
       // Already logged in status update.
@@ -240,13 +240,13 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     // Assign new jobs
     while (nextJob != null) {
       String job = nextJob;
-      nextJob = jobDag.getNextJob();
       TaskState jobState = workflowCtx.getJobState(job);
       if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Job " + job + " is already started or completed.");
         }
         processJob(job, currentStateOutput, bestPossibleOutput, workflowCtx);
+        nextJob = jobDag.getNextJob();
         continue;
       }
 
@@ -258,6 +258,9 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
         break;
       }
 
+      // TODO: Part of isJobReadyToSchedule() is already done by RuntimeJobDag. Because there is
+      // some duplicate logic, consider refactoring. The check here and the ready-list in
+      // RuntimeJobDag may cause conflicts.
       // check ancestor job status
       if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, inCompleteAllJobCount, jobConfigMap,
           clusterDataCache, clusterDataCache.getAssignableInstanceManager())) {
@@ -288,6 +291,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
           scheduledJobs++;
         }
       }
+      nextJob = jobDag.getNextJob();
     }
 
     long currentScheduledTime =
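
The control flow of fix 2, reduced to a sketch (the stub methods below are hypothetical
stand-ins for the real checks in the hunk above; the point is only the position of the
getNextJob() calls):

    class SchedulingLoopSketch {
      interface JobIterator { String getNextJob(); }

      void scheduleJobs(JobIterator jobDag) {
        String nextJob = jobDag.getNextJob();
        while (nextJob != null) {
          String job = nextJob;
          if (isStartedOrCompleted(job)) {
            processJob(job);               // bookkeeping for an already-running job
            nextJob = jobDag.getNextJob(); // advance only after processing
            continue;
          }
          if (parallelJobLimitReached()) {
            break;                         // leave the job for the next pipeline run
          }
          if (isJobReadyToSchedule(job)) {
            scheduleSingleJob(job);
          }
          nextJob = jobDag.getNextJob();   // moved from the top of the loop to the end
        }
      }

      // Stubs for illustration only.
      private boolean isStartedOrCompleted(String job) { return false; }
      private boolean parallelJobLimitReached() { return true; }
      private boolean isJobReadyToSchedule(String job) { return true; }
      private void processJob(String job) { }
      private void scheduleSingleJob(String job) { }
    }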
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index d0c4381..2411b39 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -54,7 +54,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     _workflowDispatcher.updateWorkflowStatus(workflow, workflowCfg, workflowCtx, currStateOutput,
         new BestPossibleStateOutput());
     _workflowDispatcher.assignWorkflow(workflow, workflowCfg, workflowCtx, currStateOutput,
-        new BestPossibleStateOutput(), new HashMap<String, Resource>());
+        new BestPossibleStateOutput());
 
     LOG.debug(String.format("WorkflowRebalancer computation takes %d ms for workflow %s",
         System.currentTimeMillis() - startTime, workflow));
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 02a7628..47b7cb8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -206,7 +206,7 @@ public class TaskTestUtil {
   }
 
   public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart,
-      int recurrenInSeconds, TargetState targetState) {
+      int recurrenceInSeconds, TargetState targetState) {
     WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder(jobQueueName);
     workflowCfgBuilder.setExpiry(120000);
     if (targetState != null) {
@@ -218,7 +218,7 @@ public class TaskTestUtil {
     cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
     cal.set(Calendar.MILLISECOND, 0);
     ScheduleConfig scheduleConfig =
-        ScheduleConfig.recurringFromDate(cal.getTime(), TimeUnit.SECONDS, recurrenInSeconds);
+        ScheduleConfig.recurringFromDate(cal.getTime(), TimeUnit.SECONDS, recurrenceInSeconds);
     workflowCfgBuilder.setScheduleConfig(scheduleConfig);
     return new JobQueue.Builder(jobQueueName).setWorkflowConfig(workflowCfgBuilder.build());
   }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
new file mode 100644
index 0000000..28ee51d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
@@ -0,0 +1,99 @@
+package org.apache.helix.integration.task;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestEnqueueJobs extends TaskTestBase {
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    setSingleTestEnvironment();
+    super.beforeClass();
+  }
+
+  @Test
+  public void testJobQueueAddingJobsOneByOne() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+    WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder().setWorkflowId(queueName).setParallelJobs(1);
+    _driver.start(builder.setWorkflowConfig(workflowCfgBuilder.build()).build());
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2);
+    _driver.enqueueJob(queueName, "JOB0", jobBuilder);
+    for (int i = 1; i < 5; i++) {
+    _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + (i - 1)),
+          10000L, TaskState.COMPLETED);
+      _driver.waitToStop(queueName, 5000L);
+      _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+      _driver.resume(queueName);
+    }
+
+    _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 4),
+        TaskState.COMPLETED);
+  }
+
+  @Test
+  public void testJobQueueAddingJobsAtSametime() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+    WorkflowConfig.Builder workflowCfgBuilder =
+        new WorkflowConfig.Builder().setWorkflowId(queueName).setParallelJobs(1);
+    _driver.start(builder.setWorkflowConfig(workflowCfgBuilder.build()).build());
+
+    // Adding jobs
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2);
+    _driver.waitToStop(queueName, 5000L);
+    for (int i = 0; i < 5; i++) {
+      _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+    }
+    _driver.resume(queueName);
+
+    _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 4),
+        TaskState.COMPLETED);
+  }
+
+  @Test
+  public void testJobSubmitGenericWorkflows() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2);
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    for (int i = 0; i < 5; i++) {
+      builder.addJob("JOB" + i, jobBuilder);
+    }
+
+    /**
+     * Dependency visualization
+     *               JOB0
+     *
+     *             /   |    \
+     *
+     *         JOB1 <-JOB2   JOB4
+     *
+     *                 |     /
+     *
+     *                JOB3
+     */
+
+    builder.addParentChildDependency("JOB0", "JOB1");
+    builder.addParentChildDependency("JOB0", "JOB2");
+    builder.addParentChildDependency("JOB0", "JOB4");
+    builder.addParentChildDependency("JOB1", "JOB2");
+    builder.addParentChildDependency("JOB2", "JOB3");
+    builder.addParentChildDependency("JOB4", "JOB3");
+    _driver.start(builder.build());
+
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+  }
+}
\ No newline at end of file
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 5eba70a..6d05d38 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -274,12 +274,15 @@ public class TestRecurringJobQueue extends TaskTestBase {
 
     // Record all scheduled workflows
     wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
-    List<String> scheduledWorkflows = new ArrayList<String>(wCtx.getScheduledWorkflows());
+    List<String> scheduledWorkflows = new ArrayList<>(wCtx.getScheduledWorkflows());
     final String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
 
     // Delete recurrent workflow
     _driver.delete(queueName);
 
+    // Try to delete again to make sure things are cleaned up
+    _driver.delete(queueName);
+
     // Wait until recurrent workflow and the last scheduled workflow are cleaned up
     boolean result = TestHelper.verify(new TestHelper.Verifier() {
       @Override public boolean verify() throws Exception {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 5bd3392..5f5c6a3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -231,12 +231,16 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     currentJobNames.remove(deletedJob2);
 
     // add job 3 back
-    JobConfig.Builder job =
-        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(Sets.newHashSet("SLAVE"));
-    LOG.info("Enqueuing job: " + deletedJob2);
-    _driver.enqueueJob(queueName, deletedJob2, job);
-    currentJobNames.add(deletedJob2);
+    JobConfig.Builder job = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+        .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+
+    // the name here MUST be unique in order to avoid conflicts with the old job cached in
+    // RuntimeJobDag
+    String newJob = deletedJob2 + "_second";
+    LOG.info("Enqueuing job: " + newJob);
+    _driver.enqueueJob(queueName, newJob, job);
+    currentJobNames.add(newJob);
 
     // Ensure the jobs left are successful completed in the correct order
     long preJobFinish = 0;
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
index 25c486a..a73f02d 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
@@ -60,8 +60,8 @@ public class TestGetLastScheduledTaskExecInfo extends TaskTestBase {
     List<Long> startTimesFastTasks = setupTasks("TestWorkflow_3", 4, 10);
     // API call needs to return the most recent timestamp (value at last index)
     lastScheduledTaskTs = _driver.getLastScheduledTaskTimestamp("TestWorkflow_3");
-    execInfo = _driver.getLastScheduledTaskExecutionInfo("TestWorkflow_3");
     Thread.sleep(200); // Let the tasks run
+    execInfo = _driver.getLastScheduledTaskExecutionInfo("TestWorkflow_3");
 
     Assert.assertEquals(startTimesFastTasks.get(startTimesFastTasks.size() - 1), lastScheduledTaskTs);
     Assert.assertEquals(execInfo.getJobName(), "TestWorkflow_3_job_0");

