This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 909f962 Fix RoutingTableProvider statePropagationLatency metric reporting bug (#365)
909f962 is described below
commit 909f9621f22892238802417a32a6261ac9b8e6be
Author: kaisun2000 <52840222+kaisun2000@users.noreply.github.com>
AuthorDate: Tue Aug 6 11:58:16 2019 -0700
Fix RoutingTableProvider statePropagationLatency metric reporting bug (#365)
Issue:
When updating its snapshot, CurrentStateCache missed the state changes of all existing
partitions (those whose current-state keys were already cached).
RoutingTableProvider invokes user callbacks on the main event thread, but the time spent in them is not accounted for in the log.
Description:
Fix the bug by building the snapshot from the keys that were actually reloaded.
Enhance logging to account for the time spent in user callback code separately.
Tests:
mvn test passed.
---
.../helix/common/caches/AbstractDataCache.java | 13 ++-
.../helix/common/caches/CurrentStateCache.java | 7 +-
.../helix/common/caches/CurrentStateSnapshot.java | 22 +++-
.../helix/common/caches/ExternalViewCache.java | 4 +-
.../helix/common/caches/IdealStateCache.java | 4 +-
.../apache/helix/common/caches/PropertyCache.java | 5 +-
.../helix/spectator/RoutingTableProvider.java | 23 +++--
.../common/caches/TestCurrentStateSnapshot.java | 112 +++++++++++++++++++++
8 files changed, 169 insertions(+), 21 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
index aaae8ef..4387083 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
@@ -20,9 +20,11 @@ package org.apache.helix.common.caches;
*/
import com.google.common.collect.Maps;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
@@ -48,15 +50,17 @@ public abstract class AbstractDataCache<T extends HelixProperty>
{
* Selectively fetch Helix Properties from ZK by comparing the version of local cached
one with the one on ZK.
* If version on ZK is newer, fetch it from zk and update local cache.
* @param accessor the HelixDataAccessor
- * @param reloadKeys keys needs to be reload
+ * @param reloadKeysIn keys needs to be reload
* @param cachedKeys keys already exists in the cache
* @param cachedPropertyMap cached map of propertykey -> property object
+ * @param reloadedKeys keys actually reloaded; may include more keys than reloadKeysIn
* @return updated properties map
*/
protected Map<PropertyKey, T> refreshProperties(
- HelixDataAccessor accessor, List<PropertyKey> reloadKeys, List<PropertyKey>
cachedKeys,
- Map<PropertyKey, T> cachedPropertyMap) {
+ HelixDataAccessor accessor, Set<PropertyKey> reloadKeysIn, List<PropertyKey>
cachedKeys,
+ Map<PropertyKey, T> cachedPropertyMap, Set<PropertyKey> reloadedKeys) {
// All new entries from zk not cached locally yet should be read from ZK.
+ List<PropertyKey> reloadKeys = new ArrayList<>(reloadKeysIn);
Map<PropertyKey, T> refreshedPropertyMap = Maps.newHashMap();
List<HelixProperty.Stat> stats = accessor.getPropertyStats(cachedKeys);
for (int i = 0; i < cachedKeys.size(); i++) {
@@ -77,6 +81,9 @@ public abstract class AbstractDataCache<T extends HelixProperty> {
}
}
+ reloadedKeys.clear();
+ reloadedKeys.addAll(reloadKeys);
+
List<T> reloadedProperty = accessor.getProperty(reloadKeys, true);
Iterator<PropertyKey> csKeyIter = reloadKeys.iterator();
for (T property : reloadedProperty) {
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
index e584956..c4801cc 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
@@ -135,13 +135,14 @@ public class CurrentStateCache extends AbstractDataCache<CurrentState>
{
Set<PropertyKey> cachedKeys = new HashSet<>(_currentStateCache.keySet());
cachedKeys.retainAll(currentStateKeys);
+ Set<PropertyKey> reloadedKeys = new HashSet<>();
Map<PropertyKey, CurrentState> newStateCache = Collections.unmodifiableMap(
- refreshProperties(accessor, new ArrayList<>(reloadKeys), new ArrayList<>(cachedKeys),
- _currentStateCache));
+ refreshProperties(accessor, reloadKeys, new ArrayList<>(cachedKeys),
+ _currentStateCache, reloadedKeys));
// if the cache was not initialized, the previous state should not be included in the
snapshot
if (_initialized) {
- _snapshot = new CurrentStateSnapshot(newStateCache, _currentStateCache, reloadKeys);
+ _snapshot = new CurrentStateSnapshot(newStateCache, _currentStateCache, reloadedKeys);
} else {
_snapshot = new CurrentStateSnapshot(newStateCache);
_initialized = true;
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java
b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java
index dff9861..1e405a4 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateSnapshot.java
@@ -8,8 +8,13 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class CurrentStateSnapshot extends AbstractDataSnapshot<CurrentState> {
+ private static final Logger LOG = LoggerFactory.getLogger(CurrentStateSnapshot.class.getName());
+
private Set<PropertyKey> _updatedStateKeys = null;
private Map<PropertyKey, CurrentState> _prevStateMap = null;
@@ -32,6 +37,7 @@ public class CurrentStateSnapshot extends AbstractDataSnapshot<CurrentState>
{
if (_updatedStateKeys != null && _prevStateMap != null) {
// Note if the prev state map is empty, this is the first time refresh.
// So the update is not considered as "recent" change.
+ int driftCnt = 0; // clock drift count for comparing timestamp
for (PropertyKey propertyKey : _updatedStateKeys) {
CurrentState prevState = _prevStateMap.get(propertyKey);
CurrentState curState = _properties.get(propertyKey);
@@ -39,11 +45,25 @@ public class CurrentStateSnapshot extends AbstractDataSnapshot<CurrentState>
{
Map<String, Long> partitionUpdateEndTimes = null;
for (String partition : curState.getPartitionStateMap().keySet()) {
long newEndTime = curState.getEndTime(partition);
- if (prevState == null || prevState.getEndTime(partition) < newEndTime) {
+ // if prevState is null, and newEndTime is -1, we should not record -1 in endTimeMap;
otherwise,
+ // statePropagation latency calculation in RoutingTableProvider would spit out
extremely large metrics.
+ if ((prevState == null || prevState.getEndTime(partition) < newEndTime) &&
newEndTime != -1) {
if (partitionUpdateEndTimes == null) {
partitionUpdateEndTimes = new HashMap<>();
}
partitionUpdateEndTimes.put(partition, newEndTime);
+ } else if (prevState != null && prevState.getEndTime(partition) > newEndTime)
{
+ // This can happen due to clock drift.
+ // updatedStateKeys is the path to resource in an instance config.
+ // Thus, the space of inner loop is Sigma{replica(i) * partition(i)}; i over
all resources in the cluster
+ // This space can be large. In order not to print too many lines, we print first
warning for the first case.
+ // If clock drift turns out to be common, we can consider print out more logs,
or expose an metric.
+ if (driftCnt < 1) {
+ LOG.warn(
+ "clock drift. partition:" + partition + " curState:" + curState.getState(partition)
+ " prevState: "
+ + prevState.getState(partition));
+ }
+ driftCnt++;
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java
index 473f7a5..49be1e6 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/ExternalViewCache.java
@@ -95,8 +95,8 @@ public class ExternalViewCache extends AbstractDataCache<ExternalView>
{
reloadKeys.removeAll(cachedKeys);
Map<PropertyKey, ExternalView> updatedMap =
- refreshProperties(accessor, new LinkedList<>(reloadKeys), new ArrayList<>(cachedKeys),
- cachedExternalViewMap);
+ refreshProperties(accessor, reloadKeys, new ArrayList<>(cachedKeys),
+ cachedExternalViewMap, new HashSet<>());
Map<String, ExternalView> newExternalViewMap = Maps.newHashMap();
for (ExternalView externalView : updatedMap.values()) {
newExternalViewMap.put(externalView.getResourceName(), externalView);
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
index 01b16e4..0d2bf96 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/IdealStateCache.java
@@ -85,8 +85,8 @@ public class IdealStateCache extends AbstractDataCache<IdealState>
{
reloadKeys.removeAll(cachedKeys);
Map<PropertyKey, IdealState> updatedMap =
- refreshProperties(accessor, new LinkedList<>(reloadKeys), new ArrayList<>(cachedKeys),
- cachedIdealStateMap);
+ refreshProperties(accessor, reloadKeys, new ArrayList<>(cachedKeys),
+ cachedIdealStateMap, new HashSet<>());
Map<String, IdealState> newIdealStateMap = Maps.newHashMap();
for (IdealState idealState : updatedMap.values()) {
newIdealStateMap.put(idealState.getResourceName(), idealState);
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java
index 6010c1d..ad77524 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/PropertyCache.java
@@ -22,6 +22,7 @@ package org.apache.helix.common.caches;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -172,8 +173,8 @@ public class PropertyCache<T extends HelixProperty> extends AbstractDataCache<T>
private void doRefreshWithSelectiveUpdate(final HelixDataAccessor accessor) {
SelectivePropertyRefreshInputs<T> input =
genSelectiveUpdateInput(accessor, _objCache, _keyFuncs);
- Map<PropertyKey, T> updatedData = refreshProperties(accessor, input.getReloadKeys(),
- input.getCachedKeys(), input.getCachedPropertyMap());
+ Map<PropertyKey, T> updatedData = refreshProperties(accessor, new HashSet<>(input.getReloadKeys()),
+ input.getCachedKeys(), input.getCachedPropertyMap(), new HashSet<>());
_objCache = propertyKeyMapToStringMap(updatedData, _keyFuncs);
// need to separate keys so we can potentially update cache map asynchronously while
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index cf666f8..f94ee7e 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -558,10 +558,13 @@ public class RoutingTableProvider
private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable) {
_routingTableRef.set(newRoutingTable);
- logger.info("Refresh the RoutingTable for cluster {}, takes {} ms.",
- (_helixManager != null ? _helixManager.getClusterName() : null),
+ String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
+ logger.info("Refreshed the RoutingTable for cluster {}, took {} ms.", clusterName,
(System.currentTimeMillis() - startTime));
- notifyRoutingTableChange();
+
+ // TODO: move the callback user code logic to separate thread upon routing table statePropagation
latency
+ // integration test result. If the latency is more than 2 secs, we need to change this
part.
+ notifyRoutingTableChange(clusterName);
// Update timestamp for last refresh
if (_isPeriodicRefreshEnabled) {
@@ -569,12 +572,16 @@ public class RoutingTableProvider
}
}
- private void notifyRoutingTableChange() {
- for (Map.Entry<RoutingTableChangeListener, ListenerContext> entry : _routingTableChangeListenerMap
- .entrySet()) {
- entry.getKey().onRoutingTableChange(new RoutingTableSnapshot(_routingTableRef.get()),
- entry.getValue().getContext());
+ private void notifyRoutingTableChange(String clusterName) {
+ // This call back is called in the main event queue of RoutingTableProvider. We add log
to record time spent
+ // here. Potentially, we should call this callback in a separate thread if this is a
bottleneck.
+ long startTime = System.currentTimeMillis();
+ for (Map.Entry<RoutingTableChangeListener, ListenerContext> entry : _routingTableChangeListenerMap.entrySet())
{
+ entry.getKey()
+ .onRoutingTableChange(new RoutingTableSnapshot(_routingTableRef.get()), entry.getValue().getContext());
}
+ logger.info("RoutingTableProvider user callback time for cluster {}, took {} ms.", clusterName,
+ (System.currentTimeMillis() - startTime));
}
private class RouterUpdater extends ClusterEventProcessor {
diff --git a/helix-core/src/test/java/org/apache/helix/common/caches/TestCurrentStateSnapshot.java
b/helix-core/src/test/java/org/apache/helix/common/caches/TestCurrentStateSnapshot.java
new file mode 100644
index 0000000..ff125ea
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/common/caches/TestCurrentStateSnapshot.java
@@ -0,0 +1,112 @@
+package org.apache.helix.common.caches;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.MockAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit test for {@link CurrentStateSnapshot}
+ */
+public class TestCurrentStateSnapshot {
+
+ // This test makes sure that currentStateEndTimes calculation would record correct partition
replica.
+ // Specifically, if a replicate has not endTime field set, we should not put an entry into
currentStateEndTime
+ // calculation. Otherwise, we see huge statePropagation latency of 1.4Tms.
+ @Test(description = "test getNewCurrentStateEndTimes")
+ public void testGetNewCurrentStateEndTimes() {
+ String instance1 = "instance1";
+ String session1 = "session1";
+ String resource1 = "resource1";
+
+ String partition1 = "partition1";
+ String partition2 = "partition2";
+
+ PropertyKey key = new PropertyKey.Builder("cluster").currentState(instance1, session1,
resource1);
+
+ CurrentState nxtState = new CurrentState(resource1);
+ // partition 1, expect to record in endTimesMap
+ nxtState.setState(partition1, "SLAVE");
+ nxtState.setEndTime(partition1, 200);
+ // partition 2, expect to not record in endTimeMap. This is fixing current 1.4T observed
timestamp issue
+ nxtState.setState(partition2, "MASTER");
+
+ Map<PropertyKey, CurrentState> currentStateMap = new HashMap<>();
+ Map<PropertyKey, CurrentState> nextStateMap = new HashMap<>();
+ nextStateMap.put(key, nxtState);
+
+ Set<PropertyKey> updateKeys = new HashSet<>();
+ updateKeys.add(key);
+
+ CurrentStateSnapshot snapshot = new CurrentStateSnapshot(nextStateMap, currentStateMap,
updateKeys);
+
+ Map<PropertyKey, Map<String, Long>> endTimesMap = snapshot.getNewCurrentStateEndTimes();
+
+ Assert.assertEquals(endTimesMap.size(), 1);
+ Assert.assertTrue(endTimesMap.get(key).get(partition1) == 200);
+ }
+
+ // This test makes sure that all the changed current State is reflected in newCurrentStateEndTimes
calculation.
+ // Previously, we have bugs that all newly created current state would be reflected in
newCurrentStateEndTimes
+ // calculation.
+ @Test(description = "testRefreshCurrentStateCache")
+ public void testRefreshCurrentStateCache() {
+ String instanceName = "instance1";
+ long instanceSession = 12345;
+ String resourceName = "resource";
+ String partitionName = "resource_partition1";
+
+ MockAccessor accessor = new MockAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ // construct liveInstance
+ ZNRecord record = new ZNRecord(instanceName);
+ record.setEphemeralOwner(instanceSession);
+ LiveInstance instance = new LiveInstance(record);
+
+ boolean retVal = accessor.setProperty(keyBuilder.liveInstance(instanceName), instance);
+ Assert.assertTrue(retVal);
+
+ // construct currentstate
+ CurrentState originState = new CurrentState(resourceName);
+ originState.setEndTime(partitionName, 100);
+
+ CurrentState currentState = new CurrentState(resourceName);
+ currentState.setEndTime(partitionName, 300);
+ retVal = accessor.setProperty(keyBuilder.currentState(instanceName, instance.getEphemeralOwner(),
resourceName),
+ originState);
+ Assert.assertTrue(retVal);
+
+ CurrentStateCache cache = new CurrentStateCache("cluster");
+
+ Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
+ liveInstanceMap.put(instanceName, instance);
+
+ retVal = cache.refresh(accessor, liveInstanceMap);
+ Assert.assertTrue(retVal);
+
+ retVal = accessor.setProperty(keyBuilder.currentState(instanceName, instance.getEphemeralOwner(),
resourceName),
+ currentState);
+ Assert.assertTrue(retVal);
+
+ retVal = cache.refresh(accessor, liveInstanceMap);
+ Assert.assertTrue(retVal);
+
+ CurrentStateSnapshot snapshot = cache.getSnapshot();
+
+ Map<PropertyKey, Map<String, Long>> endTimesMap = snapshot.getNewCurrentStateEndTimes();
+
+ Assert.assertEquals(endTimesMap.size(), 1);
+ // note, without this fix, the endTimesMap would be size zero.
+ Assert.assertTrue(endTimesMap.get(keyBuilder.currentState(instanceName, instance.getEphemeralOwner(),
resourceName))
+ .get(partitionName) == 300);
+ }
+}
|