storm-commits mailing list archives

From kabh...@apache.org
Subject [2/6] storm git commit: [STORM-2482] Refactor the Storm auto credential plugins to be more usable
Date Mon, 08 May 2017 05:05:46 GMT
[STORM-2482] Refactor the Storm auto credential plugins to be more usable

1. Create a new storm module storm-autocreds
2. Move AutoHDFS and AutoHBase to storm-autocreds
3. Refactor the code to accept config keys for customizing the Hadoop configuration for the plugins
4. Package the auto cred and dependency jars in the storm binary and deploy to lib-autocreds


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a0122aed
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a0122aed
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a0122aed

Branch: refs/heads/master
Commit: a0122aed59ea0e151a6017593afec550f88e5081
Parents: 2c597e5
Author: Arun Mahadevan <arunm@apache.org>
Authored: Tue Apr 18 13:29:56 2017 +0530
Committer: Arun Mahadevan <arunm@apache.org>
Committed: Wed May 3 14:13:31 2017 +0530

----------------------------------------------------------------------
 docs/SECURITY.md                                |  18 +-
 docs/storm-hbase.md                             |  35 ++-
 docs/storm-hdfs.md                              |  43 ++-
 external/storm-autocreds/pom.xml                | 103 +++++++
 .../apache/storm/common/AbstractAutoCreds.java  | 250 +++++++++++++++++
 .../apache/storm/hbase/security/AutoHBase.java  | 179 ++++++++++++
 .../storm/hbase/security/HBaseSecurityUtil.java |  73 +++++
 .../apache/storm/hdfs/security/AutoHDFS.java    | 216 ++++++++++++++
 .../storm/hdfs/security/HdfsSecurityUtil.java   |  69 +++++
 external/storm-hbase/pom.xml                    |   5 +
 .../apache/storm/hbase/security/AutoHBase.java  | 243 ----------------
 .../storm/hbase/security/HBaseSecurityUtil.java |  72 -----
 external/storm-hdfs/pom.xml                     |   5 +
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       |   2 +-
 .../storm/hdfs/common/security/AutoHDFS.java    | 281 -------------------
 .../hdfs/common/security/HdfsSecurityUtil.java  |  67 -----
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  |   2 +-
 .../apache/storm/hdfs/trident/HdfsState.java    |   2 +-
 pom.xml                                         |   1 +
 .../final-package/src/main/assembly/binary.xml  |   9 +-
 20 files changed, 982 insertions(+), 693 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/docs/SECURITY.md
----------------------------------------------------------------------
diff --git a/docs/SECURITY.md b/docs/SECURITY.md
index 66566ce..e73e873 100644
--- a/docs/SECURITY.md
+++ b/docs/SECURITY.md
@@ -424,16 +424,18 @@ nimbus.impersonation.acl:
 
 ### Automatic Credentials Push and Renewal
 Individual topologies have the ability to push credentials (tickets and tokens) to workers so that they can access secure services.  Exposing this to all of the users can be a pain for them.
-To hide this from them in the common case plugins can be used to populate the credentials, unpack them on the other side into a java Subject, and also allow Nimbus to renew the credentials if needed.
-These are controlled by the following configs. topology.auto-credentials is a list of java plugins, all of which must implement IAutoCredentials interface, that populate the credentials on gateway 
-and unpack them on the worker side. On a kerberos secure cluster they should be set by default to point to org.apache.storm.security.auth.kerberos.AutoTGT.  
-nimbus.credential.renewers.classes should also be set to this value so that nimbus can periodically renew the TGT on behalf of the user.
+To hide this from them in the common case plugins can be used to populate the credentials, unpack them on the other side into a java Subject, and also allow Nimbus to renew the credentials if needed. These are controlled by the following configs.
+ 
+`topology.auto-credentials` is a list of java plugins, all of which must implement the `IAutoCredentials` interface, that populate the credentials on gateway 
+and unpack them on the worker side. On a kerberos secure cluster they should be set by default to point to `org.apache.storm.security.auth.kerberos.AutoTGT`
+
+`nimbus.credential.renewers.classes` should also be set to `org.apache.storm.security.auth.kerberos.AutoTGT` so that nimbus can periodically renew the TGT on behalf of the user.
 
-nimbus.credential.renewers.freq.secs controls how often the renewer will poll to see if anything needs to be renewed, but the default should be fine.
+`nimbus.credential.renewers.freq.secs` controls how often the renewer will poll to see if anything needs to be renewed, but the default should be fine.
 
-In addition Nimbus itself can be used to get credentials on behalf of the user submitting topologies. This can be configures using nimbus.autocredential.plugins.classes which is a list 
-of fully qualified class names ,all of which must implement INimbusCredentialPlugin.  Nimbus will invoke the populateCredentials method of all the configured implementation as part of topology
-submission. You should use this config with topology.auto-credentials and nimbus.credential.renewers.classes so the credentials can be populated on worker side and nimbus can automatically renew
+In addition Nimbus itself can be used to get credentials on behalf of the user submitting topologies. This can be configured using `nimbus.autocredential.plugins.classes` which is a list 
+of fully qualified class names, all of which must implement `INimbusCredentialPlugin`.  Nimbus will invoke the populateCredentials method of all the configured implementations as part of topology
+submission. You should use this config with `topology.auto-credentials` and `nimbus.credential.renewers.classes` so the credentials can be populated on worker side and nimbus can automatically renew
 them. Currently there are 2 examples of using this config, AutoHDFS and AutoHBase which auto populates hdfs and hbase delegation tokens for topology submitter so they don't have to distribute keytabs
 on all possible worker hosts.
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/docs/storm-hbase.md
----------------------------------------------------------------------
diff --git a/docs/storm-hbase.md b/docs/storm-hbase.md
index a1b0764..0fb2c14 100644
--- a/docs/storm-hbase.md
+++ b/docs/storm-hbase.md
@@ -56,22 +56,43 @@ The approach described above requires that all potential worker hosts have "stor
 multiple topologies on a cluster , each with different hbase user, you will have to create multiple keytabs and distribute
 it to all workers. Instead of doing that you could use the following approach:
 
-Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user.
-The nimbus need to start with following configurations:
+Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user. The nimbus should be started with following configurations:
 
+```
 nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"] 
 nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"] 
 hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hbase super user that can impersonate other users.)
 hbase.kerberos.principal: "superuser@EXAMPLE.com"
-nimbus.credential.renewers.freq.secs : 518400 (6 days, hbase tokens by default expire every 7 days and can not be renewed, 
-if you have custom settings for hbase.auth.token.max.lifetime in hbase-site.xml than you should ensure this value is 
-atleast 1 hour less then that.)
+nimbus.credential.renewers.freq.secs : 518400 (6 days; hbase tokens by default expire every 7 days and cannot be renewed. If you have custom settings for hbase.auth.token.max.lifetime in hbase-site.xml, then you should ensure this value is at least 1 hour less than that.)
+```
 
 Your topology configuration should have:
-topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] 
+
+```
+topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"]
+```
 
 If nimbus did not have the above configuration you need to add it and then restart it. Ensure the hbase configuration 
-files(core-site.xml,hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all the dependencies is present in nimbus's classpath. 
+files (core-site.xml, hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all the dependencies are present in nimbus's classpath.
+
+As an alternative to adding the configuration files (core-site.xml, hdfs-site.xml and hbase-site.xml) to the classpath, you could specify the configurations as a part of the topology configuration. E.g., in your custom storm.yaml (or with the -c option while submitting the topology),
+
+```
+hbaseCredentialsConfigKeys : ["cluster1", "cluster2"] (the hbase clusters you want to fetch the tokens from)
+cluster1: [{"config1": "value1", "config2": "value2", ... }] (A map of config key-values specific to cluster1)
+cluster2: [{"config1": "value1", "hbase.keytab.file": "/path/to/keytab/for/cluster2/on/nimbus", "hbase.kerberos.principal": "cluster2user@EXAMPLE.com"}] (here along with other configs, we have custom keytab and principal for "cluster2" which will override the keytab/principal specified at topology level)
+```
+
+Instead of specifying key values you may also directly specify the resource files, e.g.,
+
+```
+cluster1: [{"resources": ["/path/to/core-site1.xml", "/path/to/hbase-site1.xml"]}]
+cluster2: [{"resources": ["/path/to/core-site2.xml", "/path/to/hbase-site2.xml"]}]
+```
+
+Storm will download the tokens separately for each of the clusters, populate them into the subject, and renew them periodically.
+This way it is possible to run multiple bolts connecting to separate HBase clusters within the same topology.
+
 Nimbus will use the keytab and principal specified in the config to authenticate with HBase. From then on for every
 topology submission, nimbus will impersonate the topology submitter user and acquire delegation tokens on behalf of the
 topology submitter user. If topology was started with topology.auto-credentials set to AutoHBase, nimbus will push the

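Editor's note: the per-cluster lookup described in the storm-hbase.md change above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the plugin's code: `PerClusterConfigSketch` and `resolve` are hypothetical names, and the key `hbaseCredentialsConfigKeys` is taken from the documentation text. The sketch stores each per-cluster value as a plain map, following the cast to `Map<String, Object>` in `AbstractAutoCreds.prepare` later in this commit.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: PerClusterConfigSketch is a hypothetical name, not a Storm class.
final class PerClusterConfigSketch {

    /**
     * Reads the list stored under listKey (for example "hbaseCredentialsConfigKeys")
     * and returns the per-cluster map found under each listed name.
     * Names that have no map entry in conf are skipped.
     */
    @SuppressWarnings("unchecked")
    static Map<String, Map<String, Object>> resolve(Map<String, Object> conf, String listKey) {
        Map<String, Map<String, Object>> perCluster = new LinkedHashMap<>();
        Object names = conf.get(listKey);
        if (!(names instanceof List)) {
            return perCluster; // no cluster names configured
        }
        for (String name : (List<String>) names) {
            Object value = conf.get(name);
            if (value instanceof Map) {
                perCluster.put(name, (Map<String, Object>) value);
            }
        }
        return perCluster;
    }

    public static void main(String[] args) {
        Map<String, Object> cluster1 = new HashMap<>();
        cluster1.put("config1", "value1");

        Map<String, Object> cluster2 = new HashMap<>();
        cluster2.put("hbase.keytab.file", "/path/to/keytab/for/cluster2/on/nimbus");
        cluster2.put("hbase.kerberos.principal", "cluster2user@EXAMPLE.com");

        Map<String, Object> conf = new HashMap<>();
        conf.put("hbaseCredentialsConfigKeys", Arrays.asList("cluster1", "cluster2"));
        conf.put("cluster1", cluster1);
        conf.put("cluster2", cluster2);

        // Prints the cluster names that have a usable per-cluster config.
        System.out.println(resolve(conf, "hbaseCredentialsConfigKeys").keySet());
    }
}
```

A name that appears in the list but has no matching entry is ignored here, which matches the `conf.containsKey(key)` check in `prepare`.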
http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/docs/storm-hdfs.md
----------------------------------------------------------------------
diff --git a/docs/storm-hdfs.md b/docs/storm-hdfs.md
index d2c7567..8391efd 100644
--- a/docs/storm-hdfs.md
+++ b/docs/storm-hdfs.md
@@ -412,23 +412,44 @@ If your topology is going to interact with secure HDFS, your bolts/states needs
 currently have 2 options to support this:
 
 ### Using HDFS delegation tokens 
-Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user.
-The nimbus need to start with following configurations:
+Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user. The nimbus should be started with following configurations:
 
-nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"] 
-nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"] 
+```
+nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.security.AutoHDFS"]
+nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.security.AutoHDFS"]
 hdfs.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hdfs super user that can impersonate other users.)
 hdfs.kerberos.principal: "superuser@EXAMPLE.com" 
-nimbus.credential.renewers.freq.secs : 82800 (23 hours, hdfs tokens needs to be renewed every 24 hours so this value should be
-less then 24 hours.)
-topology.hdfs.uri:"hdfs://host:port" (This is an optional config, by default we will use value of "fs.defaultFS" property
-specified in hadoop's core-site.xml)
+nimbus.credential.renewers.freq.secs : 82800 (23 hours, hdfs tokens need to be renewed every 24 hours so this value should be less than 24 hours.)
+topology.hdfs.uri:"hdfs://host:port" (This is an optional config, by default we will use value of "fs.defaultFS" property specified in hadoop's core-site.xml)
+```
 
 Your topology configuration should have:
-topology.auto-credentials :["org.apache.storm.hdfs.common.security.AutoHDFS"] 
 
-If nimbus did not have the above configuration you need to add it and then restart it. Ensure the hadoop configuration 
-files(core-site.xml and hdfs-site.xml) and the storm-hdfs jar with all the dependencies is present in nimbus's classpath. 
+```
+topology.auto-credentials :["org.apache.storm.hdfs.security.AutoHDFS"]
+```
+
+If nimbus did not have the above configuration you need to add it and then restart it. Ensure the hadoop configuration
+files (core-site.xml and hdfs-site.xml) and the storm-hdfs jar with all the dependencies are present in nimbus's classpath.
+
+As an alternative to adding the configuration files (core-site.xml and hdfs-site.xml) to the classpath, you could specify the configurations
+as a part of the topology configuration. E.g., in your custom storm.yaml (or with the -c option while submitting the topology),
+
+```
+hdfsCredentialsConfigKeys : ["cluster1", "cluster2"] (the hdfs clusters you want to fetch the tokens from)
+cluster1: [{"config1": "value1", "config2": "value2", ... }] (A map of config key-values specific to cluster1)
+cluster2: [{"config1": "value1", "hdfs.keytab.file": "/path/to/keytab/for/cluster2/on/nimbus", "hdfs.kerberos.principal": "cluster2user@EXAMPLE.com"}] (here along with other configs, we have custom keytab and principal for "cluster2" which will override the keytab/principal specified at topology level)
+```
+
+Instead of specifying key values you may also directly specify the resource files, e.g.,
+
+```
+cluster1: [{"resources": ["/path/to/core-site1.xml", "/path/to/hdfs-site1.xml"]}]
+cluster2: [{"resources": ["/path/to/core-site2.xml", "/path/to/hdfs-site2.xml"]}]
+```
+
+Storm will download the tokens separately for each of the clusters, populate them into the subject, and renew them periodically. This way it is possible to run multiple bolts connecting to separate HDFS clusters within the same topology.
+
 Nimbus will use the keytab and principal specified in the config to authenticate with Namenode. From then on for every
 topology submission, nimbus will impersonate the topology submitter user and acquire delegation tokens on behalf of the
 topology submitter user. If topology was started with topology.auto-credentials set to AutoHDFS, nimbus will push the

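Editor's note: the storm-hdfs.md change above allows a per-cluster entry to carry either plain key/values or a `resources` list of XML files. The following sketch shows one way that split could work, modelled loosely on `fillHadoopConfiguration` later in this commit. It is an assumption-laden illustration: `ClusterConfigSplitSketch` and `split` are hypothetical names, and the Hadoop `Configuration` object is replaced by a plain map and list so the example runs without Hadoop on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: ClusterConfigSplitSketch is a hypothetical name, not a Storm class.
final class ClusterConfigSplitSketch {
    static final String RESOURCES_KEY = "resources";

    // Plain settings; a real plugin would set these on a Hadoop Configuration.
    final Map<String, String> properties = new LinkedHashMap<>();
    // Paths of XML resource files such as core-site.xml or hdfs-site.xml.
    final List<String> resources = new ArrayList<>();

    /** Separates the "resources" list from ordinary key/value settings. */
    static ClusterConfigSplitSketch split(Map<String, Object> clusterConfig) {
        ClusterConfigSplitSketch result = new ClusterConfigSplitSketch();
        if (clusterConfig == null) {
            return result; // nothing configured for this cluster
        }
        for (Map.Entry<String, Object> entry : clusterConfig.entrySet()) {
            if (RESOURCES_KEY.equals(entry.getKey())) {
                for (Object path : (List<?>) entry.getValue()) {
                    result.resources.add(String.valueOf(path));
                }
            } else {
                // Values are stringified, since Hadoop settings are string-valued.
                result.properties.put(entry.getKey(), String.valueOf(entry.getValue()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> cluster1 = new HashMap<>();
        cluster1.put("config1", "value1");
        cluster1.put(RESOURCES_KEY,
                Arrays.asList("/path/to/core-site1.xml", "/path/to/hdfs-site1.xml"));

        ClusterConfigSplitSketch parsed = split(cluster1);
        System.out.println("properties = " + parsed.properties);
        System.out.println("resources  = " + parsed.resources);
    }
}
```

Keeping the two kinds of entries apart lets explicit key/values and resource files be combined in the same per-cluster map.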
http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-autocreds/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/pom.xml b/external/storm-autocreds/pom.xml
new file mode 100644
index 0000000..3762cfd
--- /dev/null
+++ b/external/storm-autocreds/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.1-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>storm-autocreds</artifactId>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>${provided.scope}</scope>
+            <exclusions>
+                <!--log4j-over-slf4j must be excluded for hadoop-minicluster
+                    see: http://stackoverflow.com/q/20469026/3542091 -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+    <build>
+    <plugins>
+       <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>appassembler-maven-plugin</artifactId>
+          <version>1.9</version>
+          <executions>
+                    <execution>
+                        <id>create-repo</id>
+                        <goals>
+                            <goal>create-repository</goal>
+                        </goals>
+                        <configuration>
+                            <assembleDirectory>${project.build.directory}/app-assembler</assembleDirectory>
+                            <repositoryLayout>flat</repositoryLayout>
+                        </configuration>
+                    </execution>
+          </executions>
+        </plugin>
+    </plugins>
+   </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
new file mode 100644
index 0000000..816e263
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.common;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.storm.security.INimbusCredentialPlugin;
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.security.auth.ICredentialsRenewer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The base class for auto credential plugins that abstracts out some of the common functionality.
+ */
+public abstract class AbstractAutoCreds implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractAutoCreds.class);
+    public static final String CONFIG_KEY_RESOURCES = "resources";
+
+    private List<String> configKeys = new ArrayList<>();
+    private Map<String, Map<String, Object>> configMap = new HashMap<>();
+
+    @Override
+    public void prepare(Map conf) {
+        doPrepare(conf);
+        String configKeyString = getConfigKeyString();
+        if (conf.containsKey(configKeyString)) {
+            configKeys.addAll((List<String>) conf.get(configKeyString));
+            for (String key : configKeys) {
+                if (conf.containsKey(key)) {
+                    Map<String, Object> config = (Map<String, Object>) conf.get(key);
+                    configMap.put(key, config);
+                    LOG.info("configKey = {}, config = {}", key, config);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials, Map conf) {
+        try {
+            if (configKeys != null) {
+                Map<String, Object> updatedConf = updateConfigs(conf);
+                for (String configKey : configKeys) {
+                    credentials.put(getCredentialKey(configKey),
+                            DatatypeConverter.printBase64Binary(getHadoopCredentials(updatedConf, configKey)));
+                }
+            } else {
+                credentials.put(getCredentialKey(StringUtils.EMPTY),
+                        DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
+            }
+            LOG.info("Tokens added to credentials map.");
+        } catch (Exception e) {
+            LOG.error("Could not populate credentials.", e);
+        }
+    }
+
+    private Map<String, Object> updateConfigs(Map topologyConf) {
+        Map<String, Object> res = new HashMap<>(topologyConf);
+        if (configKeys != null) {
+            for (String configKey : configKeys) {
+                if (!res.containsKey(configKey) && configMap.containsKey(configKey)) {
+                    res.put(configKey, configMap.get(configKey));
+                }
+            }
+        }
+        return res;
+    }
+
+    @Override
+    public void renew(Map<String, String> credentials, Map topologyConf) {
+        doRenew(credentials, updateConfigs(topologyConf));
+    }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials) {
+        credentials.put(getCredentialKey(StringUtils.EMPTY),
+                DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void populateSubject(Subject subject, Map<String, String> credentials) {
+        addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void updateSubject(Subject subject, Map<String, String> credentials) {
+        addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
+    }
+
+    protected Set<Pair<String, Credentials>> getCredentials(Map<String, String> credentials) {
+        Set<Pair<String, Credentials>> res = new HashSet<>();
+        if (configKeys != null) {
+            for (String configKey : configKeys) {
+                Credentials cred = doGetCredentials(credentials, configKey);
+                if (cred != null) {
+                    res.add(new Pair(configKey, cred));
+                }
+            }
+        } else {
+            Credentials cred = doGetCredentials(credentials, StringUtils.EMPTY);
+            if (cred != null) {
+                res.add(new Pair(StringUtils.EMPTY, cred));
+            }
+        }
+        return res;
+    }
+
+    protected void fillHadoopConfiguration(Map topoConf, String configKey, Configuration configuration) {
+        Map<String, Object> config = (Map<String, Object>) topoConf.get(configKey);
+        LOG.info("TopoConf {}, got config {}, for configKey {}", topoConf, config, configKey);
+        if (config != null) {
+            List<String> resourcesToLoad = new ArrayList<>();
+            for (Map.Entry<String, Object> entry : config.entrySet()) {
+                if (entry.getKey().equals(CONFIG_KEY_RESOURCES)) {
+                    resourcesToLoad.addAll((List<String>) entry.getValue());
+                } else {
+                    configuration.set(entry.getKey(), String.valueOf(entry.getValue()));
+                }
+            }
+            LOG.info("Resources to load {}", resourcesToLoad);
+            // add configs from resources like hdfs-site.xml
+            for (String pathStr : resourcesToLoad) {
+                configuration.addResource(new Path(Paths.get(pathStr).toUri()));
+            }
+        }
+        LOG.info("Initializing UGI with config {}", configuration);
+        UserGroupInformation.setConfiguration(configuration);
+    }
+
+    /**
+     * Prepare the plugin
+     *
+     * @param conf the storm cluster conf set via storm.yaml
+     */
+    protected abstract void doPrepare(Map conf);
+
+    /**
+     * The lookup key for the config key string
+     *
+     * @return the config key string
+     */
+    protected abstract String getConfigKeyString();
+
+    /**
+     * The key with which the credentials are stored in the credentials map
+     */
+    protected abstract String getCredentialKey(String configKey);
+
+    protected abstract byte[] getHadoopCredentials(Map conf, String configKey);
+
+    protected abstract byte[] getHadoopCredentials(Map conf);
+
+    protected abstract void doRenew(Map<String, String> credentials, Map topologyConf);
+
+    @SuppressWarnings("unchecked")
+    private void addCredentialToSubject(Subject subject, Map<String, String> credentials) {
+        try {
+            for (Pair<String, Credentials> cred : getCredentials(credentials)) {
+                subject.getPrivateCredentials().add(cred.getSecond());
+                LOG.info("Credentials added to the subject.");
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to initialize and get UserGroupInformation.", e);
+        }
+    }
+
+    private void addTokensToUGI(Subject subject) {
+        if (subject != null) {
+            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
+            if (privateCredentials != null) {
+                for (Credentials cred : privateCredentials) {
+                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
+                    if (allTokens != null) {
+                        for (Token<? extends TokenIdentifier> token : allTokens) {
+                            try {
+                                UserGroupInformation.getCurrentUser().addToken(token);
+                                LOG.info("Added delegation tokens to UGI.");
+                            } catch (IOException e) {
+                                LOG.error("Exception while trying to add tokens to ugi", e);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private Credentials doGetCredentials(Map<String, String> credentials, String configKey) {
+        Credentials credential = null;
+        if (credentials != null && credentials.containsKey(getCredentialKey(configKey))) {
+            try {
+                byte[] credBytes = DatatypeConverter.parseBase64Binary(credentials.get(getCredentialKey(configKey)));
+                ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(credBytes));
+
+                credential = new Credentials();
+                credential.readFields(in);
+            } catch (Exception e) {
+                LOG.error("Could not obtain credentials from credentials map.", e);
+            }
+        }
+        return credential;
+
+    }
+
+}

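Editor's note: `populateCredentials` and `doGetCredentials` in the file above move serialized credentials through a `Map<String, String>` by Base64-encoding the bytes under a per-cluster key. The round trip can be sketched as follows. Several things here are assumptions: `CredentialMapSketch` and `KEY_PREFIX` are hypothetical, the prefix-plus-config-key scheme is only a guess at what a `getCredentialKey` implementation might return, and `java.util.Base64` is swapped in for `javax.xml.bind.DatatypeConverter` so the example compiles on JDKs that no longer bundle JAXB.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: CredentialMapSketch and its key scheme are hypothetical.
final class CredentialMapSketch {
    // Assumed prefix; the real key comes from each plugin's getCredentialKey(configKey).
    static final String KEY_PREFIX = "EXAMPLE_CREDENTIALS";

    static String credentialKey(String configKey) {
        return KEY_PREFIX + configKey;
    }

    /** Stores serialized credential bytes as a Base64 string under the per-cluster key. */
    static void put(Map<String, String> credentials, String configKey, byte[] serialized) {
        credentials.put(credentialKey(configKey),
                Base64.getEncoder().encodeToString(serialized));
    }

    /** Returns the decoded bytes, or null when no entry exists for configKey. */
    static byte[] get(Map<String, String> credentials, String configKey) {
        if (credentials == null) {
            return null;
        }
        String encoded = credentials.get(credentialKey(configKey));
        return encoded == null ? null : Base64.getDecoder().decode(encoded);
    }

    public static void main(String[] args) {
        Map<String, String> credentials = new HashMap<>();
        byte[] fakeToken = "not-a-real-token".getBytes(StandardCharsets.UTF_8);

        put(credentials, "cluster1", fakeToken);

        // The bytes read back on the worker side should match what was stored.
        System.out.println(Arrays.equals(fakeToken, get(credentials, "cluster1")));
    }
}
```

In the actual plugin the decoded bytes are fed to Hadoop's `Credentials.readFields`; the sketch stops at the byte level to stay free of Hadoop dependencies.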
http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
new file mode 100644
index 0000000..fcbb463
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.security;
+
+import org.apache.storm.Config;
+import org.apache.storm.common.AbstractAutoCreds;
+import org.apache.storm.hdfs.security.HdfsSecurityUtil;
+import org.apache.storm.security.INimbusCredentialPlugin;
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.security.auth.ICredentialsRenewer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.xml.bind.DatatypeConverter;
+import java.io.*;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Automatically gets HBase delegation tokens and pushes them to the user's topology. The class
+ * assumes that hadoop/hbase configuration files are in your class path.
+ */
+public class AutoHBase extends AbstractAutoCreds {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoHBase.class);
+
+    public static final String HBASE_CREDENTIALS = "HBASE_CREDENTIALS";
+    public static final String HBASE_KEYTAB_FILE_KEY = "hbase.keytab.file";
+    public static final String HBASE_PRINCIPAL_KEY = "hbase.kerberos.principal";
+
+    public String hbaseKeytab;
+    public String hbasePrincipal;
+
+    @Override
+    public void doPrepare(Map conf) {
+        if(conf.containsKey(HBASE_KEYTAB_FILE_KEY) && conf.containsKey(HBASE_PRINCIPAL_KEY)) {
+            hbaseKeytab = (String) conf.get(HBASE_KEYTAB_FILE_KEY);
+            hbasePrincipal = (String) conf.get(HBASE_PRINCIPAL_KEY);
+        }
+    }
+
+    @Override
+    protected String getConfigKeyString() {
+        return HBaseSecurityUtil.HBASE_CREDENTIALS_CONFIG_KEYS;
+    }
+
+
+    @Override
+    public void shutdown() {
+        //no op.
+    }
+
+    @Override
+    protected  byte[] getHadoopCredentials(Map conf, String configKey) {
+        Configuration configuration = getHadoopConfiguration(conf, configKey);
+        return getHadoopCredentials(conf, configuration);
+    }
+
+    @Override
+    protected byte[] getHadoopCredentials(Map conf) {
+        return getHadoopCredentials(conf, HBaseConfiguration.create());
+    }
+
+    private Configuration getHadoopConfiguration(Map topoConf, String configKey) {
+        Configuration configuration = HBaseConfiguration.create();
+        fillHadoopConfiguration(topoConf, configKey, configuration);
+        return configuration;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected byte[] getHadoopCredentials(Map conf, Configuration hbaseConf) {
+        try {
+            if(UserGroupInformation.isSecurityEnabled()) {
+                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
+
+                UserProvider provider = UserProvider.instantiate(hbaseConf);
+
+                if (hbaseConf.get(HBASE_KEYTAB_FILE_KEY) == null) {
+                    hbaseConf.set(HBASE_KEYTAB_FILE_KEY, hbaseKeytab);
+                }
+                if (hbaseConf.get(HBASE_PRINCIPAL_KEY) == null) {
+                    hbaseConf.set(HBASE_PRINCIPAL_KEY, hbasePrincipal);
+                }
+                provider.login(HBASE_KEYTAB_FILE_KEY, HBASE_PRINCIPAL_KEY, InetAddress.getLocalHost().getCanonicalHostName());
+
+                LOG.info("Logged into HBase as principal = " + hbaseConf.get(HBASE_PRINCIPAL_KEY));
+
+                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+
+                User user = User.create(ugi);
+
+                if(user.isHBaseSecurityEnabled(hbaseConf)) {
+                    TokenUtil.obtainAndCacheToken(hbaseConf, proxyUser);
+
+                    LOG.info("Obtained HBase tokens, adding to user credentials.");
+
+                    Credentials credential= proxyUser.getCredentials();
+                    ByteArrayOutputStream bao = new ByteArrayOutputStream();
+                    ObjectOutputStream out = new ObjectOutputStream(bao);
+                    credential.write(out);
+                    out.flush();
+                    out.close();
+                    return bao.toByteArray();
+                } else {
+                    throw new RuntimeException("Security is not enabled for HBase.");
+                }
+            } else {
+                throw new RuntimeException("Security is not enabled for Hadoop");
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException("Failed to get delegation tokens." , ex);
+        }
+    }
+
+    @Override
+    public void doRenew(Map<String, String> credentials, Map topologyConf) {
+        //HBASE tokens are not renewable so we always have to get new ones.
+        populateCredentials(credentials, topologyConf);
+    }
+
+    @Override
+    protected String getCredentialKey(String configKey) {
+        return HBASE_CREDENTIALS + configKey;
+    }
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        Map conf = new HashMap();
+        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
+        conf.put(HBASE_PRINCIPAL_KEY,args[1]); // hbase principal storm-hbase@WITZEND.COM
+        conf.put(HBASE_KEYTAB_FILE_KEY,args[2]); // storm hbase keytab /etc/security/keytabs/storm-hbase.keytab
+
+        AutoHBase autoHBase = new AutoHBase();
+        autoHBase.prepare(conf);
+
+        Map<String,String> creds  = new HashMap<String, String>();
+        autoHBase.populateCredentials(creds, conf);
+        LOG.info("Got HBase credentials " + autoHBase.getCredentials(creds));
+
+        Subject s = new Subject();
+        autoHBase.populateSubject(s, creds);
+        LOG.info("Got a Subject " + s);
+
+        autoHBase.renew(creds, conf);
+        LOG.info("renewed credentials " + autoHBase.getCredentials(creds));
+    }
+}
+
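
The refactored AutoHBase keys each credential by HBASE_CREDENTIALS + configKey (see getCredentialKey above), which presumably lets one topology carry tokens for more than one HBase configuration. The sketch below is illustrative only and is not the Storm implementation: it assumes the base class stores one Base64-encoded entry per config key, much as the removed single-key AutoHBase did, and it uses java.util.Base64 with placeholder bytes instead of real Hadoop Credentials. The names CredentialKeySketch, populate, read and the cluster keys are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CredentialKeySketch {
    static final String HBASE_CREDENTIALS = "HBASE_CREDENTIALS";

    // Mirrors getCredentialKey(configKey) in the diff: prefix + config key.
    static String credentialKey(String configKey) {
        return HBASE_CREDENTIALS + configKey;
    }

    // Assumption: one Base64-encoded entry is stored per config key.
    static void populate(Map<String, String> credentials, List<String> configKeys) {
        for (String configKey : configKeys) {
            // Placeholder bytes; the real plugin would serialize Hadoop Credentials here.
            byte[] tokenBytes = ("token-for-" + configKey).getBytes(StandardCharsets.UTF_8);
            credentials.put(credentialKey(configKey),
                    Base64.getEncoder().encodeToString(tokenBytes));
        }
    }

    // Returns the decoded bytes for a config key, or null if no entry exists.
    static byte[] read(Map<String, String> credentials, String configKey) {
        String encoded = credentials.get(credentialKey(configKey));
        return encoded == null ? null : Base64.getDecoder().decode(encoded);
    }

    public static void main(String[] args) {
        Map<String, String> creds = new HashMap<>();
        populate(creds, Arrays.asList("hbaseCluster1", "hbaseCluster2"));
        System.out.println(creds.keySet());
    }
}
```

Because the key is a plain concatenation, two config keys can never collide unless they are identical, but a config key that is empty would map to the bare HBASE_CREDENTIALS entry used by the old single-key scheme.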

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
new file mode 100644
index 0000000..4e0dcab
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
+
+/**
+ * This class provides util methods for storm-hbase connector communicating
+ * with secured HBase.
+ */
+public class HBaseSecurityUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseSecurityUtil.class);
+
+    public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
+    public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
+    public static final String HBASE_CREDENTIALS_CONFIG_KEYS = "hbaseCredentialsConfigKeys";
+    private static  UserProvider legacyProvider = null;
+
+    public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
+        //Allowing keytab based login for backward compatibility.
+        if (UserGroupInformation.isSecurityEnabled() && (conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
+                !(((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHBase.class.getName())))) {
+            LOG.info("Logging in using keytab as AutoHBase is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);
+            //ensure that if a keytab is used, only one login per process is executed
+            if(legacyProvider == null) {
+                synchronized (HBaseSecurityUtil.class) {
+                    if(legacyProvider == null) {
+                        legacyProvider = UserProvider.instantiate(hbaseConfig);
+                        String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
+                        if (keytab != null) {
+                            hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
+                        }
+                        String userName = (String) conf.get(STORM_USER_NAME_KEY);
+                        if (userName != null) {
+                            hbaseConfig.set(STORM_USER_NAME_KEY, userName);
+                        }
+                        legacyProvider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY,
+                                InetAddress.getLocalHost().getCanonicalHostName());
+                    }
+                }
+            }
+            return legacyProvider;
+        } else {
+            return UserProvider.instantiate(hbaseConfig);
+        }
+    }
+}
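
The keytab path in HBaseSecurityUtil.login above guards the login with a double-checked lock on a static field. As a minimal sketch of that idiom (not the Storm code), the following replaces UserProvider with a hypothetical Session class and counts how many logins actually run. One deliberate difference: the field here is declared volatile, which the Java memory model requires for double-checked locking to publish the object safely; legacyProvider in the diff is not volatile.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an expensive, once-per-process login.
final class LoginOnce {
    static final AtomicInteger LOGIN_COUNT = new AtomicInteger();

    // volatile so that a fully constructed Session is visible to all threads
    private static volatile Session session;

    static final class Session {
        final String principal;

        Session(String principal) {
            this.principal = principal;
            LOGIN_COUNT.incrementAndGet(); // simulates the login side effect
        }
    }

    static Session login(String principal) {
        Session local = session;               // first check, without the lock
        if (local == null) {
            synchronized (LoginOnce.class) {
                local = session;               // second check, under the lock
                if (local == null) {
                    local = new Session(principal);
                    session = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> login("storm@EXAMPLE.COM"));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("logins performed: " + LOGIN_COUNT.get());
    }
}
```

As in the diff, whichever caller arrives first decides the principal; later callers receive the cached instance even if they pass different settings.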

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
new file mode 100644
index 0000000..e1e8512
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.security;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.storm.Config;
+import org.apache.storm.common.AbstractAutoCreds;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.net.URI;
+import java.nio.file.Paths;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.hdfs.security.HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY;
+import static org.apache.storm.hdfs.security.HdfsSecurityUtil.STORM_USER_NAME_KEY;
+
+/**
+ * Automatically gets HDFS delegation tokens and pushes them to the user's topology. The class
+ * assumes that HDFS configuration files are in your class path.
+ */
+public class AutoHDFS extends AbstractAutoCreds {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoHDFS.class);
+    public static final String HDFS_CREDENTIALS = "HDFS_CREDENTIALS";
+    public static final String TOPOLOGY_HDFS_URI = "topology.hdfs.uri";
+
+    private String hdfsKeyTab;
+    private String hdfsPrincipal;
+
+    @Override
+    public void doPrepare(Map conf) {
+        if(conf.containsKey(STORM_KEYTAB_FILE_KEY) && conf.containsKey(STORM_USER_NAME_KEY)) {
+            hdfsKeyTab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
+            hdfsPrincipal = (String) conf.get(STORM_USER_NAME_KEY);
+        }
+    }
+
+    @Override
+    protected String getConfigKeyString() {
+        return HdfsSecurityUtil.HDFS_CREDENTIALS_CONFIG_KEYS;
+    }
+
+    @Override
+    public void shutdown() {
+        //no op.
+    }
+
+    @Override
+    protected  byte[] getHadoopCredentials(Map conf, String configKey) {
+        Configuration configuration = getHadoopConfiguration(conf, configKey);
+        return getHadoopCredentials(conf, configuration);
+    }
+
+    @Override
+    protected byte[] getHadoopCredentials(Map conf) {
+        return getHadoopCredentials(conf, new Configuration());
+    }
+
+    private Configuration getHadoopConfiguration(Map topoConf, String configKey) {
+        Configuration configuration = new Configuration();
+        fillHadoopConfiguration(topoConf, configKey, configuration);
+        return configuration;
+    }
+
+    @SuppressWarnings("unchecked")
+    private byte[] getHadoopCredentials(Map conf, final Configuration configuration) {
+        try {
+            if(UserGroupInformation.isSecurityEnabled()) {
+                login(configuration);
+
+                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
+
+                final URI nameNodeURI = conf.containsKey(TOPOLOGY_HDFS_URI) ? new URI(conf.get(TOPOLOGY_HDFS_URI).toString())
+                        : FileSystem.getDefaultUri(configuration);
+
+                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+
+                Credentials creds = (Credentials) proxyUser.doAs(new PrivilegedAction<Object>() {
+                    @Override
+                    public Object run() {
+                        try {
+                            FileSystem fileSystem = FileSystem.get(nameNodeURI, configuration);
+                            Credentials credential= proxyUser.getCredentials();
+
+                            fileSystem.addDelegationTokens(hdfsPrincipal, credential);
+                            LOG.info("Delegation tokens acquired for user {}", topologySubmitterUser);
+                            return credential;
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+
+
+                ByteArrayOutputStream bao = new ByteArrayOutputStream();
+                ObjectOutputStream out = new ObjectOutputStream(bao);
+
+                creds.write(out);
+                out.flush();
+                out.close();
+
+                return bao.toByteArray();
+            } else {
+                throw new RuntimeException("Security is not enabled for HDFS");
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException("Failed to get delegation tokens." , ex);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void doRenew(Map<String, String> credentials, Map topologyConf) {
+        for (Pair<String, Credentials> cred : getCredentials(credentials)) {
+            try {
+                Configuration configuration = getHadoopConfiguration(topologyConf, cred.getFirst());
+                Collection<Token<? extends TokenIdentifier>> tokens = cred.getSecond().getAllTokens();
+
+                if (tokens != null && !tokens.isEmpty()) {
+                    for (Token token : tokens) {
+                        //We need to re-login because some other thread might have logged into hadoop using
+                        // their credentials (e.g. AutoHBase might also be part of nimbus auto creds)
+                        login(configuration);
+                        long expiration = token.renew(configuration);
+                        LOG.info("HDFS delegation token renewed, new expiration time {}", expiration);
+                    }
+                } else {
+                    LOG.debug("No tokens found for credentials, skipping renewal.");
+                }
+            } catch (Exception e) {
+                LOG.warn("Could not renew the credentials; one possible reason is that the tokens are beyond " +
+                        "their renewal period, so attempting to get new tokens.", e);
+                populateCredentials(credentials, topologyConf);
+            }
+        }
+    }
+
+    private void login(Configuration configuration) throws IOException {
+        if (configuration.get(STORM_KEYTAB_FILE_KEY) == null) {
+            configuration.set(STORM_KEYTAB_FILE_KEY, hdfsKeyTab);
+        }
+        if (configuration.get(STORM_USER_NAME_KEY) == null) {
+            configuration.set(STORM_USER_NAME_KEY, hdfsPrincipal);
+        }
+        SecurityUtil.login(configuration, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
+
+        LOG.info("Logged into hdfs with principal {}", configuration.get(STORM_USER_NAME_KEY));
+    }
+
+    @Override
+    protected String getCredentialKey(String configKey) {
+        return HDFS_CREDENTIALS + configKey;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        Map conf = new HashMap();
+        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
+        conf.put(STORM_USER_NAME_KEY, args[1]); //with realm e.g. hdfs@WITZEND.COM
+        conf.put(STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab
+
+        Configuration configuration = new Configuration();
+        AutoHDFS autoHDFS = new AutoHDFS();
+        autoHDFS.prepare(conf);
+
+        Map<String,String> creds  = new HashMap<String, String>();
+        autoHDFS.populateCredentials(creds, conf);
+        LOG.info("Got HDFS credentials {}", autoHDFS.getCredentials(creds));
+
+        Subject s = new Subject();
+        autoHDFS.populateSubject(s, creds);
+        LOG.info("Got a Subject "+ s);
+
+        autoHDFS.renew(creds, conf);
+        LOG.info("renewed credentials {}", autoHDFS.getCredentials(creds));
+    }
+}
+
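
AutoHDFS.doRenew above first tries to renew each existing delegation token and only fetches new tokens when renewal throws. A rough, dependency-free sketch of that renew-or-refetch flow follows; it is not the Storm code. RenewableToken and the fetchNew supplier are hypothetical stand-ins for Hadoop's Token.renew and for populateCredentials.

```java
import java.util.List;
import java.util.function.Supplier;

final class RenewOrRefetch {
    /** Hypothetical token abstraction; renew() returns the new expiration time. */
    interface RenewableToken {
        long renew() throws Exception;
    }

    /**
     * Tries to renew every token. If any renewal fails, falls back to
     * fetching a fresh set. Returns the tokens that are valid afterwards.
     */
    static List<RenewableToken> renewAll(List<RenewableToken> tokens,
                                         Supplier<List<RenewableToken>> fetchNew) {
        try {
            for (RenewableToken token : tokens) {
                token.renew();
            }
            return tokens;
        } catch (Exception e) {
            // For example, the token may be past its maximum renewal lifetime.
            return fetchNew.get();
        }
    }
}
```

A caller would pass the currently held tokens plus a supplier that performs the full login and token request, so the expensive path runs only when renewal is no longer possible.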

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
new file mode 100644
index 0000000..c0f3c79
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.security;
+
+import org.apache.storm.security.auth.kerberos.AutoTGT;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
+
+/**
+ * This class provides util methods for storm-hdfs connector communicating
+ * with secured HDFS.
+ */
+public class HdfsSecurityUtil {
+    public static final String STORM_KEYTAB_FILE_KEY = "hdfs.keytab.file";
+    public static final String STORM_USER_NAME_KEY = "hdfs.kerberos.principal";
+    public static final String HDFS_CREDENTIALS_CONFIG_KEYS = "hdfsCredentialsConfigKeys";
+
+
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsSecurityUtil.class);
+    private static AtomicBoolean isLoggedIn = new AtomicBoolean();
+    public static void login(Map conf, Configuration hdfsConfig) throws IOException {
+        //If AutoHDFS is specified, do not attempt to login using keytabs, only kept for backward compatibility.
+        if(conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
+                (!(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHDFS.class.getName())) &&
+                 !(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoTGT.class.getName())))) {
+            if (UserGroupInformation.isSecurityEnabled()) {
+                // compareAndSet added because of https://issues.apache.org/jira/browse/STORM-1535
+                if (isLoggedIn.compareAndSet(false, true)) {
+                    LOG.info("Logging in using keytab as AutoHDFS is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);
+                    String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
+                    if (keytab != null) {
+                        hdfsConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
+                    }
+                    String userName = (String) conf.get(STORM_USER_NAME_KEY);
+                    if (userName != null) {
+                        hdfsConfig.set(STORM_USER_NAME_KEY, userName);
+                    }
+                    SecurityUtil.login(hdfsConfig, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
+                }
+            }
+        }
+    }
+}
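
HdfsSecurityUtil.login above uses AtomicBoolean.compareAndSet(false, true) so that only one caller per process performs the keytab login. The sketch below isolates that guard under stated assumptions: OneTimeLogin, loginIfNeeded and the counter are hypothetical, and the increment stands in for SecurityUtil.login.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class OneTimeLogin {
    private static final AtomicBoolean LOGGED_IN = new AtomicBoolean();
    static final AtomicInteger LOGINS = new AtomicInteger();

    // Returns true only for the single caller that flips the flag.
    static boolean loginIfNeeded() {
        if (LOGGED_IN.compareAndSet(false, true)) {
            LOGINS.incrementAndGet(); // placeholder for the real keytab login
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("first call logged in: " + loginIfNeeded());
        System.out.println("second call logged in: " + loginIfNeeded());
    }
}
```

One consequence of this shape, in the sketch and seemingly in the diff as well: the flag is set before the login runs, so if the login throws, later calls skip it rather than retry.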

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index fd6bfe5..ea49ad0 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -94,5 +94,10 @@
             <artifactId>caffeine</artifactId>
             <version>${caffeine.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-autocreds</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
deleted file mode 100644
index a2ca68e..0000000
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hbase.security;
-
-import org.apache.storm.Config;
-import org.apache.storm.security.INimbusCredentialPlugin;
-import org.apache.storm.security.auth.IAutoCredentials;
-import org.apache.storm.security.auth.ICredentialsRenewer;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.token.TokenUtil;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.Subject;
-import javax.xml.bind.DatatypeConverter;
-import java.io.*;
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Automatically get hbase delegation tokens and push it to user's topology. The class
- * assumes that hadoop/hbase configuration files are in your class path.
- */
-public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin {
-    private static final Logger LOG = LoggerFactory.getLogger(AutoHBase.class);
-
-    public static final String HBASE_CREDENTIALS = "HBASE_CREDENTIALS";
-    public static final String HBASE_KEYTAB_FILE_KEY = "hbase.keytab.file";
-    public static final String HBASE_PRINCIPAL_KEY = "hbase.kerberos.principal";
-
-    public String hbaseKeytab;
-    public String hbasePrincipal;
-
-    @Override
-    public void prepare(Map conf) {
-        if(conf.containsKey(HBASE_KEYTAB_FILE_KEY) && conf.containsKey(HBASE_PRINCIPAL_KEY)) {
-            hbaseKeytab = (String) conf.get(HBASE_KEYTAB_FILE_KEY);
-            hbasePrincipal = (String) conf.get(HBASE_PRINCIPAL_KEY);
-        }
-    }
-
-    @Override
-    public void shutdown() {
-        //no op.
-    }
-
-    @Override
-    public void populateCredentials(Map<String, String> credentials, Map conf) {
-        try {
-            credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
-        } catch (Exception e) {
-            LOG.error("Could not populate HBase credentials.", e);
-        }
-    }
-
-    @Override
-    public void populateCredentials(Map<String, String> credentials) {
-        credentials.put(HBASE_CREDENTIALS, DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
-    }
-
-    /*
- *
- * @param credentials map with creds.
- * @return instance of org.apache.hadoop.security.Credentials.
- * this class's populateCredentials must have been called before.
- */
-    @SuppressWarnings("unchecked")
-    protected Object getCredentials(Map<String, String> credentials) {
-        Credentials credential = null;
-        if (credentials != null && credentials.containsKey(getCredentialKey())) {
-            try {
-                byte[] credBytes = DatatypeConverter.parseBase64Binary(credentials.get(getCredentialKey()));
-                ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(credBytes));
-
-                credential = new Credentials();
-                credential.readFields(in);
-                LOG.info("Got hbase credentials from credentials Map.");
-            } catch (Exception e) {
-                LOG.error("Could not obtain credentials from credentials map.", e);
-            }
-        }
-        return credential;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void updateSubject(Subject subject, Map<String, String> credentials) {
-        addCredentialToSubject(subject, credentials);
-        addTokensToUGI(subject);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void populateSubject(Subject subject, Map<String, String> credentials) {
-        addCredentialToSubject(subject, credentials);
-        addTokensToUGI(subject);
-    }
-
-    @SuppressWarnings("unchecked")
-    private void addCredentialToSubject(Subject subject, Map<String, String> credentials) {
-        try {
-            Object credential = getCredentials(credentials);
-            if (credential != null) {
-                subject.getPrivateCredentials().add(credential);
-                LOG.info("Hbase credentials added to subject.");
-            } else {
-                LOG.info("No credential found in credentials map.");
-            }
-        } catch (Exception e) {
-            LOG.error("Failed to initialize and get UserGroupInformation.", e);
-        }
-    }
-
-    public void addTokensToUGI(Subject subject) {
-        if(subject != null) {
-            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
-            if (privateCredentials != null) {
-                for (Credentials cred : privateCredentials) {
-                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
-                    if (allTokens != null) {
-                        for (Token<? extends TokenIdentifier> token : allTokens) {
-                            try {
-                                UserGroupInformation.getCurrentUser().addToken(token);
-                                LOG.info("Added delegation tokens to UGI.");
-                            } catch (IOException e) {
-                                LOG.error("Exception while trying to add tokens to ugi", e);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    protected byte[] getHadoopCredentials(Map conf) {
-        try {
-            final Configuration hbaseConf = HBaseConfiguration.create();
-            if(UserGroupInformation.isSecurityEnabled()) {
-                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-
-                UserProvider provider = UserProvider.instantiate(hbaseConf);
-
-                hbaseConf.set(HBASE_KEYTAB_FILE_KEY, hbaseKeytab);
-                hbaseConf.set(HBASE_PRINCIPAL_KEY, hbasePrincipal);
-                provider.login(HBASE_KEYTAB_FILE_KEY, HBASE_PRINCIPAL_KEY, InetAddress.getLocalHost().getCanonicalHostName());
-
-                LOG.info("Logged into Hbase as principal = " + conf.get(HBASE_PRINCIPAL_KEY));
-                UserGroupInformation.setConfiguration(hbaseConf);
-
-                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
-                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
-
-                User user = User.create(ugi);
-
-                if(user.isHBaseSecurityEnabled(hbaseConf)) {
-                    TokenUtil.obtainAndCacheToken(hbaseConf, proxyUser);
-
-                    LOG.info("Obtained HBase tokens, adding to user credentials.");
-
-                    Credentials credential= proxyUser.getCredentials();
-                    ByteArrayOutputStream bao = new ByteArrayOutputStream();
-                    ObjectOutputStream out = new ObjectOutputStream(bao);
-                    credential.write(out);
-                    out.flush();
-                    out.close();
-                    return bao.toByteArray();
-                } else {
-                    throw new RuntimeException("Security is not enabled for HBase.");
-                }
-            } else {
-                throw new RuntimeException("Security is not enabled for Hadoop");
-            }
-        } catch (Exception ex) {
-            throw new RuntimeException("Failed to get delegation tokens." , ex);
-        }
-    }
-
-    @Override
-    public void renew(Map<String, String> credentials, Map topologyConf) {
-        //HBASE tokens are not renewable so we always have to get new ones.
-        populateCredentials(credentials, topologyConf);
-    }
-
-    protected String getCredentialKey() {
-        return HBASE_CREDENTIALS;
-    }
-
-
-    @SuppressWarnings("unchecked")
-    public static void main(String[] args) throws Exception {
-        Map conf = new HashMap();
-        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
-        conf.put(HBASE_PRINCIPAL_KEY,args[1]); // hbase principal storm-hbase@WITZEN.COM
-        conf.put(HBASE_KEYTAB_FILE_KEY,args[2]); // storm hbase keytab /etc/security/keytabs/storm-hbase.keytab
-
-        AutoHBase autoHBase = new AutoHBase();
-        autoHBase.prepare(conf);
-
-        Map<String,String> creds  = new HashMap<String, String>();
-        autoHBase.populateCredentials(creds, conf);
-        LOG.info("Got HBase credentials" + autoHBase.getCredentials(creds));
-
-        Subject s = new Subject();
-        autoHBase.populateSubject(s, creds);
-        LOG.info("Got a Subject " + s);
-
-        autoHBase.renew(creds, conf);
-        LOG.info("renewed credentials" + autoHBase.getCredentials(creds));
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
deleted file mode 100644
index e579015..0000000
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.security;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
-
-/**
- * This class provides util methods for storm-hbase connector communicating
- * with secured HBase.
- */
-public class HBaseSecurityUtil {
-    private static final Logger LOG = LoggerFactory.getLogger(HBaseSecurityUtil.class);
-
-    public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
-    public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
-    private static  UserProvider legacyProvider = null;
-
-    public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
-        //Allowing keytab based login for backward compatibility.
-        if (UserGroupInformation.isSecurityEnabled() && (conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
-                !(((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHBase.class.getName())))) {
-            LOG.info("Logging in using keytab as AutoHBase is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);
-            //insure that if keytab is used only one login per process executed
-            if(legacyProvider == null) {
-                synchronized (HBaseSecurityUtil.class) {
-                    if(legacyProvider == null) {
-                        legacyProvider = UserProvider.instantiate(hbaseConfig);
-                        String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
-                        if (keytab != null) {
-                            hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
-                        }
-                        String userName = (String) conf.get(STORM_USER_NAME_KEY);
-                        if (userName != null) {
-                            hbaseConfig.set(STORM_USER_NAME_KEY, userName);
-                        }
-                        legacyProvider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY,
-                                InetAddress.getLocalHost().getCanonicalHostName());
-                    }
-                }
-            }
-            return legacyProvider;
-        } else {
-            return UserProvider.instantiate(hbaseConfig);
-        }
-    }
-}
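
Note on the removed login() above: it guards legacyProvider with double-checked locking, but the field is not declared volatile, and the unlocked first read is only safe under the Java memory model when it is. A minimal sketch of the same login-once idea with a volatile field follows. LoginOnce and the Supplier stand-in for UserProvider are hypothetical names used only for illustration; this is not the code that ships in storm-autocreds.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper, not part of Storm or HBase: runs a login action at most once. */
final class LoginOnce<T> {
    // volatile is what makes the unlocked first read below safe.
    private volatile T instance;

    T get(Supplier<T> login) {
        T local = instance;              // first check, no lock taken
        if (local == null) {
            synchronized (this) {
                local = instance;        // second check, under the lock
                if (local == null) {
                    local = login.get(); // stands in for the keytab login
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        LoginOnce<String> holder = new LoginOnce<>();
        String first = holder.get(() -> "provider-" + calls.incrementAndGet());
        String second = holder.get(() -> "provider-" + calls.incrementAndGet());
        System.out.println(first + " " + second + " logins=" + calls.get());
    }
}
```

The holder is an instance rather than a static field so it can be exercised in isolation; the removed class keeps one static provider per process.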

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 8b3a792..9f16cf2 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -210,6 +210,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-autocreds</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index 681d66a..395cced 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -34,7 +34,7 @@ import org.apache.storm.hdfs.common.AbstractHDFSWriter;
 import org.apache.storm.hdfs.common.NullPartitioner;
 import org.apache.storm.hdfs.common.Partitioner;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
-import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
+import org.apache.storm.hdfs.security.HdfsSecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
deleted file mode 100644
index ff3f9cc..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hdfs.common.security;
-
-import org.apache.storm.Config;
-import org.apache.storm.security.INimbusCredentialPlugin;
-import org.apache.storm.security.auth.IAutoCredentials;
-import org.apache.storm.security.auth.ICredentialsRenewer;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.Subject;
-import javax.xml.bind.DatatypeConverter;
-import java.io.*;
-import java.net.URI;
-import java.security.PrivilegedAction;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.storm.hdfs.common.security.HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY;
-import static org.apache.storm.hdfs.common.security.HdfsSecurityUtil.STORM_USER_NAME_KEY;
-
-/**
- * Automatically get HDFS delegation tokens and push it to user's topology. The class
- * assumes that HDFS configuration files are in your class path.
- */
-public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin {
-    private static final Logger LOG = LoggerFactory.getLogger(AutoHDFS.class);
-    public static final String HDFS_CREDENTIALS = "HDFS_CREDENTIALS";
-    public static final String TOPOLOGY_HDFS_URI = "topology.hdfs.uri";
-
-    private String hdfsKeyTab;
-    private String hdfsPrincipal;
-
-    @Override
-    public void prepare(Map conf) {
-        if(conf.containsKey(STORM_KEYTAB_FILE_KEY) && conf.containsKey(STORM_USER_NAME_KEY)) {
-            this.hdfsKeyTab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
-            this.hdfsPrincipal = (String) conf.get(STORM_USER_NAME_KEY);
-        }
-    }
-
-    @Override
-    public void shutdown() {
-        //no op.
-    }
-
-    @Override
-    public void populateCredentials(Map<String, String> credentials, Map conf) {
-        try {
-            credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
-            LOG.info("HDFS tokens added to credentials map.");
-        } catch (Exception e) {
-            LOG.error("Could not populate HDFS credentials.", e);
-        }
-    }
-
-    @Override
-    public void populateCredentials(Map<String, String> credentials) {
-        credentials.put(HDFS_CREDENTIALS, DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
-    }
-
-    /*
- *
- * @param credentials map with creds.
- * @return instance of org.apache.hadoop.security.Credentials.
- * this class's populateCredentials must have been called before.
- */
-    @SuppressWarnings("unchecked")
-    protected Credentials getCredentials(Map<String, String> credentials) {
-        Credentials credential = null;
-        if (credentials != null && credentials.containsKey(getCredentialKey())) {
-            try {
-                byte[] credBytes = DatatypeConverter.parseBase64Binary(credentials.get(getCredentialKey()));
-                ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(credBytes));
-
-                credential = new Credentials();
-                credential.readFields(in);
-            } catch (Exception e) {
-                LOG.error("Could not obtain credentials from credentials map.", e);
-            }
-        }
-        return credential;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void updateSubject(Subject subject, Map<String, String> credentials) {
-        addCredentialToSubject(subject, credentials);
-        addTokensToUGI(subject);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void populateSubject(Subject subject, Map<String, String> credentials) {
-        addCredentialToSubject(subject, credentials);
-        addTokensToUGI(subject);
-    }
-
-    @SuppressWarnings("unchecked")
-    private void addCredentialToSubject(Subject subject, Map<String, String> credentials) {
-        try {
-            Credentials credential = getCredentials(credentials);
-            if (credential != null) {
-                subject.getPrivateCredentials().add(credential);
-                LOG.info("HDFS Credentials added to the subject.");
-            } else {
-                LOG.info("No credential found in credentials");
-            }
-        } catch (Exception e) {
-            LOG.error("Failed to initialize and get UserGroupInformation.", e);
-        }
-    }
-
-    public void addTokensToUGI(Subject subject) {
-        if(subject != null) {
-            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
-            if (privateCredentials != null) {
-                for (Credentials cred : privateCredentials) {
-                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
-                    if (allTokens != null) {
-                        for (Token<? extends TokenIdentifier> token : allTokens) {
-                            try {
-                                UserGroupInformation.getCurrentUser().addToken(token);
-                                LOG.info("Added delegation tokens to UGI.");
-                            } catch (IOException e) {
-                                LOG.error("Exception while trying to add tokens to ugi", e);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    @SuppressWarnings("unchecked")
-    public void renew(Map<String, String> credentials, Map topologyConf) {
-        try {
-            Credentials credential = getCredentials(credentials);
-            if (credential != null) {
-                Configuration configuration = new Configuration();
-                Collection<Token<? extends TokenIdentifier>> tokens = credential.getAllTokens();
-
-                if(tokens != null && tokens.isEmpty() == false) {
-                    for (Token token : tokens) {
-                        //We need to re-login some other thread might have logged into hadoop using
-                        // their credentials (e.g. AutoHBase might be also part of nimbu auto creds)
-                        login(configuration);
-                        long expiration = (Long) token.renew(configuration);
-                        LOG.info("HDFS delegation token renewed, new expiration time {}", expiration);
-                    }
-                } else {
-                    LOG.debug("No tokens found for credentials, skipping renewal.");
-                }
-            }
-        } catch (Exception e) {
-            LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond " +
-                    "renewal period so attempting to get new tokens.", e);
-            populateCredentials(credentials, topologyConf);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    protected byte[] getHadoopCredentials(Map conf) {
-        try {
-            if(UserGroupInformation.isSecurityEnabled()) {
-                final Configuration configuration = new Configuration();
-
-                login(configuration);
-
-                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-
-                final URI nameNodeURI = conf.containsKey(TOPOLOGY_HDFS_URI) ? new URI(conf.get(TOPOLOGY_HDFS_URI).toString())
-                        : FileSystem.getDefaultUri(configuration);
-
-                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
-                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
-
-                Credentials creds = (Credentials) proxyUser.doAs(new PrivilegedAction<Object>() {
-                    @Override
-                    public Object run() {
-                        try {
-                            FileSystem fileSystem = FileSystem.get(nameNodeURI, configuration);
-                            Credentials credential= proxyUser.getCredentials();
-
-                            fileSystem.addDelegationTokens(hdfsPrincipal, credential);
-                            LOG.info("Delegation tokens acquired for user {}", topologySubmitterUser);
-                            return credential;
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                });
-
-
-                ByteArrayOutputStream bao = new ByteArrayOutputStream();
-                ObjectOutputStream out = new ObjectOutputStream(bao);
-
-                creds.write(out);
-                out.flush();
-                out.close();
-
-                return bao.toByteArray();
-            } else {
-                throw new RuntimeException("Security is not enabled for HDFS");
-            }
-        } catch (Exception ex) {
-            throw new RuntimeException("Failed to get delegation tokens." , ex);
-        }
-    }
-
-    private void login(Configuration configuration) throws IOException {
-        configuration.set(STORM_KEYTAB_FILE_KEY, this.hdfsKeyTab);
-        configuration.set(STORM_USER_NAME_KEY, this.hdfsPrincipal);
-        SecurityUtil.login(configuration, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
-
-        LOG.info("Logged into hdfs with principal {}", this.hdfsPrincipal);
-    }
-
-    protected String getCredentialKey() {
-        return HDFS_CREDENTIALS;
-    }
-
-    @SuppressWarnings("unchecked")
-    public static void main(String[] args) throws Exception {
-        Map conf = new HashMap();
-        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
-        conf.put(STORM_USER_NAME_KEY, args[1]); //with realm e.g. hdfs@WITZEND.COM
-        conf.put(STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab
-
-        Configuration configuration = new Configuration();
-        AutoHDFS autoHDFS = new AutoHDFS();
-        autoHDFS.prepare(conf);
-
-        Map<String,String> creds  = new HashMap<String, String>();
-        autoHDFS.populateCredentials(creds, conf);
-        LOG.info("Got HDFS credentials", autoHDFS.getCredentials(creds));
-
-        Subject s = new Subject();
-        autoHDFS.populateSubject(s, creds);
-        LOG.info("Got a Subject "+ s);
-
-        autoHDFS.renew(creds, conf);
-        LOG.info("renewed credentials", autoHDFS.getCredentials(creds));
-    }
-}
-
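
Note on the removed AutoHDFS above: populateCredentials() and getCredentials() move the Hadoop Credentials through the topology credentials map as a Base64 string wrapped around an ObjectOutputStream payload. The round trip can be sketched without Hadoop on the classpath as below. CredentialCodec is a hypothetical name, a plain String stands in for Credentials.write()/readFields(), and java.util.Base64 is swapped in for javax.xml.bind.DatatypeConverter, which is no longer bundled with the JDK from Java 11 on.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Base64;

/** Hypothetical stand-in for the serialize/deserialize steps; not Storm or Hadoop code. */
final class CredentialCodec {
    // Mirrors populateCredentials(): object stream -> bytes -> Base64 text.
    static String encode(String tokenPayload) {
        ByteArrayOutputStream bao = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bao)) {
            out.writeUTF(tokenPayload); // Credentials.write(out) in the removed code
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(bao.toByteArray());
    }

    // Mirrors getCredentials(): Base64 text -> bytes -> object stream.
    static String decode(String encoded) {
        byte[] bytes = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readUTF(); // credential.readFields(in) in the removed code
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String wire = encode("example-delegation-token");
        System.out.println(decode(wire));
    }
}
```

Closing the ObjectOutputStream before reading the byte array matters here, because the stream buffers block data until it is flushed or closed.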

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
deleted file mode 100644
index f380b38..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.common.security;
-
-import org.apache.storm.security.auth.kerberos.AutoTGT;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
-
-/**
- * This class provides util methods for storm-hdfs connector communicating
- * with secured HDFS.
- */
-public class HdfsSecurityUtil {
-    public static final String STORM_KEYTAB_FILE_KEY = "hdfs.keytab.file";
-    public static final String STORM_USER_NAME_KEY = "hdfs.kerberos.principal";
-
-    private static final Logger LOG = LoggerFactory.getLogger(HdfsSecurityUtil.class);
-    private static AtomicBoolean isLoggedIn = new AtomicBoolean();
-    public static void login(Map conf, Configuration hdfsConfig) throws IOException {
-        //If AutoHDFS is specified, do not attempt to login using keytabs, only kept for backward compatibility.
-        if(conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
-                (!(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHDFS.class.getName())) &&
-                 !(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoTGT.class.getName())))) {
-            if (UserGroupInformation.isSecurityEnabled()) {
-                // compareAndSet added because of https://issues.apache.org/jira/browse/STORM-1535
-                if (isLoggedIn.compareAndSet(false, true)) {
-                    LOG.info("Logging in using keytab as AutoHDFS is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);
-                    String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
-                    if (keytab != null) {
-                        hdfsConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
-                    }
-                    String userName = (String) conf.get(STORM_USER_NAME_KEY);
-                    if (userName != null) {
-                        hdfsConfig.set(STORM_USER_NAME_KEY, userName);
-                    }
-                    SecurityUtil.login(hdfsConfig, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
-                }
-            }
-        }
-    }
-}
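
Note on the removed login() above: the keytab path is taken only when topology auto credentials are unset or list neither AutoHDFS nor AutoTGT, and the compareAndSet keeps it to one login per process. A sketch of just that decision is below. KeytabLoginGuard is a hypothetical name, the key string "topology.auto-credentials" is an assumed value of Config.TOPOLOGY_AUTO_CREDENTIALS (it is not shown in this diff), and the UserGroupInformation.isSecurityEnabled() check is left out to keep the example free of Hadoop dependencies.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of the keytab-login decision; not Storm code. */
final class KeytabLoginGuard {
    // Assumed value of Config.TOPOLOGY_AUTO_CREDENTIALS.
    static final String AUTO_CREDS_KEY = "topology.auto-credentials";
    // Class names as they appear in the removed sources.
    static final String AUTO_HDFS = "org.apache.storm.hdfs.common.security.AutoHDFS";
    static final String AUTO_TGT = "org.apache.storm.security.auth.kerberos.AutoTGT";

    private final AtomicBoolean loggedIn = new AtomicBoolean();

    // True when neither auto-credential plugin is configured.
    static boolean wantsKeytabLogin(Map<String, Object> conf) {
        Object plugins = conf.get(AUTO_CREDS_KEY);
        if (plugins == null) {
            return true;
        }
        List<?> names = (List<?>) plugins;
        return !names.contains(AUTO_HDFS) && !names.contains(AUTO_TGT);
    }

    // Runs the login action at most once; returns whether this call ran it.
    boolean loginOnce(Map<String, Object> conf, Runnable login) {
        if (wantsKeytabLogin(conf) && loggedIn.compareAndSet(false, true)) {
            login.run(); // SecurityUtil.login(...) in the removed code
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(wantsKeytabLogin(conf));
        conf.put(AUTO_CREDS_KEY, Arrays.asList(AUTO_TGT));
        System.out.println(wantsKeytabLogin(conf));
    }
}
```

As in the removed code, the flag flips before the login action runs, so a login that throws is not retried by later callers.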

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index fe72610..b956326 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.storm.hdfs.common.HdfsUtils;
-import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
+import org.apache.storm.hdfs.security.HdfsSecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 43993c9..a863643 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.storm.Config;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
-import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
+import org.apache.storm.hdfs.security.HdfsSecurityUtil;
 import org.apache.storm.hdfs.trident.format.FileNameFormat;
 import org.apache.storm.hdfs.trident.format.RecordFormat;
 import org.apache.storm.hdfs.trident.format.SequenceFormat;

