helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] branch master updated: Add getIdealAssignmentForWagedFullAuto in HelixUtil for WAGED rebalancer (#1031)
Date Fri, 29 May 2020 16:21:08 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 431a096  Add getIdealAssignmentForWagedFullAuto in HelixUtil for WAGED rebalancer
(#1031)
431a096 is described below

commit 431a0961c0366ddfaac7dfc08e6c1b0bcd26f87b
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Fri May 29 09:20:58 2020 -0700

    Add getIdealAssignmentForWagedFullAuto in HelixUtil for WAGED rebalancer (#1031)
    
    This commit adds a method, getIdealAssignmentForWagedFullAuto(), to HelixUtil that returns
to the user the cluster-wide assignment result obtained by running a rebalance with WAGED.
Users will be able to use this method to predict how Helix will rebalance resources
with the WAGED rebalancer.
---
 .../dataproviders/BaseControllerDataProvider.java  |  8 ++
 .../rebalancer/waged/ReadOnlyWagedRebalancer.java  | 88 ++++++++++++++++++++
 .../helix/manager/zk/ZkBucketDataAccessor.java     |  5 +-
 .../BestPossibleExternalViewVerifier.java          | 89 +++-----------------
 .../main/java/org/apache/helix/util/HelixUtil.java | 96 ++++++++++++++++++++++
 .../java/org/apache/helix/util/RebalanceUtil.java  | 24 ++++++
 .../helix/util/WeightAwareRebalanceUtil.java       |  2 +-
 .../WagedRebalancer/TestWagedRebalance.java        | 50 +++++++++++
 .../helix/integration/task/TaskTestUtil.java       | 11 +--
 9 files changed, 282 insertions(+), 91 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 51b2e80..a24ea46 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -601,6 +601,14 @@ public class BaseControllerDataProvider implements ControlContextProvider
{
     return _resourceConfigCache.getPropertyMap();
   }
 
+  /**
+   * Overwrites the property map held by this provider's resource config cache.
+   *
+   * @param resourceConfigMap resource configs to store; presumably keyed by resource name, as
+   *          getResourceConfig(String) looks them up by name
+   */
+  public void setResourceConfigMap(Map<String, ResourceConfig> resourceConfigMap) {
+    this._resourceConfigCache.setPropertyMap(resourceConfigMap);
+  }
+
   public ResourceConfig getResourceConfig(String resource) {
     return _resourceConfigCache.getPropertyByName(resource);
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
new file mode 100644
index 0000000..eccb175
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -0,0 +1,88 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+
+
+/**
+ * This rebalancer is a version of WagedRebalancer that only reads the existing assignment
metadata
+ * to compute the best possible assignment but never writes back the resulting assignment
metadata
+ * from global or partial rebalance. It does so by using a modified version of
+ * AssignmentMetadataStore, ReadOnlyAssignmentMetadataStore.
+ *
+ * This class is to be used in the cluster verifiers, tests, or util methods.
+ */
+public class ReadOnlyWagedRebalancer extends WagedRebalancer {
+  public ReadOnlyWagedRebalancer(String metadataStoreAddress, String clusterName,
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
+    super(new ReadOnlyAssignmentMetadataStore(metadataStoreAddress, clusterName),
+        ConstraintBasedAlgorithmFactory.getInstance(preferences), Optional.empty());
+  }
+
+  @Override
+  protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
+      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      Set<String> activeNodes, CurrentStateOutput currentStateOutput, RebalanceAlgorithm
algorithm)
+      throws HelixRebalanceException {
+    return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput,
+        resourceMap.keySet());
+  }
+
+  private static class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
+    ReadOnlyAssignmentMetadataStore(String metadataStoreAddress, String clusterName) {
+      super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName);
+    }
+
+    @Override
+    public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline)
{
+      // If baseline hasn't changed, skip updating the metadata store
+      if (compareAssignments(_globalBaseline, globalBaseline)) {
+        return false;
+      }
+      // Update the in-memory reference only
+      _globalBaseline = globalBaseline;
+      return true;
+    }
+
+    @Override
+    public boolean persistBestPossibleAssignment(
+        Map<String, ResourceAssignment> bestPossibleAssignment) {
+      // If bestPossibleAssignment hasn't changed, skip updating the metadata store
+      if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
+        return false;
+      }
+      // Update the in-memory reference only
+      _bestPossibleAssignment = bestPossibleAssignment;
+      return true;
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index f61fb50..2eda09b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import com.google.common.collect.ImmutableMap;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -30,18 +29,20 @@ import java.util.TimerTask;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
-import org.apache.helix.zookeeper.util.GZipCompressionUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.util.GZipCompressionUtil;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index aa663bc..6fc833b 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -27,37 +27,27 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.pipeline.Stage;
-import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
-import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
-import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
-import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
+import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.manager.zk.ZkBucketDataAccessor;
-import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.task.TaskConstants;
+import org.apache.helix.util.RebalanceUtil;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -402,7 +392,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier
{
     ClusterEvent event = new ClusterEvent(ClusterEventType.StateVerifier);
     event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
 
-    runStage(event, new ResourceComputationStage());
+    RebalanceUtil.runStage(event, new ResourceComputationStage());
 
     if (resources != null && !resources.isEmpty()) {
       // Filtering out all non-required resources
@@ -416,85 +406,26 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier
{
       event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMapToRebalance);
     }
 
-    runStage(event, new CurrentStateComputationStage());
-    // Note the dryrunWagedRebalancer is just for one time usage
-    DryrunWagedRebalancer dryrunWagedRebalancer =
-        new DryrunWagedRebalancer(_zkClient.getServers(), cache.getClusterName(),
+    RebalanceUtil.runStage(event, new CurrentStateComputationStage());
+    // Note the readOnlyWagedRebalancer is just for one time usage
+    ReadOnlyWagedRebalancer readOnlyWagedRebalancer =
+        new ReadOnlyWagedRebalancer(_zkClient.getServers(), cache.getClusterName(),
             cache.getClusterConfig().getGlobalRebalancePreference());
-    event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), dryrunWagedRebalancer);
+    event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), readOnlyWagedRebalancer);
     try {
-      runStage(event, new BestPossibleStateCalcStage());
+      RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
     } finally {
-      dryrunWagedRebalancer.close();
+      readOnlyWagedRebalancer.close();
     }
 
     BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     return output;
   }
 
