oozie-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pbac...@apache.org
Subject oozie git commit: OOZIE-3035 HDFS HA and log aggregation: getting HDFS delegation token from YARN renewer within JavaActionExecutor (andras.piros via pbacsko)
Date Thu, 31 Aug 2017 13:30:07 GMT
Repository: oozie
Updated Branches:
  refs/heads/master df0e3c24c -> 12ea195dd


OOZIE-3035 HDFS HA and log aggregation: getting HDFS delegation token from YARN renewer within
JavaActionExecutor (andras.piros via pbacsko)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/12ea195d
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/12ea195d
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/12ea195d

Branch: refs/heads/master
Commit: 12ea195dd935d64c9a9dbe447acd0a76abef3c8e
Parents: df0e3c2
Author: Peter Bacsko <pbacsko@cloudera.com>
Authored: Thu Aug 31 15:29:58 2017 +0200
Committer: Peter Bacsko <pbacsko@cloudera.com>
Committed: Thu Aug 31 15:29:58 2017 +0200

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java | 128 ++++++++++++++-----
 .../oozie/service/HadoopAccessorService.java    |  54 ++++++--
 .../action/hadoop/TestJavaActionExecutor.java   |  11 +-
 release-log.txt                                 |   1 +
 4 files changed, 142 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/12ea195d/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index dc17950..bca79aa 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.DiskChecker;
@@ -962,7 +963,7 @@ public class JavaActionExecutor extends ActionExecutor {
             // Setting the credential properties in launcher conf
             Configuration credentialsConf = null;
 
-            HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
+            Map<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
                     action, actionConf);
             Credentials credentials = null;
             if (credentialsProperties != null) {
@@ -991,6 +992,11 @@ public class JavaActionExecutor extends ActionExecutor {
             LOG.debug("Creating yarnClient for action {0}", action.getId());
             yarnClient = createYarnClient(context, launcherJobConf);
 
+            if (UserGroupInformation.isSecurityEnabled()) {
+                credentials = ensureCredentials(credentials);
+                acquireHDFSDelegationToken(actionFs, credentialsConf, credentials);
+            }
+
             if (alreadyRunning && !isUserRetry) {
                 try {
                     ApplicationId appId = ConverterUtils.toApplicationId(launcherId);
@@ -1066,6 +1072,56 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
+    private Credentials ensureCredentials(final Credentials credentials) {
+        if (credentials == null) {
+            LOG.debug("No credentials present, creating a new one.");
+            return new Credentials();
+        }
+
+        return credentials;
+    }
+
+    /**
+     * In a secure environment, when both HDFS HA and log aggregation are turned on, {@link
JavaActionExecutor} is not able to call
+     * {@link YarnClient#submitApplication} since {@code HDFS_DELEGATION_TOKEN} is missing.
+     *
+     * @param actionFs the {@link FileSystem} to get the delegation token from
+     * @param credentialsConf the {@link Configuration} to extract the YARN renewer
+     * @param credentials the {@link Credentials} where the delegation token is stored
+     * @throws IOException
+     * @throws ActionExecutorException when security is enabled, but either {@code credentials}
are empty, or
+     * {@code serverPrincipal} is empty, or HDFS delegation token is not present within {@code
actionFs}
+     */
+    private void acquireHDFSDelegationToken(final FileSystem actionFs,
+                                            final Configuration credentialsConf,
+                                            final Credentials credentials)
+            throws IOException, ActionExecutorException {
+        LOG.debug("Security is enabled, checking credentials to acquire HDFS delegation token.");
+
+        final HadoopAccessorService hadoopAccessorService = Services.get().get(HadoopAccessorService.class);
+        final String servicePrincipal = hadoopAccessorService.getServicePrincipal(credentialsConf);
+        final String serverPrincipal = hadoopAccessorService.getServerPrincipal(
+                credentialsConf,
+                servicePrincipal);
+        if (serverPrincipal == null) {
+            final String errorTemplate = "No server principal present, won't get HDFS delegation
token. [servicePrincipal={0}]";
+            LOG.error(errorTemplate, servicePrincipal);
+            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA022",
errorTemplate, servicePrincipal);
+        }
+
+        LOG.debug("Server principal present, getting HDFS delegation token. [serverPrincipal={0}]",
serverPrincipal);
+        final Token hdfsDelegationToken = actionFs.getDelegationToken(serverPrincipal);
+        if (hdfsDelegationToken == null) {
+            final String errorTemplate = "No HDFS delegation token present, won't set credentials.
[serverPrincipal={0}]";
+            LOG.error(errorTemplate, serverPrincipal);
+            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA022",
errorTemplate, serverPrincipal);
+        }
+
+        LOG.debug("Got HDFS delegation token, setting credentials. [hdfsDelegationToken={0}]",
+                hdfsDelegationToken);
+        credentials.addToken(new Text(hdfsDelegationToken.getService().toString()), hdfsDelegationToken);
+    }
+
     private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId,
Configuration launcherJobConf,
                                         String user, Context context, Configuration actionConf,
String actionName,
                                         Credentials credentials)
@@ -1162,45 +1218,51 @@ public class JavaActionExecutor extends ActionExecutor {
         return appContext;
     }
 
-    protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context
context,
-            WorkflowAction action, Configuration actionConf) throws Exception {
-        HashMap<String, CredentialsProperties> credPropertiesMap = null;
-        if (context != null && action != null) {
-            if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) {
-                XConfiguration wfJobConf = getWorkflowConf(context);
-                if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) ||
-                    !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP)))
{
-                    credPropertiesMap = getActionCredentialsProperties(context, action);
-                    if (!credPropertiesMap.isEmpty()) {
-                        for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet())
{
-                            if (entry.getValue() != null) {
-                                CredentialsProperties prop = entry.getValue();
-                                LOG.debug("Credential Properties set for action : " + action.getId());
-                                for (Entry<String, String> propEntry : prop.getProperties().entrySet())
{
-                                    String key = propEntry.getKey();
-                                    String value = propEntry.getValue();
-                                    actionConf.set(key, value);
-                                    LOG.debug("property : '" + key + "', value : '" + value
+ "'");
-                                }
-                            }
-                        }
-                    } else {
-                        LOG.warn("No credential properties found for action : " + action.getId()
+ ", cred : " + action.getCred());
-                    }
-                } else {
-                    LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
+    Map<String, CredentialsProperties> setCredentialPropertyToActionConf(final Context
context,
+                                                                         final WorkflowAction
action,
+                                                                         final Configuration
actionConf) throws Exception {
+        if (context == null || action == null) {
+            LOG.warn("context or action is null");
+            return null;
+        }
+
+        if (Boolean.TRUE.toString().equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) &&
!UserGroupInformation.isSecurityEnabled()) {
+            LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
+            return null;
+        }
+
+        final XConfiguration wfJobConf = getWorkflowConf(context);
+        if (!Boolean.FALSE.toString().equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) &&
+                wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))
 &&
