storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srihar...@apache.org
Subject [01/10] storm git commit: STORM-446: Allow superusers to impersonate other users in secure mode.
Date Thu, 12 Mar 2015 17:06:47 GMT
Repository: storm
Updated Branches:
  refs/heads/master ec13a14f3 -> 117256b60


STORM-446: Allow superusers to impersonate other users in secure mode.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/086e9e58
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/086e9e58
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/086e9e58

Branch: refs/heads/master
Commit: 086e9e588e46922198d56faafbc9d25f4a543f47
Parents: 64d7ac6
Author: Parth Brahmbhatt <brahmbhatt.parth@gmail.com>
Authored: Thu Feb 26 17:26:09 2015 -0800
Committer: Parth Brahmbhatt <brahmbhatt.parth@gmail.com>
Committed: Thu Feb 26 17:26:09 2015 -0800

----------------------------------------------------------------------
 SECURITY.md                                     |  34 ++++-
 storm-core/src/jvm/backtype/storm/Config.java   |   6 +
 .../jvm/backtype/storm/ConfigValidation.java    |   8 +-
 .../src/jvm/backtype/storm/StormSubmitter.java  |  88 ++++++-----
 .../storm/security/auth/ITransportPlugin.java   |   4 +-
 .../storm/security/auth/ReqContext.java         |  28 +++-
 .../security/auth/SaslTransportPlugin.java      |   3 +-
 .../security/auth/SimpleTransportPlugin.java    |   5 +-
 .../storm/security/auth/TBackoffConnect.java    |   4 +-
 .../storm/security/auth/ThriftClient.java       |  12 +-
 .../authorizer/ImpersonationAuthorizer.java     | 148 +++++++++++++++++++
 .../auth/authorizer/SimpleACLAuthorizer.java    |  20 ++-
 .../auth/digest/DigestSaslTransportPlugin.java  |   6 +-
 .../auth/digest/ServerCallbackHandler.java      |  20 ++-
 .../kerberos/KerberosSaslTransportPlugin.java   |  11 +-
 .../auth/kerberos/ServerCallbackHandler.java    |  38 +++--
 .../jvm/backtype/storm/utils/DRPCClient.java    |   2 +-
 .../jvm/backtype/storm/utils/NimbusClient.java  |  19 ++-
 .../backtype/storm/security/auth/auth_test.clj  | 130 ++++++++++------
 19 files changed, 463 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/SECURITY.md
----------------------------------------------------------------------
diff --git a/SECURITY.md b/SECURITY.md
index 5100735..37a4dae 100644
--- a/SECURITY.md
+++ b/SECURITY.md
@@ -309,7 +309,7 @@ multitenant.scheduler.user.pools:
     "derek": 10
 ```
 
-### Run as User
+### Run worker processes as user who submitted the topology
 By default storm runs workers as the user that is running the supervisor.  This is not ideal for security.  To make storm run the topologies as the user that launched them set.
 
 ```yaml
@@ -329,6 +329,38 @@ min.user.id=$(min_user_id)
 where worker_launcher_group is the same group the supervisor is a part of, and min.user.id is set to the first real user id on the system.
 This config file also needs to be owned by root and not have world or group write permissions.
 
