kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From granthe...@apache.org
Subject [1/3] kudu git commit: KUDU-2264. java: automatically attempt to re-acquire Kerberos credentials
Date Fri, 09 Mar 2018 19:49:49 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 5d016e1a7 -> 0ba702c20


KUDU-2264. java: automatically attempt to re-acquire Kerberos credentials

This fixes the Java client to notice when the Kerberos credentials it
has are about to expire and to initiate a re-login when necessary.

It also adds a long javadoc for AsyncKuduClient indicating the proper
way to authenticate to a secured Kudu cluster for various scenarios,
since this is a common question we've gotten.

Change-Id: I514253e0a7f067dbc8ffe4eaf5a7a2c32900b539
Reviewed-on: http://gerrit.cloudera.org:8080/9050
Reviewed-by: Hao Hao <hao.hao@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/55e611d3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/55e611d3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/55e611d3

Branch: refs/heads/master
Commit: 55e611d3006084db0d32d33fc38ca55fbf206b9a
Parents: 5d016e1
Author: Todd Lipcon <todd@apache.org>
Authored: Tue Jan 16 23:15:57 2018 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Thu Mar 8 23:59:05 2018 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java | 196 +++++++++++--
 .../kudu/client/AuthnTokenReacquirer.java       |  14 +-
 .../org/apache/kudu/client/KuduException.java   |   4 +
 .../java/org/apache/kudu/client/KuduRpc.java    |   8 +
 .../java/org/apache/kudu/client/Negotiator.java |  25 +-
 .../org/apache/kudu/client/SecurityContext.java | 172 ++++++++++-
 .../java/org/apache/kudu/util/SecurityUtil.java | 112 +++++++-
 .../org/apache/kudu/client/BaseKuduTest.java    |   2 +-
 .../org/apache/kudu/client/MiniKuduCluster.java |  39 ++-
 .../org/apache/kudu/client/TestNegotiator.java  |   5 +-
 .../org/apache/kudu/client/TestSecurity.java    | 284 +++++++++++++++++--
 src/kudu/tools/kudu-tool-test.cc                |  12 +
 src/kudu/tools/tool.proto                       |  15 +
 src/kudu/tools/tool_action_test.cc              |  10 +
 14 files changed, 796 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index de36e3d..0c7e800 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -47,7 +47,6 @@ import java.util.concurrent.Semaphore;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
-import javax.security.auth.Subject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -78,41 +77,177 @@ import org.apache.kudu.master.Master.TableIdentifierPB;
 import org.apache.kudu.util.AsyncUtil;
 import org.apache.kudu.util.NetUtil;
 import org.apache.kudu.util.Pair;
