storm-commits mailing list archives

From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2501: Auto populate Hive credentials using Hive MetaStore delegation tokens
Date Thu, 18 May 2017 11:31:04 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch c6caa4563 -> e4e912fc3


STORM-2501: Auto populate Hive credentials using Hive MetaStore delegation tokens


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ad5ac422
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ad5ac422
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ad5ac422

Branch: refs/heads/1.x-branch
Commit: ad5ac422833a857a311563a2a529c15a65353613
Parents: 4058623
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
Authored: Tue Apr 25 17:24:06 2017 +0530
Committer: Manikumar Reddy O <manikumar.reddy@gmail.com>
Committed: Thu May 18 11:43:42 2017 +0530

----------------------------------------------------------------------
 external/storm-autocreds/pom.xml                |  38 +++
 .../apache/storm/hive/security/AutoHive.java    | 279 +++++++++++++++++++
 external/storm-hive/README.md                   |  82 +++++-
 external/storm-hive/pom.xml                     |  11 +
 .../org/apache/storm/hive/bolt/HiveBolt.java    |  31 +--
 .../apache/storm/hive/common/HiveOptions.java   |   3 -
 .../org/apache/storm/hive/common/HiveUtils.java | 105 ++++---
 .../apache/storm/hive/common/HiveWriter.java    |  46 ++-
 .../apache/storm/hive/trident/HiveState.java    |  29 +-
 .../apache/storm/hive/bolt/TestHiveBolt.java    |   4 +-
 .../storm/hive/common/TestHiveWriter.java       |   6 +-
 11 files changed, 537 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ad5ac422/external/storm-autocreds/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/pom.xml b/external/storm-autocreds/pom.xml
index 3762cfd..39db2ab 100644
--- a/external/storm-autocreds/pom.xml
+++ b/external/storm-autocreds/pom.xml
@@ -78,6 +78,44 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-hcatalog-streaming</artifactId>
+            <version>${hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-avatica</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-webhcat-java-client</artifactId>
+            <version>${hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-avatica</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
     <build>
     <plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/ad5ac422/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
new file mode 100644
index 0000000..da2997e
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hive.security;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.storm.Config;
+import org.apache.storm.common.AbstractAutoCreds;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Automatically gets Hive delegation tokens and pushes them to the user's topology.
+ */
+public class AutoHive extends AbstractAutoCreds {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoHive.class);
+
+    public static final String HIVE_CREDENTIALS = "HIVE_CREDENTIALS";
+    public static final String HIVE_CREDENTIALS_CONFIG_KEYS = "hiveCredentialsConfigKeys";
+
+    public static final String HIVE_KEYTAB_FILE_KEY = "hive.keytab.file";
+    public static final String HIVE_PRINCIPAL_KEY = "hive.kerberos.principal";
+
+    public String hiveKeytab;
+    public String hivePrincipal;
+    public String metaStoreURI;
+
+    @Override
+    public void doPrepare(Map conf) {
+        if (conf.containsKey(HIVE_KEYTAB_FILE_KEY) && conf.containsKey(HIVE_PRINCIPAL_KEY)) {
+            hiveKeytab = (String) conf.get(HIVE_KEYTAB_FILE_KEY);
+            hivePrincipal = (String) conf.get(HIVE_PRINCIPAL_KEY);
+            metaStoreURI = (String) conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
+        }
+    }
+
+    @Override
+    protected String getConfigKeyString() {
+        return HIVE_CREDENTIALS_CONFIG_KEYS;
+    }
+
+    @Override
+    public void shutdown() {
+        //no op.
+    }
+
+    @Override
+    protected byte[] getHadoopCredentials(Map conf, String configKey) {
+        Configuration configuration = getHadoopConfiguration(conf, configKey);
+        return getHadoopCredentials(conf, configuration);
+    }
+
+    @Override
+    protected byte[] getHadoopCredentials(Map conf) {
+        Configuration configuration = new Configuration();
+        return getHadoopCredentials(conf, configuration);
+    }
+
+    private Configuration getHadoopConfiguration(Map topoConf, String configKey) {
+        Configuration configuration = new Configuration();
+        fillHadoopConfiguration(topoConf, configKey, configuration);
+        return configuration;
+    }
+
+    public HiveConf createHiveConf(String metaStoreURI, String hiveMetaStorePrincipal) throws IOException {
+        HiveConf hcatConf = new HiveConf();
+        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+        hcatConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+        hcatConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
+        hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, hiveMetaStorePrincipal);
+        return hcatConf;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected byte[] getHadoopCredentials(Map conf, final Configuration configuration) {
+        try {
+            if (UserGroupInformation.isSecurityEnabled()) {
+                String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
+                String hiveMetaStoreURI = getMetaStoreURI(configuration);
+                String hiveMetaStorePrincipal = getMetaStorePrincipal(configuration);
+                HiveConf hcatConf = createHiveConf(hiveMetaStoreURI, hiveMetaStorePrincipal);
+                login(configuration);
+
+                UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+                UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, currentUser);
+                try {
+                    Token<DelegationTokenIdentifier> delegationTokenId =
+                            getDelegationToken(hcatConf, hiveMetaStorePrincipal, topologySubmitterUser);
+                    proxyUser.addToken(delegationTokenId);
+                    LOG.info("Obtained Hive tokens, adding to user credentials.");
+
+                    Credentials credential = proxyUser.getCredentials();
+                    ByteArrayOutputStream bao = new ByteArrayOutputStream();
+                    ObjectOutputStream out = new ObjectOutputStream(bao);
+                    credential.write(out);
+                    out.flush();
+                    out.close();
+                    return bao.toByteArray();
+                } catch (Exception ex) {
+                    LOG.debug(" Exception" + ex.getMessage());
+                    throw ex;
+                }
+            } else {
+                throw new RuntimeException("Security is not enabled for Hadoop");
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException("Failed to get delegation tokens.", ex);
+        }
+    }
+
+    private Token<DelegationTokenIdentifier> getDelegationToken(HiveConf hcatConf,
+                                                                String metaStoreServicePrincipal,
+                                                                String topologySubmitterUser) throws IOException {
+        LOG.info("Creating delegation tokens for principal={}", metaStoreServicePrincipal);
+
+        HCatClient hcatClient = null;
+        try {
+            hcatClient = HCatClient.create(hcatConf);
+            String delegationToken = hcatClient.getDelegationToken(topologySubmitterUser, metaStoreServicePrincipal);
+            Token<DelegationTokenIdentifier> delegationTokenId = new Token<DelegationTokenIdentifier>();
+            delegationTokenId.decodeFromUrlString(delegationToken);
+
+            DelegationTokenIdentifier d = new DelegationTokenIdentifier();
+            d.readFields(new DataInputStream(new ByteArrayInputStream(
+                    delegationTokenId.getIdentifier())));
+            LOG.info("Created Delegation Token for : " + d.getUser());
+
+            return delegationTokenId;
+        } finally {
+            if (hcatClient != null)
+                hcatClient.close();
+        }
+    }
+
+    private String getMetaStoreURI(Configuration configuration) {
+        if (configuration.get(HiveConf.ConfVars.METASTOREURIS.varname) == null)
+            return metaStoreURI;
+        else
+            return configuration.get(HiveConf.ConfVars.METASTOREURIS.varname);
+    }
+
+    private String getMetaStorePrincipal(Configuration configuration) {
+        if (configuration.get(HIVE_PRINCIPAL_KEY) == null)
+            return hivePrincipal;
+        else
+            return configuration.get(HIVE_PRINCIPAL_KEY);
+    }
+
+    private void login(Configuration configuration) throws IOException {
+        if (configuration.get(HIVE_KEYTAB_FILE_KEY) == null) {
+            configuration.set(HIVE_KEYTAB_FILE_KEY, hiveKeytab);
+        }
+        if (configuration.get(HIVE_PRINCIPAL_KEY) == null) {
+            configuration.set(HIVE_PRINCIPAL_KEY, hivePrincipal);
+        }
+        SecurityUtil.login(configuration, HIVE_KEYTAB_FILE_KEY, HIVE_PRINCIPAL_KEY);
+        LOG.info("Logged into hive with principal {}", configuration.get(HIVE_PRINCIPAL_KEY));
+    }
+
+    @Override
+    public void doRenew(Map<String, String> credentials, Map topologyConf) {
+        for (Pair<String, Credentials> cred : getCredentials(credentials)) {
+            try {
+                Configuration configuration = getHadoopConfiguration(topologyConf, cred.getFirst());
+                String hiveMetaStoreURI = getMetaStoreURI(configuration);
+                String hiveMetaStorePrincipal = getMetaStorePrincipal(configuration);
+
+                Collection<Token<? extends TokenIdentifier>> tokens = cred.getSecond().getAllTokens();
+                login(configuration);
+
+                if (tokens != null && !tokens.isEmpty()) {
+                    for (Token token : tokens) {
+                        long expiration = renewToken(token, hiveMetaStoreURI, hiveMetaStorePrincipal);
+                        LOG.info("Hive delegation token renewed, new expiration time {}", expiration);
+                    }
+                } else {
+                    LOG.debug("No tokens found for credentials, skipping renewal.");
+                }
+            } catch (Exception e) {
+                LOG.warn("Could not renew the credentials; one possible reason is that the tokens are beyond the " +
+                        "renewal period, so attempting to get new tokens.", e);
+                populateCredentials(credentials, topologyConf);
+            }
+        }
+    }
+
+    private long renewToken(Token token, String metaStoreURI, String hiveMetaStorePrincipal) {
+        HCatClient hcatClient = null;
+        if (UserGroupInformation.isSecurityEnabled()) {
+            try {
+                String tokenStr = token.encodeToUrlString();
+                HiveConf hcatConf = createHiveConf(metaStoreURI, hiveMetaStorePrincipal);
+                LOG.debug("renewing delegation tokens for principal={}", hiveMetaStorePrincipal);
+                hcatClient = HCatClient.create(hcatConf);
+                Long expiryTime = hcatClient.renewDelegationToken(tokenStr);
+                LOG.info("Renewed delegation token. new expiryTime={}", expiryTime);
+                return expiryTime;
+            } catch (Exception ex) {
+                throw new RuntimeException("Failed to renew delegation tokens.", ex);
+            } finally {
+                if (hcatClient != null)
+                    try {
+                        hcatClient.close();
+                    } catch (HCatException e) {
+                        LOG.error(" Exception", e);
+                    }
+            }
+        } else {
+            throw new RuntimeException("Security is not enabled for Hadoop");
+        }
+    }
+
+
+    @Override
+    protected String getCredentialKey(String configKey) {
+        return HIVE_CREDENTIALS + configKey;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        Map conf = new HashMap();
+        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
+        conf.put(HIVE_PRINCIPAL_KEY, args[1]); // hive principal storm-hive@WITZEN.COM
+        conf.put(HIVE_KEYTAB_FILE_KEY, args[2]); // storm hive keytab /etc/security/keytabs/storm-hive.keytab
+        conf.put(HiveConf.ConfVars.METASTOREURIS.varname, args[3]); // hive.metastore.uris : "thrift://pm-eng1-cluster1.field.hortonworks.com:9083"
+
+        AutoHive autoHive = new AutoHive();
+        autoHive.prepare(conf);
+
+        Map<String, String> creds = new HashMap<String, String>();
+        autoHive.populateCredentials(creds, conf);
+        LOG.info("Got Hive credentials" + autoHive.getCredentials(creds));
+
+        Subject subject = new Subject();
+        autoHive.populateSubject(subject, creds);
+        LOG.info("Got a Subject " + subject);
+
+        //autoHive.renew(creds, conf);
+        //LOG.info("Renewed credentials" + autoHive.getCredentials(creds));
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/ad5ac422/external/storm-hive/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hive/README.md b/external/storm-hive/README.md
index 3ae0ea0..cccb1a9 100644
--- a/external/storm-hive/README.md
+++ b/external/storm-hive/README.md
@@ -99,8 +99,85 @@ Hive Trident state also follows similar pattern to HiveBolt it takes in HiveOpti
                 	     		
    StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
   TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
- ```
+```
    
+
+## Working with Secure Hive
+If your topology is going to interact with secure Hive, your bolts/states need to be authenticated by
+Hive Server. We currently have two options to support this:
+
+### Using keytabs on all worker hosts
+If you have distributed the keytab files for the hive user on all potential worker hosts, you can use this method.
+You should specify the Hive configs using the HiveOptions.withKerberosKeytab() and HiveOptions.withKerberosPrincipal()
+methods (see the sketch below).
+
+On worker hosts, the bolt/trident-state code will use the keytab file and principal provided in the config to
+authenticate with Hive. This method is a little dangerous, as you need to ensure all workers have the keytab file
+at the same location, and you need to remember this as you bring up new hosts in the cluster.
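+
+For illustration, a minimal sketch of wiring the keytab and principal into HiveOptions (the metastore URI,
+database, table, field names, and keytab path below are placeholders):
+
+```
+DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+        .withColumnFields(new Fields("id", "name"));                     // placeholder columns
+HiveOptions hiveOptions = new HiveOptions("thrift://metastore-host:9083", "default", "user_data", mapper)
+        .withKerberosKeytab("/etc/security/keytabs/storm.keytab")        // must exist on every worker host
+        .withKerberosPrincipal("storm@EXAMPLE.COM");
+HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+```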
+
+
+### Using Hive MetaStore delegation tokens 
+Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user.
+Since Hive depends on HDFS, we should also configure HDFS delegation tokens.
+
+More details about Hadoop Tokens here: https://github.com/apache/storm/blob/master/docs/storm-hdfs.md
+
+Nimbus should be started with the following configuration:
+
+```
+nimbus.autocredential.plugins.classes : ["org.apache.storm.hive.security.AutoHive", "org.apache.storm.hdfs.security.AutoHDFS"]
+nimbus.credential.renewers.classes : ["org.apache.storm.hive.security.AutoHive", "org.apache.storm.hdfs.security.AutoHDFS"]
+nimbus.credential.renewers.freq.secs : 82800 (23 hours)
+
+hive.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hive super user that can impersonate other users.)
+hive.kerberos.principal: "superuser@EXAMPLE.com"
+hive.metastore.uris: "thrift://server:9083"
+
+//hdfs configs
+hdfs.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hdfs super user that can impersonate other users.)
+hdfs.kerberos.principal: "superuser@EXAMPLE.com"
+```
+
+
+Your topology configuration should have:
+
+```
+topology.auto-credentials :["org.apache.storm.hive.security.AutoHive", "org.apache.storm.hdfs.security.AutoHDFS"]
+```
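+
+Equivalently, the same setting can be applied programmatically on the topology config at submission time
+(a sketch, not from the original docs; builder is an assumed TopologyBuilder):
+
+```
+Config conf = new Config();
+conf.put(Config.TOPOLOGY_AUTO_CREDENTIALS,
+         Arrays.asList("org.apache.storm.hive.security.AutoHive", "org.apache.storm.hdfs.security.AutoHDFS"));
+StormSubmitter.submitTopology("secure-hive-topology", conf, builder.createTopology());
+```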
+
+If nimbus did not have the above configuration, you need to add it and then restart nimbus. Ensure the hadoop
+configuration files (core-site.xml, hdfs-site.xml and hive-site.xml) and the storm-hive connector jar with all its
+dependencies are present in nimbus's classpath.
+
+As an alternative to adding the configuration files (core-site.xml, hdfs-site.xml and hive-site.xml) to the
+classpath, you could specify the configurations as part of the topology configuration, e.g. in your custom
+storm.yaml (or via the -c option while submitting the topology):
+
+
+```
+hiveCredentialsConfigKeys : ["hivecluster1", "hivecluster2"] (the hive clusters you want to fetch the tokens from)
+"hivecluster1": {"config1": "value1", "config2": "value2", ... } (A map of config key-values specific to cluster1)
+"hivecluster2": {"config1": "value1", "hive.keytab.file": "/path/to/keytab/for/cluster2/on/nimbus", "hive.kerberos.principal": "cluster2user@EXAMPLE.com", "hive.metastore.uris": "thrift://server:9083"}
+(here, along with other configs, we have a custom keytab and principal for "cluster2" which override the keytab/principal specified at the topology level)
+
+hdfsCredentialsConfigKeys : ["hdfscluster1", "hdfscluster2"] (the hdfs clusters you want to fetch the tokens from)
+"hdfscluster1": {"config1": "value1", "config2": "value2", ... } (A map of config key-values specific to cluster1)
+"hdfscluster2": {"config1": "value1", "hdfs.keytab.file": "/path/to/keytab/for/cluster2/on/nimbus", "hdfs.kerberos.principal": "cluster2user@EXAMPLE.com"}
+(here, along with other configs, we have a custom keytab and principal for "cluster2" which override the keytab/principal specified at the topology level)
+```
+
+Instead of specifying key values, you may also directly specify the resource files, e.g.:
+
+```
+"cluster1": {"resources": ["/path/to/core-site1.xml", "/path/to/hdfs-site1.xml", "/path/to/hive-site1.xml"]}
+"cluster2": {"resources": ["/path/to/core-site2.xml", "/path/to/hdfs-site2.xml", "/path/to/hive-site2.xml"]}
+```
+
+Storm will download the tokens separately for each of the clusters, populate them into the subject, and also renew
+the tokens periodically. This way it is possible to run multiple bolts connecting to separate Hive clusters within
+the same topology.
+
+Nimbus will use the keytab and principal specified in the config to authenticate with the Hive metastore. From then
+on, for every topology submission, nimbus will impersonate the topology submitter user and acquire delegation tokens
+on behalf of that user. If the topology was started with topology.auto-credentials set to AutoHive, nimbus will push
+the delegation tokens to all the workers for your topology, and the hive bolt/state will authenticate with Hive
+Server using these tokens.
+
+As nimbus is impersonating the topology submitter user, you need to ensure the user specified in
+hive.kerberos.principal has permissions to acquire tokens on behalf of other users (see the proxy-user sketch below).
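+
+For example, impersonation for the superuser named in hive.kerberos.principal is typically granted through Hadoop
+proxy-user settings in core-site.xml (a sketch; replace "superuser" with the short name of your principal, and
+tighten the permissive "*" values as appropriate for your cluster):
+
+```
+<property>
+  <name>hadoop.proxyuser.superuser.hosts</name>
+  <value>*</value>
+</property>
+<property>
+  <name>hadoop.proxyuser.superuser.groups</name>
+  <value>*</value>
+</property>
+```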
  
 ## Committer Sponsors
  * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
@@ -109,7 +186,4 @@ Hive Trident state also follows similar pattern to HiveBolt it takes in HiveOpti
 
 
 
-
-
-
  

http://git-wip-us.apache.org/repos/asf/storm/blob/ad5ac422/external/storm-hive/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml
index 052cef9..a7d271f 100644
--- a/external/storm-hive/pom.xml
+++ b/external/storm-hive/pom.xml
@@ -152,6 +152,17 @@
       <version>0.9.3</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+      <version>${calcite.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-autocreds</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
   <build>
     <plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/ad5ac422/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index dc8be91..0215b32 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -18,6 +18,9 @@
 
 package org.apache.storm.hive.bolt;
 
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.SerializationError;
+import org.apache.hive.hcatalog.streaming.StreamingException;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
@@ -30,14 +33,12 @@ import org.apache.storm.hive.common.HiveWriter;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-import org.apache.hive.hcatalog.streaming.*;
 import org.apache.storm.hive.common.HiveOptions;
 import org.apache.storm.hive.common.HiveUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.Timer;
@@ -48,7 +49,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.List;
-import java.util.LinkedList;
 import java.io.IOException;
 
 public class HiveBolt extends BaseRichBolt {
@@ -57,11 +57,11 @@ public class HiveBolt extends BaseRichBolt {
     private HiveOptions options;
     private ExecutorService callTimeoutPool;
     private transient Timer heartBeatTimer;
-    private Boolean kerberosEnabled = false;
     private AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
     private UserGroupInformation ugi = null;
     private Map<HiveEndPoint, HiveWriter> allWriters;
     private BatchHelper batchHelper;
+    private boolean tokenAuthEnabled;
 
     public HiveBolt(HiveOptions options) {
         this.options = options;
@@ -70,23 +70,14 @@ public class HiveBolt extends BaseRichBolt {
     @Override
     public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector) {
         try {
-            if(options.getKerberosPrincipal() == null && options.getKerberosKeytab() == null) {
-                kerberosEnabled = false;
-            } else if(options.getKerberosPrincipal() != null && options.getKerberosKeytab() != null) {
-                kerberosEnabled = true;
-            } else {
-                throw new IllegalArgumentException("To enable Kerberos, need to set both KerberosPrincipal " +
-                                                   " & KerberosKeytab");
+            tokenAuthEnabled = HiveUtils.isTokenAuthEnabled(conf);
+            try {
+                ugi = HiveUtils.authenticate(tokenAuthEnabled, options.getKerberosKeytab(), options.getKerberosPrincipal());
+            } catch(HiveUtils.AuthenticationFailed ex) {
+                LOG.error("Hive kerberos authentication failed " + ex.getMessage(), ex);
+                throw new IllegalArgumentException(ex);
             }
 
-            if (kerberosEnabled) {
-                try {
-                    ugi = HiveUtils.authenticate(options.getKerberosKeytab(), options.getKerberosPrincipal());
-                } catch(HiveUtils.AuthenticationFailed ex) {
-                    LOG.error("Hive Kerberos authentication failed " + ex.getMessage(), ex);
-                    throw new IllegalArgumentException(ex);
-                }
-            }
             this.collector = collector;
             this.batchHelper = new BatchHelper(options.getBatchSize(), collector);
             allWriters = new ConcurrentHashMap<HiveEndPoint,HiveWriter>();
@@ -259,7 +250,7 @@ public class HiveBolt extends BaseRichBolt {
             HiveWriter writer = allWriters.get( endPoint );
             if (writer == null) {
                 LOG.debug("Creating Writer to Hive end point : " + endPoint);
-                writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
+                writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options, tokenAuthEnabled);
                 if (allWriters.size() > (options.getMaxOpenConnections() - 1)) {
                     LOG.info("cached HiveEndPoint size {} exceeded maxOpenConnections {}
", allWriters.size(), options.getMaxOpenConnections());
                     int retired = retireIdleWriters();

http://git-wip-us.apache.org/repos/asf/storm/blob/ad5ac422/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
index ab81a75..e80f5d7 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
@@ -20,10 +20,7 @@ package org.apache.storm.hive.common;
 
 import java.io.Serializable;
 
-import org.apache.storm.hive.common.HiveWriter;
 import org.apache.storm.hive.bolt.mapper.HiveMapper;
-import org.apache.hive.hcatalog.streaming.*;
-
 
 public class HiveOptions implements Serializable {
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/ad5ac422/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
index 591d565..4222640 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,68 +18,101 @@
 
 package org.apache.storm.hive.common;
 
-import org.apache.storm.hive.common.HiveWriter;
-import org.apache.storm.hive.bolt.mapper.HiveMapper;
-import org.apache.hive.hcatalog.streaming.*;
-
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-
+import org.apache.hive.hcatalog.streaming.ConnectionError;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.storm.hive.security.AutoHive;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.io.File;
-import java.io.IOException;
+
+import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
 
 public class HiveUtils {
     private static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class);
 
     public static HiveEndPoint makeEndPoint(List<String> partitionVals, HiveOptions options) throws ConnectionError {
-        if(partitionVals==null) {
+        if (partitionVals == null) {
             return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), null);
         }
         return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals);
     }
 
-    public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options)
-        throws HiveWriter.ConnectFailure, InterruptedException {
+    public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options, boolean tokenAuthEnabled)
+            throws HiveWriter.ConnectFailure, InterruptedException {
         return new HiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(),
-                              options.getCallTimeOut(), callTimeoutPool, options.getMapper(), ugi);
+                options.getCallTimeOut(), callTimeoutPool, options.getMapper(), ugi, tokenAuthEnabled);
     }
 
-    public static synchronized UserGroupInformation authenticate(String keytab, String principal)
-    throws AuthenticationFailed {
-        File kfile = new File(keytab);
-        if (!(kfile.isFile() && kfile.canRead())) {
-            throw new IllegalArgumentException("The keyTab file: "
-                                               + keytab + " is nonexistent or can't read. "
-                                               + "Please specify a readable keytab file for Kerberos auth.");
-        }
-        try {
-            principal = SecurityUtil.getServerPrincipal(principal, "");
-        } catch (Exception e) {
-            throw new AuthenticationFailed("Host lookup error when resolving principal " + principal, e);
+    public static synchronized UserGroupInformation authenticate(boolean isTokenAuthEnabled, String keytab, String principal) throws AuthenticationFailed {
+
+        if (isTokenAuthEnabled)
+            return getCurrentUser(principal);
+
+        boolean kerberosEnabled = false;
+
+        if (principal == null && keytab == null) {
+            kerberosEnabled = false;
+        } else if (principal != null && keytab != null) {
+            kerberosEnabled = true;
+        } else {
+            throw new IllegalArgumentException("To enable Kerberos, need to set both KerberosPrincipal and KerberosKeytab");
         }
-        try {
-            UserGroupInformation.loginUserFromKeytab(principal, keytab);
-            return UserGroupInformation.getLoginUser();
-        } catch (IOException e) {
-            throw new AuthenticationFailed("Login failed for principal " + principal, e);
+
+        if (kerberosEnabled) {
+            File kfile = new File(keytab);
+
+            if (!(kfile.isFile() && kfile.canRead())) {
+                throw new IllegalArgumentException("The keyTab file: " + keytab + " is nonexistent or can't read. "
+                        + "Please specify a readable keytab file for Kerberos auth.");
+            }
+
+            try {
+                principal = SecurityUtil.getServerPrincipal(principal, "");
+            } catch (Exception e) {
+                throw new AuthenticationFailed("Host lookup error when resolving principal " + principal, e);
+            }
+
+            try {
+                UserGroupInformation.loginUserFromKeytab(principal, keytab);
+                return UserGroupInformation.getLoginUser();
+            } catch (IOException e) {
+                throw new AuthenticationFailed("Login failed for principal " + principal, e);
+            }
         }
+
+        return null;
+
     }
 
-     public static class AuthenticationFailed extends Exception {
-         public AuthenticationFailed(String reason, Exception cause) {
-             super("Kerberos Authentication Failed. " + reason, cause);
-         }
-     }
+    public static class AuthenticationFailed extends Exception {
+        public AuthenticationFailed(String reason, Exception cause) {
+            super("Kerberos Authentication Failed. " + reason, cause);
+        }
+    }
 
     public static void logAllHiveEndPoints(Map<HiveEndPoint, HiveWriter> allWriters) {
-        for (Map.Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+        for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
             LOG.info("cached writers {} ", entry.getValue());
         }
     }
+
+    public static boolean isTokenAuthEnabled(Map conf) {
+        return conf.get(TOPOLOGY_AUTO_CREDENTIALS) != null && (((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHive.class.getName()));
+    }
+
+
+    private static UserGroupInformation getCurrentUser(String principal) throws AuthenticationFailed {
+        try {
+            return UserGroupInformation.getCurrentUser();
+        } catch (IOException e) {
+            throw new AuthenticationFailed("Login failed for principal " + principal, e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ad5ac422/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
index 4df1c60..fff4b4d 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
@@ -19,15 +19,15 @@
 package org.apache.storm.hive.common;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.hcatalog.streaming.*;
 import org.apache.storm.hive.bolt.mapper.HiveMapper;
@@ -58,7 +58,8 @@ public class HiveWriter {
 
     public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch,
                       boolean autoCreatePartitions, long callTimeout,
-                      ExecutorService callTimeoutPool, HiveMapper mapper, UserGroupInformation ugi)
+                      ExecutorService callTimeoutPool, HiveMapper mapper,
+                      UserGroupInformation ugi, boolean tokenAuthEnabled)
         throws InterruptedException, ConnectFailure {
         try {
             this.autoCreatePartitions = autoCreatePartitions;
@@ -66,9 +67,9 @@ public class HiveWriter {
             this.callTimeoutPool = callTimeoutPool;
             this.endPoint = endPoint;
             this.ugi = ugi;
-            this.connection = newConnection(ugi);
+            this.connection = newConnection(ugi, tokenAuthEnabled);
             this.txnsPerBatch = txnsPerBatch;
-            this.recordWriter = mapper.createRecordWriter(endPoint);
+            this.recordWriter = getRecordWriter(mapper, tokenAuthEnabled);
             this.txnBatch = nextTxnBatch(recordWriter);
             this.closed = false;
             this.lastUsed = System.currentTimeMillis();
@@ -81,6 +82,35 @@ public class HiveWriter {
         }
     }
 
+    public RecordWriter getRecordWriter(final HiveMapper mapper, final boolean tokenAuthEnabled) throws Exception {
+        if (!tokenAuthEnabled)
+          return mapper.createRecordWriter(endPoint);
+
+        try {
+            return ugi.doAs (
+                    new PrivilegedExceptionAction<RecordWriter>() {
+                        @Override
+                        public RecordWriter run() throws StreamingException, IOException, ClassNotFoundException {
+                            return mapper.createRecordWriter(endPoint);
+                        }
+                    }
+            );
+        } catch (Exception e) {
+            throw new ConnectFailure(endPoint, e);
+        }
+    }
+
+
+    private  HiveConf createHiveConf(String metaStoreURI, boolean tokenAuthEnabled)  {
+        if (!tokenAuthEnabled)
+            return null;
+
+        HiveConf hcatConf = new HiveConf();
+        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+        hcatConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
+        return hcatConf;
+    }
+
     @Override
     public String toString() {
           return "{ "
@@ -229,13 +259,13 @@ public class HiveWriter {
         }
     }
 
-    private StreamingConnection newConnection(final UserGroupInformation ugi)
+    private StreamingConnection newConnection(final UserGroupInformation ugi, final boolean tokenAuthEnabled)
         throws InterruptedException, ConnectFailure {
         try {
             return  callWithTimeout(new CallRunner<StreamingConnection>() {
                     @Override
                     public StreamingConnection call() throws Exception {
-                        return endPoint.newConnection(autoCreatePartitions, null, ugi); // could block
+                        return endPoint.newConnection(autoCreatePartitions, createHiveConf(endPoint.metaStoreUri, tokenAuthEnabled), ugi); // could block
                     }
                 });
         } catch(StreamingException e) {
@@ -324,7 +354,7 @@ public class HiveWriter {
     /**
      * if there are remainingTransactions in current txnBatch, begins nextTransactions
      * otherwise creates new txnBatch.
-     * @param boolean rollToNext
+     * @param rollToNext
      */
     private void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure {
         if(txnBatch.remainingTransactions() == 0) {

http://git-wip-us.apache.org/repos/asf/storm/blob/ad5ac422/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
index 11f329c..10b3591 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.storm.hive.trident;
 
 import org.apache.storm.trident.operation.TridentCollector;
@@ -25,7 +24,6 @@ import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.topology.FailedException;
 import org.apache.storm.hive.common.HiveWriter;
-import org.apache.storm.hive.common.HiveWriter;
 import org.apache.hive.hcatalog.streaming.*;
 import org.apache.storm.hive.common.HiveOptions;
 import org.apache.storm.hive.common.HiveUtils;
@@ -35,8 +33,6 @@ import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +57,7 @@ public class HiveState implements State {
     private UserGroupInformation ugi = null;
     private Boolean kerberosEnabled = false;
     private Map<HiveEndPoint, HiveWriter> allWriters;
+    private boolean tokenAuthEnabled;
 
     public HiveState(HiveOptions options) {
         this.options = options;
@@ -78,22 +75,12 @@ public class HiveState implements State {
 
     public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         try {
-            if(options.getKerberosPrincipal() == null && options.getKerberosKeytab() == null) {
-                kerberosEnabled = false;
-            } else if(options.getKerberosPrincipal() != null && options.getKerberosKeytab() != null) {
-                kerberosEnabled = true;
-            } else {
-                throw new IllegalArgumentException("To enable Kerberos, need to set both KerberosPrincipal " +
-                                                   " & KerberosKeytab");
-            }
-
-            if (kerberosEnabled) {
-                try {
-                    ugi = HiveUtils.authenticate(options.getKerberosKeytab(), options.getKerberosPrincipal());
-                } catch(HiveUtils.AuthenticationFailed ex) {
-                    LOG.error("Hive kerberos authentication failed " + ex.getMessage(), ex);
-                    throw new IllegalArgumentException(ex);
-                }
+            tokenAuthEnabled = HiveUtils.isTokenAuthEnabled(conf);
+            try {
+                ugi = HiveUtils.authenticate(tokenAuthEnabled, options.getKerberosKeytab(), options.getKerberosPrincipal());
+            } catch(HiveUtils.AuthenticationFailed ex) {
+                LOG.error("Hive kerberos authentication failed " + ex.getMessage(), ex);
+                throw new IllegalArgumentException(ex);
             }
 
             allWriters = new ConcurrentHashMap<HiveEndPoint,HiveWriter>();
@@ -203,7 +190,7 @@ public class HiveState implements State {
             HiveWriter writer = allWriters.get( endPoint );
             if( writer == null ) {
                 LOG.info("Creating Writer to Hive end point : " + endPoint);
-                writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
+                writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options, tokenAuthEnabled);
                 if(allWriters.size() > (options.getMaxOpenConnections() - 1)){
                     int retired = retireIdleWriters();
                     if(retired==0) {

http://git-wip-us.apache.org/repos/asf/storm/blob/ad5ac422/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
index 0cf0084..a223c41 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -132,11 +132,11 @@ public class TestHiveBolt {
         // 1) Basic
         HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
                                               , Arrays.asList(partitionVals.split(",")));
-        StreamingConnection connection = endPt.newConnection(false, null); //shouldn't throw
+        StreamingConnection connection = endPt.newConnection(false, (HiveConf)null); //shouldn't throw
         connection.close();
         // 2) Leave partition unspecified
         endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
-        endPt.newConnection(false, null).close(); // should not throw
+        endPt.newConnection(false, (HiveConf)null).close(); // should not throw
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/ad5ac422/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
index e43fec6..a53033f 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
@@ -112,7 +112,7 @@ public class TestHiveWriter {
             .withPartitionFields(new Fields(partNames));
         HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
         HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
-                                           ,callTimeoutPool, mapper, ugi);
+                                           ,callTimeoutPool, mapper, ugi, false);
         writer.close();
     }
 
@@ -123,7 +123,7 @@ public class TestHiveWriter {
             .withPartitionFields(new Fields(partNames));
         HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
         HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
-                                           , callTimeoutPool, mapper, ugi);
+                                           , callTimeoutPool, mapper, ugi, false);
         writeTuples(writer,mapper,3);
         writer.flush(false);
         writer.close();
@@ -138,7 +138,7 @@ public class TestHiveWriter {
 
         HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, Arrays.asList(partitionVals));
         HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
-                                           , callTimeoutPool, mapper, ugi);
+                                           , callTimeoutPool, mapper, ugi, false);
         Tuple tuple = generateTestTuple("1","abc");
         writer.write(mapper.mapRecord(tuple));
         tuple = generateTestTuple("2","def");

