flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9047) SlotPool can fail to release slots
Date Thu, 22 Mar 2018 12:39:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409456#comment-16409456
] 

ASF GitHub Bot commented on FLINK-9047:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5739#discussion_r176401066
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
---
    @@ -634,6 +653,99 @@ public void testCheckIdleSlot() throws Exception {
     		}
     	}
     
    +	/**
    +	 * Tests that idle slots which cannot be released are only recycled if the owning {@link
TaskExecutor}
    +	 * is still registered at the {@link SlotPool}. See FLINK-9047.
    +	 */
    +	@Test
    +	public void testReleasingIdleSlotFailed() throws Exception {
    +		final ManualClock clock = new ManualClock();
    +		final SlotPool slotPool = new SlotPool(
    +			rpcService,
    +			jobId,
    +			clock,
    +			TestingUtils.infiniteTime(),
    +			timeout);
    +
    +		try {
    +			final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
    +
    +			final AllocationID expiredAllocationId = new AllocationID();
    +			final SlotOffer slotToExpire = new SlotOffer(expiredAllocationId, 0, ResourceProfile.UNKNOWN);
    +
    +			final ArrayDeque<CompletableFuture<Acknowledge>> responseQueue = new ArrayDeque<>(2);
    +			taskManagerGateway.setFreeSlotFunction((AllocationID allocationId, Throwable cause)
-> {
    +				if (responseQueue.isEmpty()) {
    +					return CompletableFuture.completedFuture(Acknowledge.get());
    +				} else {
    +					return responseQueue.pop();
    +				}
    +			});
    +
    +			responseQueue.add(FutureUtils.completedExceptionally(new FlinkException("Test failure")));
    +
    +			final CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<>();
    +			responseQueue.add(responseFuture);
    +
    +			assertThat(
    +				slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(),
    +				Matchers.is(Acknowledge.get()));
    +
    +			assertThat(
    +				slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotToExpire).get(),
    +				Matchers.is(true));
    +
    +			clock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +
    +			slotPool.triggerCheckIdleSlot();
    +
    +			CompletableFuture<LogicalSlot> allocatedSlotFuture = slotPoolGateway.allocateSlot(
    +				new SlotRequestId(),
    +				new DummyScheduledUnit(),
    +				SlotProfile.noRequirements(),
    +				true,
    +				timeout);
    +
    +			// wait until the slot has been fulfilled with the previously idling slot
    +			final LogicalSlot logicalSlot = allocatedSlotFuture.get();
    +			assertThat(logicalSlot.getAllocationId(), Matchers.is(expiredAllocationId));
    +
    +			// return the slot
    +			slotPool.getSlotOwner().returnAllocatedSlot(logicalSlot).get();
    +
    +			// advance the time so that the returned slot is now idling
    +			clock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +
    +			slotPool.triggerCheckIdleSlot();
    +
    +			// request a new slot after the idling slot has been released
    +			allocatedSlotFuture = slotPoolGateway.allocateSlot(
    +				new SlotRequestId(),
    +				new DummyScheduledUnit(),
    +				SlotProfile.noRequirements(),
    +				true,
    +				timeout);
    +
    +			// release the TaskExecutor before we get a response from the slot releasing
    +			slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID()).get();
    +
    +			// let the slot releasing fail --> since there the owning TaskExecutor is no longer
registered
    +			// the slot should be discarded
    +			responseFuture.completeExceptionally(new FlinkException("Second test exception"));
    +
    +			try {
    +				// since the slot msut have been discarded, we cannot fulfill the slot request
    --- End diff --
    
    nit: *msut*


> SlotPool can fail to release slots
> ----------------------------------
>
>                 Key: FLINK-9047
>                 URL: https://issues.apache.org/jira/browse/FLINK-9047
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The {{SlotPool}} releases idling slots. If the release operation fails (e.g. timeout),
then it simply continues using the slot. This is problematic if the owning {{TaskExecutor}}
failed before and was unregistered in the meantime from the {{SlotPool}}. As a result, the
{{SlotPool}} will reuse the slot and whenever it tries to return because it is idling it will
fail again. This, effectively, renders the scheduling of a job impossible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message