helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject helix git commit: [HELIX-698] Add periodic refresh to RoutingTableProvider
Date Thu, 19 Apr 2018 21:08:44 GMT
Repository: helix
Updated Branches:
  refs/heads/master e1faf2404 -> 0e4163f18


[HELIX-698] Add periodic refresh to RoutingTableProvider

There have been incidents where RoutingTableProvider did not receive a timely refresh, potentially
due to lag in the ZKClient CallbackHandler or connectivity issues. This change adds a periodic
refresh to RoutingTableProvider so that the routing table is still refreshed when change callbacks
are severely delayed.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0e4163f1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0e4163f1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0e4163f1

Branch: refs/heads/master
Commit: 0e4163f18c1274c0f77320698e9dfbf42314810d
Parents: e1faf24
Author: Hunter Lee <narendly@gmail.com>
Authored: Thu Apr 19 13:42:37 2018 -0700
Committer: Hunter Lee <narendly@gmail.com>
Committed: Thu Apr 19 14:07:52 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/NotificationContext.java   |   1 +
 .../common/caches/BasicClusterDataCache.java    |   2 +-
 .../helix/spectator/RoutingTableProvider.java   | 225 ++++++++++++-------
 ...TestRoutingTableProviderPeriodicRefresh.java | 218 ++++++++++++++++++
 4 files changed, 359 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0e4163f1/helix-core/src/main/java/org/apache/helix/NotificationContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/NotificationContext.java b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
