flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] tillrohrmann commented on a change in pull request #15882: [FLINK-22574] Adaptive Scheduler: Fix cancellation while in Restarting state
Date Tue, 11 May 2021 07:45:46 GMT

tillrohrmann commented on a change in pull request #15882:
URL: https://github.com/apache/flink/pull/15882#discussion_r629915588



##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -231,6 +232,43 @@ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Ex
         assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
     }
 
+    @Test
+    public void testCancellationOfJobInRestartLoop() throws Exception {

Review comment:
      Let's give it a more expressive name, something like `testJobCancellationWhileRestartingSucceeds`.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -231,6 +232,43 @@ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Ex
         assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
     }
 
+    @Test
+    public void testCancellationOfJobInRestartLoop() throws Exception {
+        final long timeInRestartingState = 10000L;
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // configure a high delay between attempts: We'll stay in RESTARTING for 10 seconds.
+        env.setRestartStrategy(
+                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
+        env.addSource(new NotifyOnRunningAndFailingSource()).addSink(new DiscardingSink<>());
+
+        JobClient client = env.executeAsync();
+
+        NotifyOnRunningAndFailingSource.runningLatch.await();

Review comment:
       Why do we need this latch here?
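For context on the question: a `CountDownLatch` such as `runningLatch` lets the test thread block until the source signals that it has started running. The actual `NotifyOnRunningAndFailingSource` is not shown in the diff, so the following is only a plain-Java sketch of that handshake under assumed, hypothetical names (`RunningLatchSketch`, `startWorkerAndAwaitRunning`): a worker thread counts the latch down and then fails, while the caller waits on the latch before proceeding.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Plain-Java sketch (hypothetical names): a worker counts down a latch as soon as it
 * starts and then fails, while the caller blocks on the latch before proceeding.
 */
public class RunningLatchSketch {

    /** Starts a failing worker and returns true if it signalled "running" within the timeout. */
    static boolean startWorkerAndAwaitRunning(long timeoutMillis) throws InterruptedException {
        CountDownLatch runningLatch = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            runningLatch.countDown(); // signal: the "source" has started running
            throw new IllegalStateException("simulated failure after start");
        });
        // The failure is expected; keep it from being printed as an uncaught exception.
        worker.setUncaughtExceptionHandler((thread, error) -> { });
        worker.start();

        // Block until the worker has started (or give up after the timeout).
        boolean signalled = runningLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        worker.join();
        return signalled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker signalled running: " + startWorkerAndAwaitRunning(5_000));
    }
}
```

A latch like this presumably only establishes that the source ran at least once; it says nothing about the job having reached `RESTARTING`, which the diff checks separately with `waitUntilCondition`.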

##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -231,6 +232,43 @@ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Ex
         assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
     }
 
+    @Test
+    public void testCancellationOfJobInRestartLoop() throws Exception {
+        final long timeInRestartingState = 10000L;
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // configure a high delay between attempts: We'll stay in RESTARTING for 10 seconds.
+        env.setRestartStrategy(
+                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
+        env.addSource(new NotifyOnRunningAndFailingSource()).addSink(new DiscardingSink<>());
+
+        JobClient client = env.executeAsync();
+
+        NotifyOnRunningAndFailingSource.runningLatch.await();
+
+        // wait until we are in RESTARTING state
+        CommonTestUtils.waitUntilCondition(
+                () -> client.getJobStatus().get() == JobStatus.RESTARTING,
+                Deadline.fromNow(Duration.of(timeInRestartingState, ChronoUnit.MILLIS)),
+                5);
+
+        // now cancel while in RESTARTING state
+        client.cancel().get();
+    }

Review comment:
      Nit: Does this really have to live in the `flink-tests` module? It feels to me that this should be testable as an integration test in the `flink-runtime` module.
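The `CommonTestUtils.waitUntilCondition(...)` call in the diff polls the job status until it reports `RESTARTING` or the `Deadline` expires. The following plain-Java sketch illustrates that poll-until-deadline pattern; it is not Flink's implementation, the helper name `waitUntil` is hypothetical, and reading the diff's third argument (`5`) as a retry interval in milliseconds is an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Plain-Java sketch of a poll-until-deadline helper (hypothetical, not Flink's CommonTestUtils). */
public class PollUntilSketch {

    /**
     * Re-evaluates {@code condition} every {@code retryIntervalMillis} until it returns true,
     * throwing a TimeoutException once {@code timeout} has elapsed without success.
     */
    static void waitUntil(BooleanSupplier condition, Duration timeout, long retryIntervalMillis)
            throws TimeoutException, InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (!condition.getAsBoolean()) {
            if (Instant.now().isAfter(deadline)) {
                throw new TimeoutException("Condition not met within " + timeout);
            }
            Thread.sleep(retryIntervalMillis);
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        // Stand-in for "job status == RESTARTING": becomes true after roughly 50 ms.
        waitUntil(() -> System.nanoTime() - start > 50_000_000L, Duration.ofSeconds(1), 5);
        System.out.println("condition met before the deadline");
    }
}
```

Because the diff sets the deadline to `timeInRestartingState`, the polling window is exactly as long as the configured restart delay.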




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


