helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch customizeView updated: Add two stages for customized state view aggregation. (#888)
Date Wed, 18 Mar 2020 06:08:27 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch customizeView
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/customizeView by this push:
     new 966a4b8  Add two stages for customized state view aggregation. (#888)
966a4b8 is described below

commit 966a4b8838d33ce340d4713bc83ee49d88604f12
Author: zhangmeng916 <56051770+zhangmeng916@users.noreply.github.com>
AuthorDate: Tue Mar 17 23:08:20 2020 -0700

    Add two stages for customized state view aggregation. (#888)
    
    1. One stage is the computation stage for customized state. It takes the ZooKeeper data of customized states and converts it into the formatted output used by the other stage.
    2. The other stage is the customized view aggregation stage. It takes the output from the customized state computation stage and writes the customized view to ZooKeeper.
    3. The two stages together compute the customized view from the customized states.
    4. Unit tests are added to verify the correctness of the two stages.
---
 .../helix/common/caches/CustomizedStateCache.java  |   6 +-
 .../dataproviders/BaseControllerDataProvider.java  |   6 +-
 .../ResourceControllerDataProvider.java            |  96 +++++++++++++
 .../helix/controller/pipeline/AsyncWorkerType.java |   3 +-
 .../stages/CustomizedStateComputationStage.java    |  93 +++++++++++++
 .../stages/CustomizedViewAggregationStage.java     | 150 +++++++++++++++++++++
 .../apache/helix/model/CustomizedStateConfig.java  |   2 +-
 .../TestCustomizedStateComputationStage.java       | 102 ++++++++++++++
 .../controller/stages/TestCustomizedViewStage.java | 106 +++++++++++++++
 .../TestControllerDataProviderSelectiveUpdate.java |   4 +-
 10 files changed, 562 insertions(+), 6 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedStateCache.java
index 0875b00..ba78953 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedStateCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedStateCache.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
 
 public class CustomizedStateCache extends ParticipantStateCache<CustomizedState> {
   private static final Logger LOG = LoggerFactory.getLogger(CurrentStateCache.class.getName());
-  private final Set<String> _aggregationEnabledTypes;
+  private Set<String> _aggregationEnabledTypes;
 
   public CustomizedStateCache(String clusterName, Set<String> aggregationEnabledTypes)
{
     this(createDefaultControlContextProvider(clusterName), aggregationEnabledTypes);
@@ -60,4 +60,8 @@ public class CustomizedStateCache extends ParticipantStateCache<CustomizedState>
     }
     return participantStateKeys;
   }
+
+  public void setAggregationEnabledTypes(Set<String> aggregationEnabledTypes) {
+    _aggregationEnabledTypes = aggregationEnabledTypes;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 0cd9355..51b2e80 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -752,8 +752,12 @@ public class BaseControllerDataProvider implements ControlContextProvider
{
     return sb;
   }
 
+  protected PropertyCache<LiveInstance> getLiveInstanceCache() {
+    return _liveInstanceCache;
+  }
+
   @Override
   public String toString() {
     return genCacheContentStringBuilder().toString();
   }
-}
+}
\ No newline at end of file
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index b1dc215..fc4ee1b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -19,8 +19,10 @@ package org.apache.helix.controller.dataproviders;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -30,10 +32,14 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.common.caches.AbstractDataCache;
+import org.apache.helix.common.caches.CustomizedStateCache;
+import org.apache.helix.common.caches.CustomizedViewCache;
 import org.apache.helix.common.caches.PropertyCache;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.stages.MissingTopStateRecord;
+import org.apache.helix.model.CustomizedState;
+import org.apache.helix.model.CustomizedStateConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.ResourceAssignment;
 import org.slf4j.Logger;
@@ -52,6 +58,9 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider
{
   // Resource control specific property caches
   private final PropertyCache<ExternalView> _externalViewCache;
   private final PropertyCache<ExternalView> _targetExternalViewCache;
+  private final CustomizedStateCache _customizedStateCache;
+  // a map from customized state type to customized view cache
+  private final Map<String, CustomizedViewCache> _customizedViewCacheMap;
 
   // maintain a cache of bestPossible assignment across pipeline runs
   // TODO: this is only for customRebalancer, remove it and merge it with _idealMappingCache.
@@ -64,6 +73,8 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider
{
   private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap;
   private Map<String, Map<String, String>> _lastTopStateLocationMap;
 
+  private Set<String> _aggregationEnabledTypes = new HashSet<>();
+
   public ResourceControllerDataProvider() {
     this(AbstractDataCache.UNKNOWN_CLUSTER);
   }
@@ -106,6 +117,8 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider
{
     _idealMappingCache = new HashMap<>();
     _missingTopStateMap = new HashMap<>();
     _lastTopStateLocationMap = new HashMap<>();
+    _customizedStateCache = new CustomizedStateCache(this, _aggregationEnabledTypes);
+    _customizedViewCacheMap = new HashMap<>();
   }
 
   public synchronized void refresh(HelixDataAccessor accessor) {
@@ -123,8 +136,12 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider
{
     }
 
     // Refresh resource controller specific property caches
+    refreshCustomizedStateConfig(accessor);
+    _customizedStateCache.setAggregationEnabledTypes(_aggregationEnabledTypes);
+    _customizedStateCache.refresh(accessor, getLiveInstanceCache().getPropertyMap());
     refreshExternalViews(accessor);
     refreshTargetExternalViews(accessor);
+    refreshCustomizedViewMap(accessor);
     LogUtil.logInfo(logger, getClusterEventId(), String.format(
         "END: ResourceControllerDataProvider.refresh() for cluster %s, started at %d took
%d for %s pipeline",
         getClusterName(), startTime, System.currentTimeMillis() - startTime, getPipelineName()));
@@ -139,6 +156,27 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider
{
     }
   }
 
+  private void refreshCustomizedStateConfig(final HelixDataAccessor accessor) {
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CUSTOMIZED_STATE_CONFIG)
+        .getAndSet(false)) {
+      CustomizedStateConfig customizedStateConfig =
+          accessor.getProperty(accessor.keyBuilder().customizedStateConfig());
+      if (customizedStateConfig != null) {
+        _aggregationEnabledTypes =
+            new HashSet<>(customizedStateConfig.getAggregationEnabledTypes());
+      } else {
+        _aggregationEnabledTypes.clear();
+      }
+      LogUtil.logInfo(logger, getClusterEventId(), String
+          .format("Reloaded CustomizedStateConfig for cluster %s, %s pipeline.",
+              getClusterName(), getPipelineName()));
+    } else {
+      LogUtil.logInfo(logger, getClusterEventId(), String
+          .format("No customized state config change for %s cluster, %s pipeline",
+              getClusterName(), getPipelineName()));
+    }
+  }
+
   private void refreshExternalViews(final HelixDataAccessor accessor) {
     // As we are not listening on external view change, external view will be
     // refreshed once during the cache's first refresh() call, or when full refresh is required
@@ -156,6 +194,45 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider
{
     }
   }
 
+  public void refreshCustomizedViewMap(final HelixDataAccessor accessor) {
+    // As we are not listening on customized view change, customized view will be
+    // refreshed once during the cache's first refresh() call, or when full refresh is required
+    List<String> newStateTypes = accessor.getChildNames(accessor.keyBuilder().customizedViews());
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CUSTOMIZED_VIEW).getAndSet(false))
{
+      for (String stateType : newStateTypes) {
+        if (!_customizedViewCacheMap.containsKey(stateType)) {
+          CustomizedViewCache newCustomizedViewCache =
+              new CustomizedViewCache(getClusterName(), stateType);
+          _customizedViewCacheMap.put(stateType, newCustomizedViewCache);
+        }
+        _customizedViewCacheMap.get(stateType).refresh(accessor);
+      }
+      Set<String> previousCachedStateTypes = _customizedViewCacheMap.keySet();
+      previousCachedStateTypes.removeAll(newStateTypes);
+      logger.info("Remove customizedView for state: " + previousCachedStateTypes);
+      removeCustomizedViewTypes(new ArrayList<>(previousCachedStateTypes));
+    }
+  }
+
+  /**
+   * Provides the customized state of the node for a given state type (resource -> customizedState)
+   * @param instanceName
+   * @param customizedStateType
+   * @return
+   */
+  public Map<String, CustomizedState> getCustomizedState(String instanceName,
+      String customizedStateType) {
+    return _customizedStateCache.getParticipantState(instanceName, customizedStateType);
+  }
+
+  public Set<String> getAggregationEnabledCustomizedStateTypes() {
+    return _aggregationEnabledTypes;
+  }
+
+  protected void setAggregationEnabledCustomizedStateTypes(Set<String> aggregationEnabledTypes)
{
+    _aggregationEnabledTypes = aggregationEnabledTypes;
+  }
+
   public ExternalView getTargetExternalView(String resourceName) {
     return _targetExternalViewCache.getPropertyByName(resourceName);
   }