-import org.apache.kudu.util.SecurityUtil;
 
 /**
  * A fully asynchronous and thread-safe client for Kudu.
  * <p>
- * This client should be
- * instantiated only once. You can use it with any number of tables at the
- * same time. The only case where you should have multiple instances is when
- * you want to use multiple different clusters at the same time.
+ * A single Kudu client instance corresponds to a single remote Kudu cluster,
+ * and can be used to read or write any number of tables within that cluster.
+ * Separate client instances should be used in the case that a single
+ * application needs to access multiple distinct Kudu clusters.
+ *
+ * <h1>Creating a client instance</h1> An {@link AsyncKuduClient} instance may
+ * be created using the {@link AsyncKuduClient.AsyncKuduClientBuilder} class. If
+ * a synchronous API is preferred, {@link KuduClient.KuduClientBuilder} may be
+ * used instead. See the documentation on these classes for more details on
+ * client configuration options.
+ *
+ * <h1>Authenticating to a secure cluster</h1> A Kudu cluster may be configured
+ * such that it requires clients to connect using strong authentication. Clients
+ * can authenticate to such clusters using either of two methods:
+ * <ol>
+ * <li><em>Kerberos credentials</em></li>
+ * <li><em>Authentication tokens</em></li>
+ * </ol>
+ *
+ * In a typical environment, Kerberos credentials are used for non-distributed
+ * client applications and for applications which <em>spawn</em> distributed
+ * jobs. Tokens are used for the <em>tasks</em> of distributed jobs, since those
+ * tasks do not have access to the user's Kerberos credentials.
+ *
+ * <h2>Authenticating using Kerberos credentials</h2>
+ *
+ * In order to integrate with Kerberos, Kudu uses the standard <em>Java
+ * Authentication and Authorization Service</em> (JAAS) API provided by the JDK.
+ * JAAS provides a common way for applications to initialize Kerberos
+ * credentials, store these credentials in a {@link javax.security.auth.Subject}
+ * instance, and associate the Subject with the current thread of execution. The Kudu
+ * client then accesses the Kerberos credentials in the
+ * {@link javax.security.auth.Subject} and uses them to authenticate to the
+ * remote cluster as necessary.
+ * <p>
+ * Kerberos credentials are typically obtained in one of two ways:
+ * <ol>
+ * <li>The <em>Kerberos ticket cache</em></li>
+ * <li>A <em>keytab</em> file</li>
+ * </ol>
+ *
+ * <h3>Authenticating from the Kerberos ticket cache</h3>
+ *
+ * The Kerberos <em>ticket cache</em> is a file stored on the local file system
+ * which is automatically initialized when a user runs <em>kinit</em> at the
+ * command line. This is the predominant method for authenticating users in
+ * interactive applications: the user is expected to have run <em>kinit</em>
+ * recently, and the application will find the appropriate credentials in the
+ * ticket cache.
+ * <p>
+ * In the case of the Kudu client, Kudu will automatically look for credentials
+ * in the standard system-configured ticket cache location. No additional code
+ * needs to be written to enable this behavior.
+ * <p>
+ * Kudu will automatically detect if the ticket it has obtained from the ticket
+ * cache is about to expire. When that is the case, it will attempt to re-read
+ * the ticket cache to obtain a new ticket with a later expiration time. So, if
+ * an application needs to run for longer than the lifetime of a single ticket,
+ * the user must ensure that the ticket cache is periodically refreshed, for
+ * example by re-running 'kinit' once each day.
+ *
+ * <h3>Authenticating from a keytab</h3>
+ *
+ * Long-running applications typically obtain Kerberos credentials from a
+ * Kerberos <em>keytab</em> file. A keytab is essentially a saved password, and
+ * allows the application to obtain new Kerberos tickets whenever the prior
+ * ticket is about to expire.
+ * <p>
+ * The Kudu client does not provide any utility code to facilitate logging in
+ * from a keytab. Instead, applications should invoke the JAAS APIs directly,
+ * and then ensure that the resulting {@link javax.security.auth.Subject}
+ * instance is associated with the current thread's
+ * {@link java.security.AccessControlContext} when instantiating the Kudu client
+ * instance for the first time. The {@link javax.security.auth.Subject} instance
+ * will be stored and used whenever Kerberos authentication is required.
+ * <p>
+ * <b>Note</b>: if the Kudu client is instantiated with a
+ * {@link javax.security.auth.Subject} as described above, it will <em>not</em>
+ * make any attempt to re-login from the keytab. Instead, the application should
+ * arrange to periodically re-initiate the login process and update the
+ * credentials stored in the same Subject instance as was provided when the
+ * client was instantiated.
+ * <p>
+ * In the context of the Hadoop ecosystem, the {@code UserGroupInformation}
+ * class provides utility methods to login from a keytab and then run code as
+ * the resulting {@link javax.security.auth.Subject}: <code><pre>
+ *   UserGroupInformation.loginUserFromKeytab("my-app", "/path/to/app.keytab");
+ *   KuduClient c = UserGroupInformation.getLoginUser().doAs(
+ *     new PrivilegedExceptionAction&lt;KuduClient&gt;() {
+ *       &#64;Override
+ *       public KuduClient run() throws Exception {
+ *         return myClientBuilder.build();
+ *       }
+ *     }
+ *   );
+ * </pre></code> The {@code UserGroupInformation} class will also automatically
+ * start a thread to periodically re-login from the keytab.
+ *
+ * <h3>Debugging Kudu's usage of Kerberos credentials</h3>
+ *
+ * The Kudu client emits DEBUG-level logs under the
+ * {@code org.apache.kudu.client.SecurityContext} slf4j category. Enabling DEBUG
+ * logging for this class may help you understand which credentials are being
+ * obtained by the Kudu client when it is instantiated. Additionally, if the
+ * Java system property {@code kudu.jaas.debug} is set to {@code true}, Kudu
+ * will enable the {@code debug} option when configuring {@code Krb5LoginModule}
+ * when it attempts to log in from a ticket cache. JDK-specific system properties
+ * such as {@code sun.security.krb5.debug} may also be useful in troubleshooting
+ * Kerberos authentication failures.
+ *
+ * <h2>Authenticating using tokens</h2>
+ *
+ * In the case of distributed applications, the worker tasks often do not have
+ * access to Kerberos credentials such as ticket caches or keytabs.
+ * Additionally, there may be hundreds or thousands of workers with relatively
+ * short life-times, and if each task attempted to authenticate using Kerberos,
+ * the amount of load on the Kerberos infrastructure could be substantial enough
+ * to cause instability. To solve this issue, Kudu provides support for
+ * <em>authentication tokens</em>.
  * <p>
- * If you play by the rules, this client is completely
- * thread-safe. Read the documentation carefully to know what the requirements
- * are for this guarantee to apply.
+ * An authentication token is a time-limited credential which can be obtained by
+ * an application which has already authenticated via Kerberos. The token is
+ * represented by an opaque byte string, and it can be passed from one client to
+ * another to transfer credentials.
  * <p>
+ * A token may be generated using the
+ * {@link AsyncKuduClient#exportAuthenticationCredentials()} API, and then
+ * imported to another client using
+ * {@link AsyncKuduClient#importAuthenticationCredentials(byte[])}.
+ *
+ * <h2>Authentication in Spark jobs</h2>
+ *
+ * Note that the Spark integration provided by the <em>kudu-spark</em> package
+ * automatically handles the interaction with Kerberos and the passing of tokens
+ * from the Spark driver to tasks. Refer to the Kudu documentation for details
+ * on how to submit a Spark job on a secure cluster.
+ *
+ * <h1>API Compatibility</h1>
+ *
+ * Note that some methods in the Kudu client implementation are public but
+ * annotated with the {@link InterfaceAudience.Private} annotation. This
+ * annotation indicates that, despite having {@code public} visibility, the
+ * method is not part of the public API and there is no guarantee that its
+ * existence or behavior will be maintained in subsequent versions of the Kudu
+ * client library.
+ *
+ * Other APIs are annotated with the {@link InterfaceStability.Unstable} annotation.
+ * These APIs are meant for public consumption but may change between minor releases.
+ * Note that the asynchronous client is currently considered unstable.
+ *
+ * <h1>Thread Safety</h1>
+ *
+ * The Kudu client instance itself is thread-safe; however, not all associated
+ * classes are themselves thread-safe. For example, neither
+ * {@link AsyncKuduSession} nor its synchronous wrapper {@link KuduSession} is
+ * thread-safe. Refer to the documentation for each individual class for more
+ * details.
+ *
+ * <h1>Asynchronous usage</h1>
+ *
  * This client is fully non-blocking, any blocking operation will return a
  * {@link Deferred} instance to which you can attach a {@link Callback} chain
  * that will execute when the asynchronous operation completes.
- *
- * <h1>Note regarding {@code KuduRpc} instances passed to this class</h1>
- * Every {@link KuduRpc} passed to a method of this class should not be
- * changed or re-used until the {@code Deferred} returned by that method
- * calls you back.  <strong>Changing or re-using any {@link KuduRpc} for
- * an RPC in flight will lead to <em>unpredictable</em> results and voids
- * your warranty</strong>.
- *
- * <h1>{@code throws} clauses</h1>
- * None of the asynchronous methods in this API are expected to throw an
- * exception.  But the {@link Deferred} object they return to you can carry an
- * exception that you should handle (using "errbacks", see the javadoc of
- * {@link Deferred}).  In order to be able to do proper asynchronous error
- * handling, you need to know what types of exceptions you're expected to face
- * in your errbacks.  In order to document that, the methods of this API use
- * javadoc's {@code @throws} to spell out the exception types you should
- * handle in your errback.  Asynchronous exceptions will be indicated as such
- * in the javadoc with "(deferred)".
+ * <p>
+ * The asynchronous calls themselves typically do not throw exceptions. Instead,
+ * an {@code errback} should be attached which will be called with the Exception
+ * that occurred.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
@@ -193,7 +328,8 @@ public class AsyncKuduClient implements AutoCloseable {
 
   private final RequestTracker requestTracker;
 
-  private final SecurityContext securityContext;
+  @VisibleForTesting
+  final SecurityContext securityContext;
 
   /** A helper to facilitate re-acquiring of authentication token if current one expires. */
   private final AuthnTokenReacquirer tokenReacquirer;
