sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ak...@apache.org
Subject sentry git commit: SENTRY-1669: HMSFollower should read current processed notification ID from database every time it runs (Kalyan Kalvagadda, reviewed by Sergio Pena and Alex Kolbasov)
Date Fri, 19 May 2017 06:15:31 GMT
Repository: sentry
Updated Branches:
  refs/heads/sentry-ha-redesign 5a2322338 -> 81eed11a9


SENTRY-1669: HMSFollower should read current processed notification ID from database every time it runs (Kalyan Kalvagadda, reviewed by Sergio Pena and Alex Kolbasov)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/81eed11a
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/81eed11a
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/81eed11a

Branch: refs/heads/sentry-ha-redesign
Commit: 81eed11a91139cd9f4e0582e8f2daa3d839df61b
Parents: 5a23223
Author: Alexander Kolbasov <akolb@cloudera.com>
Authored: Thu May 18 23:15:08 2017 -0700
Committer: Alexander Kolbasov <akolb@cloudera.com>
Committed: Thu May 18 23:15:08 2017 -0700

----------------------------------------------------------------------
 .../service/model/MSentryHmsNotification.java   |  81 ++++
 .../db/service/model/MSentryPathChange.java     |   1 +
 .../db/service/model/MSentryPermChange.java     |   1 +
 .../provider/db/service/model/package.jdo       |   9 +-
 .../persistent/DeltaTransactionBlock.java       |  10 +-
 .../db/service/persistent/SentryStore.java      |  41 +-
 .../sentry/service/thrift/HMSFollower.java      | 438 +++++++++++--------
 .../service/thrift/NotificationProcessor.java   |   8 +-
 .../db/service/persistent/TestSentryStore.java  |  81 +++-
 9 files changed, 457 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
