sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ak...@apache.org
Subject [3/3] sentry git commit: SENTRY-1580: Provide pooled client connection model with HA
Date Thu, 25 May 2017 04:43:42 GMT
SENTRY-1580: Provide pooled client connection model with HA


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/95d073f0
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/95d073f0
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/95d073f0

Branch: refs/heads/SENTRY-1580
Commit: 95d073f0645477e3caff998488ef8aa7de10a829
Parents: f30b80b
Author: Alexander Kolbasov <akolb@cloudera.com>
Authored: Wed May 24 00:48:28 2017 -0700
Committer: Alexander Kolbasov <akolb@cloudera.com>
Committed: Wed May 24 00:48:28 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 .../binding/hive/authz/SentryConfigTool.java    |  28 +-
 .../SentryMetastorePostEventListenerBase.java   |  20 +-
 .../hive/ql/exec/SentryGrantRevokeTask.java     |  13 +-
 .../SentryMetastorePostEventListener.java       |  17 +-
 .../sentry/kafka/binding/KafkaAuthBinding.java  |   8 +-
 .../binding/solr/authz/SolrAuthzBinding.java    |   8 +-
 .../sentry/sqoop/binding/SqoopAuthBinding.java  |   8 +-
 sentry-core/sentry-core-common/pom.xml          |   4 +
 .../transport/RetryClientInvocationHandler.java |  99 +++---
 .../SentryClientTransportConfigInterface.java   |  78 +++--
 .../SentryClientTransportConstants.java         |  91 ++++--
 .../core/common/transport/SentryConnection.java |  54 ++++
 .../SentryHDFSClientTransportConfig.java        |  71 +++--
 .../SentryPolicyClientTransportConfig.java      |  67 ++--
 .../common/transport/SentryServiceClient.java   |  43 ---
 .../transport/SentryTransportFactory.java       | 309 ++++++------------
 .../common/transport/SentryTransportPool.java   | 312 +++++++++++++++++++
 .../common/transport/TTransportWrapper.java     |  89 ++++++
 .../core/common/transport/TransportFactory.java |  39 +++
 .../sentry/core/common/utils/ThriftUtil.java    |  10 +-
 .../org/apache/sentry/hdfs/SentryUpdater.java   |   6 +-
 .../sentry/hdfs/SentryHDFSServiceClient.java    |  14 +-
 .../SentryHDFSServiceClientDefaultImpl.java     | 107 +++----
 .../hdfs/SentryHDFSServiceClientFactory.java    |  93 +++++-
 .../hdfs/SentryHdfsServiceIntegrationBase.java  |   5 +-
 .../provider/db/SimpleDBProviderBackend.java    |   9 +-
 .../generic/SentryGenericProviderBackend.java   |  17 +-
 .../provider/db/generic/UpdatableCache.java     |  55 +++-
 .../thrift/SentryGenericServiceClient.java      |   4 +-
 .../SentryGenericServiceClientDefaultImpl.java  | 108 ++++---
 .../SentryGenericServiceClientFactory.java      | 114 ++++++-
 .../db/generic/tools/SentryConfigToolSolr.java  |  12 +-
 .../db/generic/tools/SentryShellKafka.java      |  62 ++--
 .../db/generic/tools/SentryShellSolr.java       |  62 ++--
 .../thrift/SentryPolicyServiceClient.java       |   4 +-
 .../SentryPolicyServiceClientDefaultImpl.java   | 220 +++++++------
 .../provider/db/tools/SentryShellHive.java      |  57 ++--
 .../thrift/PoolClientInvocationHandler.java     | 294 -----------------
 .../sentry/service/thrift/SentryService.java    |   5 +-
 .../thrift/SentryServiceClientFactory.java      | 116 ++++++-
 .../thrift/SentryServiceClientPoolFactory.java  |  72 -----
 .../TestSentryServiceWithInvalidMsgSize.java    |   2 +
 .../thrift/TestPoolClientInvocationHandler.java |  79 -----
 .../e2e/kafka/AbstractKafkaSentryTestBase.java  |  21 +-
 .../sentry/tests/e2e/kafka/TestAuthorize.java   |   6 +
 .../e2e/sqoop/AbstractSqoopSentryTestBase.java  |  11 +-
 47 files changed, 1599 insertions(+), 1326 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ad54cfd..ba13a0d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@ limitations under the License.
     <build.helper.maven.plugin.version>1.8</build.helper.maven.plugin.version>
     <cglib.version>2.2</cglib.version>
     <commons-cli.version>1.2</commons-cli.version>