@@ -213,7 +349,7 @@ public class AsyncKuduClient implements AutoCloseable {
     this.timer = b.timer;
     this.requestTracker = new RequestTracker(UUID.randomUUID().toString().replace("-", ""));
 
-    this.securityContext = new SecurityContext(b.subject);
+    this.securityContext = new SecurityContext();
     this.connectionCache = new ConnectionCache(
         securityContext, defaultSocketReadTimeoutMs, timer, channelFactory);
     this.tokenReacquirer = new AuthnTokenReacquirer(this);
@@ -1986,7 +2122,6 @@ public class AsyncKuduClient implements AutoCloseable {
     private int bossCount = DEFAULT_BOSS_COUNT;
     private int workerCount = DEFAULT_WORKER_COUNT;
     private boolean statisticsDisabled = false;
-    private Subject subject;
 
     /**
      * Creates a new builder for a client that will connect to the specified masters.
@@ -2143,7 +2278,6 @@ public class AsyncKuduClient implements AutoCloseable {
      * @return a new asynchronous Kudu client
      */
     public AsyncKuduClient build() {
-      subject = SecurityUtil.getSubjectOrLogin();
       return new AsyncKuduClient(this);
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/java/kudu-client/src/main/java/org/apache/kudu/client/AuthnTokenReacquirer.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AuthnTokenReacquirer.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AuthnTokenReacquirer.java
index 8f4811b..515878e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AuthnTokenReacquirer.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AuthnTokenReacquirer.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.concurrent.GuardedBy;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.stumbleupon.async.Callback;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -156,16 +155,19 @@ final class AuthnTokenReacquirer {
           return null;
         }
 
-        failQueuedRpcs();
+        Exception reason = new NonRecoverableException(Status.NotAuthorized(String.format(
+            "cannot re-acquire authentication token after %d attempts (%s)",
+            MAX_ATTEMPTS,
+            e.getMessage())));
+        failQueuedRpcs(reason);
         return null;
       }
 
-      /** Handle the affected RPCs if authn token re-acquisition fails. */
-      void failQueuedRpcs() {
+      /** Handle the affected RPCs if authn token re-acquisition fails.
+       */
+      void failQueuedRpcs(Exception reason) {
         List<KuduRpc<?>> rpcList = swapQueuedRpcs();
         for (KuduRpc<?> rpc : rpcList) {
-          Exception reason = new NonRecoverableException(Status.NotAuthorized(String.format(
-              "cannot re-acquire authentication token after %d attempts", MAX_ATTEMPTS)));
           rpc.errback(reason);
         }
       }

http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
index 0f1ea97..3aa566a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
@@ -27,6 +27,7 @@
 package org.apache.kudu.client;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import com.stumbleupon.async.DeferredGroupException;
 import com.stumbleupon.async.TimeoutException;
@@ -84,6 +85,9 @@ public abstract class KuduException extends IOException {
     // The message may be null.
     String message =  e.getMessage() == null ? "" : e.getMessage();
     if (e instanceof KuduException) {
+      // TODO(KUDU-2330) this can lead to methods of the synchronous client
+      // throwing exceptions without the stack trace indicating where
+      // the method was called!
       return (KuduException) e;
     } else if (e instanceof DeferredGroupException) {
       // The cause of a DeferredGroupException is the first exception it sees, we're just going to

http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 0de894f..21a58aa 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -64,6 +64,14 @@ import org.apache.kudu.util.Slice;
  * familiar with the term "defensive copy", we don't do it in order to avoid
  * unnecessary memory copies when you know you won't be changing (or event
  * holding a reference to) the byte array, which is frequently the case.
+ *
+ *
+ * <h1>Note regarding {@code KuduRpc} instances passed into {@link AsyncKuduSession}</h1>
+ * Every {@link KuduRpc} passed to a method of AsyncKuduSession should not be
+ * changed or re-used until the {@code Deferred} returned by that method
+ * calls you back.  <strong>Changing or re-using any {@link KuduRpc} for
+ * an RPC in flight will lead to <em>unpredictable</em> results and voids
+ * your warranty</strong>.
  */
 @InterfaceAudience.Private
 public abstract class KuduRpc<R> {

http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index fd609d3..95ad907 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -158,7 +158,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
   private static enum AuthnTokenNotUsedReason {
     NONE_AVAILABLE("no token is available"),
     NO_TRUSTED_CERTS("no TLS certificates are trusted by the client"),
-    FORBIDDEN_BY_POLICY("this connection does not allow authentication by tokens"),
+    FORBIDDEN_BY_POLICY("this connection will be used to acquire a new token and " +
+                        "therefore requires primary credentials"),
     NOT_CHOSEN_BY_SERVER("the server chose not to accept token authentication");
 
     AuthnTokenNotUsedReason(String msg) {
@@ -393,6 +394,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
   }
 
   private void chooseAndInitializeSaslMech(NegotiatePB response) throws KuduException {
+    securityContext.refreshSubject();
     // Gather the set of server-supported mechanisms.
     Map<String, String> errorsByMech = Maps.newHashMap();
     Set<SaslMechanism> serverMechs = Sets.newHashSet();
@@ -416,11 +418,17 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     // choose that mech.
     for (SaslMechanism clientMech : SaslMechanism.values()) {
 
-      if (clientMech.equals(SaslMechanism.GSSAPI) &&
-          (securityContext.getSubject() == null ||
-           securityContext.getSubject().getPrivateCredentials(KerberosTicket.class).isEmpty())) {
-        errorsByMech.put(clientMech.name(), "client does not have Kerberos credentials (tgt)");
-        continue;
+      if (clientMech.equals(SaslMechanism.GSSAPI)) {
+        Subject s = securityContext.getSubject();
+        if (s == null ||
+            s.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
+          errorsByMech.put(clientMech.name(), "client does not have Kerberos credentials (tgt)");
+          continue;
+        }
+        if (SecurityUtil.isTgtExpired(s)) {
+          errorsByMech.put(clientMech.name(), "client Kerberos credentials (TGT) have expired");
+          continue;
+        }
       }
 
       if (!serverMechs.contains(clientMech)) {
@@ -754,6 +762,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     chan.getPipeline().remove(this);
 
     Channels.write(chan, makeConnectionContext());
+    LOG.debug("Authenticated connection {} using {}/{}",
+        chan, chosenAuthnType, chosenMech);
     Channels.fireMessageReceived(chan, new Success(serverFeatures));
   }
 
@@ -808,7 +818,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
           ((GSSException) cause).getMajor() == GSSException.NO_CRED) {
         throw new NonRecoverableException(
             Status.ConfigurationError(
-                "Server requires Kerberos, but this client is not authenticated (kinit)"),
+                "Server requires Kerberos, but this client is not authenticated " +
+                "(missing or expired TGT)"),
             saslException);
       }
       throw saslException;

http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
index e9bf0c7..613491b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
@@ -17,13 +17,20 @@
 
 package org.apache.kudu.client;
 
+import java.security.AccessControlContext;
+import java.security.AccessController;
 import java.security.KeyStore;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 import java.security.cert.CertificateException;
 import java.security.cert.CertificateFactory;
 import java.security.cert.X509Certificate;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import javax.net.ssl.SSLContext;
@@ -32,6 +39,7 @@ import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 import javax.net.ssl.X509TrustManager;
 import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
@@ -43,6 +51,10 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.kudu.client.Client.AuthenticationCredentialsPB;
 import org.apache.kudu.security.Token.SignedTokenPB;
 import org.apache.kudu.security.Token.TokenPB;
+import org.apache.kudu.util.Pair;
+import org.apache.kudu.util.SecurityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class associated with a single AsyncKuduClient which stores security-related
@@ -51,6 +63,10 @@ import org.apache.kudu.security.Token.TokenPB;
  * Each client has a single instance of this class. This class is threadsafe.
  */
 class SecurityContext {
+  private static final Logger LOG = LoggerFactory.getLogger(SecurityContext.class);
+
+  private static final long REFRESH_RATE_LIMIT_SECS = 10;
+
   @GuardedBy("this")
   @Nullable
   private SignedTokenPB authnToken;
@@ -67,11 +83,34 @@ class SecurityContext {
    */
   private final SSLContext sslContextTrustAny;
 
+  private final Object subjectLock = new Object();
+
   /**
    * The JAAS Subject that the client's credentials are stored in.
    */
   @Nullable
-  private final Subject subject;
+  @GuardedBy("subjectLock")
+  private Subject subject;
+
+  private enum SubjectType {
+    /**
+     * The Subject was created when this class was instantiated.
+     */
+    CREATED,
+    /**
+     * A Subject with appropriate credentials was provided by the caller who
+     * instantiated this class.
+     */
+    PROVIDED,
+    /**
+     * We have no Subject at all (i.e we could not login on our own, and the
+     * caller did not provide a Subject with appropriate credentials).
+     */
+    NONE
+  };
+
+  @Nonnull
+  private final SubjectType subjectType;
 
   /**
    * The currently trusted CA certs, in DER format.
@@ -79,14 +118,17 @@ class SecurityContext {
   @VisibleForTesting
   List<ByteString> trustedCertDers = Collections.emptyList();
 
-  /**
-   * Construct SecurityContext object with the specified JAAS subject.
-   *
-   * @param subject JAAS Subject that the client's credentials are stored in
-   */
-  SecurityContext(Subject subject) {
+  @GuardedBy("subjectLock")
+  private long nextAllowedRefreshNanotime = 0;
+
+  @GuardedBy("subjectLock")
+  private boolean loggedRefreshFailure = false;
+
+  SecurityContext() {
     try {
-      this.subject = subject;
+      Pair<SubjectType, Subject> p = setupSubject();
+      this.subjectType = p.getFirst();
+      this.subject = p.getSecond();
 
       this.sslContextWithCert = SSLContext.getInstance("TLS");
       sslContextWithCert.init(null, new TrustManager[] { trustManager }, null);
@@ -98,9 +140,121 @@ class SecurityContext {
     }
   }
 
+  private static Pair<SubjectType, Subject> setupSubject() {
+    AccessControlContext context = AccessController.getContext();
+    Subject subject = Subject.getSubject(context);
+    if (subject != null) {
+      if (!subject.getPrincipals(KerberosPrincipal.class).isEmpty()) {
+        LOG.debug("Using caller-provided subject with Kerberos principal {}. " +
+            "Caller is responsible for refreshing credentials.",
+            SecurityUtil.getKerberosPrincipalOrNull(subject));
+        return new Pair<>(SubjectType.PROVIDED, subject);
+      }
+      LOG.debug("Caller-provided subject {} does not have any Kerberos credentials. " +
+          "Ignoring it.", subject.toString());
+    }
+
+    subject = SecurityUtil.getSubjectFromTicketCacheOrNull();
+    if (subject != null) {
+      return new Pair<>(SubjectType.CREATED, subject);
+    }
+    // If we weren't able to login from a ticket cache when we create the client,
+    // we shouldn't later pick one up.
+    return new Pair<>(SubjectType.NONE, null);
+  }
+
+  /**
+   * Check if the Subject associated with this SecurityContext needs to be refreshed,
+   * and if so, do so. If there is no associated subject this is a no-op.
+   */
+  public void refreshSubject() {
+    if (subjectType == SubjectType.NONE) {
+      return;
+    }
+    synchronized (subjectLock) {
+      Subject localSubject = subject;
+
+      boolean needed = SecurityUtil.needsRefresh(localSubject);
+      if (!needed) {
+        // If we don't need to refresh, but we previously logged a warning
+        // about a failure to refresh, then someone must have externally
+        // refreshed the Subject.
+        if (loggedRefreshFailure) {
+          LOG.info("Credentials appear to have been refreshed externally, subject={}", subject);
+          loggedRefreshFailure = false;
+        }
+        return;
+      }
+
+      // Our current credentials are stale and need a refresh.
+
+      if (subjectType == SubjectType.PROVIDED) {
+        // In the case that the user provided the subject, we don't attempt to
+        // muck with the tickets inside it. Instead, just log a warning
+        // if we haven't already.
+        if (!loggedRefreshFailure) {
+          LOG.warn("Caller-provided Subject has a Kerberos ticket that is about to expire. " +
+                   "Kudu expects the application to renew or re-acquire its own tickets " +
+                   "before expiration.");
+          loggedRefreshFailure = true;
+        }
+        return;
+      }
+
+      // Don't attempt to refresh if we recently attempted to and failed. This
+      // prevents flooding the KDC, etc.
+      long now = System.nanoTime();
+      // If we recently failed to refresh, don't retry.
+      if (now < nextAllowedRefreshNanotime) {
+        return;
+      }
+
+      LOG.debug("Refreshing Kerberos credentials...");
+      Subject newSubject;
+      try {
+        newSubject = Subject.doAs(new Subject(), new PrivilegedExceptionAction<Subject>() {
+          @Override
+          public Subject run() {
+            return SecurityUtil.getSubjectFromTicketCacheOrNull();
+          }
+        });
+      } catch (PrivilegedActionException e) {
+        throw new RuntimeException(e.getCause());
+      }
+      if (newSubject == null || SecurityUtil.getKerberosPrincipalOrNull(newSubject) == null) {
+        LOG.warn("Tried to refresh Kerberos credentials but was unable to re-login from ticket cache");
+        loggedRefreshFailure = true;
+        nextAllowedRefreshNanotime = now + TimeUnit.SECONDS.toNanos(REFRESH_RATE_LIMIT_SECS);
+        return;
+      }
+      // It's possible that the ticket cache ended up with a different principal.
+      // If we accepted this new subject, that would cause us to switch principals
+      // in the context of a single Kudu client, or potentially have a different
+      // principal in use on different connections (eg one principal talking to one
+      // master and another principal to another). This would be very hard to diagnose
+      // so let's just refuse the re-login attempt if the principal switched.
+      KerberosPrincipal oldPrincipal = SecurityUtil.getKerberosPrincipalOrNull(localSubject);
+      KerberosPrincipal principal = SecurityUtil.getKerberosPrincipalOrNull(newSubject);
+      if (!oldPrincipal.equals(principal)) {
+        LOG.error("Attempted to refresh Kerberos credentials from ticket cache but found that " +
+            "the new Kerberos principal {} did not match the original principal {}. Ignoring.",
+            principal, oldPrincipal);
+        loggedRefreshFailure = true;
+        nextAllowedRefreshNanotime = now + TimeUnit.SECONDS.toNanos(REFRESH_RATE_LIMIT_SECS);
+        return;
+      }
+
+      loggedRefreshFailure = false;
+      this.subject = newSubject;
+      LOG.info("Successfully refreshed Kerberos credentials from ticket cache");
+    }
+  }
+
   @Nullable
   public Subject getSubject() {
-    return subject;
+    synchronized (subjectLock) {
+      return subject;
+    }
   }
 
   @Nullable

http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
index 50af6ab..35b61bf 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
@@ -17,15 +17,17 @@
 
 package org.apache.kudu.util;
 
-import java.security.AccessControlContext;
-import java.security.AccessController;
 import java.security.MessageDigest;
 import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
 import javax.security.auth.Subject;
 import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginContext;
@@ -65,20 +67,19 @@ public abstract class SecurityUtil {
       .build();
 
   /**
+   * If we have Kerberos credentials that are within this specified window
+   * of expiration, then refresh them.
+   */
+  private static final long REFRESH_BEFORE_EXPIRATION_SECS = 10;
+
+  /**
    * Return the Subject associated with the current thread's AccessController,
    * if that subject has Kerberos credentials. If there is no such subject, or
-   * the subject has no Kerberos credentials, logins in a new subject from the
-   * currently configured TicketCache.
+   * the subject has no Kerberos credentials, a new subject is logged in from
+   * the currently configured TicketCache.
    */
-  public static Subject getSubjectOrLogin() {
-    AccessControlContext context = AccessController.getContext();
-    Subject subject = Subject.getSubject(context);
-    if (subject != null &&
-        !subject.getPrincipals(KerberosPrincipal.class).isEmpty()) {
-      LOG.debug("Using existing subject with Kerberos credentials: {}",
-          subject.toString());
-      return subject;
-    }
+  @Nullable
+  public static Subject getSubjectFromTicketCacheOrNull() {
     // If there isn't any current subject with krb5 principals, try to login
     // using the ticket cache.
     Configuration conf = new Configuration() {
@@ -115,7 +116,7 @@ public abstract class SecurityUtil {
     try {
       LoginContext loginContext = new LoginContext("kudu", new Subject(), null, conf);
       loginContext.login();
-      subject = loginContext.getSubject();
+      Subject subject = loginContext.getSubject();
       LOG.debug("Logged in as subject: {}", Joiner.on(",").join(subject.getPrincipals()));
       return subject;
     } catch (LoginException e) {
@@ -157,4 +158,85 @@ public abstract class SecurityUtil {
       throw Throwables.propagate(e);
     }
   }
-}
+
+  /**
+   * @return true if 'subject' has no Kerberos TGT at all, or has one that is expired
+   * or will expire within the next REFRESH_BEFORE_EXPIRATION_SECS seconds.
+   */
+  public static boolean needsRefresh(Subject subject) {
+    long refreshWindowMillis = REFRESH_BEFORE_EXPIRATION_SECS * 1000;
+    return tgtExpiresBefore(subject, System.currentTimeMillis() + refreshWindowMillis);
+  }
+
+  /**
+   * @return true if 'subject' has no Kerberos TGT at all, or has one that already expired.
+   */
+  public static boolean isTgtExpired(Subject subject) {
+    long nowMillis = System.currentTimeMillis();
+    return tgtExpiresBefore(subject, nowMillis);
+  }
+
+  private static boolean tgtExpiresBefore(Subject subject, long deadlineMillis) {
+    KerberosTicket tgt = findTgt(subject);
+    if (tgt == null) {
+      // No TGT was found. Most likely it expired and was removed during a
+      // connection attempt, so report it as expiring to trigger a new login.
+      return true;
+    }
+    return tgt.getEndTime().getTime() < deadlineMillis;
+  }
+
+  @Nullable
+  private static KerberosTicket findTgt(Subject subject) {
+    Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class);
+    // Scan for the TGT; hold the set's monitor in case it is a synchronized wrapper.
+    synchronized (tickets) {
+      for (KerberosTicket ticket : tickets) {
+        if (SecurityUtil.isTGSPrincipal(ticket.getServer())) {
+          return ticket;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @return true if 'principal' matches the expected pattern for a TGT
+   */
+  private static boolean isTGSPrincipal(KerberosPrincipal principal) {
+    // A principal foo@BAR that authenticates to realm BAR is issued a ticket whose
+    // service principal is 'krbtgt/BAR@BAR'. That holds even when the credentials
+    // are later used against a remote realm through cross-realm trust.
+    //
+    // E.g. for user alice@AD.CORP connecting to the Kudu service
+    // kudu/host@CLUSTER.LOCAL, the ticket cache will contain:
+    //   krbtgt/AD.CORP@AD.CORP
+    //   krbtgt/CLUSTER.LOCAL@AD.CORP   (cross-realm trust ticket)
+    //   kudu/host@CLUSTER.LOCAL        (service in remote realm)
+    //
+    // Only the first of those tickets is the one recognized here.
+    if (principal == null) {
+      return false;
+    }
+    String realm = principal.getRealm();
+    return principal.getName().equals("krbtgt/" + realm + "@" + realm);
+  }
+
+  /**
+   * @return the KerberosPrincipal object associated with the given Subject.
+   * If there is no Principal, returns null. If there is more than one principal
+   * (not expected), logs a warning and also returns null.
+   */
+  @Nullable
+  public static KerberosPrincipal getKerberosPrincipalOrNull(Subject subject) {
+    Set<KerberosPrincipal> principals = subject.getPrincipals(KerberosPrincipal.class);
+    if (principals.size() > 1) {
+      LOG.warn("JAAS Subject unexpectedly includes more than one principal: {}",
+          Joiner.on(", ").join(principals));
+      return null;
+    } else if (principals.isEmpty()) {
+      return null;
+    }
+    return principals.iterator().next();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index 56b6127..c4c8540 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -300,7 +300,7 @@ public class BaseKuduTest {
     return option;
   }
 
-  protected Insert createBasicSchemaInsert(KuduTable table, int key) {
+  protected static Insert createBasicSchemaInsert(KuduTable table, int key) {
     Insert insert = table.newInsert();
     PartialRow row = insert.getRow();
     row.addInt(0, key);

http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
index 5d5a498..e7a315f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
@@ -34,6 +34,7 @@ import com.google.common.net.HostAndPort;
 import org.apache.kudu.Common.HostPortPB;
 import org.apache.kudu.tools.Tool.ControlShellRequestPB;
 import org.apache.kudu.tools.Tool.ControlShellResponsePB;
+import org.apache.kudu.tools.Tool.CreateClusterRequestPB.MiniKdcOptionsPB;
 import org.apache.kudu.tools.Tool.CreateClusterRequestPB;
 import org.apache.kudu.tools.Tool.DaemonIdentifierPB;
 import org.apache.kudu.tools.Tool.DaemonInfoPB;
@@ -41,6 +42,7 @@ import org.apache.kudu.tools.Tool.GetKDCEnvVarsRequestPB;
 import org.apache.kudu.tools.Tool.GetMastersRequestPB;
 import org.apache.kudu.tools.Tool.GetTServersRequestPB;
 import org.apache.kudu.tools.Tool.KdestroyRequestPB;
+import org.apache.kudu.tools.Tool.KinitRequestPB;
 import org.apache.kudu.tools.Tool.StartClusterRequestPB;
 import org.apache.kudu.tools.Tool.StartDaemonRequestPB;
 import org.apache.kudu.tools.Tool.StopDaemonRequestPB;
@@ -87,16 +89,20 @@ public class MiniKuduCluster implements AutoCloseable {
   private final ImmutableList<String> extraTserverFlags;
   private final ImmutableList<String> extraMasterFlags;
 
+  private final MiniKdcOptionsPB kdcOptionsPb;  // set once in the constructor
+
   private MiniKuduCluster(boolean enableKerberos,
       int numMasters,
       int numTservers,
       List<String> extraTserverFlags,
-      List<String> extraMasterFlags) {
+      List<String> extraMasterFlags,
+      MiniKdcOptionsPB kdcOptionsPb) {
     this.enableKerberos = enableKerberos;
     this.numMasters = numMasters;
     this.numTservers = numTservers;
     this.extraTserverFlags = ImmutableList.copyOf(extraTserverFlags);
     this.extraMasterFlags = ImmutableList.copyOf(extraMasterFlags);
+    this.kdcOptionsPb = kdcOptionsPb;
   }
 
   /**
@@ -167,6 +173,7 @@ public class MiniKuduCluster implements AutoCloseable {
             .setEnableKerberos(enableKerberos)
             .addAllExtraMasterFlags(extraMasterFlags)
             .addAllExtraTserverFlags(extraTserverFlags)
+            .setMiniKdcOptions(kdcOptionsPb)
             .build())
         .build());
     sendRequestToCluster(
@@ -356,7 +363,7 @@ public class MiniKuduCluster implements AutoCloseable {
   }
 
   /**
-   * Removes all credentials for all principals from the KDC credential cache.
+   * Removes all credentials for all principals from the Kerberos credential cache.
    */
   public void kdestroy() throws IOException {
     sendRequestToCluster(ControlShellRequestPB.newBuilder()
@@ -364,6 +371,18 @@ public class MiniKuduCluster implements AutoCloseable {
                                               .build());
   }
 
+  /**
+   * Obtains fresh Kerberos credentials for the given username and writes them
+   * into the Kerberos credential cache.
+   * @param username the username to kinit as
+   */
+  public void kinit(String username) throws IOException {
+    KinitRequestPB kinitRequest = KinitRequestPB.newBuilder().setUsername(username).build();
+    sendRequestToCluster(
+        ControlShellRequestPB.newBuilder().setKinit(kinitRequest).build());
+  }
+
+
   /** {@override} */
   @Override
   public void close() {
@@ -474,6 +493,9 @@ public class MiniKuduCluster implements AutoCloseable {
     private final List<String> extraTserverFlags = new ArrayList<>();
     private final List<String> extraMasterFlags = new ArrayList<>();
 
+    private final MiniKdcOptionsPB.Builder kdcOptionsPb =
+        MiniKdcOptionsPB.newBuilder();  // mutated in place by the kdc* setters
+
     public MiniKuduClusterBuilder numMasters(int numMasters) {
       this.numMasters = numMasters;
       return this;
@@ -511,6 +533,16 @@ public class MiniKuduCluster implements AutoCloseable {
       return this;
     }
 
+    public MiniKuduClusterBuilder kdcTicketLifetime(String lifetime) {
+      this.kdcOptionsPb.setTicketLifetime(lifetime);
+      return this;
+    }
+
+    public MiniKuduClusterBuilder kdcRenewLifetime(String lifetime) {
+      this.kdcOptionsPb.setRenewLifetime(lifetime);
+      return this;
+    }
+
     /**
      * Builds and starts a new {@link MiniKuduCluster} using builder state.
      * @return the newly started {@link MiniKuduCluster}
@@ -520,7 +552,8 @@ public class MiniKuduCluster implements AutoCloseable {
       MiniKuduCluster cluster =
           new MiniKuduCluster(enableKerberos,
               numMasters, numTservers,
-              extraTserverFlags, extraMasterFlags);
+              extraTserverFlags, extraMasterFlags,
+              kdcOptionsPb.build());
       try {
         cluster.start();
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
index d10f7b0..bd1a6ed 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
-import java.security.AccessController;
 import java.security.KeyStore;
 import java.security.cert.Certificate;
 import java.util.List;
@@ -33,7 +32,6 @@ import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLException;
-import javax.security.auth.Subject;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -97,8 +95,7 @@ public class TestNegotiator {
   public void setup() {
     serverEngine = createServerEngine();
     serverEngine.setUseClientMode(false);
-    secContext = new SecurityContext(Subject.getSubject(
-        AccessController.getContext()));
+    secContext = new SecurityContext();
   }
 
   private void startNegotiation(boolean fakeLoopback) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
index 793e1f3..2af042d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
@@ -16,31 +16,110 @@ package org.apache.kudu.client;
 import static org.apache.kudu.util.AssertHelpers.assertEventuallyTrue;
 import static org.junit.Assert.assertNotNull;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
-import com.stumbleupon.async.Deferred;
+import javax.security.auth.Subject;
 
 import org.apache.kudu.client.Client.AuthenticationCredentialsPB;
+import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder;
 import org.apache.kudu.master.Master.ConnectToMasterResponsePB;
+import org.apache.kudu.util.AssertHelpers;
 import org.apache.kudu.util.AssertHelpers.BooleanExpression;
+import org.apache.kudu.util.CapturingLogAppender;
 import org.apache.kudu.util.SecurityUtil;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.hamcrest.CoreMatchers;
-
-public class TestSecurity extends BaseKuduTest {
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableSet;
+import com.stumbleupon.async.Deferred;
 
+public class TestSecurity {
   private static final String TABLE_NAME = "TestSecurity-table";
+  private static final int TICKET_LIFETIME_SECS = 10;
+  private static final int RENEWABLE_LIFETIME_SECS = 30;
+
+  private final CapturingLogAppender cla = new CapturingLogAppender();
+  private MiniKuduCluster miniCluster;
+  private KuduClient client;
+
+  private enum Option {
+    LONG_LEADER_ELECTION,
+    SHORT_TOKENS_AND_TICKETS,
+    START_TSERVERS
+  };
+
+  private void startCluster(Set<Option> opts) throws IOException {
+    Preconditions.checkState(miniCluster == null);
+    MiniKuduClusterBuilder mcb = new MiniKuduClusterBuilder();
+    mcb.enableKerberos();
+    if (opts.contains(Option.LONG_LEADER_ELECTION)) {
+      mcb.addMasterFlag("--leader_failure_max_missed_heartbeat_periods=10.0");
+    }
+    if (opts.contains(Option.SHORT_TOKENS_AND_TICKETS)) {
+      mcb.addMasterFlag("--authn_token_validity_seconds=" + TICKET_LIFETIME_SECS)
+         .kdcRenewLifetime(RENEWABLE_LIFETIME_SECS + "s")
+         .kdcTicketLifetime(TICKET_LIFETIME_SECS + "s");
+    }
+    miniCluster = mcb.numMasters(3)
+        .numTservers(opts.contains(Option.START_TSERVERS) ? 3 : 0)
+        .build();
+    miniCluster.kinit("test-admin");
+    client = new KuduClient.KuduClientBuilder(miniCluster.getMasterAddresses()).build();
+
+    // TODO(todd): it seems that exportAuthenticationCredentials() doesn't properly retry
+    // in the case that there is no leader, even though NoLeaderFoundException is a RecoverableException.
+    // So, we have to use a hack of calling listTabletServers, which _does_ properly retry,
+    // in order to wait for the masters to elect a leader.
+    client.listTabletServers();
+  }
+
+  @After
+  public void reset() throws IOException, InterruptedException {
+    if (client != null) {
+      client.close();
+    }
+    if (miniCluster != null) {
+      miniCluster.shutdown();
+    }
+  }
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    miniClusterBuilder.enableKerberos()
-    .addMasterFlag("--leader_failure_max_missed_heartbeat_periods=10.0")
-    .addMasterFlag("--rpc_trace_negotiation");
+  /**
+   * Create a KuduClient associated with the given Subject.
+   */
+  private KuduClient createClientFromSubject(Subject subject)
+      throws PrivilegedActionException {
+    return Subject.doAs(subject, new PrivilegedExceptionAction<KuduClient>() {
+      @Override
+      public KuduClient run() throws Exception {
+        return createClient();
+      }
+    });
+  }
 
-    BaseKuduTest.setUpBeforeClass();
+  private KuduClient createClient() {
+    return new KuduClient.KuduClientBuilder(miniCluster.getMasterAddresses()).build();
+  }
+
+  private void checkClientCanReconnect(KuduClient client) throws IOException {
+    // Cycle the masters to ensure that we have to re-connect and thus
+    // re-negotiate an authenticated RPC connection. Without this step,
+    // we'd just hang onto our existing authenticated connections which
+    // would continue to work even though our credentials might have
+    // expired (we only authenticate when a connection is negotiated, not
+    // for each call).
+    miniCluster.killMasters();
+    miniCluster.restartDeadMasters();
+    client.listTabletServers();
   }
 
   /**
@@ -50,12 +129,15 @@ public class TestSecurity extends BaseKuduTest {
    */
   @Test
   public void testImportExportAuthenticationCredentials() throws Exception {
-    byte[] authnData = client.exportAuthenticationCredentials().join();
+    startCluster(ImmutableSet.of(Option.SHORT_TOKENS_AND_TICKETS,
+        Option.START_TSERVERS));
+    byte[] authnData = client.exportAuthenticationCredentials();
     assertNotNull(authnData);
     String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     try {
-      KuduClient newClient = new KuduClient.KuduClientBuilder(masterAddresses).build();
+      KuduClient newClient = new KuduClient.KuduClientBuilder(
+          miniCluster.getMasterAddresses()).build();
 
       // Test that a client with no credentials cannot list servers.
       try {
@@ -72,10 +154,10 @@ public class TestSecurity extends BaseKuduTest {
       // If we import the authentication data from the old authenticated client,
       // we should now be able to perform all of the normal client operations.
       newClient.importAuthenticationCredentials(authnData);
-      KuduTable table = newClient.createTable(TABLE_NAME, basicSchema,
-          getBasicCreateTableOptions());
+      KuduTable table = newClient.createTable(TABLE_NAME, BaseKuduTest.basicSchema,
+          BaseKuduTest.getBasicCreateTableOptions());
       KuduSession session = newClient.newSession();
-      session.apply(createBasicSchemaInsert(table, 1));
+      session.apply(BaseKuduTest.createBasicSchemaInsert(table, 1));
       session.flush();
     } finally {
       // Restore ticket cache for other test cases.
@@ -89,7 +171,8 @@ public class TestSecurity extends BaseKuduTest {
    */
   @Test
   public void testErrorMessageWithNoCaCert() throws Exception {
-    byte[] authnData = client.exportAuthenticationCredentials().join();
+    startCluster(ImmutableSet.of(Option.SHORT_TOKENS_AND_TICKETS));
+    byte[] authnData = client.exportAuthenticationCredentials();
 
     // Remove the CA certs from the credentials.
     authnData = AuthenticationCredentialsPB.parseFrom(authnData).toBuilder()
@@ -98,7 +181,7 @@ public class TestSecurity extends BaseKuduTest {
     String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     try {
-      KuduClient newClient = new KuduClient.KuduClientBuilder(masterAddresses).build();
+      KuduClient newClient = createClient();
       newClient.importAuthenticationCredentials(authnData);
 
       // We shouldn't be able to connect because we have no appropriate CA cert.
@@ -126,12 +209,13 @@ public class TestSecurity extends BaseKuduTest {
    */
   @Test
   public void testKudu2267() throws Exception {
-    byte[] authnData = client.exportAuthenticationCredentials().join();
+    startCluster(ImmutableSet.of(Option.SHORT_TOKENS_AND_TICKETS));
+    byte[] authnData = client.exportAuthenticationCredentials();
     assertNotNull(authnData);
     String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     try {
-      final KuduClient newClient = new KuduClient.KuduClientBuilder(masterAddresses).build();
+      final KuduClient newClient = createClient();
       newClient.importAuthenticationCredentials(authnData);
 
       // Try to connect to all the masters and assert there is no
@@ -140,10 +224,11 @@ public class TestSecurity extends BaseKuduTest {
           new BooleanExpression() {
             @Override
             public boolean get() throws Exception {
-              ConnectToCluster connector = new ConnectToCluster(masterHostPorts);
+              ConnectToCluster connector = new ConnectToCluster(miniCluster.getMasterHostPorts());
               List<Deferred<ConnectToMasterResponsePB>> deferreds =
                       connector.connectToMasters(newClient.asyncClient.getMasterTable(), null,
-                      DEFAULT_SLEEP, Connection.CredentialsPolicy.ANY_CREDENTIALS);
+                      /* timeout = */50000,
+                      Connection.CredentialsPolicy.ANY_CREDENTIALS);
               // Wait for all Deferreds are called back.
               for (Deferred<ConnectToMasterResponsePB> deferred : deferreds) {
                 deferred.join();
@@ -151,7 +236,7 @@ public class TestSecurity extends BaseKuduTest {
               List<Exception> s = connector.getExceptionsReceived();
               return s.size() == 0;
             }
-      }, DEFAULT_SLEEP);
+      }, /* timeout = */50000);
     } finally {
       System.setProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY, oldTicketCache);
     }
@@ -165,19 +250,166 @@ public class TestSecurity extends BaseKuduTest {
    */
   @Test
   public void testConnectToNonLeaderMasters() throws Exception {
-    byte[] authnData = client.exportAuthenticationCredentials().join();
+    startCluster(ImmutableSet.of(Option.LONG_LEADER_ELECTION));
+    System.err.println("=> started cluster");
+    byte[] authnData = client.exportAuthenticationCredentials();
+    System.err.println("=> exported auth");
     assertNotNull(authnData);
     String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     try {
-      KuduClient newClient = new KuduClient.KuduClientBuilder(masterAddresses).build();
+      KuduClient newClient = createClient();
       newClient.importAuthenticationCredentials(authnData);
+      System.err.println("=> imported auth");
 
       miniCluster.killMasters();
       miniCluster.restartDeadMasters();
       newClient.listTabletServers();
+      System.err.println("=> listTabletServers");
     } finally {
       System.setProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY, oldTicketCache);
     }
   }
-}
+
+  /**
+   * Test that, if our Kerberos credentials expire, we will automatically
+   * re-login from an available ticket cache.
+   */
+  @Test(timeout=300000)
+  public void testRenewAndReacquireKeberosCredentials() throws Exception {
+    startCluster(ImmutableSet.of(Option.SHORT_TOKENS_AND_TICKETS));
+    Stopwatch timeSinceKinit = Stopwatch.createStarted();
+    try (Closeable c = cla.attach()) {
+      for (Stopwatch sw = Stopwatch.createStarted();
+           sw.elapsed(TimeUnit.SECONDS) < RENEWABLE_LIFETIME_SECS * 2;) {
+        if (timeSinceKinit.elapsed(TimeUnit.SECONDS) > TICKET_LIFETIME_SECS + 5) {
+          // We have gotten past the initial lifetime and well into the renewable
+          // lifetime. If we haven't failed yet, that means that Kudu
+          // successfully renewed the ticket.
+          //
+          // We can now re-kinit to get a new ticket, to ensure that Kudu
+          // will properly re-login from the on-disk cache when its in-memory
+          // ticket is no longer renewable.
+          miniCluster.kinit("test-admin");
+          timeSinceKinit.reset().start();
+        }
+        Thread.sleep(5000);
+        // Ensure that we don't use an authentication token to reconnect.
+        client.asyncClient.securityContext.setAuthenticationToken(null);
+        checkClientCanReconnect(client);
+      }
+    }
+    Assert.assertThat(cla.getAppendedText(), CoreMatchers.containsString(
+        "Successfully refreshed Kerberos credentials from ticket cache"));
+  }
+
+  /**
+   * Test that, if the ticket cache is refreshed but contains a different principal
+   * from the original one, we will not accept it.
+   */
+  @Test(timeout=300000)
+  public void testDoNotSwitchPrincipalsInExistingClient() throws Exception {
+    startCluster(ImmutableSet.of(Option.SHORT_TOKENS_AND_TICKETS));
+    // Switch the ticket cache to a different user.
+    miniCluster.kinit("test-user");
+    try (Closeable c = cla.attach()) {
+      // We should eventually fail to connect because the initial credentials will
+      // have expired and the client should refuse to refresh credentials with a
+      // different principal.
+      assertEventualAuthenticationFailure(client,
+          "server requires authentication, but " +
+          "client Kerberos credentials (TGT) have expired");
+    }
+    Assert.assertThat(cla.getAppendedText(), CoreMatchers.containsString(
+        "found that the new Kerberos principal test-user@KRBTEST.COM " +
+        "did not match the original principal test-admin@KRBTEST.COM"));
+  }
+
+  // Cycles the masters until listTabletServers() fails with 'exceptionSubstring'.
+  private void assertEventualAuthenticationFailure(final KuduClient client,
+      final String exceptionSubstring) throws Exception {
+    AssertHelpers.assertEventuallyTrue("should eventually fail to connect",
+        new BooleanExpression() {
+          @Override
+          public boolean get() throws Exception {
+            Thread.sleep(3000);
+            miniCluster.killMasters();
+            miniCluster.restartDeadMasters();
+            try {
+              client.listTabletServers();
+              return false;
+            } catch (Exception e) {
+              if (!e.toString().contains(exceptionSubstring)) {
+                throw e;
+              }
+              return true;
+            }
+          }
+    }, 60000);
+  }
+
+  /**
+   * Test that, if an externally-provided subject is used when the client
+   * is created, the client will not attempt to refresh anything, and will
+   * eventually fail with appropriate warnings in the log.
+   */
+  @Test(timeout=300000)
+  public void testExternallyProvidedSubjectExpires() throws Exception {
+    startCluster(ImmutableSet.of(Option.SHORT_TOKENS_AND_TICKETS));
+    Subject subject = SecurityUtil.getSubjectFromTicketCacheOrNull();
+    Assert.assertNotNull(subject);
+    try (Closeable c = cla.attach()) {
+      // Create a client attached to our own subject.
+      KuduClient newClient = createClientFromSubject(subject);
+      // It should not get auto-refreshed.
+      try {
+        assertEventualAuthenticationFailure(newClient,
+            "server requires authentication, but " +
+            "client Kerberos credentials (TGT) have expired");
+      } finally {
+        newClient.close();
+      }
+    }
+    Assert.assertThat(cla.getAppendedText(), CoreMatchers.containsString(
+        "Using caller-provided subject with Kerberos principal test-admin@KRBTEST.COM."));
+    Assert.assertThat(cla.getAppendedText(), CoreMatchers.containsString(
+        "Caller-provided Subject has a Kerberos ticket that is about to expire"));
+  }
+
+  /**
+   * Test that, so long as we are periodically renewing a caller-provided Subject's
+   * credentials, the client will continue to operate fine.
+   *
+   * This simulates the case of using the Kudu client from an application using
+   * the UserGroupInformation class from Hadoop, which spawns a thread to
+   * renew credentials from a keytab.
+   */
+  @Test(timeout=300000)
+  public void testExternallyProvidedSubjectRefreshedExternally() throws Exception {
+    startCluster(ImmutableSet.of(Option.SHORT_TOKENS_AND_TICKETS));
+    Subject subject = SecurityUtil.getSubjectFromTicketCacheOrNull();
+    Assert.assertNotNull(subject);
+    // The client attached to our own subject is a resource so it is always closed.
+    try (Closeable c = cla.attach();
+         KuduClient newClient = createClientFromSubject(subject)) {
+      // Run for longer than the renewable lifetime - this ensures that we
+      // are indeed picking up the new credentials.
+      for (Stopwatch sw = Stopwatch.createStarted();
+          sw.elapsed(TimeUnit.SECONDS) < RENEWABLE_LIFETIME_SECS + 5;
+          Thread.sleep(3000)) {
+        miniCluster.kinit("test-admin");
+
+        // Update the existing subject in-place by copying over the credentials from
+        // a newly logged-in subject.
+        Subject newSubject = SecurityUtil.getSubjectFromTicketCacheOrNull();
+        subject.getPrivateCredentials().clear();
+        subject.getPrivateCredentials().addAll(newSubject.getPrivateCredentials());
+        // Ensure that we don't use an authentication token to reconnect.
+        newClient.asyncClient.securityContext.setAuthenticationToken(null);
+        checkClientCanReconnect(newClient);
+      }
+    }
+    Assert.assertThat(cla.getAppendedText(), CoreMatchers.containsString(
+        "Using caller-provided subject with Kerberos principal test-admin@KRBTEST.COM."));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 2463dfa..5ca8f57 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2131,6 +2131,18 @@ TEST_P(ControlShellToolTest, TestControlShell) {
       req.mutable_start_daemon()->mutable_id()->set_type(KDC);
       ASSERT_OK(SendReceive(req, &resp));
     }
+    // Test kinit by deleting the ticket cache, kinitting, and
+    // ensuring it is recreated.
+    {
+      char* ccache_path = getenv("KRB5CCNAME");
+      ASSERT_TRUE(ccache_path);
+      ASSERT_OK(env_->DeleteFile(ccache_path));
+      ControlShellRequestPB req;
+      ControlShellResponsePB resp;
+      req.mutable_kinit()->set_username("test-user");
+      ASSERT_OK(SendReceive(req, &resp));
+      ASSERT_TRUE(env_->FileExists(ccache_path));
+    }
   }
 
   // Destroy the cluster.

http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/src/kudu/tools/tool.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool.proto b/src/kudu/tools/tool.proto
index c591925..2757ee7 100644
--- a/src/kudu/tools/tool.proto
+++ b/src/kudu/tools/tool.proto
@@ -50,6 +50,14 @@ message CreateClusterRequestPB {
   // a format that's expected by gflag (i.e. "--foo=bar").
   repeated string extra_master_flags = 5;
   repeated string extra_tserver_flags = 6;
+
+  message MiniKdcOptionsPB {
+    // The default lifetime for initial ticket requests.
+    optional string ticket_lifetime = 1;
+    // The default renewable lifetime for initial ticket requests.
+    optional string renew_lifetime = 2;
+  }
+  optional MiniKdcOptionsPB mini_kdc_options = 8;
 }
 
 // Destroys a cluster created via 'create_cluster'.
@@ -143,6 +151,12 @@ message GetKDCEnvVarsRequestPB {}
 // Removes all credentials for all principals from the KDC credential cache.
 message KdestroyRequestPB {};
 
+// Establishes a new Kerberos ticket cache (equivalent to running 'kinit').
+message KinitRequestPB {
+  optional string username = 1 [ default = "test-admin" ];
+};
+
+
 // Sent by the control shell in response to a control shell command request.
 message ControlShellResponsePB {
 
@@ -176,5 +190,6 @@ message ControlShellRequestPB {
     GetTServersRequestPB get_tservers = 8;
     GetKDCEnvVarsRequestPB get_kdc_env_vars = 9;
     KdestroyRequestPB kdestroy = 10;
+    KinitRequestPB kinit = 11;
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/55e611d3/src/kudu/tools/tool_action_test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_test.cc b/src/kudu/tools/tool_action_test.cc
index c5fc888..ce0150c 100644
--- a/src/kudu/tools/tool_action_test.cc
+++ b/src/kudu/tools/tool_action_test.cc
@@ -172,6 +172,8 @@ Status ProcessRequest(const ControlShellRequestPB& req,
       }
       if (opts.enable_kerberos) {
         opts.mini_kdc_options.data_root = JoinPathSegments(opts.cluster_root, "krb5kdc");
+        opts.mini_kdc_options.ticket_lifetime = cc.mini_kdc_options().ticket_lifetime();
+        opts.mini_kdc_options.renew_lifetime = cc.mini_kdc_options().renew_lifetime();
       }
 
       cluster->reset(new ExternalMiniCluster(std::move(opts)));
@@ -278,6 +280,14 @@ Status ProcessRequest(const ControlShellRequestPB& req,
       RETURN_NOT_OK((*cluster)->kdc()->Kdestroy());
       break;
     }
+    case ControlShellRequestPB::kKinit:
+    {
+      if (!(*cluster)->kdc()) {
+        RETURN_NOT_OK(Status::NotFound("kdc not found"));
+      }
+      RETURN_NOT_OK((*cluster)->kdc()->Kinit(req.kinit().username()));
+      break;
+    }
     default:
       RETURN_NOT_OK(Status::InvalidArgument("unknown cluster control request"));
   }


Mime
View raw message