sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vam...@apache.org
Subject [03/52] [abbrv] sentry git commit: SENTRY-1593: Implement client failover for Generic and NN clients
Date Wed, 14 Jun 2017 00:56:41 GMT
SENTRY-1593: Implement client failover for Generic and NN clients

CDH-53213 Backport of SENTRY-1593

Change-Id: I59e0deba9160ea26ca609107ec83748b4df7c291
Reviewed-on: http://gerrit.sjc.cloudera.com:8080/22186
Tested-by: Jenkins User
Reviewed-by: Kalyan Kumar Kalvagadda <kkalyan@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/0dbe38aa
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/0dbe38aa
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/0dbe38aa

Branch: refs/for/cdh5-1.5.1_ha
Commit: 0dbe38aa8bd61c8ea44458614fcaa7d1bcbf0d56
Parents: f21172c
Author: Alexander Kolbasov <akolb@cloudera.com>
Authored: Thu Apr 27 21:59:00 2017 -0700
Committer: Kalyan Kumar Kalvagadda <kkalyan@cloudera.com>
Committed: Fri Apr 28 13:12:55 2017 -0700

----------------------------------------------------------------------
 sentry-core/sentry-core-common/pom.xml          |   4 +
 .../exception/SentryHdfsServiceException.java   |  33 ++
 .../transport/RetryClientInvocationHandler.java | 147 +++++
 .../SentryClientInvocationHandler.java          |  54 ++
 .../SentryClientTransportConfigInterface.java   |   7 +
 .../SentryClientTransportConstants.java         |   1 +
 .../SentryHDFSClientTransportConfig.java        |   5 +
 .../SentryPolicyClientTransportConfig.java      |   5 +
 .../common/transport/SentryServiceClient.java   |  43 ++
 .../transport/SentryTransportFactory.java       | 309 ++++++++++
 .../core/common/utils/PolicyStoreConstants.java |  28 +
 .../sentry/core/common/utils/ThriftUtil.java    | 123 ++++
 .../apache/sentry/hdfs/ServiceConstants.java    |   5 +-
 sentry-hdfs/sentry-hdfs-dist/pom.xml            |   4 +
 .../sentry/hdfs/SentryHDFSServiceClient.java    |   2 +-
 .../SentryHDFSServiceClientDefaultImpl.java     | 182 ++----
 .../hdfs/SentryHDFSServiceClientFactory.java    |  26 +-
 .../hdfs/SentryHDFSServiceProcessorFactory.java |   2 +-
 .../sentry/hdfs/SentryHdfsServiceException.java |  33 --
 .../thrift/SentryGenericPolicyProcessor.java    |   2 +-
 .../SentryGenericPolicyProcessorWrapper.java    |   2 +-
 .../SentryGenericServiceClientDefaultImpl.java  | 321 +++++-----
 .../SentryGenericServiceClientFactory.java      |  22 +-
 .../db/log/entity/JsonLogEntityFactory.java     |   2 +-
 .../service/persistent/TransactionManager.java  |   2 +-
 .../db/service/thrift/PolicyStoreConstants.java |  28 -
 .../SentryPolicyServiceClientDefaultImpl.java   | 585 ++++++++-----------
 .../thrift/SentryPolicyStoreProcessor.java      |   2 +-
 .../service/thrift/SentryProcessorWrapper.java  |   1 +
 .../provider/db/service/thrift/ThriftUtil.java  | 123 ----
 .../thrift/PoolClientInvocationHandler.java     |   3 +-
 .../thrift/RetryClientInvocationHandler.java    | 146 -----
 .../thrift/SentryClientInvocationHandler.java   |  54 --
 .../thrift/SentryServiceClientFactory.java      |  28 +-
 .../sentry/service/thrift/ServiceConstants.java |  19 +-
 .../TestSentryGenericPolicyProcessor.java       |   2 +-
 .../thrift/TestSentryGenericServiceClient.java  |  61 ++
 .../db/log/entity/TestJsonLogEntityFactory.java |   2 +-
 .../log/entity/TestJsonLogEntityFactoryGM.java  |   2 +-
 .../thrift/TestSentryPolicyServiceClient.java   |  64 ++
 .../thrift/TestSentryPolicyStoreProcessor.java  |   2 +-
 .../thrift/TestPoolClientInvocationHandler.java |   2 +-
 42 files changed, 1376 insertions(+), 1112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/pom.xml b/sentry-core/sentry-core-common/pom.xml
