helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 06/10: HELIX: Recovery balance partitions with disabled top-state replicas
Date Thu, 28 Mar 2019 19:31:50 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit de38fa1a4e13939807427694a538ea468f8abb36
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Thu Mar 28 12:27:52 2019 -0700

    HELIX: Recovery balance partitions with disabled top-state replicas
    
        Previously, disabling of partitions or disabled instances did not affect Helix's throttling
logic. This was problematic because the ability to disable was designed in in order to move
partitons/replicas out of the given instance as a measure to deal with unhealthy partitions/instances.
This allows, for partitions that are disabled, to go into recovery balance, and when the user
has not set any throttling configs for recovery balance, these types of state transitions
will go throug [...]
        Changelist:
        1. Add a check for determining rebalance type for a given partition
        2. Add an integration test
---
 .../stages/IntermediateStateCalcStage.java         |  18 +-
 ...ority.java => TestStateTransitionPriority.java} |   2 +-
 .../TestNoThrottleDisabledPartitions.java          | 350 +++++++++++++++++++++
 3 files changed, 363 insertions(+), 7 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 16c3fe3..f021f66 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -41,6 +41,7 @@ import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
@@ -312,7 +313,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
 
       RebalanceType rebalanceType = getRebalanceType(cache, bestPossibleMap, preferenceList,
-          stateModelDef, currentStateMap, idealState);
+          stateModelDef, currentStateMap, idealState, partition.getPartitionName());
 
       // TODO: refine getRebalanceType to return more accurate rebalance types. So the following
       // logic doesn't need to check for more details.
