sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ak...@apache.org
Subject [1/3] sentry git commit: SENTRY-1580: Provide pooled client connection model with HA
Date Thu, 25 May 2017 04:43:40 GMT
Repository: sentry
Updated Branches:
  refs/heads/SENTRY-1580 [created] 95d073f06


http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java
index 1d09846..09f17ed 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java
@@ -40,36 +40,39 @@ public class SentryShellHive extends SentryShellCommon {
 
   public void run() throws Exception {
     Command command = null;
-    SentryPolicyServiceClient client = SentryServiceClientFactory.create(getSentryConf());
-    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
-    String requestorName = ugi.getShortUserName();
 
-    if (isCreateRole) {
-      command = new CreateRoleCmd(roleName);
-    } else if (isDropRole) {
-      command = new DropRoleCmd(roleName);
-    } else if (isAddRoleGroup) {
-      command = new GrantRoleToGroupsCmd(roleName, groupName);
-    } else if (isDeleteRoleGroup) {
-      command = new RevokeRoleFromGroupsCmd(roleName, groupName);
-    } else if (isGrantPrivilegeRole) {
-      command = new GrantPrivilegeToRoleCmd(roleName, privilegeStr);
-    } else if (isRevokePrivilegeRole) {
-      command = new RevokePrivilegeFromRoleCmd(roleName, privilegeStr);
-    } else if (isListRole) {
-      command = new ListRolesCmd(groupName);
-    } else if (isListPrivilege) {
-      command = new ListPrivilegesCmd(roleName);
-    }
+    try(SentryPolicyServiceClient client =
+                SentryServiceClientFactory.create(getSentryConf())) {
+      UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+      String requestorName = ugi.getShortUserName();
 
-    // check the requestor name
-    if (StringUtils.isEmpty(requestorName)) {
-      // The exception message will be recoreded in log file.
-      throw new Exception("The requestor name is empty.");
-    }
+      if (isCreateRole) {
+        command = new CreateRoleCmd(roleName);
+      } else if (isDropRole) {
+        command = new DropRoleCmd(roleName);
+      } else if (isAddRoleGroup) {
+        command = new GrantRoleToGroupsCmd(roleName, groupName);
+      } else if (isDeleteRoleGroup) {
+        command = new RevokeRoleFromGroupsCmd(roleName, groupName);
+      } else if (isGrantPrivilegeRole) {
+        command = new GrantPrivilegeToRoleCmd(roleName, privilegeStr);
+      } else if (isRevokePrivilegeRole) {
+        command = new RevokePrivilegeFromRoleCmd(roleName, privilegeStr);
+      } else if (isListRole) {
+        command = new ListRolesCmd(groupName);
+      } else if (isListPrivilege) {
+        command = new ListPrivilegesCmd(roleName);
+      }
 
-    if (command != null) {
-      command.execute(client, requestorName);
+      // check the requestor name
+      if (StringUtils.isEmpty(requestorName)) {
+        // The exception message will be recoreded in log file.
+        throw new Exception("The requestor name is empty.");
+      }
+
+      if (command != null) {
+        command.execute(client, requestorName);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
deleted file mode 100644
index acf9b05..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.service.thrift;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-import com.google.common.net.HostAndPort;
-import org.apache.commons.pool2.PooledObjectFactory;
-import org.apache.commons.pool2.impl.AbandonedConfig;
-import org.apache.commons.pool2.impl.GenericObjectPool;
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.core.common.exception.SentryUserException;
-import org.apache.sentry.core.common.transport.SentryClientInvocationHandler;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
-import org.apache.sentry.core.common.utils.ThriftUtil;
-import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The PoolClientInvocationHandler is a proxy class for handling thrift
- * call. For every thrift call, get the instance of
- * SentryPolicyServiceBaseClient from the commons-pool, and return the instance
- * to the commons-pool after complete the call. For any exception with the call,
- * discard the instance and create a new one added to the commons-pool. Then,
- * get the instance and do the call again. For the thread safe, the commons-pool
- * will manage the connection pool, and every thread can get the connection by
- * borrowObject() and return the connection to the pool by returnObject().
- *
- * TODO: Current pool model does not manage the opening connections very well,
- * e.g. opening connections with failed servers should be closed promptly.
- */
-
-public class PoolClientInvocationHandler extends SentryClientInvocationHandler {
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(PoolClientInvocationHandler.class);
-
-  private static final String POOL_EXCEPTION_MESSAGE = "Pool exception occurred ";
-
-  private final Configuration conf;
-
-  /**
-   * The configuration to use for our object pools.
-   * Null if we are not using object pools.
-   */
-  private final GenericObjectPoolConfig poolConfig;
-
-  /**
-   * The total number of connection retries to attempt per endpoint.
-   */
-  private final int connectionRetryTotal;
-
-  /**
-   * The configured sentry servers.
-   */
-  private final Endpoint[] endpoints;
-
-  /**
-   * The endpoint which we are currently using.  This can be read without any locks.
-   * It must be written while holding the endpoints lock.
-   */
-  private volatile int freshestEndpointIdx = 0;
-
-  private class Endpoint {
-    /**
-     * The server address or hostname.
-     */
-    private final String addr;
-
-    /**
-     * The server port.
-     */
-    private final int port;
-
-    /**
-     * The server's poolFactory used to create new clients.
-     */
-    private final PooledObjectFactory<SentryPolicyServiceClient> poolFactory;
-
-    /**
-     * The server's pool of cached clients.
-     */
-    private final GenericObjectPool<SentryPolicyServiceClient> pool;
-
-    Endpoint(String addr, int port) {
-      this.addr = addr;
-      this.port = port;
-      this.poolFactory = new SentryServiceClientPoolFactory(addr, port, conf);
-      this.pool = new GenericObjectPool<SentryPolicyServiceClient>(
-          this.poolFactory, poolConfig, new AbandonedConfig());
-    }
-
-    GenericObjectPool<SentryPolicyServiceClient> getPool() {
-      return pool;
-    }
-
-    String getEndPointStr() {
-      return new String("endpoint at [address " + addr + ", port " + port + "]");
-    }
-  }
-
-  public PoolClientInvocationHandler(Configuration conf) throws Exception {
-    this.conf = conf;
-
-    this.poolConfig = new GenericObjectPoolConfig();
-    // config the pool size for commons-pool
-    this.poolConfig.setMaxTotal(conf.getInt(ClientConfig.SENTRY_POOL_MAX_TOTAL,
-        ClientConfig.SENTRY_POOL_MAX_TOTAL_DEFAULT));
-    this.poolConfig.setMinIdle(conf.getInt(ClientConfig.SENTRY_POOL_MIN_IDLE,
-        ClientConfig.SENTRY_POOL_MIN_IDLE_DEFAULT));
-    this.poolConfig.setMaxIdle(conf.getInt(ClientConfig.SENTRY_POOL_MAX_IDLE,
-        ClientConfig.SENTRY_POOL_MAX_IDLE_DEFAULT));
-
-    // get the retry number for reconnecting service
-    this.connectionRetryTotal = conf.getInt(ClientConfig.SENTRY_POOL_RETRY_TOTAL,
-        ClientConfig.SENTRY_POOL_RETRY_TOTAL_DEFAULT);
-
-    String hostsAndPortsStr = conf.get(ClientConfig.SERVER_RPC_ADDRESS);
-    if (hostsAndPortsStr == null) {
-      throw new RuntimeException("Config key " +
-          ClientConfig.SERVER_RPC_ADDRESS + " is required");
-    }
-    int defaultPort = conf.getInt(ClientConfig.SERVER_RPC_PORT,
-        ClientConfig.SERVER_RPC_PORT_DEFAULT);
-    String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
-    HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort);
-    this.endpoints = new Endpoint[hostsAndPorts.length];
-    for (int i = 0; i < this.endpoints.length; i++) {
-      this.endpoints[i] = new Endpoint(hostsAndPorts[i].getHostText(),hostsAndPorts[i].getPort());
-      LOGGER.info("Initiate sentry sever endpoint: hostname " +
-          hostsAndPorts[i].getHostText() + ", port " + hostsAndPorts[i].getPort());
-    }
-  }
-
-  @Override
-  public Object invokeImpl(Object proxy, Method method, Object[] args)
-      throws Exception {
-    int retryCount = 0;
-    /**
-     * The maximum number of retries that we will do.  Each endpoint gets its
-     * own set of retries.
-     */
-    int retryLimit = connectionRetryTotal * endpoints.length;
-
-    /**
-     * The index of the endpoint to use.
-     */
-    int endpointIdx = freshestEndpointIdx;
-
-    /**
-     * A list of exceptions from each endpoint.  This starts as null to avoid
-     * memory allocation in the common case where there is no error.
-     */
-    Exception exc[] = null;
-
-    Object ret = null;
-
-    while (retryCount < retryLimit) {
-      GenericObjectPool<SentryPolicyServiceClient> pool =
-          endpoints[endpointIdx].getPool();
-      try {
-        if ((exc != null) &&
-            (exc[endpointIdx] instanceof TTransportException)) {
-          // If there was a TTransportException last time we tried to contact
-          // this endpoint, attempt to create a new connection before we try
-          // again.
-          synchronized (endpoints) {
-            // If there has room, create new instance and add it to the
-            // commons-pool.  This instance will be returned first from the
-            // commons-pool, because the configuration is LIFO.
-            if (pool.getNumIdle() + pool.getNumActive() < pool.getMaxTotal()) {
-              pool.addObject();
-            }
-          }
-        }
-        // Try to make the RPC.
-        ret = invokeFromPool(method, args, pool);
-        break;
-      } catch (TTransportException e) {
-        if (exc == null) {
-          exc = new Exception[endpoints.length];
-        }
-        exc[endpointIdx] = e;
-      }
-
-      Exception lastExc = exc[endpointIdx];
-      synchronized (endpoints) {
-        int curFreshestEndpointIdx = freshestEndpointIdx;
-        if (curFreshestEndpointIdx == endpointIdx) {
-          curFreshestEndpointIdx =
-              (curFreshestEndpointIdx  + 1) %  endpoints.length;
-          freshestEndpointIdx = curFreshestEndpointIdx;
-        }
-        endpointIdx = curFreshestEndpointIdx;
-      }
-      // Increase the retry num, and throw the exception if can't retry again.
-      retryCount++;
-      if (retryCount == connectionRetryTotal) {
-        for (int i = 0; i < exc.length; i++) {
-          // Since freshestEndpointIdx is shared by multiple threads, it is possible that
-          // the ith endpoint has been tried in another thread and skipped in the current
-          // thread.
-          if (exc[i] != null) {
-            LOGGER.error("Sentry server " + endpoints[i].getEndPointStr()
-                + " is in unreachable.");
-          }
-        }
-        throw new SentryUserException("Sentry servers are unreachable. " +
-            "Diagnostics is needed for unreachable servers.", lastExc);
-      }
-    }
-    return ret;
-  }
-
-  private Object invokeFromPool(Method method, Object[] args,
-      GenericObjectPool<SentryPolicyServiceClient> pool) throws Exception {
-    Object result = null;
-    SentryPolicyServiceClient client;
-    try {
-      // get the connection from the pool, don't know if the connection is broken.
-      client = pool.borrowObject();
-    } catch (Exception e) {
-      LOGGER.debug(POOL_EXCEPTION_MESSAGE, e);
-      // If the exception is caused by connection problem, throw the TTransportException
-      // for reconnect.
-      if (e instanceof IOException) {
-        throw new TTransportException(e);
-      }
-      throw new SentryUserException(e.getMessage(), e);
-    }
-    try {
-      // do the thrift call
-      result = method.invoke(client, args);
-    } catch (InvocationTargetException e) {
-      // Get the target exception, check if SentryUserException or TTransportException is
wrapped.
-      // TTransportException or IOException means there has connection problem with the pool.
-      Throwable targetException = e.getCause();
-      if (targetException instanceof SentryUserException) {
-        Throwable sentryTargetException = targetException.getCause();
-        // If there has connection problem, eg, invalid connection if the service restarted,
-        // sentryTargetException instanceof TTransportException or IOException = true.
-        if (sentryTargetException instanceof TTransportException
-            || sentryTargetException instanceof IOException) {
-          // If the exception is caused by connection problem, destroy the instance and
-          // remove it from the commons-pool. Throw the TTransportException for reconnect.
-          pool.invalidateObject(client);
-          throw new TTransportException(sentryTargetException);
-        }
-        // The exception is thrown by thrift call, eg, SentryAccessDeniedException.
-        throw (SentryUserException) targetException;
-      }
-      throw e;
-    } finally{
-      try {
-        // return the instance to commons-pool
-        pool.returnObject(client);
-      } catch (Exception e) {
-        LOGGER.error(POOL_EXCEPTION_MESSAGE, e);
-        throw e;
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public void close() {
-    for (int i = 0; i < endpoints.length; i++) {
-      try {
-        endpoints[i].getPool().close();
-      } catch (Exception e) {
-        LOGGER.debug(POOL_EXCEPTION_MESSAGE, e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index 9beb07b..ec938da 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -313,7 +313,8 @@ public class SentryService implements Callable, SigUtils.SigListener {
       hmsFollowerExecutor.scheduleAtFixedRate(hmsFollower,
               initDelay, period, TimeUnit.MILLISECONDS);
     } catch (IllegalArgumentException e) {
-      LOGGER.error(String.format("Could not start HMSFollower due to illegal argument. period
is %s ms", period), e);
+      LOGGER.error(String.format("Could not start HMSFollower due to illegal argument. period
is %s ms",
+              period), e);
       throw e;
     }
   }
@@ -381,7 +382,7 @@ public class SentryService implements Callable, SigUtils.SigListener {
       sentryStoreCleanService.scheduleWithFixedDelay(
               storeCleaner, 0, storeCleanPeriodSecs, TimeUnit.SECONDS);
 
-      LOGGER.info("sentry store cleaner is scheduled with interval %d seconds", storeCleanPeriodSecs);
+      LOGGER.info("sentry store cleaner is scheduled with interval {} seconds", storeCleanPeriodSecs);
     }
     catch(IllegalArgumentException e){
       LOGGER.error("Could not start SentryStoreCleaner due to illegal argument", e);

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
index 7db9310..f3aa587 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
@@ -19,36 +19,120 @@
 package org.apache.sentry.service.thrift;
 
 import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
 import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
-import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.ThreadSafe;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.sentry.core.common.utils.SentryConstants.KERBEROS_MODE;
+
+@ThreadSafe
 public final class SentryServiceClientFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientFactory.class);
+
   private static final SentryPolicyClientTransportConfig transportConfig =
           new SentryPolicyClientTransportConfig();
+  private final Configuration conf;
+  private final SentryTransportPool transportPool;
 
-  private SentryServiceClientFactory() {
-  }
+  private static final AtomicReference<SentryServiceClientFactory> clientFactory =
+          new AtomicReference<>();
 
+  /**
+   * Create a client instance. The supplied configuration is only used the first time and
+   * ignored afterwords. Tests that want to supply different configurations
+   * should call {@link #factoryReset(SentryServiceClientFactory)} to force new configuration
+   * read.
+   * @param conf Configuration
+   * @return client instance
+   * @throws Exception
+   */
   public static SentryPolicyServiceClient create(Configuration conf) throws Exception {
-    boolean pooled = conf.getBoolean(
-      ClientConfig.SENTRY_POOL_ENABLED, ClientConfig.SENTRY_POOL_ENABLED_DEFAULT);
-    if (pooled) {
-      return (SentryPolicyServiceClient) Proxy
-        .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
-          SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
-          new PoolClientInvocationHandler(conf));
-    } else {
-      return (SentryPolicyServiceClient) Proxy
-        .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
-          SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
-          new RetryClientInvocationHandler(conf,
-            new SentryPolicyServiceClientDefaultImpl(conf,transportConfig), transportConfig));
+    SentryServiceClientFactory factory = clientFactory.get();
+    if (factory != null) {
+      return factory.create();
+    }
+    factory = new SentryServiceClientFactory(conf);
+    boolean ok = clientFactory.compareAndSet(null, factory);
+    if (ok) {
+      return factory.create();
+    }
+    // Close old factory
+    factory.close();
+    return clientFactory.get().create();
+  }
+
+  private SentryServiceClientFactory(Configuration conf) {
+    Configuration clientConf = conf;
+
+    // When kerberos is enabled,  UserGroupInformation should have been initialized with
+    // HADOOP_SECURITY_AUTHENTICATION property. There are instances where this is not done.
+    // Instead of depending on the callers to update this configuration and to be
+    // sure that UserGroupInformation is properly initialized, sentry client is explicitly
+    // doing it.
+    //
+    // This whole piece of code is a bit ugly but we want to avoid doing this in the transport
+    // code during connection establishment, so we are doing it upfront here instead.
+    boolean useKerberos = transportConfig.isKerberosEnabled(conf);
+
+    if (useKerberos) {
+      LOGGER.info("Using Kerberos authentication");
+      String authMode = conf.get(HADOOP_SECURITY_AUTHENTICATION, "");
+      if (authMode != KERBEROS_MODE) {
+        // Force auth mode to be Kerberos
+        clientConf = new Configuration(conf);
+        clientConf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS_MODE);
+      }
+    }
+
+    this.conf = clientConf;
+
+    boolean useUGI = transportConfig.useUserGroupInformation(conf);
+
+    if (useUGI) {
+      LOGGER.info("Using UserGroupInformation authentication");
+      UserGroupInformation.setConfiguration(this.conf);
+    }
+
+    transportPool = new SentryTransportPool(conf, transportConfig,
+            new SentryTransportFactory(conf, transportConfig));
+  }
+
+  private SentryPolicyServiceClient create() throws Exception {
+    return (SentryPolicyServiceClient) Proxy
+      .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
+        SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
+        new RetryClientInvocationHandler(conf,
+          new SentryPolicyServiceClientDefaultImpl(conf, transportPool), transportConfig));
+  }
+
+  /**
+   * Reset existing factory and return the old one.
+   * Only used by tests.
+   * @param factory new factory to use. May be null.
+   * @return
+   */
+  public static  SentryServiceClientFactory factoryReset(SentryServiceClientFactory factory)
{
+    return clientFactory.getAndSet(factory);
+  }
+
+  void close() {
+    try {
+      transportPool.close();
+    } catch (Exception e) {
+      LOGGER.error("failed to close transport pool", e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
deleted file mode 100644
index 0164fa6..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import org.apache.commons.pool2.BasePooledObjectFactory;
-import org.apache.commons.pool2.PooledObject;
-import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * SentryServiceClientPoolFactory is for connection pool to manage the object. Implement
the related
- * method to create object, destroy object and wrap object.
- */
-
-public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<SentryPolicyServiceClient>
{
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientPoolFactory.class);
-
-  private final String addr;
-  private final int port;
-  private final Configuration conf;
-
-  public SentryServiceClientPoolFactory(String addr, int port,
-                                        Configuration conf) {
-    this.addr = addr;
-    this.port = port;
-    this.conf = conf;
-  }
-
-  @Override
-  public SentryPolicyServiceClient create() throws Exception {
-    LOGGER.debug("Creating Sentry Service Client...");
-    return new SentryPolicyServiceClientDefaultImpl(addr, port, conf);
-  }
-
-  @Override
-  public PooledObject<SentryPolicyServiceClient> wrap(SentryPolicyServiceClient client)
{
-    return new DefaultPooledObject<SentryPolicyServiceClient>(client);
-  }
-
-  @Override
-  public void destroyObject(PooledObject<SentryPolicyServiceClient> pooledObject) {
-    SentryPolicyServiceClient client = pooledObject.getObject();
-    LOGGER.debug("Destroying Sentry Service Client: " + client);
-    if (client != null) {
-      // The close() of TSocket or TSaslClientTransport is called actually, and there has
no
-      // exception even there has some problems, eg, the client is closed already.
-      // The close here is just try to close the socket and the client will be destroyed
soon.
-      client.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
index a4dd8a6..32e67b9 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java
@@ -44,6 +44,7 @@ public class TestSentryServiceWithInvalidMsgSize extends SentryServiceIntegratio
     runTestAsSubject(new TestOperation() {
       @Override
       public void runTestAsSubject() throws Exception {
+        SentryServiceClientFactory oldFactory = SentryServiceClientFactory.factoryReset(null);
         Configuration confWithSmallMaxMsgSize = new Configuration(conf);
         confWithSmallMaxMsgSize.setLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
20);
         // create a client with a small thrift max message size
@@ -63,6 +64,7 @@ public class TestSentryServiceWithInvalidMsgSize extends SentryServiceIntegratio
         } finally {
           Assert.assertEquals(true, exceptionThrown);
           clientWithSmallMaxMsgSize.close();
+          SentryServiceClientFactory.factoryReset(oldFactory);
         }
 
         // client can still talk with sentry server when message size is smaller.

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
deleted file mode 100644
index a202775..0000000
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import com.google.common.net.HostAndPort;
-import org.apache.sentry.core.common.utils.ThriftUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestPoolClientInvocationHandler {
-  private static final Logger LOGGER =
-    LoggerFactory.getLogger(TestPoolClientInvocationHandler.class);
-
-  private void expectParseHostPortStrings(String hostsAndPortsStr,
-        String[] expectedHosts, int[] expectedPorts) throws Exception {
-    boolean success = false;
-    String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
-    HostAndPort[] hostsAndPorts;
-    try {
-      hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, 8038);
-      success = true;
-    } finally {
-      if (!success) {
-        LOGGER.error("Caught exception while parsing hosts/ports string " +
-            hostsAndPortsStr);
-      }
-    }
-    String[] hosts = new String[hostsAndPortsStrArr.length];
-    int[] ports = new int[hostsAndPortsStrArr.length];
-    parseHostsAndPorts(hostsAndPorts, hosts, ports);
-    Assert.assertArrayEquals("Got unexpected hosts results while " +
-        "parsing " + hostsAndPortsStr, expectedHosts, hosts);
-    Assert.assertArrayEquals("Got unexpected ports results while " +
-        "parsing " + hostsAndPortsStr, expectedPorts, ports);
-  }
-
-  private void parseHostsAndPorts(HostAndPort[] hostsAndPorts, String[] hosts, int[] ports)
{
-    for (int i = 0; i < hostsAndPorts.length; i++) {
-      hosts[i] = hostsAndPorts[i].getHostText();
-      ports[i] = hostsAndPorts[i].getPort();
-    }
-  }
-
-  @SuppressWarnings("PMD.AvoidUsingHardCodedIP")
-  @Test
-  public void testParseHostPortStrings() throws Exception {
-    expectParseHostPortStrings("foo", new String[] {"foo"}, new int[] {8038});
-    expectParseHostPortStrings("foo,bar",
-        new String[] {"foo", "bar"},
-        new int[] {8038, 8038});
-    expectParseHostPortStrings("foo:2020,bar:2021",
-        new String[] {"foo", "bar"},
-        new int[] {2020, 2021});
-    expectParseHostPortStrings("127.0.0.1:2020,127.1.0.1",
-        new String[] {"127.0.0.1", "127.1.0.1"},
-        new int[] {2020, 8038});
-    expectParseHostPortStrings("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:433",
-        new String[] {"2001:db8:85a3:8d3:1319:8a2e:370:7348"},
-        new int[] {433});
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
index bead003..7c45999 100644
--- a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
+++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java
@@ -28,6 +28,7 @@ import org.apache.sentry.core.model.kafka.KafkaActionConstant;
 import org.apache.sentry.core.model.kafka.Host;
 import org.apache.sentry.kafka.conf.KafkaAuthConf;
 import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend;
+import org.apache.sentry.provider.db.generic.UpdatableCache;
 import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient;
 import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory;
 import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
@@ -79,8 +80,12 @@ public class AbstractKafkaSentryTestBase {
 
   @BeforeClass
   public static void beforeTestEndToEnd() throws Exception {
+    // Stop background update thread
+    UpdatableCache.disable();
     setupConf();
     startSentryServer();
+    // We started a new server, invalidate all connections to the old one
+    SentryGenericServiceClientFactory.factoryReset();
     setUserGroups();
     setAdminPrivilege();
     startKafkaServer();
@@ -88,8 +93,10 @@ public class AbstractKafkaSentryTestBase {
 
   @AfterClass
   public static void afterTestEndToEnd() throws Exception {
-    stopSentryServer();
+    // Stop background update thread
+    UpdatableCache.disable();
     stopKafkaServer();
+    stopSentryServer();
   }
 
   private static void stopKafkaServer() {
@@ -170,10 +177,8 @@ public class AbstractKafkaSentryTestBase {
   }
 
   public static void setAdminPrivilege() throws Exception {
-    SentryGenericServiceClient sentryClient = null;
-    try {
-      /** grant all privilege to admin user */
-      sentryClient = getSentryClient();
+    try (SentryGenericServiceClient sentryClient = getSentryClient()){
+      // grant all privilege to admin user
       sentryClient.createRoleIfNotExist(ADMIN_USER, ADMIN_ROLE, COMPONENT);
       sentryClient.addRoleToGroups(ADMIN_USER, ADMIN_ROLE, COMPONENT, Sets.newHashSet(ADMIN_GROUP));
       final ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>();
@@ -184,14 +189,10 @@ public class AbstractKafkaSentryTestBase {
       sentryClient.grantPrivilege(ADMIN_USER, ADMIN_ROLE, COMPONENT,
           new TSentryPrivilege(COMPONENT, "kafka", authorizables,
               KafkaActionConstant.ALL));
-    } finally {
-      if (sentryClient != null) {
-        sentryClient.close();
-      }
     }
   }
 
-  protected static SentryGenericServiceClient getSentryClient() throws Exception {
+  static SentryGenericServiceClient getSentryClient() throws Exception {
     return SentryGenericServiceClientFactory.create(getClientConfig());
   }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
index 0b1ef68..6d2cabf 100644
--- a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
+++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
@@ -34,6 +34,7 @@ import org.apache.sentry.core.model.kafka.KafkaActionConstant;
 import org.apache.sentry.core.model.kafka.Host;
 import org.apache.sentry.core.model.kafka.Topic;
 import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory;
 import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
 import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
 import org.junit.Assert;
@@ -55,6 +56,8 @@ public class TestAuthorize extends AbstractKafkaSentryTestBase {
 
   @Test
   public void testProduceConsumeForSuperuser() {
+    LOGGER.debug("testProduceConsumeForSuperuser");
+    SentryGenericServiceClientFactory.factoryReset();
     try {
       final String SuperuserName = "test";
       testProduce(SuperuserName);
@@ -66,8 +69,11 @@ public class TestAuthorize extends AbstractKafkaSentryTestBase {
 
   @Test
   public void testProduceConsumeCycle() throws Exception {
+    LOGGER.debug("testProduceConsumeCycle");
     final String localhost = InetAddress.getLocalHost().getHostAddress();
 
+    // SentryGenericServiceClientFactory.factoryReset();
+
     // START TESTING PRODUCER
     try {
       testProduce("user1");

http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java
b/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java
index 8a01e1c..80f158a 100644
--- a/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java
+++ b/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java
@@ -197,19 +197,14 @@ public class AbstractSqoopSentryTestBase {
   }
 
   public static void setAdminPrivilege() throws Exception {
-    SentryGenericServiceClient sentryClient = null;
-    try {
-      /** grant all privilege to admin user */
-      sentryClient = SentryGenericServiceClientFactory.create(getClientConfig());
+    try (SentryGenericServiceClient sentryClient =
+                 SentryGenericServiceClientFactory.create(getClientConfig())){
+      // grant all privilege to admin user
       sentryClient.createRoleIfNotExist(ADMIN_USER, ADMIN_ROLE, COMPONENT);
       sentryClient.addRoleToGroups(ADMIN_USER, ADMIN_ROLE, COMPONENT, Sets.newHashSet(ADMIN_GROUP));
       sentryClient.grantPrivilege(ADMIN_USER, ADMIN_ROLE, COMPONENT,
           new TSentryPrivilege(COMPONENT, SQOOP_SERVER_NAME, new ArrayList<TAuthorizable>(),
               SqoopActionConstant.ALL));
-    } finally {
-      if (sentryClient != null) {
-        sentryClient.close();
-      }
     }
   }
 


Mime
View raw message