helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject helix git commit: [HELIX-543] Avoid moving partitions unnecessarily when auto-rebalancing.
Date Tue, 25 Oct 2016 16:32:22 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 22cdb9d68 -> 45ebe7675


[HELIX-543] Avoid moving partitions unnecessarily when auto-rebalancing.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/45ebe767
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/45ebe767
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/45ebe767

Branch: refs/heads/helix-0.6.x
Commit: 45ebe767533a9c014bf37c30e4a6a62652538b5a
Parents: 22cdb9d
Author: Lei Xia <lxia@linkedin.com>
Authored: Tue Oct 25 09:01:35 2016 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Tue Oct 25 09:04:30 2016 -0700

----------------------------------------------------------------------
 .../strategy/AutoRebalanceStrategy.java         |  64 ++++++++++--
 .../java/org/apache/helix/model/IdealState.java |   1 +
 .../strategy/TestAutoRebalanceStrategy.java     |  59 +++++++++++
 .../SinglePartitionLeaderStandByTest.java       | 103 +++++++++++++++++++
 4 files changed, 221 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/45ebe767/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
index 8edecdc..0385959 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.rebalancer.strategy;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -83,18 +84,28 @@ public class AutoRebalanceStrategy implements RebalanceStrategy {
     if (liveNodes.size() == 0) {
       return znRecord;
     }
-    int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size();
-    int distFloor = (numReplicas * _partitions.size()) / liveNodes.size();
+
+    List<String> sortedAllNodes = new ArrayList<String>(allNodes);
+    Collections.sort(sortedAllNodes);
+
+    Comparator<String> currentStateNodeComparator =
+        new CurrentStateNodeComparator(currentMapping);
+
+    List<String> sortedLiveNodes = new ArrayList<String>(liveNodes);
+    Collections.sort(sortedLiveNodes, currentStateNodeComparator);
+
+    int distRemainder = (numReplicas * _partitions.size()) % sortedLiveNodes.size();
+    int distFloor = (numReplicas * _partitions.size()) / sortedLiveNodes.size();
     _nodeMap = new HashMap<String, Node>();
     _liveNodesList = new ArrayList<Node>();
 
-    for (String id : allNodes) {
+    for (String id : sortedAllNodes) {
       Node node = new Node(id);
       node.capacity = 0;
       node.hasCeilingCapacity = false;
       _nodeMap.put(id, node);
     }
-    for (int i = 0; i < liveNodes.size(); i++) {
+    for (int i = 0; i < sortedLiveNodes.size(); i++) {
       boolean usingCeiling = false;
       int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) :
distFloor;
       if (distRemainder > 0 && targetSize < _maximumPerNode) {
@@ -102,7 +113,7 @@ public class AutoRebalanceStrategy implements RebalanceStrategy {
         distRemainder = distRemainder - 1;
         usingCeiling = true;
       }
-      Node node = _nodeMap.get(liveNodes.get(i));
+      Node node = _nodeMap.get(sortedLiveNodes.get(i));
       node.isAlive = true;
       node.capacity = targetSize;
       node.hasCeilingCapacity = usingCeiling;
@@ -113,7 +124,7 @@ public class AutoRebalanceStrategy implements RebalanceStrategy {
     _stateMap = generateStateMap();
 
     // compute the preferred mapping if all nodes were up
-    _preferredAssignment = computePreferredPlacement(allNodes);
+    _preferredAssignment = computePreferredPlacement(sortedAllNodes);
 
     // logger.info("preferred mapping:"+ preferredAssignment);
     // from current mapping derive the ones in preferred location
@@ -778,4 +789,45 @@ public class AutoRebalanceStrategy implements RebalanceStrategy {
       return nodeNames.get(index);
     }
   }
+
+  /**
+   * Orders live node ids by how many partitions the current mapping places on each node (most
+   * first), then by node id. computePartitionAssignment hands out capacity in this order, and any
+   * remainder ("ceiling") capacity goes to the earliest nodes, so nodes already hosting partitions
+   * keep room for them and partitions are not moved unnecessarily.
+   */
+  private static class CurrentStateNodeComparator implements Comparator<String> {
+
+    /**
+     * Number of partitions the current mapping places on each participant. Participants that
+     * do not appear in the mapping are absent and are treated as hosting zero partitions.
+     */
+    private final Map<String, Integer> partitionCounts;
+
+    /**
+     * Counts, for each participant, the partitions whose state map contains it.
+     * @param currentMapping The current mapping of partition -> (participant -> state).
+     */
+    public CurrentStateNodeComparator(Map<String, Map<String, String>> currentMapping) {
+      partitionCounts = new HashMap<String, Integer>();
+      for (Entry<String, Map<String, String>> entry : currentMapping.entrySet()) {
+        for (String participantId : entry.getValue().keySet()) {
+          Integer existing = partitionCounts.get(participantId);
+          partitionCounts.put(participantId, existing != null ? existing + 1 : 1);
+        }
+      }
+    }
+
+    @Override
+    public int compare(String o1, String o2) {
+      Integer c1 = partitionCounts.get(o1);
+      Integer c2 = partitionCounts.get(o2);
+      int n1 = (c1 == null) ? 0 : c1;
+      int n2 = (c2 == null) ? 0 : c2;
+      if (n1 != n2) {
+        return n1 > n2 ? -1 : 1; // more partitions sorts earlier
+      }
+      return o1.compareTo(o2); // equal counts: order by node id
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/45ebe767/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 55d4734..3cc0456 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -410,6 +410,7 @@ public class IdealState extends HelixProperty {
     // HACK: if replica doesn't exists, use the length of the first list field
     // instead
     // TODO: remove it when Dbus fixed the IdealState writer
+    // TODO: replica could be "ANY_INSTANCE".
     String replica = _record.getSimpleField(IdealStateProperty.REPLICAS.toString());
     if (replica == null) {
       String firstPartition = null;

http://git-wip-us.apache.org/repos/asf/helix/blob/45ebe767/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index fcae903..818d603 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -791,4 +791,63 @@ public class TestAutoRebalanceStrategy {
       Assert.assertEquals(p.size(), nReplicas);
     }
   }
+
+  /**
+   * Tests the following scenario: there is only a single partition for a resource. With two nodes
+   * up, the partition should be assigned to one of them. Then the current mapping is changed so
+   * the partition lives on the other node (as if the preferred node had gone down and the
+   * partition had moved), and the assignment is recomputed with both nodes live again. The
+   * partition should stay where it currently is rather than move back unnecessarily, because
+   * live nodes that already host partitions are given capacity first.
+   */
+  @Test
+  public void testWontMoveSinglePartitionUnnecessarily() {
+    final String RESOURCE = "resource";
+    final String partition = "resource_0";
+    LinkedHashMap<String, Integer> stateCount = Maps.newLinkedHashMap();
+    stateCount.put("ONLINE", 1);
+    final String[] NODES = {"n0", "n1"};
+
+    // initial state: both nodes are up and the partition has no current placement
+    List<String> allNodes = Lists.newArrayList(NODES);
+    List<String> liveNodes = Lists.newArrayList(NODES);
+    List<String> partitions = Lists.newArrayList(partition);
+    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
+    currentMapping.put(partition, new HashMap<String, String>());
+
+    // the partition must land on exactly one of the two nodes
+    ZNRecord znRecord =
+        new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    String preferredNode = assertSingleOnlineReplica(znRecord, partition);
+    Assert.assertTrue(allNodes.contains(preferredNode), "unexpected node " + preferredNode);
+    String otherNode = preferredNode.equals(NODES[0]) ? NODES[1] : NODES[0];
+
+    // now pretend the partition currently lives on the other node (e.g. due to the preferred
+    // node having been down) and recompute with both nodes live.
+    currentMapping.get(partition).put(otherNode, "ONLINE");
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+
+    // finally, make sure we haven't moved it.
+    Assert.assertEquals(assertSingleOnlineReplica(znRecord, partition), otherNode);
+  }
+
+  /**
+   * Asserts that the computed record gives the partition exactly one preferred node and that the
+   * node is mapped to the ONLINE state.
+   * @param znRecord the record returned by computePartitionAssignment
+   * @param partition the partition to check
+   * @return the single preferred node for the partition
+   */
+  private static String assertSingleOnlineReplica(ZNRecord znRecord, String partition) {
+    List<String> preferenceList = znRecord.getListFields().get(partition);
+    Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+    Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
+    String node = preferenceList.get(0);
+    Assert.assertEquals(znRecord.getMapField(partition).get(node), "ONLINE",
+        "Invalid state for " + partition);
+    return node;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/45ebe767/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
new file mode 100644
index 0000000..d07914e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
@@ -0,0 +1,103 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for a FULL_AUTO LeaderStandby resource whose first partition uses an
+ * ANY_LIVEINSTANCE preference list. It checks that the cluster converges on startup, after one
+ * participant is stopped, and again after that participant is restarted.
+ */
+public class SinglePartitionLeaderStandByTest extends ZkIntegrationTestBase {
+  @Test
+  public void test() throws Exception {
+    String clusterName = TestHelper.getTestMethodName();
+    int n = 4;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        2, // partitions per resource
+        n, // number of nodes
+        0, // replicas
+        "LeaderStandby", RebalanceMode.FULL_AUTO, false); // don't rebalance
+
+    // set REPLICAS and TestDB0_0's preference list to ANY_LIVEINSTANCE
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey key = keyBuilder.idealStates("TestDB0");
+    IdealState idealState = accessor.getProperty(key);
+    String anyLiveInstance = HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString();
+    idealState.setReplicas(anyLiveInstance);
+    idealState.getRecord().setListField("TestDB0_0", Arrays.asList(anyLiveInstance));
+    accessor.setProperty(key, idealState);
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    try {
+      controller.syncStart();
+      // start participants
+      for (int i = 0; i < n; i++) {
+        String instanceName = "localhost_" + (12918 + i);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].syncStart();
+      }
+      Assert.assertTrue(verifyCluster(clusterName), "cluster did not converge on startup");
+      // stop the first participant and check the cluster converges without it
+      participants[0].syncStop();
+      Assert.assertTrue(verifyCluster(clusterName), "cluster did not converge after stop");
+      // bring the first participant back under the same instance name
+      participants[0] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_" + 12918);
+      participants[0].syncStart();
+      Assert.assertTrue(verifyCluster(clusterName), "cluster did not converge after restart");
+    } finally {
+      // clean up even if an assertion above failed
+      controller.syncStop();
+      for (MockParticipantManager participant : participants) {
+        if (participant != null) {
+          participant.syncStop();
+        }
+      }
+    }
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /** Runs the best-possible vs. external-view verifier via ZK callbacks; returns its result. */
+  private boolean verifyCluster(String clusterName) {
+    return ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+  }
+}


Mime
View raw message