flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] tweise closed pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
Date Thu, 20 Dec 2018 22:45:19 GMT
tweise closed pull request #7249: [FLINK-11048] Ability to programmatically execute streaming
pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer shows its
diff once it has been merged, so the diff is reproduced below:

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 0af6d937294..4fcb5556223 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -28,6 +30,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
 import org.slf4j.Logger;
@@ -65,6 +68,9 @@
 	/** The classpaths that need to be attached to each job. */
 	private final List<URL> globalClasspaths;
 
+	/** The savepoint restore settings for job execution. */
+	private final SavepointRestoreSettings savepointRestoreSettings;
+
 	/**
 	 * Creates a new RemoteStreamEnvironment that points to the master
 	 * (JobManager) described by the given host name and port.
@@ -133,6 +139,36 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig
 	 *            The protocol must be supported by the {@link java.net.URLClassLoader}.
 	 */
 	public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration,
String[] jarFiles, URL[] globalClasspaths) {
+		this(host, port, clientConfiguration, jarFiles, null, null);
+	}
+
+	/**
+	 * Creates a new RemoteStreamEnvironment that points to the master
+	 * (JobManager) described by the given host name and port.
+	 *
+	 * @param host
+	 *            The host name or address of the master (JobManager), where the
+	 *            program should be executed.
+	 * @param port
+	 *            The port of the master (JobManager), where the program should
+	 *            be executed.
+	 * @param clientConfiguration
+	 *            The configuration used to parametrize the client that connects to the
+	 *            remote cluster.
+	 * @param jarFiles
+	 *            The JAR files with code that needs to be shipped to the
+	 *            cluster. If the program uses user-defined functions,
+	 *            user-defined input formats, or any libraries, those must be
+	 *            provided in the JAR files.
+	 * @param globalClasspaths
+	 *            The paths of directories and JAR files that are added to each user code
+	 *            classloader on all nodes in the cluster. Note that the paths must specify
a
+	 *            protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a
NFS share).
+	 *            The protocol must be supported by the {@link java.net.URLClassLoader}.
+	 * @param savepointRestoreSettings
+	 *            Optional savepoint restore settings for job execution.
+	 */
+	public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration,
String[] jarFiles, URL[] globalClasspaths, SavepointRestoreSettings savepointRestoreSettings)
{
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The RemoteEnvironment cannot be used when submitting a program through a client, "
+
@@ -167,35 +203,62 @@ public RemoteStreamEnvironment(String host, int port, Configuration
clientConfig
 		else {
 			this.globalClasspaths = Arrays.asList(globalClasspaths);
 		}
+		this.savepointRestoreSettings = savepointRestoreSettings;
 	}
 
-	@Override
-	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
-		StreamGraph streamGraph = getStreamGraph();
+	/**
+	 * Executes the job remotely.
+	 *
+	 * <p>This method can be used independent of the {@link StreamExecutionEnvironment}
type.
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 */
+	@PublicEvolving
+	public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
+		List<URL> jarFiles,
+		String host,
+		int port,
+		Configuration clientConfiguration,
+		List<URL> globalClasspaths,
+		String jobName,
+		SavepointRestoreSettings savepointRestoreSettings
+	) throws ProgramInvocationException {
+		StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph();
 		streamGraph.setJobName(jobName);
-		transformations.clear();
-		return executeRemotely(streamGraph, jarFiles);
+		return executeRemotely(streamGraph,
+			streamExecutionEnvironment.getClass().getClassLoader(),
+			streamExecutionEnvironment.getConfig(),
+			jarFiles,
+			host,
+			port,
+			clientConfiguration,
+			globalClasspaths,
+			savepointRestoreSettings);
 	}
 
 	/**
-	 * Executes the remote job.
+	 * Execute the given stream graph remotely.
 	 *
-	 * @param streamGraph
-	 *            Stream Graph to execute
-	 * @param jarFiles
-	 * 			  List of jar file URLs to ship to the cluster
-	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 * <p>Method for internal use since it exposes stream graph and other implementation
details that are subject to change.
+	 * @throws ProgramInvocationException
 	 */
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles)
throws ProgramInvocationException {
+	private static JobExecutionResult executeRemotely(StreamGraph streamGraph,
+		ClassLoader envClassLoader,
+		ExecutionConfig executionConfig,
+		List<URL> jarFiles,
+		String host,
+		int port,
+		Configuration clientConfiguration,
+		List<URL> globalClasspaths,
+		SavepointRestoreSettings savepointRestoreSettings
+	) throws ProgramInvocationException {
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Running remotely at {}:{}", host, port);
 		}
 
-		ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
-			getClass().getClassLoader());
+		ClassLoader userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
envClassLoader);
 
 		Configuration configuration = new Configuration();
-		configuration.addAll(this.clientConfiguration);
+		configuration.addAll(clientConfiguration);
 
 		configuration.setString(JobManagerOptions.ADDRESS, host);
 		configuration.setInteger(JobManagerOptions.PORT, port);
