sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ak...@apache.org
Subject sentry git commit: SENTRY-1780: FullUpdateInitializer does not kill the threads whenever getFullHMSSnapshot throws an exception (Alex Kolbasov, reviewed by Na Li and Vamsee Yarlagadda)
Date Tue, 06 Jun 2017 06:30:34 GMT
Repository: sentry
Updated Branches:
  refs/heads/sentry-ha-redesign 4a1365de9 -> 42b092a56


SENTRY-1780: FullUpdateInitializer does not kill the threads whenever getFullHMSSnapshot throws
an exception (Alex Kolbasov, reviewed by Na Li and Vamsee Yarlagadda)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/42b092a5
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/42b092a5
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/42b092a5

Branch: refs/heads/sentry-ha-redesign
Commit: 42b092a56f51c301191f0296fb5045bc16dd6b3d
Parents: 4a1365d
Author: Alexander Kolbasov <akolb@cloudera.com>
Authored: Mon Jun 5 23:30:17 2017 -0700
Committer: Alexander Kolbasov <akolb@cloudera.com>
Committed: Mon Jun 5 23:30:17 2017 -0700

----------------------------------------------------------------------
 .../sentry/hdfs/FullUpdateInitializer.java      | 80 ++++++++++----------
 .../sentry/service/thrift/HMSFollower.java      | 17 ++---
 2 files changed, 49 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/42b092a5/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
index efd3fa3..cf9774c 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
@@ -18,7 +18,6 @@
 package org.apache.sentry.hdfs;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -147,11 +146,11 @@ public final class FullUpdateInitializer implements AutoCloseable {
   }
 
   private static final class CallResult {
-    private final TException failure;
+    private final Exception failure;
     private final boolean successStatus;
     private final ObjectMapping objectMapping;
 
-    CallResult(TException ex) {
+    CallResult(Exception ex) {
       failure = ex;
       successStatus = false;
       objectMapping = emptyObjectMapping;
@@ -171,7 +170,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
       return objectMapping;
     }
 
-    public TException getFailure() {
+    public Exception getFailure() {
       return failure;
     }
   }
@@ -184,53 +183,54 @@ public final class FullUpdateInitializer implements AutoCloseable {
     private final class RetryStrategy {
       private int retryStrategyMaxRetries = 0;
       private final int retryStrategyWaitDurationMillis;
-      private int retries;
 
       private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) {
         this.retryStrategyMaxRetries = retryStrategyMaxRetries;
-        retries = 0;
 
         // Assign default wait duration if negative value is provided.
-        if (retryStrategyWaitDurationMillis > 0) {
-          this.retryStrategyWaitDurationMillis = retryStrategyWaitDurationMillis;
-        } else {
-          this.retryStrategyWaitDurationMillis = 1000;
-        }
+        this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ?
+                retryStrategyWaitDurationMillis : 1000;
       }
 
+      @SuppressWarnings({"squid:S1141", "squid:S2142"})
       public CallResult exec()  {
-
         // Retry logic is happening inside callable/task to avoid
         // synchronous waiting on getting the result.
         // Retry the failure task until reach the max retry number.
         // Wait configurable duration for next retry.
-        TException exception = null;
-        for (int i = 0; i < retryStrategyMaxRetries; i++) {
-          try {
-            return new CallResult(doTask());
-          } catch (TException ex) {
-            LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." +
-            " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " +
-                    ex.toString(), ex);
-            exception = ex;
-
+        //
+        // Only thrift exceptions are retried.
+        // Other exceptions are propagated up the stack.
+        Exception exception = null;
+        try {
+          // We catch all exceptions except Thrift exceptions which are retried
+          for (int i = 0; i < retryStrategyMaxRetries; i++) {
+            //noinspection NestedTryStatement
             try {
-              Thread.sleep(retryStrategyWaitDurationMillis);
-            } catch (InterruptedException ignored) {
-              // Skip the rest retries if get InterruptedException.
-              // And set the corresponding retries number.
-              LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1));
-              retries = i;
-              i = retryStrategyMaxRetries;
+              return new CallResult(doTask());
+            } catch (TException ex) {
+              LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." +
+                      " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " +
+                      ex.toString(), ex);
+              exception = ex;
+
+              try {
+                Thread.sleep(retryStrategyWaitDurationMillis);
+              } catch (InterruptedException ignored) {
+                // Skip the rest retries if get InterruptedException.
+                // And set the corresponding retries number.
+                LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1));
+                break;
+              }
             }
           }
-
-          retries = i;
+        } catch (Exception ex) {
+          exception = ex;
         }
