flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient
Date Wed, 10 Jan 2018 13:26:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320223#comment-16320223 ]

ASF GitHub Bot commented on FLINK-8328:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5215#discussion_r160675128
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
---
    @@ -743,6 +690,142 @@ private void logAndSysout(String message) {
     		System.out.println(message);
     	}
     
    +	public static void main(final String[] args) throws Exception {
    +		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the
YARN session
    +
    +		final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
    +
    +		final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
    +		SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));
    +		int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>()
{
    +			@Override
    +			public Integer call() {
    +				return cli.run(args, flinkConfiguration, configurationDirectory);
    +			}
    +		});
    +		System.exit(retCode);
    +	}
    +
    +	private static void runInteractiveCli(
    +		YarnClusterClient clusterClient,
    +		YarnApplicationStatusMonitor yarnApplicationStatusMonitor,
    +		boolean readConsoleInput) {
    +		try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
    +			boolean continueRepl = true;
    +			int numTaskmanagers = 0;
    +			long unknownStatusSince = System.currentTimeMillis();
    +
    +			while (continueRepl) {
    +
    +				final ApplicationStatus applicationStatus = yarnApplicationStatusMonitor.getApplicationStatusNow();
    +
    +				switch (applicationStatus) {
    +					case FAILED:
    +					case CANCELED:
    +						System.err.println("The Flink Yarn cluster has failed.");
    +						continueRepl = false;
    +						break;
    +					case UNKNOWN:
    +						if (unknownStatusSince < 0L) {
    +							unknownStatusSince = System.currentTimeMillis();
    +						}
    +
    +						if ((System.currentTimeMillis() - unknownStatusSince) > CLIENT_POLLING_INTERVAL_MS)
{
    +							System.err.println("The Flink Yarn cluster is in an unknown state. Please check
the Yarn cluster.");
    +							continueRepl = false;
    +						} else {
    +							continueRepl = repStep(in, readConsoleInput);
    +						}
    +						break;
    +					case SUCCEEDED:
    +						if (unknownStatusSince > 0L) {
    +							unknownStatusSince = -1L;
    +						}
    +
    +						// ------------------ check if there are updates by the cluster -----------
    +						try {
    +							final GetClusterStatusResponse status = clusterClient.getClusterStatus();
    +
    +							if (status != null && numTaskmanagers != status.numRegisteredTaskManagers())
{
    +								System.err.println("Number of connected TaskManagers changed to " +
    +									status.numRegisteredTaskManagers() + ". " +
    +									"Slots available: " + status.totalNumberOfSlots());
    +								numTaskmanagers = status.numRegisteredTaskManagers();
    +							}
    +						} catch (Exception e) {
    +							LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval
attempt ...", e);
    +						}
    +
    +						printClusterMessages(clusterClient);
    +
    +						continueRepl = repStep(in, readConsoleInput);
    +				}
    +			}
    +		} catch (Exception e) {
    +			LOG.warn("Exception while running the interactive command line interface.", e);
    +		}
    +	}
    +
    +	private static void printClusterMessages(YarnClusterClient clusterClient) {
    +		final List<String> messages = clusterClient.getNewMessages();
    +		if (messages != null && messages.size() > 0) {
    --- End diff --
    
    true. Will change it.


> Pull Yarn ApplicationStatus polling out of YarnClusterClient
> ------------------------------------------------------------
>
>                 Key: FLINK-8328
>                 URL: https://issues.apache.org/jira/browse/FLINK-8328
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to pull the Yarn
> {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. I propose to introduce a dedicated
> {{YarnApplicationStatusMonitor}}. This also has the benefit of separating concerns better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message