+                !UserGroupInformation.isSecurityEnabled()) {
+            LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
+            return null;
+        }
+
+        final Map<String, CredentialsProperties> credPropertiesMap = getActionCredentialsProperties(context,
action);
+        if (credPropertiesMap.isEmpty()) {
+            LOG.warn("No credential properties found for action : " + action.getId() + ",
cred : " + action.getCred());
+            return credPropertiesMap;
+        }
+
+        for (final Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet())
{
+            if (entry.getValue() != null) {
+                final CredentialsProperties prop = entry.getValue();
+                LOG.debug("Credential Properties set for action : " + action.getId());
+                for (final Entry<String, String> propEntry : prop.getProperties().entrySet())
{
+                    final String key = propEntry.getKey();
+                    final String value = propEntry.getValue();
+                    actionConf.set(key, value);
+                    LOG.debug("property : '" + key + "', value : '" + value + "'");
                 }
-            } else {
-                LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
             }
-        } else {
-            LOG.warn("context or action is null");
         }
+
         return credPropertiesMap;
     }
 
     protected void setCredentialTokens(Credentials credentials, Configuration jobconf, Context
context, WorkflowAction action,
-            HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception
{
+                                       Map<String, CredentialsProperties> credPropertiesMap)
throws Exception {
 
         if (context != null && action != null && credPropertiesMap != null)
{
             // Make sure we're logged into Kerberos; if not, or near expiration, it will
relogin

http://git-wip-us.apache.org/repos/asf/oozie/blob/12ea195d/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
index b507c79..187cee2 100644
--- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
+++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
@@ -18,6 +18,8 @@
 
 package org.apache.oozie.service;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
@@ -661,23 +663,12 @@ public class HadoopAccessorService implements Service {
     Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
         // Getting renewer correctly for JT principal also though JT in hadoop 1.x does not
have
         // support for renewing/cancelling tokens
-        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
+        final String servicePrincipal = getServicePrincipal(jobConf);
         Text renewer;
         if (servicePrincipal != null) { // secure cluster
             renewer = mrTokenRenewers.get(servicePrincipal);
             if (renewer == null) {
-                // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
-                String target = jobConf.get(HADOOP_YARN_RM);
-                try {
-                    String addr = NetUtils.createSocketAddr(target).getHostName();
-                    renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal,
addr));
-                    LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal
+ ",Target=" + target
-                            + ",Renewer=" + renewer);
-                }
-                catch (IllegalArgumentException iae) {
-                    renewer = new Text(servicePrincipal.split("[/@]")[0]);
-                    LOG.info("Delegation Token Renewer for " + servicePrincipal + " is "
+ renewer);
-                }
+                renewer = new Text(getServerPrincipal(jobConf, servicePrincipal));
                 mrTokenRenewers.put(servicePrincipal, renewer);
             }
         }
@@ -687,6 +678,43 @@ public class HadoopAccessorService implements Service {
         return renewer;
     }
 
+    public String getServicePrincipal(final Configuration configuration) {
+        return configuration.get(RM_PRINCIPAL, configuration.get(JT_PRINCIPAL));
+    }
+
+    /**
+     * Mimic {@link org.apache.hadoop.mapred.Master#getMasterPrincipal}, get Kerberos principal
for use as delegation token renewer.
+     *
+     * @param configuration the {@link Configuration} containing the YARN RM address
+     * @param servicePrincipal the configured service principal
+     * @return the server principal originating from the host name and the service principal
+     * @throws IOException when something goes wrong finding out the local address inside
+     * {@link SecurityUtil#getServerPrincipal(String, String)}
+     */
+    public String getServerPrincipal(final Configuration configuration, final String servicePrincipal)
throws IOException {
+        Preconditions.checkNotNull(configuration, "configuration has to be filled");
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(servicePrincipal), "servicePrincipal
has to be filled");
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(configuration.get(HADOOP_YARN_RM)),
+                String.format("configuration entry %s has to be filled", HADOOP_YARN_RM));
+
+        String serverPrincipal;
+        final String target = configuration.get(HADOOP_YARN_RM);
+
+        try {
+            final String addr = NetUtils.createSocketAddr(target).getHostName();
+            serverPrincipal = SecurityUtil.getServerPrincipal(servicePrincipal, addr);
+            LOG.info("Delegation Token Renewer details: Principal={0},Target={1}", serverPrincipal,
target);
+        }
+        catch (final IllegalArgumentException iae) {
+            LOG.warn("An error happened while trying to get server principal. Getting it
from service principal anyway.", iae);
+
+            serverPrincipal = servicePrincipal.split("[/@]")[0];
+            LOG.info("Delegation Token Renewer for {0} is {1}", target, serverPrincipal);
+        }
+
+        return serverPrincipal;
+    }
+
     public void addFileToClassPath(String user, final Path file, final Configuration conf)
             throws IOException {
         ParamChecker.notEmpty(user, "user");

http://git-wip-us.apache.org/repos/asf/oozie/blob/12ea195d/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index c51c64a..ce674ad 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -878,7 +878,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         Configuration actionConf = ae.createBaseHadoopConf(context, actionXmlconf);
 
         // Setting the credential properties in launcher conf
-        HashMap<String, CredentialsProperties> credProperties = ae.setCredentialPropertyToActionConf(context,
+        Map<String, CredentialsProperties> credProperties = ae.setCredentialPropertyToActionConf(context,
                 action, actionConf);
 
         assertNotNull(credProperties);
@@ -951,8 +951,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         Configuration actionConf = ae.createBaseHadoopConf(context, actionXmlconf);
 
         try {
-        // Setting the credential properties in launcher conf should fail
-        ae.setCredentialPropertyToActionConf(context, action, actionConf);
+            // Setting the credential properties in launcher conf should fail
+            ae.setCredentialPropertyToActionConf(context, action, actionConf);
         }
         catch (ActionExecutorException e) {
             assertEquals(e.getErrorCode(), "JA021");
@@ -991,8 +991,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         Configuration actionConf = ae.createBaseHadoopConf(context, actionXmlconf);
 
         // should not throw JA021 exception
-        HashMap<String, CredentialsProperties> credProperties = ae.setCredentialPropertyToActionConf(context,
action,
-                    actionConf);
+        ae.setCredentialPropertyToActionConf(context, action, actionConf);
     }
 
     public void testCredentialsSkip() throws Exception {
@@ -1091,7 +1090,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         ConfigurationService.setBoolean("oozie.credentials.skip", skipSite);
 
         // Setting the credential properties in launcher conf
-        HashMap<String, CredentialsProperties> credProperties = ae.setCredentialPropertyToActionConf(context,
+        Map<String, CredentialsProperties> credProperties = ae.setCredentialPropertyToActionConf(context,
                 action, actionConf);
 
         // Try to load the token without it being defined in oozie-site; should get an exception

http://git-wip-us.apache.org/repos/asf/oozie/blob/12ea195d/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index b63f42d..e2311ea 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.0.0 release (trunk - unreleased)
 
+OOZIE-3035 HDFS HA and log aggregation: getting HDFS delegation token from YARN renewer within
JavaActionExecutor (andras.piros via pbacsko)
 OOZIE-3026 fix openjpa enhancer stage during build for logging (dbdist13, andras.piros via
pbacsko)
 OOZIE-2746 Several tests failure in TestV2ValidateServlet.java (Dongying Jiao via asasvari)
 OOZIE-3038 Make all Oozie JUnit tests pass on dist_test (andras.piros via gezapeti)


Mime
View raw message