new file mode 100644
index 0000000..0d54548
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.provider.db.service.model;
+
+/**
+ * Database backend store for HMS Notification ID's. All the notifications that are processed
+ * by sentry are stored.
+ */
+
+/*
+ * <p> HMS notification ID's are stored in separate table for three reasons</p>
+ * <ol>
+ * <li>SENTRY_PATH_CHANGE is not updated for every notification that is received from HMS. There
+ * are cases where HMSFollower doesn't process notifications and skip's them. Depending on
+ * SENTRY_PATH_CHANGE information may not provide the last notification processed.</li>
+ * <li> There could be cases where HMSFollower thread in multiple sentry servers acting as a
+ * leader and process HMS notifications. we need to avoid processing the notifications
+ * multiple times. This can be made sure by always having some number of notification
+ * information always regardless of purging interval.</li>
+ * <li>SENTRY_PATH_CHANGE information stored can typically be removed once namenode plug-in
+ * has processed the update.</li>
+ * </ol>
+ * <p>
+ * As the purpose and usage of notification ID information is different from PATH update info,
+ * it locally makes sense to store notification ID separately.
+ * </p>
+ */
+public class MSentryHmsNotification {
+  private long notificationId;
+
+  public MSentryHmsNotification(long notificationId) {
+    this.notificationId = notificationId;
+  }
+
+  public long getId() {
+    return notificationId;
+  }
+
+  public void setId(long notificationId) {
+    this.notificationId = notificationId;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int) notificationId;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
+    if (obj == null) {
+      return false;
+    }
+
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+
+    MSentryHmsNotification other = (MSentryHmsNotification) obj;
+
+    return (notificationId == other.notificationId);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
index 42f80aa..d11f37f 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
@@ -60,6 +60,7 @@ import javax.jdo.annotations.PrimaryKey;
 public class MSentryPathChange implements MSentryChange {
 
   @PrimaryKey
+  //This value is auto incremented by JDO
   private long changeID;
 
   // Path change in JSON format.

http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java
index 8d9528f..1cb1a1f 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java
@@ -58,6 +58,7 @@ import javax.jdo.annotations.PrimaryKey;
 public class MSentryPermChange implements MSentryChange {
 
   @PrimaryKey
+  //This value is auto incremented by JDO
   private long changeID;
 
   // Permission change in JSON format.

http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/package.jdo
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/package.jdo b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/package.jdo
index 8fd5278..96ab462 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/package.jdo
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/package.jdo
@@ -297,7 +297,10 @@
          <column name="CREATE_TIME_MS" jdbc-type="BIGINT"/>
        </field>
      </class>
-
+     <class name="MSentryHmsNotification" table="SENTRY_HMS_NOTIFICATION_ID" identity-type="application" detachable="true">
+        <field name="notificationId" primary-key="true">
+          <column name="NOTIFICATION_ID" jdbc-type="BIGINT" allows-null="false"/>
+        </field>
+     </class>
   </package>
-</jdo>
-
+</jdo>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java
index 709d195..77282da 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.sentry.core.common.exception.SentryInvalidInputException;
 import org.apache.sentry.hdfs.PathsUpdate;
 import org.apache.sentry.hdfs.PermissionsUpdate;
+import org.apache.sentry.provider.db.service.model.MSentryHmsNotification;
 import org.apache.sentry.provider.db.service.model.MSentryPathChange;
 import org.apache.sentry.provider.db.service.model.MSentryPermChange;
 import static org.apache.sentry.hdfs.Updateable.Update;
@@ -84,12 +85,15 @@ public class DeltaTransactionBlock implements TransactionBlock<Object> {
     // changeID is trying to be persisted twice, the transaction would
     // fail.
     if (update instanceof PermissionsUpdate) {
-      pm.makePersistent(new MSentryPermChange((PermissionsUpdate)update));
+      pm.makePersistent(new MSentryPermChange((PermissionsUpdate) update));
     } else if (update instanceof PathsUpdate) {
-      pm.makePersistent(new MSentryPathChange((PathsUpdate)update));
+      pm.makePersistent(new MSentryPathChange((PathsUpdate) update));
+      // Notification id from PATH_UPDATE entry is made persistent in
+      // SENTRY_LAST_NOTIFICATION_ID table.
+      pm.makePersistent(new MSentryHmsNotification(update.getSeqNum()));
     } else {
       throw new SentryInvalidInputException("Update should be type of either " +
-          "PermissionsUpdate or PathsUpdate.\n");
+        "PermissionsUpdate or PathsUpdate.\n");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 29e3686..7756c4a 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -56,6 +56,7 @@ import org.apache.sentry.core.model.db.DBModelAuthorizable.AuthorizableType;
 import org.apache.sentry.provider.db.service.model.MAuthzPathsMapping;
 import org.apache.sentry.provider.db.service.model.MSentryChange;
 import org.apache.sentry.provider.db.service.model.MSentryGroup;
+import org.apache.sentry.provider.db.service.model.MSentryHmsNotification;
 import org.apache.sentry.provider.db.service.model.MSentryPathChange;
 import org.apache.sentry.provider.db.service.model.MSentryPermChange;
 import org.apache.sentry.provider.db.service.model.MSentryPrivilege;
@@ -122,6 +123,8 @@ public class SentryStore {
 
   public static final long EMPTY_CHANGE_ID = 0L;
 
+  public static final long EMPTY_NOTIFICATION_ID = 0L;
+
   // For counters, representation of the "unknown value"
   private static final long COUNT_VALUE_UNKNOWN = -1L;
 
@@ -3436,6 +3439,35 @@ public class SentryStore {
   }
 
   /**
+   * Gets the last processed Notification ID
+   * <p>
+   * As the table might have zero or one record, result of the query
+   * might be null OR instance of MSentryHmsNotification.
+   *
+   * @param pm the PersistenceManager
+   * @return EMPTY_NOTIFICATION_ID(0) when there are no notifications processed.
+   * else  last NotificationID processed by HMSFollower
+   */
+  static Long getLastProcessedNotificationIDCore(
+      PersistenceManager pm) {
+    Query query = pm.newQuery(MSentryHmsNotification.class);
+    query.setResult("max(notificationId)");
+    Long notificationId = (Long) query.execute();
+    return notificationId == null ? EMPTY_NOTIFICATION_ID : notificationId;
+  }
+
+  /**
+   * Set the notification ID of last processed HMS notification.
+   */
+  public void persistLastProcessedNotificationID(final Long notificationId) throws Exception {
+    tm.executeTransaction(
+      new TransactionBlock<Object>() {
+        public Object execute(PersistenceManager pm) throws Exception {
+          return pm.makePersistent(new MSentryHmsNotification(notificationId));
+        }
+      });
+  }
+  /**
    * Gets the last processed change ID for perm delta changes.
    *
    * Internally invoke {@link #getLastProcessedChangeIDCore(PersistenceManager, Class)}
@@ -3477,14 +3509,7 @@ public class SentryStore {
     return tm.executeTransaction(
     new TransactionBlock<Long>() {
       public Long execute(PersistenceManager pm) throws Exception {
-        pm.setDetachAllOnCommit(false); // No need to detach objects
-        long changeID = getLastProcessedChangeIDCore(pm, MSentryPathChange.class);
-        if (changeID == EMPTY_CHANGE_ID) {
-          return EMPTY_CHANGE_ID;
-        } else {
-          MSentryPathChange mSentryPathChange = getMSentryPathChangeByID(changeID);
-          return mSentryPathChange.getNotificationID();
-        }
+        return getLastProcessedNotificationIDCore(pm);
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 72e9d72..59eda52 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -47,7 +47,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.sentry.binding.metastore.messaging.json.*;
 
+import javax.jdo.JDODataStoreException;
 import javax.security.auth.login.LoginException;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.SocketException;
@@ -71,7 +73,6 @@ import static org.apache.sentry.hdfs.Updateable.Update;
 @SuppressWarnings("PMD")
 public class HMSFollower implements Runnable, AutoCloseable {
   private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
-  private long currentEventID;
   // Track the latest eventId of the event that has been logged. So we don't log the same message
   private long lastLoggedEventId = SentryStore.EMPTY_CHANGE_ID;
   private static boolean connectedToHMS = false;
@@ -86,16 +87,24 @@ public class HMSFollower implements Runnable, AutoCloseable {
   private boolean needLogHMSSupportReady = true;
   private final LeaderStatusMonitor leaderMonitor;
 
-  HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor) throws Exception {
+  HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor) {
     LOGGER.info("HMSFollower is being initialized");
+    Long lastProcessedNotificationID;
     authzConf = conf;
     this.leaderMonitor = leaderMonitor;
     sentryStore = store;
 
-    // Initialize currentEventID based on the latest persisted notification ID.
-    // If currentEventID is empty, need to retrieve a full hive snapshot,
-    currentEventID = getLastProcessedNotificationID();
-    needHiveSnapshot = (currentEventID == SentryStore.EMPTY_CHANGE_ID);
+    try {
+      // Initializing lastProcessedNotificationID based on the latest persisted notification ID.
+      lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID();
+    } catch (Exception e) {
+      LOGGER.error("Failed to get the last processed notification id from sentry store, " +
+        "Skipping the processing", e);
+      needHiveSnapshot = true;
+      return;
+    }
+    // If lastProcessedNotificationID is empty, need to retrieve a full hive snapshot,
+    needHiveSnapshot = (lastProcessedNotificationID == SentryStore.EMPTY_CHANGE_ID);
   }
 
   @VisibleForTesting
@@ -123,8 +132,8 @@ public class HMSFollower implements Runnable, AutoCloseable {
    * Throws @MetaException if there was a problem on creating an HMSClient
    */
   private HiveMetaStoreClient getMetaStoreClient(Configuration conf)
-      throws IOException, InterruptedException, LoginException, MetaException {
-    if(client != null) {
+    throws IOException, InterruptedException, LoginException, MetaException {
+    if (client != null) {
       return client;
     }
 
@@ -147,29 +156,29 @@ public class HMSFollower implements Runnable, AutoCloseable {
     //TODO: Is this the right(standard) way to create a HMS client? HiveMetastoreClientFactoryImpl?
     //TODO: Check if HMS is using kerberos instead of relying on Sentry conf
     kerberos = ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
-        conf.get(ServiceConstants.ServerConfig.SECURITY_MODE, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS).trim());
+      conf.get(ServiceConstants.ServerConfig.SECURITY_MODE, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS).trim());
     if (kerberos) {
       LOGGER.info("Making a kerberos connection to HMS");
       try {
         int port = conf.getInt(ServiceConstants.ServerConfig.RPC_PORT, ServiceConstants.ServerConfig.RPC_PORT_DEFAULT);
         String rawPrincipal = Preconditions.checkNotNull(conf.get(ServiceConstants.ServerConfig.PRINCIPAL),
-            ServiceConstants.ServerConfig.PRINCIPAL + " is required");
+          ServiceConstants.ServerConfig.PRINCIPAL + " is required");
         principal = SecurityUtil.getServerPrincipal(rawPrincipal, NetUtils.createSocketAddr(
-            conf.get(ServiceConstants.ServerConfig.RPC_ADDRESS, ServiceConstants.ServerConfig.RPC_ADDRESS_DEFAULT), port).getAddress());
-      } catch(IOException io) {
+          conf.get(ServiceConstants.ServerConfig.RPC_ADDRESS, ServiceConstants.ServerConfig.RPC_ADDRESS_DEFAULT), port).getAddress());
+      } catch (IOException io) {
         throw new RuntimeException("Can't translate kerberos principal'", io);
       }
 
       LOGGER.info("Using kerberos principal: " + principal);
       final String[] principalParts = SaslRpcServer.splitKerberosName(principal);
       Preconditions.checkArgument(principalParts.length == 3,
-          "Kerberos principal should have 3 parts: " + principal);
+        "Kerberos principal should have 3 parts: " + principal);
 
       keytab = Preconditions.checkNotNull(conf.get(ServiceConstants.ServerConfig.KEY_TAB),
-              ServiceConstants.ServerConfig.KEY_TAB + " is required");
+        ServiceConstants.ServerConfig.KEY_TAB + " is required");
       File keytabFile = new File(keytab);
       Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),
-          "Keytab " + keytab + " does not exist or is not readable.");
+        "Keytab " + keytab + " does not exist or is not readable.");
 
       try {
         // Instantiating SentryKerberosContext in non-server mode handles the ticket renewal.
@@ -208,21 +217,24 @@ public class HMSFollower implements Runnable, AutoCloseable {
 
   @Override
   public void run() {
-    // Wake any clients connected to this service waiting for HMS already processed notifications.
+    Long lastProcessedNotificationID;
     try {
-      wakeUpWaitingClientsForSync(getLastProcessedNotificationID());
+      // Initializing lastProcessedNotificationID based on the latest persisted notification ID.
+      lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID();
     } catch (Exception e) {
-      LOGGER.error("Couldn't wake up HMS waiters because an error attempting to get the latest notification ID.", e);
+      LOGGER.error("Failed to get the last processed notification id from sentry store, " +
+        "Skipping the processing", e);
+      return;
     }
-
+    // Wake any clients connected to this service waiting for HMS already processed notifications.
+    wakeUpWaitingClientsForSync(lastProcessedNotificationID);
     // Only the leader should listen to HMS updates
     if ((leaderMonitor != null) && !leaderMonitor.isLeader()) {
       // Close any outstanding connections to HMS
       closeHMSConnection();
       return;
     }
-
-    processHiveMetastoreUpdates();
+    processHiveMetastoreUpdates(lastProcessedNotificationID);
   }
 
   /**
@@ -249,7 +261,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
    *
    * Clients connections waiting for an event notification will be woken up afterwards.
    */
-  private void processHiveMetastoreUpdates() {
+  private void processHiveMetastoreUpdates(Long lastProcessedNotificationID) {
     if (client == null) {
       try {
         client = getMetaStoreClient(authzConf);
@@ -301,18 +313,24 @@ public class HMSFollower implements Runnable, AutoCloseable {
 
         if (!eventIDBefore.equals(eventIDAfter)) {
           LOGGER.error("#### Fail to get a point-in-time hive full snapshot !! Current NotificationID = " +
-              eventIDAfter.toString());
+            eventIDAfter.toString());
           return;
         }
 
         LOGGER.info(String.format("Successfully fetched hive full snapshot, Current NotificationID = %s.",
-            eventIDAfter));
-        needHiveSnapshot = false;
-        currentEventID = eventIDAfter.getEventId();
+          eventIDAfter));
+        // As eventIDAfter is the last event that was processed, eventIDAfter is used to update
+        // lastProcessedNotificationID instead of getting it from persistent store.
+        lastProcessedNotificationID = eventIDAfter.getEventId();
         sentryStore.persistFullPathsImage(pathsFullSnapshot);
-
+        needHiveSnapshot = false;
+        sentryStore.persistLastProcessedNotificationID(eventIDAfter.getEventId());
         // Wake up any HMS waiters that could have been put on hold before getting the eventIDBefore value.
-        wakeUpWaitingClientsForSync(currentEventID);
+        wakeUpWaitingClientsForSync(lastProcessedNotificationID);
+      } else {
+        // Every time HMSFollower is scheduled to run, value should be updates based
+        // on the value stored in database.
+        lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID();
       }
 
       // HMSFollower connected to HMS and it finished full snapshot if that was required
@@ -326,15 +344,16 @@ public class HMSFollower implements Runnable, AutoCloseable {
       // NotificationEventResponse causing TProtocolException.
       // Workaround: Only processes the notification events newer than the last updated one.
       CurrentNotificationEventId eventId = client.getCurrentNotificationEventId();
-      if (eventId.getEventId() > currentEventID) {
-        NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null);
+      if (eventId.getEventId() > lastProcessedNotificationID) {
+        NotificationEventResponse response =
+          client.getNextNotification(lastProcessedNotificationID, Integer.MAX_VALUE, null);
         if (response.isSetEvents()) {
           if (!response.getEvents().isEmpty()) {
-            if (currentEventID != lastLoggedEventId) {
+            if (lastProcessedNotificationID != lastLoggedEventId) {
               // Only log when there are updates and the notification ID has changed.
-              LOGGER.debug(String.format("CurrentEventID = %s. Processing %s events",
-                    currentEventID, response.getEvents().size()));
-              lastLoggedEventId = currentEventID;
+              LOGGER.debug(String.format("lastProcessedNotificationID = %s. Processing %s events",
+                lastProcessedNotificationID, response.getEvents().size()));
+              lastLoggedEventId = lastProcessedNotificationID;
             }
 
             processNotificationEvents(response.getEvents());
@@ -349,13 +368,13 @@ public class HMSFollower implements Runnable, AutoCloseable {
       } else {
         LOGGER.error("ThriftException occured fetching Notification entries, will try", e);
       }
-    } catch (SentryInvalidInputException |SentryInvalidHMSEventException e) {
+    } catch (SentryInvalidInputException | SentryInvalidHMSEventException e) {
       LOGGER.error("Encounter SentryInvalidInputException|SentryInvalidHMSEventException " +
-                   "while processing notification log", e);
+        "while processing notification log", e);
     } catch (Throwable t) {
       // catching errors to prevent the executor to halt.
       LOGGER.error("Caught unexpected exception in HMSFollower! Caused by: " + t.getMessage(),
-            t.getCause());
+        t.getCause());
       t.printStackTrace();
     }
   }
@@ -383,6 +402,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
 
   /**
    * Retrieve a Hive full snapshot from HMS.
+   *
    * @return HMS snapshot. Snapshot consists of a mapping from auth object name
    * to the set of paths corresponding to that name.
    * @throws InterruptedException
@@ -390,7 +410,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
    * @throws ExecutionException
    */
   private Map<String, Set<String>> fetchFullUpdate()
-          throws InterruptedException, TException, ExecutionException {
+    throws InterruptedException, TException, ExecutionException {
     LOGGER.info("Request full HMS snapshot");
     try (FullUpdateInitializer updateInitializer = new FullUpdateInitializer(client, authzConf)) {
       Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
@@ -399,16 +419,6 @@ public class HMSFollower implements Runnable, AutoCloseable {
     }
   }
 
-  /**
-   * Get the last processed eventID from Sentry DB.
-   *
-   * @return the stored currentID
-   * @throws Exception
-   */
-  private long getLastProcessedNotificationID() throws Exception {
-    return sentryStore.getLastProcessedNotificationID();
-  }
-
   private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) {
     return "true"
         .equalsIgnoreCase((authzConf.get(syncConfVar.getVar(), syncConfVar.getDefault())));
@@ -420,6 +430,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
   void processNotificationEvents(List<NotificationEvent> events) throws Exception {
     SentryJSONMessageDeserializer deserializer = new SentryJSONMessageDeserializer();
 
+    boolean isNotificationProcessingSkipped = false;
     for (NotificationEvent event : events) {
       String dbName;
       String tableName;
@@ -428,172 +439,221 @@ public class HMSFollower implements Runnable, AutoCloseable {
       String location;
       List<String> locations;
       NotificationProcessor notificationProcessor = new NotificationProcessor(sentryStore, LOGGER);
-      switch (HCatEventMessage.EventType.valueOf(event.getEventType())) {
-        case CREATE_DATABASE:
-          SentryJSONCreateDatabaseMessage message = deserializer.getCreateDatabaseMessage(event.getMessage());
-          dbName = message.getDB();
-          location = message.getLocation();
-          if ((dbName == null) || (location == null)) {
-            throw new SentryInvalidHMSEventException(String.format("Create database event " +
-                "has incomplete information. dbName = %s location = %s",
+      try {
+        switch (HCatEventMessage.EventType.valueOf(event.getEventType())) {
+          case CREATE_DATABASE:
+            SentryJSONCreateDatabaseMessage message = deserializer.getCreateDatabaseMessage(event.getMessage());
+            dbName = message.getDB();
+            location = message.getLocation();
+            if ((dbName == null) || (location == null)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Create database event " +
+                  "has incomplete information. dbName = %s location = %s",
                 StringUtils.defaultIfBlank(dbName, "null"),
                 StringUtils.defaultIfBlank(location, "null")));
-          }
-          if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) {
-            dropSentryDbPrivileges(dbName, event);
-          }
-          notificationProcessor.processCreateDatabase(dbName,location, event.getEventId());
-          break;
-        case DROP_DATABASE:
-          SentryJSONDropDatabaseMessage dropDatabaseMessage =
+              break;
+            }
+            if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) {
+              dropSentryDbPrivileges(dbName, event);
+            }
+            notificationProcessor.processCreateDatabase(dbName, location, event.getEventId());
+            break;
+          case DROP_DATABASE:
+            SentryJSONDropDatabaseMessage dropDatabaseMessage =
               deserializer.getDropDatabaseMessage(event.getMessage());
-          dbName = dropDatabaseMessage.getDB();
-          location = dropDatabaseMessage.getLocation();
-          if (dbName == null) {
-            throw new SentryInvalidHMSEventException(
-                    "Drop database event has incomplete information: dbName = null");
-          }
-          if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) {
-            dropSentryDbPrivileges(dbName, event);
-          }
-          notificationProcessor.processDropDatabase(dbName, location, event.getEventId());
-          break;
-        case CREATE_TABLE:
-          SentryJSONCreateTableMessage createTableMessage = deserializer.getCreateTableMessage(event.getMessage());
-          dbName = createTableMessage.getDB();
-          tableName = createTableMessage.getTable();
-          location = createTableMessage.getLocation();
-          if ((dbName == null) || (tableName == null) || (location == null)) {
-            throw new SentryInvalidHMSEventException(String.format("Create table event " +
-                "has incomplete information. dbName = %s, tableName = %s, location = %s",
+            dbName = dropDatabaseMessage.getDB();
+            location = dropDatabaseMessage.getLocation();
+            if (dbName == null) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error("Drop database event has incomplete information: dbName = null");
+              break;
+            }
+            if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) {
+              dropSentryDbPrivileges(dbName, event);
+            }
+            notificationProcessor.processDropDatabase(dbName, location, event.getEventId());
+            break;
+          case CREATE_TABLE:
+            SentryJSONCreateTableMessage createTableMessage = deserializer.getCreateTableMessage(event.getMessage());
+            dbName = createTableMessage.getDB();
+            tableName = createTableMessage.getTable();
+            location = createTableMessage.getLocation();
+            if ((dbName == null) || (tableName == null) || (location == null)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Create table event " + "has incomplete information."
+                  + " dbName = %s, tableName = %s, location = %s",
                 StringUtils.defaultIfBlank(dbName, "null"),
                 StringUtils.defaultIfBlank(tableName, "null"),
                 StringUtils.defaultIfBlank(location, "null")));
-          }
-          if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) {
-            dropSentryTablePrivileges(dbName, tableName, event);
-          }
-          notificationProcessor.processCreateTable(dbName, tableName, location, event.getEventId());
-          break;
-        case DROP_TABLE:
-          SentryJSONDropTableMessage dropTableMessage = deserializer.getDropTableMessage(event.getMessage());
-          dbName = dropTableMessage.getDB();
-          tableName = dropTableMessage.getTable();
-          if ((dbName == null) || (tableName == null)) {
-            throw new SentryInvalidHMSEventException(String.format("Drop table event " +
-                "has incomplete information. dbName = %s, tableName = %s",
+              break;
+            }
+            if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) {
+              dropSentryTablePrivileges(dbName, tableName, event);
+            }
+            notificationProcessor.processCreateTable(dbName, tableName, location, event.getEventId());
+            break;
+          case DROP_TABLE:
+            SentryJSONDropTableMessage dropTableMessage = deserializer.getDropTableMessage(event.getMessage());
+            dbName = dropTableMessage.getDB();
+            tableName = dropTableMessage.getTable();
+            if ((dbName == null) || (tableName == null)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Drop table event " +
+                  "has incomplete information. dbName = %s, tableName = %s",
                 StringUtils.defaultIfBlank(dbName, "null"),
                 StringUtils.defaultIfBlank(tableName, "null")));
-          }
-          if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) {
-            dropSentryTablePrivileges(dbName, tableName, event);
-          }
-          notificationProcessor.processDropTable(dbName, tableName, event.getEventId());
-          break;
-        case ALTER_TABLE:
-          SentryJSONAlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(event.getMessage());
-
-          String oldDbName = alterTableMessage.getDB();
-          String oldTableName = alterTableMessage.getTable();
-          String newDbName = event.getDbName();
-          String newTableName = event.getTableName();
-          oldLocation = alterTableMessage.getOldLocation();
-          newLocation = alterTableMessage.getNewLocation();
-
-          if ((oldDbName == null) ||
+              break;
+            }
+            if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) {
+              dropSentryTablePrivileges(dbName, tableName, event);
+            }
+            notificationProcessor.processDropTable(dbName, tableName, event.getEventId());
+            break;
+          case ALTER_TABLE:
+            SentryJSONAlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(event.getMessage());
+
+            String oldDbName = alterTableMessage.getDB();
+            String oldTableName = alterTableMessage.getTable();
+            String newDbName = event.getDbName();
+            String newTableName = event.getTableName();
+            oldLocation = alterTableMessage.getOldLocation();
+            newLocation = alterTableMessage.getNewLocation();
+
+            if ((oldDbName == null) ||
               (oldTableName == null) ||
               (newDbName == null) ||
               (newTableName == null) ||
               (oldLocation == null) ||
               (newLocation == null)) {
-            throw new SentryInvalidHMSEventException(String.format("Alter table event " +
-                "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, " +
-                "newDbName = %s, newTableName = %s, newLocation = %s",
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Alter table event " +
+                  "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, " +
+                  "newDbName = %s, newTableName = %s, newLocation = %s",
                 StringUtils.defaultIfBlank(oldDbName, "null"),
                 StringUtils.defaultIfBlank(oldTableName, "null"),
                 StringUtils.defaultIfBlank(oldLocation, "null"),
                 StringUtils.defaultIfBlank(newDbName, "null"),
                 StringUtils.defaultIfBlank(newTableName, "null"),
                 StringUtils.defaultIfBlank(newLocation, "null")));
-          }
+              break;
+            } else if ((oldDbName == newDbName) &&
+              (oldTableName == newTableName) &&
+              (oldLocation == newLocation)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.info(String.format("Alter table notification ignored as neither name nor " +
+                "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " +
+                "newLocation = %s", oldDbName + "." + oldTableName , oldLocation,
+                newDbName + "." + newTableName, newLocation));
+              break;
+            }
 
-          if(!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) {
-            // Name has changed
-            try {
-              renamePrivileges(oldDbName, oldTableName, newDbName, newTableName);
-            } catch (SentryNoSuchObjectException e) {
-              LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table: %s.%s",
+            if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) {
+              // Name has changed
+              try {
+                renamePrivileges(oldDbName, oldTableName, newDbName, newTableName);
+              } catch (SentryNoSuchObjectException e) {
+                LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table: %s.%s",
                   oldDbName, oldTableName);
-            } catch (Exception e) {
-              throw new SentryInvalidInputException("Could not process Alter table event. Event: " + event.toString(), e);
+              } catch (Exception e) {
+                isNotificationProcessingSkipped = true;
+                LOGGER.info("Could not process Alter table event. Event: " + event.toString(), e);
+                break;
+              }
             }
-          }
-          notificationProcessor.processAlterTable(oldDbName, newDbName, oldTableName,
+            notificationProcessor.processAlterTable(oldDbName, newDbName, oldTableName,
               newTableName, oldLocation, newLocation, event.getEventId());
-          break;
-        case ADD_PARTITION:
-          SentryJSONAddPartitionMessage addPartitionMessage =
-                deserializer.getAddPartitionMessage(event.getMessage());
-          dbName = addPartitionMessage.getDB();
-          tableName = addPartitionMessage.getTable();
-          locations = addPartitionMessage.getLocations();
-          if ((dbName == null) || (tableName == null) || (locations == null)) {
-            LOGGER.error(String.format("Create table event has incomplete information. " +
-                "dbName = %s, tableName = %s, locations = %s",
+            break;
+          case ADD_PARTITION:
+            SentryJSONAddPartitionMessage addPartitionMessage =
+              deserializer.getAddPartitionMessage(event.getMessage());
+            dbName = addPartitionMessage.getDB();
+            tableName = addPartitionMessage.getTable();
+            locations = addPartitionMessage.getLocations();
+            if ((dbName == null) || (tableName == null) || (locations == null)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Create table event has incomplete information. " +
+                  "dbName = %s, tableName = %s, locations = %s",
                 StringUtils.defaultIfBlank(dbName, "null"),
                 StringUtils.defaultIfBlank(tableName, "null"),
                 locations != null ? locations.toString() : "null"));
-          }
-          notificationProcessor.processAddPartition(dbName, tableName, locations,
-              event.getEventId());
-          break;
-        case DROP_PARTITION:
-          SentryJSONDropPartitionMessage dropPartitionMessage =
-                deserializer.getDropPartitionMessage(event.getMessage());
-          dbName = dropPartitionMessage.getDB();
-          tableName = dropPartitionMessage.getTable();
-          locations = dropPartitionMessage.getLocations();
-          if ((dbName == null) || (tableName == null) || (locations == null)) {
-            throw new SentryInvalidHMSEventException(String.format("Drop partition event " +
-                "has incomplete information. dbName = %s, tableName = %s, location = %s",
+              break;
+            }
+            notificationProcessor.processAddPartition(dbName, tableName, locations, event.getEventId());
+            break;
+          case DROP_PARTITION:
+            SentryJSONDropPartitionMessage dropPartitionMessage =
+              deserializer.getDropPartitionMessage(event.getMessage());
+            dbName = dropPartitionMessage.getDB();
+            tableName = dropPartitionMessage.getTable();
+            locations = dropPartitionMessage.getLocations();
+            if ((dbName == null) || (tableName == null) || (locations == null)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Drop partition event " +
+                  "has incomplete information. dbName = %s, tableName = %s, location = %s",
                 StringUtils.defaultIfBlank(dbName, "null"),
                 StringUtils.defaultIfBlank(tableName, "null"),
                 locations != null ? locations.toString() : "null"));
-          }
-          notificationProcessor.processDropPartition(dbName, tableName, locations,
-              event.getEventId());
-          break;
-      case ALTER_PARTITION:
-        SentryJSONAlterPartitionMessage alterPartitionMessage =
+              break;
+            }
+            notificationProcessor.processDropPartition(dbName, tableName, locations, event.getEventId());
+
+            break;
+          case ALTER_PARTITION:
+            SentryJSONAlterPartitionMessage alterPartitionMessage =
               deserializer.getAlterPartitionMessage(event.getMessage());
-        dbName = alterPartitionMessage.getDB();
-        tableName = alterPartitionMessage.getTable();
-        oldLocation = alterPartitionMessage.getOldLocation();
-        newLocation = alterPartitionMessage.getNewLocation();
-
-        if ((dbName == null) ||
-            (tableName == null) ||
-            (oldLocation == null) ||
-            (newLocation == null)) {
-          throw new SentryInvalidHMSEventException(String.format("Alter partition event " +
-              "has incomplete information. dbName = %s, tableName = %s, " +
-              "oldLocation = %s, newLocation = %s",
-              StringUtils.defaultIfBlank(dbName, "null"),
-              StringUtils.defaultIfBlank(tableName, "null"),
-              StringUtils.defaultIfBlank(oldLocation, "null"),
-              StringUtils.defaultIfBlank(newLocation, "null")));
-        }
+            dbName = alterPartitionMessage.getDB();
+            tableName = alterPartitionMessage.getTable();
+            oldLocation = alterPartitionMessage.getOldLocation();
+            newLocation = alterPartitionMessage.getNewLocation();
+
+            if ((dbName == null) ||
+              (tableName == null) ||
+              (oldLocation == null) ||
+              (newLocation == null)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Alter partition event " +
+                  "has incomplete information. dbName = %s, tableName = %s, " +
+                  "oldLocation = %s, newLocation = %s",
+                StringUtils.defaultIfBlank(dbName, "null"),
+                StringUtils.defaultIfBlank(tableName, "null"),
+                StringUtils.defaultIfBlank(oldLocation, "null"),
+                StringUtils.defaultIfBlank(newLocation, "null")));
+              break;
+            } else if (oldLocation == newLocation) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.info(String.format("Alter partition notification ignored as" +
+                "location has not changed: AuthzObj = %s, Location = %s", dbName + "." +
+                "." + tableName, oldLocation));
+              break;
+            }
 
