flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
Date Wed, 10 Apr 2019 12:48:30 GMT
pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain
unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#discussion_r273939589
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 ##########
 @@ -707,6 +706,53 @@ public void testTaskManagerTimeout() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that idle but not releasable task managers will not be released even if timed out before it can be.
+	 */
+	@Test
+	public void testTaskManagerNotReleasedBeforeItCanBe() throws Exception {
+		final long tmTimeout = 10L;
+
+		final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>();
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
+			.build();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+		final ResourceID resourceID = ResourceID.generate();
+
+		final AtomicBoolean canBeReleased = new AtomicBoolean(false);
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setCanBeReleasedSupplier(canBeReleased::get)
+			.createTestingTaskExecutorGateway();
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
+
+		final SlotID slotId = new SlotID(resourceID, 0);
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
+		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
+		final SlotReport slotReport = new SlotReport(slotStatus);
+
+		final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
+
+		try (SlotManager slotManager = SlotManagerBuilder.newBuilder()
+			.setTaskManagerTimeout(Time.milliseconds(tmTimeout))
+			.build()) {
+
+			slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
+
+			mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
+
+			// now it can not be released yet
+			canBeReleased.set(false);
+			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts);
 
 Review comment:
   Shouldn't we wait for this to complete? As it is now, the assertion below might
be a no-op, depending on how the race condition plays out.
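
   A minimal sketch of one way to wait, written in plain Java with no Flink classes. The names `checkTimeouts`, `releasedAfterCheck`, and `WaitBeforeAssertSketch` are hypothetical stand-ins, not part of this PR; `checkTimeouts` only imitates a check that completes the release future when release is allowed. The idea is to wrap the submitted task in a `CompletableFuture` and block on it, so the assertion can only run after the check has finished:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class WaitBeforeAssertSketch {

    // Hypothetical stand-in for the timeout check: it completes the
    // release future only when the task manager reports it can be released.
    static void checkTimeouts(AtomicBoolean canBeReleased, CompletableFuture<String> releaseFuture) {
        if (canBeReleased.get()) {
            releaseFuture.complete("tm-1");
        }
    }

    // Runs the check on the executor, waits for it to finish, and only
    // then reports whether a release was triggered.
    static boolean releasedAfterCheck(boolean releasable) throws Exception {
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            AtomicBoolean canBeReleased = new AtomicBoolean(releasable);
            CompletableFuture<String> releaseFuture = new CompletableFuture<>();

            // runAsync returns a future that completes when the task has run,
            // unlike Executor.execute, which gives the caller nothing to wait on.
            CompletableFuture<Void> checkDone = CompletableFuture.runAsync(
                () -> checkTimeouts(canBeReleased, releaseFuture), mainThreadExecutor);

            // Block until the check has actually executed (bounded wait).
            checkDone.get(10, TimeUnit.SECONDS);

            // Safe to inspect now: the check can no longer race with this read.
            return releaseFuture.isDone();
        } finally {
            mainThreadExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("released while not releasable: " + releasedAfterCheck(false));
        System.out.println("released once releasable: " + releasedAfterCheck(true));
    }
}
```

   With a plain `execute(...)` call, a "not released" assertion could run before the check does and pass without testing anything. Waiting on the returned future removes that ordering dependency.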

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
