flink-issues mailing list archives

From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...
Date Thu, 02 Nov 2017 17:25:24 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4937#discussion_r148602544
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---
    @@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() throws Exception {
     			fail("wrong exception: " + e);
     		}
     	}
    +
    +	@Test
    +	public void testCancelSlotAllocation() throws Exception {
    +		final JobID jid = new JobID();
    +
    +		final SlotPool pool = new SlotPool(
    +				rpcService, jid,
    +				SystemClock.getInstance(),
    +				Time.days(1), Time.days(1),
    +				Time.seconds(3) // this is the timeout for the request tested here
    +		);
    +		pool.start(JobMasterId.generate(), "foobar");
    +		SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
    +
    +		// 1. test the pending request is in waitingResourceManagerRequests
    +		AllocationID allocationID = new AllocationID();
    +		CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
    +
    +		try {
    +			future.get(2, TimeUnit.SECONDS);
    +			fail("We expected a AskTimeoutException.");
    +		}
    +		catch (ExecutionException e) {
    +			assertEquals(AskTimeoutException.class, e.getCause().getClass());
    +		}
    +		catch (Exception e) {
    +			fail("wrong exception: " + e);
    +		}
    +
    +		assertEquals(1, pool.getNumOfWaitingForResourceRequests());
    +
    +		pool.cancelSlotAllocation(allocationID);
    +		assertEquals(0, pool.getNumOfWaitingForResourceRequests());
    +
    +		// 2. test the pending request is in pendingRequests
    +		ResourceManagerGateway resourceManagerGateway = SlotPoolTest.createResourceManagerGatewayMock();
    +		pool.connectToResourceManager(resourceManagerGateway);
    +
    +		AllocationID allocationID2 = new AllocationID();
    +		future = slotPoolGateway.allocateSlot(allocationID2, DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
    +
    +		try {
    +			future.get(2, TimeUnit.SECONDS);
    +			fail("We expected a AskTimeoutException.");
    +		}
    +		catch (ExecutionException e) {
    +			assertEquals(AskTimeoutException.class, e.getCause().getClass());
    +		}
    +		catch (Exception e) {
    +			fail("wrong exception: " + e);
    +		}
    +
    +		assertEquals(1, pool.getNumOfPendingRequests());
    +
    +		pool.cancelSlotAllocation(allocationID2);
    +		assertEquals(0, pool.getNumOfPendingRequests());
    --- End diff --
    
    It's not guaranteed that `pool.getNumOfPendingRequests()` is executed after `pool.cancelSlotAllocation` has been executed.
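
    One way to picture the ordering concern, as a minimal sketch rather than a proposed patch: `ToyPool` below is a hypothetical stand-in, not Flink's `SlotPool`, and every name in it is an assumption made for illustration. It assumes state changes are handed off to a single main thread and that each call returns a future. Waiting on the future returned by the cancel call before reading the counter makes the order explicit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CancelOrderingSketch {

    /** Hypothetical toy endpoint (NOT Flink's SlotPool): all state lives on one main thread. */
    static final class ToyPool implements AutoCloseable {
        private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
        private int pendingRequests = 0; // only read or written from mainThread

        /** Registers a request asynchronously; the future completes once it is recorded. */
        CompletableFuture<Void> addRequest() {
            return CompletableFuture.runAsync(() -> pendingRequests++, mainThread);
        }

        /** Cancels a request asynchronously; the future completes once it is removed. */
        CompletableFuture<Void> cancelRequest() {
            return CompletableFuture.runAsync(() -> pendingRequests--, mainThread);
        }

        /** Reads the counter on the main thread, so it sees all previously completed changes. */
        CompletableFuture<Integer> getNumPendingRequests() {
            return CompletableFuture.supplyAsync(() -> pendingRequests, mainThread);
        }

        @Override
        public void close() {
            mainThread.shutdownNow();
        }
    }

    /** Waits for the cancellation to finish before querying the counter. */
    static int pendingAfterCancel() throws Exception {
        try (ToyPool pool = new ToyPool()) {
            pool.addRequest().get(5, TimeUnit.SECONDS);
            // Block on the cancel future so the following read cannot overtake it.
            pool.cancelRequest().get(5, TimeUnit.SECONDS);
            return pool.getNumPendingRequests().get(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(pendingAfterCancel());
    }
}
```

    Whether the test in this PR can wait on such a future depends on what `cancelSlotAllocation` returns when called through the `SlotPoolGateway`, which is not shown in this diff.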


---
