helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [10/50] [abbrv] git commit: Tests with independent task rebalancer now pass
Date Thu, 10 Jul 2014 17:04:53 GMT
Tests with independent task rebalancer now pass


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/18628345
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/18628345
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/18628345

Branch: refs/heads/master
Commit: 186283457b92894eb03349fa264a5c4dcb7dc7d9
Parents: 2709b07
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Fri Feb 21 12:33:49 2014 -0800
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Fri Feb 21 12:33:49 2014 -0800

----------------------------------------------------------------------
 .../helix/task/AbstractTaskRebalancer.java      | 11 ++-
 .../helix/task/IndependentTaskRebalancer.java   |  5 +-
 .../java/org/apache/helix/task/TaskConfig.java  | 21 ++++--
 .../java/org/apache/helix/task/TaskDriver.java  | 23 ++++--
 .../integration/task/TestTaskRebalancer.java    | 77 ++++++++++++++++++--
 .../integration/task/WorkflowGenerator.java     | 15 ++--
 6 files changed, 120 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/18628345/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
index fa4c1e5..9a9538c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
@@ -254,10 +254,13 @@ public abstract class AbstractTaskRebalancer implements HelixRebalancer {
           continue;
         }
 
-        TaskPartitionState currState =
-            TaskPartitionState.valueOf(currStateOutput.getCurrentState(
-                ResourceId.from(taskResource), PartitionId.from(pName),
-                ParticipantId.from(instance)).toString());
+        // Current state is either present or dropped
+        State currentState =
+            currStateOutput.getCurrentState(ResourceId.from(taskResource), PartitionId.from(pName),
+                ParticipantId.from(instance));
+        String currentStateStr =
+            currentState != null ? currentState.toString() : TaskPartitionState.DROPPED.toString();
+        TaskPartitionState currState = TaskPartitionState.valueOf(currentStateStr);
 
         // Process any requested state transitions.
         State reqS =

