helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject helix git commit: [HELIX-653] Fix enable/disable partition in instances for resource specific
Date Thu, 09 Feb 2017 21:24:28 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 118691e0b -> befcc65c2


[HELIX-653] Fix enable/disable partition in instances for resource specific

Helix currently enables/disables a partition on an instance across all resources that have
a partition with the same name. Fix this by making partition enable/disable resource-specific.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/befcc65c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/befcc65c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/befcc65c

Branch: refs/heads/helix-0.6.x
Commit: befcc65c2e67df9767ab5d2eca837339894c5581
Parents: 118691e
Author: Junkai Xue <jxue@linkedin.com>
Authored: Thu Feb 9 11:36:32 2017 -0800
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Thu Feb 9 11:36:32 2017 -0800

----------------------------------------------------------------------
 .../rebalancer/AbstractRebalancer.java          |   2 +-
 .../controller/rebalancer/CustomRebalancer.java |   2 +-
 .../rebalancer/DelayedAutoRebalancer.java       |   2 +-
 .../controller/stages/ClusterDataCache.java     |   4 +-
 .../controller/stages/ReadClusterDataStage.java |   8 +-
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  21 +--
 .../org/apache/helix/model/InstanceConfig.java  | 128 ++++++++++++++++---
 .../monitoring/mbeans/ClusterStatusMonitor.java |  16 ++-
 .../monitoring/mbeans/InstanceMonitor.java      |  27 ++--
 .../monitoring/mbeans/InstanceMonitorMBean.java |   6 +
 .../java/org/apache/helix/util/HelixUtil.java   |  14 ++
 .../helix/manager/zk/TestZkHelixAdmin.java      |  54 ++++++++
 12 files changed, 228 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 7bf2153..9cd2f96 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -88,7 +88,7 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato
       Map<String, String> currentStateMap =
           currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
       Set<String> disabledInstancesForPartition =
-          cache.getDisabledInstancesForPartition(partition.toString());
+          cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString());
       List<String> preferenceList = ConstraintBasedAssignment
           .getPreferenceList(partition, idealState,
               Collections.unmodifiableSet(cache.getLiveInstances().keySet()));

http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index a4f5e83..fede2b3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -65,7 +65,7 @@ public class CustomRebalancer extends AbstractRebalancer {
       Map<String, String> currentStateMap =
           currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
       Set<String> disabledInstancesForPartition =
-          cache.getDisabledInstancesForPartition(partition.toString());
+          cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString());
       Map<String, String> idealStateMap =
           idealState.getInstanceStateMap(partition.getPartitionName());
       Map<String, String> bestStateForPartition =

http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 1f1d94f..9f52d4f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -318,7 +318,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
       Map<String, String> currentStateMap =
           currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
       Set<String> disabledInstancesForPartition =
-          cache.getDisabledInstancesForPartition(partition.toString());
+          cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString());
       List<String> preferenceList =
           ConstraintBasedAssignment.getPreferenceList(partition, idealState, activeNodes);
       Map<String, String> bestStateForPartition = ConstraintBasedAssignment