@@ -183,6 +260,14 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider
{
   }
 
   /**
+   * Get local cached customized view map
+   * @return
+   */
+  public Map<String, CustomizedViewCache> getCustomizedViewCacheMap() {
+    return _customizedViewCacheMap;
+  }
+
+  /**
    * Remove dead external views from map
    * @param resourceNames
    */
@@ -193,6 +278,17 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider
{
     }
   }
 
+  /**
+   * Remove dead customized views for certain state types from map
+   * @param stateTypeNames
+   */
+
+  private void removeCustomizedViewTypes(List<String> stateTypeNames) {
+    for (String stateType : stateTypeNames) {
+      _customizedViewCacheMap.remove(stateType);
+    }
+  }
+
   public Map<String, Map<String, MissingTopStateRecord>> getMissingTopStateMap()
{
     return _missingTopStateMap;
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
index ac938dc..e39050a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
@@ -31,5 +31,6 @@ public enum AsyncWorkerType {
   PersistAssignmentWorker,
   ExternalViewComputeWorker,
   MaintenanceRecoveryWorker,
-  TaskJobPurgeWorker
+  TaskJobPurgeWorker,
+  CustomizedStateViewComputeWorker
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java
new file mode 100644
index 0000000..cb5f1a3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java
@@ -0,0 +1,93 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CustomizedState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class CustomizedStateComputationStage extends AbstractBaseStage {
+  private static Logger LOG = LoggerFactory.getLogger(CustomizedStateComputationStage.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
+    ResourceControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    final Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
+    Set<String> aggregationEnabledTypes = cache.getAggregationEnabledCustomizedStateTypes();
+
+    if (cache == null || resourceMap == null) {
+      throw new StageException(
+          "Missing attributes in event:" + event + ". Requires DataCache|RESOURCE");
+    }
+
+    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+    final CustomizedStateOutput customizedStateOutput = new CustomizedStateOutput();
+
+    for (LiveInstance instance : liveInstances.values()) {
+      String instanceName = instance.getInstanceName();
+      // update customized states.
+      for (String customizedStateType : aggregationEnabledTypes) {
+        Map<String, CustomizedState> customizedStateMap =
+            cache.getCustomizedState(instanceName, customizedStateType);
+        updateCustomizedStates(instanceName, customizedStateType, customizedStateMap,
+            customizedStateOutput, resourceMap);
+      }
+    }
+    event.addAttribute(AttributeName.CUSTOMIZED_STATE.name(), customizedStateOutput);
+  }
+
+  // update customized state in CustomizedStateOutput
+  private void updateCustomizedStates(String instanceName, String customizedStateType,
+      Map<String, CustomizedState> customizedStates, CustomizedStateOutput customizedStateOutput,
+      Map<String, Resource> resourceMap) {
+    // for each CustomizedState, update corresponding entry in CustomizedStateOutput
+    for (CustomizedState customizedState : customizedStates.values()) {
+      String resourceName = customizedState.getResourceName();
+      Resource resource = resourceMap.get(resourceName);
+      if (resource == null) {
+        continue;
+      }
+
+      Map<String, String> partitionStateMap = customizedState
+          .getPartitionStateMap(CustomizedState.CustomizedStateProperty.CURRENT_STATE);
+      for (String partitionName : partitionStateMap.keySet()) {
+        Partition partition = resource.getPartition(partitionName);
+        if (partition != null) {
+          customizedStateOutput
+              .setCustomizedState(customizedStateType, resourceName, partition, instanceName,
+                  customizedState.getState(partitionName));
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
new file mode 100644
index 0000000..de0bc1c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
@@ -0,0 +1,150 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.common.caches.CustomizedViewCache;
+import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CustomizedView;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
+  private static Logger LOG = LoggerFactory.getLogger(CustomizedViewAggregationStage.class);
+
+  @Override
+  public AsyncWorkerType getAsyncWorkerType() {
+    return AsyncWorkerType.CustomizedStateViewComputeWorker;
+  }
+
+  @Override
+  public void execute(final ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
+    ResourceControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+
+    if (manager == null || resourceMap == null || cache == null) {
+      throw new StageException(
+          "Missing attributes in event:" + event + ". Requires ClusterManager|RESOURCES|DataCache");
+    }
+
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+    CustomizedStateOutput customizedStateOutput =
+        event.getAttribute(AttributeName.CUSTOMIZED_STATE.name());
+
+    Map<String, CustomizedViewCache> customizedViewCacheMap = cache.getCustomizedViewCacheMap();
+
+    // remove stale customized view type from ZK and cache
+    List<String> customizedViewTypesToRemove = new ArrayList<>();
+    for (String stateType : customizedViewCacheMap.keySet()) {
+      if (!customizedStateOutput.getAllStateTypes().contains(stateType)) {
+        LogUtil.logInfo(LOG, _eventId, "Remove customizedView for stateType: " + stateType);
+        dataAccessor.removeProperty(keyBuilder.customizedView(stateType));
+        customizedViewTypesToRemove.add(stateType);
+      }
+    }
+
+    List<CustomizedView> updatedCustomizedViews = new ArrayList<>();
+    // update customized view
+    for (String stateType : customizedStateOutput.getAllStateTypes()) {
+      Map<String, CustomizedView> curCustomizedViews = new HashMap<>();
+      CustomizedViewCache customizedViewCache = customizedViewCacheMap.get(stateType);
+      if (customizedViewCache != null) {
+        curCustomizedViews = customizedViewCache.getCustomizedViewMap();
+      }
+
+      for (Resource resource : resourceMap.values()) {
+        try {
+          computeCustomizedStateView(resource, stateType, customizedStateOutput, curCustomizedViews,
+              updatedCustomizedViews);
+
+          List<PropertyKey> keys = new ArrayList<>();
+          for (Iterator<CustomizedView> it = updatedCustomizedViews.iterator(); it.hasNext();
) {
+            CustomizedView view = it.next();
+            String resourceName = view.getResourceName();
+            keys.add(keyBuilder.customizedView(stateType, resourceName));
+          }
+          // add/update customized-views
+          if (updatedCustomizedViews.size() > 0) {
+            dataAccessor.setChildren(keys, updatedCustomizedViews);
+          }
+
+          // remove stale customized views
+          for (String resourceName : curCustomizedViews.keySet()) {
+            if (!resourceMap.keySet().contains(resourceName)) {
+              LogUtil.logInfo(LOG, _eventId, "Remove customizedView for resource: " + resourceName);
+              dataAccessor.removeProperty(keyBuilder.customizedView(stateType, resourceName));
+            }
+          }
+        } catch (HelixException ex) {
+          LogUtil.logError(LOG, _eventId,
+              "Failed to calculate customized view for resource " + resource.getResourceName(),
ex);
+        }
+      }
+    }
+  }
+
+  private void computeCustomizedStateView(final Resource resource, final String stateType,
+      CustomizedStateOutput customizedStateOutput,
+      final Map<String, CustomizedView> curCustomizedViews,
+      List<CustomizedView> updatedCustomizedViews) {
+    String resourceName = resource.getResourceName();
+    CustomizedView view = new CustomizedView(resource.getResourceName());
+
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> customizedStateMap =
+          customizedStateOutput.getPartitionCustomizedStateMap(stateType, resourceName, partition);
+      if (customizedStateMap != null && customizedStateMap.size() > 0) {
+        for (String instance : customizedStateMap.keySet()) {
+          view.setState(partition.getPartitionName(), instance, customizedStateMap.get(instance));
+        }
+      }
+    }
+
+    CustomizedView curCustomizedView = curCustomizedViews.get(resourceName);
+
+    // compare the new customized view with current one, set only on different
+    if (curCustomizedView == null || !curCustomizedView.getRecord().equals(view.getRecord()))
{
+      // Add customized view to the list which will be written to ZK later.
+      updatedCustomizedViews.add(view);
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/model/CustomizedStateConfig.java b/helix-core/src/main/java/org/apache/helix/model/CustomizedStateConfig.java
index 67101f1..704660e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/CustomizedStateConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/CustomizedStateConfig.java
@@ -26,7 +26,7 @@ import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 
 /**
- * CustomizedStateAggregation configurations
+ * Customized state configurations
  */
 public class CustomizedStateConfig extends HelixProperty {
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedStateComputationStage.java
new file mode 100644
index 0000000..f8c1689
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedStateComputationStage.java
@@ -0,0 +1,102 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.CustomizedState;
+import org.apache.helix.model.CustomizedStateConfig;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestCustomizedStateComputationStage extends BaseStageTest {
+  private final String RESOURCE_NAME = "testResourceName";
+  private final String PARTITION_NAME = "testResourceName_0";
+  private final String CUSTOMIZED_STATE_NAME = "customizedState1";
+  private final String INSTANCE_NAME = "localhost_1";
+
+  @Test
+  public void testEmptyCustomizedState() {
+    Map<String, Resource> resourceMap = getResourceMap();
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(),
+        new ResourceControllerDataProvider(_clusterName));
+    CustomizedStateComputationStage stage = new CustomizedStateComputationStage();
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+    CustomizedStateOutput output = event.getAttribute(AttributeName.CUSTOMIZED_STATE.name());
+    AssertJUnit.assertEquals(output
+        .getPartitionCustomizedStateMap(CUSTOMIZED_STATE_NAME, RESOURCE_NAME,
+            new Partition("testResourceName_0")).size(), 0);
+  }
+
+  @Test
+  public void testSimpleCustomizedState() {
+    // setup resource
+    Map<String, Resource> resourceMap = getResourceMap();
+
+    setupLiveInstances(5);
+
+    // Add CustomizedStateAggregation Config
+    CustomizedStateConfig config = new CustomizedStateConfig();
+    List<String> aggregationEnabledTypes = new ArrayList<>();
+    aggregationEnabledTypes.add(CUSTOMIZED_STATE_NAME);
+
+    config.setAggregationEnabledTypes(aggregationEnabledTypes);
+
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.customizedStateConfig(), config);
+
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(),
+        new ResourceControllerDataProvider(_clusterName));
+    CustomizedStateComputationStage stage = new CustomizedStateComputationStage();
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+    CustomizedStateOutput output1 = event.getAttribute(AttributeName.CUSTOMIZED_STATE.name());
+    AssertJUnit.assertEquals(output1
+        .getPartitionCustomizedStateMap(CUSTOMIZED_STATE_NAME, RESOURCE_NAME,
+            new Partition(PARTITION_NAME)).size(), 0);
+
+    // Add a customized state
+    CustomizedState customizedState = new CustomizedState(RESOURCE_NAME);
+    customizedState.setState(PARTITION_NAME, "STARTED");
+    accessor.setProperty(
+        keyBuilder.customizedState(INSTANCE_NAME, CUSTOMIZED_STATE_NAME, RESOURCE_NAME),
+        customizedState);
+
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, stage);
+    CustomizedStateOutput output2 = event.getAttribute(AttributeName.CUSTOMIZED_STATE.name());
+    Partition partition = new Partition(PARTITION_NAME);
+    AssertJUnit.assertEquals(output2
+        .getPartitionCustomizedState(CUSTOMIZED_STATE_NAME, RESOURCE_NAME, partition,
+            INSTANCE_NAME), "STARTED");
+  }
+}
\ No newline at end of file
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
new file mode 100644
index 0000000..a8a167b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
@@ -0,0 +1,106 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.CustomizedState;
+import org.apache.helix.model.CustomizedStateConfig;
+import org.apache.helix.model.CustomizedView;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/** Verifies the customized views written by {@link CustomizedViewAggregationStage}. */
+public class TestCustomizedViewStage extends ZkUnitTestBase {
+  private static final String RESOURCE_NAME = "testResourceName";
+  private static final String PARTITION_NAME = "testResourceName_0";
+  private static final String CUSTOMIZED_STATE_NAME = "customizedState1";
+  private static final String INSTANCE_NAME = "localhost_1";
+
+  /** The cached views must match ZooKeeper, and rerunning the stage must not rewrite them. */
+  @Test
+  public void testCachedCustomizedViews() throws Exception {
+    String clusterName = "CLUSTER_" + TestHelper.getTestMethodName();
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+
+    setupLiveInstances(clusterName, new int[]{0, 1});
+    setupStateModel(clusterName);
+    try {
+      ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
+      ResourceControllerDataProvider cache = new ResourceControllerDataProvider(clusterName);
+      event.addAttribute(AttributeName.helixmanager.name(), manager);
+      event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+
+      CustomizedStateConfig config = new CustomizedStateConfig();
+      List<String> aggregationEnabledTypes = new ArrayList<>();
+      aggregationEnabledTypes.add(CUSTOMIZED_STATE_NAME);
+      config.setAggregationEnabledTypes(aggregationEnabledTypes);
+
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      accessor.setProperty(keyBuilder.customizedStateConfig(), config);
+
+      CustomizedState customizedState = new CustomizedState(RESOURCE_NAME);
+      customizedState.setState(PARTITION_NAME, "STARTED");
+      accessor.setProperty(
+          keyBuilder.customizedState(INSTANCE_NAME, CUSTOMIZED_STATE_NAME, RESOURCE_NAME),
+          customizedState);
+
+      CustomizedViewAggregationStage viewStage = new CustomizedViewAggregationStage();
+      Pipeline dataRefresh = new Pipeline();
+      dataRefresh.addStage(new ReadClusterDataStage());
+      runPipeline(event, dataRefresh);
+      runStage(event, new ResourceComputationStage());
+      runStage(event, new CustomizedStateComputationStage());
+      runStage(event, viewStage);
+      Assert.assertEquals(cache.getCustomizedViewCacheMap().values(),
+          accessor.getChildValues(keyBuilder.customizedViews()));
+
+      // Running the stage again must not update any customized view (versions stay the same).
+      List<CustomizedView> viewsBefore = accessor.getChildValues(keyBuilder.customizedViews());
+      runStage(event, viewStage);
+      List<CustomizedView> viewsAfter = accessor.getChildValues(keyBuilder.customizedViews());
+      Assert.assertEquals(viewsBefore, viewsAfter);
+      for (int i = 0; i < viewsBefore.size(); i++) {
+        Assert.assertEquals(viewsBefore.get(i).getStat().getVersion(),
+            viewsAfter.get(i).getStat().getVersion());
+      }
+    } finally {
+      // Always clean up so a failed assertion does not leak cluster data into other tests.
+      if (manager.isConnected()) {
+        manager.disconnect(); // For DummyClusterManager, this is not necessary
+      }
+      deleteLiveInstances(clusterName);
+      deleteCluster(clusterName);
+    }
+  }
+}
\ No newline at end of file
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerDataProviderSelectiveUpdate.java
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerDataProviderSelectiveUpdate.java
index a34c064..320aaf2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerDataProviderSelectiveUpdate.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerDataProviderSelectiveUpdate.java
@@ -49,7 +49,7 @@ public class TestControllerDataProviderSelectiveUpdate extends ZkStandAloneCMTes
     Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1);
     Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), NODE_NR);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), NODE_NR);
-    Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), NODE_NR + 1);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), NODE_NR + 2);
 
     accessor.clearReadCounters();
 
@@ -94,7 +94,7 @@ public class TestControllerDataProviderSelectiveUpdate extends ZkStandAloneCMTes
     Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1);
     Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), NODE_NR);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), NODE_NR);
-    Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), NODE_NR + 1);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), NODE_NR + 2);
 
     accessor.clearReadCounters();
 


Mime
View raw message