-        notificationProcessor.processAlterPartition(dbName, tableName, oldLocation,
-            newLocation, event.getEventId());
-        break;
-        case INSERT:
-          // TODO DO we need to do anything here?
-          break;
+            notificationProcessor.processAlterPartition(dbName, tableName, oldLocation,
+              newLocation, event.getEventId());
+            break;
+          case INSERT:
+            // TODO DO we need to do anything here?
+            break;
+        }
+      } catch (Exception e) {
+        if (e.getCause() instanceof JDODataStoreException) {
+          LOGGER.info("Received JDO Storage Exception, Could be because of processing " +
+            "duplicate notification");
+          if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) {
+            // Rest of the notifications need not be processed.
+            throw e;
+          }
+        }
+        sentryStore.persistLastProcessedNotificationID(event.getEventId());
+      }
+      if (isNotificationProcessingSkipped) {
+        // Update the notification ID in the persistent store even when the notification is
+        // not processed as the content in in the notification is not valid.
+        // Continue processing the next notification.
+        sentryStore.persistLastProcessedNotificationID(event.getEventId());
+        isNotificationProcessingSkipped = false;
       }
-      currentEventID = event.getEventId();
       // Wake up any HMS waiters that are waiting for this ID.
-      wakeUpWaitingClientsForSync(currentEventID);
+      wakeUpWaitingClientsForSync(event.getEventId());
     }
   }
 
