storm-commits mailing list archives

From kabh...@apache.org
Subject [2/4] storm git commit: STORM-2556 Break down AutoCreds implementations into two kinds of classes
Date Wed, 21 Jun 2017 22:36:53 GMT
STORM-2556 Break down AutoCreds implementations into two kinds of classes

* Nimbus plugin: implements INimbusCredentialPlugin, ICredentialsRenewer
* Worker & Topology submitter: implements IAutoCredentials (a registration sketch for the two kinds follows below)
* Add 'Hadoop' to the names of abstract classes that depend on Hadoop
* also renamed parameters in the storm-core interfaces to make it clear
  * whether they refer to the cluster conf or the topology conf

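After this split, the worker-side class (e.g. AutoHDFS) is the one listed in the topology's auto-credentials, while the nimbus-side class (e.g. AutoHDFSNimbus) is registered on Nimbus as both the credential plugin and the renewer. A minimal registration sketch, assuming the standard Storm config keys (the keys themselves are not part of this commit):

    # topology conf, set by the topology submitter
    topology.auto-credentials:
      - "org.apache.storm.hdfs.security.AutoHDFS"

    # storm.yaml on the Nimbus host
    nimbus.autocredential.plugins.classes:
      - "org.apache.storm.hdfs.security.AutoHDFSNimbus"
    nimbus.credential.renewers.classes:
      - "org.apache.storm.hdfs.security.AutoHDFSNimbus"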

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9ca6d95b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9ca6d95b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9ca6d95b

Branch: refs/heads/master
Commit: 9ca6d95bda80c5685d1ac7262ae6f8520eb8c483
Parents: 38e997e
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Tue Jun 6 08:40:30 2017 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Wed Jun 21 20:33:19 2017 +0900

----------------------------------------------------------------------
 .../apache/storm/common/AbstractAutoCreds.java  | 254 ------------------
 .../storm/common/AbstractHadoopAutoCreds.java   | 136 ++++++++++
 .../AbstractHadoopNimbusPluginAutoCreds.java    | 127 +++++++++
 .../storm/common/CredentialKeyProvider.java     |  30 +++
 .../storm/common/HadoopCredentialUtil.java      |  80 ++++++
 .../apache/storm/hbase/security/AutoHBase.java  | 152 +----------
 .../storm/hbase/security/AutoHBaseCommand.java  |  64 +++++
 .../storm/hbase/security/AutoHBaseNimbus.java   | 131 ++++++++++
 .../storm/hbase/security/HBaseSecurityUtil.java |  10 +-
 .../apache/storm/hdfs/security/AutoHDFS.java    | 187 +-------------
 .../storm/hdfs/security/AutoHDFSCommand.java    |  64 +++++
 .../storm/hdfs/security/AutoHDFSNimbus.java     | 193 ++++++++++++++
 .../storm/hdfs/security/HdfsSecurityUtil.java   |  18 +-
 .../apache/storm/hive/security/AutoHive.java    | 252 +-----------------
 .../storm/hive/security/AutoHiveCommand.java    |  66 +++++
 .../storm/hive/security/AutoHiveNimbus.java     | 255 +++++++++++++++++++
 .../storm/hive/security/HiveSecurityUtil.java   |  32 +++
 .../storm/security/INimbusCredentialPlugin.java |   4 +-
 .../storm/security/auth/IAutoCredentials.java   |   2 +-
 19 files changed, 1229 insertions(+), 828 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
deleted file mode 100644
index 409d2fa..0000000
--- a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.common;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.math3.util.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.storm.security.INimbusCredentialPlugin;
-import org.apache.storm.security.auth.IAutoCredentials;
-import org.apache.storm.security.auth.ICredentialsRenewer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.Subject;
-import javax.xml.bind.DatatypeConverter;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * The base class that for auto credential plugins that abstracts out some of the common functionality.
- */
-public abstract class AbstractAutoCreds implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractAutoCreds.class);
-    public static final String CONFIG_KEY_RESOURCES = "resources";
-
-    private Set<String> configKeys = new HashSet<>();
-    private Map<String, Map<String, Object>> configMap = new HashMap<>();
-
-    @Override
-    public void prepare(Map<String, Object> conf) {
-        doPrepare(conf);
-        loadConfigKeys(conf);
-        for (String key : configKeys) {
-            if (conf.containsKey(key)) {
-                Map<String, Object> config = (Map<String, Object>) conf.get(key);
-                configMap.put(key, config);
-                LOG.info("configKey = {}, config = {}", key, config);
-            }
-        }
-    }
-
-    @Override
-    public void populateCredentials(Map<String, String> credentials, Map<String, Object> conf) {
-        try {
-            loadConfigKeys(conf);
-            if (!configKeys.isEmpty()) {
-                Map<String, Object> updatedConf = updateConfigs(conf);
-                for (String configKey : configKeys) {
-                    credentials.put(getCredentialKey(configKey),
-                            DatatypeConverter.printBase64Binary(getHadoopCredentials(updatedConf, configKey)));
-                }
-            } else {
-                credentials.put(getCredentialKey(StringUtils.EMPTY),
-                        DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
-            }
-            LOG.info("Tokens added to credentials map.");
-        } catch (Exception e) {
-            LOG.error("Could not populate credentials.", e);
-        }
-    }
-
-    private Map<String, Object> updateConfigs(Map<String, Object> topologyConf) {
-        Map<String, Object> res = new HashMap<>(topologyConf);
-        for (String configKey : configKeys) {
-            if (!res.containsKey(configKey) && configMap.containsKey(configKey)) {
-                res.put(configKey, configMap.get(configKey));
-            }
-        }
-        return res;
-    }
-
-    @Override
-    public void renew(Map<String, String> credentials, Map<String, Object> topologyConf) {
-        doRenew(credentials, updateConfigs(topologyConf));
-    }
-
-    @Override
-    public void populateCredentials(Map<String, String> credentials) {
-        credentials.put(getCredentialKey(StringUtils.EMPTY),
-                DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void populateSubject(Subject subject, Map<String, String> credentials) {
-        addCredentialToSubject(subject, credentials);
-        addTokensToUGI(subject);
-    }
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void updateSubject(Subject subject, Map<String, String> credentials) {
-        addCredentialToSubject(subject, credentials);
-        addTokensToUGI(subject);
-    }
-
-    protected Set<Pair<String, Credentials>> getCredentials(Map<String, String> credentials) {
-        Set<Pair<String, Credentials>> res = new HashSet<>();
-        if (!configKeys.isEmpty()) {
-            for (String configKey : configKeys) {
-                Credentials cred = doGetCredentials(credentials, configKey);
-                if (cred != null) {
-                    res.add(new Pair(configKey, cred));
-                }
-            }
-        } else {
-            Credentials cred = doGetCredentials(credentials, StringUtils.EMPTY);
-            if (cred != null) {
-                res.add(new Pair(StringUtils.EMPTY, cred));
-            }
-        }
-        return res;
-    }
-
-    protected void fillHadoopConfiguration(Map<String, Object> topoConf, String configKey, Configuration configuration) {
-        Map<String, Object> config = (Map<String, Object>) topoConf.get(configKey);
-        LOG.info("TopoConf {}, got config {}, for configKey {}", topoConf, config, configKey);
-        if (config != null) {
-            List<String> resourcesToLoad = new ArrayList<>();
-            for (Map.Entry<String, Object> entry : config.entrySet()) {
-                if (entry.getKey().equals(CONFIG_KEY_RESOURCES)) {
-                    resourcesToLoad.addAll((List<String>) entry.getValue());
-                } else {
-                    configuration.set(entry.getKey(), String.valueOf(entry.getValue()));
-                }
-            }
-            LOG.info("Resources to load {}", resourcesToLoad);
-            // add configs from resources like hdfs-site.xml
-            for (String pathStr : resourcesToLoad) {
-                configuration.addResource(new Path(Paths.get(pathStr).toUri()));
-            }
-        }
-        LOG.info("Initializing UGI with config {}", configuration);
-        UserGroupInformation.setConfiguration(configuration);
-    }
-
-    /**
-     * Prepare the plugin
-     *
-     * @param conf the storm cluster conf set via storm.yaml
-     */
-    protected abstract void doPrepare(Map<String, Object> conf);
-
-    /**
-     * The lookup key for the config key string
-     *
-     * @return the config key string
-     */
-    protected abstract String getConfigKeyString();
-
-    /**
-     * The key with which the credentials are stored in the credentials map
-     */
-    protected abstract String getCredentialKey(String configKey);
-
-    protected abstract byte[] getHadoopCredentials(Map<String, Object> conf, String configKey);
-
-    protected abstract byte[] getHadoopCredentials(Map<String, Object> conf);
-
-    protected abstract void doRenew(Map<String, String> credentials, Map<String, Object> topologyConf);
-
-    @SuppressWarnings("unchecked")
-    private void addCredentialToSubject(Subject subject, Map<String, String> credentials) {
-        try {
-            for (Pair<String, Credentials> cred : getCredentials(credentials)) {
-                subject.getPrivateCredentials().add(cred.getSecond());
-                LOG.info("Credentials added to the subject.");
-            }
-        } catch (Exception e) {
-            LOG.error("Failed to initialize and get UserGroupInformation.", e);
-        }
-    }
-
-    private void addTokensToUGI(Subject subject) {
-        if (subject != null) {
-            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
-            if (privateCredentials != null) {
-                for (Credentials cred : privateCredentials) {
-                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
-                    if (allTokens != null) {
-                        for (Token<? extends TokenIdentifier> token : allTokens) {
-                            try {
-                                UserGroupInformation.getCurrentUser().addToken(token);
-                                LOG.info("Added delegation tokens to UGI.");
-                            } catch (IOException e) {
-                                LOG.error("Exception while trying to add tokens to ugi", e);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    private Credentials doGetCredentials(Map<String, String> credentials, String configKey) {
-        Credentials credential = null;
-        if (credentials != null && credentials.containsKey(getCredentialKey(configKey))) {
-            try {
-                byte[] credBytes = DatatypeConverter.parseBase64Binary(credentials.get(getCredentialKey(configKey)));
-                ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(credBytes));
-
-                credential = new Credentials();
-                credential.readFields(in);
-            } catch (Exception e) {
-                LOG.error("Could not obtain credentials from credentials map.", e);
-            }
-        }
-        return credential;
-
-    }
-
-    private void loadConfigKeys(Map<String, Object> conf) {
-        List<String> keys;
-        String configKeyString = getConfigKeyString();
-        if ((keys = (List<String>) conf.get(configKeyString)) != null) {
-            configKeys.addAll(keys);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopAutoCreds.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopAutoCreds.java b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopAutoCreds.java
new file mode 100644
index 0000000..2ef0cf3
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopAutoCreds.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.common;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.xml.bind.DatatypeConverter;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The base class for auto credential plugins that abstracts out some of the common functionality.
+ */
+public abstract class AbstractHadoopAutoCreds implements IAutoCredentials, CredentialKeyProvider {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractHadoopAutoCreds.class);
+
+    private Set<String> configKeys = new HashSet<>();
+
+    @Override
+    public void prepare(Map topoConf) {
+        doPrepare(topoConf);
+        loadConfigKeys(topoConf);
+    }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials) {
+        credentials.put(getCredentialKey(StringUtils.EMPTY),
+                DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void populateSubject(Subject subject, Map<String, String> credentials) {
+        addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void updateSubject(Subject subject, Map<String, String> credentials) {
+        addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
+    }
+
+    public Set<Pair<String, Credentials>> getCredentials(Map<String, String> credentials) {
+        return HadoopCredentialUtil.getCredential(this, credentials, configKeys);
+    }
+
+    /**
+     * Prepare the plugin
+     *
+     * @param topoConf the topology conf
+     */
+    protected abstract void doPrepare(Map topoConf);
+
+    /**
+     * The lookup key for the config key string
+     *
+     * @return the config key string
+     */
+    protected abstract String getConfigKeyString();
+
+    @SuppressWarnings("unchecked")
+    private void addCredentialToSubject(Subject subject, Map<String, String> credentials) {
+        try {
+            for (Pair<String, Credentials> cred : getCredentials(credentials)) {
+                subject.getPrivateCredentials().add(cred.getSecond());
+                LOG.info("Credentials added to the subject.");
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to initialize and get UserGroupInformation.", e);
+        }
+    }
+
+    private void addTokensToUGI(Subject subject) {
+        if (subject != null) {
+            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
+            if (privateCredentials != null) {
+                for (Credentials cred : privateCredentials) {
+                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
+                    if (allTokens != null) {
+                        for (Token<? extends TokenIdentifier> token : allTokens) {
+                            try {
+                                UserGroupInformation.getCurrentUser().addToken(token);
+                                LOG.info("Added delegation tokens to UGI.");
+                            } catch (IOException e) {
+                                LOG.error("Exception while trying to add tokens to ugi", e);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void loadConfigKeys(Map<String, Object> conf) {
+        List<String> keys;
+        String configKeyString = getConfigKeyString();
+        if ((keys = (List<String>) conf.get(configKeyString)) != null) {
+            configKeys.addAll(keys);
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java
new file mode 100644
index 0000000..89ec6f5
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractHadoopNimbusPluginAutoCreds.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.common;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.storm.security.INimbusCredentialPlugin;
+import org.apache.storm.security.auth.ICredentialsRenewer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.DatatypeConverter;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The base class for auto credential Nimbus plugins that abstracts out some of the common functionality.
+ */
+public abstract class AbstractHadoopNimbusPluginAutoCreds
+    implements INimbusCredentialPlugin, ICredentialsRenewer, CredentialKeyProvider {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractHadoopNimbusPluginAutoCreds.class);
+    public static final String CONFIG_KEY_RESOURCES = "resources";
+
+    @Override
+    public void prepare(Map conf) {
+        doPrepare(conf);
+    }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials, Map<String, Object> topologyConf) {
+        try {
+            List<String> configKeys = getConfigKeys(topologyConf);
+            if (!configKeys.isEmpty()) {
+                for (String configKey : configKeys) {
+                    credentials.put(getCredentialKey(configKey),
+                            DatatypeConverter.printBase64Binary(getHadoopCredentials(topologyConf, configKey)));
+                }
+            } else {
+                credentials.put(getCredentialKey(StringUtils.EMPTY),
+                        DatatypeConverter.printBase64Binary(getHadoopCredentials(topologyConf)));
+            }
+            LOG.info("Tokens added to credentials map.");
+        } catch (Exception e) {
+            LOG.error("Could not populate credentials.", e);
+        }
+    }
+
+    @Override
+    public void renew(Map<String, String> credentials, Map<String, Object> topologyConf) {
+        doRenew(credentials, topologyConf);
+    }
+
+    protected Set<Pair<String, Credentials>> getCredentials(Map<String, String> credentials,
+        List<String> configKeys) {
+        return HadoopCredentialUtil.getCredential(this, credentials, configKeys);
+    }
+
+    protected void fillHadoopConfiguration(Map topologyConf, String configKey, Configuration configuration) {
+        Map<String, Object> config = (Map<String, Object>) topologyConf.get(configKey);
+        LOG.info("TopoConf {}, got config {}, for configKey {}", topologyConf, config, configKey);
+        if (config != null) {
+            List<String> resourcesToLoad = new ArrayList<>();
+            for (Map.Entry<String, Object> entry : config.entrySet()) {
+                if (entry.getKey().equals(CONFIG_KEY_RESOURCES)) {
+                    resourcesToLoad.addAll((List<String>) entry.getValue());
+                } else {
+                    configuration.set(entry.getKey(), String.valueOf(entry.getValue()));
+                }
+            }
+            LOG.info("Resources to load {}", resourcesToLoad);
+            // add configs from resources like hdfs-site.xml
+            for (String pathStr : resourcesToLoad) {
+                configuration.addResource(new Path(Paths.get(pathStr).toUri()));
+            }
+        }
+        LOG.info("Initializing UGI with config {}", configuration);
+        UserGroupInformation.setConfiguration(configuration);
+    }
+
+    /**
+     * Prepare the plugin
+     *
+     * @param conf the storm cluster conf set via storm.yaml
+     */
+    protected abstract void doPrepare(Map conf);
+
+    /**
+     * The lookup key for the config key string
+     *
+     * @return the config key string
+     */
+    protected abstract String getConfigKeyString();
+
+    protected abstract byte[] getHadoopCredentials(Map topologyConf, String configKey);
+
+    protected abstract byte[] getHadoopCredentials(Map topologyConf);
+
+    protected abstract void doRenew(Map<String, String> credentials, Map topologyConf);
+
+    protected List<String> getConfigKeys(Map conf) {
+        String configKeyString = getConfigKeyString();
+        return (List<String>) conf.get(configKeyString);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/common/CredentialKeyProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/common/CredentialKeyProvider.java b/external/storm-autocreds/src/main/java/org/apache/storm/common/CredentialKeyProvider.java
new file mode 100644
index 0000000..3826407
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/common/CredentialKeyProvider.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.common;
+
+/**
+ * Provider interface for credential key.
+ */
+public interface CredentialKeyProvider {
+  /**
+   * The key with which the credentials are stored in the credentials map.
+   *
+   * @return the credential key for the given config key
+   */
+  String getCredentialKey(String configKey);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/common/HadoopCredentialUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/common/HadoopCredentialUtil.java b/external/storm-autocreds/src/main/java/org/apache/storm/common/HadoopCredentialUtil.java
new file mode 100644
index 0000000..f676abf
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/common/HadoopCredentialUtil.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.common;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Utility class for getting credentials for Hadoop.
+ */
+final class HadoopCredentialUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(HadoopCredentialUtil.class);
+
+  private HadoopCredentialUtil() {
+  }
+
+  static Set<Pair<String, Credentials>> getCredential(CredentialKeyProvider provider,
+      Map<String, String> credentials, Collection<String> configKeys) {
+    Set<Pair<String, Credentials>> res = new HashSet<>();
+    if (!configKeys.isEmpty()) {
+      for (String configKey : configKeys) {
+        Credentials cred = doGetCredentials(provider, credentials, configKey);
+        if (cred != null) {
+          res.add(new Pair(configKey, cred));
+        }
+      }
+    } else {
+      Credentials cred = doGetCredentials(provider, credentials, StringUtils.EMPTY);
+      if (cred != null) {
+        res.add(new Pair(StringUtils.EMPTY, cred));
+      }
+    }
+    return res;
+  }
+
+  private static Credentials doGetCredentials(CredentialKeyProvider provider,
+      Map<String, String> credentials, String configKey) {
+    Credentials credential = null;
+    String credentialKey = provider.getCredentialKey(configKey);
+    if (credentials != null && credentials.containsKey(credentialKey)) {
+      try {
+        byte[] credBytes = DatatypeConverter.parseBase64Binary(credentials.get(credentialKey));
+        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(credBytes));
+
+        credential = new Credentials();
+        credential.readFields(in);
+      } catch (Exception e) {
+        LOG.error("Could not obtain credentials from credentials map.", e);
+      }
+    }
+    return credential;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
index 947d9b0..0218be5 100644
--- a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
@@ -18,53 +18,19 @@
 
 package org.apache.storm.hbase.security;
 
-import org.apache.storm.Config;
-import org.apache.storm.common.AbstractAutoCreds;
-import org.apache.storm.hdfs.security.HdfsSecurityUtil;
-import org.apache.storm.security.INimbusCredentialPlugin;
-import org.apache.storm.security.auth.IAutoCredentials;
-import org.apache.storm.security.auth.ICredentialsRenewer;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.token.TokenUtil;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.common.AbstractHadoopAutoCreds;
 
-import javax.security.auth.Subject;
-import javax.xml.bind.DatatypeConverter;
-import java.io.*;
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
+
+import static org.apache.storm.hbase.security.HBaseSecurityUtil.HBASE_CREDENTIALS;
 
 /**
- * Automatically get hbase delegation tokens and push it to user's topology. The class
- * assumes that hadoop/hbase configuration files are in your class path.
+ * Auto credentials plugin for HBase implementation. This class provides a way to automatically
+ * push credentials to a topology and to retrieve them in the worker.
  */
-public class AutoHBase extends AbstractAutoCreds {
-    private static final Logger LOG = LoggerFactory.getLogger(AutoHBase.class);
-
-    public static final String HBASE_CREDENTIALS = "HBASE_CREDENTIALS";
-    public static final String HBASE_KEYTAB_FILE_KEY = "hbase.keytab.file";
-    public static final String HBASE_PRINCIPAL_KEY = "hbase.kerberos.principal";
-
-    public String hbaseKeytab;
-    public String hbasePrincipal;
-
+public class AutoHBase extends AbstractHadoopAutoCreds {
     @Override
-    public void doPrepare(Map<String, Object> conf) {
-        if(conf.containsKey(HBASE_KEYTAB_FILE_KEY) && conf.containsKey(HBASE_PRINCIPAL_KEY)) {
-            hbaseKeytab = (String) conf.get(HBASE_KEYTAB_FILE_KEY);
-            hbasePrincipal = (String) conf.get(HBASE_PRINCIPAL_KEY);
-        }
+    public void doPrepare(Map conf) {
     }
 
     @Override
@@ -72,108 +38,8 @@ public class AutoHBase extends AbstractAutoCreds {
         return HBaseSecurityUtil.HBASE_CREDENTIALS_CONFIG_KEYS;
     }
 
-
-    @Override
-    public void shutdown() {
-        //no op.
-    }
-
-    @Override
-    protected  byte[] getHadoopCredentials(Map<String, Object> conf, String configKey) {
-        Configuration configuration = getHadoopConfiguration(conf, configKey);
-        return getHadoopCredentials(conf, configuration);
-    }
-
-    @Override
-    protected byte[] getHadoopCredentials(Map<String, Object> conf) {
-        return getHadoopCredentials(conf, HBaseConfiguration.create());
-    }
-
-    private Configuration getHadoopConfiguration(Map<String, Object> topoConf, String configKey) {
-        Configuration configuration = HBaseConfiguration.create();
-        fillHadoopConfiguration(topoConf, configKey, configuration);
-        return configuration;
-    }
-
-    @SuppressWarnings("unchecked")
-    protected byte[] getHadoopCredentials(Map<String, Object> conf, Configuration hbaseConf) {
-        try {
-            if(UserGroupInformation.isSecurityEnabled()) {
-                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-
-                UserProvider provider = UserProvider.instantiate(hbaseConf);
-
-                if (hbaseConf.get(HBASE_KEYTAB_FILE_KEY) == null) {
-                    hbaseConf.set(HBASE_KEYTAB_FILE_KEY, hbaseKeytab);
-                }
-                if (hbaseConf.get(HBASE_PRINCIPAL_KEY) == null) {
-                    hbaseConf.set(HBASE_PRINCIPAL_KEY, hbasePrincipal);
-                }
-                provider.login(HBASE_KEYTAB_FILE_KEY, HBASE_PRINCIPAL_KEY, InetAddress.getLocalHost().getCanonicalHostName());
-
-                LOG.info("Logged into Hbase as principal = " + conf.get(HBASE_PRINCIPAL_KEY));
-
-                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
-                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
-
-                User user = User.create(ugi);
-
-                if(user.isHBaseSecurityEnabled(hbaseConf)) {
-                    TokenUtil.obtainAndCacheToken(hbaseConf, proxyUser);
-
-                    LOG.info("Obtained HBase tokens, adding to user credentials.");
-
-                    Credentials credential= proxyUser.getCredentials();
-                    ByteArrayOutputStream bao = new ByteArrayOutputStream();
-                    ObjectOutputStream out = new ObjectOutputStream(bao);
-                    credential.write(out);
-                    out.flush();
-                    out.close();
-                    return bao.toByteArray();
-                } else {
-                    throw new RuntimeException("Security is not enabled for HBase.");
-                }
-            } else {
-                throw new RuntimeException("Security is not enabled for Hadoop");
-            }
-        } catch (Exception ex) {
-            throw new RuntimeException("Failed to get delegation tokens." , ex);
-        }
-    }
-
     @Override
-    public void doRenew(Map<String, String> credentials, Map<String, Object> topologyConf) {
-        //HBASE tokens are not renewable so we always have to get new ones.
-        populateCredentials(credentials, topologyConf);
-    }
-
-    @Override
-    protected String getCredentialKey(String configKey) {
+    public String getCredentialKey(String configKey) {
         return HBASE_CREDENTIALS + configKey;
     }
-
-
-    @SuppressWarnings("unchecked")
-    public static void main(String[] args) throws Exception {
-        Map<String, Object> conf = new HashMap();
-        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
-        conf.put(HBASE_PRINCIPAL_KEY,args[1]); // hbase principal storm-hbase@WITZEN.COM
-        conf.put(HBASE_KEYTAB_FILE_KEY,args[2]); // storm hbase keytab /etc/security/keytabs/storm-hbase.keytab
-
-        AutoHBase autoHBase = new AutoHBase();
-        autoHBase.prepare(conf);
-
-        Map<String,String> creds  = new HashMap<String, String>();
-        autoHBase.populateCredentials(creds, conf);
-        LOG.info("Got HBase credentials" + autoHBase.getCredentials(creds));
-
-        Subject s = new Subject();
-        autoHBase.populateSubject(s, creds);
-        LOG.info("Got a Subject " + s);
-
-        autoHBase.renew(creds, conf);
-        LOG.info("renewed credentials" + autoHBase.getCredentials(creds));
-    }
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java
new file mode 100644
index 0000000..b239816
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseCommand.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.security;
+
+import org.apache.storm.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.hbase.security.HBaseSecurityUtil.HBASE_KEYTAB_FILE_KEY;
+import static org.apache.storm.hbase.security.HBaseSecurityUtil.HBASE_PRINCIPAL_KEY;
+
+/**
+ * Command tool of HBase credential renewer
+ */
+public final class AutoHBaseCommand {
+  private static final Logger LOG = LoggerFactory.getLogger(AutoHBaseCommand.class);
+
+  private AutoHBaseCommand() {
+  }
+
+  @SuppressWarnings("unchecked")
+  public static void main(String[] args) throws Exception {
+    Map conf = new HashMap();
+    conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
+    conf.put(HBASE_PRINCIPAL_KEY, args[1]); // hbase principal storm-hbase@WITZEN.COM
+    conf.put(HBASE_KEYTAB_FILE_KEY,
+        args[2]); // storm hbase keytab /etc/security/keytabs/storm-hbase.keytab
+
+    AutoHBase autoHBase = new AutoHBase();
+    autoHBase.prepare(conf);
+    AutoHBaseNimbus autoHBaseNimbus = new AutoHBaseNimbus();
+    autoHBaseNimbus.prepare(conf);
+
+    Map<String, String> creds = new HashMap<>();
+    autoHBaseNimbus.populateCredentials(creds, conf);
+    LOG.info("Got HBase credentials" + autoHBase.getCredentials(creds));
+
+    Subject s = new Subject();
+    autoHBase.populateSubject(s, creds);
+    LOG.info("Got a Subject " + s);
+
+    autoHBaseNimbus.renew(creds, conf);
+    LOG.info("renewed credentials" + autoHBase.getCredentials(creds));
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java
new file mode 100644
index 0000000..a1cf674
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.storm.Config;
+import org.apache.storm.common.AbstractHadoopNimbusPluginAutoCreds;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.util.Map;
+
+import static org.apache.storm.hbase.security.HBaseSecurityUtil.HBASE_CREDENTIALS;
+import static org.apache.storm.hbase.security.HBaseSecurityUtil.HBASE_KEYTAB_FILE_KEY;
+import static org.apache.storm.hbase.security.HBaseSecurityUtil.HBASE_PRINCIPAL_KEY;
+
+/**
+ * Auto credentials nimbus plugin for HBase implementation. This class automatically
+ * gets HBase delegation tokens and pushes them to the user's topology.
+ */
+public class AutoHBaseNimbus extends AbstractHadoopNimbusPluginAutoCreds {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoHBaseNimbus.class);
+
+    @Override
+    public void doPrepare(Map conf) {
+        // we don't allow any cluster wide configuration
+    }
+
+    @Override
+    protected String getConfigKeyString() {
+        return HBaseSecurityUtil.HBASE_CREDENTIALS_CONFIG_KEYS;
+    }
+
+    @Override
+    public void shutdown() {
+        //no op.
+    }
+
+    @Override
+    protected  byte[] getHadoopCredentials(Map conf, String configKey) {
+        Configuration configuration = getHadoopConfiguration(conf, configKey);
+        return getHadoopCredentials(conf, configuration);
+    }
+
+    @Override
+    protected byte[] getHadoopCredentials(Map conf) {
+        return getHadoopCredentials(conf, HBaseConfiguration.create());
+    }
+
+    private Configuration getHadoopConfiguration(Map topoConf, String configKey) {
+        Configuration configuration = HBaseConfiguration.create();
+        fillHadoopConfiguration(topoConf, configKey, configuration);
+        return configuration;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected byte[] getHadoopCredentials(Map conf, Configuration hbaseConf) {
+        try {
+            if(UserGroupInformation.isSecurityEnabled()) {
+                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
+
+                UserProvider provider = UserProvider.instantiate(hbaseConf);
+                provider.login(HBASE_KEYTAB_FILE_KEY, HBASE_PRINCIPAL_KEY, InetAddress.getLocalHost().getCanonicalHostName());
+
+                LOG.info("Logged into Hbase as principal = " + conf.get(HBASE_PRINCIPAL_KEY));
+
+                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+
+                if(User.isHBaseSecurityEnabled(hbaseConf)) {
+                    TokenUtil.obtainAndCacheToken(hbaseConf, proxyUser);
+
+                    LOG.info("Obtained HBase tokens, adding to user credentials.");
+
+                    Credentials credential= proxyUser.getCredentials();
+                    ByteArrayOutputStream bao = new ByteArrayOutputStream();
+                    ObjectOutputStream out = new ObjectOutputStream(bao);
+                    credential.write(out);
+                    out.flush();
+                    out.close();
+                    return bao.toByteArray();
+                } else {
+                    throw new RuntimeException("Security is not enabled for HBase.");
+                }
+            } else {
+                throw new RuntimeException("Security is not enabled for Hadoop");
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException("Failed to get delegation tokens." , ex);
+        }
+    }
+
+    @Override
+    public void doRenew(Map<String, String> credentials, Map topologyConf) {
+        //HBASE tokens are not renewable so we always have to get new ones.
+        populateCredentials(credentials, topologyConf);
+    }
+
+    @Override
+    public String getCredentialKey(String configKey) {
+        return HBASE_CREDENTIALS + configKey;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
index 32339a6..935aa89 100644
--- a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
@@ -40,7 +40,15 @@ public class HBaseSecurityUtil {
     public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
     public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
     public static final String HBASE_CREDENTIALS_CONFIG_KEYS = "hbaseCredentialsConfigKeys";
-    private static  UserProvider legacyProvider = null;
+
+    public static final String HBASE_CREDENTIALS = "HBASE_CREDENTIALS";
+    public static final String HBASE_KEYTAB_FILE_KEY = "hbase.keytab.file";
+    public static final String HBASE_PRINCIPAL_KEY = "hbase.kerberos.principal";
+
+    private static UserProvider legacyProvider = null;
+
+    private HBaseSecurityUtil() {
+    }
 
     public static UserProvider login(Map<String, Object> conf, Configuration hbaseConfig) throws IOException {
         //Allowing keytab based login for backward compatibility.

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
index eb5be81..4c2cb62 100644
--- a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
@@ -18,50 +18,19 @@
 
 package org.apache.storm.hdfs.security;
 
-import org.apache.commons.math3.util.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.storm.Config;
-import org.apache.storm.common.AbstractAutoCreds;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.common.AbstractHadoopAutoCreds;
 
-import javax.security.auth.Subject;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.net.URI;
-import java.security.PrivilegedAction;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.storm.hdfs.security.HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY;
-import static org.apache.storm.hdfs.security.HdfsSecurityUtil.STORM_USER_NAME_KEY;
+import static org.apache.storm.hdfs.security.HdfsSecurityUtil.HDFS_CREDENTIALS;
 
 /**
- * Automatically get HDFS delegation tokens and push it to user's topology. The class
- * assumes that HDFS configuration files are in your class path.
+ * Auto credentials plugin for HDFS implementation. This class provides a way to automatically
+ * push credentials to a topology and to retrieve them in the worker.
  */
-public class AutoHDFS extends AbstractAutoCreds {
-    private static final Logger LOG = LoggerFactory.getLogger(AutoHDFS.class);
-    public static final String HDFS_CREDENTIALS = "HDFS_CREDENTIALS";
-    public static final String TOPOLOGY_HDFS_URI = "topology.hdfs.uri";
-
-    private String hdfsKeyTab;
-    private String hdfsPrincipal;
-
+public class AutoHDFS extends AbstractHadoopAutoCreds {
     @Override
-    public void doPrepare(Map<String, Object> conf) {
-        if(conf.containsKey(STORM_KEYTAB_FILE_KEY) && conf.containsKey(STORM_USER_NAME_KEY)) {
-            hdfsKeyTab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
-            hdfsPrincipal = (String) conf.get(STORM_USER_NAME_KEY);
-        }
+    public void doPrepare(Map conf) {
     }
 
     @Override
@@ -70,147 +39,7 @@ public class AutoHDFS extends AbstractAutoCreds {
     }
 
     @Override
-    public void shutdown() {
-        //no op.
-    }
-
-    @Override
-    protected  byte[] getHadoopCredentials(Map<String, Object> conf, String configKey) {
-        Configuration configuration = getHadoopConfiguration(conf, configKey);
-        return getHadoopCredentials(conf, configuration);
-    }
-
-    @Override
-    protected byte[] getHadoopCredentials(Map<String, Object> conf) {
-        return getHadoopCredentials(conf, new Configuration());
-    }
-
-    private Configuration getHadoopConfiguration(Map<String, Object> topoConf, String configKey) {
-        Configuration configuration = new Configuration();
-        fillHadoopConfiguration(topoConf, configKey, configuration);
-        return configuration;
-    }
-
-    @SuppressWarnings("unchecked")
-    private byte[] getHadoopCredentials(Map<String, Object> conf, final Configuration configuration) {
-        try {
-            if(UserGroupInformation.isSecurityEnabled()) {
-                login(configuration);
-
-                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-
-                final URI nameNodeURI = conf.containsKey(TOPOLOGY_HDFS_URI) ? new URI(conf.get(TOPOLOGY_HDFS_URI).toString())
-                        : FileSystem.getDefaultUri(configuration);
-
-                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
-                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
-
-                Credentials creds = (Credentials) proxyUser.doAs(new PrivilegedAction<Object>() {
-                    @Override
-                    public Object run() {
-                        try {
-                            FileSystem fileSystem = FileSystem.get(nameNodeURI, configuration);
-                            Credentials credential= proxyUser.getCredentials();
-
-                            if (configuration.get(STORM_USER_NAME_KEY) == null) {
-                                configuration.set(STORM_USER_NAME_KEY, hdfsPrincipal);
-                            }
-
-                            fileSystem.addDelegationTokens(configuration.get(STORM_USER_NAME_KEY), credential);
-                            LOG.info("Delegation tokens acquired for user {}", topologySubmitterUser);
-                            return credential;
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                });
-
-
-                ByteArrayOutputStream bao = new ByteArrayOutputStream();
-                ObjectOutputStream out = new ObjectOutputStream(bao);
-
-                creds.write(out);
-                out.flush();
-                out.close();
-
-                return bao.toByteArray();
-            } else {
-                throw new RuntimeException("Security is not enabled for HDFS");
-            }
-        } catch (Exception ex) {
-            throw new RuntimeException("Failed to get delegation tokens." , ex);
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    @SuppressWarnings("unchecked")
-    public void doRenew(Map<String, String> credentials, Map<String, Object> topologyConf) {
-        for (Pair<String, Credentials> cred : getCredentials(credentials)) {
-            try {
-                Configuration configuration = getHadoopConfiguration(topologyConf, cred.getFirst());
-                Collection<Token<? extends TokenIdentifier>> tokens = cred.getSecond().getAllTokens();
-
-                if (tokens != null && !tokens.isEmpty()) {
-                    for (Token token : tokens) {
-                        //We need to re-login some other thread might have logged into hadoop using
-                        // their credentials (e.g. AutoHBase might be also part of nimbu auto creds)
-                        login(configuration);
-                        long expiration = token.renew(configuration);
-                        LOG.info("HDFS delegation token renewed, new expiration time {}", expiration);
-                    }
-                } else {
-                    LOG.debug("No tokens found for credentials, skipping renewal.");
-                }
-            } catch (Exception e) {
-                LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond " +
-                        "renewal period so attempting to get new tokens.", e);
-                populateCredentials(credentials, topologyConf);
-            }
-        }
-    }
-
-    private void login(Configuration configuration) throws IOException {
-        if (configuration.get(STORM_KEYTAB_FILE_KEY) == null) {
-            configuration.set(STORM_KEYTAB_FILE_KEY, hdfsKeyTab);
-        }
-        if (configuration.get(STORM_USER_NAME_KEY) == null) {
-            configuration.set(STORM_USER_NAME_KEY, hdfsPrincipal);
-        }
-        SecurityUtil.login(configuration, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
-
-        LOG.info("Logged into hdfs with principal {}", configuration.get(STORM_USER_NAME_KEY));
-    }
-
-    @Override
-    protected String getCredentialKey(String configKey) {
+    public String getCredentialKey(String configKey) {
         return HDFS_CREDENTIALS + configKey;
     }
-
-    @SuppressWarnings("unchecked")
-    public static void main(String[] args) throws Exception {
-        Map<String, Object> conf = new HashMap();
-        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
-        conf.put(STORM_USER_NAME_KEY, args[1]); //with realm e.g. hdfs@WITZEND.COM
-        conf.put(STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab
-
-        Configuration configuration = new Configuration();
-        AutoHDFS autoHDFS = new AutoHDFS();
-        autoHDFS.prepare(conf);
-
-        Map<String,String> creds  = new HashMap<String, String>();
-        autoHDFS.populateCredentials(creds, conf);
-        LOG.info("Got HDFS credentials", autoHDFS.getCredentials(creds));
-
-        Subject s = new Subject();
-        autoHDFS.populateSubject(s, creds);
-        LOG.info("Got a Subject "+ s);
-
-        autoHDFS.renew(creds, conf);
-        LOG.info("renewed credentials", autoHDFS.getCredentials(creds));
-    }
-}
-
+}
\ No newline at end of file
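
For context, a minimal sketch of the topology-side wiring after this split (the helper class name and the namenode URI below are placeholders, not part of this commit; on the Nimbus host the AutoHDFSNimbus plugin is configured separately, typically via the nimbus.autocredential.plugins.classes and nimbus.credential.renewers.classes settings):

import org.apache.storm.Config;
import org.apache.storm.hdfs.security.AutoHDFS;
import org.apache.storm.hdfs.security.HdfsSecurityUtil;

import java.util.Collections;

// Hypothetical helper showing only the worker/submitter half of the split.
public class AutoHDFSTopologyConf {
    public static Config build() {
        Config conf = new Config();
        // Workers load AutoHDFS to pull the pushed delegation tokens into their Subject.
        conf.put(Config.TOPOLOGY_AUTO_CREDENTIALS,
                Collections.singletonList(AutoHDFS.class.getName()));
        // Optional: point the Nimbus-side plugin at a specific namenode (placeholder URI).
        conf.put(HdfsSecurityUtil.TOPOLOGY_HDFS_URI, "hdfs://namenode.example.com:8020");
        return conf;
    }
}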

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java
new file mode 100644
index 0000000..803e625
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSCommand.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.security;
+
+import org.apache.storm.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.hdfs.security.HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY;
+import static org.apache.storm.hdfs.security.HdfsSecurityUtil.STORM_USER_NAME_KEY;
+
+/**
+ * Command line tool for the HDFS credential renewer.
+ */
+public final class AutoHDFSCommand {
+  private static final Logger LOG = LoggerFactory.getLogger(AutoHDFSCommand.class);
+
+  private AutoHDFSCommand() {
+  }
+
+  @SuppressWarnings("unchecked")
+  public static void main(String[] args) throws Exception {
+    Map conf = new HashMap();
+    conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
+    conf.put(STORM_USER_NAME_KEY, args[1]); //with realm e.g. hdfs@WITZEND.COM
+    conf.put(STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab
+
+    AutoHDFS autoHDFS = new AutoHDFS();
+    autoHDFS.prepare(conf);
+    AutoHDFSNimbus autoHDFSNimbus = new AutoHDFSNimbus();
+    autoHDFSNimbus.prepare(conf);
+
+    Map<String,String> creds  = new HashMap<>();
+    autoHDFSNimbus.populateCredentials(creds, conf);
+    LOG.info("Got HDFS credentials", autoHDFS.getCredentials(creds));
+
+    Subject s = new Subject();
+    autoHDFS.populateSubject(s, creds);
+    LOG.info("Got a Subject "+ s);
+
+    autoHDFSNimbus.renew(creds, conf);
+    LOG.info("renewed credentials", autoHDFS.getCredentials(creds));
+  }
+
+}
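
As a quick manual check of the split, this tool prepares both AutoHDFS and AutoHDFSNimbus, fetches tokens, builds a Subject, and renews. It expects the topology submitter principal, the HDFS principal, and the keytab path as arguments; an example invocation might look like the following, where the classpath, realm, and paths are placeholders:

    java -cp "$(storm classpath)" org.apache.storm.hdfs.security.AutoHDFSCommand \
        storm@EXAMPLE.COM hdfs@EXAMPLE.COM /etc/security/keytabs/storm.keytab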

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java
new file mode 100644
index 0000000..78aef12
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.security;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.storm.Config;
+import org.apache.storm.common.AbstractHadoopNimbusPluginAutoCreds;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.net.URI;
+import java.security.PrivilegedAction;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.hdfs.security.HdfsSecurityUtil.HDFS_CREDENTIALS;
+import static org.apache.storm.hdfs.security.HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY;
+import static org.apache.storm.hdfs.security.HdfsSecurityUtil.STORM_USER_NAME_KEY;
+import static org.apache.storm.hdfs.security.HdfsSecurityUtil.TOPOLOGY_HDFS_URI;
+
+/**
+ * Auto credentials nimbus plugin for HDFS implementation. This class automatically
+ * gets HDFS delegation tokens and push it to user's topology.
+ */
+public class AutoHDFSNimbus extends AbstractHadoopNimbusPluginAutoCreds {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoHDFSNimbus.class);
+
+    private String hdfsKeyTab;
+    private String hdfsPrincipal;
+
+    @Override
+    public void doPrepare(Map conf) {
+        if(conf.containsKey(STORM_KEYTAB_FILE_KEY) && conf.containsKey(STORM_USER_NAME_KEY)) {
+            hdfsKeyTab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
+            hdfsPrincipal = (String) conf.get(STORM_USER_NAME_KEY);
+        }
+    }
+
+    @Override
+    protected String getConfigKeyString() {
+        return HdfsSecurityUtil.HDFS_CREDENTIALS_CONFIG_KEYS;
+    }
+
+    @Override
+    public void shutdown() {
+        //no op.
+    }
+
+    @Override
+    protected  byte[] getHadoopCredentials(Map conf, String configKey) {
+        Configuration configuration = getHadoopConfiguration(conf, configKey);
+        return getHadoopCredentials(conf, configuration);
+    }
+
+    @Override
+    protected byte[] getHadoopCredentials(Map conf) {
+        return getHadoopCredentials(conf, new Configuration());
+    }
+
+    private Configuration getHadoopConfiguration(Map topoConf, String configKey) {
+        Configuration configuration = new Configuration();
+        fillHadoopConfiguration(topoConf, configKey, configuration);
+        return configuration;
+    }
+
+    @SuppressWarnings("unchecked")
+    private byte[] getHadoopCredentials(Map conf, final Configuration configuration) {
+        try {
+            if(UserGroupInformation.isSecurityEnabled()) {
+                login(configuration);
+
+                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
+
+                final URI nameNodeURI = conf.containsKey(TOPOLOGY_HDFS_URI) ? new URI(conf.get(TOPOLOGY_HDFS_URI).toString())
+                        : FileSystem.getDefaultUri(configuration);
+
+                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+
+                Credentials creds = (Credentials) proxyUser.doAs(new PrivilegedAction<Object>() {
+                    @Override
+                    public Object run() {
+                        try {
+                            FileSystem fileSystem = FileSystem.get(nameNodeURI, configuration);
+                            Credentials credential = proxyUser.getCredentials();
+
+                            if (configuration.get(STORM_USER_NAME_KEY) == null) {
+                                configuration.set(STORM_USER_NAME_KEY, hdfsPrincipal);
+                            }
+
+                            fileSystem.addDelegationTokens(configuration.get(STORM_USER_NAME_KEY), credential);
+                            LOG.info("Delegation tokens acquired for user {}", topologySubmitterUser);
+                            return credential;
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+
+
+                ByteArrayOutputStream bao = new ByteArrayOutputStream();
+                ObjectOutputStream out = new ObjectOutputStream(bao);
+
+                creds.write(out);
+                out.flush();
+                out.close();
+
+                return bao.toByteArray();
+            } else {
+                throw new RuntimeException("Security is not enabled for HDFS");
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException("Failed to get delegation tokens." , ex);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void doRenew(Map<String, String> credentials, Map topologyConf) {
+        List<String> confKeys = getConfigKeys(topologyConf);
+        for (Pair<String, Credentials> cred : getCredentials(credentials, confKeys)) {
+            try {
+                Configuration configuration = getHadoopConfiguration(topologyConf, cred.getFirst());
+                Collection<Token<? extends TokenIdentifier>> tokens = cred.getSecond().getAllTokens();
+
+                if (tokens != null && !tokens.isEmpty()) {
+                    for (Token token : tokens) {
+                        // We need to re-login because some other thread might have logged into Hadoop using
+                        // its own credentials (e.g. AutoHBase might also be part of the nimbus auto creds).
+                        login(configuration);
+                        long expiration = token.renew(configuration);
+                        LOG.info("HDFS delegation token renewed, new expiration time {}", expiration);
+                    }
+                } else {
+                    LOG.debug("No tokens found for credentials, skipping renewal.");
+                }
+            } catch (Exception e) {
+                LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond " +
+                        "renewal period so attempting to get new tokens.", e);
+                populateCredentials(credentials, topologyConf);
+            }
+        }
+    }
+
+    private void login(Configuration configuration) throws IOException {
+        if (configuration.get(STORM_KEYTAB_FILE_KEY) == null) {
+            configuration.set(STORM_KEYTAB_FILE_KEY, hdfsKeyTab);
+        }
+        if (configuration.get(STORM_USER_NAME_KEY) == null) {
+            configuration.set(STORM_USER_NAME_KEY, hdfsPrincipal);
+        }
+        SecurityUtil.login(configuration, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
+
+        LOG.info("Logged into hdfs with principal {}", configuration.get(STORM_USER_NAME_KEY));
+    }
+
+    @Override
+    public String getCredentialKey(String configKey) {
+        return HDFS_CREDENTIALS + configKey;
+    }
+}
+
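
The byte[] returned by getHadoopCredentials above is simply a Java-serialized Hadoop Credentials object; the worker-side plugin reads it back the same way. A sketch of that reverse step (the helper class is illustrative, not part of this commit):

import org.apache.hadoop.security.Credentials;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

final class CredentialBytesExample {
    private CredentialBytesExample() {
    }

    // Rebuild the Credentials that AutoHDFSNimbus serialized with Credentials.write(ObjectOutputStream).
    static Credentials deserialize(byte[] credentialBytes) throws IOException {
        Credentials credentials = new Credentials();
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(credentialBytes))) {
            // Credentials.readFields accepts any DataInput; ObjectInputStream qualifies.
            credentials.readFields(in);
        }
        return credentials;
    }
}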

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
index 8e8ae95..10c377e 100644
--- a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
@@ -17,10 +17,11 @@
  */
 package org.apache.storm.hdfs.security;
 
+import org.apache.storm.security.auth.kerberos.AutoTGT;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.storm.security.auth.kerberos.AutoTGT;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,19 +36,24 @@ import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
  * This class provides util methods for storm-hdfs connector communicating
  * with secured HDFS.
  */
-public class HdfsSecurityUtil {
+public final class HdfsSecurityUtil {
     public static final String STORM_KEYTAB_FILE_KEY = "hdfs.keytab.file";
     public static final String STORM_USER_NAME_KEY = "hdfs.kerberos.principal";
     public static final String HDFS_CREDENTIALS_CONFIG_KEYS = "hdfsCredentialsConfigKeys";
-
+    public static final String HDFS_CREDENTIALS = "HDFS_CREDENTIALS";
+    public static final String TOPOLOGY_HDFS_URI = "topology.hdfs.uri";
 
     private static final Logger LOG = LoggerFactory.getLogger(HdfsSecurityUtil.class);
     private static AtomicBoolean isLoggedIn = new AtomicBoolean();
-    public static void login(Map<String, Object> conf, Configuration hdfsConfig) throws IOException {
+
+    private HdfsSecurityUtil() {
+    }
+
+    public static void login(Map conf, Configuration hdfsConfig) throws IOException {
         //If AutoHDFS is specified, do not attempt to login using keytabs, only kept for backward compatibility.
         if(conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
                 (!(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHDFS.class.getName())) &&
-                 !(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoTGT.class.getName())))) {
+                        !(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoTGT.class.getName())))) {
             if (UserGroupInformation.isSecurityEnabled()) {
                 // compareAndSet added because of https://issues.apache.org/jira/browse/STORM-1535
                 if (isLoggedIn.compareAndSet(false, true)) {
@@ -65,4 +71,4 @@ public class HdfsSecurityUtil {
             }
         }
     }
-}
+}
\ No newline at end of file
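
HdfsSecurityUtil.login stays as the keytab-based fallback for the storm-hdfs connector; it is skipped when AutoHDFS or AutoTGT is configured and is guarded so a worker logs in at most once. A rough sketch of a caller, with the class and method names being illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.storm.hdfs.security.HdfsSecurityUtil;

import java.io.IOException;
import java.util.Map;

// Illustrative prepare-time hook for an HDFS bolt or spout.
final class HdfsLoginExample {
    private HdfsLoginExample() {
    }

    static void loginIfNeeded(Map topoConf, Configuration hdfsConfig) throws IOException {
        // No-op when topology.auto-credentials already lists AutoHDFS or AutoTGT,
        // and the internal compareAndSet guard keeps repeated calls from re-logging in.
        HdfsSecurityUtil.login(topoConf, hdfsConfig);
    }
}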

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
index 73108bc..d88f197 100644
--- a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
@@ -18,55 +18,20 @@
 
 package org.apache.storm.hive.security;
 
-import org.apache.commons.math3.util.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hive.hcatalog.api.HCatClient;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.storm.Config;
-import org.apache.storm.common.AbstractAutoCreds;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.common.AbstractHadoopAutoCreds;
 
-import javax.security.auth.Subject;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.storm.hive.security.HiveSecurityUtil.HIVE_CREDENTIALS;
+import static org.apache.storm.hive.security.HiveSecurityUtil.HIVE_CREDENTIALS_CONFIG_KEYS;
+
 /**
- * Automatically get hive delegation tokens and push it to user's topology.
+ * Auto credentials plugin for the Hive implementation. This class provides a way to automatically
+ * push credentials to a topology and to retrieve them in the worker.
  */
-public class AutoHive extends AbstractAutoCreds {
-    private static final Logger LOG = LoggerFactory.getLogger(AutoHive.class);
-
-    public static final String HIVE_CREDENTIALS = "HIVE_CREDENTIALS";
-    public static final String HIVE_CREDENTIALS_CONFIG_KEYS = "hiveCredentialsConfigKeys";
-
-    public static final String HIVE_KEYTAB_FILE_KEY = "hive.keytab.file";
-    public static final String HIVE_PRINCIPAL_KEY = "hive.kerberos.principal";
-
-    public String hiveKeytab;
-    public String hivePrincipal;
-    public String metaStoreURI;
-
+public class AutoHive extends AbstractHadoopAutoCreds {
     @Override
-    public void doPrepare(Map<String, Object> conf) {
-        if (conf.containsKey(HIVE_KEYTAB_FILE_KEY) && conf.containsKey(HIVE_PRINCIPAL_KEY)) {
-            hiveKeytab = (String) conf.get(HIVE_KEYTAB_FILE_KEY);
-            hivePrincipal = (String) conf.get(HIVE_PRINCIPAL_KEY);
-            metaStoreURI = (String) conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
-        }
+    public void doPrepare(Map conf) {
     }
 
     @Override
@@ -75,205 +40,8 @@ public class AutoHive extends AbstractAutoCreds {
     }
 
     @Override
-    public void shutdown() {
-        //no op.
-    }
-
-    @Override
-    protected byte[] getHadoopCredentials(Map<String, Object> conf, String configKey) {
-        Configuration configuration = getHadoopConfiguration(conf, configKey);
-        return getHadoopCredentials(conf, configuration);
-    }
-
-    @Override
-    protected byte[] getHadoopCredentials(Map<String, Object> conf) {
-        Configuration configuration = new Configuration();
-        return getHadoopCredentials(conf, configuration);
-    }
-
-    private Configuration getHadoopConfiguration(Map<String, Object> topoConf, String configKey) {
-        Configuration configuration = new Configuration();
-        fillHadoopConfiguration(topoConf, configKey, configuration);
-        return configuration;
-    }
-
-    public HiveConf createHiveConf(String metaStoreURI, String hiveMetaStorePrincipal) throws IOException {
-        HiveConf hcatConf = new HiveConf();
-        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
-        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
-        hcatConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-        hcatConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
-        hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, hiveMetaStorePrincipal);
-        return hcatConf;
-    }
-
-    @SuppressWarnings("unchecked")
-    protected byte[] getHadoopCredentials(Map<String, Object> conf, final Configuration configuration) {
-        try {
-            if (UserGroupInformation.isSecurityEnabled()) {
-                String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-                String hiveMetaStoreURI = getMetaStoreURI(configuration);
-                String hiveMetaStorePrincipal = getMetaStorePrincipal(configuration);
-                HiveConf hcatConf = createHiveConf(hiveMetaStoreURI, hiveMetaStorePrincipal);
-                login(configuration);
-
-                UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-                UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, currentUser);
-                try {
-                    Token<DelegationTokenIdentifier> delegationTokenId =
-                            getDelegationToken(hcatConf, hiveMetaStorePrincipal, topologySubmitterUser);
-                    proxyUser.addToken(delegationTokenId);
-                    LOG.info("Obtained Hive tokens, adding to user credentials.");
-
-                    Credentials credential = proxyUser.getCredentials();
-                    ByteArrayOutputStream bao = new ByteArrayOutputStream();
-                    ObjectOutputStream out = new ObjectOutputStream(bao);
-                    credential.write(out);
-                    out.flush();
-                    out.close();
-                    return bao.toByteArray();
-                } catch (Exception ex) {
-                    LOG.debug(" Exception" + ex.getMessage());
-                    throw ex;
-                }
-            } else {
-                throw new RuntimeException("Security is not enabled for Hadoop");
-            }
-        } catch (Exception ex) {
-            throw new RuntimeException("Failed to get delegation tokens.", ex);
-        }
-    }
-
-    private Token<DelegationTokenIdentifier> getDelegationToken(HiveConf hcatConf,
-                                                                String metaStoreServicePrincipal,
-                                                                String topologySubmitterUser) throws IOException {
-        LOG.info("Creating delegation tokens for principal={}", metaStoreServicePrincipal);
-
-        HCatClient hcatClient = null;
-        try {
-            hcatClient = HCatClient.create(hcatConf);
-            String delegationToken = hcatClient.getDelegationToken(topologySubmitterUser, metaStoreServicePrincipal);
-            Token<DelegationTokenIdentifier> delegationTokenId = new Token<DelegationTokenIdentifier>();
-            delegationTokenId.decodeFromUrlString(delegationToken);
-
-            DelegationTokenIdentifier d = new DelegationTokenIdentifier();
-            d.readFields(new DataInputStream(new ByteArrayInputStream(
-                    delegationTokenId.getIdentifier())));
-            LOG.info("Created Delegation Token for : " + d.getUser());
-
-            return delegationTokenId;
-        } finally {
-            if (hcatClient != null)
-                hcatClient.close();
-        }
-    }
-
-    private String getMetaStoreURI(Configuration configuration) {
-        if (configuration.get(HiveConf.ConfVars.METASTOREURIS.varname) == null)
-            return metaStoreURI;
-        else
-            return configuration.get(HiveConf.ConfVars.METASTOREURIS.varname);
-    }
-
-    private String getMetaStorePrincipal(Configuration configuration) {
-        if (configuration.get(HIVE_PRINCIPAL_KEY) == null)
-            return hivePrincipal;
-        else
-            return configuration.get(HIVE_PRINCIPAL_KEY);
-    }
-
-    private void login(Configuration configuration) throws IOException {
-        if (configuration.get(HIVE_KEYTAB_FILE_KEY) == null) {
-            configuration.set(HIVE_KEYTAB_FILE_KEY, hiveKeytab);
-        }
-        if (configuration.get(HIVE_PRINCIPAL_KEY) == null) {
-            configuration.set(HIVE_PRINCIPAL_KEY, hivePrincipal);
-        }
-        SecurityUtil.login(configuration, HIVE_KEYTAB_FILE_KEY, HIVE_PRINCIPAL_KEY);
-        LOG.info("Logged into hive with principal {}", configuration.get(HIVE_PRINCIPAL_KEY));
-    }
-
-    @Override
-    public void doRenew(Map<String, String> credentials, Map<String, Object> topologyConf) {
-        for (Pair<String, Credentials> cred : getCredentials(credentials)) {
-            try {
-                Configuration configuration = getHadoopConfiguration(topologyConf, cred.getFirst());
-                String hiveMetaStoreURI = getMetaStoreURI(configuration);
-                String hiveMetaStorePrincipal = getMetaStorePrincipal(configuration);
-
-                Collection<Token<? extends TokenIdentifier>> tokens = cred.getSecond().getAllTokens();
-                login(configuration);
-
-                if (tokens != null && !tokens.isEmpty()) {
-                    for (Token token : tokens) {
-                        long expiration = renewToken(token, hiveMetaStoreURI, hiveMetaStorePrincipal);
-                        LOG.info("Hive delegation token renewed, new expiration time {}", expiration);
-                    }
-                } else {
-                    LOG.debug("No tokens found for credentials, skipping renewal.");
-                }
-            } catch (Exception e) {
-                LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond " +
-                        "renewal period so attempting to get new tokens.", e);
-                populateCredentials(credentials, topologyConf);
-            }
-        }
-    }
-
-    private long renewToken(Token token, String metaStoreURI, String hiveMetaStorePrincipal) {
-        HCatClient hcatClient = null;
-        if (UserGroupInformation.isSecurityEnabled()) {
-            try {
-                String tokenStr = token.encodeToUrlString();
-                HiveConf hcatConf = createHiveConf(metaStoreURI, hiveMetaStorePrincipal);
-                LOG.debug("renewing delegation tokens for principal={}", hiveMetaStorePrincipal);
-                hcatClient = HCatClient.create(hcatConf);
-                Long expiryTime = hcatClient.renewDelegationToken(tokenStr);
-                LOG.info("Renewed delegation token. new expiryTime={}", expiryTime);
-                return expiryTime;
-            } catch (Exception ex) {
-                throw new RuntimeException("Failed to renew delegation tokens.", ex);
-            } finally {
-                if (hcatClient != null)
-                    try {
-                        hcatClient.close();
-                    } catch (HCatException e) {
-                        LOG.error(" Exception", e);
-                    }
-            }
-        } else {
-            throw new RuntimeException("Security is not enabled for Hadoop");
-        }
-    }
-
-
-    @Override
-    protected String getCredentialKey(String configKey) {
+    public String getCredentialKey(String configKey) {
         return HIVE_CREDENTIALS + configKey;
     }
 
-    @SuppressWarnings("unchecked")
-    public static void main(String[] args) throws Exception {
-        Map<String, Object> conf = new HashMap();
-        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
-        conf.put(HIVE_PRINCIPAL_KEY, args[1]); // hive principal storm-hive@WITZEN.COM
-        conf.put(HIVE_KEYTAB_FILE_KEY, args[2]); // storm hive keytab /etc/security/keytabs/storm-hive.keytab
-        conf.put(HiveConf.ConfVars.METASTOREURIS.varname, args[3]); // hive.metastore.uris : "thrift://pm-eng1-cluster1.field.hortonworks.com:9083"
-
-        AutoHive autoHive = new AutoHive();
-        autoHive.prepare(conf);
-
-        Map<String, String> creds = new HashMap<String, String>();
-        autoHive.populateCredentials(creds, conf);
-        LOG.info("Got Hive credentials" + autoHive.getCredentials(creds));
-
-        Subject subject = new Subject();
-        autoHive.populateSubject(subject, creds);
-        LOG.info("Got a Subject " + subject);
-
-        //autoHive.renew(creds, conf);
-        //LOG.info("Renewed credentials" + autoHive.getCredentials(creds));
-    }
-
-}
-
+}
\ No newline at end of file
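
A tiny illustration of how the trimmed-down worker-side plugin keys credentials per configured cluster, namely HIVE_CREDENTIALS + configKey; the "hiveCluster1" name is made up for the example:

import org.apache.storm.hive.security.AutoHive;

public class HiveCredentialKeyExample {
    public static void main(String[] args) {
        AutoHive autoHive = new AutoHive();
        // Prints "HIVE_CREDENTIALShiveCluster1", the key under which the Nimbus plugin
        // stored the serialized Hive delegation token for that cluster.
        System.out.println(autoHive.getCredentialKey("hiveCluster1"));
    }
}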

http://git-wip-us.apache.org/repos/asf/storm/blob/9ca6d95b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java
new file mode 100644
index 0000000..7aded11
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hive.security;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.storm.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.hive.security.HiveSecurityUtil.HIVE_KEYTAB_FILE_KEY;
+import static org.apache.storm.hive.security.HiveSecurityUtil.HIVE_PRINCIPAL_KEY;
+
+/**
+ * Command line tool for the Hive credential renewer.
+ */
+public final class AutoHiveCommand {
+  private static final Logger LOG = LoggerFactory.getLogger(AutoHiveCommand.class);
+
+  private AutoHiveCommand() {
+  }
+
+  @SuppressWarnings("unchecked")
+  public static void main(String[] args) throws Exception {
+    Map conf = new HashMap();
+    conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
+    conf.put(HIVE_PRINCIPAL_KEY, args[1]); // hive principal storm-hive@WITZEN.COM
+    conf.put(HIVE_KEYTAB_FILE_KEY, args[2]); // storm hive keytab /etc/security/keytabs/storm-hive.keytab
+    conf.put(HiveConf.ConfVars.METASTOREURIS.varname, args[3]); // hive.metastore.uris : "thrift://pm-eng1-cluster1.field.hortonworks.com:9083"
+
+    AutoHive autoHive = new AutoHive();
+    autoHive.prepare(conf);
+    AutoHiveNimbus autoHiveNimbus = new AutoHiveNimbus();
+    autoHiveNimbus.prepare(conf);
+
+    Map<String, String> creds = new HashMap<>();
+    autoHiveNimbus.populateCredentials(creds, conf);
+    LOG.info("Got Hive credentials" + autoHive.getCredentials(creds));
+
+    Subject subject = new Subject();
+    autoHive.populateSubject(subject, creds);
+    LOG.info("Got a Subject " + subject);
+
+    autoHiveNimbus.renew(creds, conf);
+    LOG.info("Renewed credentials" + autoHive.getCredentials(creds));
+  }
+
+}

