helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject [14/52] [abbrv] git commit: [HELIX-297] Support old rebalancer for backward compatibility, rb=15431
Date Wed, 20 Nov 2013 21:12:29 GMT
[HELIX-297] Support old rebalancer for backward compatibility, rb=15431


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/55c93516
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/55c93516
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/55c93516

Branch: refs/heads/helix-yarn
Commit: 55c935169eeb1e86825a533f4f5c101083d88364
Parents: ec36112
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Mon Nov 11 14:24:05 2013 -0800
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Mon Nov 11 14:24:05 2013 -0800

----------------------------------------------------------------------
 .../controller/rebalancer/CustomRebalancer.java |  62 +------
 .../rebalancer/FallbackRebalancer.java          | 185 +++++++++++++++++++
 .../util/ConstraintBasedAssignment.java         |  50 +++++
 .../stages/BestPossibleStateCalcStage.java      |  16 +-
 .../controller/stages/ResourceCurrentState.java |  11 ++
 .../java/org/apache/helix/model/Partition.java  |   2 +
 .../java/org/apache/helix/model/Resource.java   |   4 +-
 .../TestUserDefRebalancerCompatibility.java     | 104 +++++++++++
 8 files changed, 366 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 69c379a..5209e2c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -1,10 +1,8 @@
 package org.apache.helix.controller.rebalancer;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.State;
@@ -61,65 +59,13 @@ public class CustomRebalancer implements HelixRebalancer {
       Map<ParticipantId, State> currentStateMap =
           currentState.getCurrentStateMap(config.getResourceId(), partition);
       Set<ParticipantId> disabledInstancesForPartition =
-          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition);
+          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
partition);
       Map<ParticipantId, State> bestStateForPartition =
-          computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap().keySet(),
-              stateModelDef, config.getPreferenceMap(partition), currentStateMap,
-              disabledInstancesForPartition);
+          ConstraintBasedAssignment.computeCustomizedBestStateForPartition(cluster
+              .getLiveParticipantMap().keySet(), stateModelDef, config.getPreferenceMap(partition),
+              currentStateMap, disabledInstancesForPartition);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
   }