@@ -606,7 +666,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
       LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: %s", dbName);
     } catch (Exception e) {
       throw new SentryInvalidInputException("Could not process Drop database event." +
-          "Event: " + event.toString(), e);
+        "Event: " + event.toString(), e);
     }
   }
 
@@ -624,7 +684,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
   }
 
   private void renamePrivileges(String oldDbName, String oldTableName, String newDbName, String newTableName) throws
-      Exception {
+    Exception {
     TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(hiveInstance);
     oldAuthorizable.setDb(oldDbName);
     oldAuthorizable.setTable(oldTableName);
@@ -632,7 +692,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
     newAuthorizable.setDb(newDbName);
     newAuthorizable.setTable(newTableName);
     Update update =
-        onRenameSentryPrivilege(oldAuthorizable, newAuthorizable);
+      onRenameSentryPrivilege(oldAuthorizable, newAuthorizable);
     sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update);
   }
 
@@ -646,8 +706,8 @@ public class HMSFollower implements Runnable, AutoCloseable {
 
   @VisibleForTesting
   static Update onRenameSentryPrivilege(TSentryAuthorizable oldAuthorizable,
-            TSentryAuthorizable newAuthorizable)
-          throws SentryPolicyStorePlugin.SentryPluginException {
+                                        TSentryAuthorizable newAuthorizable)
+    throws SentryPolicyStorePlugin.SentryPluginException {
     String oldAuthz = getAuthzObj(oldAuthorizable);
     String newAuthz = getAuthzObj(newAuthorizable);
     PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);

http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
index 9f4cfe8..de8e2f7 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
@@ -32,6 +32,10 @@ import java.util.List;
  * NotificationProcessor processes various notification events generated from
  * the Hive MetaStore state change, and applies these changes on the complete
  * HMS Paths snapshot or delta update stored in Sentry using SentryStore.
+ * <p>
+ * NotificationProcessor should not skip processing notification events for any reason.
+ * If some notification events are to be skipped, appropriate logic should be added in
+ * HMSFollower before invoking NotificationProcessor.
  */
 class NotificationProcessor {
 
@@ -354,10 +358,6 @@ class NotificationProcessor {
               Collections.singleton(newPathTree),
               update);
       }
-    } else {
-      LOGGER.info(String.format("Alter table notification ignored as neither name nor " +
-          "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " +
-          "newLocation = %s", oldAuthzObj, oldLocation, newAuthzObj, newLocation));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
index e7443eb..440b0e9 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
@@ -75,6 +75,8 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.sentry.provider.db.service.persistent.QueryParamBuilder.newQueryParamBuilder;
 
+import javax.jdo.JDODataStoreException;
+
 public class TestSentryStore extends org.junit.Assert {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TestSentryStore.class);
@@ -2460,11 +2462,13 @@ public class TestSentryStore extends org.junit.Assert {
   @Test
   public void testAddDeleteAuthzPathsMapping() throws Exception {
     // Add "db1.table1" authzObj
-    PathsUpdate addUpdate = new PathsUpdate(0, false);
+    Long lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    PathsUpdate addUpdate = new PathsUpdate(1, false);
     addUpdate.newPathChange("db1.table").
           addToAddPaths(Arrays.asList("db1", "tbl1"));
     addUpdate.newPathChange("db1.table").
           addToAddPaths(Arrays.asList("db1", "tbl2"));
+
     sentryStore.addAuthzPathsMapping("db1.table",
           Sets.newHashSet("db1/tbl1", "db1/tbl2"), addUpdate);
     PathsImage pathsImage = sentryStore.retrieveFullPathsImage();
@@ -2477,9 +2481,11 @@ public class TestSentryStore extends org.junit.Assert {
     long lastChangeID = sentryStore.getLastProcessedPathChangeID();
     MSentryPathChange addPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID);
     assertEquals(addUpdate.JSONSerialize(), addPathChange.getPathChange());
+    lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    assertEquals(1, lastNotificationId.longValue());
 
     // Delete path 'db1.db/tbl1' from "db1.table1" authzObj.
-    PathsUpdate delUpdate = new PathsUpdate(1, false);
+    PathsUpdate delUpdate = new PathsUpdate(2, false);
     delUpdate.newPathChange("db1.table")
           .addToDelPaths(Arrays.asList("db1", "tbl1"));
     sentryStore.deleteAuthzPathsMapping("db1.table", Sets.newHashSet("db1/tbl1"), delUpdate);
@@ -2492,9 +2498,11 @@ public class TestSentryStore extends org.junit.Assert {
     lastChangeID = sentryStore.getLastProcessedPathChangeID();
     MSentryPathChange delPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID);
     assertEquals(delUpdate.JSONSerialize(), delPathChange.getPathChange());
+    lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    assertEquals(2, lastNotificationId.longValue());
 
     // Delete "db1.table" authzObj from the authzObj -> [Paths] mapping.
-    PathsUpdate delAllupdate = new PathsUpdate(2, false);
+    PathsUpdate delAllupdate = new PathsUpdate(3, false);
     delAllupdate.newPathChange("db1.table")
         .addToDelPaths(Lists.newArrayList(PathsUpdate.ALL_PATHS));
     sentryStore.deleteAllAuthzPathsMapping("db1.table", delAllupdate);
@@ -2506,11 +2514,16 @@ public class TestSentryStore extends org.junit.Assert {
     lastChangeID = sentryStore.getLastProcessedPathChangeID();
     MSentryPathChange delAllPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID);
     assertEquals(delAllupdate.JSONSerialize(), delAllPathChange.getPathChange());
+
+    lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    assertEquals(3, lastNotificationId.longValue());
+
   }
 
   @Test
   public void testRenameUpdateAuthzPathsMapping() throws Exception {
     Map<String, Set<String>> authzPaths = new HashMap<>();
+    Long lastNotificationId = sentryStore.getLastProcessedNotificationID();
     authzPaths.put("db1.table1", Sets.newHashSet("user/hive/warehouse/db1.db/table1",
                                                 "user/hive/warehouse/db1.db/table1/p1"));
     authzPaths.put("db1.table2", Sets.newHashSet("user/hive/warehouse/db1.db/table2"));
@@ -2518,8 +2531,9 @@ public class TestSentryStore extends org.junit.Assert {
     Map<String, Set<String>> pathsImage = sentryStore.retrieveFullPathsImage().getPathImage();
     assertEquals(2, pathsImage.size());
 
+
     // Rename path of 'db1.table1' from 'db1.table1' to 'db1.newTable1'
-    PathsUpdate renameUpdate = new PathsUpdate(0, false);
+    PathsUpdate renameUpdate = new PathsUpdate(1, false);
     renameUpdate.newPathChange("db1.table1")
         .addToDelPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "table1"));
     renameUpdate.newPathChange("db1.newTable1")
@@ -2538,9 +2552,10 @@ public class TestSentryStore extends org.junit.Assert {
     long lastChangeID = sentryStore.getLastProcessedPathChangeID();
     MSentryPathChange renamePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID);
     assertEquals(renameUpdate.JSONSerialize(), renamePathChange.getPathChange());
-
+    lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    assertEquals(1, lastNotificationId.longValue());
     // Rename 'db1.table1' to "db1.table2" but did not change its location.
-    renameUpdate = new PathsUpdate(1, false);
+    renameUpdate = new PathsUpdate(2, false);
     renameUpdate.newPathChange("db1.newTable1")
         .addToDelPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "newTable1"));
     renameUpdate.newPathChange("db1.newTable2")
