sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ak...@apache.org
Subject sentry git commit: SENTRY-1643: AutoIncrement ChangeID of MSentryPermChange/MSentryPathChange may be error-prone (Lei Xu, reviewed by Hao Hao, Alex Kolbasov, Na Li)
Date Tue, 11 Apr 2017 21:17:43 GMT
Repository: sentry
Updated Branches:
  refs/heads/sentry-ha-redesign 57ded0a9e -> a21a41971


SENTRY-1643: AutoIncrement ChangeID of MSentryPermChange/MSentryPathChange may be error-prone
(Lei Xu, reviewed by Hao Hao, Alex Kolbasov, Na Li)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/a21a4197
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/a21a4197
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/a21a4197

Branch: refs/heads/sentry-ha-redesign
Commit: a21a41971c229a8f0c2379716bf928cd17fc429f
Parents: 57ded0a
Author: Alexander Kolbasov <akolb@cloudera.com>
Authored: Tue Apr 11 14:12:35 2017 -0700
Committer: Alexander Kolbasov <akolb@cloudera.com>
Committed: Tue Apr 11 14:12:35 2017 -0700

----------------------------------------------------------------------
 .../db/service/model/MSentryPathChange.java     |  6 +-
 .../db/service/model/MSentryPermChange.java     |  5 +-
 .../persistent/DeltaTransactionBlock.java       |  6 +-
 .../db/service/persistent/SentryStore.java      |  2 +-
 .../db/service/persistent/TestSentryStore.java  | 67 ++++++++++++++++++++
 5 files changed, 81 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/a21a4197/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
