flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From StephanEwen <...@git.apache.org>
Subject [GitHub] flink pull request #3057: [FLINK-5364] Rework JAAS configuration to support ...
Date Fri, 06 Jan 2017 20:08:03 GMT
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3057#discussion_r95007601
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
---
    @@ -71,163 +64,93 @@
     	 */
     	public static void install(SecurityConfiguration config) throws Exception {
     
    -		if (!config.securityIsEnabled()) {
    -			// do not perform any initialization if no Kerberos credentials are provided
    -			return;
    -		}
    -
    -		// establish the JAAS config
    -		JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
    -		javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
    -
    -		populateSystemSecurityProperties(config.flinkConf);
    -
    -		// establish the UGI login user
    -		UserGroupInformation.setConfiguration(config.hadoopConf);
    -
    -		// only configure Hadoop security if we have security enabled
    -		if (UserGroupInformation.isSecurityEnabled()) {
    -
    -			final UserGroupInformation loginUser;
    -
    -			if (config.keytab != null && !StringUtils.isBlank(config.principal)) {
    -				String keytabPath = (new File(config.keytab)).getAbsolutePath();
    -
    -				UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
    -
    -				loginUser = UserGroupInformation.getLoginUser();
    -
    -				// supplement with any available tokens
    -				String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
    -				if (fileLocation != null) {
    -				/*
    -				 * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
    -				 * used in the context of reading the stored tokens from UGI.
    -				 * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
    -				 * loginUser.addCredentials(cred);
    -				*/
    -					try {
    -						Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
    -							File.class, org.apache.hadoop.conf.Configuration.class);
    -						Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
    -							config.hadoopConf);
    -						Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
    -							Credentials.class);
    -						addCredentialsMethod.invoke(loginUser, cred);
    -					} catch (NoSuchMethodException e) {
    -						LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
    -					}
    -				}
    -			} else {
    -				// login with current user credentials (e.g. ticket cache)
    -				try {
    -					//Use reflection API to get the login user object
    -					//UserGroupInformation.loginUserFromSubject(null);
    -					Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
    -					Subject subject = null;
    -					loginUserFromSubjectMethod.invoke(null, subject);
    -				} catch (NoSuchMethodException e) {
    -					LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
    -				}
    -
    -				// note that the stored tokens are read automatically
    -				loginUser = UserGroupInformation.getLoginUser();
    +		// install the security modules
    +		List<SecurityModule> modules = new ArrayList();
    +		try {
    +			for (Class<? extends SecurityModule> moduleClass : config.getSecurityModules()) {
    +				SecurityModule module = moduleClass.newInstance();
    +				module.install(config);
    +				modules.add(module);
     			}
    +		}
    +		catch(Exception ex) {
    +			throw new Exception("unable to establish the security context", ex);
    +		}
    +		installedModules = modules;
     
    -			LOG.info("Hadoop user set to {}", loginUser.toString());
    +		// install a security context
    +		// use the Hadoop login user as the subject of the installed security context
    +		if (!(installedContext instanceof NoOpSecurityContext)) {
    +			LOG.warn("overriding previous security context");
    +		}
    +		UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
    +		installedContext = new HadoopSecurityContext(loginUser);
    +	}
     
    -			boolean delegationToken = false;
    -			final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
    -			Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
    -			for (Token<? extends TokenIdentifier> token : usrTok) {
    -				final Text id = new Text(token.getIdentifier());
    -				LOG.debug("Found user token " + id + " with " + token);
    -				if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
    -					delegationToken = true;
    +	static void uninstall() {
    +		if(installedModules != null) {
    +			for (SecurityModule module : Lists.reverse(installedModules)) {
    +				try {
    +					module.uninstall();
     				}
    -			}
    -
    -			if (!loginUser.hasKerberosCredentials()) {
    -				//throw an error in non-yarn deployment if kerberos cache is not available
    -				if (!delegationToken) {
    -					LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
    -					throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
    +				catch(UnsupportedOperationException e) {
     				}
     			}
    -
    -			if (!(installedContext instanceof NoOpSecurityContext)) {
    -				LOG.warn("overriding previous security context");
    -			}
    -
    -			installedContext = new HadoopSecurityContext(loginUser);
    +			installedModules = null;
     		}
    -	}
     
    -	static void clearContext() {
     		installedContext = new NoOpSecurityContext();
     	}
     
    -	/*
    -	 * This method configures some of the system properties that are required for ZK and Kafka SASL authentication
    -	 * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
    -	 * See: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
    -	 * In this method, setting java.security.auth.login.config configuration is configured only to support ZK and
    -	 * Kafka current code behavior.
    +	/**
    +	 * The global security configuration.
    +	 *
    +	 * See {@link SecurityOptions} for corresponding configuration options.
     	 */
    -	private static void populateSystemSecurityProperties(Configuration configuration) {
    -		Preconditions.checkNotNull(configuration, "The supplied configuration was null");
    +	public static class SecurityConfiguration {
     
    -		boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
    +		private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList(
    +			new ArrayList<Class<? extends SecurityModule>>() {{
    --- End diff --
    
    I think you can use `Arrays.asList(...)` here, instead of creating an anonymous subclass.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message