-    <commons-pool2.version>2.2</commons-pool2.version>
+    <commons-pool2.version>2.4.2</commons-pool2.version>
     <commons.lang.version>2.6</commons.lang.version>
     <commons.logging.version>1.2</commons.logging.version>
     <curator.version>2.11.1</curator.version>

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java
index a3140f2..8a5085b 100644
--- a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java
+++ b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java
@@ -291,23 +291,27 @@ public class SentryConfigTool {
     Map<String, Map<String, Set<String>>> policyFileMappingData = sentryPolicyFileFormatter.parse(
         importPolicyFilePath, authzConf);
     // todo: here should be an validator to check the data's value, format, hierarchy
-    SentryPolicyServiceClient client = SentryServiceClientFactory.create(getAuthzConf());
-    // import the mapping data to database
-    client.importPolicy(policyFileMappingData, requestorUserName, importOverwriteRole);
+    try(SentryPolicyServiceClient client =
+                SentryServiceClientFactory.create(getAuthzConf())) {
+      // import the mapping data to database
+      client.importPolicy(policyFileMappingData, requestorUserName, importOverwriteRole);
+    }
   }
 
   // export the sentry mapping data to file
   public void exportPolicy() throws Exception {
     String requestorUserName = System.getProperty("user.name", "");
-    SentryPolicyServiceClient client = SentryServiceClientFactory.create(getAuthzConf());
-    // export the sentry mapping data from database to map structure
-    Map<String, Map<String, Set<String>>> policyFileMappingData = client
-        .exportPolicy(requestorUserName, objectPath);
-    // get the FileFormatter according to the configuration
-    SentryPolicyFileFormatter sentryPolicyFileFormatter = SentryPolicyFileFormatFactory
-        .createFileFormatter(authzConf);
-    // write the sentry mapping data to exportPolicyFilePath with the data in map structure
-    sentryPolicyFileFormatter.write(exportPolicyFilePath, policyFileMappingData);
+    try (SentryPolicyServiceClient client =
+                SentryServiceClientFactory.create(getAuthzConf())) {
+      // export the sentry mapping data from database to map structure
+      Map<String, Map<String, Set<String>>> policyFileMappingData = client
+              .exportPolicy(requestorUserName, objectPath);
+      // get the FileFormatter according to the configuration
+      SentryPolicyFileFormatter sentryPolicyFileFormatter = SentryPolicyFileFormatFactory
+              .createFileFormatter(authzConf);
+      // write the sentry mapping data to exportPolicyFilePath with the data in map structure
+      sentryPolicyFileFormatter.write(exportPolicyFilePath, policyFileMappingData);
+    }
   }
 
   // list permissions for given user

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java
index 262db11..2abdd53 100644
--- a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java
+++ b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java
@@ -356,11 +356,11 @@ public class SentryMetastorePostEventListenerBase extends MetaStoreEventListener
       throws SentryUserException, IOException, MetaException {
     String requestorUserName = UserGroupInformation.getCurrentUser()
         .getShortUserName();
-    SentryPolicyServiceClient sentryClient = getSentryServiceClient();
-    sentryClient.dropPrivileges(requestorUserName, authorizableTable);
-
-    // Close the connection after dropping privileges is done.
-    sentryClient.close();
+    try(SentryPolicyServiceClient sentryClient = getSentryServiceClient()) {
+      sentryClient.dropPrivileges(requestorUserName, authorizableTable);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
   }
 
   private void renameSentryTablePrivilege(String oldDbName, String oldTabName,
@@ -379,9 +379,7 @@ public class SentryMetastorePostEventListenerBase extends MetaStoreEventListener
     if (!oldTabName.equalsIgnoreCase(newTabName)
         && syncWithPolicyStore(AuthzConfVars.AUTHZ_SYNC_ALTER_WITH_POLICY_STORE)) {
 
-      SentryPolicyServiceClient sentryClient = getSentryServiceClient();
-
-      try {
+      try (SentryPolicyServiceClient sentryClient = getSentryServiceClient()){
         String requestorUserName = UserGroupInformation.getCurrentUser()
             .getShortUserName();
         sentryClient.renamePrivileges(requestorUserName, oldAuthorizableTable, newAuthorizableTable);
@@ -392,10 +390,8 @@ public class SentryMetastorePostEventListenerBase extends MetaStoreEventListener
             + " Error: " + e.getMessage());
       } catch (IOException e) {
         throw new MetaException("Failed to find local user " + e.getMessage());
-      } finally {
-
-        // Close the connection after renaming privileges is done.
-        sentryClient.close();
+      } catch (Exception e) {
+        e.printStackTrace();
       }
     }
     // The HDFS plugin needs to know if it's a path change (set location)

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java
index da587d0..3f06cae 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java
@@ -98,7 +98,6 @@ public class SentryGrantRevokeTask extends Task<DDLWork> implements Serializable
   private static final int terminator = Utilities.newLineCode;
   private static final long serialVersionUID = -7625118066790571999L;
 
-  private SentryPolicyServiceClient sentryClient;
   private HiveConf conf;
   private HiveAuthzBinding hiveAuthzBinding;
   private HiveAuthzConf authzConf;
@@ -116,13 +115,8 @@ public class SentryGrantRevokeTask extends Task<DDLWork> implements Serializable
 
   @Override
   public int execute(DriverContext driverContext) {
-    try {
-      try {
-        this.sentryClient = SentryServiceClientFactory.create(authzConf);
-      } catch (Exception e) {
-        String msg = "Error creating Sentry client: " + e.getMessage();
-        throw new RuntimeException(msg, e);
-      }
+    try (SentryPolicyServiceClient sentryClient =
+                 SentryServiceClientFactory.create(authzConf)) {
       Preconditions.checkNotNull(hiveAuthzBinding, "HiveAuthzBinding cannot be null");
       Preconditions.checkNotNull(authzConf, "HiveAuthConf cannot be null");
       Preconditions.checkNotNull(subject, "Subject cannot be null");
@@ -179,9 +173,6 @@ public class SentryGrantRevokeTask extends Task<DDLWork> implements Serializable
       console.printError(msg);
       return RETURN_CODE_FAILURE;
     } finally {
-      if (sentryClient != null) {
-        sentryClient.close();
-      }
       if (hiveAuthzBinding != null) {
         hiveAuthzBinding.close();
       }

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
index fdb6df4..3ec2eed 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
@@ -364,11 +364,12 @@ public class SentryMetastorePostEventListener extends MetaStoreEventListener {
       throws SentryUserException, IOException, MetaException {
     String requestorUserName = UserGroupInformation.getCurrentUser()
         .getShortUserName();
-    SentryPolicyServiceClient sentryClient = getSentryServiceClient();
-    sentryClient.dropPrivileges(requestorUserName, authorizableTable);
-
-    // Close the connection after dropping privileges is done.
-    sentryClient.close();
+    try (SentryPolicyServiceClient sentryClient = SentryServiceClientFactory.create(authzConf)) {
+      sentryClient.dropPrivileges(requestorUserName, authorizableTable);
+    } catch (Exception e) {
+      throw new MetaException("Failed to connect to Sentry service "
+              + e.getMessage());
+    }
   }
 
   private void renameSentryTablePrivilege(String oldDbName, String oldTabName,
@@ -403,7 +404,11 @@ public class SentryMetastorePostEventListener extends MetaStoreEventListener {
       } finally {
 
         // Close the connection after renaming privileges is done.
-        sentryClient.close();
+        try {
+          sentryClient.close();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
       }
     }
     // The HDFS plugin needs to know if it's a path change (set location)

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
index 4851114..f5d4431 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
@@ -365,9 +365,7 @@ public class KafkaAuthBinding {
   }
 
   private <T> T execute(Command<T> cmd) throws KafkaException {
-    SentryGenericServiceClient client = null;
-    try {
-      client = getClient();
+    try (SentryGenericServiceClient client  = getClient()){
       return cmd.run(client);
     } catch (SentryUserException ex) {
       String msg = "Unable to excute command on sentry server: " + ex.getMessage();
@@ -377,10 +375,6 @@ public class KafkaAuthBinding {
       String msg = "Unable to obtain client:" + ex.getMessage();
       LOG.error(msg, ex);
       throw new KafkaException(msg, ex);
-    } finally {
-      if (client != null) {
-        client.close();
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java b/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java
index 2400673..37adb56 100644
--- a/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java
+++ b/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java
@@ -298,9 +298,7 @@ public class SolrAuthzBinding {
     if (!isSyncEnabled()) {
       return;
     }
-    SentryGenericServiceClient client = null;
-    try {
-      client = getClient();
+    try (SentryGenericServiceClient client = getClient()) {
       TSentryPrivilege tPrivilege = new TSentryPrivilege();
       tPrivilege.setComponent(AuthorizationComponent.Search);
       tPrivilege.setServiceName(authzConf.get(SENTRY_SEARCH_SERVICE_KEY,
@@ -316,10 +314,6 @@ public class SolrAuthzBinding {
           " can't delete privileges for collection " + collection);
     } catch (Exception ex) {
       throw new SentrySolrAuthorizationException("Unable to obtain client:" + ex.getMessage());
-    } finally {
-      if (client != null) {
-        client.close();
-      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java b/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java
index 84a61cc..11e2aa4 100644
--- a/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java
+++ b/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java
@@ -408,9 +408,7 @@ public class SqoopAuthBinding {
   }
 
   private <T> T execute(Command<T> cmd) throws SqoopException {
-    SentryGenericServiceClient client = null;
-    try {
-      client = getClient();
+    try (SentryGenericServiceClient client = getClient()){
       return cmd.run(client);
     } catch (SentryUserException ex) {
       String msg = "Unable to excute command on sentry server: " + ex.getMessage();
@@ -420,10 +418,6 @@ public class SqoopAuthBinding {
       String msg = "Unable to obtain client:" + ex.getMessage();
       LOG.error(msg, ex);
       throw new SqoopException(SecurityError.AUTH_0014, msg, ex);
-    } finally {
-      if (client != null) {
-        client.close();
-      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-core/sentry-core-common/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/pom.xml b/sentry-core/sentry-core-common/pom.xml
index e1be256..2e7f5c5 100644
--- a/sentry-core/sentry-core-common/pom.xml
+++ b/sentry-core/sentry-core-common/pom.xml
@@ -66,6 +66,10 @@ limitations under the License.
       <groupId>org.apache.thrift</groupId>
       <artifactId>libthrift</artifactId>
     </dependency>
+      <dependency>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-pool2</artifactId>
+      </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
index 34a594e..112f050 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
@@ -25,7 +25,6 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
@@ -46,17 +45,17 @@ import java.lang.reflect.Method;
  * <p>
  */
 
-public class RetryClientInvocationHandler extends SentryClientInvocationHandler {
+public final class RetryClientInvocationHandler extends SentryClientInvocationHandler {
   private static final Logger LOGGER =
     LoggerFactory.getLogger(RetryClientInvocationHandler.class);
-  private SentryServiceClient client = null;
+  private SentryConnection client = null;
   private final int maxRetryCount;
 
   /**
    * Initialize the sentry configurations, including rpc retry count and client connection
    * configs for SentryPolicyServiceClientDefaultImpl
    */
-  public RetryClientInvocationHandler(Configuration conf, SentryServiceClient clientObject,
+  public RetryClientInvocationHandler(Configuration conf, SentryConnection clientObject,
                                       SentryClientTransportConfigInterface transportConfig) {
     Preconditions.checkNotNull(conf, "Configuration object cannot be null");
     Preconditions.checkNotNull(clientObject, "Client Object cannot be null");
@@ -72,76 +71,78 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler
    * resend the thrift call) no more than rpcRetryTotal times. Throw SentryUserException
    * if failed retry after rpcRetryTotal times.
    * if it is failed with other exception, method would just re-throw the exception.
-   * Synchronized it for thread safety.
    */
   @Override
   public synchronized Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception {
-    int retryCount = 0;
     Exception lastExc = null;
 
-    while (retryCount < maxRetryCount) {
-      // Connect to a sentry server if not connected yet.
-      try {
-        client.connect();
-      } catch (IOException e) {
-        // Increase the retry num
-        // Retry when the exception is caused by connection problem.
-        retryCount++;
-        lastExc = e;
-        close();
-        continue;
-      }
+    for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) {
+      connect();
 
       // do the thrift call
       try {
+        LOGGER.debug("Calling client {}", method.getName());
         return method.invoke(client, args);
       } catch (InvocationTargetException e) {
         // Get the target exception, check if SentryUserException or TTransportException is wrapped.
         // TTransportException means there is a connection problem.
+        LOGGER.error("failed to execute {}", method.getName(), e);
         Throwable targetException = e.getCause();
-        if (targetException instanceof SentryUserException ||
-          targetException instanceof SentryHdfsServiceException) {
-          Throwable sentryTargetException = targetException.getCause();
-          // If there has connection problem, eg, invalid connection if the service restarted,
-          // sentryTargetException instanceof TTransportException will be true.
-          if (sentryTargetException instanceof TTransportException) {
-            // Retry when the exception is caused by connection problem.
-            lastExc = new TTransportException(sentryTargetException);
-            LOGGER.error("Thrift call failed with TTransportException", lastExc);
-            // Closing the thrift client on TTransportException. New client object is
-            // created using new socket when an attempt to reconnect is made.
-            close();
+        if (!((targetException instanceof SentryUserException) ||
+            (targetException instanceof SentryHdfsServiceException))) {
+          throw e;
+        }
+        Throwable sentryTargetException = targetException.getCause();
+        // If there is a connection problem, e.g. an invalid connection after the service
+        // restarted, sentryTargetException instanceof TTransportException will be true.
+        if (sentryTargetException instanceof TTransportException) {
+          // Retry when the exception is caused by connection problem.
+          lastExc = new TTransportException(sentryTargetException);
+          LOGGER.error("Thrift call failed", lastExc);
+          // The connection to the server is bad, inform the client of the problem
+          client.invalidate();
+        } else {
+          // Semantic exception which does not indicate the connection failure.
+          // Do not need to reconnect to the sentry server.
+          if (targetException instanceof SentryUserException) {
+            throw (SentryUserException) targetException;
           } else {
-            // The exception is thrown by thrift call, eg, SentryAccessDeniedException.
-            // Do not need to reconnect to the sentry server.
-            if (targetException instanceof SentryUserException) {
-              throw (SentryUserException) targetException;
-            } else {
-              throw (SentryHdfsServiceException) targetException;
-            }
+            throw (SentryHdfsServiceException) targetException;
           }
-        } else {
-          throw e;
         }
       }
-
-      // Increase the retry num
-      retryCount++;
     }
 
     // Throw the exception as reaching the max rpc retry num.
     String error = String.format("Request failed, %d retries attempted ", maxRetryCount);
-    LOGGER.error(error, lastExc);
     throw new SentryUserException(error, lastExc);
   }
 
+  /**
+   * Connect the client, making at most maxRetryCount attempts.
+   * @throws Exception the last connection failure once every attempt has failed
+   */
+  private void connect() throws Exception {
+    Exception lastExc = null;
+    for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) {
+      try {
+        client.connect();
+        return;
+      } catch (Exception e) {
+        // Retry when the exception is caused by connection problem.
+        // The loop header already advances retryCount; incrementing it here
+        // as well would skip every other attempt.
+        LOGGER.error("failed to connect", e);
+        lastExc = e;
+      }
+    }
+    assert lastExc != null;
+    throw lastExc;
+  }
+
   @Override
   public synchronized void close() {
-    try {
-      LOGGER.debug("Releasing the current client connection");
-      client.disconnect();
-    } catch (Exception e) {
-      LOGGER.error("Encountered failure while closing the connection");
-    }
+    //We are done with this client
+    client.done();
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
index 9ea7185..ad45305 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,6 +17,7 @@
  */
 package org.apache.sentry.core.common.transport;
 
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.core.common.exception.MissingConfigurationException;
 
@@ -29,14 +30,6 @@ import org.apache.sentry.core.common.exception.MissingConfigurationException;
  * the transport configuration.
  */
 interface SentryClientTransportConfigInterface {
-  /**
-   * @param conf configuration
-   * @return number of times client retry logic should iterate through all
-   * the servers before giving up.
-   * @throws MissingConfigurationException if property is mandatory and is missing in
-   *                                       configuration.
-   */
-  int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException;
 
   /**
    * @param conf configuration
@@ -52,7 +45,7 @@ interface SentryClientTransportConfigInterface {
    * @throws MissingConfigurationException if property is mandatory and is missing in
    *                                       configuration.
    */
-  boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException;
+  boolean isKerberosEnabled(Configuration conf);
 
   /**
    * @param conf configuration
@@ -61,7 +54,7 @@ interface SentryClientTransportConfigInterface {
    * @throws MissingConfigurationException if property is mandatory and is missing in
    *                                       configuration.
    */
-  boolean useUserGroupInformation(Configuration conf) throws MissingConfigurationException;
+  boolean useUserGroupInformation(Configuration conf);
 
   /**
    * @param conf configuration
@@ -69,7 +62,7 @@ interface SentryClientTransportConfigInterface {
    * @throws MissingConfigurationException if property is mandatory and is missing in
    *                                       configuration.
    */
-  String getSentryPrincipal(Configuration conf) throws MissingConfigurationException;
+  String getSentryPrincipal(Configuration conf);
 
   /**
    * Port in RPC Addresses configured is optional
@@ -78,7 +71,7 @@ interface SentryClientTransportConfigInterface {
    * @throws MissingConfigurationException if property is mandatory and is missing in
    *                                       configuration.
    */
-  String getSentryServerRpcAddress(Configuration conf) throws MissingConfigurationException;
+  String getSentryServerRpcAddress(Configuration conf);
 
   /**
    * Port in RPC Addresses configured is optional. If a port is not provided for a server
@@ -88,7 +81,7 @@ interface SentryClientTransportConfigInterface {
    * @throws MissingConfigurationException if property is mandatory and is missing in
    *                                       configuration.
    */
-  int getServerRpcPort(Configuration conf) throws MissingConfigurationException;
+  int getServerRpcPort(Configuration conf);
 
   /**
    * @param conf configuration
@@ -99,14 +92,59 @@ interface SentryClientTransportConfigInterface {
    * @throws MissingConfigurationException if property is mandatory and is missing in
    *                                       configuration.
    */
-  int getServerRpcConnTimeoutInMs(Configuration conf) throws MissingConfigurationException;
+  int getServerRpcConnTimeoutInMs(Configuration conf);
 
   /**
-   *
+   * Maximum number of connections in the pool.
+   * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMaxTotal(int)}
    * @param conf configuration
-   * @return True if the client should load balance connections between multiple servers
-   * @throws MissingConfigurationException if property is mandatory and is missing in
-   *                                       configuration.
+   * @return maximum number of connection objects in the pool
+   */
+  int getPoolMaxTotal(Configuration conf);
+
+  /**
+   * Minimum number of idle objects in the pool.
+   * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMinIdle(int)}
+   * @param conf Configuration
+   * @return Minimum idle connections to keep in the pool
+   */
+  int getPoolMinIdle(Configuration conf);
+
+  /**
+   * Maximum number of idle connections in the pool.
+   * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMaxIdle(int)}
+   * @param conf Configuration
+   * @return Maximum number of idle connections in the pool
+   */
+  int getPoolMaxIdle(Configuration conf);
+
+  /**
+   * This is the minimum amount of time an object may sit idle in the pool
+   * before it is eligible for eviction.
+   * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMinEvictableIdleTimeMillis}
+   * @param conf Configuration
+   * @return The value for the pool minimum eviction time.
+   */
+  long getMinEvictableTimeSec(Configuration conf);
+
+  /**
+   * The number of seconds to sleep between runs of the idle object evictor thread.
+   * When non-positive, no idle object evictor thread will be run.
+   * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#getTimeBetweenEvictionRunsMillis()}
+   * @param conf Configuration
+   * @return The number of seconds to sleep between runs of the idle object evictor thread.
+   */
+  long getTimeBetweenEvictionRunsSec(Configuration conf);
+
+  /**
+   * @param conf configuration
+   * @return True if using load-balancing between Sentry servers
+   */
+  boolean isLoadBalancingEnabled(Configuration conf);
+
+  /**
+   * @param conf configuration
+   * @return true if transport pools are enabled
    */
-   boolean isLoadBalancingEnabled(Configuration conf)throws MissingConfigurationException;
+  boolean isTransportPoolEnabled(Configuration conf);
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
index 651173e..4caa6b6 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
@@ -19,6 +19,8 @@
 package org.apache.sentry.core.common.transport;
 
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Defines configuration strings needed for sentry thrift clients to handle the transport level
  * operations.
@@ -27,7 +29,7 @@ package org.apache.sentry.core.common.transport;
  * Clients that needs these configuration string use the implementations of interface
  * <code>SentryClientTransportConfigInterface</code>.
  */
-class SentryClientTransportConstants {
+public final class SentryClientTransportConstants {
 
   /**
    * max retry num for client rpc
@@ -44,8 +46,18 @@ class SentryClientTransportConstants {
     "sentry.service.client.connection.full.retry-total";
   static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = 2;
 
+  /**
+   * Enable load balancing between servers
+   */
+  static final String SENTRY_CLIENT_LOAD_BALANCING =
+          "sentry.service.client.connection.loadbalance";
+  static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = true;
+
   static final int RPC_PORT_DEFAULT = 8038;
 
+  private SentryClientTransportConstants() {
+  }
+
   /**
    * Defines configuration strings needed for sentry thrift policy clients to handle
    * the transport level operations.
@@ -57,7 +69,6 @@ class SentryClientTransportConstants {
     //configuration for server address. It can be coma seperated list of server addresses.
     static final String SERVER_RPC_ADDRESS = "sentry.service.client.server.rpc-address";
 
-
     /**
      * This configuration parameter is only meant to be used for testing purposes.
      */
@@ -72,7 +83,7 @@ class SentryClientTransportConstants {
     static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT =
       SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL_DEFAULT;
 
-    static final String SECURITY_USE_UGI_TRANSPORT = "sentry.service.security.use.ugi";
+    public static final String SECURITY_USE_UGI_TRANSPORT = "sentry.service.security.use.ugi";
     static final String PRINCIPAL = "sentry.service.server.principal";
 
     //configration for the client connection timeout.
@@ -88,35 +99,39 @@ class SentryClientTransportConstants {
     static final String SENTRY_RPC_RETRY_TOTAL = "sentry.service.client.rpc.retry-total";
     static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3;
 
-    // connection pool configuration
-    static final String SENTRY_POOL_ENABLED = "sentry.service.client.connection.pool.enabled";
-    static final boolean SENTRY_POOL_ENABLED_DEFAULT = false;
+    // commons-pool configuration
+    static final String SENTRY_POOL_ENABLE = "sentry.service.client.connection.pool.enabled";
+    static final boolean SENTRY_POOL_ENABLE_DEFAULT = true;
 
-    // commons-pool configuration for pool size
+    /** Maximum total number of connections in the pool; a negative value means no limit */
     static final String SENTRY_POOL_MAX_TOTAL = "sentry.service.client.connection.pool.max-total";
-    static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = 8;
+    static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = -1;
     static final String SENTRY_POOL_MAX_IDLE = "sentry.service.client.connection.pool.max-idle";
-    static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 8;
+    static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 256;
     static final String SENTRY_POOL_MIN_IDLE = "sentry.service.client.connection.pool.min-idle";
-    static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 0;
-
-    // configuration to load balance the connections to the configured sentry servers
-    static final String SENTRY_CLIENT_LOAD_BALANCING = "sentry.service.client.connection.loadbalance";
-    static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = true;
-
-    // retry num for getting the connection from connection pool
-    static final String SENTRY_POOL_RETRY_TOTAL =
-      SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL;
-    static final int SENTRY_POOL_RETRY_TOTAL_DEFAULT =
-      SentryClientTransportConstants.SENTRY_RPC_RETRY_TOTAL_DEFAULT;
-
+    static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 16;
+    static final String SENTRY_POOL_MIN_EVICTION_TIME_SEC =
+            "sentry.service.client.connection.pool.eviction.mintime.sec";
+    // Minimum idle time before eviction: 2 minutes (NOTE: value is computed in milliseconds, not seconds)
+    static final long SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT =
+            TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);;
+    static final String SENTRY_POOL_EVICTION_INTERVAL_SEC =
+            "sentry.service.client.connection.pool.eviction.interval.sec";
+    // Run eviction thread every minute
+    static final long SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT =
+            TimeUnit.MILLISECONDS.convert(1L, TimeUnit.MINUTES);
+
+    static final String SENTRY_CLIENT_LOAD_BALANCING =
+            SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING;
+    static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT =
+            SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING_DEFAULT;
   }
 
   /**
    * Defines configuration strings needed for sentry HDFS clients to handle the transport level
    * operations.
    */
-  static class HDFSClientConstants {
+  public static class HDFSClientConstants {
 
     //Default server port
     static final int SERVER_RPC_PORT_DEFAULT = SentryClientTransportConstants.RPC_PORT_DEFAULT;
@@ -142,14 +157,10 @@ class SentryClientTransportConstants {
     static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT =
       SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL_DEFAULT;
 
-    static final String SECURITY_USE_UGI_TRANSPORT = "sentry.hdfs.service.security.use.ugi";
+    public static final String SECURITY_USE_UGI_TRANSPORT = "sentry.hdfs.service.security.use.ugi";
 
     static final String PRINCIPAL = "sentry.hdfs.service.server.principal";
 
-    static final String RPC_ADDRESS = "sentry.hdfs.service.client.server.rpc-address";
-
-    static final String RPC_ADDRESS_DEFAULT = "0.0.0.0"; //NOPMD
-
     //configration for the client connection timeout.
     static final String SERVER_RPC_CONN_TIMEOUT =
       "sentry.hdfs.service.client.server.rpc-connection-timeout";
@@ -165,8 +176,28 @@ class SentryClientTransportConstants {
 
     static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3;
 
-    // configuration to load balance the connections to the configured sentry servers
-    static final String SENTRY_CLIENT_LOAD_BALANCING = "sentry.hdfs.service.client.connection.loadbalance";
-    static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = true;
+    // commons-pool configuration - disable pool for HDFS clients
+    static final String SENTRY_POOL_ENABLE = "sentry.hdfs.service.client.connection.pool.enable";
+    static final boolean SENTRY_POOL_ENABLE_DEFAULT = false;
+
+    /** Total maximum number of open connections. There shouldn't be many. */
+    static final String SENTRY_POOL_MAX_TOTAL = "sentry.hdfs.service.client.connection.pool.max-total";
+    static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = 16;
+    /** Maximum number of idle connections to keep */
+    static final String SENTRY_POOL_MAX_IDLE = "sentry.hdfs.service.client.connection.pool.max-idle";
+    static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 2;
+    static final String SENTRY_POOL_MIN_IDLE = "sentry.hdfs.service.client.connection.pool.min-idle";
+    static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 1;
+    static final String SENTRY_POOL_MIN_EVICTION_TIME_SEC =
+            "sentry.hdfs.service.client.connection.pool.eviction.mintime.sec";
+    // No evictions for HDFS connections by default
+    static final long SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT = 0L;
+    static final String SENTRY_POOL_EVICTION_INTERVAL_SEC =
+            "sentry.hdfs.service.client.connection.pool.eviction.interval.sec";
+    static final long SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT = -1L;
+    static final String SENTRY_CLIENT_LOAD_BALANCING =
+            SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING;
+    static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT =
+            SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING_DEFAULT;
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java
new file mode 100644
index 0000000..b5f2bcf
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.transport;
+
+/**
+ * Representation of a connection to a Sentry Server.
+ * <ul>
+ *   <li>Connection is initialized using the {@link #connect()} method.</li>
+ *   <li>When the connection is no longer used, the {@link #done()} method should be called to
+ * deallocate any resources.</li>
+ * <li>If the user detected that connection is broken, they should call
+ * {@link #invalidate()} method. The connection can not be used after that.</li>
+ * </ul>
+ */
+public interface SentryConnection {
+  /**
+   * Connect to Sentry server.
+   * Either creates a new connection or reuses an existing one.
+   * @throws Exception on failure to connect.
+   */
+  void connect() throws Exception;
+
+  /**
+   * Disconnect from the server. May close connection or return it to a
+   * pool for reuse.
+   */
+  void done();
+
+  /**
+   * The connection is assumed to be non-working, invalidate it.
+   * Subsequent {@link #connect() call} should attempt to obtain
+   * another connection.
+   * <p>
+   * The implementation may attempt to connect
+   * to another server immediately or delay it till the call to
+   * {@link #connect()}.
+   */
+  void invalidate();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
index 2d80827..1724e7f 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
@@ -32,63 +32,92 @@ import static org.apache.sentry.core.common.transport.SentryClientTransportConst
  */
 public final class SentryHDFSClientTransportConfig
   implements SentryClientTransportConfigInterface {
-  public SentryHDFSClientTransportConfig() { }
 
   @Override
-  public boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException {
+  public boolean isKerberosEnabled(Configuration conf) {
     return (conf.get(SECURITY_MODE, SentryConstants.KERBEROS_MODE).trim()
       .equalsIgnoreCase((SentryConstants.KERBEROS_MODE)));
   }
 
   @Override
-  public int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException {
-    return conf.getInt(SENTRY_FULL_RETRY_TOTAL, SENTRY_FULL_RETRY_TOTAL_DEFAULT);
-  }
-
-  @Override
   public int getSentryRpcRetryTotal(Configuration conf) {
     return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT);
   }
 
   @Override
-  public boolean useUserGroupInformation(Configuration conf)
-    throws MissingConfigurationException {
-    return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true"));
+  public boolean useUserGroupInformation(Configuration conf) {
+    return Boolean.parseBoolean(conf.get(SECURITY_USE_UGI_TRANSPORT, "true"));
   }
 
+  /**
+   * @throws MissingConfigurationException if the {@code PRINCIPAL} property is not set or is empty
+   */
   @Override
-  public String getSentryPrincipal(Configuration conf) throws MissingConfigurationException {
+  public String getSentryPrincipal(Configuration conf) {
     String principle = conf.get(PRINCIPAL);
-    if (principle != null && !principle.isEmpty()) {
+    if ((principle != null) && !principle.isEmpty()) {
       return principle;
     }
     throw new MissingConfigurationException(PRINCIPAL);
   }
 
+  /**
+   * @throws MissingConfigurationException if the {@code SERVER_RPC_ADDRESS} property is not set or is empty
+   */
   @Override
-  public String getSentryServerRpcAddress(Configuration conf)
-    throws MissingConfigurationException {
+  public String getSentryServerRpcAddress(Configuration conf) {
     String serverAddress = conf.get(SERVER_RPC_ADDRESS);
-    if (serverAddress != null && !serverAddress.isEmpty()) {
+    if ((serverAddress != null) && !serverAddress.isEmpty()) {
       return serverAddress;
     }
     throw new MissingConfigurationException(SERVER_RPC_ADDRESS);
   }
 
   @Override
-  public int getServerRpcPort(Configuration conf) throws MissingConfigurationException {
+  public int getServerRpcPort(Configuration conf) {
     return conf.getInt(SERVER_RPC_PORT, SentryClientTransportConstants.RPC_PORT_DEFAULT);
   }
 
   @Override
-  public int getServerRpcConnTimeoutInMs(Configuration conf)
-    throws MissingConfigurationException {
+  public int getServerRpcConnTimeoutInMs(Configuration conf) {
     return conf.getInt(SERVER_RPC_CONN_TIMEOUT, SERVER_RPC_CONN_TIMEOUT_DEFAULT);
   }
 
   @Override
-  public boolean isLoadBalancingEnabled(Configuration conf)
-    throws MissingConfigurationException {
-    return conf.getBoolean(SENTRY_CLIENT_LOAD_BALANCING, SENTRY_CLIENT_LOAD_BALANCING_DEFAULT);
+  public int getPoolMaxTotal(Configuration conf) {
+    return conf.getInt(SENTRY_POOL_MAX_TOTAL, SENTRY_POOL_MAX_TOTAL_DEFAULT);
+  }
+
+  @Override
+  public int getPoolMinIdle(Configuration conf) {
+    return conf.getInt(SENTRY_POOL_MIN_IDLE, SENTRY_POOL_MIN_IDLE_DEFAULT);
+  }
+
+  @Override
+  public int getPoolMaxIdle(Configuration conf) {
+    return conf.getInt(SENTRY_POOL_MAX_IDLE, SENTRY_POOL_MAX_IDLE_DEFAULT);
+  }
+
+  @Override
+  public long getMinEvictableTimeSec(Configuration conf) {
+    return conf.getLong(SENTRY_POOL_MIN_EVICTION_TIME_SEC,
+            SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT);
+  }
+
+  @Override
+  public long getTimeBetweenEvictionRunsSec(Configuration conf) {
+    return conf.getLong(SENTRY_POOL_EVICTION_INTERVAL_SEC,
+            SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT);
+  }
+
+  @Override
+  public boolean isLoadBalancingEnabled(Configuration conf) {
+    return conf.getBoolean(SENTRY_CLIENT_LOAD_BALANCING,
+            SENTRY_CLIENT_LOAD_BALANCING_DEFAULT);
+  }
+
+  @Override
+  public boolean isTransportPoolEnabled(Configuration conf) {
+    return conf.getBoolean(SENTRY_POOL_ENABLE, SENTRY_POOL_ENABLE_DEFAULT);
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
index c97fe97..45522df 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
@@ -21,6 +21,7 @@ package org.apache.sentry.core.common.transport;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.core.common.exception.MissingConfigurationException;
 import org.apache.sentry.core.common.utils.SentryConstants;
+
 import static org.apache.sentry.core.common.transport.SentryClientTransportConstants.PolicyClientConstants.*;
 
 /**
@@ -32,63 +33,91 @@ import static org.apache.sentry.core.common.transport.SentryClientTransportConst
  */
 public final class SentryPolicyClientTransportConfig
   implements SentryClientTransportConfigInterface {
-  public SentryPolicyClientTransportConfig() { }
 
   @Override
-  public boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException {
+  public boolean isKerberosEnabled(Configuration conf) {
     return (conf.get(SECURITY_MODE, SentryConstants.KERBEROS_MODE).trim()
       .equalsIgnoreCase((SentryConstants.KERBEROS_MODE)));
   }
 
   @Override
-  public int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException {
-    return conf.getInt(SENTRY_FULL_RETRY_TOTAL, SENTRY_FULL_RETRY_TOTAL_DEFAULT);
-  }
-
-  @Override
   public int getSentryRpcRetryTotal(Configuration conf) {
     return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT);
   }
 
   @Override
-  public boolean useUserGroupInformation(Configuration conf)
-    throws MissingConfigurationException {
+  public boolean useUserGroupInformation(Configuration conf) {
     return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true"));
   }
 
+  /**
+   * @throws MissingConfigurationException if the {@code PRINCIPAL} property is not set or is empty
+   */
   @Override
-  public String getSentryPrincipal(Configuration conf) throws MissingConfigurationException {
+  public String getSentryPrincipal(Configuration conf) {
     String principle = conf.get(PRINCIPAL);
-    if (principle != null && !principle.isEmpty()) {
+    if ((principle != null) && !principle.isEmpty()) {
       return principle;
     }
     throw new MissingConfigurationException(PRINCIPAL);
   }
 
+  /**
+   * @throws MissingConfigurationException if the {@code SERVER_RPC_ADDRESS} property is not set or is empty
+   */
   @Override
-  public String getSentryServerRpcAddress(Configuration conf)
-    throws MissingConfigurationException {
+  public String getSentryServerRpcAddress(Configuration conf) {
     String serverAddress = conf.get(SERVER_RPC_ADDRESS);
-    if (serverAddress != null && !serverAddress.isEmpty()) {
+    if ((serverAddress != null) && !serverAddress.isEmpty()) {
       return serverAddress;
     }
     throw new MissingConfigurationException(SERVER_RPC_ADDRESS);
   }
 
   @Override
-  public int getServerRpcPort(Configuration conf) throws MissingConfigurationException {
+  public int getServerRpcPort(Configuration conf) {
     return conf.getInt(SERVER_RPC_PORT, SentryClientTransportConstants.RPC_PORT_DEFAULT);
   }
 
   @Override
-  public int getServerRpcConnTimeoutInMs(Configuration conf)
-    throws MissingConfigurationException {
+  public int getServerRpcConnTimeoutInMs(Configuration conf) {
     return conf.getInt(SERVER_RPC_CONN_TIMEOUT, SERVER_RPC_CONN_TIMEOUT_DEFAULT);
   }
 
   @Override
-  public boolean isLoadBalancingEnabled(Configuration conf)
-    throws MissingConfigurationException {
+  public int getPoolMaxTotal(Configuration conf) {
+    return conf.getInt(SENTRY_POOL_MAX_TOTAL, SENTRY_POOL_MAX_TOTAL_DEFAULT);
+  }
+
+  @Override
+  public int getPoolMinIdle(Configuration conf) {
+    return conf.getInt(SENTRY_POOL_MIN_IDLE, SENTRY_POOL_MIN_IDLE_DEFAULT);
+  }
+
+  @Override
+  public int getPoolMaxIdle(Configuration conf) {
+    return conf.getInt(SENTRY_POOL_MAX_IDLE, SENTRY_POOL_MAX_IDLE_DEFAULT);
+  }
+
+  @Override
+  public long getMinEvictableTimeSec(Configuration conf) {
+    return conf.getLong(SENTRY_POOL_MIN_EVICTION_TIME_SEC,
+            SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT);
+  }
+
+  @Override
+  public long getTimeBetweenEvictionRunsSec(Configuration conf) {
+    return conf.getLong(SENTRY_POOL_EVICTION_INTERVAL_SEC,
+            SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT);
+  }
+
+  @Override
+  public boolean isLoadBalancingEnabled(Configuration conf) {
     return conf.getBoolean(SENTRY_CLIENT_LOAD_BALANCING, SENTRY_CLIENT_LOAD_BALANCING_DEFAULT);
   }
+
+  @Override
+  public boolean isTransportPoolEnabled(Configuration conf) {
+    return conf.getBoolean(SENTRY_POOL_ENABLE, SENTRY_POOL_ENABLE_DEFAULT);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
deleted file mode 100644
index 9a10ca5..0000000
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.core.common.transport;
-
-/**
- * Client interface for Proxy Invocation handlers
- * <p>
- * Defines interface that Sentry client's should expose to the Invocation handlers like
- * <code>RetryClientInvocationHandler</code> used to proxy the method invocation on sentry
- * client instances .
- * <p>
- * All the sentry clients that need retrying and failover capabilities should implement
- * this interface.
- */
-public interface SentryServiceClient {
-  /**
-   * Connect to Sentry server.
-   * Either creates a new connection or reuses an existing one.
-   * @throws Exception on failure to acquire a transport towards server.
-   */
-  void connect() throws Exception;
-
-  /**
-   * Disconnect from the server. May close connection or return it to a
-   * pool for reuse.
-   */
-  void disconnect();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
index 74aced2..b41bdfd 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,215 +22,73 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.net.HostAndPort;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.sentry.core.common.exception.MissingConfigurationException;
 import org.apache.thrift.transport.TSaslClientTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
-import org.apache.sentry.core.common.utils.ThriftUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.ThreadSafe;
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
 
 /**
- * Create Thrift transports suitable for talking to Sentry
+ * Factory for producing connected Thrift transports.
+ * It can produce regular transports as well as Kerberos-enabled transports.
+ * <p>
+ * This class is immutable and thus thread-safe.
  */
+@ThreadSafe
+public final class SentryTransportFactory implements TransportFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportFactory.class);
 
-public class SentryTransportFactory {
-  protected final Configuration conf;
-  private String[] serverPrincipalParts;
-  protected TTransport thriftTransport;
+  private final Configuration conf;
+  private final boolean useUgi;
+  private final String serverPrincipal;
   private final int connectionTimeout;
-  private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportFactory.class);
-  // configs for connection retry
-  private final int connectionFullRetryTotal;
-  private final ArrayList<InetSocketAddress> endpoints;
-  private final SentryClientTransportConfigInterface transportConfig;
+  private final boolean isKerberosEnabled;
   private static final ImmutableMap<String, String> SASL_PROPERTIES =
     ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
 
   /**
-   * This transport wraps the Sasl transports to set up the right UGI context for open().
-   */
-  public static class UgiSaslClientTransport extends TSaslClientTransport {
-    UserGroupInformation ugi = null;
-
-    public UgiSaslClientTransport(String mechanism, String protocol,
-                                  String serverName, TTransport transport,
-                                  boolean wrapUgi, Configuration conf)
-      throws IOException, SaslException {
-      super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null,
-        transport);
-      if (wrapUgi) {
-        ugi = UserGroupInformation.getLoginUser();
-      }
-    }
-
-    // open the SASL transport with using the current UserGroupInformation
-    // This is needed to get the current login context stored
-    @Override
-    public void open() throws TTransportException {
-      if (ugi == null) {
-        baseOpen();
-      } else {
-        try {
-          if (ugi.isFromKeytab()) {
-            ugi.checkTGTAndReloginFromKeytab();
-          }
-          ugi.doAs(new PrivilegedExceptionAction<Void>() {
-            public Void run() throws TTransportException {
-              baseOpen();
-              return null;
-            }
-          });
-        } catch (IOException e) {
-          throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e);
-        } catch (InterruptedException e) {
-          throw new TTransportException(
-            "Interrupted while opening underlying transport: " + e.getMessage(), e);
-        }
-      }
-    }
-
-    private void baseOpen() throws TTransportException {
-      super.open();
-    }
-  }
-
-  /**
    * Initialize the object based on the sentry configuration provided.
-   * List of configured servers are reordered randomly preventing all
-   * clients connecting to the same server.
    *
    * @param conf            Sentry configuration
    * @param transportConfig transport configuration to use
    */
   public SentryTransportFactory(Configuration conf,
-                                SentryClientTransportConfigInterface transportConfig) throws IOException {
+                         SentryClientTransportConfigInterface transportConfig) {
 
     this.conf = conf;
     Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
-    serverPrincipalParts = null;
-    this.transportConfig = transportConfig;
-
-    try {
-      this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
-      this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
-      if(transportConfig.isKerberosEnabled(conf) &&
-        transportConfig.useUserGroupInformation(conf)) {
-          // Re-initializing UserGroupInformation, if needed
-          UserGroupInformationInitializer.initialize(conf);
-      }
-      String hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
-
-      int serverPort = transportConfig.getServerRpcPort(conf);
-
-      String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
-      HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort);
-
-      this.endpoints = new ArrayList<>(hostsAndPortsStrArr.length);
-      for (HostAndPort endpoint : hostsAndPorts) {
-        this.endpoints.add(
-          new InetSocketAddress(endpoint.getHostText(), endpoint.getPort()));
-        LOGGER.debug("Added server endpoint: " + endpoint.toString());
-      }
-
-      if((endpoints.size() > 1) && (transportConfig.isLoadBalancingEnabled(conf))) {
-        // Reorder endpoints randomly to prevent all clients connecting to the same endpoint
-        // and load balance the connections towards sentry servers
-        Collections.shuffle(endpoints);
-      }
-    } catch (MissingConfigurationException e) {
-      throw new RuntimeException("Sentry Thrift Client Creation Failed: " + e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Initialize object based on the parameters provided provided.
-   *
-   * @param addr            Host address which the client needs to connect
-   * @param port            Host Port which the client needs to connect
-   * @param conf            Sentry configuration
-   * @param transportConfig transport configuration to use
-   */
-  public SentryTransportFactory(String addr, int port, Configuration conf,
-                                SentryClientTransportConfigInterface transportConfig) throws IOException {
-    // copy the configuration because we may make modifications to it.
-    this.conf = new Configuration(conf);
-    serverPrincipalParts = null;
-    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
-    this.transportConfig = transportConfig;
-
-    try {
-      this.endpoints = new ArrayList<>(1);
-      this.endpoints.add(NetUtils.createSocketAddr(addr, port));
-      this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
-      this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
-    } catch (MissingConfigurationException e) {
-      throw new RuntimeException("Sentry Thrift Client Creation Failed: " + e.getMessage(), e);
-    }
-  }
-
-
-  /**
-   * On connection error, Iterates through all the configured servers and tries to connect.
-   * On successful connection, control returns
-   * On connection failure, continues iterating through all the configured sentry servers,
-   * and then retries the whole server list no more than connectionFullRetryTotal times.
-   * In this case, it won't introduce more latency when some server fails.
-   * <p>
-   * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
-   */
-  public TTransport getTransport() throws IOException {
-    IOException currentException = null;
-    for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) {
-      try {
-        return connectToAvailableServer();
-      } catch (IOException e) {
-        currentException = e;
-        LOGGER.error(
-                "Failed to connect to all the configured sentry servers, Retrying again");
-      }
+    connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
+    isKerberosEnabled = transportConfig.isKerberosEnabled(conf);
+    if (isKerberosEnabled) {
+      useUgi = transportConfig.useUserGroupInformation(conf);
+      serverPrincipal = transportConfig.getSentryPrincipal(conf);
+    } else {
+      serverPrincipal = null;
+      useUgi = false;
     }
-    // Throws exception on reaching the connectionFullRetryTotal.
-    LOGGER.error(
-      String.format("Reach the max connection retry num %d ", connectionFullRetryTotal),
-      currentException);
-    throw currentException;
   }
 
   /**
-   * Iterates through all the configured servers and tries to connect.
-   * On connection error, tries to connect to next server.
-   * Control returns on successful connection OR it's done trying to all the
-   * configured servers.
-   *
+   * Connect to the endpoint and return a connected Thrift transport.
+   * @return Connection to the endpoint
    * @throws IOException
    */
-  private TTransport connectToAvailableServer() throws IOException {
-    IOException currentException = null;
-    for (InetSocketAddress addr : endpoints) {
-      try {
-        return connectToServer(addr);
-      } catch (IOException e) {
-        LOGGER.error(String.format("Failed connection to %s: %s",
-          addr.toString(), e.getMessage()), e);
-        currentException = e;
-      }
-    }
-    throw currentException;
+  @Override
+  public TTransportWrapper getTransport(HostAndPort endpoint) throws IOException {
+    return new TTransportWrapper(connectToServer(new InetSocketAddress(endpoint.getHostText(),
+                                                 endpoint.getPort())),
+                                 endpoint);
   }
 
   /**
@@ -241,70 +99,93 @@ public class SentryTransportFactory {
    */
   private TTransport connectToServer(InetSocketAddress serverAddress) throws IOException {
     try {
-      thriftTransport = createTransport(serverAddress);
+      TTransport thriftTransport = createTransport(serverAddress);
       thriftTransport.open();
+      return thriftTransport;
     } catch (TTransportException e) {
       throw new IOException("Failed to open transport: " + e.getMessage(), e);
-    } catch (MissingConfigurationException e) {
-      throw new RuntimeException(e.getMessage(), e);
     }
-
-    LOGGER.debug("Successfully opened transport: " + thriftTransport + " to " + serverAddress);
-    return thriftTransport;
   }
 
   /**
-   * New socket is is created
-   *
-   * @param serverAddress
-   * @return
+   * Create transport given InetSocketAddress
+   * @param serverAddress - endpoint address
+   * @return unconnected transport
    * @throws TTransportException
-   * @throws MissingConfigurationException
    * @throws IOException
    */
   private TTransport createTransport(InetSocketAddress serverAddress)
-    throws TTransportException, MissingConfigurationException, IOException {
-    TTransport socket = new TSocket(serverAddress.getHostName(),
-      serverAddress.getPort(), connectionTimeout);
+          throws IOException {
+    String hostName = serverAddress.getHostName();
+    int port = serverAddress.getPort();
+    TTransport socket = new TSocket(hostName, port, connectionTimeout);
 
-    if (!transportConfig.isKerberosEnabled(conf)) {
+    if (!isKerberosEnabled) {
+      LOGGER.debug("created unprotected connection to {}:{} ", hostName, port);
       return socket;
-    } else {
-      String serverPrincipal = transportConfig.getSentryPrincipal(conf);
-      serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
-      LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
-      if (serverPrincipalParts == null) {
-        serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
-        Preconditions.checkArgument(serverPrincipalParts.length == 3,
-          "Kerberos principal should have 3 parts: " + serverPrincipal);
-      }
+    }
 
-      boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
-      return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
-        serverPrincipalParts[0], serverPrincipalParts[1],
-        socket, wrapUgi, conf);
+    String principal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
+    String[] serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
+    if (serverPrincipalParts.length != 3) {
+      throw new IOException("Kerberos principal should have 3 parts: " + principal);
     }
-  }
 
-  private boolean isConnected() {
-    return thriftTransport != null && thriftTransport.isOpen();
-  }
+    UgiSaslClientTransport connection =
+            new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
+              serverPrincipalParts[0], serverPrincipalParts[1],
+              socket, useUgi);
 
-  /**
-   * Method currently closes the transport
-   * TODO (Kalyan) Plan is to hold the transport and resuse it accross multiple client's
-   * That way, new connection need not be created for each new client
-   */
-  public void releaseTransport() {
-    close();
+    LOGGER.debug("created secured connection to {}:{} ", hostName, port);
+    return connection;
   }
 
   /**
-   * Method closes the transport
+   * This transport wraps the Sasl transports to set up the right UGI context for open().
    */
-  public void close() {
-    if (isConnected()) {
-      thriftTransport.close();
+  private static class UgiSaslClientTransport extends TSaslClientTransport {
+    private UserGroupInformation ugi = null;
+
+    UgiSaslClientTransport(String mechanism, String protocol,
+                           String serverName, TTransport transport,
+                           boolean wrapUgi)
+            throws IOException, SaslException {
+      super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null,
+              transport);
+      if (wrapUgi) {
+        ugi = UserGroupInformation.getLoginUser();
+      }
+    }
+
+    // Open the SASL transport using the current UserGroupInformation.
+    // This is needed to get the current login context stored
+    @Override
+    public void open() throws TTransportException {
+      if (ugi == null) {
+        baseOpen();
+      } else {
+        try {
+          if (ugi.isFromKeytab()) {
+            ugi.checkTGTAndReloginFromKeytab();
+          }
+          ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            public Void run() throws TTransportException {
+              baseOpen();
+              return null;
+            }
+          });
+        } catch (IOException e) {
+          throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new TTransportException(
+                  "Interrupted while opening underlying transport: " + e.getMessage(), e);
+        }
+      }
+    }
+
+    private void baseOpen() throws TTransportException {
+      super.open();
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java
new file mode 100644
index 0000000..e33dd2b
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.base.Preconditions;
+import com.google.common.net.HostAndPort;
+import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.utils.ThriftUtil;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Pool of transport connections to Sentry server.
+ * The pool caches open connections to multiple Sentry servers,
+ * specified in the configuration.
+ *
+ * When transport pooling is disabled in configuration,
+ * creates transports directly.
+ */
+@ThreadSafe
+public final class SentryTransportPool implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportPool.class);
+
+  // Used for logging to identify pool instances. This is only useful for test debugging
+  // so we do not preserve thread safety for this field.
+  private static int poolId = 0;
+  private final int id;
+
+  // True if using Object pool
+  private final boolean isPoolEnabled;
+
+  // Load balance between servers if true
+  private final boolean doLoadBalancing;
+
+  // List of all known servers
+  private final ArrayList<HostAndPort> endpoints;
+
+  // Transport pool which keeps connected transports
+  private final KeyedObjectPool<HostAndPort, TTransportWrapper> pool;
+  // Source of connected transports
+  private final TransportFactory transportFactory;
+
+  // Set when we are closed
+  private final AtomicBoolean closed = new AtomicBoolean();
+
+  /**
+   * Configure transport pool.
+   * <p>
+   * The pool accepts the following configuration:
+   * <ul>
+   *   <li>Maximum total number of objects in the pool</li>
+   *   <li>Minimum number of idle objects</li>
+   *   <li>Maximum number of idle objects</li>
+   *   <li>Minimum time before the object is evicted</li>
+   *   <li>Interval between evictions</li>
+   * </ul>
+   * @param conf Configuration
+   * @param transportConfig Configuration interface
+   * @param transportFactory Transport factory used to produce transports
+   */
+  public SentryTransportPool(Configuration conf,
+                             SentryClientTransportConfigInterface transportConfig,
+                             TransportFactory transportFactory) {
+
+    // This isn't thread-safe, but we don't care - it is only used
+    // for debugging when running tests - normal apps use a single pool
+    poolId++;
+    id = poolId;
+
+    this.transportFactory = transportFactory;
+    doLoadBalancing = transportConfig.isLoadBalancingEnabled(conf);
+    isPoolEnabled = transportConfig.isTransportPoolEnabled(conf);
+
+    // Get list of server addresses
+    String hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
+    int serverPort = transportConfig.getServerRpcPort(conf);
+    LOGGER.info("Creating pool for {} with default port {}",
+            hostsAndPortsStr, serverPort);
+    String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
+    Preconditions.checkArgument(hostsAndPortsStrArr.length > 0,
+            "At least one server should be specified");
+
+    endpoints = new ArrayList<>(hostsAndPortsStrArr.length);
+    for(String addr: hostsAndPortsStrArr) {
+      HostAndPort endpoint = ThriftUtil.parseAddress(addr, serverPort);
+      LOGGER.info("Adding endpoint {}", endpoint);
+      endpoints.add(endpoint);
+    }
+
+    // this.transportFactory = new SentryTransportFactory(conf, transportConfig);
+
+    if (!isPoolEnabled) {
+      pool = null;
+      LOGGER.info("Connection pooling is disabled");
+      return;
+    }
+
+    LOGGER.info("Connection pooling is enabled");
+    // Set pool configuration based on Configuration settings
+    GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
+
+    // Don't limit maximum number of objects in the pool
+    poolConfig.setMaxTotal(-1);
+
+    poolConfig.setMinIdlePerKey(transportConfig.getPoolMinIdle(conf));
+    poolConfig.setMaxIdlePerKey(transportConfig.getPoolMaxIdle(conf));
+
+    // Do not block when pool is exhausted, throw exception instead
+    poolConfig.setBlockWhenExhausted(false);
+    poolConfig.setTestOnReturn(true);
+
+    // Limit the number of objects per server (key) to the configured maximum
+    poolConfig.setMaxTotalPerKey(transportConfig.getPoolMaxTotal(conf));
+    poolConfig.setMinEvictableIdleTimeMillis(transportConfig.getMinEvictableTimeSec(conf));
+    poolConfig.setTimeBetweenEvictionRunsMillis(transportConfig.getTimeBetweenEvictionRunsSec(conf));
+
+    // Create object pool
+    pool = new GenericKeyedObjectPool<>(new PoolFactory(this.transportFactory, id),
+            poolConfig);
+  }
+
+  public TTransportWrapper getTransport() throws Exception {
+    List<HostAndPort> servers;
+    // If we are doing load balancing and there is more than one server,
+    // shuffle them before obtaining connection
+    if (doLoadBalancing && (endpoints.size() > 1)) {
+      servers = new ArrayList<>(endpoints);
+      Collections.shuffle(servers);
+    } else {
+      servers = endpoints;
+    }
+
+    // Try to get a connection from one of the pools
+    Exception failure = null;
+    for(HostAndPort addr: servers) {
+      try {
+        TTransportWrapper transport =
+          isPoolEnabled ?
+                pool.borrowObject(addr) :
+                transportFactory.getTransport(addr);
+        LOGGER.debug("[{}] obtained transport {}", id, transport);
+        return transport;
+      } catch (IllegalStateException e) {
+        // Should not happen
+        LOGGER.error("Unexpected error from pool {}", id,  e);
+        failure = e;
+      } catch (Exception e) {
+        LOGGER.error("Failed to obtain transport for {}: {}",
+                addr, e.getMessage());
+        failure = e;
+      }
+    }
+    // Failed to borrow or create a connection to any endpoint
+    assert failure != null;
+    throw failure;
+  }
+
+  /**
+   * Return transport to the pool
+   * @param transport Open transport
+   */
+  public void returnTransport(TTransportWrapper transport) {
+    if (closed.get()) {
+      LOGGER.debug("Returned {} to closed pool", transport);
+      transport.close();
+      return;
+    }
+    try {
+      if (isPoolEnabled) {
+        LOGGER.debug("[{}] returning {}", id, transport);
+        pool.returnObject(transport.getAddress(), transport);
+      } else {
+        LOGGER.debug("Closing {}", transport);
+        transport.close();
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to return {}", transport, e);
+    }
+  }
+
+  public void invalidateTransport(TTransportWrapper transport) {
+    if (closed.get()) {
+      LOGGER.debug("invalidated {} for closed pool", transport);
+      transport.close();
+      return;
+    }
+    try {
+      LOGGER.debug("[{}] Invalidating address {}", id, transport);
+      if (!isPoolEnabled) {
+        transport.close();
+      } else {
+        pool.invalidateObject(transport.getAddress(), transport);
+        // Invalidate the whole pool associated with the given address
+        // It is a bit brutal since a single bad connection may
+        // cause an invalidation, but otherwise we may have a lot of bad
+        // connections in the pool and try to return them.
+        pool.clear(transport.getAddress());
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to invalidate {}", transport, e);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (closed.get()) {
+      // already closed
+      return;
+    }
+    LOGGER.debug("[{}] closing", id);
+    if (pool != null) {
+      LOGGER.debug("Closing pool of {}/{} endpoints",
+              pool.getNumIdle(), pool.getNumActive());
+      pool.close();
+    }
+  }
+
+  /**
+   * Factory that creates and destroys pool objects
+   */
+  private static final class PoolFactory
+          extends BaseKeyedPooledObjectFactory<HostAndPort, TTransportWrapper> {
+    private final TransportFactory transportFactory;
+    private final int id;
+
+    /**
+     * Create a pool factory associated with the given transport factory
+     * @param transportFactory - factory producing transports
+     * @param id pool id (for debugging)
+     */
+    private PoolFactory(TransportFactory transportFactory, int id) {
+      this.transportFactory = transportFactory;
+      this.id = id;
+    }
+
+    @Override
+    public boolean validateObject(HostAndPort key, PooledObject<TTransportWrapper> p) {
+      TTransportWrapper transport = p.getObject();
+      if (transport == null) {
+        LOGGER.error("No transport to validate");
+        return false;
+      }
+      if (transport.getAddress() != key) {
+        LOGGER.error("Invalid endpoint {}: does not match {}", transport, key);
+        return false;
+      }
+      try {
+        transport.flush();
+      } catch (TTransportException e) {
+        LOGGER.error("Failed to verify connection to {}", key, e);
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public TTransportWrapper create(HostAndPort key) throws Exception {
+      TTransportWrapper transportWrapper = transportFactory.getTransport(key);
+      LOGGER.debug("[{}] created {}", id, transportWrapper);
+      return transportWrapper;
+    }
+
+    @Override
+    public void destroyObject(HostAndPort key, PooledObject<TTransportWrapper> p) throws Exception {
+      TTransportWrapper transport = p.getObject();
+      if (transport != null) {
+        LOGGER.debug("[{}] Destroying endpoint {}", id, transport);
+        try {
+          transport.close();
+        } catch (RuntimeException e) {
+          LOGGER.error("fail to destroy endpoint {}", transport, e);
+        }
+      }
+    }
+
+    @Override
+    public PooledObject<TTransportWrapper> wrap(TTransportWrapper value) {
+      return new DefaultPooledObject<>(value);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TTransportWrapper.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TTransportWrapper.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TTransportWrapper.java
new file mode 100644
index 0000000..07283fb
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TTransportWrapper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.net.HostAndPort;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.Closeable;
+
+/**
+ * Extension of Thrift Transport which also provides the endpoint address.
+ * The address is represented as {@link HostAndPort} object.
+ */
+public final class TTransportWrapper implements Closeable {
+  private final TTransport transport;
+  private final HostAndPort address;
+
+  /**
+   * @param transport Thrift transport (may be in any state)
+   * @param address The address associated with this transport.
+   */
+  TTransportWrapper(TTransport transport, HostAndPort address) {
+    this.transport = transport;
+    this.address = address;
+  }
+
+  /**
+   * @return Thrift transport value
+   */
+  public TTransport getTTransport() {
+    return transport;
+  }
+
+  /**
+   * @return endpoint address for the transport
+   */
+  public HostAndPort getAddress() {
+    return address;
+  }
+
+  /**
+   * @return True if and only if the transport is open
+   */
+  public boolean isOpen() {
+    return transport.isOpen();
+  }
+
+  /**
+   * Flush the underlying transport
+   * @throws TTransportException
+   */
+  public void flush() throws TTransportException {
+    transport.flush();
+  }
+
+  /**
+   * @return human-readable representation of a transport.
+   * It consists of the endpoint address only.
+   */
+  @Override
+  public String toString() {
+    return address.toString();
+  }
+
+  /**
+   * Close the underlying transport
+   */
+  @Override
+  public void close() {
+    transport.close();
+  }
+}


Mime
View raw message