flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8956) Port RescalingITCase to flip6
Date Thu, 22 Mar 2018 16:02:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409766#comment-16409766 ]

ASF GitHub Bot commented on FLINK-8956:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5715#discussion_r176475682
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---
    @@ -528,54 +454,44 @@ public void testSavepointRescalingPartitionedOperatorState(boolean scaleOut, Ope
     		}
     
     		try {
    -			jobManager = cluster.getLeaderGateway(deadline.timeLeft());
    -
     			JobGraph jobGraph = createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod);
     
    -			jobID = jobGraph.getJobID();
    -
    -			cluster.submitJobDetached(jobGraph);
    +			final JobID jobID = jobGraph.getJobID();
     
    -			Object savepointResponse = null;
    +			client.setDetached(true);
    +			client.submitJob(jobGraph, RescalingITCase.class.getClassLoader());
     
     			// wait until the operator is started
     			StateSourceBase.workStartedLatch.await();
     
    -			while (deadline.hasTimeLeft()) {
    -
    -				Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
    -				FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS);
    -				savepointResponse = Await.result(savepointPathFuture, waitingTime);
    -
    -				if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) {
    -					break;
    -				}
    -			}
    -
    -			assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess);
    -
    -			final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResponse).savepointPath();
    -
    -			Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
    -
    -			Future<Object> cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
    -
    -			Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft());
    +			CompletableFuture<String> savepointPathFuture = FutureUtils.retryWithDelay(
    +				() -> {
    +					try {
    +						return client.triggerSavepoint(jobID, null);
    +					} catch (FlinkException e) {
    +						throw new RuntimeException(e);
    --- End diff --
    
    Shouldn't we return an exceptionally completed future here?
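
A minimal sketch of what that suggestion could look like, using only the JDK. The names `SavepointException`, `triggerSavepoint`, `failedFuture` and `savepointOperation` are hypothetical stand-ins (for `FlinkException`, `client.triggerSavepoint(jobID, null)` and the lambda passed to the retry helper); they are not taken from the pull request. The idea is that the supplier never throws: a checked exception is turned into a future that is already completed exceptionally, so the caller sees every failure through the returned future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class ExceptionalFutureSketch {

    /** Hypothetical stand-in for a checked exception such as FlinkException. */
    static class SavepointException extends Exception {
        SavepointException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the client call that may throw before returning a future. */
    static CompletableFuture<String> triggerSavepoint(boolean jobReady) throws SavepointException {
        if (!jobReady) {
            throw new SavepointException("job not running yet");
        }
        return CompletableFuture.completedFuture("/tmp/savepoint-1");
    }

    /** Builds a future that is already failed with the given cause (works on Java 8). */
    static <T> CompletableFuture<T> failedFuture(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    /** The supplier returns a failed future instead of throwing a RuntimeException. */
    static Supplier<CompletableFuture<String>> savepointOperation(boolean jobReady) {
        return () -> {
            try {
                return triggerSavepoint(jobReady);
            } catch (SavepointException e) {
                return failedFuture(e);
            }
        };
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = savepointOperation(false).get();
        System.out.println("failed exceptionally: " + failed.isCompletedExceptionally());

        CompletableFuture<String> ok = savepointOperation(true).get();
        System.out.println("savepoint path: " + ok.join());
    }
}
```

With this shape, a retrying caller only has to inspect the returned future; it does not also need to guard against exceptions thrown while the supplier itself runs.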


> Port RescalingITCase to flip6
> -----------------------------
>
>                 Key: FLINK-8956
>                 URL: https://issues.apache.org/jira/browse/FLINK-8956
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 1.5.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Blocker
>             Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
