flink-issues mailing list archives

From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...
Date Mon, 27 Jun 2016 13:58:21 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68580451
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
     	}
     
     	/**
    -	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
    -	 * message to the job manager.
    +	 * Asks the JobManager to dispose a savepoint.
    +	 *
    +	 * <p>There are two options for this:
    +	 * <ul>
    +	 * <li>Either the job the savepoint belongs to is still running, in which
    +	 * case the user code class loader of the job is used.</li>
    +	 * <li>Or the job terminated, in which case the user JARs have to be
    +	 * uploaded before disposing the savepoint.</li>
    +	 * </ul>
     	 */
    -	private int disposeSavepoint(SavepointOptions options, String savepointPath) {
    -		try {
    -			ActorGateway jobManager = getJobManagerGateway(options);
    -			logAndSysout("Disposing savepoint '" + savepointPath + "'.");
    -			Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
    +	private int disposeSavepoint(SavepointOptions options) throws Throwable {
    +		String savepointPath = Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
     
    -			Object result;
    -			try {
    -				logAndSysout("Waiting for response...");
    -				result = Await.result(response, clientTimeout);
    +		JobID jobId = options.getJobId();
    +		String jarFile = options.getJarFilePath();
    +
    +		if (jobId != null && jarFile != null) {
    +			throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
    +		}
    +
    +		ActorGateway jobManager = getJobManagerGateway(options);
    +
    +		final Future<Object> response;
    +		if (jobId != null) {
    +			// Dispose with class loader of running job
    +			logAndSysout("Disposing savepoint at '" + savepointPath + "' of job " + jobId + " .");
    +
    +			response = jobManager.ask(
    +					new JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
    +					clientTimeout);
    +		} else if (jarFile != null) {
    +			logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
    +
    +			// Dispose with uploaded user code loader
    +			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
    +			PackagedProgram program = new PackagedProgram(
    --- End diff --
    
    I think we have to call `program.deleteExtractedLibraries()` at the end so that we clean up any libraries that may have been extracted.
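
A minimal sketch of the cleanup pattern suggested above, assuming the upload-and-dispose step can throw. `FakeProgram`, `DisposeAction`, and `disposeWithCleanup` are hypothetical stand-ins introduced only for illustration; they are not Flink classes and not the code in this pull request. Only the method name `deleteExtractedLibraries()` is taken from the comment. The point is that a `finally` block runs the cleanup on both the success and the failure path.

```java
import java.util.ArrayList;
import java.util.List;

final class DisposeCleanupSketch {

    /** Hypothetical stand-in for PackagedProgram; it only tracks extracted files. */
    static final class FakeProgram {
        final List<String> extractedLibraries = new ArrayList<>();

        FakeProgram() {
            // Pretend that nested JARs were unpacked to temporary files.
            extractedLibraries.add("nested-lib-a.jar");
            extractedLibraries.add("nested-lib-b.jar");
        }

        /** Mirrors the name of the call requested in the review comment. */
        void deleteExtractedLibraries() {
            extractedLibraries.clear();
        }
    }

    /** Hypothetical callback representing the upload-and-dispose step. */
    interface DisposeAction {
        void run(FakeProgram program) throws Exception;
    }

    /** Returns 0 on success and 1 on failure; cleanup runs on both paths. */
    static int disposeWithCleanup(FakeProgram program, DisposeAction action) {
        try {
            action.run(program);
            return 0;
        } catch (Exception e) {
            System.err.println("Disposal failed: " + e.getMessage());
            return 1;
        } finally {
            // Executed whether or not the action threw.
            program.deleteExtractedLibraries();
        }
    }

    public static void main(String[] args) {
        FakeProgram ok = new FakeProgram();
        int okCode = disposeWithCleanup(ok, p -> { });
        System.out.println(okCode + " " + ok.extractedLibraries.isEmpty());

        FakeProgram failing = new FakeProgram();
        int failCode = disposeWithCleanup(failing, p -> {
            throw new IllegalStateException("JobManager unreachable");
        });
        System.out.println(failCode + " " + failing.extractedLibraries.isEmpty());
    }
}
```

In the actual method the same shape would presumably wrap the code that follows the `PackagedProgram` construction, so that the extracted files are removed even if the request to the JobManager fails or times out.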


