helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/3] helix git commit: Fix Job level timeout not timeout jobs and refactor logics
Date Wed, 21 Mar 2018 23:29:02 GMT
Repository: helix
Updated Branches:
  refs/heads/master b3ee27d4a -> e1176fe40


Fix job-level timeout not timing out jobs, and refactor the timeout logic

There was an issue where a job never timed out: the rebalance scheduler
(_rebalanceScheduler) was not scheduled when the job started. This commit fixes that issue
and refactors the timeout logic so that it can also support workflow-level timeouts.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d742d098
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d742d098
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d742d098

Branch: refs/heads/master
Commit: d742d09846c8b39cd46c9d8560ca856355530a88
Parents: b3ee27d
Author: Junkai Xue <jxue@linkedin.com>
Authored: Thu Feb 8 15:22:18 2018 -0800
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Tue Mar 20 11:55:27 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   |  9 +++--
 .../org/apache/helix/task/JobRebalancer.java    | 34 +++---------------
 .../org/apache/helix/task/TaskConstants.java    |  2 ++
 .../org/apache/helix/task/TaskRebalancer.java   | 36 ++++++++++++++++++++
 .../org/apache/helix/task/beans/JobBean.java    |  3 +-
 .../helix/integration/task/TestJobTimeout.java  |  3 +-
 .../task/TestJobTimeoutTaskNotStarted.java      |  3 +-
 7 files changed, 51 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index fe2b244..f665dec 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -155,7 +155,6 @@ public class JobConfig extends ResourceConfig {
   }
 
   //Default property values
-  public static final long DEFAULT_TIMEOUT_NEVER = -1; // never timeout
   public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
   public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay
   public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
