helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [04/38] helix git commit: Add option to allow persisting best possible partition assignment in IdealState for semi-auto and full-auto modes.
Date Wed, 08 Feb 2017 17:59:39 GMT
Add option to allow persisting best possible partition assignment in IdealState for semi-auto
and full-auto modes.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7d0885c7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7d0885c7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7d0885c7

Branch: refs/heads/helix-0.6.x
Commit: 7d0885c78dbb94c2224490f53a161a752ee83de5
Parents: f1e9188
Author: Lei Xia <lxia@linkedin.com>
Authored: Fri Jul 1 16:27:53 2016 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Sun Feb 5 18:37:41 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixConstants.java   |   1 -
 .../main/java/org/apache/helix/PropertyKey.java |   2 +-
 .../controller/GenericHelixController.java      |   2 +
 .../controller/stages/ClusterDataCache.java     |   3 -
 .../stages/PersistAssignmentStage.java          | 100 ++++++++++
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |   2 +
 .../org/apache/helix/model/ClusterConfig.java   |  12 +-
 .../java/org/apache/helix/model/IdealState.java |  11 +
 .../org/apache/helix/model/ResourceConfig.java  |   2 +-
 .../TestRebalancerPersistAssignments.java       | 199 +++++++++++++++++++
 10 files changed, 327 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7d0885c7/helix-core/src/main/java/org/apache/helix/HelixConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