-
-  /**
-   * compute best state for resource in CUSTOMIZED rebalancer mode
-   * @param liveParticipantMap
-   * @param stateModelDef
-   * @param preferenceMap
-   * @param currentStateMap
-   * @param disabledParticipantsForPartition
-   * @return
-   */
-  private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
-      Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
-      Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
-      Set<ParticipantId> disabledParticipantsForPartition) {
-    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
-
-    // if the resource is deleted, idealStateMap will be null/empty and
-    // we should drop all resources.
-    if (currentStateMap != null) {
-      for (ParticipantId participantId : currentStateMap.keySet()) {
-        if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
-            && !disabledParticipantsForPartition.contains(participantId)) {
-          // if dropped and not disabled, transit to DROPPED
-          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
-        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
-            participantId).equals(State.from(HelixDefinedState.ERROR)))
-            && disabledParticipantsForPartition.contains(participantId)) {
-          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
-        }
-      }
-    }
-
-    // ideal state is deleted
-    if (preferenceMap == null) {
-      return participantStateMap;
-    }
-
-    for (ParticipantId participantId : preferenceMap.keySet()) {
-      boolean notInErrorState =
-          currentStateMap == null || currentStateMap.get(participantId) == null
-              || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
-
-      if (liveParticipantSet.contains(participantId) && notInErrorState
-          && !disabledParticipantsForPartition.contains(participantId)) {
-        participantStateMap.put(participantId, preferenceMap.get(participantId));
-      }
-    }
-
-    return participantStateMap;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
new file mode 100644
index 0000000..fc4bfa0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
@@ -0,0 +1,185 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * This class is intented for use to wrap usages of {@link Rebalancer}. It is subject to
removal
+ * once that class is removed.
+ */
+@SuppressWarnings("deprecation")
+public class FallbackRebalancer implements HelixRebalancer {
+  private static final Logger LOG = Logger.getLogger(FallbackRebalancer.class);
+  private HelixManager _helixManager;
+
+  @Override
+  public void init(HelixManager helixManager) {
+    _helixManager = helixManager;
+  }
+
+  @Override
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      Cluster cluster, ResourceCurrentState currentState) {
+    // make sure the manager is not null
+    if (_helixManager == null) {
+      LOG.info("HelixManager is null!");
+      return null;
+    }
+
+    // get the context
+    PartitionedRebalancerContext context =
+        rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
+    if (context == null) {
+      LOG.info("Resource is not partitioned");
+      return null;
+    }
+
+    // get the ideal state and rebalancer class
+    ResourceId resourceId = context.getResourceId();
+    StateModelDefinition stateModelDef =
+        cluster.getStateModelMap().get(context.getStateModelDefId());
+    if (stateModelDef == null) {
+      LOG.info("StateModelDefinition unavailable for " + resourceId);
+      return null;
+    }
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceId.stringify()));
+    if (idealState == null) {
+      LOG.info("No IdealState available for " + resourceId);
+      return null;
+    }
+    String rebalancerClassName = idealState.getRebalancerClassName();
+    if (rebalancerClassName == null) {
+      LOG.info("No Rebalancer class available for " + resourceId);
+      return null;
+    }
+
+    // try to instantiate the rebalancer class
+    Rebalancer rebalancer = null;
+    try {
+      rebalancer =
+          (Rebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
+    } catch (Exception e) {
+      LOG.warn("rebalancer " + rebalancerClassName + " not available", e);
+    }
+    if (rebalancer == null) {
+      LOG.warn("Rebalancer class " + rebalancerClassName + " could not be instantiated for
"
+          + resourceId);
+      return null;
+    }
+
+    // get the cluster data cache (unfortunately involves a second read of the cluster)
+    ClusterDataCache cache = new ClusterDataCache();
+    cache.refresh(accessor);
+
+    // adapt ResourceCurrentState to CurrentStateOutput
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    for (ResourceId resource : currentState.getResourceIds()) {
+      currentStateOutput.setBucketSize(resource.stringify(), currentState.getBucketSize(resource));
+      currentStateOutput.setResourceStateModelDef(resource.stringify(), currentState
+          .getResourceStateModelDef(resource).stringify());
+      Set<PartitionId> partitions = currentState.getCurrentStateMappedPartitions(resource);
+      for (PartitionId partitionId : partitions) {
+        // set current state
+        Map<ParticipantId, State> currentStateMap =
+            currentState.getCurrentStateMap(resource, partitionId);
+        for (ParticipantId participantId : currentStateMap.keySet()) {
+          currentStateOutput.setCurrentState(resource.stringify(),
+              new Partition(partitionId.stringify()), participantId.stringify(), currentStateMap
+                  .get(participantId).toString());
+        }
+
+        // set pending current state
+        Map<ParticipantId, State> pendingStateMap =
+            currentState.getPendingStateMap(resource, partitionId);
+        for (ParticipantId participantId : pendingStateMap.keySet()) {
+          currentStateOutput.setPendingState(resource.stringify(),
+              new Partition(partitionId.stringify()), participantId.stringify(), pendingStateMap
+                  .get(participantId).toString());
+        }
+      }
+    }
+
+    // call the rebalancer
+    rebalancer.init(_helixManager);
+    IdealState newIdealState =
+        rebalancer.computeResourceMapping(resourceId.stringify(), idealState, currentStateOutput,
+            cache);
+
+    // do the resource assignments
+    ResourceAssignment assignment = new ResourceAssignment(resourceId);
+    if (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+      // customized ideal state uses a map
+      for (PartitionId partitionId : newIdealState.getPartitionIdSet()) {
+        Set<ParticipantId> disabledParticipants =
+            ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+                partitionId);
+        Map<ParticipantId, State> replicaMap =
+            ConstraintBasedAssignment.computeCustomizedBestStateForPartition(cluster
+                .getLiveParticipantMap().keySet(), stateModelDef, newIdealState
+                .getParticipantStateMap(partitionId), currentState.getCurrentStateMap(resourceId,
+                partitionId), disabledParticipants);
+        assignment.addReplicaMap(partitionId, replicaMap);
+      }
+    } else {
+      // other modes use auto assignment
+      Map<State, String> upperBounds =
+          ConstraintBasedAssignment
+              .stateConstraints(stateModelDef, resourceId, cluster.getConfig());
+      for (PartitionId partitionId : newIdealState.getPartitionIdSet()) {
+        Set<ParticipantId> disabledParticipants =
+            ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+                partitionId);
+        Map<ParticipantId, State> replicaMap =
+            ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
+                .getLiveParticipantMap().keySet(), stateModelDef, newIdealState
+                .getPreferenceList(partitionId), currentState.getCurrentStateMap(resourceId,
+                partitionId), disabledParticipants);
+        assignment.addReplicaMap(partitionId, replicaMap);
+      }
+    }
+    return assignment;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index 84129de..7951784 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -260,4 +260,54 @@ public class ConstraintBasedAssignment {
     }
     return stateCountMap;
   }
