sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kal...@apache.org
Subject [2/2] sentry git commit: SENTRY-2109: Fix the logic of identifying HMS out of Sync and handle gaps and out-of-sequence notifications.(Kalyan Kumar kalvagadda, reviewed-by Vadim Spector, Na Li and Arjun Mishra)
Date Thu, 01 Feb 2018 22:53:09 GMT
SENTRY-2109: Fix the logic of identifying HMS out of Sync and handle gaps and out-of-sequence notifications.(Kalyan Kumar kalvagadda, reviewed-by Vadim Spector, Na Li and Arjun Mishra)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/afcaa499
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/afcaa499
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/afcaa499

Branch: refs/heads/master
Commit: afcaa4997afae428522e7f0d7fb5917e9e58580d
Parents: 1b71cfb
Author: Kalyan Kumar Kalvagadda <kkalyan@cloudera.com>
Authored: Thu Feb 1 16:52:15 2018 -0600
Committer: Kalyan Kumar Kalvagadda <kkalyan@cloudera.com>
Committed: Thu Feb 1 16:52:15 2018 -0600

----------------------------------------------------------------------
 .../exception/SentryOutOfSyncException.java     |  26 +++
 .../db/service/persistent/HMSFollower.java      | 104 +++------
 .../db/service/persistent/SentryStore.java      |  30 ++-
 .../service/thrift/HiveNotificationFetcher.java | 204 +++++++++++++++--
 .../sentry/service/thrift/SentryService.java    |   4 +
 .../sentry/service/thrift/ServiceConstants.java |   6 +
 .../db/service/persistent/TestHMSFollower.java  |  86 ++++---
 .../TestHMSFollowerSentryStoreIntegration.java  |   4 +-
 .../db/service/persistent/TestSentryStore.java  |  32 +--
 .../thrift/TestHiveNotificationFetcher.java     | 222 ++++++++++++++++++-
 .../TestHiveNotificationFetcherCache.java       | 203 +++++++++++++++++
 .../e2e/dbprovider/TestSnapshotCreation.java    |  87 ++++++++
 ...tSnapshotCreationWithShorterHMSEventTtl.java | 105 +++++++++
 ...shotWithLongerHMSFollowerLongerInterval.java | 114 ++++++++++
 .../tests/e2e/hdfs/TestHDFSIntegrationBase.java |  43 +++-
 15 files changed, 1107 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryOutOfSyncException.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryOutOfSyncException.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryOutOfSyncException.java
new file mode 100644
index 0000000..d2b1945
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryOutOfSyncException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.exception;
+
+public class SentryOutOfSyncException extends Exception {
+  private static final long serialVersionUID = 1L;
+  public SentryOutOfSyncException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
index 2f2b984..45e4305 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
@@ -18,18 +18,19 @@
 
 package org.apache.sentry.provider.db.service.persistent;
 
-import org.apache.sentry.core.common.utils.PubSub;
-import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
-
 import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME;
 import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
 import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jdo.JDODataStoreException;
+
+import org.apache.sentry.core.common.exception.SentryOutOfSyncException;
+import org.apache.sentry.core.common.utils.PubSub;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.thrift.TException;
@@ -109,7 +110,7 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
     notificationProcessor = new NotificationProcessor(sentryStore, authServerName, authzConf);
     client = new SentryHMSClient(authzConf, hiveConnectionFactory);
     hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf); // no cache to test different settings for hdfs sync
-    notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory);
+    notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory, authzConf);
 
     // subscribe to full update notification
     if (conf.getBoolean(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, false)) {
@@ -147,25 +148,25 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
   @Override
   public void run() {
     SentryStateBank.enableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED);
-    long lastProcessedNotificationId;
+    long maxNotificationId;
     try {
       try {
-        // Initializing lastProcessedNotificationId based on the latest persisted notification ID.
-        lastProcessedNotificationId = sentryStore.getLastProcessedNotificationID();
+        // Initializing maxNotificationId based on the latest persisted notification ID.
+        maxNotificationId = sentryStore.getMaxNotificationID();
       } catch (Exception e) {
         LOGGER.error("Failed to get the last processed notification id from sentry store, "
             + "Skipping the processing", e);
         return;
       }
       // Wake any clients connected to this service waiting for HMS already processed notifications.
-      wakeUpWaitingClientsForSync(lastProcessedNotificationId);
+      wakeUpWaitingClientsForSync(maxNotificationId);
       // Only the leader should listen to HMS updates
       if (!isLeader()) {
         // Close any outstanding connections to HMS
         close();
         return;
       }
-      syncupWithHms(lastProcessedNotificationId);
+      syncupWithHms(maxNotificationId);
     } finally {
       SentryStateBank.disableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED);
     }
@@ -189,8 +190,9 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
    *
    * <p>Clients connections waiting for an event notification will be
    * woken up afterwards.
+   * @param maxNotificationId Max of all event-id's that sentry has processed.
    */