index dd76b60..9664f66 100644
--- a/helix-core/src/main/java/org/apache/helix/NotificationContext.java
+++ b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
@@ -188,6 +188,7 @@ public class NotificationContext {
   public enum Type {
     INIT,
     CALLBACK,
+    PERIODIC_REFRESH,
     FINALIZE
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/0e4163f1/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
index d6e324d..06fcaf6 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -225,7 +225,7 @@ public class BasicClusterDataCache {
    */
   public void requireFullRefresh() {
     for(HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
-      _propertyDataChangedMap.put(type, Boolean.valueOf(true));
+      _propertyDataChangedMap.put(type, Boolean.TRUE);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/0e4163f1/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 4076697..f72d66a 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.helix.HelixConstants;
@@ -53,25 +55,47 @@ import org.apache.helix.model.LiveInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener,
-    ConfigChangeListener, LiveInstanceChangeListener, CurrentStateChangeListener {
+public class RoutingTableProvider
+    implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener,
+               LiveInstanceChangeListener, CurrentStateChangeListener {
   private static final Logger logger = LoggerFactory.getLogger(RoutingTableProvider.class);
+  private static final long DEFAULT_PERIODIC_REFRESH_INTERVAL = 300000; // 5 minutes
   private final AtomicReference<RoutingTable> _routingTableRef;
   private final HelixManager _helixManager;
   private final RouterUpdater _routerUpdater;
   private final PropertyType _sourceDataType;
   private final Map<RoutingTableChangeListener, ListenerContext> _routingTableChangeListenerMap;
 
+  // For periodic refresh
+  private long _lastRefreshTimestamp;
+  private boolean _isPeriodicRefreshEnabled = true; // Default is enabled
+  private long _periodRefreshInterval;
+  private ScheduledThreadPoolExecutor _periodicRefreshExecutor;
+
   public RoutingTableProvider() {
     this(null);
   }
 
   public RoutingTableProvider(HelixManager helixManager) throws HelixException {
-    this(helixManager, PropertyType.EXTERNALVIEW);
+    this(helixManager, PropertyType.EXTERNALVIEW, true, DEFAULT_PERIODIC_REFRESH_INTERVAL);
   }
 
   public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType)
       throws HelixException {
+    this(helixManager, sourceDataType, true, DEFAULT_PERIODIC_REFRESH_INTERVAL);
+  }
+
+  /**
+   * Initialize an instance of RoutingTableProvider
+   *
+   * @param helixManager
+   * @param sourceDataType
+   * @param isPeriodicRefreshEnabled true if periodic refresh is enabled, false otherwise
+   * @param periodRefreshInterval only effective if isPeriodicRefreshEnabled is true
+   * @throws HelixException
+   */
+  public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType,
+      boolean isPeriodicRefreshEnabled, long periodRefreshInterval) throws HelixException
{
     _routingTableRef = new AtomicReference<>(new RoutingTable());
     _helixManager = helixManager;
     _sourceDataType = sourceDataType;
@@ -79,41 +103,43 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
     String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
     _routerUpdater = new RouterUpdater(clusterName, _sourceDataType);
     _routerUpdater.start();
+
     if (_helixManager != null) {
       switch (_sourceDataType) {
-      case EXTERNALVIEW:
-        try {
-          _helixManager.addExternalViewChangeListener(this);
-        } catch (Exception e) {
-          shutdown();
-          logger.error("Failed to attach ExternalView Listener to HelixManager!");
-          throw new HelixException("Failed to attach ExternalView Listener to HelixManager!",
e);
-        }
-        break;
-
-      case TARGETEXTERNALVIEW:
-        // Check whether target external has been enabled or not
-        if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
-            _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(),
0)) {
-          shutdown();
-          throw new HelixException("Target External View is not enabled!");
-        }
+        case EXTERNALVIEW:
+          try {
+            _helixManager.addExternalViewChangeListener(this);
+          } catch (Exception e) {
+            shutdown();
+            logger.error("Failed to attach ExternalView Listener to HelixManager!");
+            throw new HelixException("Failed to attach ExternalView Listener to HelixManager!",
e);
+          }
+          break;
 
-        try {
-          _helixManager.addTargetExternalViewChangeListener(this);
-        } catch (Exception e) {
-          shutdown();
-          logger.error("Failed to attach TargetExternalView Listener to HelixManager!");
-          throw new HelixException("Failed to attach TargetExternalView Listener to HelixManager!",
e);
-        }
-        break;
+        case TARGETEXTERNALVIEW:
+          // Check whether target external has been enabled or not
+          if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
+              _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(),
0)) {
+            shutdown();
+            throw new HelixException("Target External View is not enabled!");
+          }
 
-      case CURRENTSTATES:
-        // CurrentState change listeners will be added later in LiveInstanceChange call.
-        break;
+          try {
+            _helixManager.addTargetExternalViewChangeListener(this);
+          } catch (Exception e) {
+            shutdown();
+            logger.error("Failed to attach TargetExternalView Listener to HelixManager!");
+            throw new HelixException("Failed to attach TargetExternalView Listener to HelixManager!",
+                e);
+          }
+          break;
 
-      default:
-        throw new HelixException("Unsupported source data type: " + sourceDataType);
+        case CURRENTSTATES:
+          // CurrentState change listeners will be added later in LiveInstanceChange call.
+          break;
+
+        default:
+          throw new HelixException(String.format("Unsupported source data type: %s", sourceDataType));
       }
 
       try {
@@ -128,12 +154,40 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
             e);
       }
     }
+
+    // For periodic refresh
+    if (isPeriodicRefreshEnabled) {
+      _lastRefreshTimestamp = System.currentTimeMillis(); // Initialize timestamp with current
time
+      _periodRefreshInterval = periodRefreshInterval;
+      // Construct a periodic refresh context
+      final NotificationContext periodicRefreshContext = new NotificationContext(_helixManager);
+      periodicRefreshContext.setType(NotificationContext.Type.PERIODIC_REFRESH);
+      // Create a thread that runs at specified interval
+      _periodicRefreshExecutor = new ScheduledThreadPoolExecutor(1);
+      _periodicRefreshExecutor.scheduleAtFixedRate(new Runnable() {
+        @Override
+        public void run() {
+          // If enough time has elapsed since last refresh, queue a refresh event
+          if (_lastRefreshTimestamp + _periodRefreshInterval < System.currentTimeMillis())
{
+            // changeType is irrelevant for NotificationContext.Type.PERIODIC_REFRESH
+            _routerUpdater.queueEvent(periodicRefreshContext, ClusterEventType.PeriodicalRebalance,
+                null);
+          }
+        }
+      }, _periodRefreshInterval, _periodRefreshInterval, TimeUnit.MILLISECONDS);
+    } else {
+      _isPeriodicRefreshEnabled = false;
+    }
   }
 
   /**
    * Shutdown current RoutingTableProvider. Once it is shutdown, it should never be reused.
    */
   public void shutdown() {
+    if (_periodicRefreshExecutor != null) {
+      _periodicRefreshExecutor.purge();
+      _periodicRefreshExecutor.shutdown();
+    }
     _routerUpdater.shutdown();
     if (_helixManager != null) {
       PropertyKey.Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder();
@@ -147,7 +201,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
         case CURRENTSTATES:
           NotificationContext context = new NotificationContext(_helixManager);
           context.setType(NotificationContext.Type.FINALIZE);
-          updateCurrentStatesListeners(Collections.<LiveInstance>emptyList(), context);
+          updateCurrentStatesListeners(Collections.<LiveInstance> emptyList(), context);
           break;
         default:
           break;
@@ -158,7 +212,6 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
   /**
    * Get an snapshot of current RoutingTable information. The snapshot is immutable, it reflects
the
    * routing table information at the time this method is called.
-   *
    * @return snapshot of current routing table.
    */
   public RoutingTableSnapshot getRoutingTableSnapshot() {
@@ -167,29 +220,30 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
 
   /**
    * Add RoutingTableChangeListener with user defined context
-   *
    * @param routingTableChangeListener
    * @param context user defined context
    */
-  public void addRoutingTableChangeListener(final RoutingTableChangeListener routingTableChangeListener,
-      Object context) {
+  public void addRoutingTableChangeListener(
+      final RoutingTableChangeListener routingTableChangeListener, Object context) {
     _routingTableChangeListenerMap.put(routingTableChangeListener, new ListenerContext(context));
+    logger.info("Attach RoutingTableProviderChangeListener {}",
+        routingTableChangeListener.getClass().getName());
   }
 
   /**
    * Remove RoutingTableChangeListener
-   *
    * @param routingTableChangeListener
    */
   public Object removeRoutingTableChangeListener(
       final RoutingTableChangeListener routingTableChangeListener) {
+    logger.info("Detach RoutingTableProviderChangeListener {}",
+        routingTableChangeListener.getClass().getName());
     return _routingTableChangeListenerMap.remove(routingTableChangeListener);
   }
 
   /**
    * returns the instances for {resource,partition} pair that are in a specific
    * {state}
-   *
    * This method will be deprecated, please use the
    * {@link #getInstancesForResource(String, String, String)} getInstancesForResource} method.
    * @param resourceName
@@ -198,7 +252,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
    * @param state
    * @return empty list if there is no instance in a given state
    */
-  public List<InstanceConfig> getInstances(String resourceName, String partitionName,
String state) {
+  public List<InstanceConfig> getInstances(String resourceName, String partitionName,
+      String state) {
     return getInstancesForResource(resourceName, partitionName, state);
   }
 
@@ -211,21 +266,19 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
    * @param state
    * @return empty list if there is no instance in a given state
    */
-  public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName,
String state) {
+  public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName,
+      String state) {
     return _routingTableRef.get().getInstancesForResource(resourceName, partitionName, state);
   }
 
   /**
    * returns the instances for {resource group,partition} pair in all resources belongs to
the given
    * resource group that are in a specific {state}.
-   *
    * The return results aggregate all partition states from all the resources in the given
resource
    * group.
-   *
    * @param resourceGroupName
    * @param partitionName
    * @param state
-   *
    * @return empty list if there is no instance in a given state
    */
   public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
@@ -237,26 +290,22 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
   /**
    * returns the instances for {resource group,partition} pair contains any of the given
tags
    * that are in a specific {state}.
-   *
    * Find all resources belongs to the given resource group that have any of the given resource
tags
    * and return the aggregated partition states from all these resources.
-   *
    * @param resourceGroupName
    * @param partitionName
    * @param state
    * @param resourceTags
-   *
    * @return empty list if there is no instance in a given state
    */
   public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
       String partitionName, String state, List<String> resourceTags) {
-    return _routingTableRef.get()
-        .getInstancesForResourceGroup(resourceGroupName, partitionName, state, resourceTags);
+    return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, partitionName,
+        state, resourceTags);
   }
 
   /**
    * returns all instances for {resource} that are in a specific {state}
-   *
    * This method will be deprecated, please use the
    * {@link #getInstancesForResource(String, String) getInstancesForResource} method.
    * @param resourceName
@@ -279,10 +328,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
 
   /**
    * returns all instances for all resources in {resource group} that are in a specific {state}
-   *
    * @param resourceGroupName
    * @param state
-   *
    * @return empty list if there is no instance in a given state
    */
   public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
String state) {
@@ -292,10 +339,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
   /**
    * returns all instances for resources contains any given tags in {resource group} that
are in a
    * specific {state}
-   *
    * @param resourceGroupName
    * @param state
-   *
    * @return empty list if there is no instance in a given state
    */
   public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
String state,
@@ -333,13 +378,15 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
       NotificationContext changeContext) {
     HelixConstants.ChangeType changeType = changeContext.getChangeType();
     if (changeType != null && !changeType.getPropertyType().equals(_sourceDataType))
{
-      logger.warn("onExternalViewChange called with dis-matched change types. Source data
type "
-          + _sourceDataType + ", changed data type: " + changeType);
+      logger.warn(
+          "onExternalViewChange called with mismatched change types. Source data type {},
changed data type: {}",
+          _sourceDataType, changeType);
       return;
     }
     // Refresh with full list of external view.
     if (externalViewList != null && externalViewList.size() > 0) {
-      // keep this here for back-compatibility, application can call onExternalViewChange
directly with externalview list supplied.
+      // keep this here for back-compatibility, application can call onExternalViewChange
directly
+      // with externalview list supplied.
       refresh(externalViewList, changeContext);
     } else {
       ClusterEventType eventType;
@@ -348,8 +395,9 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
       } else if (_sourceDataType.equals(PropertyType.TARGETEXTERNALVIEW)) {
         eventType = ClusterEventType.TargetExternalViewChange;
       } else {
-        logger.warn("onExternalViewChange called with dis-matched change types. Source data
type "
-            + _sourceDataType + ", change type: " + changeType);
+        logger.warn(
+            "onExternalViewChange called with mismatched change types. Source data type {},
change type: {}",
+            _sourceDataType, changeType);
         return;
       }
       _routerUpdater.queueEvent(changeContext, eventType, changeType);
@@ -366,8 +414,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
 
   @Override
   @PreFetch(enabled = false)
-  public void onConfigChange(List<InstanceConfig> configs,
-      NotificationContext changeContext) {
+  public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext)
{
     onInstanceConfigChange(configs, changeContext);
   }
 
@@ -386,8 +433,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
 
   @Override
   @PreFetch(enabled = false)
-  public void onStateChange(String instanceName,
-      List<CurrentState> statesInfo, NotificationContext changeContext) {
+  public void onStateChange(String instanceName, List<CurrentState> statesInfo,
+      NotificationContext changeContext) {
     if (_sourceDataType.equals(PropertyType.CURRENTSTATES)) {
       _routerUpdater.queueEvent(changeContext, ClusterEventType.CurrentStateChange,
           HelixConstants.ChangeType.CURRENT_STATE);
@@ -410,7 +457,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
 
     if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
       // on finalize, should remove all current-state listeners
-      logger.info("remove current-state listeners. lastSeenSessions: " + _lastSeenSessions);
+      logger.info("remove current-state listeners. lastSeenSessions: {}", _lastSeenSessions);
       liveInstances = Collections.emptyList();
     }
 
@@ -433,11 +480,12 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
           try {
             // add current-state listeners for new sessions
             manager.addCurrentStateChangeListener(this, instanceName, session);
-            logger.info(manager.getInstanceName() + " added current-state listener for instance:
"
-                + instanceName + ", session: " + session + ", listener: " + this);
+            logger.info(
+                "{} added current-state listener for instance: {}, session: {}, listener:
{}",
+                manager.getInstanceName(), instanceName, session, this);
           } catch (Exception e) {
-            logger.error("Fail to add current state listener for instance: " + instanceName
-                + " with session: " + session, e);
+            logger.error("Fail to add current state listener for instance: {} with session:
{}",
+                instanceName, session, e);
           }
         }
       }
@@ -447,8 +495,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
         if (!curSessions.containsKey(session)) {
           String instanceName = lastSessions.get(session).getInstanceName();
           manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
-          logger.info("remove current-state listener for instance:" + instanceName + ", session:
"
-              + session);
+          logger.info("remove current-state listener for instance: {}, session: {}", instanceName,
+              session);
         }
       }
 
@@ -476,10 +524,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
       Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances)
{
     long startTime = System.currentTimeMillis();
     RoutingTable newRoutingTable = new RoutingTable(externalViews, instanceConfigs, liveInstances);
-    _routingTableRef.set(newRoutingTable);
-    logger.info("Refreshed the RoutingTable for cluster " + (_helixManager != null ? _helixManager
-        .getClusterName() : null) + ", takes " + (System.currentTimeMillis() - startTime)
+ "ms.");
-    notifyRoutingTableChange();
+    resetRoutingTableAndNotify(startTime, newRoutingTable);
   }
 
   protected void refresh(Map<String, Map<String, Map<String, CurrentState>>>
currentStateMap,
@@ -487,10 +532,20 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
     long startTime = System.currentTimeMillis();
     RoutingTable newRoutingTable =
         new RoutingTable(currentStateMap, instanceConfigs, liveInstances);
+    resetRoutingTableAndNotify(startTime, newRoutingTable);
+  }
+
+  private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable) {
     _routingTableRef.set(newRoutingTable);
-    logger.info("Refresh the RoutingTable for cluster " + (_helixManager != null ? _helixManager
-        .getClusterName() : null) + ", takes " + (System.currentTimeMillis() - startTime)
+ "ms.");
+    logger.info("Refresh the RoutingTable for cluster {}, takes {} ms.",
+        (_helixManager != null ? _helixManager.getClusterName() : null),
+        (System.currentTimeMillis() - startTime));
     notifyRoutingTableChange();
+
+    // Update timestamp for last refresh
+    if (_isPeriodicRefreshEnabled) {
+      _lastRefreshTimestamp = System.currentTimeMillis();
+    }
   }
 
   private void notifyRoutingTableChange() {
@@ -505,7 +560,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
     private final RoutingDataCache _dataCache;
 
     public RouterUpdater(String clusterName, PropertyType sourceDataType) {
-      super("Helix-RouterUpdater");
+      super("Helix-RouterUpdater-event_process");
       _dataCache = new RoutingDataCache(clusterName, sourceDataType);
     }
 
@@ -519,30 +574,26 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
         // refresh routing table.
         HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
         if (manager == null) {
-          logger.error("HelixManager is null for router update event : " + event);
+          logger.error(String.format("HelixManager is null for router update event: %s",
event));
           throw new HelixException("HelixManager is null for router update event.");
         }
         _dataCache.refresh(manager.getHelixDataAccessor());
-
         switch (_sourceDataType) {
           case EXTERNALVIEW:
             refresh(_dataCache.getExternalViews().values(),
                 _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
             break;
-
           case TARGETEXTERNALVIEW:
             refresh(_dataCache.getTargetExternalViews().values(),
                 _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
             break;
-
           case CURRENTSTATES:
             refresh(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(),
                 _dataCache.getLiveInstances().values());
             break;
-
           default:
-            logger.warn("Unsupported source data type: " + _sourceDataType
-                + ", stop refreshing the routing table!");
+            logger.warn("Unsupported source data type: {}, stop refreshing the routing table!",
+                _sourceDataType);
         }
       }
     }
@@ -550,12 +601,14 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
     public void queueEvent(NotificationContext context, ClusterEventType eventType,
         HelixConstants.ChangeType changeType) {
       ClusterEvent event = new ClusterEvent(_clusterName, eventType);
-      if (context == null || context.getType() != NotificationContext.Type.CALLBACK) {
+      if (context == null || context.getType() != NotificationContext.Type.CALLBACK
+          || context.getType() == NotificationContext.Type.PERIODIC_REFRESH) {
         _dataCache.requireFullRefresh();
       } else {
         _dataCache.notifyDataChange(changeType, context.getPathChanged());
       }
 
+      // Null check for manager in the following line is done in handleEvent()
       event.addAttribute(AttributeName.helixmanager.name(), context.getManager());
       event.addAttribute(AttributeName.changeContext.name(), context);
       queueEvent(event);

http://git-wip-us.apache.org/repos/asf/helix/blob/0e4163f1/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
new file mode 100644
index 0000000..dac7617
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
@@ -0,0 +1,218 @@
+package org.apache.helix.integration.spectator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRoutingTableProviderPeriodicRefresh extends ZkIntegrationTestBase {
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(TestRoutingTableProviderPeriodicRefresh.class);
+
+  private static final String STATE_MODEL = BuiltInStateModelDefinitions.MasterSlave.name();
+  private static final String TEST_DB = "TestDB";
+  private static final String CLASS_NAME = TestRoutingTableProvider.class.getSimpleName();
+  private static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  private static final int PARTICIPANT_NUMBER = 3;
+  private static final int PARTICIPANT_START_PORT = 12918;
+
+  private static final int PARTITION_NUMBER = 20;
+  private static final int REPLICA_NUMBER = 3;
+
+  private HelixManager _spectator;
+  private HelixManager _spectator_2;
+  private HelixManager _spectator_3;
+  private List<MockParticipantManager> _participants = new ArrayList<>();
+  private List<String> _instances = new ArrayList<>();
+  private ClusterControllerManager _controller;
+  private HelixClusterVerifier _clusterVerifier;
+  private MockRoutingTableProvider _routingTableProvider;
+  private MockRoutingTableProvider _routingTableProviderNoPeriodicRefresh;
+  private MockRoutingTableProvider _routingTableProviderLongPeriodicRefresh;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out
+        .println("START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
+
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+      String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
+      _instances.add(instance);
+    }
+
+    // start dummy participants
+    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i));
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB, _instances, STATE_MODEL,
+        PARTITION_NUMBER, REPLICA_NUMBER);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // start speculator - initialize it with a Mock
+    _spectator = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "spectator",
+        InstanceType.SPECTATOR, ZK_ADDR);
+    _spectator.connect();
+
+    _spectator_2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "spectator_2",
+        InstanceType.SPECTATOR, ZK_ADDR);
+    _spectator_2.connect();
+
+    _spectator_3 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "spectator_3",
+        InstanceType.SPECTATOR, ZK_ADDR);
+    _spectator_3.connect();
+
+    _routingTableProvider =
+        new MockRoutingTableProvider(_spectator, PropertyType.EXTERNALVIEW, true, 1000L);
+    _spectator.addExternalViewChangeListener(_routingTableProvider);
+    _spectator.addLiveInstanceChangeListener(_routingTableProvider);
+    _spectator.addInstanceConfigChangeListener(_routingTableProvider);
+
+    _routingTableProviderNoPeriodicRefresh =
+        new MockRoutingTableProvider(_spectator_2, PropertyType.EXTERNALVIEW, false, 1000L);
+    _spectator_2.addExternalViewChangeListener(_routingTableProviderNoPeriodicRefresh);
+    _spectator_2.addLiveInstanceChangeListener(_routingTableProviderNoPeriodicRefresh);
+    _spectator_2.addInstanceConfigChangeListener(_routingTableProviderNoPeriodicRefresh);
+
+    _routingTableProviderLongPeriodicRefresh =
+        new MockRoutingTableProvider(_spectator_3, PropertyType.EXTERNALVIEW, true, 3000000L);
+    _spectator_3.addExternalViewChangeListener(_routingTableProviderLongPeriodicRefresh);
+    _spectator_3.addLiveInstanceChangeListener(_routingTableProviderLongPeriodicRefresh);
+    _spectator_3.addInstanceConfigChangeListener(_routingTableProviderLongPeriodicRefresh);
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(_clusterVerifier.verify());
+
+  }
+
+  @AfterClass
+  public void afterClass() {
+    // stop participants
+    for (MockParticipantManager p : _participants) {
+      p.syncStop();
+    }
+
+    _controller.syncStop();
+    _spectator.disconnect();
+    _spectator_2.disconnect();
+    _spectator_3.disconnect();
+    _gSetupTool.deleteCluster(CLUSTER_NAME);
+  }
+
+  public class MockRoutingTableProvider extends RoutingTableProvider {
+    private volatile int _refreshCount = 0;
+    private static final boolean DEBUG = false;
+
+    public MockRoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType,
+        boolean isPeriodicRefreshEnabled, long periodRefreshInterval) {
+      super(helixManager, sourceDataType, isPeriodicRefreshEnabled, periodRefreshInterval);
+    }
+
+    @Override
+    public synchronized void refresh(List<ExternalView> externalViewList,
+        NotificationContext changeContext) {
+      super.refresh(externalViewList, changeContext);
+      _refreshCount++;
+      if (DEBUG) {
+        print();
+      }
+    }
+
+    @Override
+    public synchronized void refresh(Collection<ExternalView> externalViews,
+        Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+      super.refresh(externalViews, instanceConfigs, liveInstances);
+      _refreshCount++;
+      if (DEBUG) {
+        print();
+      }
+    }
+
+    @Override
+    protected synchronized void refresh(
+        Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
+        Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+      super.refresh(currentStateMap, instanceConfigs, liveInstances);
+      _refreshCount++;
+      if (DEBUG) {
+        print();
+      }
+    }
+
+    // Log statements for debugging purposes
+    private void print() {
+      logger.error("Refresh happened; count: {}", getRefreshCount());
+      logger.error("timestamp: {}", System.currentTimeMillis());
+    }
+
+    synchronized int getRefreshCount() {
+      return _refreshCount;
+    }
+  }
+
+  @Test
+  public void testPeriodicRefresh() throws InterruptedException {
+    // Wait so that initial refreshes finish (not triggered by periodic refresh timer)
+    Thread.sleep(1000L);
+
+    // Test short refresh
+    int prevRefreshCount = _routingTableProvider.getRefreshCount();
+    // Wait for one timer duration
+    Thread.sleep(1000L);
+    // The timer should have gone off, incrementing the refresh count
+    Assert.assertEquals(_routingTableProvider.getRefreshCount(), prevRefreshCount + 1);
+
+    // Test no periodic refresh
+    prevRefreshCount = _routingTableProviderNoPeriodicRefresh.getRefreshCount();
+    // Wait
+    Thread.sleep(2000);
+    // The timer should NOT have gone off, the refresh count must stay the same
+    Assert.assertEquals(_routingTableProviderNoPeriodicRefresh.getRefreshCount(), prevRefreshCount);
+
+    // Test long periodic refresh
+    prevRefreshCount = _routingTableProviderLongPeriodicRefresh.getRefreshCount();
+    // Wait
+    Thread.sleep(2000);
+    // The timer should NOT have gone off yet, the refresh count must stay the same
+    Assert.assertEquals(_routingTableProviderLongPeriodicRefresh.getRefreshCount(),
+        prevRefreshCount);
+
+    // Call shutdown to make sure they are shutting down properly
+    _routingTableProvider.shutdown();
+    _routingTableProviderNoPeriodicRefresh.shutdown();
+    _routingTableProviderLongPeriodicRefresh.shutdown();
+  }
+}


Mime
View raw message