http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 9cccc64..e72354f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -487,12 +487,12 @@ public class ClusterDataCache {
    * @param partition
    * @return
    */
-  public Set<String> getDisabledInstancesForPartition(String partition) {
+  public Set<String> getDisabledInstancesForPartition(String resource, String partition)
{
     Set<String> disabledInstancesSet = new HashSet<String>();
     for (String instance : _instanceConfigMap.keySet()) {
       InstanceConfig config = _instanceConfigMap.get(instance);
       if (config.getInstanceEnabled() == false
-          || config.getInstanceEnabledForPartition(partition) == false) {
+          || config.getInstanceEnabledForPartition(resource, partition) == false) {
         disabledInstancesSet.add(instance);
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index abce878..521d315 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -67,7 +67,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
       Set<String> instanceSet = Sets.newHashSet();
       Set<String> liveInstanceSet = Sets.newHashSet();
       Set<String> disabledInstanceSet = Sets.newHashSet();
-      Map<String, Set<String>> disabledPartitions = Maps.newHashMap();
+      Map<String, Map<String, String>> disabledPartitions = Maps.newHashMap();
       Map<String, Set<String>> tags = Maps.newHashMap();
       Map<String, LiveInstance> liveInstanceMap = _cache.getLiveInstances();
       for (Map.Entry<String, InstanceConfig> e : _cache.getInstanceConfigMap().entrySet())
{
@@ -80,11 +80,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
         if (!config.getInstanceEnabled()) {
           disabledInstanceSet.add(instanceName);
         }
-        List<String> disabledPartitionsList = config.getDisabledPartitions();
-        Set<String> partitionNames =
-            disabledPartitionsList != null ? new HashSet<String>(config.getDisabledPartitions())
-                : new HashSet<String>();
-        disabledPartitions.put(instanceName, partitionNames);
+        disabledPartitions.put(instanceName, config.getDisabledPartitionsMap());
         Set<String> instanceTags = Sets.newHashSet(config.getTags());
         tags.put(instanceName, instanceTags);
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index b1ce406..378777f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -70,6 +70,7 @@ import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.DefaultIdealStateCalculator;
+import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.RebalanceUtil;
 import org.apache.log4j.Logger;
 
@@ -300,24 +301,12 @@ public class ZKHelixAdmin implements HelixAdmin {
               + ", participant config is null");
         }
 
-        // TODO: merge with InstanceConfig.setInstanceEnabledForPartition
-        List<String> list =
-            currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
-        Set<String> disabledPartitions = new HashSet<String>();
-        if (list != null) {
-          disabledPartitions.addAll(list);
+        InstanceConfig instanceConfig = new InstanceConfig(currentData);
+        for (String partitionName : partitionNames) {
+          instanceConfig.setInstanceEnabledForPartition(resourceName, partitionName, enabled);
         }
 
-        if (enabled) {
-          disabledPartitions.removeAll(partitionNames);
-        } else {
-          disabledPartitions.addAll(partitionNames);
-        }
-
-        list = new ArrayList<String>(disabledPartitions);
-        Collections.sort(list);
-        currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(),
list);
-        return currentData;
+        return instanceConfig.getRecord();
       }
     }, AccessOption.PERSISTENT);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 52edaa7..0db18fd 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -21,12 +21,15 @@ package org.apache.helix.model;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
 /**
@@ -223,12 +226,15 @@ public class InstanceConfig extends HelixProperty {
    * @param partition the partition name to check
    * @return true if the instance is enabled for the partition, false otherwise
    */
-  public boolean getInstanceEnabledForPartition(String partition) {
-    // Map<String, String> disabledPartitionMap =
-    // _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
-    List<String> disabledPartitions =
-        _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
-    if (disabledPartitions != null && disabledPartitions.contains(partition)) {
+  public boolean getInstanceEnabledForPartition(String resource, String partition) {
+    // TODO: Remove this old partition list check once old get API removed.
+    List<String> oldDisabledPartition =
+        _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+    Map<String, String> disabledPartitionsMap =
+        _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+    if ((disabledPartitionsMap != null && disabledPartitionsMap.containsKey(resource)
&& HelixUtil
+        .deserializeByComma(disabledPartitionsMap.get(resource)).contains(partition))
+        || oldDisabledPartition != null && oldDisabledPartition.contains(partition))
{
       return false;
     } else {
       return true;
@@ -237,34 +243,126 @@ public class InstanceConfig extends HelixProperty {
 
   /**
    * Get the partitions disabled by this instance
+   * This method will be deprecated since we persist disabled partitions
+   * based on instance and resource. The result will not be accurate as we
+   * union all the partitions disabled.
+   *
    * @return a list of partition names
    */
+  @Deprecated
   public List<String> getDisabledPartitions() {
-    return _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+    List<String> oldDisabled =
+        _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+    if (!_record.getMapFields().containsKey(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name())
+        && oldDisabled == null) {
+      return null;
+    }
+
+    Set<String> disabledPartitions = new HashSet<String>();
+    if (oldDisabled != null) {
+      disabledPartitions.addAll(oldDisabled);
+    }
+
+    for (String perResource : _record
+        .getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()).values()) {
+      disabledPartitions.addAll(HelixUtil.deserializeByComma(perResource));
+    }
+
+    return new ArrayList<String>(disabledPartitions);
+  }
+
+  /**
+   * Get the partitions disabled by resource on this instance
+   * @param resourceName  The resource of disabled partitions
+   * @return              A list of partition names if exists, otherwise will be null
+   */
+  public List<String> getDisabledPartitions(String resourceName) {
+    // TODO: Remove this logic getting data from list field when getDisabledParition() removed.
+    List<String> oldDisabled =
+        _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+    if ((!_record.getMapFields().containsKey(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name())
+        || !_record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name())
+        .containsKey(resourceName)) && oldDisabled == null) {
+      return null;
+    }
+
+    Set<String> disabledPartitions = new HashSet<String>();
+    if (oldDisabled != null) {
+      disabledPartitions.addAll(oldDisabled);
+    }
+
+    disabledPartitions.addAll(HelixUtil.deserializeByComma(
+        _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name())
+            .get(resourceName)));
+
+    return new ArrayList<String>(disabledPartitions);
   }
 
   /**
-   * Set the enabled state for a partition on this instance
+   * Get a map that mapping resource name to disabled partitions
+   * @return A map of resource name mapping to disabled partitions
+   */
+  public Map<String, String> getDisabledPartitionsMap() {
+    return _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+  }
+
+  /**
+   * Set the enabled state for a partition on this instance across all the resources
+   *
    * @param partitionName the partition to set
    * @param enabled true to enable, false to disable
    */
+  @Deprecated
   public void setInstanceEnabledForPartition(String partitionName, boolean enabled) {
-    List<String> list =
-        _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+    Map<String, String> disabledPartitionMap =
+        _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+    for (String resourceName : disabledPartitionMap.keySet()) {
+      setInstanceEnabledForPartition(resourceName, partitionName, enabled);
+    }
+  }
+
+  public void setInstanceEnabledForPartition(String resourceName, String partitionName,
+      boolean enabled) {
+    // Get old disabled partitions if exists
+    // TODO: Remove this when getDisabledParition() removed.
+    List<String> oldDisabledPartitions =
+        _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+
+    Map<String, String> currentDisabled =
+        _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
     Set<String> disabledPartitions = new HashSet<String>();
-    if (list != null) {
-      disabledPartitions.addAll(list);
+
+    if (currentDisabled != null && currentDisabled.containsKey(resourceName)) {
+      disabledPartitions.addAll(HelixUtil.deserializeByComma(currentDisabled.get(resourceName)));
     }
 
     if (enabled) {
       disabledPartitions.remove(partitionName);
+      if (oldDisabledPartitions != null && oldDisabledPartitions.contains(partitionName))
{
+        oldDisabledPartitions.remove(partitionName);
+      }
     } else {
       disabledPartitions.add(partitionName);
     }
 
-    list = new ArrayList<String>(disabledPartitions);
-    Collections.sort(list);
-    _record.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(), list);
+    List<String> disabledPartitionList = new ArrayList<String>(disabledPartitions);
+    Collections.sort(disabledPartitionList);
+    if (currentDisabled == null) {
+      currentDisabled = new HashMap<String, String>();
+    }
+
+    if (disabledPartitionList != null) {
+      currentDisabled.put(resourceName, HelixUtil.serializeByComma(disabledPartitionList));
+    }
+
+    if (!currentDisabled.isEmpty()) {
+      _record.setMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name(), currentDisabled);
+    }
+
+    if (oldDisabledPartitions != null && !oldDisabledPartitions.isEmpty()) {
+      _record.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name(),
+          oldDisabledPartitions);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 90cdc5a..22d7209 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -46,6 +46,7 @@ import org.apache.helix.task.TaskState;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
@@ -74,7 +75,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private Set<String> _liveInstances = Collections.emptySet();
   private Set<String> _instances = Collections.emptySet();
   private Set<String> _disabledInstances = Collections.emptySet();
-  private Map<String, Set<String>> _disabledPartitions = Collections.emptyMap();
+  private Map<String, Map<String, String>> _disabledPartitions = Collections.emptyMap();
   private Map<String, Long> _instanceMsgQueueSizes = Maps.newConcurrentMap();
 
   private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap =
@@ -130,11 +131,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
{
     return _disabledInstances.size();
   }
 
-  @Override
-  public long getDisabledPartitionsGauge() {
+  @Override public long getDisabledPartitionsGauge() {
     int numDisabled = 0;
-    for (String instance : _disabledPartitions.keySet()) {
-      numDisabled += _disabledPartitions.get(instance).size();
+    for (Map<String, String> perInstance : _disabledPartitions.values()) {
+      for (String partitions : perInstance.values()) {
+        if (partitions != null) {
+          numDisabled += HelixUtil.deserializeByComma(partitions).size();
+        }
+      }
     }
     return numDisabled;
   }
@@ -196,7 +200,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
{
    * @param tags a map of instance name to the set of tags on it
    */
   public void setClusterInstanceStatus(Set<String> liveInstanceSet, Set<String>
instanceSet,
-      Set<String> disabledInstanceSet, Map<String, Set<String>> disabledPartitions,
+      Set<String> disabledInstanceSet, Map<String, Map<String, String>>
disabledPartitions,
       Map<String, Set<String>> tags) {
     // Unregister beans for instances that are no longer configured
     Set<String> toUnregister = Sets.newHashSet(_instanceMbeanMap.keySet());

http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index dc4a0a5..46d8a96 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -21,8 +21,12 @@ package org.apache.helix.monitoring.mbeans;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.util.HelixUtil;
+
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -34,7 +38,7 @@ public class InstanceMonitor implements InstanceMonitorMBean {
   private final String _clusterName;
   private final String _participantName;
   private List<String> _tags;
-  private List<String> _disabledPartitions;
+  private long _disabledPartitions;
   private boolean _isUp;
   private boolean _isEnabled;
   private long _totalMessageReceived;
@@ -48,7 +52,7 @@ public class InstanceMonitor implements InstanceMonitorMBean {
     _clusterName = clusterName;
     _participantName = participantName;
     _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
-    _disabledPartitions = Collections.emptyList();
+    _disabledPartitions = 0L;
     _isUp = false;
     _isEnabled = false;
     _totalMessageReceived = 0;
@@ -75,6 +79,11 @@ public class InstanceMonitor implements InstanceMonitorMBean {
     return _totalMessageReceived;
   }
 
+  @Override
+  public long getDisabledPartitions() {
+    return _disabledPartitions;
+  }
+
   /**
    * Get all the tags currently on this instance
    * @return list of tags
@@ -110,7 +119,7 @@ public class InstanceMonitor implements InstanceMonitorMBean {
    * @param isLive true if running, false otherwise
    * @param isEnabled true if enabled, false if disabled
    */
-  public synchronized void updateInstance(Set<String> tags, Set<String> disabledPartitions,
+  public synchronized void updateInstance(Set<String> tags, Map<String, String>
disabledPartitions,
       boolean isLive, boolean isEnabled) {
     if (tags == null || tags.isEmpty()) {
       _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
@@ -118,11 +127,13 @@ public class InstanceMonitor implements InstanceMonitorMBean {
       _tags = Lists.newArrayList(tags);
       Collections.sort(_tags);
     }
-    if (disabledPartitions == null) {
-      _disabledPartitions = Collections.emptyList();
-    } else {
-      _disabledPartitions = Lists.newArrayList(disabledPartitions);
-      Collections.sort(_disabledPartitions);
+    _disabledPartitions = 0L;
+    if (disabledPartitions != null) {
+      for (String partitions : disabledPartitions.values()) {
+        if (partitions != null) {
+          _disabledPartitions += HelixUtil.deserializeByComma(partitions).size();
+        }
+      }
     }
     _isUp = isLive;
     _isEnabled = isEnabled;

http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
index 4d949b1..a3221d8 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
@@ -42,4 +42,10 @@ public interface InstanceMonitorMBean extends SensorNameProvider {
    * @return The total number of messages sent to this instance
    */
   public long getTotalMessageReceived();
+
+  /**
+   * Get the total disabled partitions number for this instance
+   * @return The total number of disabled partitions
+   */
+  public long getDisabledPartitions();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 15d2f7b..4adf8ab 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -19,6 +19,9 @@ package org.apache.helix.util;
  * under the License.
  */
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -68,6 +71,17 @@ public final class HelixUtil {
     return path.substring(path.lastIndexOf('/') + 1);
   }
 
+  public static String serializeByComma(List<String> objects) {
+    return String.join(",", objects);
+  }
+
+  public static List<String> deserializeByComma(String object) {
+    if (object.length() == 0) {
+      return Collections.EMPTY_LIST;
+    }
+    return Arrays.asList(object.split(","));
+  }
+
   /**
    * parse a csv-formated key-value pairs
    * @param keyValuePairs : csv-formatted key-value pairs. e.g. k1=v1,k2=v2,...

http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index e8f8f56..bb1b079 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -20,6 +20,7 @@ package org.apache.helix.manager.zk;
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -407,4 +408,57 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     AssertJUnit.assertEquals(allResources.size(), 4);
     AssertJUnit.assertEquals(resourcesWithTag.size(), 2);
   }
+  @Test
+  public void testEnableDisablePartitions() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    String instanceName = "TestInstance";
+    String testResourcePrefix = "TestResource";
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName, true);
+    admin.addInstance(clusterName, new InstanceConfig(instanceName));
+
+    // Test disable instances with resources
+    admin.enablePartition(false, clusterName, instanceName, testResourcePrefix + "0",
+        Arrays.asList(new String[]{"1", "2"}));
+    admin.enablePartition(false, clusterName, instanceName, testResourcePrefix + "1",
+        Arrays.asList(new String[]{"2", "3", "4"}));
+    InstanceConfig instanceConfig = admin.getInstanceConfig(clusterName, instanceName);
+    Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix + "0").size(),
2);
+    Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix + "1").size(),
3);
+
+    // Test enable partition across resources
+    instanceConfig.setInstanceEnabledForPartition("2", true);
+    Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix + "0").size(),
1);
+    Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix + "1").size(),
2);
+
+    // Test disable partition across resources
+    instanceConfig.setInstanceEnabledForPartition("10", false);
+    Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix + "0").size(),
2);
+    Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix + "1").size(),
3);
+  }
+
+  @Test
+  public void testLegacyEnableDisablePartition() {
+    String instanceName = "TestInstanceLegacy";
+    String testResourcePrefix = "TestResourceLegacy";
+    ZNRecord record = new ZNRecord(instanceName);
+    List<String> disabledPartitions =
+        new ArrayList<String>(Arrays.asList(new String[] { "1", "2", "3" }));
+    record.setListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name(),
+        disabledPartitions);
+    InstanceConfig instanceConfig = new InstanceConfig(record);
+    instanceConfig.setInstanceEnabledForPartition(testResourcePrefix, "2", false);
+    Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix).size(),
3);
+    Assert.assertEquals(instanceConfig.getRecord()
+            .getListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()).size(),
+        3);
+    instanceConfig.setInstanceEnabledForPartition(testResourcePrefix, "2", true);
+    Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix).size(),
2);
+    Assert.assertEquals(instanceConfig.getRecord()
+            .getListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()).size(),
+        2);
+  }
 }


Mime
View raw message