From commits-return-4394-apmail-helix-commits-archive=helix.apache.org@helix.apache.org Wed Jul 9 01:48:18 2014 Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D638711A5F for ; Wed, 9 Jul 2014 01:48:18 +0000 (UTC) Received: (qmail 81305 invoked by uid 500); 9 Jul 2014 01:48:18 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 81268 invoked by uid 500); 9 Jul 2014 01:48:18 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 81259 invoked by uid 99); 9 Jul 2014 01:48:18 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Jul 2014 01:48:18 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 17EB99A8A3B; Wed, 9 Jul 2014 01:48:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kanak@apache.org To: commits@helix.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: [HELIX-438] Improve task framework retry logic Date: Wed, 9 Jul 2014 01:48:18 +0000 (UTC) Repository: helix Updated Branches: refs/heads/helix-provisioning c5921f429 -> 0272e3701 [HELIX-438] Improve task framework retry logic Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0272e370 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0272e370 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0272e370 Branch: refs/heads/helix-provisioning Commit: 0272e3701492fc738eadb878ddbc45b54d0ca62f Parents: c5921f4 Author: Kanak Biscuitwala Authored: Fri May 23 14:22:48 2014 -0700 Committer: Kanak Biscuitwala Committed: Tue Jul 8 18:47:56 2014 -0700 ---------------------------------------------------------------------- .../helix/task/FixedTargetTaskRebalancer.java | 13 +-- .../helix/task/GenericTaskRebalancer.java | 91 ++++++++++++++++++-- .../java/org/apache/helix/task/JobConfig.java | 28 +++++- .../org/apache/helix/task/TaskRebalancer.java | 8 +- .../java/org/apache/helix/task/TaskRunner.java | 14 ++- .../java/org/apache/helix/task/Workflow.java | 47 +++++----- .../org/apache/helix/task/beans/JobBean.java | 3 +- .../task/TestIndependentTaskRebalancer.java | 89 ++++++++++++++----- 8 files changed, 231 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/0272e370/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java index d1329ee..3d6f2eb 100644 --- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java @@ -19,6 +19,7 @@ package org.apache.helix.task; * under the License. */ +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -56,7 +57,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer { @Override public Map> getTaskAssignment( ResourceCurrentState currStateOutput, ResourceAssignment prevAssignment, - Iterable instanceList, JobConfig jobCfg, JobContext jobContext, + Collection instances, JobConfig jobCfg, JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set partitionSet, Cluster cache) { IdealState tgtIs = getTgtIdealState(jobCfg, cache); @@ -64,7 +65,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer { return Collections.emptyMap(); } Set tgtStates = jobCfg.getTargetPartitionStates(); - return getTgtPartitionAssignment(currStateOutput, instanceList, tgtIs, tgtStates, partitionSet, + return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet, jobContext); } @@ -120,7 +121,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer { /** * Get partition assignments for the target resource, but only for the partitions of interest. * @param currStateOutput The current state of the instances in the cluster. - * @param instanceList The set of instances. + * @param instances The instances. * @param tgtIs The ideal state of the target resource. * @param tgtStates Only partitions in this set of states will be considered. If null, partitions * do not need to @@ -129,11 +130,11 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer { * @return A map of instance vs set of partition ids assigned to that instance. */ private static Map> getTgtPartitionAssignment( - ResourceCurrentState currStateOutput, Iterable instanceList, IdealState tgtIs, + ResourceCurrentState currStateOutput, Collection instances, IdealState tgtIs, Set tgtStates, Set includeSet, JobContext jobCtx) { Map> result = new HashMap>(); - for (ParticipantId instance : instanceList) { + for (ParticipantId instance : instances) { result.put(instance, new TreeSet()); } @@ -145,7 +146,7 @@ public class FixedTargetTaskRebalancer extends TaskRebalancer { } int pId = partitions.get(0); if (includeSet.contains(pId)) { - for (ParticipantId instance : instanceList) { + for (ParticipantId instance : instances) { State s = currStateOutput.getCurrentState(ResourceId.from(tgtIs.getResourceName()), PartitionId.from(pName), instance); http://git-wip-us.apache.org/repos/asf/helix/blob/0272e370/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java index 8b5a258..740b1b9 100644 --- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java @@ -20,6 +20,7 @@ package org.apache.helix.task; */ import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -41,6 +42,8 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.ResourceAssignment; import com.google.common.base.Function; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -50,6 +53,9 @@ import com.google.common.collect.Sets; * assignment to target partitions and states of another resource */ public class GenericTaskRebalancer extends TaskRebalancer { + /** Reassignment policy for this algorithm */ + private RetryPolicy _retryPolicy = new DefaultRetryReassigner(); + @Override public Set getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Cluster cache) { @@ -68,7 +74,7 @@ public class GenericTaskRebalancer extends TaskRebalancer { @Override public Map> getTaskAssignment( ResourceCurrentState currStateOutput, ResourceAssignment prevAssignment, - Iterable instanceList, JobConfig jobCfg, final JobContext jobContext, + Collection instances, JobConfig jobCfg, final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set partitionSet, Cluster cache) { // Gather input to the full auto rebalancing algorithm @@ -121,7 +127,7 @@ public class GenericTaskRebalancer extends TaskRebalancer { new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE, new AutoRebalanceStrategy.DefaultPlacementScheme()); List allNodes = - Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instanceList, cache)); + Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache)); Collections.sort(allNodes); ZNRecord record = strategy.typedComputePartitionAssignment(allNodes, currentMapping, allNodes); Map> preferenceLists = record.getListFields(); @@ -140,6 +146,9 @@ public class GenericTaskRebalancer extends TaskRebalancer { taskAssignment.get(participantId).add(Integer.valueOf(partitionName)); } } + + // Finally, adjust the assignment if tasks have been failing + taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, taskAssignment); return taskAssignment; } @@ -147,14 +156,14 @@ public class GenericTaskRebalancer extends TaskRebalancer { * Filter a list of instances based on targeted resource policies * @param jobCfg the job configuration * @param currStateOutput the current state of all instances in the cluster - * @param instanceList valid instances + * @param instances valid instances * @param cache current snapshot of the cluster * @return a set of instances that can be assigned to */ private Set getEligibleInstances(JobConfig jobCfg, - ResourceCurrentState currStateOutput, Iterable instanceList, Cluster cache) { + ResourceCurrentState currStateOutput, Iterable instances, Cluster cache) { // No target resource means any instance is available - Set allInstances = Sets.newHashSet(instanceList); + Set allInstances = Sets.newHashSet(instances); String targetResource = jobCfg.getTargetResource(); if (targetResource == null) { return allInstances; @@ -193,4 +202,76 @@ public class GenericTaskRebalancer extends TaskRebalancer { allInstances.retainAll(eligibleInstances); return allInstances; } + + public interface RetryPolicy { + /** + * Adjust the assignment to allow for reassignment if a task keeps failing where it's currently + * assigned + * @param jobCfg the job configuration + * @param jobCtx the job context + * @param instances instances that can serve tasks + * @param origAssignment the unmodified assignment + * @return the adjusted assignment + */ + Map> reassign(JobConfig jobCfg, JobContext jobCtx, + Collection instances, Map> origAssignment); + } + + private static class DefaultRetryReassigner implements RetryPolicy { + @Override + public Map> reassign(JobConfig jobCfg, JobContext jobCtx, + Collection instances, Map> origAssignment) { + // Compute an increasing integer ID for each instance + BiMap instanceMap = HashBiMap.create(instances.size()); + int instanceIndex = 0; + for (ParticipantId instance : instances) { + instanceMap.put(instance, instanceIndex++); + } + + // Move partitions + Map> newAssignment = Maps.newHashMap(); + for (Map.Entry> e : origAssignment.entrySet()) { + ParticipantId instance = e.getKey(); + SortedSet partitions = e.getValue(); + Integer instanceId = instanceMap.get(instance); + if (instanceId != null) { + for (int p : partitions) { + // Determine for each partition if there have been failures with the current assignment + // strategy, and if so, force a shift in assignment for that partition only + int shiftValue = getNumInstancesToShift(jobCfg, jobCtx, instances, p); + int newInstanceId = (instanceId + shiftValue) % instances.size(); + ParticipantId newInstance = instanceMap.inverse().get(newInstanceId); + if (newInstance == null) { + newInstance = instance; + } + if (!newAssignment.containsKey(newInstance)) { + newAssignment.put(newInstance, new TreeSet()); + } + newAssignment.get(newInstance).add(p); + } + } else { + // In case something goes wrong, just keep the previous assignment + newAssignment.put(instance, partitions); + } + } + return newAssignment; + } + + /** + * In case tasks fail, we may not want to schedule them in the same place. This method allows us + * to compute a shifting value so that we can systematically choose other instances to try + * @param jobCfg the job configuration + * @param jobCtx the job context + * @param instances instances that can be chosen + * @param p the partition to look up + * @return the shifting value + */ + private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx, + Collection instances, int p) { + int numAttempts = jobCtx.getPartitionNumAttempts(p); + int maxNumAttempts = jobCfg.getMaxAttemptsPerTask(); + int numInstances = Math.min(instances.size(), jobCfg.getMaxForcedReassignmentsPerTask() + 1); + return numAttempts / (maxNumAttempts / numInstances); + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/0272e370/helix-core/src/main/java/org/apache/helix/task/JobConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index b166da1..3f9ab41 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -61,6 +61,8 @@ public class JobConfig { public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition"; /** The maximum number of times the task rebalancer may attempt to execute a task. */ public static final String MAX_ATTEMPTS_PER_TASK = "MaxAttemptsPerTask"; + /** The maximum number of times Helix will intentionally move a failing task */ + public static final String MAX_FORCED_REASSIGNMENTS_PER_TASK = "MaxForcedReassignmentsPerTask"; /** The number of concurrent tasks that are allowed to run on an instance. */ public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance"; /** The number of tasks within the job that are allowed to fail. */ @@ -75,6 +77,7 @@ public class JobConfig { public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10; public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1; public static final int DEFAULT_FAILURE_THRESHOLD = 0; + public static final int DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK = 0; private final String _workflow; private final String _targetResource; @@ -85,13 +88,14 @@ public class JobConfig { private final long _timeoutPerTask; private final int _numConcurrentTasksPerInstance; private final int _maxAttemptsPerTask; + private final int _maxForcedReassignmentsPerTask; private final int _failureThreshold; private final Map _taskConfigMap; private JobConfig(String workflow, String targetResource, List targetPartitions, Set targetPartitionStates, String command, Map jobConfigMap, long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask, - int failureThreshold, Map taskConfigMap) { + int maxForcedReassignmentsPerTask, int failureThreshold, Map taskConfigMap) { _workflow = workflow; _targetResource = targetResource; _targetPartitions = targetPartitions; @@ -101,6 +105,7 @@ public class JobConfig { _timeoutPerTask = timeoutPerTask; _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance; _maxAttemptsPerTask = maxAttemptsPerTask; + _maxForcedReassignmentsPerTask = maxForcedReassignmentsPerTask; _failureThreshold = failureThreshold; if (taskConfigMap != null) { _taskConfigMap = taskConfigMap; @@ -145,6 +150,10 @@ public class JobConfig { return _maxAttemptsPerTask; } + public int getMaxForcedReassignmentsPerTask() { + return _maxForcedReassignmentsPerTask; + } + public int getFailureThreshold() { return _failureThreshold; } @@ -180,6 +189,7 @@ public class JobConfig { } cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask); cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask); + cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask); cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold); return cfgMap; } @@ -198,6 +208,7 @@ public class JobConfig { private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK; private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK; + private int _maxForcedReassignmentsPerTask = DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK; private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD; public JobConfig build() { @@ -205,7 +216,7 @@ public class JobConfig { return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates, _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance, - _maxAttemptsPerTask, _failureThreshold, _taskConfigMap); + _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _taskConfigMap); } /** @@ -246,6 +257,10 @@ public class JobConfig { if (cfg.containsKey(MAX_ATTEMPTS_PER_TASK)) { b.setMaxAttemptsPerTask(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_TASK))); } + if (cfg.containsKey(MAX_FORCED_REASSIGNMENTS_PER_TASK)) { + b.setMaxForcedReassignmentsPerTask(Integer.parseInt(cfg + .get(MAX_FORCED_REASSIGNMENTS_PER_TASK))); + } if (cfg.containsKey(FAILURE_THRESHOLD)) { b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD))); } @@ -297,6 +312,11 @@ public class JobConfig { return this; } + public Builder setMaxForcedReassignmentsPerTask(int v) { + _maxForcedReassignmentsPerTask = v; + return this; + } + public Builder setFailureThreshold(int v) { _failureThreshold = v; return this; @@ -340,6 +360,10 @@ public class JobConfig { throw new IllegalArgumentException(String.format("%s has invalid value %s", MAX_ATTEMPTS_PER_TASK, _maxAttemptsPerTask)); } + if (_maxForcedReassignmentsPerTask < 0) { + throw new IllegalArgumentException(String.format("%s has invalid value %s", + MAX_FORCED_REASSIGNMENTS_PER_TASK, _maxForcedReassignmentsPerTask)); + } if (_failureThreshold < 0) { throw new IllegalArgumentException(String.format("%s has invalid value %s", FAILURE_THRESHOLD, _failureThreshold)); http://git-wip-us.apache.org/repos/asf/helix/blob/0272e370/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 376eca5..043e7dd 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -20,6 +20,7 @@ package org.apache.helix.task; */ import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -75,7 +76,7 @@ public abstract class TaskRebalancer implements HelixRebalancer { * Compute an assignment of tasks to instances * @param currStateOutput the current state of the instances * @param prevAssignment the previous task partition assignment - * @param instanceList the instances + * @param instances the instances * @param jobCfg the task configuration * @param taskCtx the task context * @param workflowCfg the workflow configuration @@ -86,7 +87,7 @@ public abstract class TaskRebalancer implements HelixRebalancer { */ public abstract Map> getTaskAssignment( ResourceCurrentState currStateOutput, ResourceAssignment prevAssignment, - Iterable instanceList, JobConfig jobCfg, JobContext jobContext, + Collection instanceList, JobConfig jobCfg, JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set partitionSet, Cluster cache); @@ -192,7 +193,7 @@ public abstract class TaskRebalancer implements HelixRebalancer { private ResourceAssignment computeResourceMapping(String jobResource, WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment, - Iterable liveInstances, ResourceCurrentState currStateOutput, + Collection liveInstances, ResourceCurrentState currStateOutput, WorkflowContext workflowCtx, JobContext jobCtx, Set partitionsToDropFromIs, Cluster cache) { TargetState jobTgtState = workflowConfig.getTargetState(); @@ -381,6 +382,7 @@ public abstract class TaskRebalancer implements HelixRebalancer { // This includes all completed, failed, already assigned partitions. Set excludeSet = Sets.newTreeSet(assignedPartitions); addCompletedPartitions(excludeSet, jobCtx, allPartitions); + excludeSet.addAll(skippedPartitions); // Get instance->[partition, ...] mappings for the target resource. Map> tgtPartitionAssignments = getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx, http://git-wip-us.apache.org/repos/asf/helix/blob/0272e370/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java index cd909ed..66abba6 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java @@ -21,6 +21,7 @@ package org.apache.helix.task; import org.apache.helix.HelixManager; import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.task.TaskResult.Status; import org.apache.log4j.Logger; /** @@ -64,7 +65,12 @@ public class TaskRunner implements Runnable { public void run() { try { signalStarted(); - _result = _task.run(); + try { + _result = _task.run(); + } catch (Throwable t) { + LOG.error("Problem running the task", t); + _result = new TaskResult(Status.ERROR, null); + } switch (_result.getStatus()) { case COMPLETED: @@ -96,8 +102,10 @@ public class TaskRunner implements Runnable { * Signals the task to cancel itself. */ public void timeout() { - _timeout = true; - cancel(); + if (!_done) { + _timeout = true; + cancel(); + } } /** http://git-wip-us.apache.org/repos/asf/helix/blob/0272e370/helix-core/src/main/java/org/apache/helix/task/Workflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java index 8afafe4..70fb82c 100644 --- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java +++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java @@ -180,7 +180,9 @@ public class Workflow { Joiner.on(",").join(job.targetPartitions)); } builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK, - String.valueOf(job.maxAttemptsPerPartition)); + String.valueOf(job.maxAttemptsPerTask)); + builder.addConfig(job.name, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, + String.valueOf(job.maxForcedReassignmentsPerTask)); builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, String.valueOf(job.numConcurrentTasksPerInstance)); builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK, @@ -243,40 +245,41 @@ public class Workflow { _expiry = -1; } - public Builder addConfig(String node, String key, String val) { - node = namespacify(node); - _dag.addNode(node); - if (!_jobConfigs.containsKey(node)) { - _jobConfigs.put(node, new TreeMap()); + public Builder addConfig(String job, String key, String val) { + job = namespacify(job); + _dag.addNode(job); + if (!_jobConfigs.containsKey(job)) { + _jobConfigs.put(job, new TreeMap()); } - _jobConfigs.get(node).put(key, val); + _jobConfigs.get(job).put(key, val); return this; } - public Builder addJobConfigMap(String node, Map jobConfigMap) { - return addConfig(node, JobConfig.JOB_CONFIG_MAP, TaskUtil.serializeJobConfigMap(jobConfigMap)); + public Builder addJobConfigMap(String job, Map jobConfigMap) { + return addConfig(job, JobConfig.JOB_CONFIG_MAP, TaskUtil.serializeJobConfigMap(jobConfigMap)); } - public Builder addJobConfig(String node, JobConfig jobConfig) { + public Builder addJobConfig(String job, JobConfig jobConfig) { for (Map.Entry e : jobConfig.getResourceConfigMap().entrySet()) { String key = e.getKey(); String val = e.getValue(); - addConfig(node, key, val); + addConfig(job, key, val); } - addTaskConfigs(node, jobConfig.getTaskConfigMap().values()); + jobConfig.getJobConfigMap().put(JobConfig.WORKFLOW_ID, _name); + addTaskConfigs(job, jobConfig.getTaskConfigMap().values()); return this; } - public Builder addTaskConfigs(String node, Collection taskConfigs) { - node = namespacify(node); - _dag.addNode(node); - if (!_taskConfigs.containsKey(node)) { - _taskConfigs.put(node, new ArrayList()); + public Builder addTaskConfigs(String job, Collection taskConfigs) { + job = namespacify(job); + _dag.addNode(job); + if (!_taskConfigs.containsKey(job)) { + _taskConfigs.put(job, new ArrayList()); } - if (!_jobConfigs.containsKey(node)) { - _jobConfigs.put(node, new TreeMap()); + if (!_jobConfigs.containsKey(job)) { + _jobConfigs.put(job, new TreeMap()); } - _taskConfigs.get(node).addAll(taskConfigs); + _taskConfigs.get(job).addAll(taskConfigs); return this; } @@ -293,8 +296,8 @@ public class Workflow { return this; } - public String namespacify(String task) { - return TaskUtil.getNamespacedJobName(_name, task); + public String namespacify(String job) { + return TaskUtil.getNamespacedJobName(_name, job); } public Workflow build() { http://git-wip-us.apache.org/repos/asf/helix/blob/0272e370/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java index af5882c..bc5350a 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java @@ -38,6 +38,7 @@ public class JobBean { public List tasks; public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK; public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; - public int maxAttemptsPerPartition = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK; + public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK; + public int maxForcedReassignmentsPerTask = JobConfig.DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK; public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD; } http://git-wip-us.apache.org/repos/asf/helix/blob/0272e370/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java index 5dad94c..006c3fe 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java @@ -63,6 +63,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { private final MockParticipantManager[] _participants = new MockParticipantManager[n]; private ClusterControllerManager _controller; private Set _invokedClasses = Sets.newHashSet(); + private Map _runCounts = Maps.newHashMap(); private HelixManager _manager; private TaskDriver _driver; @@ -82,24 +83,25 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); } - // Set task callbacks - Map taskFactoryReg = new HashMap(); - taskFactoryReg.put("TaskOne", new TaskFactory() { - @Override - public Task createNewTask(TaskCallbackContext context) { - return new TaskOne(context); - } - }); - taskFactoryReg.put("TaskTwo", new TaskFactory() { - @Override - public Task createNewTask(TaskCallbackContext context) { - return new TaskTwo(context); - } - }); - // start dummy participants for (int i = 0; i < n; i++) { - String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + final String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + + // Set task callbacks + Map taskFactoryReg = new HashMap(); + taskFactoryReg.put("TaskOne", new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new TaskOne(context, instanceName); + } + }); + taskFactoryReg.put("TaskTwo", new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new TaskTwo(context, instanceName); + } + }); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); // Register a Task state model factory. @@ -125,6 +127,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { @BeforeMethod public void beforeMethod() { _invokedClasses.clear(); + _runCounts.clear(); } @Test @@ -208,10 +211,46 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName())); } + @Test + public void testReassignment() throws Exception { + final int NUM_INSTANCES = 2; + String jobName = TestHelper.getTestMethodName(); + Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); + List taskConfigs = Lists.newArrayListWithCapacity(2); + Map taskConfigMap = + Maps.newHashMap(ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + + START_PORT)); + TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false); + taskConfigs.add(taskConfig1); + workflowBuilder.addTaskConfigs(jobName, taskConfigs); + workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand"); + workflowBuilder.addConfig(jobName, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + + (NUM_INSTANCES - 1)); // this ensures that every instance gets one chance + Map jobConfigMap = Maps.newHashMap(); + jobConfigMap.put("Timeout", "1000"); + workflowBuilder.addJobConfigMap(jobName, jobConfigMap); + _driver.start(workflowBuilder.build()); + + // Ensure the job completes + TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS); + TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED); + + // Ensure that the class was invoked + Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName())); + + // Ensure that this was tried on two different instances, the first of which exhausted the + // attempts number, and the other passes on the first try + Assert.assertEquals(_runCounts.size(), NUM_INSTANCES); + Assert.assertTrue(_runCounts.values().contains( + JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES)); + Assert.assertTrue(_runCounts.values().contains(1)); + } + private class TaskOne extends ReindexTask { private final boolean _shouldFail; + private final String _instanceName; - public TaskOne(TaskCallbackContext context) { + public TaskOne(TaskCallbackContext context, String instanceName) { super(context); // Check whether or not this task should succeed @@ -221,15 +260,25 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { Map configMap = taskConfig.getConfigMap(); if (configMap != null && configMap.containsKey("fail") && Boolean.parseBoolean(configMap.get("fail"))) { - shouldFail = true; + // if a specific instance is specified, only fail for that one + shouldFail = + !configMap.containsKey("failInstance") + || configMap.get("failInstance").equals(instanceName); } } _shouldFail = shouldFail; + + // Initialize the count for this instance if not already done + if (!_runCounts.containsKey(instanceName)) { + _runCounts.put(instanceName, 0); + } + _instanceName = instanceName; } @Override public TaskResult run() { _invokedClasses.add(getClass().getName()); + _runCounts.put(_instanceName, _runCounts.get(_instanceName) + 1); // Fail the task if it should fail if (_shouldFail) { @@ -241,8 +290,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { } private class TaskTwo extends TaskOne { - public TaskTwo(TaskCallbackContext context) { - super(context); + public TaskTwo(TaskCallbackContext context, String instanceName) { + super(context, instanceName); } } }