flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GJL <...@git.apache.org>
Subject [GitHub] flink pull request #5229: [FLINK-8343] [flip6] Remove Yarn specific commands...
Date Mon, 08 Jan 2018 15:58:06 GMT
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5229#discussion_r160166040
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -534,215 +557,235 @@ private Configuration applyYarnProperties(Configuration configuration) throws Fl
     		return effectiveConfiguration;
     	}
     
    -	public int run(
    -			String[] args,
    -			Configuration configuration,
    -			String configurationDirectory) {
    +	public int run(String[] args) throws CliArgsException, FlinkException {
     		//
     		//	Command Line Options
     		//
    -		Options options = new Options();
    -		addGeneralOptions(options);
    -		addRunOptions(options);
    +		final CommandLine cmd = parseCommandLineOptions(args, true);
     
    -		CommandLineParser parser = new PosixParser();
    -		CommandLine cmd;
    -		try {
    -			cmd = parser.parse(options, args);
    -		} catch (Exception e) {
    -			System.out.println(e.getMessage());
    -			printUsage();
    -			return 1;
    -		}
    +		final AbstractYarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(cmd);
     
    -		// Query cluster for metrics
    -		if (cmd.hasOption(query.getOpt())) {
    -			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(
    -				configuration,
    -				configurationDirectory,
    -				cmd.hasOption(flip6.getOpt()));
    -			String description;
    -			try {
    -				description = yarnDescriptor.getClusterDescription();
    -			} catch (Exception e) {
    -				System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage());
    -				e.printStackTrace(System.err);
    -				return 1;
    -			}
    -			System.out.println(description);
    -			return 0;
    -		} else if (cmd.hasOption(applicationId.getOpt())) {
    +		try {
    +			// Query cluster for metrics
    +			if (cmd.hasOption(query.getOpt())) {
    +				final String description = yarnClusterDescriptor.getClusterDescription();
    +				System.out.println(description);
    +				return 0;
    +			} else {
    +				final ClusterClient clusterClient;
    +				final ApplicationId yarnApplicationId;
     
    -			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(
    -				configuration,
    -				configurationDirectory,
    -				cmd.hasOption(flip6.getOpt()));
    +				if (cmd.hasOption(applicationId.getOpt())) {
    +					yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
     
    -			//configure ZK namespace depending on the value passed
    -			String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ?
    -									cmd.getOptionValue(zookeeperNamespace.getOpt())
    -									: yarnDescriptor.getFlinkConfiguration()
    -									.getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt()));
    -			LOG.info("Going to use the ZK namespace: {}", zkNamespace);
    -			yarnDescriptor.getFlinkConfiguration().setString(HA_CLUSTER_ID, zkNamespace);
    +					clusterClient = yarnClusterDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
    +				} else {
    +					final ClusterSpecification clusterSpecification = getClusterSpecification(cmd);
     
    -			try {
    -				yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
    -			} catch (Exception e) {
    -				throw new RuntimeException("Could not retrieve existing Yarn application", e);
    -			}
    +					clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
     
    -			if (detachedMode) {
    -				LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
    -					"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
    -					"yarn application -kill " + applicationId.getOpt());
    -				yarnCluster.disconnect();
    -			} else {
    -				ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    -
    -				try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
    -						yarnDescriptor.getYarnClient(),
    -						yarnCluster.getApplicationId(),
    -						new ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
    -					runInteractiveCli(
    -						yarnCluster,
    -						yarnApplicationStatusMonitor,
    -						true);
    -				} catch (Exception e) {
    -					LOG.info("Could not properly close the Yarn application status monitor.", e);
    -				} finally {
    -					// shut down the scheduled executor service
    -					ExecutorUtils.gracefulShutdown(
    -						1000L,
    -						TimeUnit.MILLISECONDS,
    -						scheduledExecutorService);
    -				}
    -			}
    -		} else {
    +					//------------------ ClusterClient deployed, handle connection details
    +					yarnApplicationId = ConverterUtils.toApplicationId(clusterClient.getClusterIdentifier());
     
    -			try (AbstractYarnClusterDescriptor yarnDescriptor = createClusterDescriptor(cmd)){
    -				final ClusterSpecification clusterSpecification;
    +					String jobManagerAddress =
    +						clusterClient.getJobManagerAddress().getAddress().getHostName() +
    +							':' + clusterClient.getJobManagerAddress().getPort();
     
    -				try {
    -					clusterSpecification = getClusterSpecification(cmd);
    -				} catch (FlinkException e) {
    -					System.err.println("Error while creating the cluster specification: " + e.getMessage());
    -					e.printStackTrace();
    -					return 1;
    -				}
    +					System.out.println("Flink JobManager is now running on " + jobManagerAddress);
    +					System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL());
     
    -				try {
    -					yarnCluster = yarnDescriptor.deploySessionCluster(clusterSpecification);
    -				} catch (Exception e) {
    -					System.err.println("Error while deploying YARN cluster: " + e.getMessage());
    -					e.printStackTrace(System.err);
    -					return 1;
    -				}
    -				//------------------ ClusterClient deployed, handle connection details
    -				String jobManagerAddress =
    -					yarnCluster.getJobManagerAddress().getAddress().getHostName() +
    -						":" + yarnCluster.getJobManagerAddress().getPort();
    -
    -				System.out.println("Flink JobManager is now running on " + jobManagerAddress);
    -				System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
    -
    -				// file that we write into the conf/ dir containing the jobManager address and the dop.
    -				File yarnPropertiesFile = getYarnPropertiesLocation(configuration.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
    -
    -				Properties yarnProps = new Properties();
    -				yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString());
    -				if (clusterSpecification.getSlotsPerTaskManager() != -1) {
    -					String parallelism =
    -						Integer.toString(clusterSpecification.getSlotsPerTaskManager() * clusterSpecification.getNumberTaskManagers());
    -					yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
    +					writeYarnPropertiesFile(
    +						yarnApplicationId,
    +						clusterSpecification.getNumberTaskManagers() * clusterSpecification.getSlotsPerTaskManager(),
    +						yarnClusterDescriptor.getDynamicPropertiesEncoded());
     				}
    -				// add dynamic properties
    -				if (yarnDescriptor.getDynamicPropertiesEncoded() != null) {
    -					yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
    -						yarnDescriptor.getDynamicPropertiesEncoded());
    -				}
    -				writeYarnProperties(yarnProps, yarnPropertiesFile);
    -
    -				//------------------ ClusterClient running, let user control it ------------
     
     				if (detachedMode) {
    -					// print info and quit:
     					LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
     						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
    -						"yarn application -kill " + yarnCluster.getApplicationId());
    -					yarnCluster.waitForClusterToBeReady();
    -					yarnCluster.disconnect();
    +						"yarn application -kill " + applicationId.getOpt());
     				} else {
    -
     					ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    --- End diff --
    
    I would prefer `Executors.newSingleThreadScheduledExecutor();` because the `corePoolSize` will always be `1`.


---

Mime
View raw message