sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ha...@apache.org
Subject sentry git commit: SENTRY-1411: The sentry client should retry RPCs if it gets a SentryStandbyException (SentryPolicyServiceClient - pool version) (Hao Hao, Reviewed by: Sravya Tirukkovalur)
Date Tue, 23 Aug 2016 00:03:08 GMT
Repository: sentry
Updated Branches:
  refs/heads/sentry-ha-redesign 5a96dcdae -> c0ddd6121


SENTRY-1411: The sentry client should retry RPCs if it gets a SentryStandbyException (SentryPolicyServiceClient
- pool version) (Hao Hao, Reviewed by: Sravya Tirukkovalur)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/c0ddd612
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/c0ddd612
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/c0ddd612

Branch: refs/heads/sentry-ha-redesign
Commit: c0ddd6121c2eec30ab43ef7b9511a5114507f773
Parents: 5a96dcd
Author: hahao <hao.hao@cloudera.com>
Authored: Mon Aug 22 15:24:00 2016 -0700
Committer: hahao <hao.hao@cloudera.com>
Committed: Mon Aug 22 15:24:00 2016 -0700

----------------------------------------------------------------------
 .../exception/SentryStandbyException.java       |   4 +
 sentry-hdfs/sentry-hdfs-common/pom.xml          |   5 +
 .../SentryPolicyServiceClientDefaultImpl.java   |  12 +-
 .../thrift/PoolClientInvocationHandler.java     | 305 ++++++++++++++++---
 .../thrift/SentryServiceClientFactory.java      |  11 +-
 .../thrift/SentryServiceClientPoolFactory.java  |  11 +-
 .../thrift/TestSentryServiceFailureCase.java    |   5 +-
 .../TestSentryServiceWithInvalidMsgSize.java    |   2 +-
 .../thrift/TestPoolClientInvocationHandler.java |  69 +++++
 .../AbstractTestWithStaticConfiguration.java    |   2 +-
 .../tests/e2e/hive/TestPolicyImportExport.java  |   4 +-
 11 files changed, 361 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/c0ddd612/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java
b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java
index b2df699..da6cfce 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java
@@ -34,4 +34,8 @@ public class SentryStandbyException extends SentryUserException {
   public SentryStandbyException(String msg, String reason) {
     super(msg, reason);
   }
+
+  public SentryStandbyException(String msg, Throwable t) {
+    super(msg, t);
+  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c0ddd612/sentry-hdfs/sentry-hdfs-common/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/pom.xml b/sentry-hdfs/sentry-hdfs-common/pom.xml
index e767e06..1c73aaa 100644
--- a/sentry-hdfs/sentry-hdfs-common/pom.xml
+++ b/sentry-hdfs/sentry-hdfs-common/pom.xml
@@ -69,6 +69,11 @@ limitations under the License.
       <artifactId>sentry-provider-file</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.directory.jdbm</groupId>
+      <artifactId>apacheds-jdbm1</artifactId>
+      <version>2.0.0-M2</version>
+    </dependency>
   </dependencies>
   <build>
     <sourceDirectory>${basedir}/src/main/java</sourceDirectory>

http://git-wip-us.apache.org/repos/asf/sentry/blob/c0ddd612/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
index 1039e6e..4f42a51 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
@@ -132,12 +132,18 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
   }
 
   public SentryPolicyServiceClientDefaultImpl(Configuration conf) throws IOException {
+    this(Preconditions.checkNotNull(conf.get(ClientConfig.SERVER_RPC_ADDRESS), "Config key
"
+        + ClientConfig.SERVER_RPC_ADDRESS + " is required"), conf.getInt(
+        ClientConfig.SERVER_RPC_PORT, ClientConfig.SERVER_RPC_PORT_DEFAULT), conf);
+  }
+
+  public SentryPolicyServiceClientDefaultImpl(String addr, int port,
+        Configuration conf) throws IOException {
     this.conf = conf;
     Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
     this.serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull(
-                           conf.get(ClientConfig.SERVER_RPC_ADDRESS), "Config key "
-                           + ClientConfig.SERVER_RPC_ADDRESS + " is required"), conf.getInt(
-                           ClientConfig.SERVER_RPC_PORT, ClientConfig.SERVER_RPC_PORT_DEFAULT));
+                            addr, "Config key " + ClientConfig.SERVER_RPC_ADDRESS
+                            + " is required"), port);
     this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT,
                                          ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT);
     kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(

http://git-wip-us.apache.org/repos/asf/sentry/blob/c0ddd612/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
index a35bf1d..353d461 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
@@ -20,11 +20,13 @@ package org.apache.sentry.service.thrift;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.pool2.PooledObjectFactory;
 import org.apache.commons.pool2.impl.AbandonedConfig;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.exception.SentryStandbyException;
 import org.apache.sentry.core.common.exception.SentryUserException;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
 import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
@@ -33,63 +35,273 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The PoolClientInvocationHandler is a proxy class for handling thrift call. For every thrift
call,
- * get the instance of SentryPolicyServiceBaseClient from the commons-pool, and return the
instance
- * to the commons-pool after complete the call. For any exception with the call, discard
the
- * instance and create a new one added to the commons-pool. Then, get the instance and do
the call
- * again. For the thread safe, the commons-pool will manage the connection pool, and every
thread
- * can get the connection by borrowObject() and return the connection to the pool by returnObject().
+ * The PoolClientInvocationHandler is a proxy class for handling thrift
+ * call. For every thrift call, get the instance of
+ * SentryPolicyServiceBaseClient from the commons-pool, and return the instance
+ * to the commons-pool after complete the call. For any exception with the call,
+ * discard the instance and create a new one added to the commons-pool. Then,
+ * get the instance and do the call again. For the thread safe, the commons-pool
+ * will manage the connection pool, and every thread can get the connection by
+ * borrowObject() and return the connection to the pool by returnObject().
  */
 
 public class PoolClientInvocationHandler extends SentryClientInvocationHandler {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PoolClientInvocationHandler.class);
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PoolClientInvocationHandler.class);
+  private static final String POOL_EXCEPTION_MESSAGE = "Pool exception occurred ";
 
   private final Configuration conf;