@@ -691,11 +692,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
   }
 
   /**
-   * For a partiton, given its preferenceList, bestPossibleState, and currentState, determine
which
+   * For a partition, given its preferenceList, bestPossibleState, and currentState, determine
which
    * type of rebalance is needed to model IdealState's states defined by the state model
definition.
    * @return RebalanceType needed to bring the replicas to idea states
    *         RECOVERY_BALANCE - not all required states (replicas) are available through
all
-   *         replicas
+   *         replicas, or the partition is disabled
    *         NONE - current state matches the ideal state
    *         LOAD_BALANCE - although all replicas required exist, Helix needs to optimize
the
    *         allocation
@@ -703,7 +704,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
   private RebalanceType getRebalanceType(ResourceControllerDataProvider cache,
       Map<String, String> bestPossibleMap, List<String> preferenceList,
       StateModelDefinition stateModelDef, Map<String, String> currentStateMap,
-      IdealState idealState) {
+      IdealState idealState, String partitionName) {
     if (preferenceList == null) {
       preferenceList = Collections.emptyList();
     }
@@ -716,8 +717,13 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     // required by StateModelDefinition.
     LinkedHashMap<String, Integer> expectedStateCountMap =
         stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's
counts
-    Map<String, Integer> currentStateCounts = StateModelDefinition.getStateCounts(currentStateMap);
// Current
-    // counts
+    // Current counts without disabled partitions or disabled instances
+    Map<String, String> currentStateMapWithoutDisabled = new HashMap<>(currentStateMap);
+    currentStateMapWithoutDisabled.keySet().removeAll(
+        cache.getDisabledInstancesForPartition(idealState.getResourceName(), partitionName));
+    Map<String, Integer> currentStateCounts =
+        StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
+
     // Go through each state and compare counts
     for (String state : expectedStateCountMap.keySet()) {
       Integer expectedCount = expectedStateCountMap.get(state);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
similarity index 99%
rename from helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java
rename to helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
index e15ca93..080e270 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
@@ -42,7 +42,7 @@ import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-public class TestStateTransitionPrirority extends BaseStageTest {
+public class TestStateTransitionPriority extends BaseStageTest {
   public static final String RESOURCE = "Resource";
   public static final String PARTITION = "Partition";
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
new file mode 100644
index 0000000..73bdcf8
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
@@ -0,0 +1,350 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apacahe.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestNoThrottleDisabledPartitions extends ZkTestBase {
+  private String _resourceName = "TestDB";
+  private String _clusterName = getShortClassName();
+  private HelixDataAccessor _accessor;
+  private MockParticipantManager[] _participants;
+
+  /**
+   * Given the following setup for a partition:
+   * instance 1 : M
+   * instance 2 : S
+   * instance 3 : S
+   * and no throttle config set for recovery balance
+   * and throttle config of 1 set for load balance,
+   * test that disabling instance 1 puts this partition in recovery balance, so that all
transitions
+   * for a partition go through.
+   * * instance 1 : S (M->S->Offline)
+   * * instance 2 : M (S->M because it's in recovery)
+   * * instance 3 : S
+   * @throws Exception
+   */
+  @Test
+  public void testDisablingTopStateReplicaByDisablingInstance() throws Exception {
+    int participantCount = 5;
+    setupEnvironment(participantCount);
+
+    // Set the throttling only for load balance
+    setThrottleConfigForLoadBalance();
+
+    // Disable instance 0 so that it will cause a partition to do a recovery balance
+    PropertyKey key = _accessor.keyBuilder().instanceConfig(_participants[0].getInstanceName());
+    InstanceConfig instanceConfig = _accessor.getProperty(key);
+    instanceConfig.setInstanceEnabled(false);
+    _accessor.setProperty(key, instanceConfig);
+
+    // Resume the controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+    controller.syncStart();
+    Thread.sleep(500L);
+
+    // The disabled instance should not hold any top state replicas (MASTER)
+    PropertyKey liveInstanceKey =
+        _accessor.keyBuilder().liveInstance(_participants[0].getInstanceName());
+    LiveInstance liveInstance = _accessor.getProperty(liveInstanceKey);
+    if (liveInstance != null) {
+      String sessionId = liveInstance.getSessionId();
+      List<CurrentState> currentStates = _accessor.getChildValues(
+          _accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId));
+      for (CurrentState currentState : currentStates) {
+        for (Map.Entry<String, String> partitionState : currentState.getPartitionStateMap()
+            .entrySet()) {
+          Assert.assertFalse(partitionState.getValue().equals("MASTER"));
+        }
+      }
+    }
+
+    // clean up the cluster
+    controller.syncStop();
+    for (int i = 0; i < participantCount; i++) {
+      _participants[i].syncStop();
+    }
+    deleteCluster(_clusterName);
+  }
+
+  /**
+   * Given the following setup for a partition:
+   * instance 1 : M
+   * instance 2 : S
+   * instance 3 : S
+   * and no throttle config set for recovery balance
+   * and throttle config of 1 set for load balance,
+   * Instead of disabling the instance, we disable the partition in the instance config.
+   * * instance 1 : S (M->S->Offline)
+   * * instance 2 : M (S->M because it's in recovery)
+   * * instance 3 : S
+   * @throws Exception
+   */
+  @Test
+  public void testDisablingPartitionOnInstance() throws Exception {
+    int participantCount = 5;
+    setupEnvironment(participantCount);
+
+    // Set the throttling only for load balance
+    setThrottleConfigForLoadBalance();
+
+    // In this setup, TestDB0_2 has a MASTER replica on localhost_12918
+    disablePartitionOnInstance(_participants[0], _resourceName + "0", "TestDB0_2");
+
+    // Resume the controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+    controller.syncStart();
+    Thread.sleep(500L);
+
+    // The disabled instance should not hold any top state replicas (MASTER)
+    PropertyKey liveInstanceKey =
+        _accessor.keyBuilder().liveInstance(_participants[0].getInstanceName());
+    LiveInstance liveInstance = _accessor.getProperty(liveInstanceKey);
+    if (liveInstance != null) {
+      String sessionId = liveInstance.getSessionId();
+      List<CurrentState> currentStates = _accessor.getChildValues(
+          _accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId));
+      for (CurrentState currentState : currentStates) {
+        for (Map.Entry<String, String> partitionState : currentState.getPartitionStateMap()
+            .entrySet()) {
+          if (partitionState.getKey().equals("TestDB0_2")) {
+            Assert.assertFalse(partitionState.getValue().equals("MASTER"));
+          }
+        }
+      }
+    }
+
+    // clean up the cluster
+    controller.syncStop();
+    for (int i = 0; i < participantCount; i++) {
+      _participants[i].syncStop();
+    }
+    deleteCluster(_clusterName);
+  }
+
+  /**
+   * Given the following setup for a partition:
+   * instance 1 : M
+   * instance 2 : S
+   * instance 3 : S
+   * and no throttle config set for recovery balance
+   * and throttle config of 1 set for load balance,
+   * Instead of disabling the instance, we disable the partition in the instance config.
+   * Here, we set the recovery balance config. Then we should still see the MASTER.
+   * * instance 1 : S (M->S->Offline)
+   * * instance 2 : M (S->M because it's in recovery)
+   * * instance 3 : S
+   * @throws Exception
+   */
+  @Test
+  public void testDisablingPartitionOnInstanceWithRecoveryThrottle() throws Exception {
+    int participantCount = 5;
+    setupEnvironment(participantCount);
+
+    // Set the throttling
+    setThrottleConfigForLoadBalance();
+    setThrottleConfigForRecoveryBalance();
+
+    // In this setup, TestDB0_2 has a MASTER replica on localhost_12918
+    disablePartitionOnInstance(_participants[0], _resourceName + "0", "TestDB0_2");
+
+    // Resume the controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+    controller.syncStart();
+    Thread.sleep(500L);
+
+    // The disabled instance should not hold any top state replicas (MASTER)
+    PropertyKey liveInstanceKey =
+        _accessor.keyBuilder().liveInstance(_participants[0].getInstanceName());
+    LiveInstance liveInstance = _accessor.getProperty(liveInstanceKey);
+    if (liveInstance != null) {
+      String sessionId = liveInstance.getSessionId();
+      List<CurrentState> currentStates = _accessor.getChildValues(
+          _accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId));
+      for (CurrentState currentState : currentStates) {
+        for (Map.Entry<String, String> partitionState : currentState.getPartitionStateMap()
+            .entrySet()) {
+          if (partitionState.getKey().equals("TestDB0_2")) {
+            Assert.assertTrue(partitionState.getValue().equals("MASTER"));
+          }
+        }
+      }
+    }
+
+    // clean up the cluster
+    controller.syncStop();
+    for (int i = 0; i < participantCount; i++) {
+      _participants[i].syncStop();
+    }
+    deleteCluster(_clusterName);
+  }
+
+  /**
+   * Set up the cluster and pause the controller.
+   * @param participantCount
+   * @throws Exception
+   */
+  private void setupEnvironment(int participantCount) throws Exception {
+    _participants = new MockParticipantManager[participantCount];
+
+    _accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    setupCluster(_clusterName, participantCount);
+
+    DelayedTransitionBase transition = new DelayedTransitionBase(10L);
+
+    // Start _participants
+    for (int i = 0; i < participantCount; i++) {
+      _participants[i] =
+          new MockParticipantManager(ZK_ADDR, _clusterName, "localhost_" + (12918 + i));
+      _participants[i].setTransition(transition);
+      _participants[i].syncStart();
+    }
+
+    // Start the controller and verify that it is in the best possible state
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+    controller.syncStart();
+    BestPossibleExternalViewVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build();
+    Assert.assertTrue(verifier.verify(3000));
+
+    // Pause the controller
+    controller.syncStop();
+  }
+
+  /**
+   * Set throttle limits only for load balance so that none of them would happen.
+   */
+  private void setThrottleConfigForLoadBalance() {
+    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+
+    ClusterConfig clusterConfig = _accessor.getProperty(_accessor.keyBuilder().clusterConfig());
+    clusterConfig.setResourcePriorityField("Name");
+    List<StateTransitionThrottleConfig> throttleConfigs = new ArrayList<>();
+
+    // Add throttling at cluster-level
+    throttleConfigs.add(
+        new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 0));
+
+    // Add throttling at instance level
+    throttleConfigs.add(
+        new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 0));
+
+    clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
+    _accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
+  }
+
+  /**
+   * Set throttle limits only for recovery balance so that none of them would happen.
+   */
+  private void setThrottleConfigForRecoveryBalance() {
+    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+
+    ClusterConfig clusterConfig = _accessor.getProperty(_accessor.keyBuilder().clusterConfig());
+    clusterConfig.setResourcePriorityField("Name");
+    List<StateTransitionThrottleConfig> throttleConfigs = new ArrayList<>();
+
+    // Add throttling at cluster-level
+    throttleConfigs.add(new StateTransitionThrottleConfig(
+        StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+        StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 0));
+
+    // Add throttling at instance level
+    throttleConfigs.add(new StateTransitionThrottleConfig(
+        StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+        StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 0));
+
+    clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
+    _accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
+  }
+
+  /**
+   * Set up delayed rebalancer and minimum active replica settings to mimic user's use case.
+   * @param clusterName
+   * @param participantCount
+   * @throws Exception
+   */
+  private void setupCluster(String clusterName, int participantCount) throws Exception {
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start port
+        "localhost", // participant name prefix
+        _resourceName, // resource name prefix
+        3, // resources
+        5, // partitions per resource
+        participantCount, // number of nodes
+        3, // replicas
+        "MasterSlave", IdealState.RebalanceMode.FULL_AUTO, true); // do rebalance
+
+    // Enable DelayedAutoRebalance
+    ClusterConfig clusterConfig = _accessor.getProperty(_accessor.keyBuilder().clusterConfig());
+    clusterConfig.setDelayRebalaceEnabled(true);
+    clusterConfig.setRebalanceDelayTime(1800000L);
+    _accessor.setProperty(_accessor.keyBuilder().clusterConfig(), clusterConfig);
+
+    // Set minActiveReplicas at 2
+    List<String> idealStates = _accessor.getChildNames(_accessor.keyBuilder().idealStates());
+    for (String is : idealStates) {
+      IdealState idealState = _accessor.getProperty(_accessor.keyBuilder().idealStates(is));
+      idealState.setMinActiveReplicas(2);
+      idealState.setRebalanceStrategy(
+          "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
+      idealState
+          .setRebalancerClassName("org.apache.helix.controller.rebalancer.DelayedAutoRebalancer");
+      _accessor.setProperty(_accessor.keyBuilder().idealStates(is), idealState);
+    }
+  }
+
+  /**
+   * Disable select partitions from the first instance to test that these partitions are
not subject
+   * to throttling.
+   */
+  private void disablePartitionOnInstance(MockParticipantManager participant, String resourceName,
+      String partitionName) {
+    String instanceName = participant.getInstanceName();
+    PropertyKey key = _accessor.keyBuilder().instanceConfig(instanceName);
+    InstanceConfig instanceConfig = _accessor.getProperty(key);
+    instanceConfig.setInstanceEnabledForPartition(resourceName, partitionName, false);
+    _accessor.setProperty(key, instanceConfig);
+  }
+}


Mime
View raw message