flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
Date Mon, 08 Apr 2019 13:25:46 GMT
pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain
unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#discussion_r273034056
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 ##########
 @@ -707,6 +709,49 @@ public void testTaskManagerTimeout() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that idle but not releasable task managers will not be released even if timed out.
+	 */
+	@Test
+	public void testTaskManagerTimeoutWhenNotReleasable() throws Exception {
+		final long tmTimeout = 500L;
+
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+		final ResourceID resourceID = ResourceID.generate();
+
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
+
+		when(taskExecutorGateway.canBeReleased()).thenReturn(false);
+
+		final SlotID slotId = new SlotID(resourceID, 0);
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
+		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
+		final SlotReport slotReport = new SlotReport(slotStatus);
+
+		final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
+
+		try (SlotManager slotManager = new SlotManager(
+			TestingUtils.defaultScheduledExecutor(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			Time.milliseconds(tmTimeout))) {
+
+			slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
+
+			mainThreadExecutor.execute(new Runnable() {
+				@Override
+				public void run() {
+					slotManager.registerTaskManager(taskManagerConnection, slotReport);
+				}
+			});
+
+			verify(resourceManagerActions, timeout(100L * tmTimeout).times(0))
 
 Review comment:
   1. Isn't this code waiting for 50 seconds (`100L * tmTimeout` with `tmTimeout = 500L` ms)? Otherwise, how can you make sure that a method "was called exactly 0 times within some time"?
   
   2. I do not like this `verify` assertion that `releaseResource` was called 0 times. It would be better to assert that the slot and/or task/partition were not released; otherwise this is prone to false positives or false negatives after refactoring. For example, what if `releaseResource` gets overloaded with another parameter?
   
   3. I also do not like the multithreading here. I think it would be better to test two things independently:
   a) in this test, check that manually calling `checkTaskManagerTimeouts()` works as expected
   b) add another test to check that `checkTaskManagerTimeouts()` is enqueued somehow, but to be honest I would be fine with ignoring this...
   (besides the fact that it would be better not to have this code multithreaded at all)
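
A minimal sketch of the shape suggested in points 2 and 3a, assuming the timeout check can be invoked directly with an explicit timestamp. `IdleTimeoutSketch`, `IdleTracker`, `register`, `isRegistered` and `checkTimeouts` are hypothetical stand-ins, not Flink's `SlotManager` API. The test calls the check on the calling thread, so there is no executor and no waiting, and it asserts on the resulting state (the task manager is still registered) rather than on how often a mocked method was called:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical stand-in for the idle-timeout logic; not Flink's actual API. */
public class IdleTimeoutSketch {

    /** Tracks registered task managers and the time at which each became idle. */
    static final class IdleTracker {
        private final long timeoutMillis;
        private final Predicate<String> canBeReleased;
        private final Map<String, Long> idleSince = new HashMap<>();

        IdleTracker(long timeoutMillis, Predicate<String> canBeReleased) {
            this.timeoutMillis = timeoutMillis;
            this.canBeReleased = canBeReleased;
        }

        void register(String taskManagerId, long nowMillis) {
            idleSince.put(taskManagerId, nowMillis);
        }

        boolean isRegistered(String taskManagerId) {
            return idleSince.containsKey(taskManagerId);
        }

        /** Invoked directly by the test with an explicit time: no threads, no sleeping. */
        void checkTimeouts(long nowMillis) {
            // Release only entries that are both timed out and reported as releasable.
            idleSince.entrySet().removeIf(e ->
                nowMillis - e.getValue() >= timeoutMillis
                    && canBeReleased.test(e.getKey()));
        }
    }

    public static void main(String[] args) {
        // Timed out but not releasable: must stay registered.
        IdleTracker pinned = new IdleTracker(500L, id -> false);
        pinned.register("tm-1", 0L);
        pinned.checkTimeouts(50_000L);
        System.out.println("not releasable, still registered: " + pinned.isRegistered("tm-1"));

        // Timed out and releasable: must be removed.
        IdleTracker releasable = new IdleTracker(500L, id -> true);
        releasable.register("tm-1", 0L);
        releasable.checkTimeouts(50_000L);
        System.out.println("releasable, still registered: " + releasable.isRegistered("tm-1"));
    }
}
```

Because the time is passed in, the "timed out" condition is reached without the 50 second window from point 1, and the assertion keeps holding if the release method is later renamed or overloaded.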

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
