flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] tillrohrmann commented on a change in pull request #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots
Date Tue, 28 May 2019 11:48:49 GMT
tillrohrmann commented on a change in pull request #7227: [FLINK-11059] [runtime] do not add
releasing failed slot to free slots
URL: https://github.com/apache/flink/pull/7227#discussion_r288058885
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
 ##########
 @@ -759,37 +759,43 @@ private void checkIdleSlot() {
 		final FlinkException cause = new FlinkException("Releasing idle slot.");
 
 		for (AllocatedSlot expiredSlot : expiredSlots) {
-			final AllocationID allocationID = expiredSlot.getAllocationId();
-			if (availableSlots.tryRemove(allocationID) != null) {
-
-				log.info("Releasing idle slot [{}].", allocationID);
-				final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
-					allocationID,
-					cause,
-					rpcTimeout);
-
-				FutureUtils.whenCompleteAsyncIfNotDone(
-					freeSlotFuture,
-					componentMainThreadExecutor,
-					(Acknowledge ignored, Throwable throwable) -> {
-						if (throwable != null) {
-							if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
-								log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
-										"Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(),
-									throwable);
-								tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
-							} else {
-								log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
-									"longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
-							}
-						}
-					});
+
+			if (availableSlots.tryRemove(expiredSlot.getAllocationId()) != null) {
+				releaseSlotToTaskManager(expiredSlot, cause);
 			}
 		}
 
 		scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
 	}
 
+	private void releaseSlotToTaskManager(AllocatedSlot expiredSlot, FlinkException cause) {
+		final AllocationID allocationID = expiredSlot.getAllocationId();
+		log.info("Releasing idle slot [{}].", allocationID);
+
+		final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
+				allocationID,
+				cause,
+				rpcTimeout);
+
+		FutureUtils.whenCompleteAsyncIfNotDone(
+				freeSlotFuture,
+				componentMainThreadExecutor,
+				(Acknowledge ignored, Throwable throwable) -> {
+					if (throwable != null) {
+						if (throwable instanceof TimeoutException) {
+							log.debug("Releasing slot [{}] of registered TaskExecutor {} timeout. " +
+											"Trying to release it again.",
+									allocationID, expiredSlot.getTaskManagerId(), throwable);
+							releaseSlotToTaskManager(expiredSlot, cause);
 
 Review comment:
   I think it would be better not to retry unconditionally until success, but to stop after
`n` retries. As written, the retry could go on forever if the TM has failed terminally. We
could use `FutureUtils#retryWithDelay` for this purpose, which already contains the required
logic.
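
To illustrate the bounded-retry idea, here is a minimal sketch using only the JDK. It does not reproduce Flink's `FutureUtils#retryWithDelay`; the class `BoundedRetry`, its method signature, and the always-failing supplier that stands in for `freeSlot(...)` on a dead TM are all hypothetical and chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper (not Flink code): retries an async operation a bounded number of times. */
final class BoundedRetry {

    private BoundedRetry() {}

    /** Runs the operation once, then retries at most {@code retries} times, waiting between attempts. */
    static <T> CompletableFuture<T> retryWithDelay(
            Supplier<CompletableFuture<T>> operation,
            int retries,
            long delayMillis,
            ScheduledExecutorService scheduler) {
        final CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delayMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            long delayMillis,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        if (result.isDone()) {
            return; // the caller cancelled or completed the result; stop retrying
        }
        final CompletableFuture<T> current;
        try {
            current = operation.get();
        } catch (Throwable t) {
            result.completeExceptionally(t); // the supplier itself threw; do not retry
            return;
        }
        current.whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);
            } else if (retriesLeft > 0) {
                try {
                    scheduler.schedule(
                            () -> attempt(operation, retriesLeft - 1, delayMillis, scheduler, result),
                            delayMillis,
                            TimeUnit.MILLISECONDS);
                } catch (RuntimeException rejected) {
                    result.completeExceptionally(rejected); // e.g. scheduler already shut down
                }
            } else {
                result.completeExceptionally(failure); // retries exhausted: give up with the last error
            }
        });
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();

        // Stand-in for a release call against a terminally failed TM: every attempt times out.
        Supplier<CompletableFuture<String>> alwaysTimesOut = () -> {
            calls.incrementAndGet();
            CompletableFuture<String> f = new CompletableFuture<>();
            f.completeExceptionally(new TimeoutException("no response"));
            return f;
        };

        Throwable error = retryWithDelay(alwaysTimesOut, 3, 10L, scheduler)
                .handle((value, failure) -> failure)
                .join();

        // One initial attempt plus three retries, then the loop stops.
        System.out.println("attempts: " + calls.get() + ", gave up with: " + error);
        scheduler.shutdown();
    }
}
```

The point of the sketch is the `retriesLeft` counter: once it reaches zero the last failure is surfaced to the caller instead of triggering another attempt, so a terminally failed TM cannot keep the loop alive indefinitely.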

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