+
+  /**
+   * compute best state for resource in CUSTOMIZED rebalancer mode
+   * @param liveParticipantMap
+   * @param stateModelDef
+   * @param preferenceMap
+   * @param currentStateMap
+   * @param disabledParticipantsForPartition
+   * @return
+   */
+  public static Map<ParticipantId, State> computeCustomizedBestStateForPartition(
+      Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
+      Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
+      Set<ParticipantId> disabledParticipantsForPartition) {
+    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
+
+    // if the resource is deleted, idealStateMap will be null/empty and
+    // we should drop all resources.
+    if (currentStateMap != null) {
+      for (ParticipantId participantId : currentStateMap.keySet()) {
+        if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
+            && !disabledParticipantsForPartition.contains(participantId)) {
+          // if dropped and not disabled, transit to DROPPED
+          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
+            participantId).equals(State.from(HelixDefinedState.ERROR)))
+            && disabledParticipantsForPartition.contains(participantId)) {
+          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
+          participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
+        }
+      }
+    }
+
+    // ideal state is deleted
+    if (preferenceMap == null) {
+      return participantStateMap;
+    }
+
+    for (ParticipantId participantId : preferenceMap.keySet()) {
+      boolean notInErrorState =
+          currentStateMap == null || currentStateMap.get(participantId) == null
+              || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
+
+      if (liveParticipantSet.contains(participantId) && notInErrorState
+          && !disabledParticipantsForPartition.contains(participantId)) {
+        participantStateMap.put(participantId, preferenceMap.get(participantId));
+      }
+    }
+    return participantStateMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 96b1ac8..7b143bd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -33,6 +33,7 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.FallbackRebalancer;
 import org.apache.helix.controller.rebalancer.HelixRebalancer;
 import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.context.RebalancerContext;
@@ -172,18 +173,19 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       }
       ResourceConfig resourceConfig = resourceMap.get(resourceId);
       RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