+### Impersonating a user
+A storm client may submit requests on behalf of another user. For example, if a `userX` submits an oozie workflow and as part of workflow execution if user `oozie` wants to submit a topology on behalf of `userX`
+it can do so by leveraging the impersonation feature.In order to submit topology as some other user , you can use `StormSubmitter.submitTopologyAs` API. Alternatively you can use `NimbusClient.getConfiguredClientAs` 
+to get a nimbus client as some other user and perform any nimbus action(i.e. kill/rebalance/activate/deactivate) using this client. 
+
+`SimpleACLAuthorizer` performs authorization for any user trying to impersonate. To authorize a user for impersonation following acl config needs to be supplied to nimbus as part of storm config. 
+If you add a new entry, you need to restart the nimbus for the entry to be effective.
+
+```yaml
+nimbus.impersonation.acl:
+    impersonating_user1:
+        hosts:
+            [comma separated list of hosts from which impersonating_user1 is allowed to impersonate other users]
+        groups:
+            [comma separated list of groups whose users impersonating_user1 is allowed to impersonate]
+    impersonating_user2:
+        hosts:
+            [comma separated list of hosts from which impersonating_user2 is allowed to impersonate other users]
+        groups:
+            [comma separated list of groups whose users impersonating_user2 is allowed to impersonate]
+```
+
+To support the oozie use case following config can be supplied:
+```yaml
+nimbus.impersonation.acl:
+    oozie:
+        hosts:
+            [oozie-host1, oozie-host2, 127.0.0.1]
+        groups:
+            [some-group-that-userX-is-part-of]
+```
+
 ### Automatic Credentials Push and Renewal
 Individual topologies have the ability to push credentials (tickets and tokens) to workers so that they can access secure services.  Exposing this to all of the users can be a pain for them.
 To hide this from them in the common case plugins can be used to populate the credentials, unpack them on the other side into a java Subject, and also allow Nimbus to renew the credentials if needed.

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 73a1976..1237c28 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -442,6 +442,12 @@ public class Config extends HashMap<String, Object> {
     public static final Object NIMBUS_AUTHORIZER_SCHEMA = String.class;
 
     /**
+     * Impersonation user ACL config entries.
+     */
+    public static final String NIMBUS_IMPERSONATION_ACL = "nimbus.impersonation.acl";
+    public static final Object NIMBUS_IMPERSONATION_ACL_SCHEMA = ConfigValidation.MapOfStringToMapValidator;
+
+    /**
      * How often nimbus should wake up to renew credentials if needed.
      */
     public static final String NIMBUS_CREDENTIAL_RENEW_FREQ_SECS = "nimbus.credential.renewers.freq.secs";

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
index 14394a0..24991d7 100644
--- a/storm-core/src/jvm/backtype/storm/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
@@ -168,13 +168,19 @@ public class ConfigValidation {
      * Validates a list of Strings.
      */
     public static Object StringsValidator = listFv(String.class, true);
-    
+
     /**
      * Validates a map of Strings to Numbers.
      */
     public static Object MapOfStringToNumberValidator = mapFv(String.class, Number.class, true);
 
     /**
+     * Validates a map of Strings to a map of Strings to a list.
+     * {str -> {str -> [str,str]}
+     */
+    public static Object MapOfStringToMapValidator = mapFv(fv(String.class, false), mapFv(fv(String.class, false), listFv(String.class, false), false), true);
+
+    /**
      * Validates is a list of Maps.
      */
     public static Object MapsValidator = listFv(Map.class, true);

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index 54cc701..63cc5a1 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -175,22 +175,19 @@ public class StormSubmitter {
     }
 
     /**
-     * Submits a topology to run on the cluster. A topology runs forever or until
-     * explicitly killed.
      *
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @param opts to manipulate the starting of the topology
-     * @param progressListener to track the progress of the jar upload process
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     * @throws AuthorizationException if authorization is failed
+     * @param name
+     * @param stormConf
+     * @param topology
+     * @param opts
+     * @param progressListener
+     * @param asUser The user as which this topology should be submitted.
+     * @throws AlreadyAliveException
+     * @throws InvalidTopologyException
+     * @throws AuthorizationException
      */
-    @SuppressWarnings("unchecked")
-    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
-             ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+    public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
+            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
         if(!Utils.isValidConf(stormConf)) {
             throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
         }
@@ -218,25 +215,25 @@ public class StormSubmitter {
             if(localNimbus!=null) {
                 LOG.info("Submitting topology " + name + " in local mode");
                 if(opts!=null) {
-                    localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);                    
+                    localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
                 } else {
                     // this is for backwards compatibility
-                    localNimbus.submitTopology(name, stormConf, topology);                                            
+                    localNimbus.submitTopology(name, stormConf, topology);
                 }
             } else {
                 String serConf = JSONValue.toJSONString(stormConf);
-                NimbusClient client = NimbusClient.getConfiguredClient(conf);
-                if(topologyNameExists(conf, name)) {
+                NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);
+                if(topologyNameExists(conf, name, asUser)) {
                     throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                 }
-                String jar = submitJar(conf, progressListener);
+                String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser);
                 try {
                     LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);
                     if(opts!=null) {
-                        client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);                    
+                        client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
                     } else {
                         // this is for backwards compatibility
-                        client.getClient().submitTopology(name, jar, serConf, topology);                                            
+                        client.getClient().submitTopology(name, jar, serConf, topology);
                     }
                 } catch(InvalidTopologyException e) {
                     LOG.warn("Topology submission exception: "+e.get_msg());
@@ -255,6 +252,26 @@ public class StormSubmitter {
     }
 
     /**
+     * Submits a topology to run on the cluster. A topology runs forever or until
+     * explicitly killed.
+     *
+     *
+     * @param name the name of the storm.
+     * @param stormConf the topology-specific configuration. See {@link Config}.
+     * @param topology the processing to execute.
+     * @param opts to manipulate the starting of the topology
+     * @param progressListener to track the progress of the jar upload process
+     * @throws AlreadyAliveException if a topology with this name is already running
+     * @throws InvalidTopologyException if an invalid topology was submitted
+     * @throws AuthorizationException if authorization is failed
+     */
+    @SuppressWarnings("unchecked")
+    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
+             ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+
+    }
+
+    /**
      * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
      * explicitly killed.
      *
@@ -310,8 +327,8 @@ public class StormSubmitter {
         });
     }
 
-    private static boolean topologyNameExists(Map conf, String name) {
-        NimbusClient client = NimbusClient.getConfiguredClient(conf);
+    private static boolean topologyNameExists(Map conf, String name, String asUser) {
+        NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);
         try {
             ClusterSummary summary = client.getClient().getClusterInfo();
             for(TopologySummary s : summary.get_topologies()) {
@@ -342,19 +359,13 @@ public class StormSubmitter {
         return submitJar(conf, localJar, null);
     }
 
-    /**
-     * Submit jar file
-     * @param conf the topology-specific configuration. See {@link Config}.
-     * @param localJar file path of the jar file to submit
-     * @param listener progress listener to track the jar file upload
-     * @return the remote location of the submitted jar
-     */
-    public static String submitJar(Map conf, String localJar, ProgressListener listener) {
+
+    public static String submitJarAs(Map conf, String localJar, ProgressListener listener, String asUser) {
         if (localJar == null) {
             throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
         }
 
-        NimbusClient client = NimbusClient.getConfiguredClient(conf);
+        NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);
         try {
             String uploadLocation = client.getClient().beginFileUpload();
             LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
@@ -385,13 +396,24 @@ public class StormSubmitter {
             LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
             return uploadLocation;
         } catch(Exception e) {
-            throw new RuntimeException(e);            
+            throw new RuntimeException(e);
         } finally {
             client.close();
         }
     }
 
     /**
+     * Submit jar file
+     * @param conf the topology-specific configuration. See {@link Config}.
+     * @param localJar file path of the jar file to submit
+     * @param listener progress listener to track the jar file upload
+     * @return the remote location of the submitted jar
+     */
+    public static String submitJar(Map conf, String localJar, ProgressListener listener) {
+        return submitJarAs(conf,localJar, listener, null);
+    }
+
+    /**
      * Interface use to track progress of file upload
      */
     public interface ProgressListener {

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java
index 7575d71..5ba2557 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java
@@ -54,6 +54,8 @@ public interface ITransportPlugin {
      * Connect to the specified server via framed transport 
      * @param transport The underlying Thrift transport.
      * @param serverHost server host
+     * @param asUser the user as which the connection should be established, and all the subsequent actions should be executed.
+     *               Only applicable when using secure storm cluster. A null/blank value here will just indicate to use the logged in user.
      */
-    public TTransport connect(TTransport transport, String serverHost) throws IOException, TTransportException;
+    public TTransport connect(TTransport transport, String serverHost, String asUser) throws IOException, TTransportException;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java b/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java
index 4033f18..a252f85 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java
@@ -22,6 +22,9 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.net.InetAddress;
 import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.security.Principal;
@@ -39,6 +42,10 @@ public class ReqContext {
     private InetAddress _remoteAddr;
     private Integer _reqID;
     private Map _storm_conf;
+    private Principal realPrincipal;
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReqContext.class);
+
 
     /**
      * Get a request context associated with current thread
@@ -87,7 +94,7 @@ public class ReqContext {
      * Set remote subject explicitly
      */
     public void setSubject(Subject subject) {
-        _subject = subject;	
+        _subject = subject;
     }
 
     /**
@@ -106,6 +113,24 @@ public class ReqContext {
         if (princs.size()==0) return null;
         return (Principal) (princs.toArray()[0]);
     }
+
+    public void setRealPrincipal(Principal realPrincipal) {
+        this.realPrincipal = realPrincipal;
+    }
+    /**
+     * The real principal associated with the subject.
+     */
+    public Principal realPrincipal() {
+        return this.realPrincipal;
+    }
+
+    /**
+     * Returns true if this request is an impersonation request.
+     * @return
+     */
+    public boolean isImpersonating() {
+        return this.realPrincipal != null;
+    }
     
     /**
      * request ID of this request
@@ -113,4 +138,5 @@ public class ReqContext {
     public Integer requestID() {
         return _reqID;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
index 64bec19..532ebd7 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
@@ -116,7 +116,6 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
             TTransport trans = inProt.getTransport();
             //Sasl transport
             TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
-
             //remote address
             TSocket tsocket = (TSocket)saslTrans.getUnderlyingTransport();
             Socket socket = tsocket.getSocket();
@@ -128,7 +127,7 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
             Subject remoteUser = new Subject();
             remoteUser.getPrincipals().add(new User(authId));
             req_context.setSubject(remoteUser);
-            
+
             //invoke service handler
             return wrapped.process(inProt, outProt);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
index 0e1a365..ab06635 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
@@ -90,8 +90,11 @@ public class SimpleTransportPlugin implements ITransportPlugin {
     /**
      * Connect to the specified server via framed transport 
      * @param transport The underlying Thrift transport.
+     * @param serverHost unused.
+     * @param asUser unused.
      */
-    public TTransport connect(TTransport transport, String serverHost) throws TTransportException {
+    @Override
+    public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException {
         //create a framed transport
         TTransport conn = new TFramedTransport(transport);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java b/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java
index eaed61f..f547868 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java
@@ -40,12 +40,12 @@ public class TBackoffConnect {
                                                               retryTimes);
     }
 
-    public TTransport doConnectWithRetry(ITransportPlugin transportPlugin, TTransport underlyingTransport, String host) throws IOException {
+    public TTransport doConnectWithRetry(ITransportPlugin transportPlugin, TTransport underlyingTransport, String host, String asUser) throws IOException {
         boolean connected = false;
         TTransport transportResult = null;
         while(!connected) {
             try {
-                transportResult = transportPlugin.connect(underlyingTransport, host);
+                transportResult = transportPlugin.connect(underlyingTransport, host, asUser);
                 connected = true;
             } catch (TTransportException ex) {
                 retryNext(ex);

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java b/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java
index ec337d9..9f77ab9 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java
@@ -39,12 +39,17 @@ public class ThriftClient {
     private Integer _timeout;
     private Map _conf;
     private ThriftConnectionType _type;
+    private String _asUser;
 
     public ThriftClient(Map storm_conf, ThriftConnectionType type, String host) {
-        this(storm_conf, type, host, null, null);
+        this(storm_conf, type, host, null, null, null);
     }
 
-    public ThriftClient(Map storm_conf, ThriftConnectionType type, String host, Integer port, Integer timeout) {
+    public ThriftClient(Map storm_conf, ThriftConnectionType type, String host, Integer port, Integer timeout){
+        this(storm_conf, type, host, port, timeout, null);
+    }
+
+    public ThriftClient(Map storm_conf, ThriftConnectionType type, String host, Integer port, Integer timeout, String asUser) {
         //create a socket with server
         if (host==null) {
             throw new IllegalArgumentException("host is not set");
@@ -63,6 +68,7 @@ public class ThriftClient {
         _timeout = timeout;
         _conf = storm_conf;
         _type = type;
+        _asUser = asUser;
         reconnect();
     }
 
@@ -94,7 +100,7 @@ public class ThriftClient {
                                       Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_TIMES)),
                                       Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL)),
                                       Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING)));
-            _transport = connectionRetry.doConnectWithRetry(transportPlugin, underlyingTransport, _host);
+            _transport = connectionRetry.doConnectWithRetry(transportPlugin, underlyingTransport, _host, _asUser);
         } catch (IOException ex) {
             throw new RuntimeException(ex);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
new file mode 100644
index 0000000..1e947ae
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
@@ -0,0 +1,148 @@
+package backtype.storm.security.auth.authorizer;
+
+import backtype.storm.Config;
+import backtype.storm.security.auth.*;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+
+
+public class ImpersonationAuthorizer implements IAuthorizer {
+    private static final Logger LOG = LoggerFactory.getLogger(ImpersonationAuthorizer.class);
+
+    protected Map<String, ImpersonationACL> userImpersonationACL;
+    protected IPrincipalToLocal _ptol;
+    protected IGroupMappingServiceProvider _groupMappingProvider;
+
+    @Override
+    public void prepare(Map conf) {
+        userImpersonationACL = new HashMap<String, ImpersonationACL>();
+
+        Map<String, Map<String, List<String>>> userToHostAndGroup = (Map<String, Map<String, List<String>>>) conf.get(Config.NIMBUS_IMPERSONATION_ACL);
+
+        if (userToHostAndGroup != null) {
+            for (String user : userToHostAndGroup.keySet()) {
+                Set<String> groups = ImmutableSet.copyOf(userToHostAndGroup.get(user).get("groups"));
+                Set<String> hosts = ImmutableSet.copyOf(userToHostAndGroup.get(user).get("hosts"));
+                userImpersonationACL.put(user, new ImpersonationACL(user, groups, hosts));
+            }
+        }
+
+        _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
+        _groupMappingProvider = AuthUtils.GetGroupMappingServiceProviderPlugin(conf);
+    }
+
+    @Override
+    public boolean permit(ReqContext context, String operation, Map topology_conf) {
+        if (!context.isImpersonating()) {
+            LOG.debug("Not an impersonation attempt.");
+            return true;
+        }
+
+        String impersonatingPrincipal = context.realPrincipal().getName();
+        String impersonatingUser = _ptol.toLocal(context.realPrincipal());
+        String userBeingImpersonated = _ptol.toLocal(context.principal());
+        InetAddress remoteAddress = context.remoteAddress();
+
+        LOG.info("user = {}, principal = {} is attmepting to impersonate user = {} for operation = {} from host = {}",
+                impersonatingUser, impersonatingPrincipal, userBeingImpersonated, operation, remoteAddress);
+
+        /**
+         * no config is present for impersonating principal or user, do not permit impersonation.
+         */
+        if (!userImpersonationACL.containsKey(impersonatingPrincipal) && !userImpersonationACL.containsKey(impersonatingUser)) {
+            LOG.info("user = {}, principal = {} is trying to impersonate user {}, but config {} does not have entry for impersonating user or principal." +
+                    "Please see SECURITY.MD to learn how to configure users for impersonation."
+                    , impersonatingUser, impersonatingPrincipal, userBeingImpersonated, Config.NIMBUS_IMPERSONATION_ACL);
+            return false;
+        }
+
+        ImpersonationACL principalACL = userImpersonationACL.get(impersonatingPrincipal);
+        ImpersonationACL userACL = userImpersonationACL.get(impersonatingUser);
+
+        Set<String> authorizedHosts = new HashSet<String>();
+        Set<String> authorizedGroups = new HashSet<String>();
+
+        if (principalACL != null) {
+            authorizedHosts.addAll(principalACL.authorizedHosts);
+            authorizedGroups.addAll(principalACL.authorizedGroups);
+        }
+
+        if (userACL != null) {
+            authorizedHosts.addAll(userACL.authorizedHosts);
+            authorizedGroups.addAll(userACL.authorizedGroups);
+        }
+
+        LOG.debug("user = {}, principal = {} is allowed to impersonate groups = {} from hosts = {} ",
+                impersonatingUser, impersonatingPrincipal, authorizedGroups, authorizedHosts);
+
+        if (!isAllowedToImpersonateFromHost(authorizedHosts, remoteAddress)) {
+            LOG.info("user = {}, principal = {} is not allowed to impersonate from host {} ",
+                    impersonatingUser, impersonatingPrincipal, remoteAddress);
+            return false;
+        }
+
+        if (!isAllowedToImpersonateUser(authorizedGroups, userBeingImpersonated)) {
+            LOG.info("user = {}, principal = {} is not allowed to impersonate any group that user {} is part of.",
+                    impersonatingUser, impersonatingPrincipal, userBeingImpersonated);
+            return false;
+        }
+
+        LOG.info("Allowing impersonation of user {} by user {}", userBeingImpersonated, impersonatingUser);
+        return true;
+    }
+
+    private boolean isAllowedToImpersonateFromHost(Set<String> authorizedHosts, InetAddress remoteAddress) {
+        return authorizedHosts.contains(remoteAddress.getCanonicalHostName()) ||
+                authorizedHosts.contains(remoteAddress.getHostName()) ||
+                authorizedHosts.contains(remoteAddress.getHostAddress());
+    }
+
+    private boolean isAllowedToImpersonateUser(Set<String> authorizedGroups, String userBeingImpersonated) {
+        Set<String> groups = null;
+        try {
+            groups = _groupMappingProvider.getGroups(userBeingImpersonated);
+        } catch (IOException e) {
+            throw new RuntimeException("failed to get groups for user " + userBeingImpersonated);
+        }
+
+        if (groups == null || groups.isEmpty()) {
+            return false;
+        }
+
+        for (String group : groups) {
+            if (authorizedGroups.contains(group)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    protected class ImpersonationACL {
+        public String impersonatingUser;
+        //Groups this user is authorized to impersonate.
+        public Set<String> authorizedGroups;
+        //Hosts this user is authorized to impersonate from.
+        public Set<String> authorizedHosts;
+
+        private ImpersonationACL(String impersonatingUser, Set<String> authorizedGroups, Set<String> authorizedHosts) {
+            this.impersonatingUser = impersonatingUser;
+            this.authorizedGroups = authorizedGroups;
+            this.authorizedHosts = authorizedHosts;
+        }
+
+        @Override
+        public String toString() {
+            return "ImpersonationACL{" +
+                    "impersonatingUser='" + impersonatingUser + '\'' +
+                    ", authorizedGroups=" + authorizedGroups +
+                    ", authorizedHosts=" + authorizedHosts +
+                    '}';
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
index 1a3433e..25114b9 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
@@ -50,6 +50,7 @@ public class SimpleACLAuthorizer implements IAuthorizer {
     protected Set<String> _supervisors;
     protected IPrincipalToLocal _ptol;
     protected IGroupMappingServiceProvider _groupMappingProvider;
+    protected ImpersonationAuthorizer _impersonationAuthorizer;
     /**
      * Invoked once immediately after construction
      * @param conf Storm configuration
@@ -68,25 +69,32 @@ public class SimpleACLAuthorizer implements IAuthorizer {
 
         _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
         _groupMappingProvider = AuthUtils.GetGroupMappingServiceProviderPlugin(conf);
+        _impersonationAuthorizer = new ImpersonationAuthorizer();
+        _impersonationAuthorizer.prepare(conf);
     }
 
     /**
      * permit() method is invoked for each incoming Thrift request
      * @param context request context includes info about
      * @param operation operation name
-     * @param topology_storm configuration of targeted topology
+     * @param topology_conf configuration of targeted topology
      * @return true if the request is authorized, false if reject
      */
     @Override
     public boolean permit(ReqContext context, String operation, Map topology_conf) {
-        LOG.info("[req "+ context.requestID()+ "] Access "
-                 + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
-                 + (context.principal() == null? "" : (" principal:"+ context.principal()))
-                 +" op:"+operation
-                 + (topology_conf == null? "" : (" topoology:"+topology_conf.get(Config.TOPOLOGY_NAME))));
+        LOG.info("[req " + context.requestID() + "] Access "
+                + " from: " + (context.remoteAddress() == null ? "null" : context.remoteAddress().toString())
+                + (context.principal() == null ? "" : (" principal:" + context.principal()))
+                + " op:" + operation
+                + (topology_conf == null ? "" : (" topoology:" + topology_conf.get(Config.TOPOLOGY_NAME))));
 
         String principal = context.principal().getName();
         String user = _ptol.toLocal(context.principal());
+
+        if(!_impersonationAuthorizer.permit(context, operation, topology_conf)) {
+            return false;
+        }
+
         if (_admins.contains(principal) || _admins.contains(user)) {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
index cb68579..ad642d8 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -51,10 +51,10 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin {
     }
 
     @Override
-    public TTransport connect(TTransport transport, String serverHost) throws TTransportException, IOException {
+    public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException {
         ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
-        TSaslClientTransport wrapper_transport = new TSaslClientTransport(DIGEST, 
-                null, 
+        TSaslClientTransport wrapper_transport = new TSaslClientTransport(DIGEST,
+                null,
                 AuthUtils.SERVICE, 
                 serverHost,
                 null,

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java b/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
index a0e4839..0ff3d35 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
@@ -20,8 +20,13 @@ package backtype.storm.security.auth.digest;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+
+import backtype.storm.security.auth.ReqContext;
+import backtype.storm.security.auth.SaslTransportPlugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
@@ -107,8 +112,19 @@ public class ServerCallbackHandler implements CallbackHandler {
 
     private void handleAuthorizeCallback(AuthorizeCallback ac) {
         String authenticationID = ac.getAuthenticationID();
-        LOG.debug("Successfully authenticated client: authenticationID=" + authenticationID);
-        ac.setAuthorizedID(authenticationID);
+        LOG.info("Successfully authenticated client: authenticationID=" + authenticationID + " authorizationID= " + ac.getAuthorizationID());
+
+        //if authorizationId is not set, set it to authenticationId.
+        if(ac.getAuthorizationID() == null) {
+            ac.setAuthorizedID(authenticationID);
+        }
+
+        //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
+        //add the authNid as the real user in reqContext's subject which will be used during authorization.
+        if(!authenticationID.equals(ac.getAuthorizationID())) {
+            ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID()));
+        }
+
         ac.setAuthorized(true);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
index 451f87b..b6cccad 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -32,6 +32,8 @@ import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginException;
 import javax.security.sasl.Sasl;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.thrift.transport.TSaslClientTransport;
 import org.apache.thrift.transport.TSaslServerTransport;
 import org.apache.thrift.transport.TTransport;
@@ -51,7 +53,7 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
 
     public TTransportFactory getServerTransportFactory() throws IOException {
         //create an authentication callback handler
-        CallbackHandler server_callback_handler = new ServerCallbackHandler(login_conf);
+        CallbackHandler server_callback_handler = new ServerCallbackHandler(login_conf, storm_conf);
         
         //login our principal
         Subject subject = null;
@@ -92,7 +94,8 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
         return wrapFactory;
     }
 
-    public TTransport connect(TTransport transport, String serverHost) throws TTransportException, IOException {
+    @Override
+    public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException {
         //create an authentication callback handler
         ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
         
@@ -114,7 +117,7 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
                         +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf);
         }
 
-        final String principal = getPrincipal(subject); 
+        final String principal = StringUtils.isBlank(asUser) ? getPrincipal(subject) : asUser;
         String serviceName = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
         if (serviceName == null) {
             serviceName = AuthUtils.SERVICE; 
@@ -138,7 +141,7 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
                     new PrivilegedExceptionAction<Void>() {
                 public Void run() {
                     try {
-                        LOG.debug("do as:"+ principal);
+                        LOG.info("do as:"+ principal);
                         sasalTransport.open();
                     }
                     catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
index 9dc75c4..7b143f0 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
@@ -18,22 +18,19 @@
 
 package backtype.storm.security.auth.kerberos;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.security.auth.ReqContext;
+import backtype.storm.security.auth.SaslTransportPlugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.*;
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.Configuration;
 import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-
-import backtype.storm.security.auth.AuthUtils;
+import java.io.IOException;
+import java.util.Map;
 
 /**
  * SASL server side callback handler
@@ -43,7 +40,7 @@ public class ServerCallbackHandler implements CallbackHandler {
 
     private String userName;
 
-    public ServerCallbackHandler(Configuration configuration) throws IOException {
+    public ServerCallbackHandler(Configuration configuration, Map stormConf) throws IOException {
         if (configuration==null) return;
 
         AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
@@ -52,6 +49,7 @@ public class ServerCallbackHandler implements CallbackHandler {
             LOG.error(errorMessage);
             throw new IOException(errorMessage);
         }
+
     }
 
     public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
@@ -78,9 +76,19 @@ public class ServerCallbackHandler implements CallbackHandler {
 
     private void handleAuthorizeCallback(AuthorizeCallback ac) {
         String authenticationID = ac.getAuthenticationID();
-        LOG.debug("Successfully authenticated client: authenticationID=" + authenticationID);
-        ac.setAuthorized(true);
+        LOG.info("Successfully authenticated client: authenticationID=" + authenticationID + " authorizationID= " + ac.getAuthorizationID());
+
+        //if authorizationId is not set, set it to authenticationId.
+        if(ac.getAuthorizationID() == null) {
+            ac.setAuthorizedID(authenticationID);
+        }
 
-        ac.setAuthorizedID(authenticationID);
+        //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
+        //add the authNid as the real user in reqContext's subject which will be used during authorization.
+        if(!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
+            ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID()));
+        }
+
+        ac.setAuthorized(true);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java b/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
index 3218e49..b2a2a7d 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
@@ -41,7 +41,7 @@ public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
     }
 
     public DRPCClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
-        super(conf, ThriftConnectionType.DRPC, host, port, timeout);
+        super(conf, ThriftConnectionType.DRPC, host, port, timeout, null);
         this.host = host;
         this.port = port;
         this.client = new DistributedRPC.Client(_protocol);

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
index 273e232..b171353 100644
--- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
@@ -30,6 +30,7 @@ public class NimbusClient extends ThriftClient {
     private Nimbus.Client _client;
     private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class);
 
+
     public static NimbusClient getConfiguredClient(Map conf) {
         try {
             String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
@@ -39,17 +40,31 @@ public class NimbusClient extends ThriftClient {
         }
     }
 
+    public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {
+        try {
+            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
+            return new NimbusClient(conf, nimbusHost, null, null, asUser);
+        } catch (TTransportException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
     public NimbusClient(Map conf, String host, int port) throws TTransportException {
         this(conf, host, port, null);
     }
 
     public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
-        super(conf, ThriftConnectionType.NIMBUS, host, port, timeout);
+        super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, null);
+        _client = new Nimbus.Client(_protocol);
+    }
+
+    public NimbusClient(Map conf, String host, Integer port, Integer timeout, String asUser) throws TTransportException {
+        super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, asUser);
         _client = new Nimbus.Client(_protocol);
     }
 
     public NimbusClient(Map conf, String host) throws TTransportException {
-        super(conf, ThriftConnectionType.NIMBUS, host, null, null);
+        super(conf, ThriftConnectionType.NIMBUS, host, null, null, null);
         _client = new Nimbus.Client(_protocol);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/086e9e58/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj b/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
index 7ad3aa4..2aa6e5b 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/auth_test.clj
@@ -16,7 +16,9 @@
 (ns backtype.storm.security.auth.auth-test
   (:use [clojure test])
   (:require [backtype.storm.daemon [nimbus :as nimbus]])
-  (:import [org.apache.thrift TException])
+  (:import [org.apache.thrift TException]
+           [backtype.storm.security.auth.authorizer ImpersonationAuthorizer]
+           [java.net Inet4Address])
   (:import [org.apache.thrift.transport TTransportException])
   (:import [java.nio ByteBuffer])
   (:import [java.security Principal AccessController])
@@ -64,7 +66,7 @@
      :scheduler nil
      }))
 
-(defn dummy-service-handler 
+(defn dummy-service-handler
   ([conf inimbus auth-context]
      (let [nimbus-d (nimbus-data conf inimbus)
            topo-conf (atom nil)]
@@ -73,55 +75,55 @@
                                         ^SubmitOptions submitOptions]
            (if (not (nil? serializedConf)) (swap! topo-conf (fn [prev new] new) (from-json serializedConf)))
            (nimbus/check-authorization! nimbus-d storm-name @topo-conf "submitTopology" auth-context))
-         
+
          (^void killTopology [this ^String storm-name]
            (nimbus/check-authorization! nimbus-d storm-name @topo-conf "killTopology" auth-context))
-         
+
          (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
            (nimbus/check-authorization! nimbus-d storm-name @topo-conf "killTopology" auth-context))
-         
+
          (^void rebalance [this ^String storm-name ^RebalanceOptions options]
            (nimbus/check-authorization! nimbus-d storm-name @topo-conf "rebalance" auth-context))
-         
+
          (activate [this storm-name]
            (nimbus/check-authorization! nimbus-d storm-name @topo-conf "activate" auth-context))
-         
+
          (deactivate [this storm-name]
            (nimbus/check-authorization! nimbus-d storm-name @topo-conf "deactivate" auth-context))
 
          (uploadNewCredentials [this storm-name creds]
-           (nimbus/check-authorization! nimbus-d storm-name @topo-conf "uploadNewCredentials" auth-context)) 
-         
+           (nimbus/check-authorization! nimbus-d storm-name @topo-conf "uploadNewCredentials" auth-context))
+
          (beginFileUpload [this])
-         
+
          (^void uploadChunk [this ^String location ^ByteBuffer chunk])
-         
+
          (^void finishFileUpload [this ^String location])
-         
+
          (^String beginFileDownload [this ^String file]
            (nimbus/check-authorization! nimbus-d nil nil "fileDownload" auth-context)
            "Done!")
-         
+
          (^ByteBuffer downloadChunk [this ^String id])
-         
+
          (^String getNimbusConf [this])
-         
+
          (^String getTopologyConf [this ^String id])
-         
+
          (^StormTopology getTopology [this ^String id])
-         
+
          (^StormTopology getUserTopology [this ^String id])
-         
+
          (^ClusterSummary getClusterInfo [this])
-         
+
          (^TopologyInfo getTopologyInfo [this ^String storm-id]))))
   ([conf inimbus]
      (dummy-service-handler conf inimbus nil)))
-     
 
-(defn launch-server [server-port login-cfg aznClass transportPluginClass serverConf] 
+
+(defn launch-server [server-port login-cfg aznClass transportPluginClass serverConf]
   (let [conf1 (merge (read-storm-config)
-                     {NIMBUS-AUTHORIZER aznClass 
+                     {NIMBUS-AUTHORIZER aznClass
                       NIMBUS-HOST "localhost"
                       NIMBUS-THRIFT-PORT server-port
                       STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass})
@@ -129,9 +131,9 @@
         conf (if serverConf (merge conf2 serverConf) conf2)
         nimbus (nimbus/standalone-nimbus)
         service-handler (dummy-service-handler conf nimbus)
-        server (ThriftServer. 
-                conf 
-                (Nimbus$Processor. service-handler) 
+        server (ThriftServer.
+                conf
+                (Nimbus$Processor. service-handler)
                 ThriftConnectionType/NIMBUS)]
     (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server))))
     (.start (Thread. #(.serve server)))
@@ -160,7 +162,7 @@
             nimbus_client (.getClient client)]
         (.activate nimbus_client "security_auth_test_topology")
         (.close client))
-      
+
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
                                "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"
@@ -168,11 +170,11 @@
         (testing "(Negative authentication) Server: Simple vs. Client: Digest"
           (is (thrown-cause?  org.apache.thrift.transport.TTransportException
                               (NimbusClient. storm-conf "localhost" a-port nimbus-timeout))))))))
-  
-(deftest negative-whitelist-authorization-test 
+
+(deftest negative-whitelist-authorization-test
   (let [a-port (available-port)]
     (with-server [a-port nil
-                  "backtype.storm.security.auth.authorizer.SimpleWhitelistAuthorizer" 
+                  "backtype.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
                   "backtype.storm.testing.SingleUserSimpleTransport" nil]
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.testing.SingleUserSimpleTransport"})
@@ -183,10 +185,10 @@
                              (.activate nimbus_client "security_auth_test_topology"))))
         (.close client)))))
 
-(deftest positive-whitelist-authorization-test 
+(deftest positive-whitelist-authorization-test
     (let [a-port (available-port)]
       (with-server [a-port nil
-                    "backtype.storm.security.auth.authorizer.SimpleWhitelistAuthorizer" 
+                    "backtype.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
                     "backtype.storm.testing.SingleUserSimpleTransport" {SimpleWhitelistAuthorizer/WHITELIST_USERS_CONF ["user"]}]
         (let [storm-conf (merge (read-storm-config)
                                 {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.testing.SingleUserSimpleTransport"})
@@ -205,7 +207,7 @@
         supervisor-user (mk-subject "supervisor")
         user-a (mk-subject "user-a")
         user-b (mk-subject "user-b")]
-  (.prepare authorizer cluster-conf) 
+  (.prepare authorizer cluster-conf)
   (is (= true (.permit authorizer (ReqContext. user-a) "submitTopology" {})))
   (is (= true (.permit authorizer (ReqContext. user-b) "submitTopology" {})))
   (is (= true (.permit authorizer (ReqContext. admin-user) "submitTopology" {})))
@@ -292,7 +294,7 @@
                         NIMBUS-SUPERVISOR-USERS ["admin"]})
         authorizer (SimpleACLAuthorizer. )
         admin-user (mk-subject "admin")]
-  (.prepare authorizer cluster-conf) 
+  (.prepare authorizer cluster-conf)
   (is (= true (.permit authorizer (ReqContext. admin-user) "submitTopology" {})))
   (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil)))
   (is (= true (.permit authorizer (ReqContext. admin-user) "getNimbusConf" nil)))
@@ -310,10 +312,10 @@
 ))
 
 
-(deftest positive-authorization-test 
+(deftest positive-authorization-test
   (let [a-port (available-port)]
     (with-server [a-port nil
-                  "backtype.storm.security.auth.authorizer.NoopAuthorizer" 
+                  "backtype.storm.security.auth.authorizer.NoopAuthorizer"
                   "backtype.storm.security.auth.SimpleTransportPlugin" nil]
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
@@ -323,10 +325,10 @@
           (.activate nimbus_client "security_auth_test_topology"))
         (.close client)))))
 
-(deftest deny-authorization-test 
+(deftest deny-authorization-test
   (let [a-port (available-port)]
     (with-server [a-port nil
-                  "backtype.storm.security.auth.authorizer.DenyAuthorizer" 
+                  "backtype.storm.security.auth.authorizer.DenyAuthorizer"
                   "backtype.storm.security.auth.SimpleTransportPlugin" nil]
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
@@ -343,7 +345,7 @@
 (deftest digest-authentication-test
   (let [a-port (available-port)]
     (with-server [a-port
-                  "test/clj/backtype/storm/security/auth/jaas_digest.conf" 
+                  "test/clj/backtype/storm/security/auth/jaas_digest.conf"
                   nil
                   "backtype.storm.security.auth.digest.DigestSaslTransportPlugin" nil]
       (let [storm-conf (merge (read-storm-config)
@@ -355,7 +357,7 @@
         (testing "(Positive authentication) valid digest authentication"
           (.activate nimbus_client "security_auth_test_topology"))
         (.close client))
-      
+
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
                                STORM-NIMBUS-RETRY-TIMES 0})
@@ -365,7 +367,7 @@
           (is (thrown-cause? org.apache.thrift.transport.TTransportException
                              (.activate nimbus_client "security_auth_test_topology"))))
         (.close client))
-      
+
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
                                "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_bad_password.conf"
@@ -373,13 +375,13 @@
         (testing "(Negative authentication) Invalid  password"
           (is (thrown-cause? TTransportException
                              (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
-      
+
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
                                "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_unknown_user.conf"
                                STORM-NIMBUS-RETRY-TIMES 0})]
         (testing "(Negative authentication) Unknown user"
-          (is (thrown-cause? TTransportException 
+          (is (thrown-cause? TTransportException
                              (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
 
       (let [storm-conf (merge (read-storm-config)
@@ -387,9 +389,9 @@
                                "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/nonexistent.conf"
                                STORM-NIMBUS-RETRY-TIMES 0})]
         (testing "(Negative authentication) nonexistent configuration file"
-          (is (thrown-cause? RuntimeException 
+          (is (thrown-cause? RuntimeException
                              (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
-      
+
       (let [storm-conf (merge (read-storm-config)
                               {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
                                "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_missing_client.conf"
@@ -397,8 +399,46 @@
         (testing "(Negative authentication) Missing client"
           (is (thrown-cause? java.io.IOException
                              (NimbusClient. storm-conf "localhost" a-port nimbus-timeout))))))))
-        
+
 (deftest test-GetTransportPlugin-throws-RuntimeException
   (let [conf (merge (read-storm-config)
                     {Config/STORM_THRIFT_TRANSPORT_PLUGIN "null.invalid"})]
     (is (thrown-cause? RuntimeException (AuthUtils/GetTransportPlugin conf nil nil)))))
+
+(defn mk-impersonating-req-context [impersonating-user user-being-impersonated remote-address]
+  (let [impersonating-principal (mk-principal impersonating-user)
+        principal-being-impersonated (mk-principal user-being-impersonated)
+        subject (Subject. true #{principal-being-impersonated} #{} #{})
+        req_context (ReqContext. subject)]
+    (.setRemoteAddress req_context remote-address)
+    (.setRealPrincipal req_context impersonating-principal)
+    req_context))
+
+(deftest impersonation-authorizer-test
+  (let [impersonating-user "admin"
+        user-being-impersonated (System/getProperty "user.name")
+        groups (ShellBasedGroupsMapping.)
+        _ (.prepare groups (read-storm-config))
+        groups (.getGroups groups user-being-impersonated)
+        cluster-conf (merge (read-storm-config)
+                       {Config/NIMBUS_IMPERSONATION_ACL {impersonating-user {"hosts" [ (.getHostName (InetAddress/getLocalHost))]
+                                                                            "groups" groups}}})
+        authorizer (ImpersonationAuthorizer. )
+        unauthorized-host (com.google.common.net.InetAddresses/forString "10.10.10.10")
+        ]
+
+    (.prepare authorizer cluster-conf)
+    ;;non impersonating request, should be permitted.
+    (is (= true (.permit authorizer (ReqContext. (mk-subject "anyuser")) "fileUpload" nil)))
+
+    ;;user with no impersonation acl should be reject
+    (is (= false (.permit authorizer (mk-impersonating-req-context "user-with-no-acl" user-being-impersonated (InetAddress/getLocalHost)) "someOperation" nil)))
+
+    ;;request from hosts that are not authorized should be rejected, commented because
+    (is (= false (.permit authorizer (mk-impersonating-req-context impersonating-user user-being-impersonated unauthorized-host) "someOperation" nil)))
+
+    ;;request to impersonate users from unauthroized groups should be rejected.
+    (is (= false (.permit authorizer (mk-impersonating-req-context impersonating-user "unauthroized-user" (InetAddress/getLocalHost)) "someOperation" nil)))
+
+    ;;request from authorized hosts and group should be allowed.
+    (is (= true (.permit authorizer (mk-impersonating-req-context impersonating-user user-being-impersonated (InetAddress/getLocalHost)) "someOperation" nil)))))


Mime
View raw message