index a0d3445..4b42ed0 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java
@@ -67,11 +67,15 @@ public class MSentryPathChange implements MSentryChange {
   private long createTimeMs;
   private long notificationID;
 
-  public MSentryPathChange(PathsUpdate pathChange) throws TException {
+  public MSentryPathChange(long changeID, PathsUpdate pathChange) throws TException {
     // Each PathsUpdate maps to a MSentryPathChange object.
     // The PathsUpdate is generated from a HMS notification log,
     // the notification ID is stored as seqNum and
     // the notification update is serialized as JSON string.
+    //
+    // See SENTRY-1643. changeID is set after increasing 1 of the "max(changeID)" fetched from
+    // the table, to avoid holes between changeIDs. it is subjected to change.
+    this.changeID = changeID;
     this.notificationID = pathChange.getSeqNum();
     this.pathChange = pathChange.JSONSerialize();
     this.createTimeMs = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/sentry/blob/a21a4197/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java
index 476fbcb..a97d10a 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java
@@ -64,7 +64,10 @@ public class MSentryPermChange implements MSentryChange {
   private String permChange;
   private long createTimeMs;
 
-  public MSentryPermChange(PermissionsUpdate permChange) throws TException {
+  public MSentryPermChange(long changeID, PermissionsUpdate permChange) throws TException {
+    // See SENTRY-1643. changeID is set after increasing 1 of the "max(changeID)" fetched from
+    // the table, to avoid holes between changeIDs. it is subjected to change.
+    this.changeID = changeID;
     this.permChange = permChange.JSONSerialize();
     this.createTimeMs = System.currentTimeMillis();
   }

http://git-wip-us.apache.org/repos/asf/sentry/blob/a21a4197/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java
index f590a52..8d3c88b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java
@@ -83,9 +83,11 @@ public class DeltaTransactionBlock implements TransactionBlock<Object> {
     // changeID is trying to be persisted twice, the transaction would
     // fail.
     if (update instanceof PermissionsUpdate) {
-      pm.makePersistent(new MSentryPermChange((PermissionsUpdate)update));
+      long lastChangeID = SentryStore.getLastProcessedChangeIDCore(pm, MSentryPermChange.class);
+      pm.makePersistent(new MSentryPermChange(lastChangeID + 1, (PermissionsUpdate)update));
     } else if (update instanceof PathsUpdate) {
-      pm.makePersistent(new MSentryPathChange((PathsUpdate)update));
+      long lastChangeID = SentryStore.getLastProcessedChangeIDCore(pm, MSentryPathChange.class);
+      pm.makePersistent(new MSentryPathChange(lastChangeID + 1, (PathsUpdate)update));
     } else {
       throw new SentryInvalidInputException("Update should be type of either " +
           "PermissionsUpdate or PathsUpdate.\n");

http://git-wip-us.apache.org/repos/asf/sentry/blob/a21a4197/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 802b9c6..19bae55 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -3184,7 +3184,7 @@ public class SentryStore {
    * @return the last processed changedID for the delta changes. If no
    *         change found then return 0.
    */
-  private <T extends MSentryChange> Long getLastProcessedChangeIDCore(
+  static <T extends MSentryChange> Long getLastProcessedChangeIDCore(
       PersistenceManager pm, Class<T> changeCls) {
     Query query = pm.newQuery(changeCls);
     query.setResult("max(changeID)");

http://git-wip-us.apache.org/repos/asf/sentry/blob/a21a4197/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
index aaa0b9f..fe3880d 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
@@ -20,6 +20,11 @@ package org.apache.sentry.provider.db.service.persistent;
 
 import java.io.File;
 import java.util.*;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -57,11 +62,15 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.sentry.provider.db.service.persistent.QueryParamBuilder.newQueryParamBuilder;
 
 public class TestSentryStore extends org.junit.Assert {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestSentryStore.class);
+
   private static File dataDir;
   private static SentryStore sentryStore;
   private static String[] adminGroups = { "adminGroup1" };
@@ -2572,4 +2581,62 @@ public class TestSentryStore extends org.junit.Assert {
     // TODO: verify MSentryPathChange being purged.
     // assertEquals(1, sentryStore.getMSentryPathChanges().size());
   }
+
+  /**
+   * This test verifies that in the case of concurrently updating delta change tables, no gap
+   * between change ID was made. All the change IDs must be consecutive ({@see SENTRY-1643}).
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 60000)
+  public void testConcurrentUpdateChanges() throws Exception {
+    final int numThreads = 20;
+    final int numChangesPerThread = 100;
+    final TransactionManager tm = sentryStore.getTransactionManager();
+    final AtomicLong seqNumGenerator = new AtomicLong(0);
+    final CyclicBarrier barrier = new CyclicBarrier(numThreads);
+
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            barrier.await();
+          } catch (Exception e) {
+            LOGGER.error("Barrier failed to await", e);
+            return;
+          }
+          for (int j = 0; j < numChangesPerThread; j++) {
+            List<TransactionBlock<Object>> tbs = new ArrayList<>();
+            PermissionsUpdate update =
+                new PermissionsUpdate(seqNumGenerator.getAndIncrement(), false);
+            tbs.add(new DeltaTransactionBlock(update));
+            try {
+              tm.executeTransaction(tbs);
+            } catch (Exception e) {
+              LOGGER.error("Failed to execute permission update transaction", e);
+              fail(String.format("Transaction failed: %s", e.getMessage()));
+            }
+          }
+        }
+      });
+    }
+    executor.shutdown();
+    executor.awaitTermination(60, TimeUnit.SECONDS);
+
+    List<MSentryPermChange> changes = sentryStore.getMSentryPermChanges();
+    assertEquals(numThreads * numChangesPerThread, changes.size());
+    TreeSet<Long> changeIDs = new TreeSet<>();
+    for (MSentryPermChange change : changes) {
+      changeIDs.add(change.getChangeID());
+    }
+    assertEquals("duplicated change ID", numThreads * numChangesPerThread, changeIDs.size());
+    long prevId = changeIDs.first() - 1;
+    for (Long changeId : changeIDs) {
+      assertTrue(String.format("Found non-consecutive number: prev=%d cur=%d", prevId, changeId),
+          changeId - prevId == 1);
+      prevId = changeId;
+    }
+  }
 }


Mime
View raw message