flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages
Date Sun, 19 Jan 2020 15:56:15 GMT
ifndef-SleePy commented on a change in pull request #10332: [FLINK-13905][checkpointing] Separate
checkpoint triggering into several asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r368303994
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 ##########
 @@ -283,31 +300,494 @@ public void testStopPeriodicScheduler() throws Exception {
 			failureManager);
 
 		// Periodic
+		final CompletableFuture<CompletedCheckpoint> onCompletionPromise1 = coord.triggerCheckpoint(
+			System.currentTimeMillis(),
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+			null,
+			true,
+			false);
+		manuallyTriggeredScheduledExecutor.triggerAll();
 		try {
-			coord.triggerCheckpoint(
-				System.currentTimeMillis(),
-				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-				null,
-				true,
-				false);
-			manuallyTriggeredScheduledExecutor.triggerAll();
+			onCompletionPromise1.get();
 			fail("The triggerCheckpoint call expected an exception");
-		} catch (CheckpointException e) {
-			assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
+		} catch (ExecutionException e) {
+			final Optional<CheckpointException> checkpointExceptionOptional =
+				ExceptionUtils.findThrowable(e, CheckpointException.class);
+			assertTrue(checkpointExceptionOptional.isPresent());
+			assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
+				checkpointExceptionOptional.get().getCheckpointFailureReason());
 		}
 
 		// Not periodic
+		final CompletableFuture<CompletedCheckpoint> onCompletionPromise2 = coord.triggerCheckpoint(
+			System.currentTimeMillis(),
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+			null,
+			false,
+			false);
+		manuallyTriggeredScheduledExecutor.triggerAll();
+		assertFalse(onCompletionPromise2.isCompletedExceptionally());
+	}
+
+	@Test
+	public void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
+		// create some mock Execution vertices that receive the checkpoint trigger messages
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+			true,
+			false,
+			0);
 
 Review comment:
   Yes, there are too many codes copied everywhere. I'll introduce some utils for these initializations.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message