Added: incubator/helix/site-content/xref/org/apache/helix/tools/IdealStateCalculatorForStorageNode.html URL: http://svn.apache.org/viewvc/incubator/helix/site-content/xref/org/apache/helix/tools/IdealStateCalculatorForStorageNode.html?rev=1479756&view=auto ============================================================================== --- incubator/helix/site-content/xref/org/apache/helix/tools/IdealStateCalculatorForStorageNode.html (added) +++ incubator/helix/site-content/xref/org/apache/helix/tools/IdealStateCalculatorForStorageNode.html Tue May 7 03:10:53 2013 @@ -0,0 +1,805 @@ + + + + +IdealStateCalculatorForStorageNode xref + + + +
View Javadoc
+
+1   package org.apache.helix.tools;
+2   
+3   /*
+4    * Licensed to the Apache Software Foundation (ASF) under one
+5    * or more contributor license agreements.  See the NOTICE file
+6    * distributed with this work for additional information
+7    * regarding copyright ownership.  The ASF licenses this file
+8    * to you under the Apache License, Version 2.0 (the
+9    * "License"); you may not use this file except in compliance
+10   * with the License.  You may obtain a copy of the License at
+11   *
+12   *   http://www.apache.org/licenses/LICENSE-2.0
+13   *
+14   * Unless required by applicable law or agreed to in writing,
+15   * software distributed under the License is distributed on an
+16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+17   * KIND, either express or implied.  See the License for the
+18   * specific language governing permissions and limitations
+19   * under the License.
+20   */
+21  
+22  import java.util.ArrayList;
+23  import java.util.Collections;
+24  import java.util.List;
+25  import java.util.Map;
+26  import java.util.Random;
+27  import java.util.TreeMap;
+28  
+29  import org.apache.helix.HelixException;
+30  import org.apache.helix.ZNRecord;
+31  import org.apache.helix.model.IdealState.IdealStateProperty;
+32  
+33  
+34  /**
+35   * IdealStateCalculatorForStorageNode tries to optimally allocate master/slave partitions among
+36   * espresso storage nodes.
+37   *
+38   * Given a batch of storage nodes, the partition and replication factor, the algorithm first given a initial state
+39   * When new batches of storage nodes are added, the algorithm will calculate the new ideal state such that the total
+40   * partition movements are minimized.
+41   *
+42   */
+43  public class IdealStateCalculatorForStorageNode
+44  {
+45    static final String _MasterAssignmentMap = "MasterAssignmentMap";
+46    static final String _SlaveAssignmentMap = "SlaveAssignmentMap";
+47    static final String _partitions = "partitions";
+48    static final String _replicas = "replicas";
+49  
+50    /**
+51     * Calculate the initial ideal state given a batch of storage instances, the replication factor and
+52     * number of partitions
+53     *
+54     * 1. Calculate the master assignment by random shuffling
+55     * 2. for each storage instance, calculate the 1st slave assignment map, by another random shuffling
+56     * 3. for each storage instance, calculate the i-th slave assignment map
+57     * 4. Combine the i-th slave assignment maps together
+58     *
+59     * @param instanceNames
+60     *          list of storage node instances
+61     * @param partitions
+62     *          number of partitions
+63     * @param replicas
+64     *          The number of replicas (slave partitions) per master partition
+65     * @param masterStateValue
+66     *          master state value: e.g. "MASTER" or "LEADER"
+67     * @param slaveStateValue
+68     *          slave state value: e.g. "SLAVE" or "STANDBY"
+69     * @param resourceName
+70     * @return a ZNRecord that contain the idealstate info
+71     */
+72    public static ZNRecord calculateIdealState(List<String> instanceNames, int partitions, int replicas, String resourceName,
+73                                               String masterStateValue, String slaveStateValue)
+74    {
+75      Collections.sort(instanceNames);
+76      if(instanceNames.size() < replicas + 1)
+77      {
+78        throw new HelixException("Number of instances must not be less than replicas + 1. "
+79                                        + "instanceNr:" + instanceNames.size()
+80                                        + ", replicas:" + replicas);
+81      }
+82      else if(partitions < instanceNames.size())
+83      {
+84        ZNRecord idealState = IdealStateCalculatorByShuffling.calculateIdealState(instanceNames, partitions, replicas, resourceName, 12345, masterStateValue, slaveStateValue);
+85        int i = 0;
+86        for(String partitionId : idealState.getMapFields().keySet())
+87        {
+88          Map<String, String> partitionAssignmentMap = idealState.getMapField(partitionId);
+89          List<String> partitionAssignmentPriorityList = new ArrayList<String>();
+90          String masterInstance = "";
+91          for(String instanceName : partitionAssignmentMap.keySet())
+92          {
+93            if(partitionAssignmentMap.get(instanceName).equalsIgnoreCase(masterStateValue)
+94                && masterInstance.equals(""))
+95            {
+96              masterInstance = instanceName;
+97            }
+98            else
+99            {
+100             partitionAssignmentPriorityList.add(instanceName);
+101           }
+102         }
+103         Collections.shuffle(partitionAssignmentPriorityList, new Random(i++));
+104         partitionAssignmentPriorityList.add(0, masterInstance);
+105         idealState.setListField(partitionId, partitionAssignmentPriorityList);
+106       }
+107       return idealState;
+108     }
+109 
+110     Map<String, Object> result = calculateInitialIdealState(instanceNames, partitions, replicas);
+111 
+112     return convertToZNRecord(result, resourceName, masterStateValue, slaveStateValue);
+113   }
+114 
+115   public static ZNRecord calculateIdealStateBatch(List<List<String>> instanceBatches, int partitions, int replicas, String resourceName,
+116                                                   String masterStateValue, String slaveStateValue)
+117   {
+118     Map<String, Object> result = calculateInitialIdealState(instanceBatches.get(0), partitions, replicas);
+119 
+120     for(int i = 1; i < instanceBatches.size(); i++)
+121     {
+122       result = calculateNextIdealState(instanceBatches.get(i), result);
+123     }
+124 
+125     return convertToZNRecord(result, resourceName, masterStateValue, slaveStateValue);
+126   }
+127 
+128   /**
+129    * Convert the internal result (stored as a Map<String, Object>) into ZNRecord.
+130    */
+131   public static ZNRecord convertToZNRecord(Map<String, Object> result, String resourceName,
+132                                     String masterStateValue, String slaveStateValue)
+133   {
+134     Map<String, List<Integer>> nodeMasterAssignmentMap
+135     = (Map<String, List<Integer>>) (result.get(_MasterAssignmentMap));
+136     Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap
+137         = (Map<String, Map<String, List<Integer>>>)(result.get(_SlaveAssignmentMap));
+138 
+139     int partitions = (Integer)(result.get("partitions"));
+140 
+141     ZNRecord idealState = new ZNRecord(resourceName);
+142     idealState.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
+143 
+144 
+145     for(String instanceName : nodeMasterAssignmentMap.keySet())
+146     {
+147       for(Integer partitionId : nodeMasterAssignmentMap.get(instanceName))
+148       {
+149         String partitionName = resourceName+"_"+partitionId;
+150         if(!idealState.getMapFields().containsKey(partitionName))
+151         {
+152           idealState.setMapField(partitionName, new TreeMap<String, String>());
+153         }
+154         idealState.getMapField(partitionName).put(instanceName, masterStateValue);
+155       }
+156     }
+157 
+158     for(String instanceName : nodeSlaveAssignmentMap.keySet())
+159     {
+160       Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(instanceName);
+161 
+162       for(String slaveNode: slaveAssignmentMap.keySet())
+163       {
+164         List<Integer> slaveAssignment = slaveAssignmentMap.get(slaveNode);
+165         for(Integer partitionId: slaveAssignment)
+166         {
+167           String partitionName = resourceName+"_"+partitionId;
+168           idealState.getMapField(partitionName).put(slaveNode, slaveStateValue);
+169         }
+170       }
+171     }
+172     // generate the priority list of instances per partition. Master should be at front and slave follows.
+173 
+174     for(String partitionId : idealState.getMapFields().keySet())
+175     {
+176       Map<String, String> partitionAssignmentMap = idealState.getMapField(partitionId);
+177       List<String> partitionAssignmentPriorityList = new ArrayList<String>();
+178       String masterInstance = "";
+179       for(String instanceName : partitionAssignmentMap.keySet())
+180       {
+181         if(partitionAssignmentMap.get(instanceName).equalsIgnoreCase(masterStateValue)
+182             && masterInstance.equals(""))
+183         {
+184           masterInstance = instanceName;
+185         }
+186         else
+187         {
+188           partitionAssignmentPriorityList.add(instanceName);
+189         }
+190       }
+191       Collections.shuffle(partitionAssignmentPriorityList);
+192       partitionAssignmentPriorityList.add(0, masterInstance);
+193       idealState.setListField(partitionId, partitionAssignmentPriorityList);
+194     }
+195     assert(result.containsKey("replicas"));
+196     idealState.setSimpleField(IdealStateProperty.REPLICAS.toString(), result.get("replicas").toString());
+197     return idealState;
+198   }
+199   /**
+200    * Calculate the initial ideal state given a batch of storage instances, the replication factor and
+201    * number of partitions
+202    *
+203    * 1. Calculate the master assignment by random shuffling
+204    * 2. for each storage instance, calculate the 1st slave assignment map, by another random shuffling
+205    * 3. for each storage instance, calculate the i-th slave assignment map
+206    * 4. Combine the i-th slave assignment maps together
+207    *
+208    * @param instanceNames
+209    *          list of storage node instances
+210    * @param weight
+211    *          weight for the initial storage node (each node has the same weight)
+212    * @param partitions
+213    *          number of partitions
+214    * @param replicas
+215    *          The number of replicas (slave partitions) per master partition
+216    * @return a map that contain the idealstate info
+217    */
+218   public static Map<String, Object> calculateInitialIdealState(List<String> instanceNames, int partitions, int replicas)
+219   {
+220     Random r = new Random(54321);
+221     assert(replicas <= instanceNames.size() - 1);
+222 
+223     ArrayList<Integer> masterPartitionAssignment = new ArrayList<Integer>();
+224     for(int i = 0;i< partitions; i++)
+225     {
+226       masterPartitionAssignment.add(i);
+227     }
+228     // shuffle the partition id array
+229     Collections.shuffle(masterPartitionAssignment, new Random(r.nextInt()));
+230 
+231     // 1. Generate the random master partition assignment
+232     //    instanceName -> List of master partitions on that instance
+233     Map<String, List<Integer>> nodeMasterAssignmentMap = new TreeMap<String, List<Integer>>();
+234     for(int i = 0; i < masterPartitionAssignment.size(); i++)
+235     {
+236       String instanceName = instanceNames.get(i % instanceNames.size());
+237       if(!nodeMasterAssignmentMap.containsKey(instanceName))
+238       {
+239         nodeMasterAssignmentMap.put(instanceName, new ArrayList<Integer>());
+240       }
+241       nodeMasterAssignmentMap.get(instanceName).add(masterPartitionAssignment.get(i));
+242     }
+243 
+244     // instanceName -> slave assignment for its master partitions
+245     // slave assignment: instanceName -> list of slave partitions on it
+246     List<Map<String, Map<String, List<Integer>>>> nodeSlaveAssignmentMapsList = new ArrayList<Map<String, Map<String, List<Integer>>>>(replicas);
+247 
+248     Map<String, Map<String, List<Integer>>> firstNodeSlaveAssignmentMap = new TreeMap<String, Map<String, List<Integer>>>();
+249     Map<String, Map<String, List<Integer>>> combinedNodeSlaveAssignmentMap = new TreeMap<String, Map<String, List<Integer>>>();
+250 
+251     if(replicas > 0)
+252     {
+253       // 2. For each node, calculate the evenly distributed slave as the first slave assignment
+254       // We will figure out the 2nd ...replicas-th slave assignment based on the first level slave assignment
+255       for(int i = 0; i < instanceNames.size(); i++)
+256       {
+257         List<String> slaveInstances = new ArrayList<String>();
+258         ArrayList<Integer> slaveAssignment = new ArrayList<Integer>();
+259         TreeMap<String, List<Integer>> slaveAssignmentMap = new TreeMap<String, List<Integer>>();
+260 
+261         for(int j = 0;j < instanceNames.size(); j++)
+262         {
+263           if(j != i)
+264           {
+265             slaveInstances.add(instanceNames.get(j));
+266             slaveAssignmentMap.put(instanceNames.get(j), new ArrayList<Integer>());
+267           }
+268         }
+269         // Get the number of master partitions on instanceName
+270         List<Integer> masterAssignment =  nodeMasterAssignmentMap.get(instanceNames.get(i));
+271         // do a random shuffling as in step 1, so that the first-level slave are distributed among rest instances
+272 
+273 
+274         for(int j = 0;j < masterAssignment.size(); j++)
+275         {
+276           slaveAssignment.add(j);
+277         }
+278         Collections.shuffle(slaveAssignment, new Random(r.nextInt()));
+279 
+280         Collections.shuffle(slaveInstances, new Random(instanceNames.get(i).hashCode()));
+281 
+282         // Get the slave assignment map of node instanceName
+283         for(int j = 0;j < masterAssignment.size(); j++)
+284         {
+285           String slaveInstanceName = slaveInstances.get(slaveAssignment.get(j) % slaveInstances.size());
+286           if(!slaveAssignmentMap.containsKey(slaveInstanceName))
+287           {
+288             slaveAssignmentMap.put(slaveInstanceName, new ArrayList<Integer>());
+289           }
+290           slaveAssignmentMap.get(slaveInstanceName).add(masterAssignment.get(j));
+291         }
+292         firstNodeSlaveAssignmentMap.put(instanceNames.get(i), slaveAssignmentMap);
+293       }
+294       nodeSlaveAssignmentMapsList.add(firstNodeSlaveAssignmentMap);
+295       // From the first slave assignment map, calculate the rest slave assignment maps
+296       for(int replicaOrder = 1; replicaOrder < replicas; replicaOrder++)
+297       {
+298         // calculate the next slave partition assignment map
+299         Map<String, Map<String, List<Integer>>> nextNodeSlaveAssignmentMap
+300           = calculateNextSlaveAssignemntMap(firstNodeSlaveAssignmentMap, replicaOrder);
+301         nodeSlaveAssignmentMapsList.add(nextNodeSlaveAssignmentMap);
+302       }
+303 
+304       // Combine the calculated 1...replicas-th slave assignment map together
+305 
+306       for(String instanceName : nodeMasterAssignmentMap.keySet())
+307       {
+308         Map<String, List<Integer>> combinedSlaveAssignmentMap =  new TreeMap<String, List<Integer>>();
+309 
+310         for(Map<String, Map<String, List<Integer>>> slaveNodeAssignmentMap : nodeSlaveAssignmentMapsList)
+311         {
+312           Map<String, List<Integer>> slaveAssignmentMap = slaveNodeAssignmentMap.get(instanceName);
+313 
+314           for(String slaveInstance : slaveAssignmentMap.keySet())
+315           {
+316             if(!combinedSlaveAssignmentMap.containsKey(slaveInstance))
+317             {
+318               combinedSlaveAssignmentMap.put(slaveInstance, new ArrayList<Integer>());
+319             }
+320             combinedSlaveAssignmentMap.get(slaveInstance).addAll(slaveAssignmentMap.get(slaveInstance));
+321           }
+322         }
+323         migrateSlaveAssignMapToNewInstances(combinedSlaveAssignmentMap, new ArrayList<String>());
+324         combinedNodeSlaveAssignmentMap.put(instanceName, combinedSlaveAssignmentMap);
+325       }
+326     }
+327     /*
+328     // Print the result master and slave assignment maps
+329     System.out.println("Master assignment:");
+330     for(String instanceName : nodeMasterAssignmentMap.keySet())
+331     {
+332       System.out.println(instanceName+":");
+333       for(Integer x : nodeMasterAssignmentMap.get(instanceName))
+334       {
+335         System.out.print(x+" ");
+336       }
+337       System.out.println();
+338       System.out.println("Slave assignment:");
+339 
+340       int slaveOrder = 1;
+341       for(Map<String, Map<String, List<Integer>>> slaveNodeAssignmentMap : nodeSlaveAssignmentMapsList)
+342       {
+343         System.out.println("Slave assignment order :" + (slaveOrder++));
+344         Map<String, List<Integer>> slaveAssignmentMap = slaveNodeAssignmentMap.get(instanceName);
+345         for(String slaveName : slaveAssignmentMap.keySet())
+346         {
+347           System.out.print("\t" + slaveName +":\n\t" );
+348           for(Integer x : slaveAssignmentMap.get(slaveName))
+349           {
+350             System.out.print(x + " ");
+351           }
+352           System.out.println("\n");
+353         }
+354       }
+355       System.out.println("\nCombined slave assignment map");
+356       Map<String, List<Integer>> slaveAssignmentMap = combinedNodeSlaveAssignmentMap.get(instanceName);
+357       for(String slaveName : slaveAssignmentMap.keySet())
+358       {
+359         System.out.print("\t" + slaveName +":\n\t" );
+360         for(Integer x : slaveAssignmentMap.get(slaveName))
+361         {
+362           System.out.print(x + " ");
+363         }
+364         System.out.println("\n");
+365       }
+366     }*/
+367     Map<String, Object> result = new TreeMap<String, Object>();
+368     result.put("MasterAssignmentMap", nodeMasterAssignmentMap);
+369     result.put("SlaveAssignmentMap", combinedNodeSlaveAssignmentMap);
+370     result.put("replicas", new Integer(replicas));
+371     result.put("partitions", new Integer(partitions));
+372     return result;
+373   }
+374   /**
+375    * In the case there are more than 1 slave, we use the following algorithm to calculate the n-th slave
+376    * assignment map based on the first level slave assignment map.
+377    *
+378    * @param firstInstanceSlaveAssignmentMap  the first slave assignment map for all instances
+379    * @param order of the slave
+380    * @return the n-th slave assignment map for all the instances
+381    * */
+382   static Map<String, Map<String, List<Integer>>> calculateNextSlaveAssignemntMap(Map<String, Map<String, List<Integer>>> firstInstanceSlaveAssignmentMap, int replicaOrder)
+383   {
+384     Map<String, Map<String, List<Integer>>> result = new TreeMap<String, Map<String, List<Integer>>>();
+385 
+386     for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
+387     {
+388       Map<String, List<Integer>> resultAssignmentMap = new TreeMap<String, List<Integer>>();
+389       result.put(currentInstance, resultAssignmentMap);
+390     }
+391 
+392     for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
+393     {
+394       Map<String, List<Integer>> previousSlaveAssignmentMap = firstInstanceSlaveAssignmentMap.get(currentInstance);
+395       Map<String, List<Integer>> resultAssignmentMap = result.get(currentInstance);
+396       int offset = replicaOrder - 1;
+397       for(String instance : previousSlaveAssignmentMap.keySet())
+398       {
+399         List<String> otherInstances = new ArrayList<String>(previousSlaveAssignmentMap.size() - 1);
+400         // Obtain an array of other instances
+401         for(String otherInstance : previousSlaveAssignmentMap.keySet())
+402         {
+403           otherInstances.add(otherInstance);
+404         }
+405         Collections.sort(otherInstances);
+406         int instanceIndex = -1;
+407         for(int index = 0;index < otherInstances.size(); index++)
+408         {
+409           if(otherInstances.get(index).equalsIgnoreCase(instance))
+410           {
+411             instanceIndex = index;
+412           }
+413         }
+414         assert(instanceIndex >= 0);
+415         if(instanceIndex == otherInstances.size() - 1)
+416         {
+417           instanceIndex --;
+418         }
+419         // Since we need to evenly distribute the slaves on "instance" to other partitions, we
+420         // need to remove "instance" from the array.
+421         otherInstances.remove(instance);
+422 
+423         // distribute previous slave assignment to other instances.
+424         List<Integer> previousAssignmentList = previousSlaveAssignmentMap.get(instance);
+425         for(int i = 0; i < previousAssignmentList.size(); i++)
+426         {
+427 
+428           // Evenly distribute the previousAssignmentList to the remaining other instances
+429           int newInstanceIndex = (i + offset + instanceIndex) % otherInstances.size();
+430           String newInstance = otherInstances.get(newInstanceIndex);
+431           if(!resultAssignmentMap.containsKey(newInstance))
+432           {
+433             resultAssignmentMap.put(newInstance, new ArrayList<Integer>());
+434           }
+435           resultAssignmentMap.get(newInstance).add(previousAssignmentList.get(i));
+436         }
+437       }
+438     }
+439     return result;
+440   }
+441 
+442   /**
+443    * Given the current idealState, and the list of new Instances needed to be added, calculate the
+444    * new Ideal state.
+445    *
+446    * 1. Calculate how many master partitions should be moved to the new cluster of instances
+447    * 2. assign the number of master partitions px to be moved to each previous node
+448    * 3. for each previous node,
+449    *    3.1 randomly choose px nodes, move them to temp list
+450    *    3.2 for each px nodes, remove them from the slave assignment map; record the map position of
+451    *        the partition;
+452    *    3.3 calculate # of new nodes that should be put in the slave assignment map
+453    *    3.4 even-fy the slave assignment map;
+454    *    3.5 randomly place # of new nodes that should be placed in
+455    *
+456    * 4. from all the temp master node list get from 3.1,
+457    *    4.1 randomly assign them to nodes in the new cluster
+458    *
+459    * 5. for each node in the new cluster,
+460    *    5.1 assemble the slave assignment map
+461    *    5.2 even-fy the slave assignment map
+462    *
+463    * @param newInstances
+464    *          list of new added storage node instances
+465    * @param weight
+466    *          weight for the new storage nodes (each node has the same weight)
+467    * @param previousIdealState
+468    *          The previous ideal state
+469    * @return a map that contain the updated idealstate info
+470    * */
+471   public static Map<String, Object> calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState)
+472   {
+473     // Obtain the master / slave assignment info maps
+474     Collections.sort(newInstances);
+475     Map<String, List<Integer>> previousMasterAssignmentMap
+476         = (Map<String, List<Integer>>) (previousIdealState.get("MasterAssignmentMap"));
+477     Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap
+478         = (Map<String, Map<String, List<Integer>>>)(previousIdealState.get("SlaveAssignmentMap"));
+479 
+480     List<String> oldInstances = new ArrayList<String>();
+481     for(String oldInstance : previousMasterAssignmentMap.keySet())
+482     {
+483       oldInstances.add(oldInstance);
+484     }
+485 
+486     int previousInstanceNum = previousMasterAssignmentMap.size();
+487     int partitions = (Integer)(previousIdealState.get("partitions"));
+488 
+489     // TODO: take weight into account when calculate this
+490 
+491     int totalMasterParitionsToMove
+492         = partitions * (newInstances.size()) / (previousInstanceNum + newInstances.size());
+493     int numMastersFromEachNode = totalMasterParitionsToMove / previousInstanceNum;
+494     int remain = totalMasterParitionsToMove % previousInstanceNum;
+495 
+496     // Note that when remain > 0, we should make [remain] moves with (numMastersFromEachNode + 1) partitions.
+497     // And we should first choose those (numMastersFromEachNode + 1) moves from the instances that has more
+498     // master partitions
+499     List<Integer> masterPartitionListToMove = new ArrayList<Integer>();
+500 
+501     // For corresponding moved slave partitions, keep track of their original location; the new node does not
+502     // need to migrate all of them.
+503     Map<String, List<Integer>> slavePartitionsToMoveMap = new TreeMap<String, List<Integer>>();
+504 
+505     // Make sure that the instances that holds more master partitions are put in front
+506     List<String> bigList = new ArrayList<String>(), smallList = new ArrayList<String>();
+507     for(String oldInstance : previousMasterAssignmentMap.keySet())
+508     {
+509       List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
+510       if(masterAssignmentList.size() > numMastersFromEachNode)
+511       {
+512         bigList.add(oldInstance);
+513       }
+514       else
+515       {
+516         smallList.add(oldInstance);
+517       }
+518     }
+519     // "sort" the list, such that the nodes that has more master partitions moves more partitions to the
+520     // new added batch of instances.
+521     bigList.addAll(smallList);
+522     int totalSlaveMoves = 0;
+523     for(String oldInstance : bigList)
+524     {
+525       List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
+526       int numToChoose = numMastersFromEachNode;
+527       if(remain > 0)
+528       {
+529         numToChoose = numMastersFromEachNode + 1;
+530         remain --;
+531       }
+532       // randomly remove numToChoose of master partitions to the new added nodes
+533       ArrayList<Integer> masterPartionsMoved = new ArrayList<Integer>();
+534       randomSelect(masterAssignmentList, masterPartionsMoved, numToChoose);
+535 
+536       masterPartitionListToMove.addAll(masterPartionsMoved);
+537       Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(oldInstance);
+538       removeFromSlaveAssignmentMap(slaveAssignmentMap, masterPartionsMoved, slavePartitionsToMoveMap);
+539 
+540       // Make sure that for old instances, the slave placement map is evenly distributed
+541       // Trace the "local slave moves", which should together contribute to most of the slave migrations
+542       int movesWithinInstance = migrateSlaveAssignMapToNewInstances(slaveAssignmentMap, newInstances);
+543       // System.out.println("local moves: "+ movesWithinInstance);
+544       totalSlaveMoves += movesWithinInstance;
+545     }
+546     // System.out.println("local slave moves total: "+ totalSlaveMoves);
+547     // calculate the master /slave assignment for the new added nodes
+548 
+549     // We already have the list of master partitions that will migrate to new batch of instances,
+550     // shuffle the partitions and assign them to new instances
+551     Collections.shuffle(masterPartitionListToMove, new Random(12345));
+552     for(int i = 0;i < newInstances.size(); i++)
+553     {
+554       String newInstance = newInstances.get(i);
+555       List<Integer> masterPartitionList = new ArrayList<Integer>();
+556       for(int j = 0;j < masterPartitionListToMove.size(); j++)
+557       {
+558         if(j % newInstances.size() == i)
+559         {
+560           masterPartitionList.add(masterPartitionListToMove.get(j));
+561         }
+562       }
+563 
+564       Map<String, List<Integer>> slavePartitionMap = new TreeMap<String, List<Integer>>();
+565       for(String oldInstance : oldInstances)
+566       {
+567         slavePartitionMap.put(oldInstance, new ArrayList<Integer>());
+568       }
+569       // Build the slave assignment map for the new instance, based on the saved information
+570       // about those slave partition locations in slavePartitionsToMoveMap
+571       for(Integer x : masterPartitionList)
+572       {
+573         for(String oldInstance : slavePartitionsToMoveMap.keySet())
+574         {
+575           List<Integer> slaves = slavePartitionsToMoveMap.get(oldInstance);
+576           if(slaves.contains(x))
+577           {
+578             slavePartitionMap.get(oldInstance).add(x);
+579           }
+580         }
+581       }
+582       // add entry for other new instances into the slavePartitionMap
+583       List<String> otherNewInstances = new ArrayList<String>();
+584       for(String instance : newInstances)
+585       {
+586         if(!instance.equalsIgnoreCase(newInstance))
+587         {
+588           otherNewInstances.add(instance);
+589         }
+590       }
+591       // Make sure that slave partitions are evenly distributed
+592       migrateSlaveAssignMapToNewInstances(slavePartitionMap, otherNewInstances);
+593 
+594       // Update the result in the result map. We can reuse the input previousIdealState map as
+595       // the result.
+596       previousMasterAssignmentMap.put(newInstance, masterPartitionList);
+597       nodeSlaveAssignmentMap.put(newInstance, slavePartitionMap);
+598 
+599     }
+600     /*
+601     // Print content of the master/ slave assignment maps
+602     for(String instanceName : previousMasterAssignmentMap.keySet())
+603     {
+604       System.out.println(instanceName+":");
+605       for(Integer x : previousMasterAssignmentMap.get(instanceName))
+606       {
+607         System.out.print(x+" ");
+608       }
+609       System.out.println("\nmaster partition moved:");
+610 
+611       System.out.println();
+612       System.out.println("Slave assignment:");
+613 
+614       Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(instanceName);
+615       for(String slaveName : slaveAssignmentMap.keySet())
+616       {
+617         System.out.print("\t" + slaveName +":\n\t" );
+618         for(Integer x : slaveAssignmentMap.get(slaveName))
+619         {
+620           System.out.print(x + " ");
+621         }
+622         System.out.println("\n");
+623       }
+624     }
+625 
+626     System.out.println("Master partitions migrated to new instances");
+627     for(Integer x : masterPartitionListToMove)
+628     {
+629         System.out.print(x+" ");
+630     }
+631     System.out.println();
+632 
+633     System.out.println("Slave partitions migrated to new instances");
+634     for(String oldInstance : slavePartitionsToMoveMap.keySet())
+635     {
+636         System.out.print(oldInstance + ": ");
+637         for(Integer x : slavePartitionsToMoveMap.get(oldInstance))
+638         {
+639           System.out.print(x+" ");
+640         }
+641         System.out.println();
+642     }
+643     */
+644     return previousIdealState;
+645   }
+646   
+647   public ZNRecord calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState,
+648        String resourceName, String masterStateValue, String slaveStateValue)
+649   {
+650     return convertToZNRecord(calculateNextIdealState(newInstances, previousIdealState), resourceName, masterStateValue, slaveStateValue);
+651   }
+652   /**
+653    * Given the list of master partition that will be migrated away from the storage instance,
+654    * Remove their entries from the local instance slave assignment map.
+655    *
+656    * @param slaveAssignmentMap  the local instance slave assignment map
+657    * @param masterPartionsMoved the list of master partition ids that will be migrated away
+658    * @param removedAssignmentMap keep track of the removed slave assignment info. The info can be
+659    *        used by new added storage nodes.
+660    * */
+661   static void removeFromSlaveAssignmentMap( Map<String, List<Integer>>slaveAssignmentMap, List<Integer> masterPartionsMoved, Map<String, List<Integer>> removedAssignmentMap)
+662   {
+663     for(String instanceName : slaveAssignmentMap.keySet())
+664     {
+665       List<Integer> slaveAssignment = slaveAssignmentMap.get(instanceName);
+666       for(Integer partitionId: masterPartionsMoved)
+667       {
+668         if(slaveAssignment.contains(partitionId))
+669         {
+670           slaveAssignment.remove(partitionId);
+671           if(!removedAssignmentMap.containsKey(instanceName))
+672           {
+673             removedAssignmentMap.put(instanceName, new ArrayList<Integer>());
+674           }
+675           removedAssignmentMap.get(instanceName).add(partitionId);
+676         }
+677       }
+678     }
+679   }
+680 
+681   /**
+682    * Since some new storage instances are added, each existing storage instance should migrate some
+683    * slave partitions to the new added instances.
+684    *
+685    * The algorithm keeps moving one partition to from the instance that hosts most slave partitions
+686    * to the instance that hosts least number of partitions, until max-min <= 1.
+687    *
+688    * In this way we can guarantee that all instances hosts almost same number of slave partitions, also
+689    * slave partitions are evenly distributed.
+690    *
+691    * @param slaveAssignmentMap  the local instance slave assignment map
+692    * @param masterPartionsMoved the list of master partition ids that will be migrated away
+693    * @param removedAssignmentMap keep track of the removed slave assignment info. The info can be
+694    *        used by new added storage nodes.
+695    * */
+696   static int migrateSlaveAssignMapToNewInstances(Map<String, List<Integer>> nodeSlaveAssignmentMap, List<String> newInstances)
+697   {
+698     int moves = 0;
+699     boolean done = false;
+700     for(String newInstance : newInstances)
+701     {
+702       nodeSlaveAssignmentMap.put(newInstance, new ArrayList<Integer>());
+703     }
+704     while(!done)
+705     {
+706       List<Integer> maxAssignment = null, minAssignment = null;
+707       int minCount = Integer.MAX_VALUE, maxCount = Integer.MIN_VALUE;

[... 89 lines stripped ...]