-  private void runStage(ClusterEvent event, Stage stage) throws Exception {
-    StageContext context = new StageContext();
-    stage.init(context);
-    stage.preProcess();
-    stage.process(event);
-    stage.postProcess();
-  }
-
   @Override
   public String toString() {
     String verifierName = getClass().getSimpleName();
     return verifierName + "(" + _clusterName + "@" + _zkClient + "@resources["
        + (_resources != null ? Arrays.toString(_resources.toArray()) : "") + "])";
   }
-
-  /**
-   * A Dryrun WAGED rebalancer that only calculates the assignment based on the cluster status
but
-   * never update the rebalancer assignment metadata.
-   * This rebalacer is used in the verifiers or tests.
-   */
-  private class DryrunWagedRebalancer extends WagedRebalancer {
-    DryrunWagedRebalancer(String metadataStoreAddrs, String clusterName,
-        Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
-      super(new ReadOnlyAssignmentMetadataStore(metadataStoreAddrs, clusterName),
-          ConstraintBasedAlgorithmFactory.getInstance(preferences), Optional.empty());
-    }
-
-    @Override
-    protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
-        ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
-        Set<String> activeNodes, CurrentStateOutput currentStateOutput, RebalanceAlgorithm
algorithm)
-        throws HelixRebalanceException {
-      return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput,
-          resourceMap.keySet());
-    }
-  }
-
-  private class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
-    ReadOnlyAssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
-      super(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
-    }
-
-    @Override
-    public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline)
{
-      // If baseline hasn't changed, skip writing to metadata store
-      if (compareAssignments(_globalBaseline, globalBaseline)) {
-        return false;
-      }
-      // Update the in-memory reference only
-      _globalBaseline = globalBaseline;
-      return true;
-    }
-
-    @Override
-    public boolean persistBestPossibleAssignment(
-        Map<String, ResourceAssignment> bestPossibleAssignment) {
-      // If bestPossibleAssignment hasn't changed, skip writing to metadata store
-      if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
-        return false;
-      }
-      // Update the in-memory reference only
-      _bestPossibleAssignment = bestPossibleAssignment;
-      return true;
-    }
-  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 3480fb6..348ce07 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -28,23 +28,43 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Joiner;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyType;
+import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.AbstractRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public final class HelixUtil {
   static private Logger LOG = LoggerFactory.getLogger(HelixUtil.class);
 
@@ -141,6 +161,82 @@ public final class HelixUtil {
   }
 
   /**
+   * Returns the expected ideal ResourceAssignments for the resources in the cluster, calculated
+   * with the read-only WAGED rebalancer. The cluster data is first refreshed from the metadata
+   * store; the given cluster config, instance configs, live instances, ideal states and resource
+   * configs then override the refreshed values. The read-only rebalancer keeps any computed
+   * baseline / best possible assignment in memory instead of persisting it.
+   *
+   * @param metadataStoreAddress ZooKeeper address used both for the data provider refresh and
+   *          for reading the WAGED assignment metadata
+   * @param clusterConfig cluster config to use; also supplies the cluster name and the global
+   *          rebalance preference
+   * @param instanceConfigs instance configs to use (indexed by instance name)
+   * @param liveInstances names of the instances to treat as live
+   * @param idealStates ideal states to use
+   * @param resourceConfigs resource configs to use (indexed by resource name)
+   * @return map from resource name to the computed ResourceAssignment of every resource that was
+   *         selected for rebalance
+   * @throws org.apache.helix.HelixException if the best possible state could not be computed;
+   *           the original failure is attached as the cause
+   */
+  public static Map<String, ResourceAssignment> getIdealAssignmentForWagedFullAuto(
+      String metadataStoreAddress, ClusterConfig clusterConfig,
+      List<InstanceConfig> instanceConfigs, List<String> liveInstances,
+      List<IdealState> idealStates, List<ResourceConfig> resourceConfigs) {
+    String clusterName = clusterConfig.getClusterName();
+    // Prepare a data accessor for a dataProvider (cache) refresh
+    BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(metadataStoreAddress);
+    ReadOnlyWagedRebalancer readOnlyWagedRebalancer = null;
+
+    // Use a dummy event to run the required stages for BestPossibleState calculation
+    // Attributes RESOURCES and RESOURCES_TO_REBALANCE are populated in ResourceComputationStage
+    ClusterEvent event = new ClusterEvent(clusterName, ClusterEventType.Unknown);
+
+    try {
+      HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
+
+      // Created inside the try block so that baseDataAccessor is still closed by the finally
+      // clause if the rebalancer constructor throws
+      readOnlyWagedRebalancer = new ReadOnlyWagedRebalancer(metadataStoreAddress, clusterName,
+          clusterConfig.getGlobalRebalancePreference());
+
+      // Obtain a refreshed dataProvider (cache) and overwrite cluster parameters with the
+      // given parameters
+      ResourceControllerDataProvider dataProvider = new ResourceControllerDataProvider(clusterName);
+      dataProvider.requireFullRefresh();
+      dataProvider.refresh(helixDataAccessor);
+      dataProvider.setClusterConfig(clusterConfig);
+      dataProvider.setInstanceConfigMap(instanceConfigs.stream()
+          .collect(Collectors.toMap(InstanceConfig::getInstanceName, Function.identity())));
+      dataProvider.setLiveInstances(
+          liveInstances.stream().map(LiveInstance::new).collect(Collectors.toList()));
+      dataProvider.setIdealStates(idealStates);
+      dataProvider.setResourceConfigMap(resourceConfigs.stream()
+          .collect(Collectors.toMap(ResourceConfig::getResourceName, Function.identity())));
+
+      event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);
+      event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), readOnlyWagedRebalancer);
+
+      // Run the required stages to obtain the BestPossibleOutput
+      RebalanceUtil.runStage(event, new ResourceComputationStage());
+      RebalanceUtil.runStage(event, new CurrentStateComputationStage());
+      RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
+    } catch (Exception e) {
+      LOG.error("getIdealAssignmentForWagedFullAuto(): Failed to compute ResourceAssignments!",
+          e);
+      // Fail with the real cause rather than continuing with missing stage outputs, which would
+      // otherwise surface as an unrelated NullPointerException (or a silently empty result)
+      throw new org.apache.helix.HelixException(
+          "getIdealAssignmentForWagedFullAuto(): Failed to compute ResourceAssignments!", e);
+    } finally {
+      // Close all ZK connections
+      baseDataAccessor.close();
+      if (readOnlyWagedRebalancer != null) {
+        readOnlyWagedRebalancer.close();
+      }
+    }
+
+    // Convert the resulting BestPossibleStateOutput to Map<String, ResourceAssignment>
+    BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+    Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
+    if (output == null || resourceMap == null) {
+      throw new org.apache.helix.HelixException(
+          "getIdealAssignmentForWagedFullAuto(): BestPossibleState was not computed!");
+    }
+    Map<String, ResourceAssignment> result = new HashMap<>();
+    for (Resource resource : resourceMap.values()) {
+      String resourceName = resource.getResourceName();
+      PartitionStateMap partitionStateMap = output.getPartitionStateMap(resourceName);
+      ResourceAssignment resourceAssignment = new ResourceAssignment(resourceName);
+      for (Partition partition : resource.getPartitions()) {
+        resourceAssignment.addReplicaMap(partition, partitionStateMap.getPartitionMap(partition));
+      }
+      result.put(resourceName, resourceAssignment);
+    }
+    return result;
+  }
+
+  /**
    * This method provides the ideal state mapping with corresponding rebalance strategy
    * @param clusterConfig         The cluster config
    * @param instanceConfigs       List of all existing instance configs including disabled/down
instances
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index 050762d..a16f421 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -28,6 +28,9 @@ import java.util.TreeMap;
 
 import org.apache.helix.HelixException;
 import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.StateModelDefinition;
 import org.slf4j.Logger;
@@ -164,4 +167,25 @@ public class RebalanceUtil {
           clusterName);
     }
   }
+
+  /**
+   * Runs a single controller pipeline stage on the given event outside of a running controller.
+   * It calls the stage's init (with a fresh StageContext), preProcess, process and postProcess
+   * steps in that order. Chaining calls mocks part of the controller pipeline run, e.g.
+   *
+   * <pre>
+   *   runStage(event, new ResourceComputationStage());
+   *   runStage(event, new CurrentStateComputationStage());
+   *   runStage(event, new BestPossibleStateCalcStage());
+   * </pre>
+   *
+   * after which the BestPossibleStateOutput can be obtained from the event object.
+   *
+   * @param event the event passed to the stage's process step
+   * @param stage the stage to run
+   * @throws Exception whatever any of the stage's steps throws
+   */
+  public static void runStage(ClusterEvent event, Stage stage) throws Exception {
+    stage.init(new StageContext());
+    stage.preProcess();
+    stage.process(event);
+    stage.postProcess();
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
index f6f692a..ab8c93c 100644
--- a/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
@@ -7,7 +7,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixException;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.api.config.RebalanceConfig;
 import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint;
 import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint;
@@ -22,6 +21,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 /**
  * A rebalance tool that generate an resource partition assignment based on the input.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 1b804d1..2522696 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -29,6 +29,7 @@ import java.util.Set;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
@@ -36,14 +37,18 @@ import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.apache.helix.util.HelixUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -145,6 +150,51 @@ public class TestWagedRebalance extends ZkTestBase {
     }
   }
 
+  /**
+   * Uses HelixUtil.getIdealAssignmentForWagedFullAuto() to compute the cluster-wide assignment.
+   * It checks that the result covers every DB tracked in _allDBs, and that, for each IdealState
+   * read back from ZK, the computed ResourceAssignment has identical map fields.
+   */
+  @Test(dependsOnMethods = "test")
+  public void testRebalanceTool() throws InterruptedException {
+    // Add one WAGED resource per state model under test
+    int dbIndex = 0;
+    for (String stateModel : _testModels) {
+      String dbName = "Test-DB-" + TestHelper.getTestMethodName() + dbIndex;
+      dbIndex++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, dbName, stateModel, PARTITIONS, _replica,
+          _replica);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
+      _allDBs.add(dbName);
+    }
+    Thread.sleep(300);
+    validate(_replica);
+
+    // Snapshot the cluster parameters currently stored in ZK
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+    ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
+    List<InstanceConfig> instanceConfigs =
+        accessor.getChildValues(accessor.keyBuilder().instanceConfigs(), true);
+    List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances());
+    List<IdealState> idealStates =
+        accessor.getChildValues(accessor.keyBuilder().idealStates(), true);
+    List<ResourceConfig> resourceConfigs =
+        accessor.getChildValues(accessor.keyBuilder().resourceConfigs(), true);
+
+    // The util result must cover every resource and agree with each stored IdealState
+    Map<String, ResourceAssignment> utilResult =
+        HelixUtil.getIdealAssignmentForWagedFullAuto(ZK_ADDR, clusterConfig, instanceConfigs,
+            liveInstances, idealStates, resourceConfigs);
+    Assert.assertNotNull(utilResult);
+    Assert.assertEquals(utilResult.size(), _allDBs.size());
+    for (IdealState idealState : idealStates) {
+      String resourceName = idealState.getResourceName();
+      Assert.assertTrue(utilResult.containsKey(resourceName));
+      Assert.assertEquals(utilResult.get(resourceName).getRecord().getMapFields(),
+          idealState.getRecord().getMapFields());
+    }
+  }
+
   @Test(dependsOnMethods = "test")
   public void testWithInstanceTag() throws Exception {
     Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 892b7b1..3fa99f0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -33,6 +33,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
+import org.apache.helix.util.RebalanceUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
@@ -292,14 +293,6 @@ public class TaskTestUtil {
     return cache;
   }
 
-  static void runStage(ClusterEvent event, Stage stage) throws Exception {
-    StageContext context = new StageContext();
-    stage.init(context);
-    stage.preProcess();
-    stage.process(event);
-    stage.postProcess();
-  }
-
   public static BestPossibleStateOutput calculateTaskSchedulingStage(WorkflowControllerDataProvider
cache,
       HelixManager manager) throws Exception {
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
@@ -331,7 +324,7 @@ public class TaskTestUtil {
     stages.add(new TaskGarbageCollectionStage());
 
     for (Stage stage : stages) {
-      runStage(event, stage);
+      RebalanceUtil.runStage(event, stage);
     }
 
     return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());


Mime
View raw message