helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] branch master updated: Fix adding same job multiple times to RuntimeJobDag so parallelJobs config will be honored (#1006)
Date Tue, 26 May 2020 17:13:24 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 0799900  Fix adding same job multiple times to RuntimeJobDag so parallelJobs config
will be honored (#1006)
0799900 is described below

commit 07999003c7832414822cabe2cb418cde42f2bd0f
Author: Ali Reza Zamani Zadeh Najari <anajari@linkedin.com>
AuthorDate: Tue May 26 10:12:54 2020 -0700

    Fix adding same job multiple times to RuntimeJobDag so parallelJobs config will be honored
(#1006)
    
    In this commit, a fix has been implemented to avoid
    _readyJobList in RuntimeJobDag to contain multiple
    entries of the same job.
    
    The investigation for this fix was motivated by the observation that JobQueues' parallelJobs
config wasn't being honored - it was only processing jobs sequentially one by one. This commit
fixes this.
---
 .../java/org/apache/helix/task/RuntimeJobDag.java  | 16 +++++-
 .../helix/integration/task/TestEnqueueJobs.java    | 65 +++++++++++++++++++++-
 2 files changed, 78 insertions(+), 3 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
index b21723b..b5703f6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
@@ -145,9 +145,21 @@ public class RuntimeJobDag extends JobDag {
           String.format("Job: %s has either finished already, never been scheduled, or been
removed from DAG", job));
     }
     // Add finished job's successors to ready-list
+
     if (_isJobQueue) {
-      if (_lastJob != null && _parentsToChildren.containsKey(_lastJob)) {
-        _readyJobList.offer(_parentsToChildren.get(_lastJob).iterator().next());
+      // If it is a jobQueue, there should be a check to make sure that a job has not
been added
+      // to the _readyJobList multiple times. This check is necessary because once the controller
+      // switch happens, the _readyJobList and _inflightJobList will be created from scratch.
In this
+      // case, since there might be many jobs that have been finished before, we do not want
to add a
+      // job to the _readyJobList several times. If _readyJobList has multiple instances
of the same
+      // job, it can compromise the functionality of the parallel jobs.
+      while (_lastJob != null && _parentsToChildren.containsKey(_lastJob)) {
+        String nextJob = _parentsToChildren.get(_lastJob).iterator().next();
+        if (!_readyJobList.contains(nextJob)) {
+          _readyJobList.offer(nextJob);
+          break;
+        }
+        _lastJob = nextJob;
       }
     } else if (_successorMap.containsKey(job)) {
         for (String successor : _successorMap.get(job)) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
index 28ee51d..4d88a5a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
@@ -1,12 +1,16 @@
 package org.apache.helix.integration.task;
 
+import java.util.Collections;
 import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
+import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -96,4 +100,63 @@ public class TestEnqueueJobs extends TaskTestBase {
 
     _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testQueueParallelJobs() throws InterruptedException {
+    final int parallelJobs = 3;
+    final int numberOfJobsAddedBeforeControllerSwitch = 4;
+    final int totalNumberOfJobs = 7;
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+    WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder()
+        .setWorkflowId(queueName).setParallelJobs(parallelJobs).setAllowOverlapJobAssignment(true);
+    _driver.start(builder.setWorkflowConfig(workflowCfgBuilder.build()).build());
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "10000"));
+
+    // Add 4 jobs to the queue
+    for (int i = 0; i < numberOfJobsAddedBeforeControllerSwitch; i++) {
+      _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+    }
+
+    // Wait until all of the enqueued jobs (Job0 to Job3) are finished
+    for (int i = 0; i < numberOfJobsAddedBeforeControllerSwitch; i++) {
+      _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" +
i),
+          TaskState.COMPLETED);
+    }
+
+    // Stop the Controller
+    _controller.syncStop();
+
+    // Add 3 more jobs to the queue which should run in parallel after the Controller is
started
+    for (int i = numberOfJobsAddedBeforeControllerSwitch; i < totalNumberOfJobs; i++)
{
+      _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+    }
+
+    // Start the Controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Wait until all of the newly added jobs (Job4 to Job6) are finished
+    for (int i = numberOfJobsAddedBeforeControllerSwitch; i < totalNumberOfJobs; i++)
{
+      _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" +
i),
+          TaskState.COMPLETED);
+    }
+
+    // Make sure the jobs have been running in parallel by checking the jobs start time and
finish
+    // time
+    long maxStartTime = Long.MIN_VALUE;
+    long minFinishTime = Long.MAX_VALUE;
+
+    for (int i = numberOfJobsAddedBeforeControllerSwitch; i < totalNumberOfJobs; i++)
{
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(queueName, "JOB" + i));
+      maxStartTime = Long.max(maxStartTime, jobContext.getStartTime());
+      minFinishTime = Long.min(minFinishTime, jobContext.getFinishTime());
+    }
+    Assert.assertTrue(minFinishTime > maxStartTime);
+  }
+}


Mime
View raw message