-  private PooledObjectFactory<SentryPolicyServiceClient> poolFactory;
-  private GenericObjectPool<SentryPolicyServiceClient> pool;
-  private GenericObjectPoolConfig poolConfig;
-  private int connectionRetryTotal;
 
-  private static final String POOL_EXCEPTION_MESSAGE = "Pool exception occured ";
+  /**
+   * The configuration to use for our object pools.
+   * Null if we are not using object pools.
+   */
+  private final GenericObjectPoolConfig poolConfig;
+
+  /**
+   * The total number of connection retries to attempt per endpoint.
+   */
+  private final int connectionRetryTotal;
+
+  /**
+   * The configured sentry servers.
+   */
+  private final Endpoint[] endpoints;
+
+  /**
+   * The endpoint which we are currently using.  This can be read without any locks.
+   * It must be written while holding the endpoints lock.
+   */
+  private volatile int freshestEndpointIdx = 0;
+
+  private class Endpoint {
+    /**
+     * The server address or hostname.
+     */
+    private final String addr;
+
+    /**
+     * The server port.
+     */
+    private final int port;
+
+    /**
+     * The server's poolFactory used to create new clients.
+     */
+    private final PooledObjectFactory<SentryPolicyServiceClient> poolFactory;
+
+    /**
+     * The server's pool of cached clients.
+     */
+    private final GenericObjectPool<SentryPolicyServiceClient> pool;
+
+    Endpoint(String addr, int port) {
+      this.addr = addr;
+      this.port = port;
+      this.poolFactory = new SentryServiceClientPoolFactory(addr, port, conf);
+      this.pool = new GenericObjectPool<SentryPolicyServiceClient>(
+          this.poolFactory, poolConfig, new AbandonedConfig());
+    }
+
+    GenericObjectPool<SentryPolicyServiceClient> getPool() {
+      return pool;
+    }
+
+    String getEndPointStr() {
+      return new String("endpoint at [address " + addr + ", port " + port + "]");
+    }
+  }
 
   public PoolClientInvocationHandler(Configuration conf) throws Exception {
     this.conf = conf;
-    readConfiguration();
-    poolFactory = new SentryServiceClientPoolFactory(conf);
-    pool = new GenericObjectPool<SentryPolicyServiceClient>(poolFactory, poolConfig,
new AbandonedConfig());
+
+    this.poolConfig = new GenericObjectPoolConfig();
+    // config the pool size for commons-pool
+    this.poolConfig.setMaxTotal(conf.getInt(ClientConfig.SENTRY_POOL_MAX_TOTAL,
+        ClientConfig.SENTRY_POOL_MAX_TOTAL_DEFAULT));
+    this.poolConfig.setMinIdle(conf.getInt(ClientConfig.SENTRY_POOL_MIN_IDLE,
+        ClientConfig.SENTRY_POOL_MIN_IDLE_DEFAULT));
+    this.poolConfig.setMaxIdle(conf.getInt(ClientConfig.SENTRY_POOL_MAX_IDLE,
+        ClientConfig.SENTRY_POOL_MAX_IDLE_DEFAULT));
+
+    // get the retry number for reconnecting service
+    this.connectionRetryTotal = conf.getInt(ClientConfig.SENTRY_POOL_RETRY_TOTAL,
+        ClientConfig.SENTRY_POOL_RETRY_TOTAL_DEFAULT);
+
+    String hostsAndPortsStr = conf.get(ClientConfig.SERVER_RPC_ADDRESS);
+    if (hostsAndPortsStr == null) {
+      throw new RuntimeException("Config key " +
+          ClientConfig.SERVER_RPC_ADDRESS + " is required");
+    }
+    int defaultPort = conf.getInt(ClientConfig.SERVER_RPC_PORT,
+        ClientConfig.SERVER_RPC_PORT_DEFAULT);
+    String[] hostsAndPorts = hostsAndPortsStr.split(",");
+    String[] hosts = new String[hostsAndPorts.length];
+    int[] ports = new int[hostsAndPorts.length];
+    parseHostPortStrings(hostsAndPortsStr, hostsAndPorts, hosts,
+        ports, defaultPort);
+    this.endpoints = new Endpoint[hostsAndPorts.length];
+    for (int i = 0; i < this.endpoints.length; i++) {
+      this.endpoints[i] = new Endpoint(hosts[i], ports[i]);
+      LOGGER.info("Initiate sentry sever endpoint: hostname " + hosts[i] + ", port " + ports[i]);
+    }
+  }
+
+  @VisibleForTesting
+  /**
+   * Utility function for parsing host and port strings. Expected form should be
+   * (host:port). The hostname could be in ipv6 style. Port number can be empty
+   * and will be default to defaultPort.
+   */
+  static protected void parseHostPortStrings(String hostsAndPortsStr,
+        String[] hostsAndPorts, String[] hosts, int[] ports,
+        int defaultPort) {
+    int i = -1;
+    for (String hostAndPort: hostsAndPorts) {
+      i++;
+      hostAndPort = hostAndPort.trim();
+      if (hostAndPort.isEmpty()) {
+        throw new RuntimeException("Cannot handle empty server RPC address " +
+            "in component " + (i + 1) + " of " + hostsAndPortsStr);
+      }
+      int colonIdx = hostAndPort.lastIndexOf(":");
+      if (colonIdx == -1) {
+        // There is no colon in the host+port string.
+        // That means the port is left unspecified, and should be set to
+        // the default.
+        hosts[i] = hostAndPort;
+        ports[i] = defaultPort;
+        continue;
+      }
+      int rightBracketIdx = hostAndPort.indexOf(']', colonIdx);
+      if (rightBracketIdx != -1) {
+        // If there is a right bracket that occurs after the colon, the
+        // colon we found is part of an ipv6 address like this:
+        // [::1].  That means we only have the host part, not the port part.
+        hosts[i] = hostAndPort.substring(0, rightBracketIdx);
+        ports[i] = defaultPort;
+        continue;
+      }
+      // We have a host:port string, where the part after colon should be
+      // the port.
+      hosts[i] = hostAndPort.substring(0, colonIdx);
+      String portStr = hostAndPort.substring(colonIdx+1);
+      try {
+        ports[i] = Integer.valueOf(portStr);
+      } catch (NumberFormatException e) {
+        throw new RuntimeException("Cannot parse port string " + portStr +
+            "in component " + (i + 1) + " of " + hostsAndPortsStr);
+      }
+      if ((ports[i] < 0) || (ports[i] > 65535)) {
+        throw new RuntimeException("Invalid port number given for " + portStr +
+            "in component " + (i + 1) + " of " + hostsAndPortsStr);
+      }
+    }
+    // Strip the brackets off of hostnames and ip addresses enclosed in square
+    // brackets.  This is to support ipv6-style [address]:port addressing.
+    for (int j = 0; j < hosts.length; j++) {
+      if ((hosts[j].startsWith("[")) && (hosts[j].endsWith("]"))) {
+        hosts[j] = hosts[j].substring(1, hosts[j].length() - 1);
+      }
+    }
   }
 
   @Override