-  private void syncupWithHms(long notificationId) {
+  private void syncupWithHms(long maxNotificationId) {
     try {
       client.connect();
       connectedToHms = true;
@@ -201,18 +203,17 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
     }
 
     try {
+      Collection<NotificationEvent> notifications;
       /* Before getting notifications, it checks if a full HMS snapshot is required. */
-      if (isFullSnapshotRequired(notificationId)) {
+      if (isFullSnapshotRequired(maxNotificationId)) {
         createFullSnapshot();
         return;
       }
-
-      Collection<NotificationEvent> notifications =
-          notificationFetcher.fetchNotifications(notificationId);
-
-      // After getting notifications, it checks if the HMS did some clean-up and notifications
-      // are out-of-sync with Sentry.
-      if (areNotificationsOutOfSync(notifications, notificationId)) {
+      try {
+        notifications = notificationFetcher.fetchNotifications(maxNotificationId);
+      } catch (SentryOutOfSyncException e) {
+        LOGGER.error("An error occurred while fetching HMS notifications: {}",
+                e.getMessage());
         createFullSnapshot();
         return;
       }
@@ -225,7 +226,7 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
       }
 
       // Continue with processing new notifications if no snapshots are done.
-      processNotifications(notifications);
+      processNotifications(notifications, maxNotificationId);
     } catch (TException e) {
       LOGGER.error("An error occurred while fetching HMS notifications: ", e);
       close();
@@ -284,52 +285,6 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
   }
 
   /**
-   * Checks if the HMS and Sentry processed notifications are out-of-sync.
-   * This could happen because the HMS did some clean-up of old notifications
-   * and Sentry was not requesting notifications during that time.
-   *
-   * @param events All new notifications to check for an out-of-sync.
-   * @param latestProcessedId The latest notification processed by Sentry to check against the
-   *        list of notifications events.
-   * @return True if an out-of-sync is found; False otherwise.
-   */
-  private boolean areNotificationsOutOfSync(Collection<NotificationEvent> events,
-      long latestProcessedId) {
-    if (events.isEmpty()) {
-      return false;
-    }
-
-    /*
-     * If the sequence of notifications has a gap, then an out-of-sync might
-     * have happened due to the following issue:
-     *
-     * - HDFS sync was disabled or Sentry was shutdown for a time period longer than
-     * the HMS notification clean-up thread causing old notifications to be deleted.
-     *
-     * HMS notifications may contain both gaps in the sequence and duplicates
-     * (the same ID repeated more then once for different events).
-     *
-     * To accept duplicates (see NotificationFetcher for more info), then a gap is found
-     * if the 1st notification received is higher than the current ID processed + 1.
-     * i.e.
-     *   1st ID = 3, latest ID = 3 (duplicate found but no gap detected)
-     *   1st ID = 4, latest ID = 3 (consecutive ID found but no gap detected)
-     *   1st ID = 5, latest ID = 3 (a gap is detected)
-     */
-
-    List<NotificationEvent> eventList = (List<NotificationEvent>) events;
-    long firstNotificationId = eventList.get(0).getEventId();
-
-    if (firstNotificationId > (latestProcessedId + 1)) {
-      LOGGER.info("First HMS event notification Id = {} is greater than latest Sentry processed"
-          + "notification Id = {} + 1. Need to request a full HMS snapshot.", firstNotificationId, latestProcessedId);
-      return true;
-    }
-
-    return false;
-  }
-
-  /**
    * Request for full snapshot and persists it if there is no snapshot available in the sentry
    * store. Also, wakes-up any waiting clients.
    *
@@ -392,16 +347,27 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
    * Also, persists the notification ID regardless of processing result.
    *
    * @param events list of event to be processed
+   * @param notificationId Max event-id that sentry processed so far.
    * @throws Exception if the complete notification list is not processed because of JDO Exception
    */
-  public void processNotifications(Collection<NotificationEvent> events) throws Exception {
+  public void processNotifications(Collection<NotificationEvent> events, long notificationId) throws Exception {
     boolean isNotificationProcessed;
+    long eventIdProcessed = notificationId;
     if (events.isEmpty()) {
       return;
     }
 
     for (NotificationEvent event : events) {
       isNotificationProcessed = false;
+      if (eventIdProcessed > 0) {
+        if (eventIdProcessed == event.getEventId()) {
+          LOGGER.info("Processing event with Duplicate event-id: {}", eventIdProcessed);
+        } else if (eventIdProcessed != event.getEventId() - 1) {
+          LOGGER.info("Events between ID's " + eventIdProcessed + " and "
+                  + event.getEventId() + " are either missing OR out of order");
+        }
+      }
+      eventIdProcessed = event.getEventId();
       try {
         // Only the leader should process the notifications
         if (!isLeader()) {
@@ -409,11 +375,12 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
           return;
         }
         isNotificationProcessed = notificationProcessor.processNotificationEvent(event);
+        notificationFetcher.updateCache(event);
       } catch (Exception e) {
         if (e.getCause() instanceof JDODataStoreException) {
           LOGGER.info("Received JDO Storage Exception, Could be because of processing "
               + "duplicate notification");
-          if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) {
+          if (event.getEventId() <= sentryStore.getMaxNotificationID()) {
             // Rest of the notifications need not be processed.
             LOGGER.error("Received event with Id: {} which is smaller then the ID "
                 + "persisted in store", event.getEventId());
@@ -431,6 +398,7 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
           // Continue processing the next notification.
           LOGGER.debug("Explicitly Persisting Notification ID = {} ", event.getEventId());
           sentryStore.persistLastProcessedNotificationID(event.getEventId());
+          notificationFetcher.updateCache(event);
         } catch (Exception failure) {
           LOGGER.error("Received exception while persisting the notification ID = {}", event.getEventId());
           throw failure;

http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index edea5b6..03a6f45 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -40,6 +40,7 @@ import javax.jdo.PersistenceManager;
 import javax.jdo.PersistenceManagerFactory;
 import javax.jdo.Query;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.core.common.exception.SentryAccessDeniedException;
@@ -467,7 +468,7 @@ public class SentryStore {
       @Override
       public Long getValue() {
         try {
-          return getLastProcessedNotificationID();
+          return getMaxNotificationID();
         } catch (Exception e) {
           LOGGER.error("Can not read current notificationId", e);
           return NOTIFICATION_UNKNOWN;
@@ -552,7 +553,7 @@ public class SentryStore {
   }
 
   @VisibleForTesting
-  void clearAllTables() {
+  public void clearAllTables() {
     try {
       tm.executeTransaction(
           new TransactionBlock<Object>() {
@@ -2803,7 +2804,7 @@ public class SentryStore {
    *
    * @return the last persisted snapshot ID. It returns 0 if no rows are found.
    */
-  private long getCurrentAuthzPathsSnapshotID() throws Exception {
+  public long getCurrentAuthzPathsSnapshotID() throws Exception {
     return tm.executeTransaction(
         new TransactionBlock<Long>() {
           @Override
@@ -3886,7 +3887,7 @@ public class SentryStore {
    * @return the notification ID of latest path change. If no change
    *         found then return 0.
    */
-  public Long getLastProcessedNotificationID() throws Exception {
+  public Long getMaxNotificationID() throws Exception {
     long notificationId = tm.executeTransaction(
     new TransactionBlock<Long>() {
       public Long execute(PersistenceManager pm) throws Exception {
@@ -4235,4 +4236,25 @@ public class SentryStore {
       }
     });
   }
+
+  /**
+   * Checks if notification with particular ID was already processed by searching
+   * for the ID on the MSentryHmsNotification table.
+   *
+   * @param id: event_id of the notification event.
+   * @return True if the notification was already processed; False otherwise
+   */
+  public boolean isNotificationIdProcessed(long id) throws Exception {
+    return tm.executeTransactionWithRetry(new TransactionBlock<Boolean>() {
+      @Override
+      public Boolean execute(PersistenceManager pm) throws Exception {
+        pm.setDetachAllOnCommit(false);
+        Query query = pm.newQuery(MSentryHmsNotification.class);
+        query.setFilter("this.notificationId == id");
+        query.declareParameters("long id");
+        List<MSentryHmsNotification> ids = (List<MSentryHmsNotification>) query.execute(id);
+        return (CollectionUtils.isEmpty(ids)) ? false : true;
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
index 93cc34f..30d6f50 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
@@ -18,10 +18,18 @@
 
 package org.apache.sentry.service.thrift;
 
+import static com.codahale.metrics.MetricRegistry.name;
+
+import com.codahale.metrics.Counter;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
@@ -29,6 +37,8 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.sentry.hdfs.UniquePathsUpdate;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.core.common.exception.SentryOutOfSyncException;
+import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,14 +51,54 @@ public final class HiveNotificationFetcher implements AutoCloseable {
   private final SentryStore sentryStore;
   private final HiveConnectionFactory hmsConnectionFactory;
   private HiveMetaStoreClient hmsClient;
+  /*
+   This value helps HiveNotificationFetcher to decide the notification-id to use while fetching the notifications from
+   HMS to handle out-of-sequence notifications.
+   */
+  private final int refetchCount;
+
+  /*
+   In each iteration, HMSFollower will try to fetch notification which has the Max(event-id)
+   among the notifications it already fetched. This flag is used to check if that notification
+   is fetched in subsequent request. It's initialized to false in every iteration if sentry has fetched notifications
+   in any one the previous fetches.
+   */
+  private boolean foundLastProcessedNotification = false;
+  /*
+   Idea of this cache is to store the notification event id and hash to avoid database lookup as there will be a lot
+   of notifications that are fetched again and again to handle out-of-sync notifications. Least recently used entries
+   in this cache are evicted automatically once the map is full.
+  */
+  private final LRUMap cache;
+
+  /*
+  Cache is designed to hold all the events with event-id's in the range of MAX(event-id) (that sentry processed) and
+   MAX(event-id) - (notification fetch count) from the configuration. Theoretically, all of these event-id's could have
+   duplicates. Cache size should be sufficient to hold all of them. This factor is used to calculate the size of cache.
+   */
+  public static final int CACHE_BUFFER_FACTOR = 10;
+
+/*
+  This value is used to build a cache. Cache anticipates below duplicates and pre-allocated memory.
+ */
+  private static final int DUPLICATE_COUNT = 3;
 
-  /* The following cache and last filtered ID help us to avoid making less calls to the DB */
-  private long lastIdFiltered = 0;
-  private Set<String> cache = new HashSet<>();
+  // Counter for failed transactions
+  private static final Counter notificationCacheMissCount =
+          SentryMetrics.getInstance().
+                  getCounter(name(HiveNotificationFetcher.class,
+                          "Cache Miss"));
 
-  public HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory) {
+  public HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory,
+                                 Configuration conf) {
+    refetchCount = conf.getInt(ServiceConstants.ServerConfig.SENTRY_HMS_NOTIFICATION_REFETCH_COUNT,
+            ServiceConstants.ServerConfig.SENTRY_HMS_NOTIFICATION_REFETCH_COUNT_DEFAULT);
     this.sentryStore = sentryStore;
     this.hmsConnectionFactory = hmsConnectionFactory;
+    /*
+     Size of map is dependent on SENTRY_HMS_NOTIFICATION_REFETCH_COUNT and some additional buffer.
+    */
+    this.cache = new LRUMap(refetchCount + (refetchCount / CACHE_BUFFER_FACTOR));
   }
 
   /**
@@ -64,12 +114,82 @@ public final class HiveNotificationFetcher implements AutoCloseable {
   }
 
   /**
+   * Update cache with notification hash and event-id
+   * @param event Notification event
+   */
+  public void updateCache(NotificationEvent event) {
+    HashSet<String> eventSet;
+    String hash = UniquePathsUpdate.sha1(event);
+
+    if(Strings.isNullOrEmpty(hash)) {
+      LOGGER.error("Hash provided is either null/empty, Cache not updated");
+    }
+    eventSet = (HashSet<String>)cache.get(event.getEventId());
+    if(eventSet != null) {
+      eventSet.add(hash);
+    } else {
+      eventSet = new HashSet<>(DUPLICATE_COUNT, 1);
+      eventSet.add(hash);
+      cache.put(event.getEventId(), eventSet);
+    }
+  }
+
+  /**
+   * Find if the Notification is found in Cache.
+   * @param eventId notification event id
+   * @param hash notification hash
+   * @return True, if the hash is found in Cache
+   *         False, otherwise
+   */
+  @VisibleForTesting
+  boolean isFoundInCache(long eventId, String hash) {
+    HashSet<String> eventSet;
+    eventSet = (HashSet<String>)cache.get(eventId);
+
+    if(eventSet == null) {
+      return false;
+    } else {
+      return eventSet.contains(hash);
+    }
+  }
+
+  /**
+   * Find if the Notification event-id is found in Cache.
+   * @param eventId notification event id.
+   * @return True, if the event-id is found in Cache
+   *         False, otherwise
+   */
+  boolean isFoundInCache(long eventId) {
+    return cache.get(eventId) != null;
+  }
+
+  /**
+   * Get the Cache size
+   * @return size of the Cache
+   */
+  @VisibleForTesting
+  int getCacheSize() {
+    return cache.size();
+  }
+
+  /**
    * Fetch new HMS notifications appeared since a specified event ID. The returned list may
    * include notifications with the same specified ID if they were not seen by Sentry.
+   * In each iteration HiveNotificationFetcher will get the Max(event-id) that sentry has processed and tries to go back
+   * by configured number of event-ids and re-fetches them. Idea is that Max(event-id) should be fetched in subsequent
+   * fetch. If not, sentry should consider create full snapshot.This could happen for several scenario's.
+   * <ul>
+   *   <li>HMS is been restored from a snapshot taken in the past</li>
+   *   <li>Sentry did not fetch notifications from for a while. One use case is HDFS Sync being disabled</li>
+   *   <li>HMS is intentionally reset</li>
+   *   <li>NOTIFICATION_LOG table is cleared</li>
+   * </ul>
    *
    * @param lastEventId The event ID to use to request notifications.
    * @param maxEvents The maximum number of events to fetch.
    * @return A list of newer notifications unseen by Sentry.
+   * @throws  SentryOutOfSyncException If event with event-id equals to Max(event-id) processed by sentry is not received
+   *   in HMS response.
    * @throws Exception If an error occurs on the HMS communication.
    */
   List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) throws Exception {
@@ -84,16 +204,41 @@ public final class HiveNotificationFetcher implements AutoCloseable {
      *
      * TODO: We can avoid doing this once HIVE-16886 is fixed.
      */
-    if (lastEventId > 0) {
-      filter = createNotificationFilterFor(lastEventId);
-      lastEventId--;
+
+    // Value of lastEventId has to be retained for logging purposes, making a copy.
+    long minFetchId = lastEventId;
+    if (minFetchId > 0) {
+      filter = createNotificationFilterFor(minFetchId);
+      minFetchId = (minFetchId > refetchCount) ? (minFetchId - refetchCount) : 0;
+      foundLastProcessedNotification = false;
+    } else {
+      foundLastProcessedNotification = true;
     }
 
-    LOGGER.debug("Requesting HMS notifications since ID = {}", lastEventId);
+    LOGGER.debug("Requesting HMS notifications since ID = {} Max(Event-id) processed: {}", minFetchId, lastEventId);
 
     NotificationEventResponse response;
     try {
-      response = getHmsClient().getNextNotification(lastEventId, maxEvents, filter);
+      /**
+       * Logic below triggers full-snapshots in below situation.
+       * 1. When HMS response with notification events does not have the event with Max(Event-id) processed by sentry
+       *     and the event-id of the first event in the response is not equal to Max(Event-id)+1 processed by sentry
+       * Logic below will not trigger full-snapshots in below situations.
+       * 1. When the response doesn't have ay notification events. This will be seen in situations where there no changes
+       * to HMS data for a white and all the notification events are evicted from the NOTIFICATION_LOG table because of
+       * TTL expiration.
+       * 2. When HMS response with notification events does have the event with Max(Event-id) processed by sentry.
+       */
+      response = getHmsClient().getNextNotification(minFetchId, maxEvents, filter);
+      if ((response != null) && (response.getEventsSize() > 0) && !foundLastProcessedNotification &&
+              (response.getEvents().get(0).getEventId() != (lastEventId + 1))) {
+        LOGGER.error("Max event-id processed by sentry is " + lastEventId + " but the " +
+                "Id of the first event received from HMS in subsequent fetch is " +
+                response.getEvents().get(0).getEventId() + ", Requesting for Full snapshot");
+        //Full snapshot should be requested.
+        throw new SentryOutOfSyncException("Notification Log doesn't have the " +
+                "last notification processed by sentry");
+      }
     } catch (Exception e) {
       close();
       throw e;
@@ -121,27 +266,31 @@ public final class HiveNotificationFetcher implements AutoCloseable {
      * specified ID. If a new filter ID is used, then we clean up the cache.
      */
 
-    if (lastIdFiltered != id) {
-      lastIdFiltered = id;
-      cache.clear();
-    }
-
     return new NotificationFilter() {
       @Override
       public boolean accept(NotificationEvent notificationEvent) {
-        if (notificationEvent.getEventId() == id) {
+        LOGGER.debug("Applying filter created for event-id {} on Event with ID:{}", id, notificationEvent.getEventId());
+        if (notificationEvent.getEventId() <= id) {
           String hash = UniquePathsUpdate.sha1(notificationEvent);
-
           try {
-            if (cache.contains(hash) || sentryStore.isNotificationProcessed(hash)) {
-              cache.add(hash);
-
-              LOGGER.debug("Ignoring HMS notification already processed: ID = {}", id);
+            // This check makes sure that the last notification processed by
+            // HMSFollower is present in NOTIFICATION_LOG table. If it is not found
+            // it can be assumed that there were events that were cleaned up before
+            // Sentry could fetch them. When this happens sentry should take full snapshot again.
+            if ((notificationEvent.getEventId() == id) && !foundLastProcessedNotification) {
+              foundLastProcessedNotification = true;
+            }
+            if (isFoundInCache(notificationEvent.getEventId(), hash) == true) {
+              LOGGER.debug("Ignoring HMS notification already processed: ID = {}", notificationEvent.getEventId());
+              return false;
+            } else if (sentryStore.isNotificationProcessed(hash)) {
+              notificationCacheMissCount.inc();
+              LOGGER.debug("Ignoring HMS notification already processed: ID = {} - cache miss", notificationEvent.getEventId());
               return false;
             }
           } catch (Exception e) {
             LOGGER.error("An error occurred while checking if notification {} is already "
-                + "processed: {}", id, e.getMessage());
+                    + "processed: {}", notificationEvent.getEventId(), e);
 
             // We cannot throw an exception on this filter, so we return false assuming this
             // notification is already processed
@@ -203,9 +352,16 @@ public final class HiveNotificationFetcher implements AutoCloseable {
         hmsClient.close();
       }
 
-      cache.clear();
     } finally {
       hmsClient = null;
     }
   }
+
+  /**
+   * Gets notification cache miss count.
+   * @return notification cache miss count.
+   */
+  public long getNotificationCacheMissCount() {
+    return notificationCacheMissCount.getCount();
+  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index 96c6810..827b078 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -642,4 +642,8 @@ public class SentryService implements Callable, SigUtils.SigListener {
     // Become follower
     leaderMonitor.deactivate();
   }
+  @VisibleForTesting
+  public long getCurrentAuthzPathsSnapshotID() throws Exception {
+    return sentryStore.getCurrentAuthzPathsSnapshotID();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
index 7e02874..d16d2fd 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
@@ -241,6 +241,12 @@ public class ServiceConstants {
     public static final int SENTRY_DELTA_KEEP_COUNT_DEFAULT = 200;
 
     /**
+     * Number of notifications that HMSFollower should re-fetch in periodic pull from HMS.
+     */
+    public static final String SENTRY_HMS_NOTIFICATION_REFETCH_COUNT = "sentry_hms_notification_refetch_count";
+    public static final int SENTRY_HMS_NOTIFICATION_REFETCH_COUNT_DEFAULT = 100;
+
+    /**
      * Number of notification id's to keep around during cleaning
      */
     public static final String SENTRY_HMS_NOTIFICATION_ID_KEEP_COUNT = "sentry.server.delta.keep.count";

http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java
index 7903078..22ec61b 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java
@@ -64,6 +64,8 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import javax.security.auth.login.LoginException;
 
@@ -128,7 +130,7 @@ public class TestHMSFollower {
     hmsFollower.setSentryHmsClient(sentryHmsClient);
 
     // 1st run should get a full snapshot because AuthzPathsMapping is empty
-    when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+    when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
     when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true);
     when(sentryStore.isHmsNotificationEmpty()).thenReturn(true);
     hmsFollower.run();
@@ -140,7 +142,7 @@ public class TestHMSFollower {
     reset(sentryStore);
 
     // 2nd run should not get a snapshot because is already processed
-    when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
+    when(sentryStore.getMaxNotificationID()).thenReturn(fullSnapshot.getId());
     when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
     hmsFollower.run();
     verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
@@ -198,7 +200,7 @@ public class TestHMSFollower {
     hmsFollower.setSentryHmsClient(sentryHmsClient);
 
     // 1st run should get a full snapshot because AuthzPathsMapping is empty
-    when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+    when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
     when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true);
     when(sentryStore.isHmsNotificationEmpty()).thenReturn(true);
     hmsFollower.run();
@@ -210,7 +212,7 @@ public class TestHMSFollower {
     reset(sentryStore);
 
     // 2nd run should not get a snapshot because is already processed
-    when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
+    when(sentryStore.getMaxNotificationID()).thenReturn(fullSnapshot.getId());
     when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
     hmsFollower.run();
     verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
@@ -222,7 +224,7 @@ public class TestHMSFollower {
     // but because of full update trigger it will, as in the first run
     PubSub.getInstance().publish(PubSub.Topic.HDFS_SYNC_HMS, "message");
 
-    when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
+    when(sentryStore.getMaxNotificationID()).thenReturn(fullSnapshot.getId());
     when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
     hmsFollower.run();
     verify(sentryStore, times(1)).persistFullPathsImage(
@@ -233,7 +235,7 @@ public class TestHMSFollower {
 
     // 4th run should not get a snapshot because is already processed and publish-subscribe
     // trigger is only supposed to work once. This is exactly as 2nd run.
-    when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
+    when(sentryStore.getMaxNotificationID()).thenReturn(fullSnapshot.getId());
     when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
     hmsFollower.run();
     verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
@@ -284,7 +286,7 @@ public class TestHMSFollower {
 
     // 1st run should get a full snapshot because hms notificaions is empty
     // but it should never be persisted because HDFS sync is disabled
-    when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+    when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
     when(sentryStore.isHmsNotificationEmpty()).thenReturn(true);
     hmsFollower.run();
     verify(sentryStore, times(0)).persistFullPathsImage(
@@ -309,7 +311,7 @@ public class TestHMSFollower {
 
     //Set last processed notification Id to match the full new value 1L
     final long LATEST_EVENT_ID = 1L;
-    when(sentryStore.getLastProcessedNotificationID()).thenReturn(LATEST_EVENT_ID);
+    when(sentryStore.getMaxNotificationID()).thenReturn(LATEST_EVENT_ID);
     //Mock that sets isHmsNotificationEmpty to false
     when(sentryStore.isHmsNotificationEmpty()).thenReturn(false);
     // Mock that sets the current HMS notification ID. Set it to match
@@ -369,7 +371,7 @@ public class TestHMSFollower {
     hmsFollower.setSentryHmsClient(sentryHmsClient);
 
     // 1st run should get a full snapshot
-    when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+    when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
     when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
     hmsFollower.run();
     verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
@@ -378,7 +380,7 @@ public class TestHMSFollower {
     reset(sentryStore);
 
     // 2nd run should not get a snapshot because is already processed
-    when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID);
+    when(sentryStore.getMaxNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID);
     when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
     hmsFollower.run();
     verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
@@ -416,20 +418,33 @@ public class TestHMSFollower {
     SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
     when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
 
-    when(hmsClientMock.getNextNotification(Mockito.eq(SENTRY_PROCESSED_EVENT_ID - 1), Mockito.eq(Integer.MAX_VALUE),
-        (NotificationFilter) Mockito.notNull()))
-        .thenReturn(new NotificationEventResponse(
-            Arrays.<NotificationEvent>asList(
+    when(hmsClientMock.getNextNotification(Mockito.anyLong(), Mockito.eq(Integer.MAX_VALUE),
+            (NotificationFilter) Mockito.notNull())).thenAnswer(new Answer<NotificationEventResponse>() {
+      @Override
+      public NotificationEventResponse answer(InvocationOnMock invocation)
+              throws Throwable {
+        NotificationFilter filter = (NotificationFilter) invocation.getArguments()[2];
+        NotificationEventResponse response = new NotificationEventResponse();
+
+        List<NotificationEvent> events = Arrays.<NotificationEvent>asList(
                 new NotificationEvent(fullSnapshot.getId(), 0, "", "")
-            )
-        ));
+        );
+
+        for (NotificationEvent event : events) {
+          if (filter.accept(event)) {
+            response.addToEvents(event);
+          }
+        }
 
+        return response;
+      }
+    });
     HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
         hmsConnectionMock, hiveInstance);
     hmsFollower.setSentryHmsClient(sentryHmsClient);
 
     // 1st run should get a full snapshot
-    when(sentryStore.getLastProcessedNotificationID())
+    when(sentryStore.getMaxNotificationID())
         .thenReturn(SENTRY_PROCESSED_EVENT_ID);
     when(sentryStore.isHmsNotificationEmpty()).thenReturn(false);
     hmsFollower.run();
@@ -439,8 +454,11 @@ public class TestHMSFollower {
     reset(sentryStore);
 
     // 2nd run should not get a snapshot because is already processed
-    when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID);
+    when(sentryStore.getMaxNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID);
     when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+    when(sentryStore.isNotificationIdProcessed(fullSnapshot.getId())).thenReturn(true);
+    when(sentryStore.isNotificationProcessed(UniquePathsUpdate.sha1(new NotificationEvent(
+            fullSnapshot.getId(), 0, "", "")))).thenReturn(true);
     hmsFollower.run();
     verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
     verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
@@ -525,7 +543,7 @@ public class TestHMSFollower {
     events.add(notificationEvent);
     HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
         hiveConnectionFactory, hiveInstance);
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
 
     TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
     authorizable.setServer(hiveInstance);
@@ -555,7 +573,7 @@ public class TestHMSFollower {
 
     HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
         hiveConnectionFactory, hiveInstance);
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
 
     TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
     authorizable.setServer(hiveInstance);
@@ -589,7 +607,7 @@ public class TestHMSFollower {
 
     HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
         hiveConnectionFactory, hiveInstance);
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
 
     TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
     authorizable.setServer(hiveInstance);
@@ -624,7 +642,7 @@ public class TestHMSFollower {
 
     HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
         hiveConnectionFactory, hiveInstance);
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
 
     TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
     authorizable.setServer(hiveInstance);
@@ -665,7 +683,7 @@ public class TestHMSFollower {
 
     HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
         hiveConnectionFactory, hiveInstance);
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
 
     TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
     authorizable.setServer(hiveInstance);
@@ -724,7 +742,7 @@ public class TestHMSFollower {
     events.add(notificationEvent);
     inputEventId += 1;
     // Process the notification
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
     // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
     // and persistLastProcessedNotificationID was not invoked.
     //noinspection unchecked
@@ -748,7 +766,7 @@ public class TestHMSFollower {
     events.add(notificationEvent);
     inputEventId += 1;
     //Process the notification
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
     // Make sure that addAuthzPathsMapping was invoked once to handle ADD_PARTITION notification
     // and persistLastProcessedNotificationID was not invoked.
     //noinspection unchecked
@@ -768,7 +786,7 @@ public class TestHMSFollower {
     events.add(notificationEvent);
     inputEventId += 1;
     // Process the notification
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
     // Make sure that persistLastProcessedNotificationID is invoked explicitly.
     verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1);
     reset(sentryStore);
@@ -786,7 +804,7 @@ public class TestHMSFollower {
     events.add(notificationEvent);
     inputEventId += 1;
     // Process the notification
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
     // Make sure that updateAuthzPathsMapping was invoked once to handle ALTER_PARTITION
     // notification and persistLastProcessedNotificationID was not invoked.
     verify(sentryStore, times(1)).updateAuthzPathsMapping(Mockito.anyString(),
@@ -809,7 +827,7 @@ public class TestHMSFollower {
     notificationEvent.setTableName(tableName2);
     events.add(notificationEvent);
     // Process the notification
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
     // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
     // and persistLastProcessedNotificationID was not invoked.
     //noinspection unchecked
@@ -864,7 +882,7 @@ public class TestHMSFollower {
     events.add(notificationEvent);
     inputEventId += 1;
     // Process the notification
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
     // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
     // and persistLastProcessedNotificationID was not invoked.
     //noinspection unchecked
@@ -889,7 +907,7 @@ public class TestHMSFollower {
     events.add(notificationEvent);
     inputEventId += 1;
     // Process the notification
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
     // Make sure that renameAuthzObj and deleteAuthzPathsMapping were  not invoked
     // to handle CREATE_TABLE notification
     // and persistLastProcessedNotificationID is explicitly invoked
@@ -916,7 +934,7 @@ public class TestHMSFollower {
     notificationEvent.setTableName(tableName2);
     events.add(notificationEvent);
     // Process the notification
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
     // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
     // and persistLastProcessedNotificationID was not invoked.
     //noinspection unchecked
@@ -970,7 +988,7 @@ public class TestHMSFollower {
     Configuration configuration = new Configuration();
     HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
         hiveConnectionFactory, hiveInstance);
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
 
     // invalid event updates notification ID directly
     verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1);
@@ -1012,7 +1030,7 @@ public class TestHMSFollower {
     hmsFollower.setSentryHmsClient(sentryHmsClient);
 
     // 1st run should get a full snapshot because AuthzPathsMapping is empty
-    when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+    when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
     when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
     when(sentryStore.isHmsNotificationEmpty()).thenReturn(true);
     hmsFollower.run();
@@ -1046,7 +1064,7 @@ public class TestHMSFollower {
     Configuration configuration = new Configuration();
     HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
         hiveConnectionFactory, hiveInstance);
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
 
     TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
     authorizable.setServer(hiveInstance);

http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java
index 91c90f9..996f554 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java
@@ -190,7 +190,7 @@ public class TestHMSFollowerSentryStoreIntegration {
     List<NotificationEvent> events = new ArrayList<>();
     events.add(notificationEvent);
 
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
 
     Assert.assertEquals(1, sentryStore.getAllTSentryPrivilegesByRoleName(roleName1)
         .size());
@@ -248,7 +248,7 @@ public class TestHMSFollowerSentryStoreIntegration {
     List<NotificationEvent> events = new ArrayList<>();
     events.add(notificationEvent);
 
-    hmsFollower.processNotifications(events);
+    hmsFollower.processNotifications(events, 0);
 
     Assert.assertEquals(1, sentryStore.getAllTSentryPrivilegesByRoleName(roleName1)
         .size());

http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
index b410027..9a03f48 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
@@ -2466,7 +2466,7 @@ public class TestSentryStore extends org.junit.Assert {
     long notificationID = 11;
     sentryStore.persistFullPathsImage(authzPaths, notificationID);
     PathsUpdate pathsUpdate = sentryStore.retrieveFullPathsImageUpdate(prefixes);
-    long savedNotificationID = sentryStore.getLastProcessedNotificationID();
+    long savedNotificationID = sentryStore.getMaxNotificationID();
     assertEquals(1, pathsUpdate.getImgNum());
     TPathsDump pathDump = pathsUpdate.toThrift().getPathsDump();
     Map<Integer, TPathEntry> nodeMap = pathDump.getNodeMap();
@@ -2519,7 +2519,7 @@ public class TestSentryStore extends org.junit.Assert {
     sentryStore.addAuthzPathsMapping("db2", Arrays.asList("/hive/db2"), update2);
 
     // Check the latest persisted ID matches to both the path updates
-    long latestID = sentryStore.getLastProcessedNotificationID();
+    long latestID = sentryStore.getMaxNotificationID();
     assertEquals(notificationID, latestID);
 
     String []prefixes = {"/hive"};
@@ -2585,7 +2585,7 @@ public class TestSentryStore extends org.junit.Assert {
     sentryStore.persistLastProcessedNotificationID(notificationID);
 
     // Retrieving latest peristed ID should match with the previous persisted ID
-    long latestID = sentryStore.getLastProcessedNotificationID();
+    long latestID = sentryStore.getMaxNotificationID();
     assertEquals(notificationID, latestID);
   }
 
@@ -2597,7 +2597,7 @@ public class TestSentryStore extends org.junit.Assert {
     sentryStore.persistFullPathsImage(new HashMap<String, Collection<String>>(), notificationID);
 
     // Add "db1.table1" authzObj
-    Long lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    Long lastNotificationId = sentryStore.getMaxNotificationID();
     UniquePathsUpdate addUpdate = new UniquePathsUpdate("u1", 1, false);
     addUpdate.newPathChange("db1.table").
           addToAddPaths(Arrays.asList("db1", "tbl1"));
@@ -2624,7 +2624,7 @@ public class TestSentryStore extends org.junit.Assert {
     long lastChangeID = sentryStore.getLastProcessedPathChangeID();
     MSentryPathChange addPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID);
     assertEquals(addUpdate.JSONSerialize(), addPathChange.getPathChange());
-    lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    lastNotificationId = sentryStore.getMaxNotificationID();
     assertEquals(1, lastNotificationId.longValue());
 
     // Delete path 'db1.db/tbl1' from "db1.table1" authzObj.
@@ -2649,7 +2649,7 @@ public class TestSentryStore extends org.junit.Assert {
     lastChangeID = sentryStore.getLastProcessedPathChangeID();
     MSentryPathChange delPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID);
     assertEquals(delUpdate.JSONSerialize(), delPathChange.getPathChange());
-    lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    lastNotificationId = sentryStore.getMaxNotificationID();
     assertEquals(2, lastNotificationId.longValue());
 
     // Delete "db1.table" authzObj from the authzObj -> [Paths] mapping.
@@ -2674,7 +2674,7 @@ public class TestSentryStore extends org.junit.Assert {
     MSentryPathChange delAllPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID);
     assertEquals(delAllupdate.JSONSerialize(), delAllPathChange.getPathChange());
 
-    lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    lastNotificationId = sentryStore.getMaxNotificationID();
     assertEquals(3, lastNotificationId.longValue());
 
   }
@@ -2682,7 +2682,7 @@ public class TestSentryStore extends org.junit.Assert {
   @Test
   public void testRenameUpdateAuthzPathsMapping() throws Exception {
     Map<String, Collection<String>> authzPaths = new HashMap<>();
-    Long lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    Long lastNotificationId = sentryStore.getMaxNotificationID();
     authzPaths.put("db1.table1", Sets.newHashSet("user/hive/warehouse/db1.db/table1",
                                                 "user/hive/warehouse/db1.db/table1/p1"));
     authzPaths.put("db1.table2", Sets.newHashSet("user/hive/warehouse/db1.db/table2"));
@@ -2728,7 +2728,7 @@ public class TestSentryStore extends org.junit.Assert {
     long lastChangeID = sentryStore.getLastProcessedPathChangeID();
     MSentryPathChange renamePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID);
     assertEquals(renameUpdate.JSONSerialize(), renamePathChange.getPathChange());
-    lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    lastNotificationId = sentryStore.getMaxNotificationID();
     assertEquals(1, lastNotificationId.longValue());
     // Rename 'db1.table1' to "db1.table2" but did not change its location.
     renameUpdate = new UniquePathsUpdate("u2",2, false);
@@ -2752,7 +2752,7 @@ public class TestSentryStore extends org.junit.Assert {
     assertTrue(CollectionUtils.isEqualCollection(Lists.newArrayList("user/hive/warehouse/db1.db/table1/p1",
             "user/hive/warehouse/db1.db/newTable1"),
             pathsImage.get("db1.newTable2")));
-    lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    lastNotificationId = sentryStore.getMaxNotificationID();
     assertEquals(2, lastNotificationId.longValue());
 
     // Query the persisted path change and ensure it equals to the original one
@@ -2790,7 +2790,7 @@ public class TestSentryStore extends org.junit.Assert {
     lastChangeID = sentryStore.getLastProcessedPathChangeID();
     MSentryPathChange updatePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID);
     assertEquals(update.JSONSerialize(), updatePathChange.getPathChange());
-    lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    lastNotificationId = sentryStore.getMaxNotificationID();
     assertEquals(3, lastNotificationId.longValue());
   }
 
@@ -2914,7 +2914,7 @@ public class TestSentryStore extends org.junit.Assert {
     assertEquals(1, pathImage.get("db2.table").size());
     assertEquals(3, sentryStore.getMPaths().size());
 
-    Long lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    Long lastNotificationId = sentryStore.getMaxNotificationID();
     assertEquals(notificationID, lastNotificationId.longValue());
   }
 
@@ -3419,7 +3419,7 @@ public class TestSentryStore extends org.junit.Assert {
   @Test
   public void testDuplicateNotification() throws Exception {
     Map<String, Collection<String>> authzPaths = new HashMap<>();
-    Long lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    Long lastNotificationId = sentryStore.getMaxNotificationID();
 
     lastNotificationId ++;
     authzPaths.put("db1.table1", Sets.newHashSet("user/hive/warehouse/db1.db/table1",
@@ -3471,7 +3471,7 @@ public class TestSentryStore extends org.junit.Assert {
     long lastChangeID = sentryStore.getLastProcessedPathChangeID();
     MSentryPathChange renamePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID);
     assertEquals(renameUpdate.JSONSerialize(), renamePathChange.getPathChange());
-    Long savedLastNotificationId = sentryStore.getLastProcessedNotificationID();
+    Long savedLastNotificationId = sentryStore.getMaxNotificationID();
     assertEquals(lastNotificationId.longValue(), savedLastNotificationId.longValue());
 
 
@@ -3531,7 +3531,7 @@ public class TestSentryStore extends org.junit.Assert {
     localSentryStore.persistFullPathsImage(new HashMap<String, Collection<String>>(), 0);
 
     // Add "db1.table1" authzObj
-    Long lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    Long lastNotificationId = sentryStore.getMaxNotificationID();
     UniquePathsUpdate addUpdate = new UniquePathsUpdate("u1",1, false);
     addUpdate.newPathChange("db1.table").
         addToAddPaths(Arrays.asList("db1", "tbl1"));
@@ -3601,7 +3601,7 @@ public class TestSentryStore extends org.junit.Assert {
     lastChangeID = localSentryStore.getLastProcessedPathChangeID();
     assertEquals(0, lastChangeID);
 
-    lastNotificationId = localSentryStore.getLastProcessedNotificationID();
+    lastNotificationId = localSentryStore.getMaxNotificationID();
     assertEquals(0, lastNotificationId.longValue());
 
     // enable HDFS for other tests

http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java
index 83a1bec..3a74b70 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java
@@ -18,17 +18,20 @@
 
 package org.apache.sentry.service.thrift;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.sentry.hdfs.UniquePathsUpdate;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -44,7 +47,7 @@ public class TestHiveNotificationFetcher {
 
     Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient));
 
-    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) {
+    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) {
       List<NotificationEvent> events;
 
       Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null))
@@ -63,7 +66,7 @@ public class TestHiveNotificationFetcher {
 
     Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient));
 
-    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) {
+    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) {
       List<NotificationEvent> events;
 
       Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null))
@@ -82,7 +85,7 @@ public class TestHiveNotificationFetcher {
 
     Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient));
 
-    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) {
+    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) {
       List<NotificationEvent> events;
 
       Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null))
@@ -110,9 +113,11 @@ public class TestHiveNotificationFetcher {
 
     Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient));
 
-    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) {
+    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) {
       List<NotificationEvent> events;
 
+      // Updating the Cache in the notification
+      fetcher.updateCache(new NotificationEvent(1L, 0, "CREATE_DATABASE", ""));
       /*
        * Requesting an ID > 0 will request all notifications from 0 again but filter those
        * already seen notifications with ID = 1
@@ -134,13 +139,9 @@ public class TestHiveNotificationFetcher {
               );
 
               for (NotificationEvent event : events) {
-                String hash = UniquePathsUpdate.sha1(event);
-                
                 // We simulate that CREATE_DATABASE is already processed
                 if (event.getEventType().equals("CREATE_DATABASE")) {
-                  Mockito.when(store.isNotificationProcessed(Mockito.eq(hash))).thenReturn(true);
-                } else {
-                  Mockito.when(store.isNotificationProcessed(Mockito.eq(hash))).thenReturn(false);
+                  Mockito.when(store.isNotificationIdProcessed(1)).thenReturn(true);
                 }
 
                 if (filter.accept(event)) {
@@ -153,6 +154,7 @@ public class TestHiveNotificationFetcher {
           });
 
       events = fetcher.fetchNotifications(1);
+      verify(store, times(1)).isNotificationProcessed(Mockito.anyString());
       assertEquals(2, events.size());
       assertEquals(1, events.get(0).getEventId());
       assertEquals("CREATE_TABLE", events.get(0).getEventType());
@@ -160,4 +162,200 @@ public class TestHiveNotificationFetcher {
       assertEquals("ALTER_TABLE", events.get(1).getEventType());
     }
   }
+
+  /**
+   * Test verifies that any out-of-sync notifications which below the SENTRY_HMS_NOTIFICATION_REFETCH_COUNT
+   * threshold will be fetched in the subsequent fetches.
+   * @throws Exception
+   */
+  @Test
+  public void testOutofSyncNotifications() throws Exception {
+    SentryStore store = Mockito.mock(SentryStore.class);
+    HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class);
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+
+    Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient));
+
+    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) {
+      List<NotificationEvent> events;
+
+      // This mock will also test that the NotificationFilter works as expected
+      Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE),
+              (NotificationFilter) Mockito.isNull())).thenAnswer(new Answer<NotificationEventResponse>() {
+        @Override
+        public NotificationEventResponse answer(InvocationOnMock invocation)
+                throws Throwable {
+          List<NotificationEvent> events = Arrays.<NotificationEvent>asList(
+                  new NotificationEvent(1L, 0, "CREATE_DATABASE", ""),
+                  new NotificationEvent(2L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(3L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(7L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(8L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(9L, 0, "CREATE_TABLE", "")
+          );
+          NotificationEventResponse response = new NotificationEventResponse(events);
+          for (NotificationEvent event : events) {
+            fetcher.updateCache(event);
+          }
+          return response;
+        }
+      });
+
+      events = fetcher.fetchNotifications(0);
+      assertEquals(6, events.size());
+      assertEquals(1, events.get(0).getEventId());
+      assertEquals("CREATE_DATABASE", events.get(0).getEventType());
+      assertEquals(2, events.get(1).getEventId());
+      assertEquals("CREATE_TABLE", events.get(1).getEventType());
+      verify(hmsClient, times(1)).getNextNotification(Mockito.eq(0L), Mockito.anyInt(), Mockito.anyObject());
+
+      reset(hmsClient);
+      // This mock will also test that the NotificationFilter works as expected
+      Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE),
+              (NotificationFilter) Mockito.notNull())).thenAnswer(new Answer<NotificationEventResponse>() {
+        @Override
+        public NotificationEventResponse answer(InvocationOnMock invocation)
+                throws Throwable {
+          NotificationFilter filter = (NotificationFilter) invocation.getArguments()[2];
+          NotificationEventResponse response = new NotificationEventResponse();
+
+          List<NotificationEvent> events = Arrays.<NotificationEvent>asList(
+                  new NotificationEvent(1L, 0, "CREATE_DATABASE", ""),
+                  new NotificationEvent(2L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(3L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(4L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(5L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(6L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(7L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(8L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(9L, 0, "CREATE_TABLE", "")
+          );
+
+          for (NotificationEvent event : events) {
+            // We simulate that CREATE_DATABASE is already processed
+            if (event.getEventId() == 9) {
+              Mockito.when(store.isNotificationIdProcessed(9)).thenReturn(true);
+            }
+            if (filter.accept(event)) {
+              response.addToEvents(event);
+            }
+          }
+
+          return response;
+        }
+      });
+
+      events = fetcher.fetchNotifications(9);
+      assertEquals(3, events.size());
+      assertEquals(4, events.get(0).getEventId());
+      verify(store, times(3)).isNotificationProcessed(Mockito.anyString());
+      verify(hmsClient, times(1)).getNextNotification(Mockito.eq(0L), Mockito.anyInt(), Mockito.anyObject());
+    }
+  }
+
+  /**
+   * Test verifies that any out-of-sync notifications which above the SENTRY_HMS_NOTIFICATION_REFETCH_COUNT
+   * threshold will be lost and will not be fetched.
+   * @throws Exception
+   */
+  @Test
+  public void testMissingNotifications() throws Exception {
+    SentryStore store = Mockito.mock(SentryStore.class);
+    HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class);
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+
+    Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient));
+
+    Configuration conf = new Configuration();
+    conf.set(ServiceConstants.ServerConfig.SENTRY_HMS_NOTIFICATION_REFETCH_COUNT, "3");
+    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, conf)) {
+      List<NotificationEvent> events;
+
+      // This mock will also test that the NotificationFilter works as expected
+      Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE),
+              (NotificationFilter) Mockito.isNull())).thenAnswer(new Answer<NotificationEventResponse>() {
+        @Override
+        public NotificationEventResponse answer(InvocationOnMock invocation)
+                throws Throwable {
+          List<NotificationEvent> events = Arrays.<NotificationEvent>asList(
+                  new NotificationEvent(1L, 0, "CREATE_DATABASE", ""),
+                  new NotificationEvent(2L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(8L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(9L, 0, "CREATE_TABLE", "")
+          );
+          NotificationEventResponse response = new NotificationEventResponse(events);
+          for (NotificationEvent event : events) {
+            fetcher.updateCache(event);
+          }
+          return response;
+        }
+      });
+
+      events = fetcher.fetchNotifications(0);
+      assertEquals(4, events.size());
+      assertEquals(1, events.get(0).getEventId());
+      assertEquals("CREATE_DATABASE", events.get(0).getEventType());
+      assertEquals(2, events.get(1).getEventId());
+      assertEquals("CREATE_TABLE", events.get(1).getEventType());
+      verify(hmsClient, times(1)).getNextNotification(Mockito.eq(0L), Mockito.anyInt(), Mockito.anyObject());
+
+      reset(hmsClient);
+      // This mock will also test that the NotificationFilter works as expected
+      Mockito.when(hmsClient.getNextNotification(Mockito.eq(6L), Mockito.eq(Integer.MAX_VALUE),
+              (NotificationFilter) Mockito.notNull())).thenAnswer(new Answer<NotificationEventResponse>() {
+        @Override
+        public NotificationEventResponse answer(InvocationOnMock invocation)
+                throws Throwable {
+          NotificationFilter filter = (NotificationFilter) invocation.getArguments()[2];
+          NotificationEventResponse response = new NotificationEventResponse();
+
+          List<NotificationEvent> events = Arrays.<NotificationEvent>asList(
+                  new NotificationEvent(7L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(8L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(9L, 0, "CREATE_TABLE", "")
+          );
+
+          for (NotificationEvent event : events) {
+            // We simulate that CREATE_DATABASE is already processed
+            if (event.getEventId() == 9) {
+              Mockito.when(store.isNotificationIdProcessed(9)).thenReturn(true);
+            }
+            if (filter.accept(event)) {
+              response.addToEvents(event);
+            }
+          }
+
+          return response;
+        }
+      });
+
+      events = fetcher.fetchNotifications(9);
+      assertEquals(1, events.size());
+      assertEquals(7, events.get(0).getEventId());
+      verify(store, times(1)).isNotificationProcessed(Mockito.anyString());
+      verify(hmsClient, times(1)).getNextNotification(Mockito.eq(6L), Mockito.anyInt(), Mockito.anyObject());
+    }
+  }
+
+  @Test
+  public void verifyCache() throws Exception {
+    SentryStore store = Mockito.mock(SentryStore.class);
+    HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class);
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+
+    Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient));
+    Configuration conf = new Configuration();
+    // With this configuration, cache size should be 9.
+    conf.set(ServiceConstants.ServerConfig.SENTRY_HMS_NOTIFICATION_REFETCH_COUNT, "3");
+    HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, conf);
+
+    for (int i = 0; i < 15 ; i++) {
+      fetcher.updateCache(new NotificationEvent(i, 0, "CREATE_DATABASE", ""));
+    }
+
+    assertEquals("Invalid Cache size", 3, fetcher.getCacheSize());
+    for (int i = 0; i < (15 - (3 + (3/HiveNotificationFetcher.CACHE_BUFFER_FACTOR))) ; i++) {
+      assertFalse("Cache entry for " + i + " should have been evicted", fetcher.isFoundInCache(i));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcherCache.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcherCache.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcherCache.java
new file mode 100644
index 0000000..a81fdf4
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcherCache.java
@@ -0,0 +1,203 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  <p>
+  http://www.apache.org/licenses/LICENSE-2.0
+  <p>
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.security.alias.UserProvider;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class tests the notification metadata cache in HiveNotificationFetcher.
+ */
+public class TestHiveNotificationFetcherCache {
+  private static Configuration conf = null;
+  private static File dataDir;
+  private static File policyFilePath;
+  private static String[] adminGroups = {"adminGroup1"};
+  private static char[] passwd = new char[]{'1', '2', '3'};
+  private static SentryStore store;
+  private static HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class);
+  private static HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+  private static List<NotificationEvent> unFilteredEvents = new ArrayList<NotificationEvent>();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    conf = new Configuration(false);
+    final String ourUrl = UserProvider.SCHEME_NAME + ":///";
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, ourUrl);
+
+    // enable HDFS sync, so perm and path changes will be saved into DB
+    conf.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory");
+    conf.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, "org.apache.sentry.hdfs.SentryPlugin");
+
+    // THis should be a UserGroupInformation provider
+    CredentialProvider provider = CredentialProviderFactory.getProviders(conf).get(0);
+
+
+    // The user credentials are stored as a static variable by UserGrouoInformation provider.
+    // We need to only set the password the first time, an attempt to set it for the second
+    // time fails with an exception.
+    if (provider.getCredentialEntry(ServiceConstants.ServerConfig.SENTRY_STORE_JDBC_PASS) == null) {
+      provider.createCredentialEntry(ServiceConstants.ServerConfig.SENTRY_STORE_JDBC_PASS, passwd);
+      provider.flush();
+    }
+
+    dataDir = new File(Files.createTempDir(), "sentry_policy_db");
+    conf.set(ServiceConstants.ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false");
+    conf.set(ServiceConstants.ServerConfig.SENTRY_STORE_JDBC_URL,
+            "jdbc:derby:;databaseName=" + dataDir.getPath() + ";create=true");
+    conf.set(ServiceConstants.ServerConfig.SENTRY_STORE_JDBC_PASS, "dummy");
+    conf.setStrings(ServiceConstants.ServerConfig.ADMIN_GROUPS, adminGroups);
+    conf.set(ServiceConstants.ServerConfig.SENTRY_STORE_GROUP_MAPPING,
+            ServiceConstants.ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING);
+    policyFilePath = new File(dataDir, "local_policy_file.ini");
+    conf.set(ServiceConstants.ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE,
+            policyFilePath.getPath());
+
+    // These tests do not need to retry transactions, so setting to 1 to reduce testing time
+    conf.setInt(ServiceConstants.ServerConfig.SENTRY_STORE_TRANSACTION_RETRY, 1);
+
+    // SentryStore should be initialized only once. The tables created by the test cases will
+    // be cleaned up during the @After method.
+     store = new SentryStore(conf);
+
+      boolean hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabled(conf);
+      store.setPersistUpdateDeltas(hdfsSyncEnabled);
+    Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient));
+
+    Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE),
+            (IMetaStoreClient.NotificationFilter) Mockito.isNull())).thenAnswer(new Answer<NotificationEventResponse>() {
+      @Override
+      public NotificationEventResponse answer(InvocationOnMock invocation)
+              throws Throwable {
+        NotificationEventResponse response = new NotificationEventResponse();
+        for (NotificationEvent event : unFilteredEvents) {
+          response.addToEvents(event);
+        }
+
+        return response;
+      }
+    });
+
+    // This mock will also test that the NotificationFilter works as expected
+    Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE),
+            (IMetaStoreClient.NotificationFilter) Mockito.notNull())).thenAnswer(new Answer<NotificationEventResponse>() {
+      @Override
+      public NotificationEventResponse answer(InvocationOnMock invocation)
+              throws Throwable {
+        IMetaStoreClient.NotificationFilter filter = (IMetaStoreClient.NotificationFilter) invocation.getArguments()[2];
+        NotificationEventResponse response = new NotificationEventResponse();
+        for (NotificationEvent event : unFilteredEvents) {
+          if (filter.accept(event)) {
+            response.addToEvents(event);
+          }
+        }
+
+        return response;
+      }
+    });
+  }
+
+  @After
+  public void after() {
+    store.clearAllTables();
+    unFilteredEvents.clear();
+  }
+
+  /**
+   * This test makes sure that there is no additional load on the database because of additional fetches done by
+   * notification fetcher by making sure that there are no cache miss when there are no gaps and out-of-sequence
+   * notifications.
+   * @throws Exception
+   */
+  @Test
+  public void testWithNoGapsAndOutOfSequenceNotifications() throws Exception {
+
+    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) {
+      List<NotificationEvent> filteredEvents;
+
+      for (int count = 1; count <= 150; count++) {
+        unFilteredEvents.add(new NotificationEvent(count, 0, "CREATE_DATABASE", ""));
+      }
+
+      filteredEvents = fetcher.fetchNotifications(0);
+      assertEquals(150, filteredEvents.size());
+      assertEquals(1, filteredEvents.get(0).getEventId());
+      assertEquals("CREATE_DATABASE", filteredEvents.get(0).getEventType());
+
+      filteredEvents = fetcher.fetchNotifications(150);
+      assertEquals(0, filteredEvents.size());
+      assertEquals(0, fetcher.getNotificationCacheMissCount());
+    }
+  }
+
+  /**
+   * This test makes sure that there is no additional load on the database because of additional fetches done by
+   * notification fetcher by making sure that there are no cache miss even when there are gaps and out-of-sequence
+   * notifications.
+   * @throws Exception
+   */
+  @Test
+  public void testWithGapsAndOutOfSequenceNotifications() throws Exception {
+    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) {
+      List<NotificationEvent> filteredEvents = new ArrayList<NotificationEvent>();
+      int count = 1;
+      for (int fetchCount = 1; fetchCount <= 10; fetchCount++) {
+        for (; count <= (fetchCount * 10); count++) {
+          if (!(count % 3 == 0 || count % 7 == 0) || (count %10 == 0)) {
+            unFilteredEvents.add(new NotificationEvent(count, 0, "CREATE_DATABASE", ""));
+          }
+        }
+        filteredEvents = fetcher.fetchNotifications(count - 1);
+      }
+
+      assertTrue("Invalid notification count", (filteredEvents.size() < 100));
+      assertEquals(0, fetcher.getNotificationCacheMissCount());
+
+      for (count = 1; count <= 100; count++) {
+        if (count % 3 == 0 || count % 7 == 0) {
+          unFilteredEvents.add(new NotificationEvent(count, 0, "CREATE_DATABASE", ""));
+        }
+      }
+      fetcher.fetchNotifications(count - 1);
+      assertEquals(0, fetcher.getNotificationCacheMissCount());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestSnapshotCreation.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestSnapshotCreation.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestSnapshotCreation.java
new file mode 100644
index 0000000..88ce7f9
--- /dev/null
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestSnapshotCreation.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.tests.e2e.dbprovider;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+import org.apache.sentry.tests.e2e.hdfs.TestHDFSIntegrationBase;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This class covers basic scenario of snapshot creation and makes sure that
+ * HMSFollower takes a full snapshot when sentry server comes up and not
+ * subsequently.
+ */
+public class TestSnapshotCreation extends TestHDFSIntegrationBase {
+
+  private final static String tableName1 = "tb_1";
+  private final static String tableName2 = "tb_2";
+  private final static String tableName3 = "tb_3";
+  private final static String tableName4 = "tb_4";
+
+  protected static final String DB1 = "db_1",
+          DB2 = "db_2";
+
+  private Connection connection;
+  private Statement statement;
+
+  @Before
+  public void initialize() throws Exception {
+    super.setUpTempDir();
+    admin = "hive";
+    connection = hiveServer2.createConnection(admin, admin);
+    statement = connection.createStatement();
+    statement.execute("create role admin_role");
+    statement.execute("grant role admin_role to group hive");
+    statement.execute("grant all on server server1 to role admin_role");
+  }
+
+  @Test
+  public void BasicSanity() throws Exception {
+    long latestSnapshotId = 0;
+    //Sleep for a sec allowing HMSFollower to create a snapshot
+    Thread.sleep(1000);
+    dbNames = new String[]{DB1};
+    roles = new String[]{"admin_role", "all_db1", "all_tbl1", "all_tbl2"};
+    do {
+      //Sleep for a sec allowing HMSFollower to create a snapshot
+      Thread.sleep(1000);
+      latestSnapshotId = sentryServer.get(0).getCurrentAuthzPathsSnapshotID();
+    } while (latestSnapshotId == 0);
+
+    statement.execute("CREATE DATABASE " + DB1);
+    statement.execute("CREATE DATABASE " + DB2);
+    statement.execute("create table " + DB1 + "." + tableName1
+            + " (under_col int comment 'the under column', value string)");
+    statement.execute("create table " + DB1 + "." + tableName2
+            + " (under_col int comment 'the under column', value string)");
+
+    Thread.sleep(5000);
+
+    statement.execute("create table " + DB2 + "." + tableName3
+            + " (under_col int comment 'the under column', value string)");
+    statement.execute("create table " + DB2 + "." + tableName4
+            + " (under_col int comment 'the under column', value string)");
+
+    assertEquals("Another snapshot is created, Snapshot ID: ", latestSnapshotId, sentryServer.get(0).getCurrentAuthzPathsSnapshotID());
+  }
+}


Mime
View raw message