helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] helix git commit: Support selective update for IdealState
Date Mon, 02 Apr 2018 18:22:19 GMT
Repository: helix
Updated Branches:
  refs/heads/master 6b6ba3c26 -> 84ce019e0


Support selective update for IdealState

To get better performance for Pinot, we need some improvements to the rebalance pipeline. One
of these improvements is to read as little as possible, so a version-based IdealState
read will be very helpful.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/84ce019e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/84ce019e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/84ce019e

Branch: refs/heads/master
Commit: 84ce019e02dab4ae1c2eb101e387acab1ebf6308
Parents: f9e43c0
Author: Junkai Xue <jxue@linkedin.com>
Authored: Mon Mar 26 16:02:52 2018 -0700
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Mon Apr 2 11:21:59 2018 -0700

----------------------------------------------------------------------
 .../common/caches/BasicClusterDataCache.java    | 50 ++++++++++++++++++
 .../helix/common/caches/CurrentStateCache.java  | 43 +++-------------
 .../controller/stages/ClusterDataCache.java     | 54 +++++++++++++++++---
 .../TestClusterDataCacheSelectiveUpdate.java    | 21 ++++++--
 4 files changed, 122 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/84ce019e/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
index 117d18a..d6e324d 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -19,12 +19,16 @@ package org.apache.helix.common.caches;
  * under the License.
  */
 