@@ -2553,6 +2568,8 @@ public class TestSentryStore extends org.junit.Assert {
     assertEquals(Sets.newHashSet("user/hive/warehouse/db1.db/table1/p1",
                                 "user/hive/warehouse/db1.db/newTable1"),
                   pathsImage.get("db1.newTable2"));
+    lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    assertEquals(2, lastNotificationId.longValue());
 
     // Query the persisted path change and ensure it equals to the original one
     lastChangeID = sentryStore.getLastProcessedPathChangeID();
@@ -2581,6 +2598,8 @@ public class TestSentryStore extends org.junit.Assert {
     lastChangeID = sentryStore.getLastProcessedPathChangeID();
     MSentryPathChange updatePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID);
     assertEquals(update.JSONSerialize(), updatePathChange.getPathChange());
+    lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    assertEquals(3, lastNotificationId.longValue());
   }
 
   @Test
@@ -2964,4 +2983,54 @@ public class TestSentryStore extends org.junit.Assert {
       prevId = changeId;
     }
   }
+
+  @Test
+  public void testDuplicateNotification() throws Exception {
+    Map<String, Set<String>> authzPaths = new HashMap<>();
+    Long lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    authzPaths.put("db1.table1", Sets.newHashSet("user/hive/warehouse/db1.db/table1",
+      "user/hive/warehouse/db1.db/table1/p1"));
+    authzPaths.put("db1.table2", Sets.newHashSet("user/hive/warehouse/db1.db/table2"));
+    sentryStore.persistFullPathsImage(authzPaths);
+    Map<String, Set<String>> pathsImage = sentryStore.retrieveFullPathsImage().getPathImage();
+    assertEquals(2, pathsImage.size());
+
+    if (lastNotificationId == null) {
+      lastNotificationId = SentryStore.EMPTY_NOTIFICATION_ID;
+    }
+
+    // Rename path of 'db1.table1' from 'db1.table1' to 'db1.newTable1'
+    PathsUpdate renameUpdate = new PathsUpdate(1, false);
+    renameUpdate.newPathChange("db1.table1")
+      .addToDelPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "table1"));
+    renameUpdate.newPathChange("db1.newTable1")
+      .addToAddPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "newTable1"));
+    sentryStore.renameAuthzPathsMapping("db1.table1", "db1.newTable1",
+      "user/hive/warehouse/db1.db/table1", "user/hive/warehouse/db1.db/newTable1", renameUpdate);
+    pathsImage = sentryStore.retrieveFullPathsImage().getPathImage();
+    assertEquals(2, pathsImage.size());
+    assertEquals(3, sentryStore.getMPaths().size());
+    assertTrue(pathsImage.containsKey("db1.newTable1"));
+    assertEquals(Sets.newHashSet("user/hive/warehouse/db1.db/table1/p1",
+      "user/hive/warehouse/db1.db/newTable1"),
+      pathsImage.get("db1.newTable1"));
+
+    // Query the persisted path change and ensure it equals to the original one
+    long lastChangeID = sentryStore.getLastProcessedPathChangeID();
+    MSentryPathChange renamePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID);
+    assertEquals(renameUpdate.JSONSerialize(), renamePathChange.getPathChange());
+    lastNotificationId = sentryStore.getLastProcessedNotificationID();
+    assertEquals(1, lastNotificationId.longValue());
+
+
+    // Process the notificaiton second time
+    try {
+      sentryStore.renameAuthzPathsMapping("db1.table1", "db1.newTable1",
+        "user/hive/warehouse/db1.db/table1", "user/hive/warehouse/db1.db/newTable1", renameUpdate);
+    } catch (Exception e) {
+      if (!(e.getCause() instanceof JDODataStoreException)) {
+        fail("Unexpected failure occured while processing duplicate notification");
+      }
+    }
+  }
 }


Mime
View raw message