+      RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+      StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
       ResourceAssignment resourceAssignment = null;
       if (rebalancerConfig != null) {
         HelixRebalancer rebalancer = rebalancerConfig.getRebalancer();
-        if (rebalancer != null) {
-          HelixManager manager = event.getAttribute("helixmanager");
-          rebalancer.init(manager);
-          resourceAssignment =
-              rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
+        HelixManager manager = event.getAttribute("helixmanager");
+        if (rebalancer == null) {
+          rebalancer = new FallbackRebalancer();
         }
+        rebalancer.init(manager);
+        resourceAssignment =
+            rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
       }
-      RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
-      StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
       if (resourceAssignment == null) {
         resourceAssignment =
             mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
index 2f5ec1d..f04afd0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
@@ -68,6 +68,17 @@ public class ResourceCurrentState {
   }
 
   /**
+   * Get all the resources seen in the aggregated current state
+   * @return set of ResourceId
+   */
+  public Set<ResourceId> getResourceIds() {
+    Set<ResourceId> allResources = Sets.newHashSet();
+    allResources.addAll(_currentStateMap.keySet());
+    allResources.addAll(_pendingStateMap.keySet());
+    return allResources;
+  }
+
+  /**
    * @param resourceId
    * @param stateModelDefId
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/model/Partition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Partition.java b/helix-core/src/main/java/org/apache/helix/model/Partition.java
index 1d694ab..6a3e054 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Partition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Partition.java
@@ -21,7 +21,9 @@ package org.apache.helix.model;
 
 /**
  * A distinct partition of a resource
+ * Deprecated. Use {@link org.apache.helix.api.Partition}
  */
+@Deprecated
 public class Partition {
   private final String _partitionName;
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/model/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Resource.java b/helix-core/src/main/java/org/apache/helix/model/Resource.java
index 1544514..437aca6 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Resource.java
@@ -24,14 +24,12 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.helix.HelixConstants;
-import org.apache.log4j.Logger;
 
 /**
  * A resource contains a set of partitions and its replicas are managed by a state model
  */
+@Deprecated
 public class Resource {
-  private static Logger LOG = Logger.getLogger(Resource.class);
-
   private final String _resourceName;
   private final Map<String, Partition> _partitionMap;
   private String _stateModelDefRef;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
new file mode 100644
index 0000000..27004fe
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
@@ -0,0 +1,104 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("deprecation")
+public class TestUserDefRebalancerCompatibility extends
+    ZkStandAloneCMTestBaseWithPropertyServerCheck {
+  String db2 = TEST_DB + "2";
+  static boolean testRebalancerCreated = false;
+  static boolean testRebalancerInvoked = false;
+
+  public static class TestRebalancer implements Rebalancer {
+    @Override
+    public void init(HelixManager helixManager) {
+      testRebalancerCreated = true;
+    }
+
+    /**
+     * Very basic mapping that evenly assigns one replica of each partition to live nodes,
each of
+     * which is in the highest-priority state.
+     */
+    @Override
+    public IdealState computeResourceMapping(String resourceName, IdealState currentIdealState,
+        CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+      testRebalancerInvoked = true;
+      for (String partition : currentIdealState.getPartitionSet()) {
+        String instance = currentIdealState.getPreferenceList(partition).get(0);
+        currentIdealState.getPreferenceList(partition).clear();
+        currentIdealState.getPreferenceList(partition).add(instance);
+
+        currentIdealState.getInstanceStateMap(partition).clear();
+        currentIdealState.getInstanceStateMap(partition).put(instance, "MASTER");
+      }
+      currentIdealState.setReplicas("1");
+      return currentIdealState;
+    }
+  }
+
+  @Test
+  public void testCustomizedIdealStateRebalancer() throws InterruptedException {
+    _setupTool.addResourceToCluster(CLUSTER_NAME, db2, 60, "MasterSlave");
+    _setupTool.addResourceProperty(CLUSTER_NAME, db2,
+        IdealStateProperty.REBALANCER_CLASS_NAME.toString(),
+        TestUserDefRebalancerCompatibility.TestRebalancer.class.getName());
+
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, 3);
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new TestCustomizedIdealStateRebalancer.ExternalViewBalancedVerifier(
+                _gZkClient, CLUSTER_NAME, db2));
+    Assert.assertTrue(result);
+    Thread.sleep(1000);
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
+    Assert.assertEquals(ev.getPartitionSet().size(), 60);
+    for (String partition : ev.getPartitionSet()) {
+      Assert.assertEquals(ev.getStateMap(partition).size(), 1);
+    }
+    IdealState is = accessor.getProperty(keyBuilder.idealStates(db2));
+    for (PartitionId partition : is.getPartitionIdSet()) {
+      Assert.assertEquals(is.getPreferenceList(partition).size(), 3);
+      Assert.assertEquals(is.getParticipantStateMap(partition).size(), 3);
+    }
+    Assert.assertTrue(testRebalancerCreated);
+    Assert.assertTrue(testRebalancerInvoked);
+  }
+}


Mime
View raw message