helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [2/2] helix git commit: Check Workflow is JobQueue before doing parallel jobs logics
Date Thu, 06 Oct 2016 23:15:46 GMT
Check Workflow is JobQueue before doing parallel jobs logics


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/779103b5
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/779103b5
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/779103b5

Branch: refs/heads/helix-0.6.x
Commit: 779103b5323d6f00d903082f58298a083a556a03
Parents: 356b3b4
Author: Junkai Xue <jxue@linkedin.com>
Authored: Thu Oct 6 11:34:22 2016 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Thu Oct 6 16:14:19 2016 -0700

----------------------------------------------------------------------
 .../helix/task/DeprecatedTaskRebalancer.java    |  3 +-
 .../java/org/apache/helix/task/JobQueue.java    |  1 +
 .../org/apache/helix/task/TaskRebalancer.java   | 30 ++++++++++----
 .../org/apache/helix/task/WorkflowConfig.java   | 41 ++++++++++++++++++--
 .../apache/helix/task/WorkflowRebalancer.java   |  2 +-
 .../helix/integration/task/TaskTestUtil.java    |  3 +-
 .../task/TestWorkflowJobDependency.java         | 39 ++++++++++++++++++-
 7 files changed, 104 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/779103b5/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index fbc4483..98b32e2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -158,7 +158,8 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
       }
     }
 
