helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 02/10: Fix N -> N + 1 extra bootstrap
Date Thu, 28 Mar 2019 19:31:46 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit be3dd97333fa36ddbd826e0326b33837e51abdf6
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Thu Mar 28 12:24:44 2019 -0700

    Fix N -> N + 1 extra bootstrap
    
    When rebalancing, Helix does the following before dropping replicas that are not in the
preference list:
        1. Send a replica on a node that just came online from OFFLINE to SLAVE
        2. Drop a replica on a node that just bootstrapped
    
    This happened because the rebalancer was trying to make all current states exactly match
the states in the best possible mapping. Only once that condition was met did Helix start dropping replicas.
With this fix, Helix only requires that the replicas in the preference list match the states
in the best possible mapping, so the rebalancer no longer has to wait for and bootstrap extra
replicas that are not in the preference list.
---
 .../rebalancer/DelayedAutoRebalancer.java          |  15 ++-
 .../rebalancer/TestZeroReplicaAvoidance.java       |  37 +++++-
 .../helix/integration/task/TestJobFailure.java     |   1 +
 .../TestDelayedAutoRebalancer.MasterSlave.json     | 137 +++++++++++++++++++++
 4 files changed, 179 insertions(+), 11 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 1d00d60..65b3f84 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -529,7 +529,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
     // we should drop all partitions from previous assigned instances.
     if (!currentMapWithPreferenceList.values().contains(HelixDefinedState.ERROR.name())
         && bestPossibleStateMap.size() > numReplicas && readyToDrop(currentStateMap,
-        bestPossibleStateMap, numReplicas, combinedPreferenceList)) {
+        bestPossibleStateMap, preferenceList, combinedPreferenceList)) {
       for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
         String instanceToDrop = combinedPreferenceList.get(combinedPreferenceList.size()
- i - 1);
         bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name());
