Repository: sqoop Updated Branches: refs/heads/sqoop2 b1561866e -> b69fb9b68 SQOOP-2757: Sqoop2: Add module connector-sdk-hadoop to hold hadoop specific SDK classes used by the connectors (Dian Fu via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b69fb9b6 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b69fb9b6 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b69fb9b6 Branch: refs/heads/sqoop2 Commit: b69fb9b6801e70cd4b144ff963c08a3679f1b261 Parents: b156186 Author: Jarek Jarcec Cecho Authored: Sun Dec 27 08:20:10 2015 +0100 Committer: Jarek Jarcec Cecho Committed: Sun Dec 27 08:20:10 2015 +0100 ---------------------------------------------------------------------- connector/connector-hdfs/pom.xml | 6 + .../sqoop/connector/hdfs/HdfsConstants.java | 2 - .../sqoop/connector/hdfs/HdfsExtractor.java | 3 +- .../connector/hdfs/HdfsFromInitializer.java | 4 +- .../apache/sqoop/connector/hdfs/HdfsLoader.java | 3 +- .../sqoop/connector/hdfs/HdfsPartitioner.java | 3 +- .../sqoop/connector/hdfs/HdfsToDestroyer.java | 4 +- .../sqoop/connector/hdfs/HdfsToInitializer.java | 4 +- .../connector/hdfs/security/SecurityUtils.java | 146 ------------------ .../hdfs/security/TestSecurityUtils.java | 49 ------- connector/connector-sdk-hadoop/pom.xml | 60 ++++++++ .../hadoop/security/SecurityUtils.java | 147 +++++++++++++++++++ .../hadoop/security/TestSecurityUtils.java | 50 +++++++ connector/pom.xml | 1 + pom.xml | 5 + server/pom.xml | 5 + 16 files changed, 280 insertions(+), 212 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml index d695750..5996314 100644 --- a/connector/connector-hdfs/pom.xml +++ b/connector/connector-hdfs/pom.xml @@ -44,6 +44,12 @@ limitations under the License. 
+ org.apache.sqoop + connector-sdk-hadoop + provided + + + org.apache.hadoop hadoop-common provided http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java index f06300a..39ee4a3 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java @@ -35,6 +35,4 @@ public final class HdfsConstants extends Constants { public static final String WORK_DIRECTORY = PREFIX + "work_dir"; public static final String MAX_IMPORT_DATE = PREFIX + "max_import_date"; - - public static final String DELEGATION_TOKENS = PREFIX + "delegation_tokens"; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java index 441fe30..a813c47 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java @@ -30,14 +30,13 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.LineReader; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.common.SqoopIDFUtils; +import org.apache.sqoop.connector.hadoop.security.SecurityUtils; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; -import org.apache.sqoop.connector.hdfs.security.SecurityUtils; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.etl.io.DataWriter; import org.apache.sqoop.job.etl.Extractor; http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java index 3a0d626..d815d58 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java @@ -21,19 +21,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; +import 
org.apache.sqoop.connector.hadoop.security.SecurityUtils; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.IncrementalType; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; -import org.apache.sqoop.connector.hdfs.security.SecurityUtils; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; import org.apache.log4j.Logger; -import java.io.IOException; import java.security.PrivilegedExceptionAction; http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java index a6551e6..774221a 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java @@ -25,16 +25,15 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.common.SqoopIDFUtils; +import org.apache.sqoop.connector.hadoop.security.SecurityUtils; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter; import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter; import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter; -import org.apache.sqoop.connector.hdfs.security.SecurityUtils; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.job.etl.Loader; http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java index d01e932..f35b8e9 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java @@ -39,13 +39,12 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NodeBase; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.hadoop.security.SecurityUtils; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.IncrementalType; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; 
-import org.apache.sqoop.connector.hdfs.security.SecurityUtils; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java index 858042c..0c62ab1 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java @@ -21,17 +21,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.hadoop.security.SecurityUtils; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; -import org.apache.sqoop.connector.hdfs.security.SecurityUtils; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.job.etl.Destroyer; import org.apache.sqoop.job.etl.DestroyerContext; -import java.io.IOException; import java.security.PrivilegedExceptionAction; public class HdfsToDestroyer extends Destroyer { http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java index 204c978..70e0fde 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java @@ -21,18 +21,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.Logger; import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.hadoop.security.SecurityUtils; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; -import org.apache.sqoop.connector.hdfs.security.SecurityUtils; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; -import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.UUID; http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java ---------------------------------------------------------------------- diff --git 
a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java deleted file mode 100644 index 0a42936..0000000 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.hdfs.security; - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.security.TokenCache; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.log4j.Logger; -import org.apache.sqoop.common.ImmutableContext; -import org.apache.sqoop.common.MutableContext; -import org.apache.sqoop.connector.hdfs.HdfsConstants; -import org.apache.sqoop.job.etl.TransferableContext; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; - -/** - * Sqoop is designed to abstract connectors from the execution engine. Hence the security portion - * (like generating and distributing delegation tokens) won't happen automatically for us under the hood, - * and we have to do everything manually. - */ -public class SecurityUtils { - - private static final Logger LOG = Logger.getLogger(SecurityUtils.class); - - /** - * Creates a proxy user for the user who submitted the Sqoop job (e.g. who issued the "start job" command) - */ - static public UserGroupInformation createProxyUser(TransferableContext context) throws IOException { - return UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()); - } - - /** - * Creates a proxy user and loads it up with all delegation tokens that we have created ourselves - */ - static public UserGroupInformation createProxyUserAndLoadDelegationTokens(TransferableContext context) throws IOException { - UserGroupInformation proxyUser = createProxyUser(context); - loadDelegationTokensToUGI(proxyUser, context.getContext()); - - return proxyUser; - } - - /** - * Generates delegation tokens for the current user (this code is supposed to run inside doAs) and stores them - * serialized in the given mutable context. - */ - static public void generateDelegationTokens(MutableContext context, Path path, Configuration configuration) throws IOException { - if(!UserGroupInformation.isSecurityEnabled()) { - LOG.info("Running on unsecured cluster, skipping delegation token generation."); - return; - } - - // String representation of all tokens that we will create (most likely a single one) - List<String> tokens = new LinkedList<>(); - - Credentials credentials = new Credentials(); - TokenCache.obtainTokensForNamenodes(credentials, new Path[]{path}, configuration); - for (Token token : credentials.getAllTokens()) { - LOG.info("Generated token: " + token.toString()); - tokens.add(serializeToken(token)); - } - - // The context classes are transferred via "Credentials" rather than with jobconf, so we're not leaking the DT out here - if(tokens.size() > 0) { - context.setString(HdfsConstants.DELEGATION_TOKENS, StringUtils.join(tokens, " ")); - } - } - - /** - * Loads the delegation tokens that we created and serialized into the mutable context - */ - static public void loadDelegationTokensToUGI(UserGroupInformation ugi, ImmutableContext context) throws IOException { - String tokenList = context.getString(HdfsConstants.DELEGATION_TOKENS); - if(tokenList == null) { - LOG.info("No delegation tokens found"); - return; - } - - for(String stringToken: tokenList.split(" ")) { - Token token = deserializeToken(stringToken); - LOG.info("Loaded delegation token: " + token.toString()); - ugi.addToken(token); - } - } - - /** - * Serializes the given token into a String. - * - * We'll convert the token to byte[] using Writable methods for I/O and then Base64 - * encode the bytes to a human-readable string. - */ - static public String serializeToken(Token token) throws IOException { - // Serialize the Token to a byte array - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); - token.write(dos); - baos.flush(); - - return Base64.encodeBase64String(baos.toByteArray()); - } - - /** - * Deserializes a token from the given String. - * - * See serializeToken for details on how the token is expected to be serialized. - */ - static public Token deserializeToken(String stringToken) throws IOException { - Token token = new Token(); - byte[] tokenBytes = Base64.decodeBase64(stringToken); - - ByteArrayInputStream bais = new ByteArrayInputStream(tokenBytes); - DataInputStream dis = new DataInputStream(bais); - token.readFields(dis); - - return token; - } - - private SecurityUtils() { - // Initialization is prohibited - } -}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java deleted file mode 100644 index 713c704..0000000 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.hdfs.security; - -import org.apache.hadoop.io.Text; -import org.testng.annotations.Test; -import org.apache.hadoop.security.token.Token; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; - -public class TestSecurityUtils { - - @Test - public void testTokenSerializationDeserialization() throws Exception { - byte[] identifier = "identifier".getBytes(); - byte[] password = "password".getBytes(); - Text kind = new Text("kind"); - Text service = new Text("service"); - - Token token = new Token(identifier, password, kind, service); - String serializedForm = SecurityUtils.serializeToken(token); - assertNotNull(serializedForm); - - Token deserializedToken = SecurityUtils.deserializeToken(serializedForm); - assertNotNull(deserializedToken); - - assertEquals(identifier, deserializedToken.getIdentifier()); - assertEquals(password, deserializedToken.getPassword()); - assertEquals(kind.toString(), deserializedToken.getKind().toString()); - assertEquals(service.toString(), deserializedToken.getService().toString()); - } - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-sdk-hadoop/pom.xml ---------------------------------------------------------------------- diff --git a/connector/connector-sdk-hadoop/pom.xml b/connector/connector-sdk-hadoop/pom.xml new file mode 100644 index 0000000..2793886 --- /dev/null +++ b/connector/connector-sdk-hadoop/pom.xml @@ -0,0 +1,60 @@ + + + + 4.0.0 + + + org.apache.sqoop + connector + 2.0.0-SNAPSHOT + + + org.apache.sqoop + connector-sdk-hadoop + Sqoop Connector Hadoop Specific SDK + + + + commons-codec + commons-codec + + + org.testng + testng + test + + + org.apache.sqoop + sqoop-common + + + org.apache.hadoop + hadoop-common + provided + + + org.apache.hadoop + hadoop-mapreduce-client-core + provided + + + + http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-sdk-hadoop/src/main/java/org/apache/sqoop/connector/hadoop/security/SecurityUtils.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk-hadoop/src/main/java/org/apache/sqoop/connector/hadoop/security/SecurityUtils.java b/connector/connector-sdk-hadoop/src/main/java/org/apache/sqoop/connector/hadoop/security/SecurityUtils.java new file mode 100644 index 0000000..44f5c03 --- /dev/null +++ b/connector/connector-sdk-hadoop/src/main/java/org/apache/sqoop/connector/hadoop/security/SecurityUtils.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hadoop.security; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.security.TokenCache; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.ImmutableContext; +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.job.etl.TransferableContext; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +/** + * Sqoop is designed to abstract connectors from the execution engine. Hence the security portion + * (like generating and distributing delegation tokens) won't happen automatically for us under the hood, + * and we have to do everything manually. + */ +public class SecurityUtils { + + private static final Logger LOG = Logger.getLogger(SecurityUtils.class); + + private static final String DELEGATION_TOKENS = "org.apache.sqoop.connector.delegation_tokens"; + + /** + * Creates a proxy user for the user who submitted the Sqoop job (e.g. who issued the "start job" command) + */ + static public UserGroupInformation createProxyUser(TransferableContext context) throws IOException { + return UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()); + } + + /** + * Creates a proxy user and loads it up with all delegation tokens that we have created ourselves + */ + static public UserGroupInformation createProxyUserAndLoadDelegationTokens(TransferableContext context) throws IOException { + UserGroupInformation proxyUser = createProxyUser(context); + loadDelegationTokensToUGI(proxyUser, context.getContext()); + + return proxyUser; + } + + /** + * Generates delegation tokens for the current user (this code is supposed to run inside doAs) and stores them + * serialized in the given mutable context. + */ + static public void generateDelegationTokens(MutableContext context, Path path, Configuration configuration) throws IOException { + if(!UserGroupInformation.isSecurityEnabled()) { + LOG.info("Running on unsecured cluster, skipping delegation token generation."); + return; + } + + // String representation of all tokens that we will create (most likely a single one) + List<String> tokens = new LinkedList<>(); + + Credentials credentials = new Credentials(); + TokenCache.obtainTokensForNamenodes(credentials, new Path[]{path}, configuration); + for (Token token : credentials.getAllTokens()) { + LOG.info("Generated token: " + token.toString()); + tokens.add(serializeToken(token)); + } + + // The context classes are transferred via "Credentials" rather than with jobconf, so we're not leaking the DT out here + if(tokens.size() > 0) { + context.setString(DELEGATION_TOKENS, StringUtils.join(tokens, " ")); + } + } + + /** + * Loads the delegation tokens that we created and serialized into the mutable context + */ + static public void loadDelegationTokensToUGI(UserGroupInformation ugi, ImmutableContext context) throws IOException { + String tokenList = context.getString(DELEGATION_TOKENS); + if(tokenList == null) { + LOG.info("No delegation tokens found"); + return; + } + + for(String stringToken: tokenList.split(" ")) { + Token token = deserializeToken(stringToken); + LOG.info("Loaded delegation token: " + token.toString()); + ugi.addToken(token); + } + } + + /** + * Serializes the given token into a String. + * + * We'll convert the token to byte[] using Writable methods for I/O and then Base64 + * encode the bytes to a human-readable string. + */ + static public String serializeToken(Token token) throws IOException { + // Serialize the Token to a byte array + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + token.write(dos); + baos.flush(); + + return Base64.encodeBase64String(baos.toByteArray()); + } + + /** + * Deserializes a token from the given String. + * + * See serializeToken for details on how the token is expected to be serialized. + */ + static public Token deserializeToken(String stringToken) throws IOException { + Token token = new Token(); + byte[] tokenBytes = Base64.decodeBase64(stringToken); + - ByteArrayInputStream bais = new ByteArrayInputStream(tokenBytes); + DataInputStream dis = new DataInputStream(bais); + token.readFields(dis); + + return token; + } + + private SecurityUtils() { + // Initialization is prohibited + } +}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-sdk-hadoop/src/test/java/org/apache/sqoop/connector/hadoop/security/TestSecurityUtils.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk-hadoop/src/test/java/org/apache/sqoop/connector/hadoop/security/TestSecurityUtils.java b/connector/connector-sdk-hadoop/src/test/java/org/apache/sqoop/connector/hadoop/security/TestSecurityUtils.java new file mode 100644 index 0000000..59293b2 --- /dev/null +++ b/connector/connector-sdk-hadoop/src/test/java/org/apache/sqoop/connector/hadoop/security/TestSecurityUtils.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hadoop.security; + +import org.apache.hadoop.io.Text; +import org.testng.annotations.Test; +import org.apache.hadoop.security.token.Token; +import org.apache.sqoop.connector.hadoop.security.SecurityUtils; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +public class TestSecurityUtils { + + @Test + public void testTokenSerializationDeserialization() throws Exception { + byte[] identifier = "identifier".getBytes(); + byte[] password = "password".getBytes(); + Text kind = new Text("kind"); + Text service = new Text("service"); + + Token token = new Token(identifier, password, kind, service); + String serializedForm = SecurityUtils.serializeToken(token); + assertNotNull(serializedForm); + + Token deserializedToken = SecurityUtils.deserializeToken(serializedForm); + assertNotNull(deserializedToken); + + assertEquals(identifier, deserializedToken.getIdentifier()); + assertEquals(password, deserializedToken.getPassword()); + assertEquals(kind.toString(), deserializedToken.getKind().toString()); + assertEquals(service.toString(), deserializedToken.getService().toString()); + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/pom.xml ---------------------------------------------------------------------- diff --git a/connector/pom.xml b/connector/pom.xml index be8fcb1..7340b37 100644 --- a/connector/pom.xml +++ b/connector/pom.xml @@ -34,6 +34,7 @@ limitations under the License. connector-sdk + connector-sdk-hadoop connector-generic-jdbc connector-hdfs connector-kite http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d7b0dd5..12786ca 100644 --- a/pom.xml +++ b/pom.xml @@ -347,6 +347,11 @@ limitations under the License. ${project.version} + org.apache.sqoop + connector-sdk-hadoop + ${project.version} + + org.apache.sqoop.connector sqoop-connector-generic-jdbc ${project.version} http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index c24183c..c0e40d5 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -78,6 +78,11 @@ limitations under the License. + org.apache.sqoop + connector-sdk-hadoop + + + org.apache.sqoop.connector sqoop-connector-generic-jdbc
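----------------------------------------------------------------------

For connector authors picking up the relocated class, the call pattern stays the same as the one the HdfsFromInitializer and HdfsExtractor changes above rely on: the initializer generates and serializes delegation tokens inside a doAs block, and the later job stages (extractor, loader, destroyer) recreate the proxy user and reload those tokens before touching HDFS. The following is a minimal sketch of that flow, not code from this commit; DelegationTokenFlow is a hypothetical class, and the MutableContext parameter stands in for the job context object that the framework hands to the connector's initializer.

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.job.etl.TransferableContext;

public class DelegationTokenFlow {

  // Initializer side: obtain tokens as the proxy user and stash their
  // serialized form in the mutable context for the later job stages.
  public void onInitialize(final TransferableContext context, final MutableContext jobContext,
      final Path path, final Configuration configuration) throws Exception {
    SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        // No-op on an unsecured cluster; otherwise stores the tokens under
        // the org.apache.sqoop.connector.delegation_tokens key.
        SecurityUtils.generateDelegationTokens(jobContext, path, configuration);
        return null;
      }
    });
  }

  // Extractor/loader/destroyer side: rebuild the proxy user, reload the
  // serialized tokens from the context, and do the HDFS work inside doAs.
  public void onExecute(final TransferableContext context) throws Exception {
    UserGroupInformation proxyUser =
        SecurityUtils.createProxyUserAndLoadDelegationTokens(context);
    proxyUser.doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        // ... perform HDFS reads/writes here as the submitting user ...
        return null;
      }
    });
  }
}

Because generateDelegationTokens exits early when UserGroupInformation.isSecurityEnabled() is false, the same flow works unchanged on both secure and unsecured clusters.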