-    if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
+    if (notStartedCount > 0 || (workflowCfg.isJobQueue() && inCompleteCount >= workflowCfg
+        .getParallelJobs())) {
       LOG.debug("Job is not ready to be scheduled due to pending dependent jobs " + resourceName);
       return emptyAssignment(resourceName, currStateOutput);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/779103b5/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
index c350fee..389b7d0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
@@ -122,6 +122,7 @@ public class JobQueue extends Workflow {
     public JobQueue build() {
       buildConfig();
       _workflowConfigBuilder.setTerminable(false);
+      _workflowConfigBuilder.setJobQueue(true);
       return new JobQueue(_name, _workflowConfigBuilder.build(), _jobConfigs, _taskConfigs);
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/779103b5/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 54f7bd5..01e07b1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -152,23 +152,28 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       WorkflowContext workflowCtx) {
     int notStartedCount = 0;
     int failedCount = 0;
+    int incompleteParentCount = 0;
 
     for (String parent : workflowCfg.getJobDag().getDirectParents(job)) {
       TaskState jobState = workflowCtx.getJobState(parent);
       if (jobState == null || jobState == TaskState.NOT_STARTED) {
         ++notStartedCount;
-      }
-      if (jobState == TaskState.FAILED) {
+      } else if (jobState == TaskState.FAILED) {
         ++failedCount;
+      } else if (jobState != TaskState.COMPLETED) {
+        incompleteParentCount++;
       }
     }
 
+    // If there is any parent job not started, this job should not be scheduled
     if (notStartedCount > 0) {
       LOG.debug(String
           .format("Job %s is not ready to start, notStartedParent(s)=%d.", job, notStartedCount));
       return false;
     }
 
+    // If there is parent job failed, schedule the job only when ignore dependent
+    // job failure enabled
     JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
     if (failedCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
       markJobFailed(job, null, workflowCfg, workflowCtx);
@@ -177,11 +182,22 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       return false;
     }
 
-    int inCompleteCount = getInCompleteJobCount(workflowCfg, workflowCtx);
-    if (inCompleteCount >= workflowCfg.getParallelJobs()) {
-      LOG.debug(String
-          .format("Job %s is not ready to schedule, inCompleteJobs(s)=%d.", job, inCompleteCount));
-      return false;
+    if (workflowCfg.isJobQueue()) {
+      // If job comes from a JobQueue, it should apply the parallel job logics
+      int incompleteAllCount = getInCompleteJobCount(workflowCfg, workflowCtx);
+      if (incompleteAllCount >= workflowCfg.getParallelJobs()) {
+        LOG.debug(String.format("Job %s is not ready to schedule, inCompleteJobs(s)=%d.", job,
+            incompleteAllCount));
+        return false;
+      }
+    } else {
+      // If this job comes from a generic workflow, job will not be scheduled until
+      // all the direct parent jobs finished
+      if (incompleteParentCount > 0) {
+        LOG.debug(String
+            .format("Job %s is not ready to start, notFinishedParent(s)=%d.", job, incompleteParentCount));
+        return false;
+      }
     }
 
     return true;

http://git-wip-us.apache.org/repos/asf/helix/blob/779103b5/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index a4356d2..17259d2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -57,7 +57,8 @@ public class  WorkflowConfig {
     FailureThreshold,
     /* this is for non-terminable workflow. */
     capacity,
-    WorkflowType
+    WorkflowType,
+    IsJobQueue
   }
 
   /* Default values */
@@ -79,10 +80,11 @@ public class  WorkflowConfig {
   private final int _failureThreshold;
   private final int _capacity;
   private final String _workflowType;
+  private final boolean _isJobQueue;
 
   protected WorkflowConfig(JobDag jobDag, int parallelJobs, TargetState targetState, long expiry,
       int failureThreshold, boolean terminable, ScheduleConfig scheduleConfig, int capacity,
-      String workflowType) {
+      String workflowType, boolean isJobQueue) {
     _jobDag = jobDag;
     _parallelJobs = parallelJobs;
     _targetState = targetState;
@@ -92,6 +94,7 @@ public class  WorkflowConfig {
     _scheduleConfig = scheduleConfig;
     _capacity = capacity;
     _workflowType = workflowType;
+    _isJobQueue = isJobQueue;
   }
 
   public JobDag getJobDag() {
@@ -137,6 +140,10 @@ public class  WorkflowConfig {
     return _scheduleConfig != null && _scheduleConfig.isRecurring();
   }
 
+  public boolean isJobQueue() {
+    return _isJobQueue;
+  }
+
   public static SimpleDateFormat getDefaultDateFormat() {
     SimpleDateFormat defaultDateFormat = new SimpleDateFormat(
         "MM-dd-yyyy HH:mm:ss");
@@ -170,6 +177,7 @@ public class  WorkflowConfig {
     cfgMap.put(WorkflowConfigProperty.Expiry.name(), String.valueOf(getExpiry()));
     cfgMap.put(WorkflowConfigProperty.TargetState.name(), getTargetState().name());
     cfgMap.put(WorkflowConfigProperty.Terminable.name(), String.valueOf(isTerminable()));
+    cfgMap.put(WorkflowConfigProperty.IsJobQueue.name(), String.valueOf(isJobQueue()));
     cfgMap.put(WorkflowConfigProperty.FailureThreshold.name(),
         String.valueOf(getFailureThreshold()));
 
@@ -250,12 +258,13 @@ public class  WorkflowConfig {
     private int _capacity = Integer.MAX_VALUE;
     private ScheduleConfig _scheduleConfig;
     private String _workflowType;
+    private boolean _isJobQueue = false;
 
     public WorkflowConfig build() {
       validate();
 
       return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _failureThreshold,
-          _isTerminable, _scheduleConfig, _capacity, _workflowType);
+          _isTerminable, _scheduleConfig, _capacity, _workflowType, _isJobQueue);
     }
 
     public Builder() {}
@@ -269,7 +278,8 @@ public class  WorkflowConfig {
       _scheduleConfig = workflowConfig.getScheduleConfig();
       _capacity = workflowConfig.getCapacity();
       _failureThreshold = workflowConfig.getFailureThreshold();
-      _workflowType =workflowConfig.getWorkflowType();
+      _workflowType = workflowConfig.getWorkflowType();
+      _isJobQueue = workflowConfig.isJobQueue();
     }
 
     protected Builder setJobDag(JobDag v) {
@@ -277,6 +287,11 @@ public class  WorkflowConfig {
       return this;
     }
 
+    /**
+     * This method only applies for JobQueue, will be ignored in generic workflows
+     * @param parallelJobs Allowed parallel job numbers
+     * @return This builder
+     */
     public Builder setParallelJobs(int parallelJobs) {
       _parallelJobs = parallelJobs;
       return this;
@@ -297,6 +312,11 @@ public class  WorkflowConfig {
       return this;
     }
 
+    /**
+     * This method only applies for JobQueue, will be ignored in generic workflows
+     * @param capacity The number of capacity
+     * @return This builder
+     */
     public Builder setCapacity(int capacity) {
       _capacity = capacity;
       return this;
@@ -322,6 +342,11 @@ public class  WorkflowConfig {
       return this;
     }
 
+    protected Builder setJobQueue(boolean isJobQueue) {
+      _isJobQueue = isJobQueue;
+      return this;
+    }
+
     public static Builder fromMap(Map<String, String> cfg) {
       Builder builder = new Builder();
       builder.setConfigMap(cfg);
@@ -377,6 +402,10 @@ public class  WorkflowConfig {
       if (cfg.containsKey(WorkflowConfigProperty.WorkflowType.name())) {
         setWorkFlowType(cfg.get(WorkflowConfigProperty.WorkflowType.name()));
       }
+
+      if (cfg.containsKey(WorkflowConfigProperty.IsJobQueue.name())) {
+        setJobQueue(Boolean.parseBoolean(cfg.get(WorkflowConfigProperty.IsJobQueue.name())));
+      }
       return this;
     }
 
@@ -412,6 +441,10 @@ public class  WorkflowConfig {
       return _taskDag;
     }
 
+    public boolean isJobQueue() {
+      return _isJobQueue;
+    }
+
     public static Builder from(WorkflowBean workflowBean) {
       WorkflowConfig.Builder b = new WorkflowConfig.Builder();
       if (workflowBean.schedule != null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/779103b5/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index f72f1a8..dc7c90b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -142,7 +142,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
         continue;
       }
 
-      if (scheduledJobs >= workflowCfg.getParallelJobs()) {
+      if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
         LOG.debug(String.format("Workflow %s already have enough job in progress, "
                 + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
         break;

http://git-wip-us.apache.org/repos/asf/helix/blob/779103b5/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 2f8fa60..6122463 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -140,7 +140,8 @@ public class TaskTestUtil {
       Thread.sleep(100);
     }
 
-    return maxRunningCount > 1 && maxRunningCount <= workflowConfig.getParallelJobs();
+    return maxRunningCount > 1 && (workflowConfig.isJobQueue() ? maxRunningCount <= workflowConfig
+        .getParallelJobs() : true);
   }
 
   public static Date getDateFromStartTime(String startTime)

http://git-wip-us.apache.org/repos/asf/helix/blob/779103b5/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java
index 6cae0e5..0d9ef67 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java
@@ -22,8 +22,11 @@ package org.apache.helix.integration.task;
 import org.apache.helix.TestHelper;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
@@ -43,7 +46,7 @@ public class TestWorkflowJobDependency extends TaskTestBase {
     super.beforeClass();
   }
 
-  @Test (enabled = false)
+  @Test
   public void testWorkflowWithOutDependencies() throws InterruptedException {
     String workflowName = TestHelper.getTestMethodName();
 
@@ -85,4 +88,38 @@ public class TestWorkflowJobDependency extends TaskTestBase {
     // All jobs have a valid overlap time range.
     Assert.assertTrue(startTime <= finishTime);
   }
+
+  @Test
+  public void testWorkflowWithDependencies() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    final int PARALLEL_NUM = 2;
+    // Workflow setup
+    WorkflowConfig.Builder workflowcfgBuilder =
+        new WorkflowConfig.Builder().setParallelJobs(PARALLEL_NUM);
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    builder.setWorkflowConfig(workflowcfgBuilder.build());
+
+    builder.addParentChildDependency("job" + _testDbs.get(0), "job" + _testDbs.get(1));
+    for (int i = 0; i < 2; i++) {
+      JobConfig.Builder jobConfig = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+          .setTargetResource(_testDbs.get(i)).setTargetPartitionStates(Sets.newHashSet("SLAVE","MASTER"))
+          .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG);
+      String jobName = "job" + _testDbs.get(i);
+      builder.addJob(jobName, jobConfig);
+    }
+
+    // Start workflow
+    Workflow workflow = builder.build();
+    _driver.start(workflow);
+
+    // Wait until the workflow completes
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+
+    JobContext context1 = _driver
+        .getJobContext(TaskUtil.getNamespacedJobName(workflowName, "job" + _testDbs.get(0)));
+    JobContext context2 = _driver
+        .getJobContext(TaskUtil.getNamespacedJobName(workflowName, "job" + _testDbs.get(1)));
+    Assert.assertTrue(context2.getStartTime() - context1.getFinishTime() >= 0L);
+  }
 }


Mime
View raw message