flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GJL <...@git.apache.org>
Subject [GitHub] flink pull request #5229: [FLINK-8343] [flip6] Remove Yarn specific commands...
Date Mon, 08 Jan 2018 15:57:50 GMT
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5229#discussion_r160166638
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
---
    @@ -534,215 +557,235 @@ private Configuration applyYarnProperties(Configuration configuration) throws Fl
     		return effectiveConfiguration;
     	}
     
    -	public int run(
    -			String[] args,
    -			Configuration configuration,
    -			String configurationDirectory) {
    +	public int run(String[] args) throws CliArgsException, FlinkException {
     		//
     		//	Command Line Options
     		//
    -		Options options = new Options();
    -		addGeneralOptions(options);
    -		addRunOptions(options);
    +		final CommandLine cmd = parseCommandLineOptions(args, true);
     
    -		CommandLineParser parser = new PosixParser();
    -		CommandLine cmd;
    -		try {
    -			cmd = parser.parse(options, args);
    -		} catch (Exception e) {
    -			System.out.println(e.getMessage());
    -			printUsage();
    -			return 1;
    -		}
    +		final AbstractYarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(cmd);
     
    -		// Query cluster for metrics
    -		if (cmd.hasOption(query.getOpt())) {
    -			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(
    -				configuration,
    -				configurationDirectory,
    -				cmd.hasOption(flip6.getOpt()));
    -			String description;
    -			try {
    -				description = yarnDescriptor.getClusterDescription();
    -			} catch (Exception e) {
    -				System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage());
    -				e.printStackTrace(System.err);
    -				return 1;
    -			}
    -			System.out.println(description);
    -			return 0;
    -		} else if (cmd.hasOption(applicationId.getOpt())) {
    +		try {
    +			// Query cluster for metrics
    +			if (cmd.hasOption(query.getOpt())) {
    +				final String description = yarnClusterDescriptor.getClusterDescription();
    +				System.out.println(description);
    +				return 0;
    +			} else {
    +				final ClusterClient clusterClient;
    +				final ApplicationId yarnApplicationId;
     
    -			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(
    -				configuration,
    -				configurationDirectory,
    -				cmd.hasOption(flip6.getOpt()));
    +				if (cmd.hasOption(applicationId.getOpt())) {
    +					yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
     
    -			//configure ZK namespace depending on the value passed
    -			String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ?
    -									cmd.getOptionValue(zookeeperNamespace.getOpt())
    -									: yarnDescriptor.getFlinkConfiguration()
    -									.getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt()));
    -			LOG.info("Going to use the ZK namespace: {}", zkNamespace);
    -			yarnDescriptor.getFlinkConfiguration().setString(HA_CLUSTER_ID, zkNamespace);
    +					clusterClient = yarnClusterDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
    +				} else {
    +					final ClusterSpecification clusterSpecification = getClusterSpecification(cmd);
     
    -			try {
    -				yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
    -			} catch (Exception e) {
    -				throw new RuntimeException("Could not retrieve existing Yarn application", e);
    -			}
    +					clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
     
    -			if (detachedMode) {
    -				LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
    -					"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
    -					"yarn application -kill " + applicationId.getOpt());
    -				yarnCluster.disconnect();
    -			} else {
    -				ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    -
    -				try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
    -						yarnDescriptor.getYarnClient(),
    -						yarnCluster.getApplicationId(),
    -						new ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
    -					runInteractiveCli(
    -						yarnCluster,
    -						yarnApplicationStatusMonitor,
    -						true);
    -				} catch (Exception e) {
    -					LOG.info("Could not properly close the Yarn application status monitor.", e);
    -				} finally {
    -					// shut down the scheduled executor service
    -					ExecutorUtils.gracefulShutdown(
    -						1000L,
    -						TimeUnit.MILLISECONDS,
    -						scheduledExecutorService);
    -				}
    -			}
    -		} else {
    +					//------------------ ClusterClient deployed, handle connection details
    +					yarnApplicationId = ConverterUtils.toApplicationId(clusterClient.getClusterIdentifier());
     
    -			try (AbstractYarnClusterDescriptor yarnDescriptor = createClusterDescriptor(cmd)){
    -				final ClusterSpecification clusterSpecification;
    +					String jobManagerAddress =
    +						clusterClient.getJobManagerAddress().getAddress().getHostName() +
    +							':' + clusterClient.getJobManagerAddress().getPort();
     
    -				try {
    -					clusterSpecification = getClusterSpecification(cmd);
    -				} catch (FlinkException e) {
    -					System.err.println("Error while creating the cluster specification: " + e.getMessage());
    -					e.printStackTrace();
    -					return 1;
    -				}
    +					System.out.println("Flink JobManager is now running on " + jobManagerAddress);
    +					System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL());
     
    -				try {
    -					yarnCluster = yarnDescriptor.deploySessionCluster(clusterSpecification);
    -				} catch (Exception e) {
    -					System.err.println("Error while deploying YARN cluster: " + e.getMessage());
    -					e.printStackTrace(System.err);
    -					return 1;
    -				}
    -				//------------------ ClusterClient deployed, handle connection details
    -				String jobManagerAddress =
    -					yarnCluster.getJobManagerAddress().getAddress().getHostName() +
    -						":" + yarnCluster.getJobManagerAddress().getPort();
    -
    -				System.out.println("Flink JobManager is now running on " + jobManagerAddress);
    -				System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
    -
    -				// file that we write into the conf/ dir containing the jobManager address and the dop.
    -				File yarnPropertiesFile = getYarnPropertiesLocation(configuration.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
    -
    -				Properties yarnProps = new Properties();
    -				yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString());
    -				if (clusterSpecification.getSlotsPerTaskManager() != -1) {
    -					String parallelism =
    -						Integer.toString(clusterSpecification.getSlotsPerTaskManager() * clusterSpecification.getNumberTaskManagers());
    -					yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
    +					writeYarnPropertiesFile(
    +						yarnApplicationId,
    +						clusterSpecification.getNumberTaskManagers() * clusterSpecification.getSlotsPerTaskManager(),
    +						yarnClusterDescriptor.getDynamicPropertiesEncoded());
     				}
    -				// add dynamic properties
    -				if (yarnDescriptor.getDynamicPropertiesEncoded() != null) {
    -					yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
    -						yarnDescriptor.getDynamicPropertiesEncoded());
    -				}
    -				writeYarnProperties(yarnProps, yarnPropertiesFile);
    -
    -				//------------------ ClusterClient running, let user control it ------------
     
     				if (detachedMode) {
    -					// print info and quit:
     					LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
     						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
    -						"yarn application -kill " + yarnCluster.getApplicationId());
    -					yarnCluster.waitForClusterToBeReady();
    -					yarnCluster.disconnect();
    +						"yarn application -kill " + applicationId.getOpt());
     				} else {
    -
     					ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
     
    -					try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
    -						yarnDescriptor.getYarnClient(),
    -						yarnCluster.getApplicationId(),
    -						new ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
    +					final YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
    +						yarnClusterDescriptor.getYarnClient(),
    +						yarnApplicationId,
    +						new ScheduledExecutorServiceAdapter(scheduledExecutorService));
    +
    +					try {
     						runInteractiveCli(
    -							yarnCluster,
    +							clusterClient,
     							yarnApplicationStatusMonitor,
     							acceptInteractiveInput);
    -					} catch (Exception e) {
    -						LOG.info("Could not properly close the Yarn application status monitor.", e);
     					} finally {
    +						try {
    +							yarnApplicationStatusMonitor.close();
    +						} catch (Exception e) {
    +							LOG.info("Could not properly close the Yarn application status monitor.", e);
    +						}
    +
    +						try {
    +							clusterClient.shutdown();
    +						} catch (Exception e) {
    +							LOG.info("Could not properly shutdown cluster client.", e);
    +						}
    +
    +						try {
    +							yarnClusterDescriptor.terminateCluster(yarnApplicationId);
    +						} catch (FlinkException e) {
    +							LOG.info("Could not properly terminate the Flink cluster.", e);
    +						}
    +
     						// shut down the scheduled executor service
     						ExecutorUtils.gracefulShutdown(
     							1000L,
     							TimeUnit.MILLISECONDS,
     							scheduledExecutorService);
    +
    +						deleteYarnPropertiesFile();
    +
    +						try {
    +							final ApplicationReport applicationReport = yarnClusterDescriptor
    +								.getYarnClient()
    +								.getApplicationReport(yarnApplicationId);
    +
    +							logFinalApplicationReport(applicationReport);
    +						} catch (YarnException | IOException e) {
    +							LOG.info("Could not log the final application report.", e);
    +						}
     					}
     				}
    -			} catch (FlinkException e) {
    -				System.err.println("Error while deploying a Flink cluster: " + e.getMessage());
    -				e.printStackTrace();
    -				return 1;
    +			}
    +		} finally {
    +			try {
    +				yarnClusterDescriptor.close();
    +			} catch (Exception e) {
    +				LOG.info("Could not properly close the yarn cluster descriptor.", e);
     			}
     		}
    +
     		return 0;
     	}
     
    -	/**
    -	 * Utility method for tests.
    -	 */
    -	public void stop() {
    -		if (yarnCluster != null) {
    -			LOG.info("Command line interface is shutting down the yarnCluster");
    +	private void logFinalApplicationReport(ApplicationReport appReport) {
    +		try {
    +			LOG.info("Application " + appReport.getApplicationId() + " finished with state " + appReport
    +				.getYarnApplicationState() + " and final state " + appReport
    +				.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
    +
    +			if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
    +				== YarnApplicationState.KILLED) {
    +				LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
    +				LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
    +					+ "the full application log using this command:"
    +					+ System.lineSeparator()
    +					+ "\tyarn logs -applicationId " + appReport.getApplicationId()
    +					+ System.lineSeparator()
    +					+ "(It sometimes takes a few seconds until the logs are aggregated)");
    +			}
    +		} catch (Exception e) {
    +			LOG.warn("Couldn't get final report", e);
    +		}
    +	}
     
    -			try {
    -				yarnCluster.shutdown();
    -			} catch (Throwable t) {
    -				LOG.warn("Could not properly shutdown the yarn cluster.", t);
    +	private void deleteYarnPropertiesFile() {
    +		// try to clean up the old yarn properties file
    +		try {
    +			File propertiesFile = getYarnPropertiesLocation(yarnPropertiesFileLocation);
    +			if (propertiesFile.isFile()) {
    +				if (propertiesFile.delete()) {
    +					LOG.info("Deleted Yarn properties file at {}", propertiesFile.getAbsoluteFile());
    +				} else {
    +					LOG.warn("Couldn't delete Yarn properties file at {}", propertiesFile.getAbsoluteFile());
    +				}
     			}
    +		} catch (Exception e) {
    +			LOG.warn("Exception while deleting the JobManager address file", e);
     		}
     	}
     
    +	private void writeYarnPropertiesFile(
    +			ApplicationId yarnApplicationId,
    +			int parallelism,
    +			@Nullable String dynamicProperties) {
    +		// file that we write into the conf/ dir containing the jobManager address and the dop.
    +		final File yarnPropertiesFile = getYarnPropertiesLocation(yarnPropertiesFileLocation);
    +
    +		Properties yarnProps = new Properties();
    +		yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnApplicationId.toString());
    +		if (parallelism > 0) {
    +			yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, Integer.toString(parallelism));
    +		}
    +
    +		// add dynamic properties
    +		if (dynamicProperties != null) {
    +			yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, dynamicProperties);
    +		}
    +
    +		writeYarnProperties(yarnProps, yarnPropertiesFile);
    +	}
    +
     	private void logAndSysout(String message) {
     		LOG.info(message);
     		System.out.println(message);
     	}
     
    -	public static void main(final String[] args) throws Exception {
    +	public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
    +		if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
    +			Map<String, String> properties = new HashMap<>();
    +
    +			String[] propertyLines = dynamicPropertiesEncoded.split(YARN_DYNAMIC_PROPERTIES_SEPARATOR);
    +			for (String propLine : propertyLines) {
    +				if (propLine == null) {
    +					continue;
    +				}
    +
    +				int firstEquals = propLine.indexOf("=");
    +
    +				if (firstEquals >= 0) {
    +					String key = propLine.substring(0, firstEquals).trim();
    +					String value = propLine.substring(firstEquals + 1, propLine.length()).trim();
    +
    +					if (!key.isEmpty()) {
    +						properties.put(key, value);
    +					}
    +				}
    +			}
    +			return properties;
    +		}
    +		else {
    +			return Collections.emptyMap();
    +		}
    +	}
    +
    +	public static void main(final String[] args) {
     		final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
     
     		final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
     
    -		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
    -			flinkConfiguration,
    -			configurationDirectory,
    -			"",
    -			""); // no prefix for the YARN session
    +		int retCode;
     
    -		SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));
    +		try {
    +			final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
    +				flinkConfiguration,
    +				configurationDirectory,
    +				"",
    +				""); // no prefix for the YARN session
     
    -		final int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args, flinkConfiguration, configurationDirectory));
    +			SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));
    +
    +			retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
    +		} catch (CliArgsException e) {
    +			retCode = handleCliArgsException(e);
    +		} catch (Exception e) {
    +			retCode = handleError(e);
    +		}
     
     		System.exit(retCode);
     	}
     
     	private static void runInteractiveCli(
    -		YarnClusterClient clusterClient,
    +		ClusterClient clusterClient,
    --- End diff --
    
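    nit: indentation (the parameter list of `runInteractiveCli` should use the same double indent as the other multi-line signatures in this diff, e.g. `writeYarnPropertiesFile`)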


---

Mime
View raw message