index 6de0ff1..2436ba7 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
@@ -49,7 +49,6 @@ public interface HelixConstants {
   @Deprecated
   enum ClusterConfigType {
     HELIX_DISABLE_PIPELINE_TRIGGERS,
-    DISABLE_FULL_AUTO // override all resources in the cluster to use SEMI-AUTO instead of
FULL-AUTO
   }
 
   String DEFAULT_STATE_MODEL_FACTORY = "DEFAULT";

http://git-wip-us.apache.org/repos/asf/helix/blob/7d0885c7/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 7265a7e..88afc71 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -177,7 +177,7 @@ public class PropertyKey {
      */
 
     public PropertyKey clusterConfigs() {
-      return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, HelixProperty.class,
+      return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, ClusterConfig.class,
           _clusterName, ConfigScopeProperty.CLUSTER.toString());
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7d0885c7/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 8016e44..4cea33b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -54,6 +54,7 @@ import org.apache.helix.controller.stages.ExternalViewComputeStage;
 import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.PersistAssignmentStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.controller.stages.ResourceValidationStage;
@@ -198,6 +199,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       rebalancePipeline.addStage(new MessageSelectionStage());
       rebalancePipeline.addStage(new MessageThrottleStage());
       rebalancePipeline.addStage(new TaskAssignmentStage());
+      rebalancePipeline.addStage(new PersistAssignmentStage());
 
       // external view generation
       Pipeline externalViewPipeline = new Pipeline();

http://git-wip-us.apache.org/repos/asf/helix/blob/7d0885c7/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index c8ca941..dbc12d4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.model.ClusterConfig;
@@ -42,7 +41,6 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.task.JobConfig;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Lists;
@@ -56,7 +54,6 @@ import com.google.common.collect.Sets;
 public class ClusterDataCache {
 
   private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!";
-
   private ClusterConfig _clusterConfig;
   Map<String, LiveInstance> _liveInstanceMap;
   Map<String, LiveInstance> _liveInstanceCacheMap;

http://git-wip-us.apache.org/repos/asf/helix/blob/7d0885c7/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
new file mode 100644
index 0000000..ea49234
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -0,0 +1,100 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.log4j.Logger;
+
+/**
+ * Persist the ResourceAssignment of each resource that went through rebalancing
+ */
+public class PersistAssignmentStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(PersistAssignmentStage.class);
+
+  @Override public void process(ClusterEvent event) throws Exception {
+    LOG.info("START PersistAssignmentStage.process()");
+    long startTime = System.currentTimeMillis();
+
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    ClusterConfig clusterConfig = cache.getClusterConfig();
+
+    if (clusterConfig.isPersistBestPossibleAssignment()) {
+      HelixManager helixManager = event.getAttribute("helixmanager");
+      HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      BestPossibleStateOutput assignments =
+          event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+      Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+
+      for (String resourceId : assignments.resourceSet()) {
+        Resource resource = resourceMap.get(resourceId);
+        if (resource != null) {
+          boolean changed = false;
+          Map<Partition, Map<String, String>> assignment = assignments.getResourceMap(resourceId);
+          IdealState idealState = cache.getIdealState(resourceId);
+          if (idealState == null) {
+            LOG.warn("IdealState not found for resource " + resourceId);
+            continue;
+          }
+          IdealState.RebalanceMode mode = idealState.getRebalanceMode();
+          if (!mode.equals(IdealState.RebalanceMode.SEMI_AUTO) && !mode
+              .equals(IdealState.RebalanceMode.FULL_AUTO)) {
+            // do not persist assignment for resource in neither semi or full auto.
+            continue;
+          }
+          for (Partition partition : resource.getPartitions()) {
+            Map<String, String> instanceMap = assignment.get(partition);
+            Map<String, String> existInstanceMap =
+                idealState.getInstanceStateMap(partition.getPartitionName());
+            if (instanceMap == null && existInstanceMap == null) {
+              continue;
+            }
+            if (instanceMap == null || existInstanceMap == null || !instanceMap
+                .equals(existInstanceMap)) {
+              changed = true;
+              break;
+            }
+          }
+          if (changed) {
+            for (Partition partition : assignment.keySet()) {
+              Map<String, String> instanceMap = assignment.get(partition);
+              idealState.setInstanceStateMap(partition.getPartitionName(), instanceMap);
+            }
+            accessor.setProperty(keyBuilder.idealStates(resourceId), idealState);
+          }
+        }
+      }
+    }
+
+    long endTime = System.currentTimeMillis();
+    LOG.info("END PersistAssignmentStage.process(), took " + (endTime - startTime) + " ms");
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7d0885c7/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index b7cb6d4..c264e7b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -58,6 +58,7 @@ import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
@@ -983,6 +984,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       // for now keep mapField in SEMI_AUTO mode and remove listField in CUSTOMIZED mode
       if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
         idealState.getRecord().setListFields(newIdealState.getListFields());
+        // TODO: need consider to remove this.
         idealState.getRecord().setMapFields(newIdealState.getMapFields());
       }
       if (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {

http://git-wip-us.apache.org/repos/asf/helix/blob/7d0885c7/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 25a16d1..1ed7cf9 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -32,7 +32,8 @@ public class ClusterConfig extends HelixProperty {
   public enum ClusterConfigProperty {
     HELIX_DISABLE_PIPELINE_TRIGGERS,
     TOPOLOGY,  // cluster topology definition, for example, "/zone/rack/host/instance"
-    FAULT_ZONE_TYPE // the type in which isolation should be applied on when Helix places
the replicas from same partition.
+    FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places
the replicas from same partition.
+    PERSIST_BEST_POSSIBLE_ASSIGNMENT
   }
 
   /**
@@ -58,6 +59,15 @@ public class ClusterConfig extends HelixProperty {
    *
    * @return
    */
+  public Boolean isPersistBestPossibleAssignment() {
+    return _record
+        .getBooleanField(ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.toString(),
false);
+  }
+
+  /**
+   *
+   * @return
+   */
   public Boolean isPipelineTriggersDisabled() {
     return _record
         .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(),
false);

http://git-wip-us.apache.org/repos/asf/helix/blob/7d0885c7/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 3cc0456..e7da9b4 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -315,6 +315,17 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Set the current mapping of a partition
+   *
+   * @param partitionName    the name of the partition
+   * @param instanceStateMap instance->state mapping for this partition.
+   * @return the instances where the replicas live and the state of each
+   */
+  public void setInstanceStateMap(String partitionName, Map<String, String> instanceStateMap)
{
+    _record.setMapField(partitionName, instanceStateMap);
+  }
+
+  /**
    * Get the instances who host replicas of a partition
    * @param partitionName the partition to look up
    * @return set of instance names

http://git-wip-us.apache.org/repos/asf/helix/blob/7d0885c7/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
index d9a925f..c8c7b72 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -32,7 +32,7 @@ import org.apache.log4j.Logger;
  */
 public class ResourceConfig extends HelixProperty {
   /**
-   * Configurable characteristics of an instance
+   * Configurable characteristics of a resource
    */
   public enum ResourceConfigProperty {
     MONITORING_DISABLED, // Resource-level config, do not create Mbean and report any status
for the resource.

http://git-wip-us.apache.org/repos/asf/helix/blob/7d0885c7/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
new file mode 100644
index 0000000..aa38d2c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
@@ -0,0 +1,199 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
+  @Override
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    // setup storage cluster
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NODE_NR; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // start dummy participants
+    for (int i = 0; i < NODE_NR; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
+    }
+  }
+
+  @DataProvider(name = "rebalanceModes")
+  public static RebalanceMode [][] rebalanceModes() {
+    return new RebalanceMode[][] { {RebalanceMode.FULL_AUTO},
+        {RebalanceMode.SEMI_AUTO}
+    };
+  }
+
+  @Test(dataProvider = "rebalanceModes")
+  public void testAutoRebalanceWithPersistAssignmentEnable(RebalanceMode rebalanceMode)
+      throws Exception {
+    String testDb = "TestDB1-" + rebalanceMode.name();
+    enablePersistAssignment(true);
+
+    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 30,
+        BuiltInStateModelDefinitions.MasterSlave.name(), rebalanceMode.name());
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    IdealState idealState =
+        _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+    verifyAssignmentInIdealStateWithPersistEnabled(idealState, new HashSet<String>());
+
+    // kill 1 node
+    _participants[0].syncStop();
+
+    result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
testDb);
+    // verify that IdealState contains updated assignment in it map fields.
+
+    Set<String> excludedInstances = new HashSet<String>();
+    excludedInstances.add(_participants[0].getInstanceName());
+    verifyAssignmentInIdealStateWithPersistEnabled(idealState, excludedInstances);
+
+    _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+  }
+
+  @Test(dataProvider = "rebalanceModes")
+  public void testAutoRebalanceWithPersistAssignmentDisabled(RebalanceMode rebalanceMode)
+      throws Exception {
+    String testDb = "TestDB2-" + rebalanceMode.name();
+    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 30,
+        BuiltInStateModelDefinitions.MasterSlave.name(), rebalanceMode.name());
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    // kill 1 node
+    _participants[0].syncStop();
+
+    result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    IdealState idealState =
+        _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+
+    Set<String> excludedInstances = new HashSet<String>();
+    excludedInstances.add(_participants[0].getInstanceName());
+    verifyAssignmentInIdealStateWithPersistDisabled(idealState, excludedInstances);
+
+    _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+  }
+
+  private void enablePersistAssignment(Boolean enable) {
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    HelixConfigScope clusterScope =
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+            .forCluster(CLUSTER_NAME).build();
+
+    configAccessor.set(clusterScope,
+        ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(),
+        enable.toString());
+  }
+
+  // verify that the disabled or failed instance should not be included in bestPossible assignment.
+  private void verifyAssignmentInIdealStateWithPersistEnabled(IdealState idealState,
+      Set<String> excludedInstances) {
+    for (String partition : idealState.getPartitionSet()) {
+      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(partition);
+      Assert.assertNotNull(instanceStateMap);
+      Assert.assertFalse(instanceStateMap.isEmpty());
+
+      Set<String> instancesInMap = instanceStateMap.keySet();
+      Set<String> instanceInList = idealState.getInstanceSet(partition);
+      Assert.assertTrue(instanceInList.containsAll(instancesInMap));
+
+      for (String ins : excludedInstances) {
+        Assert.assertFalse(instancesInMap.contains(ins));
+      }
+    }
+  }
+
+  // verify that the bestPossible assignment should be empty or should not be changed.
+  private void verifyAssignmentInIdealStateWithPersistDisabled(IdealState idealState,
+      Set<String> excludedInstances) {
+    boolean mapFieldEmpty = true;
+    boolean assignmentNotChanged = false;
+    for (String partition : idealState.getPartitionSet()) {
+      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(partition);
+      if (instanceStateMap == null || instanceStateMap.isEmpty()) {
+        continue;
+      }
+      mapFieldEmpty = false;
+      Set<String> instancesInMap = instanceStateMap.keySet();
+      for (String ins : excludedInstances) {
+        if(instancesInMap.contains(ins)) {
+          // if at least one excluded instance is included, it means assignment was not updated.
+          assignmentNotChanged = true;
+        }
+      }
+    }
+
+    Assert.assertTrue((mapFieldEmpty || assignmentNotChanged),
+        "BestPossible assignment was updated.");
+  }
+}


Mime
View raw message