@@ -551,13 +551,12 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
   }
 
   private boolean readyToDrop(Map<String, String> currentStateMap,
-      Map<String, String> bestPossibleMap, int numReplicas, List<String> combinedPreferenceList)
{
-    if (currentStateMap.size() != bestPossibleMap.size()) {
-      return false;
-    }
-    
-    for (int i = 0; i < numReplicas; i++) {
-      String instance = combinedPreferenceList.get(i);
+      Map<String, String> bestPossibleMap, List<String> preferenceList,
+      List<String> combinedPreferenceList) {
+    Set<String> preferenceWithActiveState = new HashSet<>(preferenceList);
+    preferenceWithActiveState.retainAll(combinedPreferenceList);
+
+    for (String instance : preferenceWithActiveState) {
       if (!currentStateMap.containsKey(instance) || !currentStateMap.get(instance)
           .equals(bestPossibleMap.get(instance))) {
         return false;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
index 05141e1..fe5014c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
@@ -5,15 +5,20 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.BaseStageTest;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -45,7 +50,7 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
 
   @Test(dataProvider = "zeroReplicaInput")
   public void testZeroReplicaAvoidanceDuringRebalance(StateModelDefinition stateModelDef,
-      List<String> instancePreferenceList, Map<String, String> currentStateMap,
+      List<String> instancePreferenceList, Map<String, String> currentStateMap,
Map<String, List<Message>> pendingMessages,
       Map<String, String> expectedBestPossibleMap) {
     System.out.println("START TestDelayedAutoRebalancer at " + new Date(System.currentTimeMillis()));
 
@@ -66,6 +71,18 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
       currentStateOutput
           .setCurrentState("test", partition, instance, currentStateMap.get(instance));
     }
+    Set<String> allInstances = new HashSet<>(instancePreferenceList);
+    allInstances.addAll(currentStateMap.keySet());
+    if (pendingMessages != null) {
+      for (String instance : allInstances) {
+        List<Message> messages = pendingMessages.get(instance);
+        if (messages != null) {
+          for (Message message : messages) {
+            currentStateOutput.setPendingMessage("test", partition, instance, message);
+          }
+        }
+      }
+    }
     Map<String, String> bestPossibleMap = rebalancer
         .computeBestPossibleStateForPartition(liveInstances, stateModelDef, instancePreferenceList,
             currentStateOutput, Collections.<String>emptySet(), is,
@@ -93,6 +110,7 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
 
   private final String INPUT = "inputs";
   private final String CURRENT_STATE = "currentStates";
+  private final String PENDING_MESSAGES = "pendingMessages";
   private final String BEST_POSSIBLE_STATE = "bestPossibleStates";
   private final String PREFERENCE_LIST = "preferenceList";
   private final String STATE_MODEL = "statemodel";
@@ -115,8 +133,21 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
         Map<String, String> bestPossibleStates =
             (Map<String, String>) inMap.get(BEST_POSSIBLE_STATE);
         List<String> preferenceList = (List<String>) inMap.get(PREFERENCE_LIST);
-
-        ret.add(new Object[] { stateModelDef, preferenceList, currentStates, bestPossibleStates
});
+        Map<String, String> pendingStates = (Map<String, String>) inMap.get(PENDING_MESSAGES);
+        Map<String, List<Message>> pendingMessages = null;
+        if (pendingStates != null) {
+          Random r = new Random();
+          pendingMessages = new HashMap<>();
+          for (String instance : pendingStates.keySet()) {
+            pendingMessages.put(instance, new ArrayList<Message>());
+            Message m = new Message(new ZNRecord(UUID.randomUUID().toString()));
+            m.setFromState(pendingStates.get(instance).split(":")[0]);
+            m.setToState(pendingStates.get(instance).split(":")[1]);
+            pendingMessages.get(instance).add(m);
+          }
+        }
+
+        ret.add(new Object[] { stateModelDef, preferenceList, currentStates, pendingMessages,
bestPossibleStates });
       }
     } catch (IOException e) {
       e.printStackTrace();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
index 5309eb9..28d7f76 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
@@ -96,6 +96,7 @@ public final class TestJobFailure extends TaskSynchronizedTestBase {
         TaskState.valueOf(expectedJobEndingStates));
     _driver.pollForWorkflowState(WORKFLOW_NAME, TaskState.valueOf(expectedWorkflowEndingStates));
 
+    Thread.sleep(2000);
     JobContext jobContext = _driver.getJobContext(TaskUtil.getNamespacedJobName(WORKFLOW_NAME,
JOB_NAME));
     for (int pId : jobContext.getPartitionSet()) {
       Map<String, String> targetPartitionConfig = targetPartitionConfigs.get(jobContext.getTargetForPartition(pId));
diff --git a/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json b/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
index aa2c98d..22bfbe3 100644
--- a/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
+++ b/helix-core/src/test/resources/TestDelayedAutoRebalancer.MasterSlave.json
@@ -115,6 +115,143 @@
         "localhost_3": "SLAVE",
         "localhost_4": "SLAVE"
       }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_6": "OFFLINE",
+        "localhost_3": "MASTER",
+        "localhost_0": "SLAVE",
+        "localhost_1": "SLAVE",
+        "localhost_2": "SLAVE"
+      },
+      "bestPossibleStates": {
+        "localhost_0": "SLAVE",
+        "localhost_1": "SLAVE",
+        "localhost_2": "DROPPED",
+        "localhost_3": "MASTER",
+        "localhost_6": "DROPPED"
+      }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_6": "OFFLINE",
+        "localhost_3": "SLAVE",
+        "localhost_0": "SLAVE",
+        "localhost_1": "SLAVE",
+        "localhost_2": "SLAVE"
+      },
+      "bestPossibleStates": {
+        "localhost_0": "SLAVE",
+        "localhost_1": "SLAVE",
+        "localhost_2": "SLAVE",
+        "localhost_3": "MASTER"
+      }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_6": "OFFLINE",
+        "localhost_3": "OFFLINE",
+        "localhost_0": "SLAVE",
+        "localhost_1": "MASTER",
+        "localhost_2": "SLAVE"
+      },
+      "bestPossibleStates": {
+        "localhost_0": "SLAVE",
+        "localhost_1": "MASTER",
+        "localhost_2": "SLAVE",
+        "localhost_3": "SLAVE"
+      }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_6": "OFFLINE",
+        "localhost_3": "OFFLINE",
+        "localhost_1": "MASTER"
+      },
+      "bestPossibleStates": {
+        "localhost_1": "MASTER",
+        "localhost_4": "SLAVE",
+        "localhost_3": "SLAVE"
+      }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_0": "SLAVE",
+        "localhost_1": "MASTER",
+        "localhost_2": "OFFLINE"
+      },
+      "bestPossibleStates": {
+        "localhost_1": "MASTER",
+        "localhost_0": "SLAVE",
+        "localhost_3": "SLAVE",
+        "localhost_2": "SLAVE"
+      },
+      "pendingMessages": {
+        "localhost_3": "OFFLINE:SLAVE"
+      }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_0": "SLAVE",
+        "localhost_1": "MASTER",
+        "localhost_2": "OFFLINE",
+        "localhost_3": "SLAVE"
+      },
+      "bestPossibleStates": {
+        "localhost_1": "SLAVE",
+        "localhost_0": "SLAVE",
+        "localhost_3": "MASTER",
+        "localhost_2": "SLAVE"
+      }
+    },
+    {
+      "preferenceList": [
+        "localhost_3",
+        "localhost_4",
+        "localhost_5"
+      ],
+      "currentStates": {
+        "localhost_0": "SLAVE",
+        "localhost_1": "SLAVE",
+        "localhost_2": "OFFLINE",
+        "localhost_3": "MASTER"
+      },
+      "bestPossibleStates": {
+        "localhost_1": "SLAVE",
+        "localhost_0": "SLAVE",
+        "localhost_3": "MASTER",
+        "localhost_2": "DROPPED"
+      }
     }
   ]
 }


Mime
View raw message