helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 18/44: IntermediateStateCalcStage style change
Date Sat, 25 May 2019 01:19:52 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 402dd6d0b4fb25625b111318dfbd0c4892ff790b
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Fri Mar 29 12:13:48 2019 -0700

    IntermediateStateCalcStage style change
    
    This diff includes code style fixes and refactor using Java 8 features.
    
    RB=1613452
    BUG=HELIX-1742
    G=helix-reviewers
    A=jjwang
    
    Signed-off-by: Hunter Lee <hulee@linkedin.com>
---
 .../stages/IntermediateStateCalcStage.java         | 106 ++++++++++-----------
 1 file changed, 48 insertions(+), 58 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 888bd12..b6ab425 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -68,7 +68,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     Map<String, Resource> resourceToRebalance =
         event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
-    ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
+    ResourceControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
 
     if (currentStateOutput == null || bestPossibleStateOutput == null || resourceToRebalance
== null
         || cache == null) {
@@ -137,7 +138,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
               dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField));
         }
       }
-      Collections.sort(prioritizedResourceList, new ResourcePriorityComparator());
+      prioritizedResourceList.sort(new ResourcePriorityComparator());
     }
 
     ClusterStatusMonitor clusterStatusMonitor =
@@ -160,8 +161,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       IdealState idealState = dataCache.getIdealState(resourceName);
       if (idealState == null) {
         // If IdealState is null, use an empty one
-        LogUtil.logInfo(logger, _eventId, String
-            .format("IdealState for resource %s does not exist; resource may not exist anymore",
+        LogUtil.logInfo(logger, _eventId,
+            String.format(
+                "IdealState for resource %s does not exist; resource may not exist anymore",
                 resourceName));
         idealState = new IdealState(resourceName);
         idealState.setStateModelDefRef(resource.getStateModelDefRef());
@@ -183,8 +185,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     if (clusterStatusMonitor != null) {
       clusterStatusMonitor.setResourceRebalanceStates(failedResources,
           ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
-      clusterStatusMonitor
-          .setResourceRebalanceStates(output.resourceSet(), ResourceMonitor.RebalanceStatus.NORMAL);
+      clusterStatusMonitor.setResourceRebalanceStates(output.resourceSet(),
+          ResourceMonitor.RebalanceStatus.NORMAL);
     }
 
     return output;
@@ -202,8 +204,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
    * @param intermediateStateOutput
    * @param maxPartitionPerInstance
    */
-  private void validateMaxPartitionsPerInstance(ClusterEvent event, ResourceControllerDataProvider
cache,
-      IntermediateStateOutput intermediateStateOutput, int maxPartitionPerInstance) {
+  private void validateMaxPartitionsPerInstance(ClusterEvent event,
+      ResourceControllerDataProvider cache, IntermediateStateOutput intermediateStateOutput,
+      int maxPartitionPerInstance) {
     Map<String, PartitionStateMap> resourceStatesMap =
         intermediateStateOutput.getResourceStatesMap();
     Map<String, Integer> instancePartitionCounts = new HashMap<>();
@@ -232,8 +235,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
             instancePartitionCounts.put(instance, 0);
           }
           int partitionCount = instancePartitionCounts.get(instance); // Number of replicas
(from
-                                                                      // different partitions)
held
-                                                                      // in this instance
+          // different partitions) held
+          // in this instance
           partitionCount++;
           if (partitionCount > maxPartitionPerInstance) {
             HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
@@ -346,19 +349,17 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     }
 
     if (!partitionsNeedRecovery.isEmpty()) {
-      LogUtil.logInfo(logger, _eventId, String
-          .format("Recovery balance needed for %s partitions: %s", resourceName,
-              partitionsNeedRecovery));
+      LogUtil.logInfo(logger, _eventId, String.format(
+          "Recovery balance needed for %s partitions: %s", resourceName, partitionsNeedRecovery));
     }
     if (!partitionsNeedLoadBalance.isEmpty()) {
-      LogUtil.logInfo(logger, _eventId, String
-          .format("Load balance needed for %s partitions: %s", resourceName,
-              partitionsNeedLoadBalance));
+      LogUtil.logInfo(logger, _eventId, String.format("Load balance needed for %s partitions:
%s",
+          resourceName, partitionsNeedLoadBalance));
     }
     if (!partitionsWithErrorStateReplica.isEmpty()) {
-      LogUtil.logInfo(logger, _eventId, String
-          .format("Partition currently has an ERROR replica in %s partitions: %s", resourceName,
-              partitionsWithErrorStateReplica));
+      LogUtil.logInfo(logger, _eventId,
+          String.format("Partition currently has an ERROR replica in %s partitions: %s",
+              resourceName, partitionsWithErrorStateReplica));
     }
 
     chargePendingTransition(resource, currentStateOutput, throttleController,
@@ -383,7 +384,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       // ErrorOrRecovery is set
       threshold = clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance();
       partitionCount += partitionsNeedRecovery.size(); // Only add this count when the threshold
is
-                                                       // set
+      // set
     } else {
       if (clusterConfig.getErrorPartitionThresholdForLoadBalance() != 0) {
         // 0 is the default value so the old threshold has been set
@@ -413,8 +414,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
           intermediatePartitionStateMap);
     }
 
-    LogUtil
-        .logDebug(logger, _eventId, String.format("End processing resource: %s", resourceName));
+    LogUtil.logDebug(logger, _eventId, String.format("End processing resource: %s", resourceName));
     return intermediatePartitionStateMap;
   }
 
@@ -530,17 +530,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
         currentStateOutput.getCurrentStateMap(resourceName);
     List<Partition> partitionsNeedRecoveryPrioritized = new ArrayList<>(partitionsNeedRecovery);
 
-    // TODO: Remove this sort by partition name when Java 1.8 is used
     // We want the result of the intermediate state calculation to be deterministic. We sort
here by
     // partition name to ensure that the order is consistent for inputs fed into
     // PartitionPriorityComparator sort
-    Collections.sort(partitionsNeedRecoveryPrioritized, new Comparator<Partition>()
{
-      @Override
-      public int compare(Partition partition1, Partition partition2) {
-        return partition1.getPartitionName().compareTo(partition2.getPartitionName());
-      }
-    });
-    Collections.sort(partitionsNeedRecoveryPrioritized, new PartitionPriorityComparator(
+    partitionsNeedRecoveryPrioritized.sort(Comparator.comparing(Partition::getPartitionName));
+    partitionsNeedRecoveryPrioritized.sort(new PartitionPriorityComparator(
         bestPossiblePartitionStateMap.getStateMap(), currentStateMap, topState, true));
 
     // For each partition, apply throttling if needed.
@@ -585,17 +579,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     List<Partition> partitionsNeedLoadRebalancePrioritized =
         new ArrayList<>(partitionsNeedLoadbalance);
 
-    // TODO: Remove this sort by partition name when Java 1.8 is used
     // We want the result of the intermediate state calculation to be deterministic. We sort
here by
     // partition name to ensure that the order is consistent for inputs fed into
     // PartitionPriorityComparator sort
-    Collections.sort(partitionsNeedLoadRebalancePrioritized, new Comparator<Partition>()
{
-      @Override
-      public int compare(Partition partition1, Partition partition2) {
-        return partition1.getPartitionName().compareTo(partition2.getPartitionName());
-      }
-    });
-    Collections.sort(partitionsNeedLoadRebalancePrioritized, new PartitionPriorityComparator(
+    partitionsNeedLoadRebalancePrioritized.sort(Comparator.comparing(Partition::getPartitionName));
+    partitionsNeedLoadRebalancePrioritized.sort(new PartitionPriorityComparator(
         bestPossiblePartitionStateMap.getStateMap(), currentStateMap, "", false));
 
     for (Partition partition : partitionsNeedLoadRebalancePrioritized) {
@@ -618,10 +606,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
           currentStateOutput, bestPossiblePartitionStateMap, partitionsLoadbalanceThrottled,
           intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE, cache);
     }
-    LogUtil.logInfo(logger, _eventId, String.format(
-        "For resource %s: Num of partitions needing load-balance: %d, Num of partitions needing"
-            + " load-balance but throttled (not load-balanced): %d",
-        resourceName, partitionsNeedLoadbalance.size(), partitionsLoadbalanceThrottled.size()));
+    LogUtil.logInfo(logger, _eventId,
+        String.format(
+            "For resource %s: Num of partitions needing load-balance: %d, Num of partitions
needing"
+                + " load-balance but throttled (not load-balanced): %d",
+            resourceName, partitionsNeedLoadbalance.size(), partitionsLoadbalanceThrottled.size()));
     return partitionsLoadbalanceThrottled;
   }
 
@@ -656,9 +645,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) {
       hasReachedThrottlingLimit = true;
       if (logger.isDebugEnabled()) {
-        LogUtil.logDebug(logger, _eventId, String
-            .format("Throttled on partition: %s in resource: %s", partition.getPartitionName(),
-                resourceName));
+        LogUtil.logDebug(logger, _eventId,
+            String.format("Throttled on partition: %s in resource: %s",
+                partition.getPartitionName(), resourceName));
       }
     } else {
       // throttle if any of the instances are not able to accept state transitions
@@ -671,8 +660,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
           if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) {
             hasReachedThrottlingLimit = true;
             if (logger.isDebugEnabled()) {
-              LogUtil.logDebug(logger, _eventId, String
-                  .format("Throttled because of instance: %s for partition: %s in resource:
%s",
+              LogUtil.logDebug(logger, _eventId,
+                  String.format(
+                      "Throttled because of instance: %s for partition: %s in resource: %s",
                       instance, partition.getPartitionName(), resourceName));
             }
             break;
@@ -782,8 +772,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       return RebalanceType.NONE; // No further action required
     } else {
       return RebalanceType.LOAD_BALANCE; // Required state counts are satisfied, but in order
to
-                                         // achieve BestPossibleState, load balance may be
required
-                                         // to shift replicas around
+      // achieve BestPossibleState, load balance may be required
+      // to shift replicas around
     }
   }
 
@@ -806,25 +796,25 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       PartitionStateMap intermediateStateMap) {
 
     if (logger.isDebugEnabled()) {
-      LogUtil.logDebug(logger, _eventId, String
-          .format("Partitions need recovery: %s\nPartitions get throttled on recovery: %s",
+      LogUtil.logDebug(logger, _eventId,
+          String.format("Partitions need recovery: %s\nPartitions get throttled on recovery:
%s",
               recoveryPartitions, recoveryThrottledPartitions));
-      LogUtil.logDebug(logger, _eventId, String
-          .format("Partitions need loadbalance: %s\nPartitions get throttled on load-balance:
%s",
+      LogUtil.logDebug(logger, _eventId,
+          String.format(
+              "Partitions need loadbalance: %s\nPartitions get throttled on load-balance:
%s",
               loadbalancePartitions, loadbalanceThrottledPartitions));
     }
 
     for (Partition partition : allPartitions) {
       if (logger.isDebugEnabled()) {
-        LogUtil.logDebug(logger, _eventId, String
-            .format("%s : Best possible map: %s", partition,
-                bestPossibleStateMap.getPartitionMap(partition)));
+        LogUtil.logDebug(logger, _eventId, String.format("%s : Best possible map: %s", partition,
+            bestPossibleStateMap.getPartitionMap(partition)));
         LogUtil.logDebug(logger, _eventId, String.format("%s : Current State: %s", partition,
             currentStateOutput.getCurrentStateMap(resource, partition)));
         LogUtil.logDebug(logger, _eventId, String.format("%s: Pending state: %s", partition,
             currentStateOutput.getPendingMessageMap(resource, partition)));
-        LogUtil.logDebug(logger, _eventId, String
-            .format("%s: Intermediate state: %s", partition, intermediateStateMap.getPartitionMap(partition)));
+        LogUtil.logDebug(logger, _eventId, String.format("%s: Intermediate state: %s", partition,
+            intermediateStateMap.getPartitionMap(partition)));
       }
     }
   }


Mime
View raw message