-  public Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception {
+  public Object invokeImpl(Object proxy, Method method, Object[] args)
+      throws Exception {
     int retryCount = 0;
-    Object result = null;
-    while (retryCount < connectionRetryTotal) {
+    /**
+     * The maximum number of retries that we will do.  Each endpoint gets its
+     * own set of retries.
+     */
+    int retryLimit = connectionRetryTotal * endpoints.length;
+
+    /**
+     * The index of the endpoint to use.
+     */
+    int endpointIdx = freshestEndpointIdx;
+
+    /**
+     * A list of exceptions from each endpoint.  This starts as null to avoid
+     * memory allocation in the common case where there is no error.
+     */
+    Exception exc[] = null;
+
+    Object ret = null;
+
+    while (retryCount < retryLimit) {
+      GenericObjectPool<SentryPolicyServiceClient> pool =
+          endpoints[endpointIdx].getPool();
       try {
-        // The wapper here is for the retry of thrift call, the default retry number is 3.
-        result = invokeFromPool(method, args);
+        if ((exc != null) &&
+            (exc[endpointIdx] instanceof TTransportException)) {
+          // If there was a TTransportException last time we tried to contact
+          // this endpoint, attempt to create a new connection before we try
+          // again.
+          synchronized (endpoints) {
+            // If there has room, create new instance and add it to the
+            // commons-pool.  This instance will be returned first from the
+            // commons-pool, because the configuration is LIFO.
+            if (pool.getNumIdle() + pool.getNumActive() < pool.getMaxTotal()) {
+              pool.addObject();
+            }
+          }
+        }
+        // Try to make the RPC.
+        ret = invokeFromPool(method, args, pool);
         break;
-      } catch (TTransportException e) {
-        // TTransportException means there has connection problem, create a new connection
and try
-        // again. Get the lock of pool and add new connection.
-        synchronized (pool) {
-          // If there has room, create new instance and add it to the commons-pool, this
instance
-          // will be back first from the commons-pool because the configuration is LIFO.
-          if (pool.getNumIdle() + pool.getNumActive() < pool.getMaxTotal()) {
-            pool.addObject();
+      } catch (SentryStandbyException | TTransportException e) {
+        if (exc == null) {
+          exc = new Exception[endpoints.length];
+        }
+        exc[endpointIdx] = e;
+      }
+
+      Exception lastExc = exc[endpointIdx];
+      synchronized (endpoints) {
+        int curFreshestEndpointIdx = freshestEndpointIdx;
+        if (curFreshestEndpointIdx == endpointIdx) {
+          curFreshestEndpointIdx =
+              (curFreshestEndpointIdx  + 1) %  endpoints.length;
+          freshestEndpointIdx = curFreshestEndpointIdx;
+        }
+        endpointIdx = curFreshestEndpointIdx;
+      }
+      // Increase the retry num, and throw the exception if can't retry again.
+      retryCount++;
+      if (retryCount == connectionRetryTotal) {
+        boolean allStandby = true, allUnreachable = true;
+        for (int i = 0; i < exc.length; i++) {
+          if (exc[i] instanceof SentryStandbyException) {
+            allUnreachable = false;
+            LOGGER.error("Sentry server " + endpoints[endpointIdx].getEndPointStr()
+                + " is in standby mode");
+          } else {
+            allStandby = false;
+            LOGGER.error("Sentry server " + endpoints[endpointIdx].getEndPointStr()
+                + " is in unreachable.");
           }
         }
-        // Increase the retry num, and throw the exception if can't retry again.
-        retryCount++;
-        if (retryCount == connectionRetryTotal) {
-          throw new SentryUserException(e.getMessage(), e);
+        if (allStandby) {
+          throw new SentryStandbyException("All sentry servers are in " +
+              "standby mode.", lastExc);
+        } else if (allUnreachable) {
+          throw new SentryUserException("All sentry servers are unreachable. " +
+              "Diagnostics is needed for unreachable servers.",
+              lastExc);
+        } else {
+          throw new SentryUserException("All reachable servers are standby. " +
+              "Diagnostics is needed for unreachable servers.",
+              lastExc);
         }
       }
     }
-    return result;
+    return ret;
   }
 
-  private Object invokeFromPool(Method method, Object[] args) throws Exception {
+  private Object invokeFromPool(Method method, Object[] args,
+      GenericObjectPool<SentryPolicyServiceClient> pool) throws Exception {
     Object result = null;
     SentryPolicyServiceClient client;
     try {
@@ -106,7 +318,9 @@ public class PoolClientInvocationHandler extends SentryClientInvocationHandler
{
       // Get the target exception, check if SentryUserException or TTransportException is
wrapped.
       // TTransportException means there has connection problem with the pool.
       Throwable targetException = e.getCause();
-      if (targetException instanceof SentryUserException) {
+      if (targetException instanceof SentryStandbyException) {
+        throw (SentryStandbyException)targetException;
+      } else if (targetException instanceof SentryUserException) {
         Throwable sentryTargetException = targetException.getCause();
         // If there has connection problem, eg, invalid connection if the service restarted,
         // sentryTargetException instanceof TTransportException = true.
@@ -134,21 +348,12 @@ public class PoolClientInvocationHandler extends SentryClientInvocationHandler
{
 
   @Override
   public void close() {
-    try {
-      pool.close();
-    } catch (Exception e) {
-      LOGGER.debug(POOL_EXCEPTION_MESSAGE, e);
+    for (int i = 0; i < endpoints.length; i++) {
+      try {
+        endpoints[i].getPool().close();
+      } catch (Exception e) {
+        LOGGER.debug(POOL_EXCEPTION_MESSAGE, e);
+      }
     }
   }
-
-  private void readConfiguration() {
-    poolConfig = new GenericObjectPoolConfig();
-    // config the pool size for commons-pool
-    poolConfig.setMaxTotal(conf.getInt(ClientConfig.SENTRY_POOL_MAX_TOTAL, ClientConfig.SENTRY_POOL_MAX_TOTAL_DEFAULT));
-    poolConfig.setMinIdle(conf.getInt(ClientConfig.SENTRY_POOL_MIN_IDLE, ClientConfig.SENTRY_POOL_MIN_IDLE_DEFAULT));
-    poolConfig.setMaxIdle(conf.getInt(ClientConfig.SENTRY_POOL_MAX_IDLE, ClientConfig.SENTRY_POOL_MAX_IDLE_DEFAULT));
-    // get the retry number for reconnecting service
-    connectionRetryTotal = conf.getInt(ClientConfig.SENTRY_POOL_RETRY_TOTAL,
-        ClientConfig.SENTRY_POOL_RETRY_TOTAL_DEFAULT);
-  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c0ddd612/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
index 56d774b..9e90af8 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration;
 
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
-import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
 
 public final class SentryServiceClientFactory {
 
@@ -32,15 +31,9 @@ public final class SentryServiceClientFactory {
   }
 
   public static SentryPolicyServiceClient create(Configuration conf) throws Exception {
-    boolean pooled = conf.getBoolean(ClientConfig.SENTRY_POOL_ENABLED, false);
-    if (pooled) {
       return (SentryPolicyServiceClient) Proxy
           .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
-              SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
-              new PoolClientInvocationHandler(conf));
-    } else {
-      return new SentryPolicyServiceClientDefaultImpl(conf);
-    }
+          SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
+          new PoolClientInvocationHandler(conf));
   }
-
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c0ddd612/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
index afea78a..0164fa6 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
@@ -36,16 +36,21 @@ public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<Sent
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientPoolFactory.class);
 
-  private Configuration conf;
+  private final String addr;
+  private final int port;
+  private final Configuration conf;
 
-  public SentryServiceClientPoolFactory(Configuration conf) {
+  public SentryServiceClientPoolFactory(String addr, int port,
+                                        Configuration conf) {
+    this.addr = addr;
+    this.port = port;
     this.conf = conf;
   }
 
   @Override
   public SentryPolicyServiceClient create() throws Exception {
     LOGGER.debug("Creating Sentry Service Client...");
-    return new SentryPolicyServiceClientDefaultImpl(conf);
+    return new SentryPolicyServiceClientDefaultImpl(addr, port, conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sentry/blob/c0ddd612/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
index 51bba31..d1ac447 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
@@ -58,10 +58,13 @@ public class TestSentryServiceFailureCase extends SentryServiceIntegrationBase
{
   public void testClientServerConnectionFailure()  throws Exception {
     try {
       connectToSentryService();
+      String requestorUserName = ADMIN_USER;
+      client.listRoles(requestorUserName);
       Assert.fail("Failed to receive Exception");
     } catch(Exception e) {
       LOGGER.info("Excepted exception", e);
-      Throwable cause = e.getCause();
+      // peer callback exception is nested inside SentryUserException.
+      Throwable cause = e.getCause().getCause();
       if (cause == null) {
         throw e;
       }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c0ddd612/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
index 15eab15..a4dd8a6 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
@@ -104,7 +104,7 @@ public class TestSentryServiceWithInvalidMsgSize extends SentryServiceIntegratio
           client.grantServerPrivilege(ADMIN_USER, ROLE_NAME, "server", false);
         } catch (SentryUserException e) {
           exceptionThrown = true;
-          Assert.assertTrue(e.getMessage().contains("org.apache.thrift.transport.TTransportException"));
+          Assert.assertTrue(e.getCause().getMessage().contains("org.apache.thrift.transport.TTransportException"));
         } finally {
           Assert.assertEquals(true, exceptionThrown);
         }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c0ddd612/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
new file mode 100644
index 0000000..5b0e12b
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestPoolClientInvocationHandler {
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(TestPoolClientInvocationHandler.class);
+
+  private void expectParseHostPortStrings(String hostsAndPortsStr,
+        String[] expectedHosts, int[] expectedPorts) throws Exception {
+    boolean success = false;
+    String[] hostsAndPorts = hostsAndPortsStr.split(",");
+    String[] hosts = new String[hostsAndPorts.length];
+    int[] ports = new int[hostsAndPorts.length];
+    try {
+      PoolClientInvocationHandler.parseHostPortStrings(hostsAndPortsStr,
+          hostsAndPorts, hosts, ports, 8038);
+      success = true;
+    } finally {
+      if (!success) {
+        LOGGER.error("Caught exception while parsing hosts/ports string " +
+            hostsAndPortsStr);
+      }
+    }
+    Assert.assertArrayEquals("Got unexpected hosts results while " +
+        "parsing " + hostsAndPortsStr, expectedHosts, hosts);
+    Assert.assertArrayEquals("Got unexpected ports results while " +
+        "parsing " + hostsAndPortsStr, expectedPorts, ports);
+  }
+
+  @SuppressWarnings("PMD.AvoidUsingHardCodedIP")
+  @Test
+  public void testParseHostPortStrings() throws Exception {
+    expectParseHostPortStrings("foo", new String[] {"foo"}, new int[] {8038});
+    expectParseHostPortStrings("foo,bar",
+        new String[] {"foo", "bar"},
+        new int[] {8038, 8038});
+    expectParseHostPortStrings("foo:2020,bar:2021",
+        new String[] {"foo", "bar"},
+        new int[] {2020, 2021});
+    expectParseHostPortStrings("127.0.0.1:2020,127.1.0.1",
+        new String[] {"127.0.0.1", "127.1.0.1"},
+        new int[] {2020, 8038});
+    expectParseHostPortStrings("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:433",
+        new String[] {"2001:db8:85a3:8d3:1319:8a2e:370:7348"},
+        new int[] {433});
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/c0ddd612/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
index eae33e0..61b24fa 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
@@ -111,7 +111,7 @@ public abstract class AbstractTestWithStaticConfiguration {
       .outerRule(new TestWatcher() {
         @Override
         protected void failed(Throwable e, Description description) {
-          if (e.getMessage().contains("test timed out after")) {
+          if (e.getMessage()!= null && e.getMessage().contains("test timed out after"))
{
             LOGGER.error("Test method time out, but caught by rule, description = " + description
+ "ex = " + e);
           }
         }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c0ddd612/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java
b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java
index 3f57a00..b45fc6d 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Map;
 import java.util.Set;
 
@@ -172,7 +173,8 @@ public class TestPolicyImportExport extends AbstractTestWithStaticConfiguration
     try {
       configTool.importPolicy();
       fail("IllegalArgumentException should be thrown for: Invalid key value: server [server]");
-    } catch (IllegalArgumentException ex) {
+    } catch (UndeclaredThrowableException ex) {
+      assertTrue(ex.getUndeclaredThrowable().getCause() instanceof IllegalArgumentException);
       // ignore
     }
   }


Mime
View raw message