helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch master updated: Fix RoutingTableProvider statePropagationLatency metric reporting bug (#365)
Date Tue, 06 Aug 2019 18:58:21 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 909f962  Fix RoutingTableProvider statePropagationLatency metric reporting bug (#365)
909f962 is described below

commit 909f9621f22892238802417a32a6261ac9b8e6be
Author: kaisun2000 <52840222+kaisun2000@users.noreply.github.com>
AuthorDate: Tue Aug 6 11:58:16 2019 -0700

    Fix RoutingTableProvider statePropagationLatency metric reporting bug (#365)
    
    Issue:
    
    CurrentStateCache, when updating the snapshot, would miss all the existing partitions that
have state changes.
    
    RoutingTableProvider callbacks run on the main event thread, and the time they take is not accounted for in the log.
    
    Description:
    Fix the bug by updating the snapshot with the keys that were actually reloaded (reloadedKeys) instead of only the requested reloadKeys.
    
    Enhanced the log to account for the time spent in user callback code separately.
    
    Tests:
    mvn test passed.
---
 .../helix/common/caches/AbstractDataCache.java     |  13 ++-
 .../helix/common/caches/CurrentStateCache.java     |   7 +-
 .../helix/common/caches/CurrentStateSnapshot.java  |  22 +++-
 .../helix/common/caches/ExternalViewCache.java     |   4 +-
 .../helix/common/caches/IdealStateCache.java       |   4 +-
 .../apache/helix/common/caches/PropertyCache.java  |   5 +-
 .../helix/spectator/RoutingTableProvider.java      |  23 +++--
 .../common/caches/TestCurrentStateSnapshot.java    | 112 +++++++++++++++++++++
 8 files changed, 169 insertions(+), 21 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
index aaae8ef..4387083 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
@@ -20,9 +20,11 @@ package org.apache.helix.common.caches;
  */
 
 import com.google.common.collect.Maps;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
@@ -48,15 +50,17 @@ public abstract class AbstractDataCache<T extends HelixProperty>
{
    * Selectively fetch Helix Properties from ZK by comparing the version of local cached
one with the one on ZK.
    * If version on ZK is newer, fetch it from zk and update local cache.
    * @param accessor the HelixDataAccessor
-   * @param reloadKeys keys needs to be reload
+   * @param reloadKeysIn keys needs to be reload
    * @param cachedKeys keys already exists in the cache
    * @param cachedPropertyMap cached map of propertykey -> property object
+   * @param reloadedKeys keys actually reloaded; may include more keys than reloadKeysIn
    * @return updated properties map
    */
   protected Map<PropertyKey, T> refreshProperties(
-      HelixDataAccessor accessor, List<PropertyKey> reloadKeys, List<PropertyKey>
cachedKeys,
-      Map<PropertyKey, T> cachedPropertyMap) {
+      HelixDataAccessor accessor, Set<PropertyKey> reloadKeysIn, List<PropertyKey>
cachedKeys,
+      Map<PropertyKey, T> cachedPropertyMap, Set<PropertyKey> reloadedKeys) {
     // All new entries from zk not cached locally yet should be read from ZK.
+    List<PropertyKey> reloadKeys = new ArrayList<>(reloadKeysIn);
     Map<PropertyKey, T> refreshedPropertyMap = Maps.newHashMap();
     List<HelixProperty.Stat> stats = accessor.getPropertyStats(cachedKeys);
     for (int i = 0; i < cachedKeys.size(); i++) {
@@ -77,6 +81,9 @@ public abstract class AbstractDataCache<T extends HelixProperty> {
       }
     }
 
+    reloadedKeys.clear();
+    reloadedKeys.addAll(reloadKeys);
+
     List<T> reloadedProperty = accessor.getProperty(reloadKeys, true);
     Iterator<PropertyKey> csKeyIter = reloadKeys.iterator();
     for (T property : reloadedProperty) {
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
index e584956..c4801cc 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
@@ -135,13 +135,14 @@ public class CurrentStateCache extends AbstractDataCache<CurrentState>
{
     Set<PropertyKey> cachedKeys = new HashSet<>(_currentStateCache.keySet());
     cachedKeys.retainAll(currentStateKeys);
 
+    Set<PropertyKey> reloadedKeys = new HashSet<>();
     Map<PropertyKey, CurrentState> newStateCache = Collections.unmodifiableMap(
-        refreshProperties(accessor, new ArrayList<>(reloadKeys), new ArrayList<>(cachedKeys),
-            _currentStateCache));
+        refreshProperties(accessor, reloadKeys, new ArrayList<>(cachedKeys),
+            _currentStateCache, reloadedKeys));
 
     // if the cache was not initialized, the previous state should not be included in the
snapshot
     if (_initialized) {
-      _snapshot = new CurrentStateSnapshot(newStateCache, _currentStateCache, reloadKeys);
+      _snapshot = new CurrentStateSnapshot(newStateCache, _currentStateCache, reloadedKeys);
     } else {
       _snapshot = new CurrentStateSnapshot(newStateCache);
       _initialized = true;
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java
index dff9861..1e405a4 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java
@@ -8,8 +8,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 public class CurrentStateSnapshot extends AbstractDataSnapshot<CurrentState> {
+  private static final Logger LOG = LoggerFactory.getLogger(CurrentStateSnapshot.class.getName());
+
   private Set<PropertyKey> _updatedStateKeys = null;
   private Map<PropertyKey, CurrentState> _prevStateMap = null;
 
@@ -32,6 +37,7 @@ public class CurrentStateSnapshot extends AbstractDataSnapshot<CurrentState>
{
     if (_updatedStateKeys != null && _prevStateMap != null) {
       // Note if the prev state map is empty, this is the first time refresh.
       // So the update is not considered as "recent" change.
+      int driftCnt = 0; // clock drift count for comparing timestamp
       for (PropertyKey propertyKey : _updatedStateKeys) {
         CurrentState prevState = _prevStateMap.get(propertyKey);
         CurrentState curState = _properties.get(propertyKey);
@@ -39,11 +45,25 @@ public class CurrentStateSnapshot extends AbstractDataSnapshot<CurrentState>
{
         Map<String, Long> partitionUpdateEndTimes = null;
         for (String partition : curState.getPartitionStateMap().keySet()) {
           long newEndTime = curState.getEndTime(partition);
-          if (prevState == null || prevState.getEndTime(partition) < newEndTime) {
+          // if prevState is null, and newEndTime is -1, we should not record -1 in endTimeMap;
otherwise,
+          // statePropagation latency calculation in RoutingTableProvider would spit out
extremely large metrics.
+          if ((prevState == null || prevState.getEndTime(partition) < newEndTime) &&
newEndTime != -1) {
             if (partitionUpdateEndTimes == null) {
               partitionUpdateEndTimes = new HashMap<>();
             }
             partitionUpdateEndTimes.put(partition, newEndTime);
+          } else if (prevState != null && prevState.getEndTime(partition) > newEndTime)
{
+            // This can happen due to clock drift.
+            // updatedStateKeys is the path to resource in an instance config.
+            // Thus, the space of inner loop is Sigma{replica(i) * partition(i)}; i over
all resources in the cluster
+            // This space can be large. In order not to print too many lines, we print first
warning for the first case.
+            // If clock drift turns out to be common, we can consider print out more logs,
or expose an metric.
+            if (driftCnt < 1) {
+              LOG.warn(
+                  "clock drift. partition:" + partition + " curState:" + curState.getState(partition)
+ " prevState: "
+                      + prevState.getState(partition));
+            }
+            driftCnt++;
           }
         }
 
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java
index 473f7a5..49be1e6 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java
@@ -95,8 +95,8 @@ public class ExternalViewCache extends AbstractDataCache<ExternalView>
{
     reloadKeys.removeAll(cachedKeys);
 
     Map<PropertyKey, ExternalView> updatedMap =
-        refreshProperties(accessor, new LinkedList<>(reloadKeys), new ArrayList<>(cachedKeys),
-            cachedExternalViewMap);
+        refreshProperties(accessor, reloadKeys, new ArrayList<>(cachedKeys),
+            cachedExternalViewMap, new HashSet<>());
     Map<String, ExternalView> newExternalViewMap = Maps.newHashMap();
     for (ExternalView externalView : updatedMap.values()) {
       newExternalViewMap.put(externalView.getResourceName(), externalView);
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
index 01b16e4..0d2bf96 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
@@ -85,8 +85,8 @@ public class IdealStateCache extends AbstractDataCache<IdealState>
{
     reloadKeys.removeAll(cachedKeys);
 
     Map<PropertyKey, IdealState> updatedMap =
-        refreshProperties(accessor, new LinkedList<>(reloadKeys), new ArrayList<>(cachedKeys),
-            cachedIdealStateMap);
+        refreshProperties(accessor, reloadKeys, new ArrayList<>(cachedKeys),
+            cachedIdealStateMap, new HashSet<>());
     Map<String, IdealState> newIdealStateMap = Maps.newHashMap();
     for (IdealState idealState : updatedMap.values()) {
       newIdealStateMap.put(idealState.getResourceName(), idealState);
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java
index 6010c1d..ad77524 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java
@@ -22,6 +22,7 @@ package org.apache.helix.common.caches;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -172,8 +173,8 @@ public class PropertyCache<T extends HelixProperty> extends AbstractDataCache<T>
   private void doRefreshWithSelectiveUpdate(final HelixDataAccessor accessor) {
     SelectivePropertyRefreshInputs<T> input =
         genSelectiveUpdateInput(accessor, _objCache, _keyFuncs);
-    Map<PropertyKey, T> updatedData = refreshProperties(accessor, input.getReloadKeys(),
-        input.getCachedKeys(), input.getCachedPropertyMap());
+    Map<PropertyKey, T> updatedData = refreshProperties(accessor, new HashSet<>(input.getReloadKeys()),
+        input.getCachedKeys(), input.getCachedPropertyMap(), new HashSet<>());
     _objCache = propertyKeyMapToStringMap(updatedData, _keyFuncs);
 
     // need to separate keys so we can potentially update cache map asynchronously while
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index cf666f8..f94ee7e 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -558,10 +558,13 @@ public class RoutingTableProvider
 
   private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable) {
     _routingTableRef.set(newRoutingTable);
-    logger.info("Refresh the RoutingTable for cluster {}, takes {} ms.",
-        (_helixManager != null ? _helixManager.getClusterName() : null),
+    String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
+    logger.info("Refreshed the RoutingTable for cluster {}, took {} ms.", clusterName,
         (System.currentTimeMillis() - startTime));
-    notifyRoutingTableChange();
+
+    // TODO: move the callback user code logic to separate thread upon routing table statePropagation
latency
+    // integration test result. If the latency is more than 2 secs, we need to change this
part.
+    notifyRoutingTableChange(clusterName);
 
     // Update timestamp for last refresh
     if (_isPeriodicRefreshEnabled) {
@@ -569,12 +572,16 @@ public class RoutingTableProvider
     }
   }
 
-  private void notifyRoutingTableChange() {
-    for (Map.Entry<RoutingTableChangeListener, ListenerContext> entry : _routingTableChangeListenerMap
-        .entrySet()) {
-      entry.getKey().onRoutingTableChange(new RoutingTableSnapshot(_routingTableRef.get()),
-          entry.getValue().getContext());
+  private void notifyRoutingTableChange(String clusterName) {
+    // This call back is called in the main event queue of RoutingTableProvider. We add log
to record time spent
+    // here. Potentially, we should call this callback in a separate thread if this is a
bottleneck.
+    long startTime = System.currentTimeMillis();
+    for (Map.Entry<RoutingTableChangeListener, ListenerContext> entry : _routingTableChangeListenerMap.entrySet())
{
+      entry.getKey()
+          .onRoutingTableChange(new RoutingTableSnapshot(_routingTableRef.get()), entry.getValue().getContext());
     }
+    logger.info("RoutingTableProvider user callback time for cluster {}, took {} ms.", clusterName,
+        (System.currentTimeMillis() - startTime));
   }
 
   private class RouterUpdater extends ClusterEventProcessor {
diff --git a/helix-core/src/test/java/org/apache/helix/common/caches/TestCurrentStateSnapshot.java
b/helix-core/src/test/java/org/apache/helix/common/caches/TestCurrentStateSnapshot.java
new file mode 100644
index 0000000..ff125ea
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/common/caches/TestCurrentStateSnapshot.java
@@ -0,0 +1,112 @@
+package org.apache.helix.common.caches;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.MockAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit test for {@link CurrentStateSnapshot}
+ */
+public class TestCurrentStateSnapshot {
+
+  // This test makes sure that currentStateEndTimes calculation would record correct partition
replica.
+  // Specifically, if a replicate has not endTime field set, we should not put an entry into
currentStateEndTime
+  // calculation. Otherwise, we see huge statePropagation latency of 1.4Tms.
+  @Test(description = "test getNewCurrentStateEndTimes")
+  public void testGetNewCurrentStateEndTimes() {
+    String instance1 = "instance1";
+    String session1 = "session1";
+    String resource1 = "resource1";
+
+    String partition1 = "partition1";
+    String partition2 = "partition2";
+
+    PropertyKey key = new PropertyKey.Builder("cluster").currentState(instance1, session1,
resource1);
+
+    CurrentState nxtState = new CurrentState(resource1);
+    // partition 1, expect to record in endTimesMap
+    nxtState.setState(partition1, "SLAVE");
+    nxtState.setEndTime(partition1, 200);
+    // partition 2, expect to not record in endTimeMap. This is fixing current 1.4T observed
timestamp issue
+    nxtState.setState(partition2, "MASTER");
+
+    Map<PropertyKey, CurrentState> currentStateMap = new HashMap<>();
+    Map<PropertyKey, CurrentState> nextStateMap = new HashMap<>();
+    nextStateMap.put(key, nxtState);
+
+    Set<PropertyKey> updateKeys = new HashSet<>();
+    updateKeys.add(key);
+
+    CurrentStateSnapshot snapshot = new CurrentStateSnapshot(nextStateMap, currentStateMap,
updateKeys);
+
+    Map<PropertyKey, Map<String, Long>> endTimesMap = snapshot.getNewCurrentStateEndTimes();
+
+    Assert.assertEquals(endTimesMap.size(), 1);
+    Assert.assertTrue(endTimesMap.get(key).get(partition1) == 200);
+  }
+
+  // This test makes sure that all the changed current State is reflected in newCurrentStateEndTimes
calculation.
+  // Previously, we have bugs that all newly created current state would be reflected in
newCurrentStateEndTimes
+  // calculation.
+  @Test(description = "testRefreshCurrentStateCache")
+  public void testRefreshCurrentStateCache() {
+    String instanceName = "instance1";
+    long instanceSession = 12345;
+    String resourceName = "resource";
+    String partitionName = "resource_partition1";
+
+    MockAccessor accessor = new MockAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    // construct liveInstance
+    ZNRecord record = new ZNRecord(instanceName);
+    record.setEphemeralOwner(instanceSession);
+    LiveInstance instance = new LiveInstance(record);
+
+    boolean retVal = accessor.setProperty(keyBuilder.liveInstance(instanceName), instance);
+    Assert.assertTrue(retVal);
+
+    // construct currentstate
+    CurrentState originState = new CurrentState(resourceName);
+    originState.setEndTime(partitionName, 100);
+
+    CurrentState currentState = new CurrentState(resourceName);
+    currentState.setEndTime(partitionName, 300);
+    retVal = accessor.setProperty(keyBuilder.currentState(instanceName, instance.getEphemeralOwner(),
resourceName),
+        originState);
+    Assert.assertTrue(retVal);
+
+    CurrentStateCache cache = new CurrentStateCache("cluster");
+
+    Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
+    liveInstanceMap.put(instanceName, instance);
+
+    retVal = cache.refresh(accessor, liveInstanceMap);
+    Assert.assertTrue(retVal);
+
+    retVal = accessor.setProperty(keyBuilder.currentState(instanceName, instance.getEphemeralOwner(),
resourceName),
+        currentState);
+    Assert.assertTrue(retVal);
+
+    retVal = cache.refresh(accessor, liveInstanceMap);
+    Assert.assertTrue(retVal);
+
+    CurrentStateSnapshot snapshot = cache.getSnapshot();
+
+    Map<PropertyKey, Map<String, Long>> endTimesMap = snapshot.getNewCurrentStateEndTimes();
+
+    Assert.assertEquals(endTimesMap.size(), 1);
+    // note, without this fix, the endTimesMap would be size zero.
+    Assert.assertTrue(endTimesMap.get(keyBuilder.currentState(instanceName, instance.getEphemeralOwner(),
resourceName))
+        .get(partitionName) == 300);
+  }
+}


Mime
View raw message