index d6e4ca3..20376b9 100644
--- a/sentry-core/sentry-core-common/pom.xml
+++ b/sentry-core/sentry-core-common/pom.xml
@@ -62,6 +62,10 @@ limitations under the License.
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java
new file mode 100644
index 0000000..6b09dc2
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.exception;
+
+public class SentryHdfsServiceException extends RuntimeException {
+  private static final long serialVersionUID = 1511645864949767378L;
+
+  public SentryHdfsServiceException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public SentryHdfsServiceException(String message) {
+    super(message);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
new file mode 100644
index 0000000..34a594e
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * The RetryClientInvocationHandler is a proxy class for handling thrift calls for non-pool
+ * model. Currently only one client connection is allowed.
+ * <p>
+ * For every rpc call, if the client is not connected, it will first connect to one of the
+ * sentry servers, and then do the thrift call to the connected sentry server, which will
+ * execute the requested method and return back the response. If it is failed with connection
+ * problem, it will close the current connection and retry (reconnect and resend the
+ * thrift call) no more than rpcRetryTotal times. If the client is already connected, it
+ * will reuse the existing connection, and do the thrift call.
+ * <p>
+ * During reconnection, the invocation handler will first cycle through all the configured sentry servers, and
+ * then retry the whole server list no more than connectionFullRetryTotal times. In this
+ * case, it won't introduce more latency when some server fails.
+ * <p>
+ */
+
+public class RetryClientInvocationHandler extends SentryClientInvocationHandler {
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(RetryClientInvocationHandler.class);
+  private SentryServiceClient client = null;
+  private final int maxRetryCount;
+
+  /**
+   * Initialize the sentry configurations, including rpc retry count and client connection
+   * configs for SentryPolicyServiceClientDefaultImpl
+   */
+  public RetryClientInvocationHandler(Configuration conf, SentryServiceClient clientObject,
+                                      SentryClientTransportConfigInterface transportConfig) {
+    Preconditions.checkNotNull(conf, "Configuration object cannot be null");
+    Preconditions.checkNotNull(clientObject, "Client Object cannot be null");
+    client = clientObject;
+    maxRetryCount = transportConfig.getSentryRpcRetryTotal(conf);
+  }
+
+  /**
+   * For every rpc call, if the client is not connected, it will first connect to a sentry
+   * server, and then do the thrift call to the connected sentry server, which will
+   * execute the requested method and return the response. If the call fails with a
+   * connection problem, it will close the current connection and retry (reconnect and
+   * resend the thrift call) no more than rpcRetryTotal times. Throws SentryUserException
+   * if the call still fails after rpcRetryTotal attempts.
+   * If the call fails with any other exception, the method just re-throws that exception.
+   * Synchronized it for thread safety.
+   */
+  @Override
+  public synchronized Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception {
+    int retryCount = 0;
+    Exception lastExc = null;
+
+    while (retryCount < maxRetryCount) {
+      // Connect to a sentry server if not connected yet.
+      try {
+        client.connect();
+      } catch (IOException e) {
+        // Increase the retry num
+        // Retry when the exception is caused by connection problem.
+        retryCount++;
+        lastExc = e;
+        close();
+        continue;
+      }
+
+      // do the thrift call
+      try {
+        return method.invoke(client, args);
+      } catch (InvocationTargetException e) {
+        // Get the target exception, check if SentryUserException or TTransportException is wrapped.
+        // TTransportException means there is a connection problem.
+        Throwable targetException = e.getCause();
+        if (targetException instanceof SentryUserException ||
+          targetException instanceof SentryHdfsServiceException) {
+          Throwable sentryTargetException = targetException.getCause();
+          // If there is a connection problem, e.g., an invalid connection after the service restarted,
+          // sentryTargetException instanceof TTransportException will be true.
+          if (sentryTargetException instanceof TTransportException) {
+            // Retry when the exception is caused by connection problem.
+            lastExc = new TTransportException(sentryTargetException);
+            LOGGER.error("Thrift call failed with TTransportException", lastExc);
+            // Closing the thrift client on TTransportException. New client object is
+            // created using new socket when an attempt to reconnect is made.
+            close();
+          } else {
+            // The exception is thrown by thrift call, eg, SentryAccessDeniedException.
+            // Do not need to reconnect to the sentry server.
+            if (targetException instanceof SentryUserException) {
+              throw (SentryUserException) targetException;
+            } else {
+              throw (SentryHdfsServiceException) targetException;
+            }
+          }
+        } else {
+          throw e;
+        }
+      }
+
+      // Increase the retry num
+      retryCount++;
+    }
+
+    // Throw the exception as reaching the max rpc retry num.
+    String error = String.format("Request failed, %d retries attempted ", maxRetryCount);
+    LOGGER.error(error, lastExc);
+    throw new SentryUserException(error, lastExc);
+  }
+
+  @Override
+  public synchronized void close() {
+    try {
+      LOGGER.debug("Releasing the current client connection");
+      client.disconnect();
+    } catch (Exception e) {
+      LOGGER.error("Encountered failure while closing the connection");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientInvocationHandler.java
new file mode 100644
index 0000000..bf33fda
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientInvocationHandler.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.transport;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
+/**
+ * SentryClientInvocationHandler is the base interface for all the InvocationHandler in SENTRY
+ */
+public abstract class SentryClientInvocationHandler implements InvocationHandler {
+
+  /**
+   * Close the InvocationHandler: An InvocationHandler may create some contexts;
+   * these contexts should be closed when the method "close()" of the client is called.
+   */
+  @Override
+  public final Object invoke(Object proxy, Method method, Object[] args) throws Exception {
+    // close() doesn't throw an exception; we suppress it in case of connection
+    // loss. Changing SentryPolicyServiceClient#close() to throw an
+    // exception would be a backward incompatible change for Sentry clients.
+    if ("close".equals(method.getName()) && null == args) {
+      close();
+      return null;
+    }
+    return invokeImpl(proxy, method, args);
+  }
+
+  /**
+   * Subclass should implement this method for special function
+   */
+  abstract public Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception;
+
+  /**
+   * An abstract method "close", an invocationHandler should close its contexts at here.
+   */
+  public abstract void close();
+
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
index 6cea596..24192fd 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
@@ -40,6 +40,13 @@ interface SentryClientTransportConfigInterface {
 
   /**
    * @param conf configuration
+   * @return number of times should client re-create the transport and try to connect
+   * before finally giving up.
+   */
+  int getSentryRpcRetryTotal(Configuration conf);
+
+  /**
+   * @param conf configuration
    * @return True, if kerberos should be enabled.
    * False, Iff kerberos is enabled.
    * @throws MissingConfigurationException if property is mandatory and is missing in

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
index 83790d8..3520787 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
@@ -28,6 +28,7 @@ package org.apache.sentry.core.common.transport;
  * <code>SentryClientTransportConfigInterface</code>.
  */
 class SentryClientTransportConstants {
+
   /**
    * max retry num for client rpc
    * {link RetryClientInvocationHandler#invokeImpl(Object, Method, Object[])}

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
index 64750e7..74f790b 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
@@ -46,6 +46,11 @@ public final class SentryHDFSClientTransportConfig
   }
 
   @Override
+  public int getSentryRpcRetryTotal(Configuration conf) {
+    return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT);
+  }
+
+  @Override
   public boolean useUserGroupInformation(Configuration conf)
     throws MissingConfigurationException {
     return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true"));

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
index 85ddd31..37fd0b3 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
@@ -46,6 +46,11 @@ public final class SentryPolicyClientTransportConfig
   }
 
   @Override
+  public int getSentryRpcRetryTotal(Configuration conf) {
+    return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT);
+  }
+
+  @Override
   public boolean useUserGroupInformation(Configuration conf)
     throws MissingConfigurationException {
     return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true"));

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
new file mode 100644
index 0000000..9a10ca5
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.transport;
+
+/**
+ * Client interface for Proxy Invocation handlers
+ * <p>
+ * Defines the interface that Sentry clients should expose to invocation handlers like
+ * <code>RetryClientInvocationHandler</code> used to proxy the method invocation on sentry
+ * client instances.
+ * <p>
+ * All the sentry clients that need retrying and failover capabilities should implement
+ * this interface.
+ */
+public interface SentryServiceClient {
+  /**
+   * Connect to Sentry server.
+   * Either creates a new connection or reuses an existing one.
+   * @throws Exception on failure to acquire a transport towards server.
+   */
+  void connect() throws Exception;
+
+  /**
+   * Disconnect from the server. May close connection or return it to a
+   * pool for reuse.
+   */
+  void disconnect();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
new file mode 100644
index 0000000..9ddb400
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.net.HostAndPort;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sentry.core.common.exception.MissingConfigurationException;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.sentry.core.common.utils.ThriftUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Create Thrift transports suitable for talking to Sentry
+ */
+
+public class SentryTransportFactory {
+  protected final Configuration conf;
+  private String[] serverPrincipalParts;
+  protected TTransport thriftTransport;
+  private final int connectionTimeout;
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportFactory.class);
+  // configs for connection retry
+  private final int connectionFullRetryTotal;
+  private final ArrayList<InetSocketAddress> endpoints;
+  private final SentryClientTransportConfigInterface transportConfig;
+  private static final ImmutableMap<String, String> SASL_PROPERTIES =
+    ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
+
+  /**
+   * This transport wraps the Sasl transports to set up the right UGI context for open().
+   */
+  public static class UgiSaslClientTransport extends TSaslClientTransport {
+    UserGroupInformation ugi = null;
+
+    public UgiSaslClientTransport(String mechanism, String protocol,
+                                  String serverName, TTransport transport,
+                                  boolean wrapUgi, Configuration conf)
+      throws IOException, SaslException {
+      super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null,
+        transport);
+      if (wrapUgi) {
+        // If we don't set the configuration, the UGI will be created based on
+        // what's on the classpath, which may lack the kerberos changes we require
+        UserGroupInformation.setConfiguration(conf);
+        ugi = UserGroupInformation.getLoginUser();
+      }
+    }
+
+    // open the SASL transport with using the current UserGroupInformation
+    // This is needed to get the current login context stored
+    @Override
+    public void open() throws TTransportException {
+      if (ugi == null) {
+        baseOpen();
+      } else {
+        try {
+          if (ugi.isFromKeytab()) {
+            ugi.checkTGTAndReloginFromKeytab();
+          }
+          ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            public Void run() throws TTransportException {
+              baseOpen();
+              return null;
+            }
+          });
+        } catch (IOException e) {
+          throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e);
+        } catch (InterruptedException e) {
+          throw new TTransportException(
+            "Interrupted while opening underlying transport: " + e.getMessage(), e);
+        }
+      }
+    }
+
+    private void baseOpen() throws TTransportException {
+      super.open();
+    }
+  }
+
+  /**
+   * Initialize the object based on the sentry configuration provided.
+   * List of configured servers are reordered randomly preventing all
+   * clients connecting to the same server.
+   *
+   * @param conf            Sentry configuration
+   * @param transportConfig transport configuration to use
+   */
+  public SentryTransportFactory(Configuration conf,
+                                SentryClientTransportConfigInterface transportConfig) throws IOException {
+
+    this.conf = conf;
+    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
+    serverPrincipalParts = null;
+    this.transportConfig = transportConfig;
+
+    try {
+      String hostsAndPortsStr;
+      this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
+      this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
+
+      hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
+
+      int serverPort = transportConfig.getServerRpcPort(conf);
+
+      String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
+      HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort);
+
+      this.endpoints = new ArrayList(hostsAndPortsStrArr.length);
+      for (HostAndPort endpoint : hostsAndPorts) {
+        this.endpoints.add(
+          new InetSocketAddress(endpoint.getHostText(), endpoint.getPort()));
+        LOGGER.debug("Added server endpoint: " + endpoint.toString());
+      }
+
+      // Reorder endpoints randomly to prevent all clients connecting to the same endpoint
+      // at the same time after a node failure.
+      Collections.shuffle(endpoints);
+    } catch (MissingConfigurationException e) {
+      throw new RuntimeException("Sentry Thrift Client Creation Failed: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Initialize the object based on the parameters provided.
+   *
+   * @param addr            Host address which the client needs to connect
+   * @param port            Host Port which the client needs to connect
+   * @param conf            Sentry configuration
+   * @param transportConfig transport configuration to use
+   */
+  public SentryTransportFactory(String addr, int port, Configuration conf,
+                                SentryClientTransportConfigInterface transportConfig) throws IOException {
+    // copy the configuration because we may make modifications to it.
+    this.conf = new Configuration(conf);
+    serverPrincipalParts = null;
+    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
+    this.transportConfig = transportConfig;
+
+    try {
+      this.endpoints = new ArrayList(1);
+      this.endpoints.add(NetUtils.createSocketAddr(addr, port));
+      this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
+      this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
+    } catch (MissingConfigurationException e) {
+      throw new RuntimeException("Sentry Thrift Client Creation Failed: " + e.getMessage(), e);
+    }
+  }
+
+
+  /**
+   * Connects to one of the configured servers and returns the open transport.
+   * Every endpoint is tried in turn; if all of them fail, the whole list is retried,
+   * up to connectionFullRetryTotal passes in total, with no delay between passes.
+   * <p>
+   * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
+   *
+   * @throws IOException if no server could be reached, or if no attempt was made at all
+   */
+  public TTransport getTransport() throws IOException {
+    IOException currentException = null;
+    for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) {
+      try {
+        return connectToAvailableServer();
+      } catch (IOException e) {
+        currentException = e;
+        LOGGER.error("Failed to connect to all the configured sentry servers, Retrying again");
+      }
+    }
+    if (currentException == null) {
+      // The loop never ran (non-positive retry total); never throw a null reference.
+      currentException = new IOException("No connection attempted, retry total is " + connectionFullRetryTotal);
+    }
+    LOGGER.error(String.format("Reach the max connection retry num %d ", connectionFullRetryTotal),
+      currentException);
+    throw currentException;
+  }
+
+  /**
+   * Iterates through all the configured servers and tries to connect.
+   * On connection error, tries to connect to next server.
+   * Control returns on successful connection OR once every configured
+   * server has been tried.
+   *
+   * @throws IOException the last connection failure, or if no endpoint is configured
+   */
+  private TTransport connectToAvailableServer() throws IOException {
+    IOException currentException = null;
+    for (InetSocketAddress addr : endpoints) {
+      try {
+        return connectToServer(addr);
+      } catch (IOException e) {
+        LOGGER.error(String.format("Failed connection to %s: %s", addr.toString(), e.getMessage()), e);
+        currentException = e;
+      }
+    }
+    // An empty endpoint list leaves nothing to rethrow; never throw a null reference.
+    throw currentException != null ? currentException : new IOException("No Sentry server endpoints configured");
+  }
+
+  /**
+   * Creates a transport for serverAddress, opens it, stores it in thriftTransport and returns it.
+   * A MissingConfigurationException is rethrown as an unchecked RuntimeException.
+   * @param serverAddress Address client needs to connect
+   * @throws IOException if the transport cannot be created or opened (wraps TTransportException).
+   */
+  private TTransport connectToServer(InetSocketAddress serverAddress) throws IOException {
+    try {
+      thriftTransport = createTransport(serverAddress);
+      thriftTransport.open();
+    } catch (TTransportException e) {
+      throw new IOException("Failed to open transport: " + e.getMessage(), e);
+    } catch (MissingConfigurationException e) {
+      throw new RuntimeException(e.getMessage(), e);
+    }
+
+    LOGGER.debug("Successfully opened transport: " + thriftTransport + " to " + serverAddress);
+    return thriftTransport;
+  }
+
+  /**
+   * Creates an unopened transport to the given server: a plain TSocket, wrapped in a
+   * Kerberos SASL transport when Kerberos is enabled in the configuration.
+   * @param serverAddress address of the server to connect to
+   * @return the transport, not yet opened
+   * @throws TTransportException
+   * @throws MissingConfigurationException
+   * @throws IOException
+   */
+  private TTransport createTransport(InetSocketAddress serverAddress)
+    throws TTransportException, MissingConfigurationException, IOException {
+    TTransport socket = new TSocket(serverAddress.getHostName(),
+      serverAddress.getPort(), connectionTimeout);
+
+    if (!transportConfig.isKerberosEnabled(conf)) {
+      return socket;
+    } else {
+      String serverPrincipal = transportConfig.getSentryPrincipal(conf);
+      serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
+      LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
+      // Split the principal on every call: it is resolved against this server's address
+      // above, so parts cached from a previous endpoint may name a different host.
+      serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
+      Preconditions.checkArgument(serverPrincipalParts.length == 3,
+        "Kerberos principal should have 3 parts: " + serverPrincipal);
+
+      boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
+      return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
+        serverPrincipalParts[0], serverPrincipalParts[1],
+        socket, wrapUgi, conf);
+    }
+  }
+
+  private boolean isConnected() { // true only when a transport exists and reports itself open
+    return thriftTransport != null && thriftTransport.isOpen();
+  }
+
+  /**
+   * Releases the transport; for now this just delegates to close().
+   * TODO (Kalyan) Plan is to hold the transport and reuse it across multiple clients.
+   * That way, new connection need not be created for each new client
+   */
+  public void releaseTransport() {
+    close();
+  }
+
+  /**
+   * Closes the current transport if one is open; otherwise does nothing.
+   */
+  public void close() {
+    if (isConnected()) {
+      thriftTransport.close();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PolicyStoreConstants.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PolicyStoreConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PolicyStoreConstants.java
new file mode 100644
index 0000000..8f73d01
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PolicyStoreConstants.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.utils;
+
+public class PolicyStoreConstants { // constants holder; dotted values look like configuration keys - TODO confirm
+  public static final String SENTRY_GENERIC_POLICY_NOTIFICATION = "sentry.generic.policy.notification";
+  public static final String SENTRY_GENERIC_POLICY_STORE = "sentry.generic.policy.store";
+  public static final String SENTRY_GENERIC_POLICY_STORE_DEFAULT =
+      "org.apache.sentry.provider.db.generic.service.persistent.DelegateSentryStore";
+  public static class PolicyStoreServerConfig { // presumably server-side keys only; verify against users
+    public static final String NOTIFICATION_HANDLERS = "sentry.policy.store.notification.handlers";
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
new file mode 100644
index 0000000..9e38a30
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.utils;
+
+import com.google.common.net.HostAndPort;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Thrift helpers: thread-local holders for the client IP address and impersonator, and host:port parsing.
+ */
+public class ThriftUtil {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ThriftUtil.class);
+
+  public static void setImpersonator(final TProtocol in) {
+    try {
+      TTransport transport = in.getTransport();
+      if (transport instanceof TSaslServerTransport) {
+        String impersonator = ((TSaslServerTransport) transport).getSaslServer()
+            .getAuthorizationID();
+        setImpersonator(impersonator);
+      }
+    } catch (Exception e) {
+      // Best effort: failing to determine the impersonator must not fail the call, so only log it.
+      LOGGER.warn("There is an error when get the impersonator:" + e.getMessage(), e);
+    }
+  }
+
+  public static void setIpAddress(final TProtocol in) {
+    try {
+      TTransport transport = in.getTransport();
+      TSocket tSocket = getUnderlyingSocketFromTransport(transport);
+      if (tSocket != null) {
+        setIpAddress(tSocket.getSocket().getInetAddress().toString());
+      } else {
+        LOGGER.warn("Unknown Transport, cannot determine ipAddress");
+      }
+    } catch (Exception e) {
+      // Best effort: failing to determine the client's ip address must not fail the call, so only log it.
+      LOGGER.warn("There is an error when get the client's ip address:" + e.getMessage(), e);
+    }
+  }
+
+  /** Returns the TSocket beneath the transport, or null if there is none or the type is unknown. */
+  private static TSocket getUnderlyingSocketFromTransport(TTransport transport) {
+    Preconditions.checkNotNull(transport);
+    TTransport underlying = transport;
+    if (transport instanceof TSaslServerTransport) {
+      underlying = ((TSaslServerTransport) transport).getUnderlyingTransport();
+    } else if (transport instanceof TSaslClientTransport) {
+      underlying = ((TSaslClientTransport) transport).getUnderlyingTransport();
+    }
+    // instanceof also rejects null and non-socket transports, so the cast below cannot fail.
+    return underlying instanceof TSocket ? (TSocket) underlying : null;
+  }
+
+  private static final ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>() {
+    @Override
+    protected String initialValue() {
+      return "";
+    }
+  };
+
+  public static void setIpAddress(String ipAddress) {
+    threadLocalIpAddress.set(ipAddress);
+  }
+
+  public static String getIpAddress() {
+    return threadLocalIpAddress.get();
+  }
+
+  private static final ThreadLocal<String> threadLocalImpersonator = new ThreadLocal<String>() {
+    @Override
+    protected String initialValue() {
+      return "";
+    }
+  };
+
+  public static void setImpersonator(String impersonator) {
+    threadLocalImpersonator.set(impersonator);
+  }
+
+  public static String getImpersonator() {
+    return threadLocalImpersonator.get();
+  }
+
+  /**
+   * Utility function for parsing host and port strings. Expected form should be
+   * (host:port). The hostname could be in ipv6 style. If port is not specified,
+   * defaultPort will be used.
+   */
+  public static HostAndPort[] parseHostPortStrings(String[] hostsAndPortsArr, int defaultPort) {
+    HostAndPort[] hostsAndPorts = new HostAndPort[hostsAndPortsArr.length];
+    for (int i = 0; i < hostsAndPorts.length; i++) {
+      hostsAndPorts[i] =
+          HostAndPort.fromString(hostsAndPortsArr[i]).withDefaultPort(defaultPort);
+    }
+    return hostsAndPorts;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
index 95285e4..0203ab3 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
@@ -77,12 +77,9 @@ public class ServiceConstants {
     public static final String PRINCIPAL = "sentry.hdfs.service.server.principal";
 
     public static final String SERVER_RPC_PORT = "sentry.hdfs.service.client.server.rpc-port";
-    public static final int SERVER_RPC_PORT_DEFAULT = 8038;
-
+  
     public static final String SERVER_RPC_ADDRESS = "sentry.hdfs.service.client.server.rpc-address";
 
-    public static final String SERVER_RPC_CONN_TIMEOUT = "sentry.hdfs.service.client.server.rpc-connection-timeout";
-    public static final int SERVER_RPC_CONN_TIMEOUT_DEFAULT = 200000;
     public static final String USE_COMPACT_TRANSPORT = "sentry.hdfs.service.client.compact.transport";
     public static final boolean USE_COMPACT_TRANSPORT_DEFAULT = false;
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-dist/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-dist/pom.xml b/sentry-hdfs/sentry-hdfs-dist/pom.xml
index 8aa10f7..b614254 100644
--- a/sentry-hdfs/sentry-hdfs-dist/pom.xml
+++ b/sentry-hdfs/sentry-hdfs-dist/pom.xml
@@ -49,6 +49,10 @@ limitations under the License.
       <groupId>org.apache.sentry</groupId>
       <artifactId>sentry-core-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-core-common</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
index 956b855..de9507b 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
@@ -17,7 +17,7 @@
  */
 package org.apache.sentry.hdfs;
 
-import java.io.IOException;
+import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
 
 public interface SentryHDFSServiceClient {
   public static final String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService";

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
index 28b1224..798bbef 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,162 +18,80 @@
 package org.apache.sentry.hdfs;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
 import java.util.LinkedList;
-import java.util.Map;
 
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.sasl.Sasl;
-
-import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.sentry.core.common.exception.MissingConfigurationException;
+import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
+import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig;
+import org.apache.sentry.core.common.transport.SentryServiceClient;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
 import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
 import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client;
 import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
 import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
 import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate;
-import org.apache.sentry.hdfs.ServiceConstants.ClientConfig;
-import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TMultiplexedProtocol;
 import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSaslClientTransport;
-import org.apache.thrift.transport.TSocket;
+
 import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+/**
+ * Sentry HDFS Service Client
+ * <p>
+ * The public implementation of SentryHDFSServiceClient.
+ * A Sentry Client in which all the operations are synchronized for thread safety
+ * Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state.
+ * So it is important to close and re-open the transport so that new socket is used.
+ */
 
-public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClient {
+public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClient, SentryServiceClient {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class);
-
-  /**
-   * This transport wraps the Sasl transports to set up the right UGI context for open().
-   */
-  public static class UgiSaslClientTransport extends TSaslClientTransport {
-    protected UserGroupInformation ugi = null;
-
-    public UgiSaslClientTransport(String mechanism, String authorizationId,
-        String protocol, String serverName, Map<String, String> props,
-        CallbackHandler cbh, TTransport transport, boolean wrapUgi)
-        throws IOException {
-      super(mechanism, authorizationId, protocol, serverName, props, cbh,
-          transport);
-      if (wrapUgi) {
-        ugi = UserGroupInformation.getLoginUser();
-      }
-    }
-
-    // open the SASL transport with using the current UserGroupInformation
-    // This is needed to get the current login context stored
-    @Override
-    public void open() throws TTransportException {
-      if (ugi == null) {
-        baseOpen();
-      } else {
-        try {
-          // ensure that the ticket is valid before connecting to service. Note that
-          // checkTGTAndReloginFromKeytab() renew the ticket only when more than 80%
-          // of ticket lifetime has passed. 
-          if (ugi.isFromKeytab()) {
-            ugi.checkTGTAndReloginFromKeytab();
-          }
-
-          ugi.doAs(new PrivilegedExceptionAction<Void>() {
-            public Void run() throws TTransportException {
-              baseOpen();
-              return null;
-            }
-          });
-        } catch (IOException e) {
-          throw new TTransportException("Failed to open SASL transport", e);
-        } catch (InterruptedException e) {
-          throw new TTransportException(
-              "Interrupted while opening underlying transport", e);
-        }
-      }
-    }
-
-    private void baseOpen() throws TTransportException {
-      super.open();
-    }
-  }
-
-  private final Configuration conf;
-  private final InetSocketAddress serverAddress;
-  private final int connectionTimeout;
-  private boolean kerberos;
-  private TTransport transport;
-
-  private String[] serverPrincipalParts;
   private Client client;
-  private final SentryHDFSClientTransportConfig transportConfig = new SentryHDFSClientTransportConfig();
-  private static final ImmutableMap<String, String> SASL_PROPERTIES =
-    ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
+  private SentryTransportFactory transportFactory;
+  private TTransport transport;
+  private Configuration conf;
 
-  public SentryHDFSServiceClientDefaultImpl(Configuration conf) throws IOException {
+  public SentryHDFSServiceClientDefaultImpl(Configuration conf, SentryHDFSClientTransportConfig transportConfig) throws IOException {
+    transportFactory = new SentryTransportFactory(conf, transportConfig);
     this.conf = conf;
-    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
-    try {
-      this.serverAddress = NetUtils.createSocketAddr(
-        transportConfig.getSentryServerRpcAddress(conf),
-        transportConfig.getServerRpcPort(conf));
-      this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
-      kerberos = transportConfig.isKerberosEnabled(conf);
-      transport = new TSocket(serverAddress.getHostName(),
-        serverAddress.getPort(), connectionTimeout);
-      if (kerberos) {
-        String serverPrincipal = transportConfig.getSentryPrincipal(conf);
-        // Resolve server host in the same way as we are doing on server side
-        serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
-        LOGGER.info("Using server kerberos principal: " + serverPrincipal);
-
-        serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
-        Preconditions.checkArgument(serverPrincipalParts.length == 3,
-          "Kerberos principal should have 3 parts: " + serverPrincipal);
-        boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
-        transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(),
-          null, serverPrincipalParts[0], serverPrincipalParts[1],
-          SASL_PROPERTIES, null, transport, wrapUgi);
-      } else {
-        serverPrincipalParts = null;
-      }
+  }
 
-      transport.open();
-    } catch (TTransportException e) {
-      throw new IOException("Transport exception while opening transport: " + e.getMessage(), e);
-    } catch (MissingConfigurationException e) {
-      throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e);
+  /**
+   * Connect to the sentry server
+   *
+   * @throws IOException
+   */
+  @Override
+  public synchronized void connect() throws IOException {
+    if (transport != null && transport.isOpen()) {
+      return;
     }
-    LOGGER.info("Successfully opened transport: " + transport + " to " + serverAddress);
+
+    transport = transportFactory.getTransport();
     TProtocol tProtocol = null;
     long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE,
-        ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
-    if (conf.getBoolean(ClientConfig.USE_COMPACT_TRANSPORT,
-        ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
+            ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+    if (conf.getBoolean(ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT,
+            ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
       tProtocol = new TCompactProtocol(transport, maxMessageSize, maxMessageSize);
     } else {
       tProtocol = new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true);
     }
     TMultiplexedProtocol protocol = new TMultiplexedProtocol(
-      tProtocol, SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME);
+            tProtocol, SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME);
+
     client = new SentryHDFSService.Client(protocol);
     LOGGER.info("Successfully created client");
   }
 
+  @Override
   public synchronized void notifyHMSUpdate(PathsUpdate update)
-      throws SentryHdfsServiceException {
+          throws SentryHdfsServiceException {
     try {
       client.handle_hms_notification(update.toThrift());
     } catch (Exception e) {
@@ -181,8 +99,9 @@ public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClie
     }
   }
 
+  @Override
   public synchronized long getLastSeenHMSPathSeqNum()
-      throws SentryHdfsServiceException {
+          throws SentryHdfsServiceException {
     try {
       return client.check_hms_seq_num(-1);
     } catch (Exception e) {
@@ -190,8 +109,9 @@ public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClie
     }
   }
 
+  @Override
   public synchronized SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
- throws SentryHdfsServiceException {
+          throws SentryHdfsServiceException {
     SentryAuthzUpdate retVal = new SentryAuthzUpdate(new LinkedList<PermissionsUpdate>(), new LinkedList<PathsUpdate>());
     try {
       TAuthzUpdateResponse sentryUpdates = client.get_all_authz_updates_from(permSeqNum, pathSeqNum);
@@ -211,9 +131,13 @@ public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClie
     return retVal;
   }
 
-  public void close() {
-    if (transport != null) {
-      transport.close();
-    }
+  @Override
+  public synchronized void close() {
+    transportFactory.close();
+  }
+
+  @Override
+  public void disconnect() {
+    transportFactory.releaseTransport();
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
index cdf6195..e350103 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,15 +17,29 @@
  */
 package org.apache.sentry.hdfs;
 
+import java.lang.reflect.Proxy;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
+import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig;
 
 /**
  * Client factory to create normal client or proxy with HA invocation handler
  */
 public class SentryHDFSServiceClientFactory {
-  public static SentryHDFSServiceClient create(Configuration conf)
-      throws Exception {
-    return new SentryHDFSServiceClientDefaultImpl(conf);
+  private final static SentryHDFSClientTransportConfig transportConfig =
+          new SentryHDFSClientTransportConfig();
+
+  private SentryHDFSServiceClientFactory() {
+    // Make constructor private to avoid instantiation
   }
 
+  public static SentryHDFSServiceClient create(Configuration conf)
+    throws Exception {
+    return (SentryHDFSServiceClient) Proxy
+      .newProxyInstance(SentryHDFSServiceClientDefaultImpl.class.getClassLoader(),
+        SentryHDFSServiceClientDefaultImpl.class.getInterfaces(),
+        new RetryClientInvocationHandler(conf,
+          new SentryHDFSServiceClientDefaultImpl(conf, transportConfig), transportConfig));
+  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
index 4dc99a2..1ad9a02 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
 import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Iface;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.sentry.provider.db.service.thrift.ThriftUtil;
+import org.apache.sentry.core.common.utils.ThriftUtil;
 import org.apache.sentry.service.thrift.ProcessorFactory;
 import org.apache.thrift.TException;
 import org.apache.thrift.TMultiplexedProcessor;

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java
deleted file mode 100644
index 307d8c3..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.hdfs;
-
-public class SentryHdfsServiceException extends RuntimeException {
-  private static final long serialVersionUID = 1511645864949767378L;
-
-  public SentryHdfsServiceException(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-  public SentryHdfsServiceException(String message) {
-    super(message);
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java
index 03faed7..97c0e1d 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java
@@ -46,8 +46,8 @@ import org.apache.sentry.provider.db.log.entity.JsonLogEntityFactory;
 import org.apache.sentry.provider.db.log.util.Constants;
 import org.apache.sentry.provider.db.service.model.MSentryGMPrivilege;
 import org.apache.sentry.provider.db.service.model.MSentryRole;
-import org.apache.sentry.provider.db.service.thrift.PolicyStoreConstants;
 import org.apache.sentry.provider.db.service.thrift.SentryConfigurationException;
+import org.apache.sentry.core.common.utils.PolicyStoreConstants;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor;
 import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
 import org.apache.sentry.service.thrift.ServiceConstants.ThriftConstants;

http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java
index d320d0f..a0fc2cc 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java
@@ -18,7 +18,7 @@
 
 package org.apache.sentry.provider.db.generic.service.thrift;
 
-import org.apache.sentry.provider.db.service.thrift.ThriftUtil;
+import org.apache.sentry.core.common.utils.ThriftUtil;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocol;
 


Mime
View raw message