helix-commits mailing list archives

From zzh...@apache.org
Subject helix git commit: [HELIX-562] TaskRebalancer doesn't honor MaxAttemptsPerTask when FailureThreshold is larger than 0, rb=29941
Date Thu, 15 Jan 2015 22:51:27 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x e2e3fec2d -> d5687fa41


[HELIX-562] TaskRebalancer doesn't honor MaxAttemptsPerTask when FailureThreshold is larger than 0, rb=29941
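
What the patch changes, in brief: isJobComplete() previously treated a partition as outstanding unless it was skipped or COMPLETED, and the scheduling exclude-set only held completed, skipped, delayed, and already-assigned partitions. With a FailureThreshold greater than 0 the job never fails outright, so permanently failing tasks kept getting rescheduled past MaxAttemptsPerTask. The fix adds a "given up" notion: a partition whose attempt count has reached MaxAttemptsPerTask now counts toward job completion and is excluded from further assignment.

A minimal, self-contained sketch of that accounting (plain Java with hypothetical class and variable names, not part of the patch; the real logic is TaskRebalancer.isTaskGivenup()/addGiveupPartitions() in the diff below):

  import java.util.HashMap;
  import java.util.Map;
  import java.util.Set;
  import java.util.TreeSet;

  public class GiveupSketch {
    // Same check as the new isTaskGivenup(): give up once the attempt count
    // reaches the configured MaxAttemptsPerTask.
    static boolean isGivenUp(int numAttempts, int maxAttemptsPerTask) {
      return numAttempts >= maxAttemptsPerTask;
    }

    public static void main(String[] args) {
      int maxAttemptsPerTask = 2; // the JobConfig.MAX_ATTEMPTS_PER_TASK setting
      Map<Integer, Integer> attempts = new HashMap<Integer, Integer>(); // partition -> attempts so far
      attempts.put(0, 2); // attempted twice already -> given up
      attempts.put(1, 1); // still has one attempt left

      // Given-up partitions join the exclude set and are never assigned again,
      // even when a large FailureThreshold keeps the job itself alive.
      Set<Integer> excludeSet = new TreeSet<Integer>();
      for (Map.Entry<Integer, Integer> e : attempts.entrySet()) {
        if (isGivenUp(e.getValue(), maxAttemptsPerTask)) {
          excludeSet.add(e.getKey());
        }
      }
      System.out.println("given-up partitions: " + excludeSet); // prints [0]
    }
  }

The new TestTaskRebalancerRetryLimit test below drives exactly this path: MAX_ATTEMPTS_PER_TASK=2, FAILURE_THRESHOLD=Integer.MAX_VALUE, a task that always throws, and an assertion that every touched partition ends in TASK_ERROR after exactly 2 attempts.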


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d5687fa4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d5687fa4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d5687fa4

Branch: refs/heads/helix-0.6.x
Commit: d5687fa41d091420f78023d950a9dc33f5e769ab
Parents: e2e3fec
Author: zzhang <zzhang@apache.org>
Authored: Thu Jan 15 14:51:01 2015 -0800
Committer: zzhang <zzhang@apache.org>
Committed: Thu Jan 15 14:51:01 2015 -0800

----------------------------------------------------------------------
 .../controller/rebalancer/AutoRebalancer.java   |  16 +-
 .../org/apache/helix/task/TaskRebalancer.java   |  22 ++-
 .../task/TestTaskRebalancerRetryLimit.java      | 170 +++++++++++++++++++
 3 files changed, 196 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d5687fa4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index d9b70d4..a8d83a2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -124,13 +124,6 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
 
     int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("currentMapping: " + currentMapping);
-      LOG.info("stateCountMap: " + stateCountMap);
-      LOG.info("liveNodes: " + liveNodes);
-      LOG.info("allNodes: " + allNodes);
-      LOG.info("maxPartition: " + maxPartition);
-    }
     ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
     placementScheme.init(_manager);
     _algorithm =
@@ -139,8 +132,13 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
     ZNRecord newMapping =
         _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes);
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("newMapping: " + newMapping);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("currentMapping: " + currentMapping);
+      LOG.debug("stateCountMap: " + stateCountMap);
+      LOG.debug("liveNodes: " + liveNodes);
+      LOG.debug("allNodes: " + allNodes);
+      LOG.debug("maxPartition: " + maxPartition);
+      LOG.debug("newMapping: " + newMapping);
     }
 
     IdealState newIdealState = new IdealState(resourceName);

http://git-wip-us.apache.org/repos/asf/helix/blob/d5687fa4/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index a073b93..1c7a7a3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -405,7 +405,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
     scheduleForNextTask(jobResource, jobCtx, currentTime);
 
-    if (isJobComplete(jobCtx, allPartitions, skippedPartitions)) {
+    if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
       workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
       jobCtx.setFinishTime(currentTime);
       if (isWorkflowComplete(workflowCtx, workflowConfig)) {
@@ -421,6 +421,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       // This includes all completed, failed, delayed, and already assigned partitions.
       Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
       addCompletedPartitions(excludeSet, jobCtx, allPartitions);
+      addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
       excludeSet.addAll(skippedPartitions);
       excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
       // Get instance->[partition, ...] mappings for the target resource.
@@ -607,10 +608,11 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
    *         context, false otherwise.
    */
   private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
-      Set<Integer> skippedPartitions) {
+      Set<Integer> skippedPartitions, JobConfig cfg) {
     for (Integer pId : allPartitions) {
       TaskPartitionState state = ctx.getPartitionState(pId);
-      if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED) {
+      if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED
+          && !isTaskGivenup(ctx, cfg, pId)) {
         return false;
       }
     }
@@ -794,6 +796,20 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     }
   }
 
+  private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
+    return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
+  }
+
+  // add all partitions that have been tried maxNumberAttempts
+  private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
+      JobConfig cfg) {
+    for (Integer pId : pIds) {
+      if (isTaskGivenup(ctx, cfg, pId)) {
+        set.add(pId);
+      }
+    }
+  }
+
   private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
       Set<Integer> excluded, int n) {
     List<Integer> result = new ArrayList<Integer>();

http://git-wip-us.apache.org/repos/asf/helix/blob/d5687fa4/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
new file mode 100644
index 0000000..b678d7e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -0,0 +1,170 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test task will be retried up to MaxAttemptsPerTask {@see HELIX-562}
+ */
+public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
+  private final String _clusterName = TestHelper.getTestClassName();
+  private static final int _n = 5;
+  private static final int _p = 20;
+  private static final int _r = 3;
+  private final MockParticipantManager[] _participants = new MockParticipantManager[_n];
+  private ClusterControllerManager _controller;
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    ClusterSetup setup = new ClusterSetup(_gZkClient);
+    setup.addCluster(_clusterName, true);
+    for (int i = 0; i < _n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      setup.addInstanceToCluster(_clusterName, instanceName);
+    }
+
+    // Set up target db
+    setup.addResourceToCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _p, "MasterSlave");
+    setup.rebalanceStorageCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _r);
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("ErrorTask", new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new ErrorTask();
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < _n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i],
+          taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = "controller";
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager =
+        HelixManagerFactory.getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                _clusterName));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    for (int i = 0; i < _n; i++) {
+      if (_participants[i] != null && _participants[i].isConnected()) {
+        _participants[i].syncStop();
+      }
+    }
+    _manager.disconnect();
+  }
+
+  @Test
+  public void test() throws Exception {
+    String jobResource = TestHelper.getTestMethodName();
+    Workflow flow =
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
+            WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK,
+            String.valueOf(2)).build();
+    Map<String, Map<String, String>> jobConfigs = flow.getJobConfigs();
+    for (Map<String, String> jobConfig : jobConfigs.values()) {
+      jobConfig.put(JobConfig.FAILURE_THRESHOLD, String.valueOf(Integer.MAX_VALUE));
+      jobConfig.put(JobConfig.COMMAND, "ErrorTask");
+    }
+
+    _driver.start(flow);
+
+    // Wait until the job completes.
+    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
+
+    JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
+    for (int i = 0; i < _p; i++) {
+      TaskPartitionState state = ctx.getPartitionState(i);
+      if (state != null) {
+        Assert.assertEquals(state, TaskPartitionState.TASK_ERROR);
+        Assert.assertEquals(ctx.getPartitionNumAttempts(i), 2);
+      }
+    }
+
+  }
+
+  private static class ErrorTask implements Task {
+    public ErrorTask() {
+    }
+
+    @Override
+    public TaskResult run() {
+      throw new RuntimeException("IGNORABLE exception: test throw exception from task");
+    }
+
+    @Override
+    public void cancel() {
+    }
+  }
+}