http://git-wip-us.apache.org/repos/asf/helix/blob/18628345/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
index 71ac912..80ec23c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
@@ -70,15 +70,15 @@ public class IndependentTaskRebalancer extends AbstractTaskRebalancer {
     states.put(State.from("ONLINE"), 1);
     List<Integer> partitionNums = Lists.newArrayList(partitionSet);
     Collections.sort(partitionNums);
+    final ResourceId resourceId = prevAssignment.getResourceId();
     List<PartitionId> partitions =
         new ArrayList<PartitionId>(Lists.transform(partitionNums,
             new Function<Integer, PartitionId>() {
               @Override
               public PartitionId apply(Integer partitionNum) {
-                return PartitionId.from(partitionNum.toString());
+                return PartitionId.from(resourceId, partitionNum.toString());
               }
             }));
-    ResourceId resourceId = prevAssignment.getResourceId();
     Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
     for (PartitionId partitionId : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
       currentMapping.put(partitionId, currStateOutput.getCurrentStateMap(resourceId, partitionId));
@@ -97,6 +97,7 @@ public class IndependentTaskRebalancer extends AbstractTaskRebalancer {
     Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
     for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
       String partitionName = e.getKey();
+      partitionName = String.valueOf(pId(partitionName));
       List<String> preferenceList = e.getValue();
       for (String participantName : preferenceList) {
         if (!taskAssignment.containsKey(participantName)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/18628345/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index 8d1c4bb..2834e85 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -19,9 +19,6 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -32,6 +29,10 @@ import java.util.Set;
 
 import org.apache.helix.task.Workflow.WorkflowEnum;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
 /**
  * Provides a typed interface to task configurations.
  */
@@ -134,7 +135,9 @@ public class TaskConfig {
     cfgMap.put(TaskConfig.COMMAND, _command);
     cfgMap.put(TaskConfig.COMMAND_CONFIG, _commandConfig);
     cfgMap.put(TaskConfig.TARGET_RESOURCE, _targetResource);
-    cfgMap.put(TaskConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
+    if (_targetPartitionStates != null) {
+      cfgMap.put(TaskConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
+    }
     if (_targetPartitions != null) {
       cfgMap.put(TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
     }
@@ -252,11 +255,13 @@ public class TaskConfig {
     }
 
     private void validate() {
-      if (_targetResource == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", TARGET_RESOURCE));
+      if (_targetResource == null && (_targetPartitions == null || _targetPartitions.isEmpty())) {
+        throw new IllegalArgumentException(String.format(
+            "%s cannot be null without specified partitions", TARGET_RESOURCE));
       }
-      if (_targetPartitionStates != null && _targetPartitionStates.isEmpty()) {
-        throw new IllegalArgumentException(String.format("%s cannot be an empty set",
+      if (_targetResource != null && _targetPartitionStates != null
+          && _targetPartitionStates.isEmpty()) {
+        throw new IllegalArgumentException(String.format("%s cannot be empty",
             TARGET_PARTITION_STATES));
       }
       if (_command == null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/18628345/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 5189fb7..17e7542 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -40,6 +41,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.log4j.Logger;
@@ -157,10 +159,19 @@ public class TaskDriver {
 
   /** Posts new task to cluster */
   private void scheduleTask(String taskResource, TaskConfig taskConfig) throws Exception {
-    // Set up task resource based on partitions from target resource
-    int numPartitions =
-        _admin.getResourceIdealState(_clusterName, taskConfig.getTargetResource())
-            .getPartitionSet().size();
+    // Set up task resource based on partitions provided, or from target resource
+    int numPartitions;
+    List<Integer> partitions = taskConfig.getTargetPartitions();
+    String targetResource = taskConfig.getTargetResource();
+    if (partitions != null && !partitions.isEmpty()) {
+      numPartitions = partitions.size();
+    } else if (targetResource != null) {
+      numPartitions =
+          _admin.getResourceIdealState(_clusterName, taskConfig.getTargetResource())
+              .getPartitionSet().size();
+    } else {
+      numPartitions = 0;
+    }
     _admin.addResource(_clusterName, taskResource, numPartitions, TaskConstants.STATE_MODEL_NAME);
     _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, taskResource),
         taskConfig.getResourceConfigMap());
@@ -175,7 +186,9 @@ public class TaskDriver {
       builder.add(taskResource + "_" + i);
     }
     IdealState is = builder.build();
-    is.setRebalancerClassName(TaskRebalancer.class.getName());
+    Class<? extends HelixRebalancer> rebalancerClass =
+        (targetResource != null) ? TaskRebalancer.class : IndependentTaskRebalancer.class;
+    is.setRebalancerClassName(rebalancerClass.getName());
     _admin.setResourceIdealState(_clusterName, taskResource, is);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/18628345/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index e9127a1..c221d96 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -19,20 +19,33 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
-
-import org.apache.helix.*;
-import org.apache.helix.controller.HelixControllerMain;
+import java.util.TreeMap;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.task.*;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
@@ -40,6 +53,9 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
 public class TestTaskRebalancer extends ZkIntegrationTestBase {
   private static final int n = 5;
   private static final int START_PORT = 12918;
@@ -265,6 +281,51 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertEquals(maxAttempts, 2);
   }
 
+  @Test
+  public void testIndependentTask() throws Exception {
+    final String taskResource = "independentTask";
+    Map<String, String> config = new TreeMap<String, String>();
+    config.put("TargetPartitions", "0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19");
+    config.put("Command", "Reindex");
+    config.put("CommandConfig", String.valueOf(200));
+    config.put("TimeoutPerPartition", String.valueOf(10 * 1000));
+    Workflow flow =
+        WorkflowGenerator.generateSingleTaskWorkflowBuilder(taskResource, config).build();
+    _driver.start(flow);
+
+    // Wait for task completion
+    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
+
+    // Ensure all partitions are completed individually
+    TaskContext ctx =
+        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
+      Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
+    }
+  }
+
+  @Test
+  public void testIndependentRepeatedWorkflow() throws Exception {
+    final String workflowName = "independentTaskWorkflow";
+    Map<String, String> config = new TreeMap<String, String>();
+    config.put("TargetPartitions", "0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19");
+    config.put("Command", "Reindex");
+    config.put("CommandConfig", String.valueOf(200));
+    config.put("TimeoutPerPartition", String.valueOf(10 * 1000));
+    Workflow flow =
+        WorkflowGenerator.generateRepeatedTaskWorkflowBuilder(workflowName, config).build();
+    new TaskDriver(_manager).start(flow);
+
+    // Wait until the task completes
+    TestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
+
+    // Assert completion for all tasks within two minutes
+    for (String task : flow.getTaskConfigs().keySet()) {
+      TestUtil.pollForTaskState(_manager, workflowName, task, TaskState.COMPLETED);
+    }
+  }
+
   private static class ReindexTask implements Task {
     private final long _delay;
     private volatile boolean _canceled;

http://git-wip-us.apache.org/repos/asf/helix/blob/18628345/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 0d7251a..653d88a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -19,12 +19,12 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import org.apache.helix.task.Workflow;
-
 import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.task.Workflow;
+
 /**
  * Convenience class for generating various test workflows
  */
@@ -72,12 +72,17 @@ public class WorkflowGenerator {
   }
 
   public static Workflow.Builder generateDefaultRepeatedTaskWorkflowBuilder(String workflowName) {
+    return generateRepeatedTaskWorkflowBuilder(workflowName, DEFAULT_TASK_CONFIG);
+  }
+
+  public static Workflow.Builder generateRepeatedTaskWorkflowBuilder(String workflowName,
+      Map<String, String> config) {
     Workflow.Builder builder = new Workflow.Builder(workflowName);
     builder.addParentChildDependency(TASK_NAME_1, TASK_NAME_2);
 
-    for (String key : DEFAULT_TASK_CONFIG.keySet()) {
-      builder.addConfig(TASK_NAME_1, key, DEFAULT_TASK_CONFIG.get(key));
-      builder.addConfig(TASK_NAME_2, key, DEFAULT_TASK_CONFIG.get(key));
+    for (String key : config.keySet()) {
+      builder.addConfig(TASK_NAME_1, key, config.get(key));
+      builder.addConfig(TASK_NAME_2, key, config.get(key));
     }
 
     return builder;


Mime
View raw message