-
-        // Task fails, return the failure flag.
-        LOGGER.error("Task did not complete successfully after " + (retries + 1)
-        + " tries", exception);
+        LOGGER.error("Failed to execute task", exception);
+        // We will fail in the end, so we are shutting down the pool to prevent
+        // new tasks from being scheduled.
+        threadPool.shutdown();
         return new CallResult(exception);
       }
     }
@@ -291,6 +291,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
     }
 
     @Override
+    @SuppressWarnings({"squid:S2629", "squid:S135"})
     ObjectMapping doTask() throws TException {
       List<Table> tables = client.getTableObjectsByName(dbName, tableNames);
       if (LOGGER.isDebugEnabled()) {
@@ -345,8 +346,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
     ObjectMapping doTask() throws TException {
       Database db = client.getDatabase(dbName);
       if (!dbName.equalsIgnoreCase(db.getName())) {
-        LOGGER.warn(String.format("Database name %s does not match %s",
-                db.getName(), dbName));
+        LOGGER.warn("Database name {} does not match {}", db.getName(), dbName);
         return emptyObjectMapping;
       }
       List<String> allTblStr = client.getAllTables(dbName);
@@ -388,8 +388,9 @@ public final class FullUpdateInitializer implements AutoCloseable {
    * @throws ExecutionException if there was a scheduling error
    * @throws InterruptedException if processing was interrupted
    */
+  @SuppressWarnings("squid:S00112")
   public Map<String, Set<String>> getFullHMSSnapshot()
-          throws TException, ExecutionException, InterruptedException {
+          throws Exception {
     // Get list of all HMS databases
     List<String> allDbStr = client.getAllDatabases();
     // Schedule async task for each database responsible for fetching per-database
@@ -410,7 +411,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
       Future<CallResult> result = results.pop();
       // Wait for the task to complete
       CallResult callResult = result.get();
-      // Fail if we got Thrift errors
+      // Fail if we got errors
       if (!callResult.success()) {
         throw callResult.getFailure();
       }
@@ -438,6 +439,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
       threadPool.awaitTermination(1, TimeUnit.SECONDS);
     } catch (InterruptedException ignored) {
       LOGGER.warn("Interrupted shutdown");
+      Thread.currentThread().interrupt();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/42b092a5/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index d410a6c..78dc0ac 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -54,6 +54,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.SocketException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -273,14 +274,9 @@ public class HMSFollower implements Runnable, AutoCloseable {
         CurrentNotificationEventId eventIDBefore = client.getCurrentNotificationEventId();
         LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.", eventIDBefore));
 
-        try {
-          pathsFullSnapshot = fetchFullUpdate();
-          if(pathsFullSnapshot.isEmpty()) {
-            LOGGER.info("Hive full snapshot is Empty. Perhaps, HMS does not have any data");
-            return;
-          }
-        } catch (ExecutionException | InterruptedException ex) {
-          LOGGER.error("#### Encountered failure during fetching hive full snapshot !!", ex);
+        pathsFullSnapshot = fetchFullUpdate();
+        if(pathsFullSnapshot.isEmpty()) {
+          LOGGER.info("Hive full snapshot is Empty. Perhaps, HMS does not have any data");
           return;
         }
 
@@ -385,12 +381,15 @@ public class HMSFollower implements Runnable, AutoCloseable {
    * @throws ExecutionException
    */
   private Map<String, Set<String>> fetchFullUpdate()
-    throws InterruptedException, TException, ExecutionException {
+    throws TException, ExecutionException {
     LOGGER.info("Request full HMS snapshot");
     try (FullUpdateInitializer updateInitializer = new FullUpdateInitializer(client, authzConf)) {
       Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
       LOGGER.info("Obtained full HMS snapshot");
       return pathsUpdate;
+    } catch (Exception ignored) {
+      // Caller will retry later
+      return Collections.emptyMap();
     }
   }
 


Mime
View raw message