flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...
Date Tue, 11 Oct 2016 14:16:21 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82778433
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
---
    @@ -282,29 +279,71 @@ public boolean isShutdown() {
     	//  Handling checkpoints and messages
     	// --------------------------------------------------------------------------------------------
     
    -	public Future<String> triggerSavepoint(long timestamp) throws Exception {
    -		CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint());
    +	/**
    +	 * Triggers a savepoint with the default savepoint directory as a target.
    +	 *
    +	 * @param timestamp The timestamp for the savepoint.
    +	 * @return A future to the completed checkpoint
    +	 * @throws IllegalStateException If no default savepoint directory has been configured
    +	 * @throws Exception Failures during triggering are forwarded
    +	 */
    +	public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) throws Exception
{
    +		return triggerSavepoint(timestamp, null);
    +	}
    +
    +	/**
    +	 * Triggers a savepoint with the given savepoint directory as a target.
    +	 *
    +	 * @param timestamp The timestamp for the savepoint.
    +	 * @param savepointDirectory Target directory for the savepoint.
    +	 * @return A future to the completed checkpoint
    +	 * @throws IllegalStateException If no savepoint directory has been
    +	 *                               specified and no default savepoint directory has been
    +	 *                               configured
    +	 * @throws Exception             Failures during triggering are forwarded
    +	 */
    +	public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String savepointDirectory)
throws Exception {
    +		String targetDirectory;
    +		if (savepointDirectory != null) {
    +			targetDirectory = savepointDirectory;
    +		} else if (this.savepointDirectory != null) {
    +			targetDirectory = this.savepointDirectory;
    +		} else {
    +			throw new IllegalStateException("No savepoint directory configured. " +
    +					"You can either specify a directory when triggering this savepoint or " +
    +					"configure a cluster-wide default via key '" +
    +					ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.");
    +		}
    +
    +		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
    +		CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory);
     
     		if (result.isSuccess()) {
    -			PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint();
    -			return savepoint.getCompletionFuture();
    -		}
    -		else {
    -			return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
    +			return result.getPendingCheckpoint().getCompletionFuture();
    +		} else {
    +			CompletableFuture<CompletedCheckpoint> failed = new FlinkCompletableFuture<>();
    --- End diff --
    
    In order to create a completed future you can write `FlinkCompletableFuture.completedExceptionally(Throwable
t)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message