[HELIX-788] HELIX: Fix DefaultPipeline so that it doesn't rebalance task resources
Helix CHO testing indicated that the default pipeline was rebalancing task framework resources.
This RB fixes this.
Changelist:
1. Change resourceMap to resourceToRebalance, which separates generic and task resources
2. Make logger use LogUtil to distinguish two pipelines
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/59536d39
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/59536d39
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/59536d39
Branch: refs/heads/master
Commit: 59536d39c85d3535408a40a46a1a60a4105ee6e4
Parents: dc25bac
Author: narendly <narendly@gmail.com>
Authored: Fri Nov 2 14:19:16 2018 -0700
Committer: narendly <narendly@gmail.com>
Committed: Fri Nov 2 14:19:16 2018 -0700
----------------------------------------------------------------------
.../stages/IntermediateStateCalcStage.java | 29 ++++++++++----------
.../stages/ResourceComputationStage.java | 2 +-
.../stages/TestStateTransitionPrirority.java | 3 +-
3 files changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/59536d39/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 915a90f..9768bf7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -64,18 +64,19 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
BestPossibleStateOutput bestPossibleStateOutput =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
+ Map<String, Resource> resourceToRebalance =
+ event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
- if (currentStateOutput == null || bestPossibleStateOutput == null || resourceMap == null
+ if (currentStateOutput == null || bestPossibleStateOutput == null || resourceToRebalance
== null
|| cache == null) {
throw new StageException(String.format("Missing attributes in event: %s. "
- + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |DataCache
(%s)",
- event, currentStateOutput, bestPossibleStateOutput, resourceMap, cache));
+ + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |DataCache
(%s)",
+ event, currentStateOutput, bestPossibleStateOutput, resourceToRebalance, cache));
}
IntermediateStateOutput intermediateStateOutput =
- compute(event, resourceMap, currentStateOutput, bestPossibleStateOutput);
+ compute(event, resourceToRebalance, currentStateOutput, bestPossibleStateOutput);
event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), intermediateStateOutput);
// Make sure no instance has more replicas/partitions assigned than maxPartitionPerInstance.
If
@@ -146,9 +147,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
String resourceName = resourcePriority.getResourceName();
if (!bestPossibleStateOutput.containsResource(resourceName)) {
- logger.warn(
- "Skip calculating intermediate state for resource {} because the best possible
state is not available.",
- resourceName);
+ LogUtil.logInfo(logger, _eventId, String.format(
+ "Skip calculating intermediate state for resource %s because the best possible
state is not available.",
+ resourceName));
continue;
}
@@ -228,8 +229,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
instancePartitionCounts.put(instance, 0);
}
int partitionCount = instancePartitionCounts.get(instance); // Number of replicas
(from
- // different partitions) held
- // in this instance
+ // different partitions)
held
+ // in this instance
partitionCount++;
if (partitionCount > maxPartitionPerInstance) {
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
@@ -376,7 +377,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
// ErrorOrRecovery is set
threshold = clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance();
partitionCount += partitionsNeedRecovery.size(); // Only add this count when the threshold
is
- // set
+ // set
} else {
if (clusterConfig.getErrorPartitionThresholdForLoadBalance() != 0) {
// 0 is the default value so the old threshold has been set
@@ -736,8 +737,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
return RebalanceType.NONE; // No further action required
} else {
return RebalanceType.LOAD_BALANCE; // Required state counts are satisfied, but in order
to
- // achieve BestPossibleState, load balance may be required
- // to shift replicas around
+ // achieve BestPossibleState, load balance may be
required
+ // to shift replicas around
}
}
@@ -904,4 +905,4 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
return matchedState;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/59536d39/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 02a175a..15bae65 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -54,7 +54,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
Map<String, IdealState> idealStates = cache.getIdealStates();
- Map<String, Resource> resourceMap = new LinkedHashMap<String, Resource>();
+ Map<String, Resource> resourceMap = new LinkedHashMap<>();
Map<String, Resource> resourceToRebalance = new LinkedHashMap<>();
if (idealStates != null && idealStates.size() > 0) {
http://git-wip-us.apache.org/repos/asf/helix/blob/59536d39/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java
index ef5139e..d4a6aaf 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java
@@ -176,7 +176,8 @@ public class TestStateTransitionPrirority extends BaseStageTest {
event.addAttribute(AttributeName.RESOURCES.name(),
Collections.singletonMap(resourceName, resource));
- event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resource);
+ event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+ Collections.singletonMap(resourceName, resource));
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
runStage(event, new ReadClusterDataStage());
|