@@ -226,7 +225,7 @@ public class JobConfig extends ResourceConfig {
     if (executionStart > 0) {
       getRecord().setLongField(JobConfigProperty.StartTime.name(), executionStart);
     }
-    if (timeout > DEFAULT_TIMEOUT_NEVER) {
+    if (timeout > TaskConstants.DEFAULT_NEVER_TIMEOUT) {
       getRecord().setLongField(JobConfigProperty.Timeout.name(), timeout);
     }
     getRecord().setLongField(JobConfigProperty.TimeoutPerPartition.name(), timeoutPerTask);
@@ -298,7 +297,7 @@ public class JobConfig extends ResourceConfig {
   }
 
   public long getTimeout() {
-    return getRecord().getLongField(JobConfigProperty.Timeout.name(), DEFAULT_TIMEOUT_NEVER);
+    return getRecord().getLongField(JobConfigProperty.Timeout.name(), TaskConstants.DEFAULT_NEVER_TIMEOUT);
   }
 
   public long getTimeoutPerTask() {
@@ -401,7 +400,7 @@ public class JobConfig extends ResourceConfig {
     private String _command;
     private Map<String, String> _commandConfig;
     private Map<String, TaskConfig> _taskConfigMap = Maps.newHashMap();
-    private long _timeout = DEFAULT_TIMEOUT_NEVER;
+    private long _timeout = TaskConstants.DEFAULT_NEVER_TIMEOUT;
     private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK;
     private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
     private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
@@ -681,7 +680,7 @@ public class JobConfig extends ResourceConfig {
           }
         }
       }
-      if (_timeout < DEFAULT_TIMEOUT_NEVER) {
+      if (_timeout < TaskConstants.DEFAULT_NEVER_TIMEOUT) {
         throw new IllegalArgumentException(String
             .format("%s has invalid value %s", JobConfigProperty.Timeout, _timeout));
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 51da264..a855f7d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -132,7 +132,9 @@ public class JobRebalancer extends TaskRebalancer {
       workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
     }
 
-    scheduleRebalanceForJobTimeout(jobCfg, jobCtx);
+    if (!TaskState.TIMED_OUT.equals(workflowCtx.getJobState(jobName))) {
+      scheduleRebalanceForTimeout(jobCfg.getJobId(), jobCtx.getStartTime(), jobCfg.getTimeout());
+    }
 
     // Grab the old assignment, or an empty one if it doesn't exist
     ResourceAssignment prevAssignment = getPrevResourceAssignment(jobName);
@@ -212,7 +214,8 @@ public class JobRebalancer extends TaskRebalancer {
     TargetState jobTgtState = workflowConfig.getTargetState();
     TaskState jobState = workflowCtx.getJobState(jobResource);
 
-    if (jobState == TaskState.IN_PROGRESS && isJobTimeout(jobCtx, jobCfg)) {
+    if (jobState == TaskState.IN_PROGRESS && isTimeout(jobCtx.getStartTime(),
+        jobCfg.getTimeout())) {
       jobState = TaskState.TIMING_OUT;
       workflowCtx.setJobState(jobResource, TaskState.TIMING_OUT);
     } else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) {
@@ -626,12 +629,6 @@ public class JobRebalancer extends TaskRebalancer {
     TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
   }
 
-  private boolean isJobTimeout(JobContext jobContext, JobConfig jobConfig) {
-    long jobTimeoutTime = computeJobTimeoutTime(jobContext, jobConfig);
-    return jobTimeoutTime != jobConfig.DEFAULT_TIMEOUT_NEVER && jobTimeoutTime <= System
-        .currentTimeMillis();
-  }
-
   private boolean isJobFinished(JobContext jobContext, String jobResource,
       CurrentStateOutput currentStateOutput) {
     for (int pId : jobContext.getPartitionSet()) {
@@ -648,15 +645,6 @@ public class JobRebalancer extends TaskRebalancer {
     return true;
   }
 
-  // Return jobConfig.DEFAULT_TIMEOUT_NEVER if job should never timeout.
-  // job start time can't be -1 before calling this method.
-  private long computeJobTimeoutTime(JobContext jobContext, JobConfig jobConfig) {
-    return (jobConfig.getTimeout() == JobConfig.DEFAULT_TIMEOUT_NEVER
-        || jobConfig.getTimeout() > Long.MAX_VALUE - jobContext.getStartTime()) // check long overflow
-        ? jobConfig.DEFAULT_TIMEOUT_NEVER
-        : jobContext.getStartTime() + jobConfig.getTimeout();
-  }
-
   private void markJobComplete(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
       WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap) {
     long currentTime = System.currentTimeMillis();
@@ -694,18 +682,6 @@ public class JobRebalancer extends TaskRebalancer {
     }
   }
 
-  // Set job timeout rebalance, if the time is earlier than the current scheduled rebalance time
-  // This needs to run for every rebalance because the scheduled rebalance could be removed in other places.
-  private void scheduleRebalanceForJobTimeout(JobConfig jobCfg, JobContext jobCtx) {
-    long jobTimeoutTime = computeJobTimeoutTime(jobCtx, jobCfg);
-    if (jobTimeoutTime != JobConfig.DEFAULT_TIMEOUT_NEVER && jobTimeoutTime > System.currentTimeMillis()) {
-      long nextRebalanceTime = _rebalanceScheduler.getRebalanceTime(jobCfg.getJobId());
-      if (nextRebalanceTime == JobConfig.DEFAULT_TIMEOUT_NEVER || jobTimeoutTime < nextRebalanceTime) {
-        _rebalanceScheduler.scheduleRebalance(_manager, jobCfg.getJobId(), jobTimeoutTime);
-      }
-    }
-  }
-
   /**
    * Get the last task assignment for a given job
    *

http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index 4752602..eee57d3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -44,4 +44,6 @@ public class TaskConstants {
    */
   public static final String CONTEXT_NODE = "Context";
 
+  public static final long DEFAULT_NEVER_TIMEOUT = -1; // never timeout
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 19a4049..a5e5271 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -270,6 +270,40 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
   }
 
+  /**
+   * Basic function to check task framework resources, workflow and job, are timeout
+   * @param startTime       Resources start time
+   * @param timeoutPeriod   Resources timeout period. Will be -1 if it is not set.
+   * @return
+   */
+  protected boolean isTimeout(long startTime, long timeoutPeriod) {
+    long nextTimeout = getTimeoutTime(startTime, timeoutPeriod);
+    return nextTimeout != TaskConstants.DEFAULT_NEVER_TIMEOUT && nextTimeout <= System
+        .currentTimeMillis();
+  }
+
+  /**
+   * Schedule the rebalancer timer for task framework elements
+   * @param resourceId       The resource id
+   * @param startTime        The resource start time
+   * @param timeoutPeriod    The resource timeout period. Will be -1 if it is not set.
+   */
+  protected void scheduleRebalanceForTimeout(String resourceId, long startTime,
+      long timeoutPeriod) {
+    long nextTimeout = getTimeoutTime(startTime, timeoutPeriod);
+    long nextRebalanceTime = _rebalanceScheduler.getRebalanceTime(resourceId);
+    if (nextRebalanceTime == TaskConstants.DEFAULT_NEVER_TIMEOUT
+        || nextTimeout < nextRebalanceTime) {
+      _rebalanceScheduler.scheduleRebalance(_manager, resourceId, nextTimeout);
+    }
+  }
+
+  private long getTimeoutTime(long startTime, long timeoutPeriod) {
+    return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT
+        || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow
+        ? TaskConstants.DEFAULT_NEVER_TIMEOUT : startTime + timeoutPeriod;
+  }
+
   @Override
   public IdealState computeNewIdealState(String resourceName,
       IdealState currentIdealState, CurrentStateOutput currentStateOutput,
@@ -279,6 +313,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return currentIdealState;
   }
 
+
+
   /**
    * Set the ClusterStatusMonitor for metrics update
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 913b989..5674d92 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskConstants;
 
 /**
  * Bean class used for parsing job definitions from YAML.
@@ -38,7 +39,7 @@ public class JobBean {
   public String command;
   public Map<String, String> jobCommandConfigMap;
   public List<TaskBean> tasks;
-  public long timeout = JobConfig.DEFAULT_TIMEOUT_NEVER;
+  public long timeout = TaskConstants.DEFAULT_NEVER_TIMEOUT;
   public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
   public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
   public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;

http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java
index e6f2116..56fc04a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java
@@ -45,12 +45,11 @@ public final class TestJobTimeout extends TaskSynchronizedTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
-    _participants =  new MockParticipantManager[_numNodes];
     _numNodes = 2;
     _numParitions = 2;
     _numReplicas = 1; // only Master, no Slave
     _numDbs = 1;
-
+    _participants =  new MockParticipantManager[_numNodes];
     String namespace = "/" + CLUSTER_NAME;
     if (_gZkClient.exists(namespace)) {
       _gZkClient.deleteRecursively(namespace);

http://git-wip-us.apache.org/repos/asf/helix/blob/d742d098/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java
index 9521e4c..6129946 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java
@@ -56,12 +56,11 @@ public class TestJobTimeoutTaskNotStarted extends TaskSynchronizedTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
-    _participants =  new MockParticipantManager[_numNodes];
     _numDbs = 1;
     _numNodes = 1;
     _numParitions = 50;
     _numReplicas = 1;
-
+    _participants =  new MockParticipantManager[_numNodes];
     String namespace = "/" + CLUSTER_NAME;
     if (_gZkClient.exists(namespace)) {
       _gZkClient.deleteRecursively(namespace);


Mime
View raw message