flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
Date Mon, 08 Apr 2019 13:25:46 GMT
pnowojski commented on a change in pull request #7938: [FLINK-10941] Keep slots which contain
unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#discussion_r273042838
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##########
 @@ -1003,26 +1008,37 @@ private void checkTaskManagerTimeouts() {
 		if (!taskManagerRegistrations.isEmpty()) {
 			long currentTime = System.currentTimeMillis();
 
-			ArrayList<InstanceID> timedOutTaskManagerIds = new ArrayList<>(taskManagerRegistrations.size());
+			ArrayList<TaskManagerRegistration> timedOutTaskManagers = new ArrayList<>(taskManagerRegistrations.size());
 
 			// first retrieve the timed out TaskManagers
 			for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) {
 				if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
 					// we collect the instance ids first in order to avoid concurrent modifications by the
 					// ResourceActions.releaseResource call
-					timedOutTaskManagerIds.add(taskManagerRegistration.getInstanceId());
+					timedOutTaskManagers.add(taskManagerRegistration);
 				}
 			}
 
 			// second we trigger the release resource callback which can decide upon the resource release
-			final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
-			for (InstanceID timedOutTaskManagerId : timedOutTaskManagerIds) {
-				LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
-				resourceActions.releaseResource(timedOutTaskManagerId, cause);
+			for (TaskManagerRegistration taskManagerRegistration : timedOutTaskManagers) {
+				InstanceID timedOutTaskManagerId = taskManagerRegistration.getInstanceId();
+				if (waitResultConsumedToRelease) {
+					// checking whether TaskManagers can be safely removed
+					taskManagerRegistration.getTaskManagerConnection().getTaskExecutorGateway().canBeReleased()
+						.thenRunAsync(() -> releaseTaskExecutor(timedOutTaskManagerId), mainThreadExecutor);
 
 Review comment:
   Regardless of whether the gateway reports that the task executor can be released or not, do we always want to release the task executor?
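
   To illustrate the concern, here is a minimal JDK-only sketch. It assumes `canBeReleased()` returns a `CompletableFuture<Boolean>` (the diff does not show its signature). The class `ReleaseCheckSketch`, the `released` list, and both helper methods are hypothetical stand-ins, not Flink code. `thenRunAsync` takes a `Runnable`, so the future's value is never inspected, whereas `thenAcceptAsync` receives the value and can act on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class ReleaseCheckSketch {

    // Hypothetical stand-in for releaseTaskExecutor: records which ids were released.
    static final List<String> released = new ArrayList<>();

    // Same shape as the diff: the Runnable runs once the future completes,
    // whatever Boolean it completed with.
    static CompletableFuture<Void> releaseIgnoringResult(
            CompletableFuture<Boolean> canBeReleased, String id, Executor executor) {
        return canBeReleased.thenRunAsync(() -> released.add(id), executor);
    }

    // One possible alternative: inspect the Boolean and release only when it is true.
    static CompletableFuture<Void> releaseOnlyIfAllowed(
            CompletableFuture<Boolean> canBeReleased, String id, Executor executor) {
        return canBeReleased.thenAcceptAsync(canRelease -> {
            if (canRelease) {
                released.add(id);
            }
        }, executor);
    }

    public static void main(String[] args) {
        // Assumption: a direct executor stands in for mainThreadExecutor.
        Executor direct = Runnable::run;

        releaseIgnoringResult(CompletableFuture.completedFuture(false), "tm-1", direct).join();
        releaseOnlyIfAllowed(CompletableFuture.completedFuture(false), "tm-2", direct).join();
        releaseOnlyIfAllowed(CompletableFuture.completedFuture(true), "tm-3", direct).join();

        System.out.println(released); // prints [tm-1, tm-3]
    }
}
```

   In this sketch "tm-1" is released even though the check returned false, which is the behaviour the comment questions; whether the PR should use this alternative or a different one is for the author to decide.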