+import com.google.common.collect.Maps;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
@@ -105,6 +109,52 @@ public class BasicClusterDataCache {
   }
 
   /**
+   * Selective update Helix Cache by version
+   * @param accessor the HelixDataAccessor
+   * @param reloadKeys keys needs to be reload
+   * @param cachedKeys keys already exists in the cache
+   * @param cachedPropertyMap cached map of propertykey -> property object
+   * @param <T> the type of metadata
+   * @return
+   */
+  public static  <T extends HelixProperty> Map<PropertyKey, T> updateReloadProperties(
+      HelixDataAccessor accessor, List<PropertyKey> reloadKeys, List<PropertyKey>
cachedKeys,
+      Map<PropertyKey, T> cachedPropertyMap) {
+    // All new entries from zk not cached locally yet should be read from ZK.
+    Map<PropertyKey, T> refreshedPropertyMap = Maps.newHashMap();
+    List<HelixProperty.Stat> stats = accessor.getPropertyStats(cachedKeys);
+    for (int i = 0; i < cachedKeys.size(); i++) {
+      PropertyKey key = cachedKeys.get(i);
+      HelixProperty.Stat stat = stats.get(i);
+      if (stat != null) {
+        T property = cachedPropertyMap.get(key);
+        if (property != null && property.getBucketSize() == 0 && property.getStat().equals(stat))
{
+          refreshedPropertyMap.put(key, property);
+        } else {
+          // need update from zk
+          reloadKeys.add(key);
+        }
+      } else {
+        LOG.warn("Stat is null for key: " + key);
+        reloadKeys.add(key);
+      }
+    }
+
+    List<T> reloadedProperty = accessor.getProperty(reloadKeys, true);
+    Iterator<PropertyKey> reloadKeyIter = reloadKeys.iterator();
+    for (T property : reloadedProperty) {
+      PropertyKey key = reloadKeyIter.next();
+      if (property != null) {
+        refreshedPropertyMap.put(key, property);
+      } else {
+        LOG.warn("Reload property is null for key: " + key);
+      }
+    }
+
+    return refreshedPropertyMap;
+  }
+
+  /**
    * Retrieves the ExternalView for all resources
    *
    * @return

http://git-wip-us.apache.org/repos/asf/helix/blob/84ce019e/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
index 711a6d6..abf84b4 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.LiveInstance;
 import org.slf4j.Logger;
@@ -124,42 +125,14 @@ public class CurrentStateCache {
     List<PropertyKey> cachedKeys = Lists.newLinkedList(_currentStateCache.keySet());
     cachedKeys.retainAll(currentStateKeys);
 
-    List<HelixProperty.Stat> stats = accessor.getPropertyStats(cachedKeys);
-    Map<PropertyKey, CurrentState> currentStatesMap = Maps.newHashMap();
-    for (int i = 0; i < cachedKeys.size(); i++) {
-      PropertyKey key = cachedKeys.get(i);
-      HelixProperty.Stat stat = stats.get(i);
-      if (stat != null) {
-        CurrentState property = _currentStateCache.get(key);
-        if (property != null && property.getBucketSize() == 0 && property.getStat().equals(stat))
{
-          currentStatesMap.put(key, property);
-        } else {
-          // need update from zk
-          reloadKeys.add(key);
-        }
-      } else {
-        LOG.warn("stat is null for key: " + key);
-        reloadKeys.add(key);
-      }
-    }
-
-    List<CurrentState> currentStates = accessor.getProperty(reloadKeys, true);
-    Iterator<PropertyKey> csKeyIter = reloadKeys.iterator();
-    for (CurrentState currentState : currentStates) {
-      PropertyKey key = csKeyIter.next();
-      if (currentState != null) {
-        currentStatesMap.put(key, currentState);
-      } else {
-        LOG.warn("CurrentState null for key: " + key);
-      }
-    }
-
-    _currentStateCache = Collections.unmodifiableMap(currentStatesMap);
+    _currentStateCache = Collections.unmodifiableMap(BasicClusterDataCache
+        .updateReloadProperties(accessor, reloadKeys, cachedKeys, _currentStateCache));
 
-    LOG.info("# of CurrentStates reload: " + reloadKeys.size() + ", skipped:" + (currentStateKeys.size()
-        - reloadKeys.size()));
-    LOG.info("Takes " + (System.currentTimeMillis() - start) + " ms to reload new current
states for cluster: "
-        + _clusterName);
+    LOG.info(
+        "# of CurrentStates reload: " + reloadKeys.size() + ", skipped:" + (currentStateKeys.size()
+            - reloadKeys.size()));
+    LOG.info("Takes " + (System.currentTimeMillis() - start)
+        + " ms to reload new current states for cluster: " + _clusterName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/84ce019e/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 734b7f7..9f48529 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -19,10 +19,12 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,6 +34,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.common.caches.BasicClusterDataCache;
 import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.common.caches.InstanceMessagesCache;
 import org.apache.helix.common.caches.TaskDataCache;
@@ -72,7 +75,7 @@ public class ClusterDataCache {
   private Map<String, LiveInstance> _liveInstanceMap;
   private Map<String, LiveInstance> _liveInstanceCacheMap;
   private Map<String, IdealState> _idealStateMap;
-  private Map<String, IdealState> _idealStateCacheMap;
+  private Map<String, IdealState> _idealStateCacheMap = Maps.newHashMap();
   private Map<String, StateModelDefinition> _stateModelDefMap;
   private Map<String, InstanceConfig> _instanceConfigMap;
   private Map<String, InstanceConfig> _instanceConfigCacheMap;
@@ -142,9 +145,9 @@ public class ClusterDataCache {
       long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(ChangeType.IDEAL_STATE, Boolean.valueOf(false));
       clearCachedResourceAssignments();
-      _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates(), true);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Reload IdealStates: " + _idealStateCacheMap.keySet() + ". Takes " + (
+      _idealStateCacheMap = refreshIdealStates(accessor);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Reload IdealStates: " + _idealStateCacheMap.keySet() + ". Takes " + (
             System.currentTimeMillis() - start) + " ms");
       }
     }
@@ -154,21 +157,27 @@ public class ClusterDataCache {
       clearCachedResourceAssignments();
       _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances(), true);
       _updateInstanceOfflineTime = true;
-      LOG.debug("Reload LiveInstances: " + _liveInstanceCacheMap.keySet());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Reload LiveInstances: " + _liveInstanceCacheMap.keySet());
+      }
     }
 
     if (_propertyDataChangedMap.get(ChangeType.INSTANCE_CONFIG)) {
       _propertyDataChangedMap.put(ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false));
       clearCachedResourceAssignments();
       _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs(),
true);
-      LOG.debug("Reload InstanceConfig: " + _instanceConfigCacheMap.keySet());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Reload InstanceConfig: " + _instanceConfigCacheMap.keySet());
+      }
     }
 
     if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) {
       _propertyDataChangedMap.put(ChangeType.RESOURCE_CONFIG, Boolean.valueOf(false));
       clearCachedResourceAssignments();
       _resourceConfigCacheMap = accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs(),
true);
-      LOG.debug("Reload ResourceConfigs: " + _resourceConfigCacheMap.size());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Reload ResourceConfigs: " + _resourceConfigCacheMap.size());
+      }
     }
 
     _idealStateMap = new HashMap<>(_idealStateCacheMap);
@@ -663,6 +672,35 @@ public class ClusterDataCache {
     }
   }
 
+  private Map<String, IdealState> refreshIdealStates(HelixDataAccessor accessor) {
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    List<PropertyKey> currentIdealStateKeys = new ArrayList<>();
+    for (String idealState : accessor.getChildNames(keyBuilder.idealStates())) {
+      currentIdealStateKeys.add(keyBuilder.idealStates(idealState));
+    }
+
+    List<PropertyKey> cachedKeys = new ArrayList<>();
+    Map<PropertyKey, IdealState> cachedIdealStateMap = Maps.newHashMap();
+    for (String idealState : _idealStateCacheMap.keySet()) {
+      cachedKeys.add(keyBuilder.idealStates(idealState));
+      cachedIdealStateMap
+          .put(keyBuilder.idealStates(idealState), _idealStateCacheMap.get(idealState));
+    }
+    cachedKeys.retainAll(currentIdealStateKeys);
+
+    List<PropertyKey> reloadKeys = new LinkedList<>(currentIdealStateKeys);
+    reloadKeys.removeAll(cachedKeys);
+
+    Map<PropertyKey, IdealState> updatedMap = BasicClusterDataCache
+        .updateReloadProperties(accessor, reloadKeys, cachedKeys, cachedIdealStateMap);
+    Map<String, IdealState> newIdealStateMap = Maps.newHashMap();
+    for (IdealState idealState : updatedMap.values()) {
+      newIdealStateMap.put(idealState.getResourceName(), idealState);
+    }
+
+    return Collections.unmodifiableMap(newIdealStateMap);
+  }
+
   /**
    * Return the JobContext by resource name
    * @param resourceName
@@ -826,7 +864,7 @@ public class ClusterDataCache {
   }
 
 
-  protected void clearCachedResourceAssignments() {
+  public void clearCachedResourceAssignments() {
     _resourceAssignmentCache.clear();
     _idealMappingCache.clear();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/84ce019e/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
index de4e9d1..3271a71 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
@@ -65,10 +65,10 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase
     Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 1);
 
     accessor.clearReadCounters();
-    // refresh again should read only idealstates.
+    // refresh again should read nothing as ideal state is same
     cache.notifyDataChange(HelixConstants.ChangeType.IDEAL_STATE);
     cache.refresh(accessor);
-    Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 1);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 0);
     Assert.assertEquals(accessor.getReadCount(PropertyType.LIVEINSTANCES), 0);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), 0);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 1);
@@ -116,9 +116,24 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase
 
     accessor.clearReadCounters();
 
-    // refresh again should read only new current states
+    // refresh again should read only new current states and new idealstate
+    cache.notifyDataChange(HelixConstants.ChangeType.IDEAL_STATE);
     cache.refresh(accessor);
     Assert.assertEquals(accessor.getReadCount(PropertyType.CURRENTSTATES), NODE_NR);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 1);
+
+    // Add more resources
+    accessor.clearReadCounters();
+
+    _setupTool.addResourceToCluster(CLUSTER_NAME, "TestDB_2", _PARTITIONS, STATE_MODEL);
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB_2", _replica);
+    _setupTool.addResourceToCluster(CLUSTER_NAME, "TestDB_3", _PARTITIONS, STATE_MODEL);
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB_3", _replica);
+
+    // Totally four resources. Two of them are newly added.
+    cache.notifyDataChange(HelixConstants.ChangeType.IDEAL_STATE);
+    cache.refresh(accessor);
+    Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 2);
   }
 
 


Mime
View raw message