@@ -211,10 +274,15 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph,
List<URL>
 				streamGraph.getJobGraph().getJobID(), e);
 		}
 
-		client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
+		client.setPrintStatusDuringExecution(executionConfig.isSysoutLoggingEnabled());
+
+		if (savepointRestoreSettings == null) {
+			savepointRestoreSettings = SavepointRestoreSettings.none();
+		}
 
 		try {
-			return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult();
+			return client.run(streamGraph, jarFiles, globalClasspaths, userCodeClassLoader, savepointRestoreSettings)
+				.getJobExecutionResult();
 		}
 		catch (ProgramInvocationException e) {
 			throw e;
@@ -233,6 +301,37 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph,
List<URL>
 		}
 	}
 
+	@Override
+	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
+		StreamGraph streamGraph = getStreamGraph();
+		streamGraph.setJobName(jobName);
+		transformations.clear();
+		return executeRemotely(streamGraph, jarFiles);
+	}
+
+	/**
+	 * Executes the remote job.
+	 *
+	 * <p>Note: This method exposes stream graph internal in the public API, but cannot
be removed for backward compatibility.
+	 * @param streamGraph
+	 *            Stream Graph to execute
+	 * @param jarFiles
+	 * 			  List of jar file URLs to ship to the cluster
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 */
+	@Deprecated
+	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles)
throws ProgramInvocationException {
+		return executeRemotely(streamGraph,
+			this.getClass().getClassLoader(),
+			getConfig(),
+			jarFiles,
+			host,
+			port,
+			clientConfiguration,
+			globalClasspaths,
+			savepointRestoreSettings);
+	}
+
 	@Override
 	public String toString() {
 		return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = "
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
index d9a257c9cb5..39bf6dbd1a4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
@@ -18,53 +18,78 @@
 
 package org.apache.flink.streaming.api.environment;
 
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
-import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.Iterator;
+
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link RemoteStreamEnvironment}.
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({RemoteStreamEnvironment.class})
 public class RemoteStreamExecutionEnvironmentTest extends TestLogger {
 
-	@ClassRule
-	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
-		new MiniClusterResourceConfiguration.Builder()
-			.setNumberTaskManagers(1)
-			.setNumberSlotsPerTaskManager(1)
-			.build());
-
 	/**
 	 * Verifies that the port passed to the RemoteStreamEnvironment is used for connecting to
the cluster.
 	 */
 	@Test
 	public void testPortForwarding() throws Exception {
-		final Configuration clientConfiguration = new Configuration();
-		clientConfiguration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
 
-		final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+		String host = "fakeHost";
+		int port = 99;
+		JobExecutionResult expectedResult = new JobExecutionResult(null, 0, null);
+
+		RestClusterClient mockedClient = Mockito.mock(RestClusterClient.class);
+		when(mockedClient.run(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
+			.thenReturn(expectedResult);
+
+		PowerMockito.whenNew(RestClusterClient.class).withAnyArguments().thenAnswer((invocation)
-> {
+				Object[] args = invocation.getArguments();
+				Configuration config = (Configuration) args[0];
+
+				Assert.assertEquals(host, config.getString(RestOptions.ADDRESS));
+				Assert.assertEquals(port, config.getInteger(RestOptions.PORT));
+				return mockedClient;
+			}
+		);
+
+		final Configuration clientConfiguration = new Configuration();
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-			miniCluster.getRestAddress().getHost(),
-			miniCluster.getRestAddress().getPort(),
-			clientConfiguration);
+			host, port, clientConfiguration);
+		env.fromElements(1).map(x -> x * 2);
+		JobExecutionResult actualResult = env.execute("fakeJobName");
+		Assert.assertEquals(expectedResult, actualResult);
+	}
+
+	@Test
+	public void testRemoteExecutionWithSavepoint() throws Exception {
+		SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath("fakePath");
+		RemoteStreamEnvironment env = new RemoteStreamEnvironment("fakeHost", 1,
+			null, new String[]{}, null, restoreSettings);
+		env.fromElements(1).map(x -> x * 2);
+
+		RestClusterClient mockedClient = Mockito.mock(RestClusterClient.class);
+		JobExecutionResult expectedResult = new JobExecutionResult(null, 0, null);
 
-		final DataStream<Integer> resultStream = env.fromElements(1)
-			.map(x -> x * 2);
+		PowerMockito.whenNew(RestClusterClient.class).withAnyArguments().thenReturn(mockedClient);
+		when(mockedClient.run(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(restoreSettings)))
+			.thenReturn(expectedResult);
 
-		final Iterator<Integer> result = DataStreamUtils.collect(resultStream);
-		Assert.assertTrue(result.hasNext());
-		Assert.assertEquals(2, result.next().intValue());
-		Assert.assertFalse(result.hasNext());
+		JobExecutionResult actualResult = env.execute("fakeJobName");
+		Assert.assertEquals(expectedResult, actualResult);
 	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message