flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
Date Mon, 08 Jan 2018 10:28:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316043#comment-16316043
] 

ASF GitHub Bot commented on FLINK-8339:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5225#discussion_r160110442
  
    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
---
    @@ -0,0 +1,293 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn;
    +
    +import org.apache.flink.client.cli.CliFrontendParser;
    +import org.apache.flink.client.deployment.ClusterSpecification;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.util.TestLogger;
    +import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
    +import org.apache.flink.yarn.configuration.YarnConfigOptions;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.CommandLineParser;
    +import org.apache.commons.cli.DefaultParser;
    +import org.apache.commons.cli.Options;
    +import org.apache.hadoop.yarn.api.records.ApplicationId;
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.StandardOpenOption;
    +import java.util.Map;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Tests for the {@link FlinkYarnSessionCli}.
    + */
    +public class FlinkYarnSessionCliTest extends TestLogger {
    +
    +	private static final ApplicationId TEST_YARN_APPLICATION_ID = ApplicationId.newInstance(System.currentTimeMillis(),
42);
    +
    +	private static final ApplicationId TEST_YARN_APPLICATION_ID_2 = ApplicationId.newInstance(System.currentTimeMillis(),
43);
    +
    +	private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55";
    +	private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
    +
    +	private static final String validPropertiesFile = "applicationID=" + TEST_YARN_APPLICATION_ID;
    +
    +	private static final String invalidPropertiesFile = "jasfobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS
+ ":asf" + TEST_YARN_JOB_MANAGER_PORT;
    +
    +	@Rule
    +	public TemporaryFolder tmp = new TemporaryFolder();
    +
    +	@Test
    +	public void testDynamicProperties() throws Exception {
    +
    +		FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
    +			"",
    +			"",
    +			false);
    +		Options options = new Options();
    +		cli.addGeneralOptions(options);
    +		cli.addRunOptions(options);
    +
    +		CommandLineParser parser = new DefaultParser();
    +		CommandLine cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n",
"15",
    +				"-D", "akka.ask.timeout=5 min", "-D", "env.java.opts=-DappName=foobar"});
    +
    +		AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(
    +			new Configuration(),
    +			tmp.getRoot().getAbsolutePath(),
    +			null,
    +			cmd);
    +
    +		Assert.assertNotNull(flinkYarnDescriptor);
    +
    +		Map<String, String> dynProperties =
    +			FlinkYarnSessionCli.getDynamicProperties(flinkYarnDescriptor.getDynamicPropertiesEncoded());
    +		assertEquals(2, dynProperties.size());
    +		assertEquals("5 min", dynProperties.get("akka.ask.timeout"));
    +		assertEquals("-DappName=foobar", dynProperties.get("env.java.opts"));
    +	}
    +
    +	@Test
    +	public void testNotEnoughTaskSlots() throws Exception {
    +		String[] params =
    +			new String[] {"-yn", "2", "-ys", "3", "-p", "7"};
    +
    +		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y", "yarn");
    +
    +		Options options = new Options();
    +		// TODO: Nasty workaround: We should get rid of the YarnCLI and run options coupling
    +		options.addOption(CliFrontendParser.PARALLELISM_OPTION);
    +		yarnCLI.addGeneralOptions(options);
    +		yarnCLI.addRunOptions(options);
    +
    +		final CommandLine commandLine = CliFrontendParser.parse(options, params, true);
    +
    +		ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new
Configuration(), commandLine);
    +
    +		// each task manager has 3 slots but the parallelism is 7. Thus the slots should be
increased.
    +		assertEquals(4, clusterSpecification.getSlotsPerTaskManager());
    +		assertEquals(2, clusterSpecification.getNumberTaskManagers());
    +	}
    +
    +	@Test
    +	public void testCorrectSettingOfMaxSlots() throws Exception {
    +		String[] params =
    +			new String[] {"-yn", "2", "-ys", "3"};
    +
    +		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y", "yarn");
    +
    +		final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
    +
    +		final Configuration configuration = new Configuration();
    +
    +		AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor(
    +			configuration,
    +			tmp.getRoot().getAbsolutePath(),
    +			"",
    +			commandLine);
    +
    +		final ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(
    +			configuration,
    +			commandLine);
    +
    +		// each task manager has 3 slots but the parallelism is 7. Thus the slots should be
increased.
    +		assertEquals(3, clusterSpecification.getSlotsPerTaskManager());
    +		assertEquals(2, clusterSpecification.getNumberTaskManagers());
    +	}
    +
    +	@Test
    +	public void testZookeeperNamespaceProperty() throws Exception {
    +		String zkNamespaceCliInput = "flink_test_namespace";
    +
    +		String[] params = new String[] {"-yn", "2", "-yz", zkNamespaceCliInput};
    +
    +		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y", "yarn");
    +
    +		CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
    +
    +		AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor(
    +			new Configuration(),
    +			tmp.getRoot().getAbsolutePath(),
    +			"",
    +			commandLine);
    +
    +		assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
    +	}
    +
    +	/**
    +	 * Test that the CliFrontend is able to pick up the .yarn-properties file from a specified
location.
    +	 */
    +	@Test
    +	public void testResumeFromYarnPropertiesFile() throws Exception {
    +
    +		File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
    +
    +		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli("y", "yarn");
    +
    +		final Configuration configuration = new Configuration();
    +		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
    +
    +		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[]
{}, true);
    +
    +		final String clusterId = flinkYarnSessionCli.getClusterId(
    +			configuration,
    +			commandLine);
    +
    +		assertEquals(TEST_YARN_APPLICATION_ID.toString(), clusterId);
    +	}
    +
    +	@Test(expected = IllegalConfigurationException.class)
    +	public void testInvalidYarnPropertiesFile() throws Exception {
    +
    +		File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile);
    +
    +		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli("y", "yarn");
    +
    +		final Configuration configuration = new Configuration();
    +		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
    +
    +		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[]
{}, true);
    +
    +		flinkYarnSessionCli.getClusterId(
    +			configuration,
    +			commandLine);
    +
    +		fail("We should have failed reading the yarn properties file when retrieving the cluster
id.");
    --- End diff --
    
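    It is wrong to call `fail()` at the end of a test that declares `@Test(expected = ...)`: if the expected exception is not thrown, `fail()` itself throws an `AssertionError`, which JUnit then reports as an unexpected exception type instead of showing the failure message. For example, this variant, which swallows the exception: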

    ```
    @Test(expected = IllegalConfigurationException.class)
    	public void testInvalidYarnPropertiesFile() throws Exception {
    		try {
    			File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile);
    
    			final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli("y", "yarn");
    
    			final Configuration configuration = new Configuration();
    			configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
    
    			final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[]{},
true);
    
    			flinkYarnSessionCli.getClusterId(
    				configuration,
    				commandLine);
    		} catch (final Exception e) {
    
    		}
    
    		fail("We should have failed reading the yarn properties file when retrieving the cluster
id.");
    	}
    ```
    
    This example fails with:
    ```
    java.lang.Exception: Unexpected exception, expected<org.apache.flink.configuration.IllegalConfigurationException>
but was<java.lang.AssertionError>
    ```
    
    The message *We should have failed reading the yarn properties file when retrieving the
cluster id.* is lost.


> Let CustomCommandLine return a ClusterDescriptor
> ------------------------------------------------
>
>                 Key: FLINK-8339
>                 URL: https://issues.apache.org/jira/browse/FLINK-8339
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} and deploy
a cluster. In order to better separate concerns it would be good if the {{CustomCommandLine}}
would simply return a {{ClusterDescriptor}} which could then be used to retrieve a {{ClusterClient}}
or to deploy a Flink cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message