flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] wangyang0918 commented on a change in pull request #15524: [FLINK-21667][runtime] Defer starting ResourceManager to after obtaining leadership.
Date Thu, 20 May 2021 09:37:54 GMT

wangyang0918 commented on a change in pull request #15524:
URL: https://github.com/apache/flink/pull/15524#discussion_r635842471



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
##########
@@ -143,25 +147,16 @@ public void start() throws Exception {
             if (running) {
                 LOG.info("Stopping resource manager service.");
                 running = false;
-                try {
-                    leaderElectionService.stop();
-                } catch (Exception e) {
-                    terminationFuture.completeExceptionally(
-                            new FlinkException("Cannot stop leader election service.", e));
-                }
+                stopLeaderElectionService();
+                stopLeaderResourceManager();
             } else {
                 LOG.debug("Resource manager service is not running.");
             }
 
-            if (resourceManager != null) {
-                resourceManager.closeAsync();
-            } else {
-                // resource manager is never started
-                terminationFuture.complete(null);
-            }
+            FutureUtils.forward(previousResourceManagerTerminationFuture, serviceTerminationFuture);

Review comment:
       Could the exception from `stopLeaderElectionService` be swallowed by the forwarding?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
##########
@@ -18,63 +18,101 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.FlinkException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Default implementation of {@link ResourceManagerService}. */
-public class ResourceManagerServiceImpl implements ResourceManagerService {
+public class ResourceManagerServiceImpl implements ResourceManagerService, LeaderContender
{
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerServiceImpl.class);
 
     private final ResourceManagerFactory<?> resourceManagerFactory;
     private final ResourceManagerProcessContext rmProcessContext;
 
+    private final LeaderElectionService leaderElectionService;
+    private final FatalErrorHandler fatalErrorHandler;
+    private final Executor ioExecutor;
+
+    private final Executor handleLeaderEventExecutor;
     private final CompletableFuture<Void> terminationFuture;
 
     private final Object lock = new Object();
 
+    @GuardedBy("lock")
+    private boolean running;
+
     @Nullable
     @GuardedBy("lock")
     private ResourceManager<?> resourceManager;
 
+    @Nullable
+    @GuardedBy("lock")
+    private UUID leaderSessionID;
+
     private ResourceManagerServiceImpl(
             ResourceManagerFactory<?> resourceManagerFactory,
             ResourceManagerProcessContext rmProcessContext) {
         this.resourceManagerFactory = checkNotNull(resourceManagerFactory);
         this.rmProcessContext = checkNotNull(rmProcessContext);
 
+        this.leaderElectionService =
+                rmProcessContext
+                        .getHighAvailabilityServices()
+                        .getResourceManagerLeaderElectionService();
+        this.fatalErrorHandler = rmProcessContext.getFatalErrorHandler();
+        this.ioExecutor = rmProcessContext.getIoExecutor();
+
+        this.handleLeaderEventExecutor = Executors.newSingleThreadExecutor();

Review comment:
       The `handleLeaderEventExecutor` needs to be shut down properly.
   
   The reason we need this dedicated executor is that starting/stopping the resource manager
can take a fairly long time, right?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
+import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerImpl;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.function.TriConsumer;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
+/** Implementation of {@link ResourceManagerFactory} for testing purpose. */

Review comment:
       You do not have to change this, but maybe the newly introduced `TestingResourceManagerFactory`
could subsume `ClusterEntrypointTest#TestingResourceManagerFactory`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@link ResourceManagerServiceImpl}. */
+public class ResourceManagerServiceImplTest extends TestLogger {
+
+    private static final Time TIMEOUT = Time.seconds(10L);
+    private static final Time FAST_TIMEOUT = Time.milliseconds(50L);
+
+    private static final HeartbeatServices heartbeatServices = new TestingHeartbeatServices();
+    private static final ClusterInformation clusterInformation =
+            new ClusterInformation("localhost", 1234);
+    private static final MetricRegistry metricRegistry = TestingMetricRegistry.builder().build();
+
+    private static TestingRpcService rpcService;
+    private static TestingHighAvailabilityServices haService;
+    private static TestingFatalErrorHandler fatalErrorHandler;
+
+    private TestingResourceManagerFactory.Builder rmFactoryBuilder;
+    private TestingLeaderElectionService leaderElectionService;
+    private ResourceManagerServiceImpl resourceManagerService;
+
+    @BeforeClass
+    public static void setupClass() {
+        rpcService = new TestingRpcService();
+        haService = new TestingHighAvailabilityServices();
+        fatalErrorHandler = new TestingFatalErrorHandler();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        fatalErrorHandler.clearError();
+
+        rmFactoryBuilder = new TestingResourceManagerFactory.Builder();
+
+        leaderElectionService = new TestingLeaderElectionService();
+        haService.setResourceManagerLeaderElectionService(leaderElectionService);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (resourceManagerService != null) {
+            resourceManagerService.close();
+        }
+
+        if (leaderElectionService != null) {
+            leaderElectionService.stop();
+        }
+
+        if (fatalErrorHandler.hasExceptionOccurred()) {
+            fatalErrorHandler.rethrowError();
+        }
+    }
+
+    @AfterClass
+    public static void teardownClass() throws Exception {
+        if (rpcService != null) {
+            RpcUtils.terminateRpcService(rpcService, TIMEOUT);
+        }
+    }
+
+    private void createAndStartResourceManager() throws Exception {
+        createResourceManager();
+        resourceManagerService.start();
+    }
+
+    private void createResourceManager() throws Exception {
+        final TestingResourceManagerFactory rmFactory = rmFactoryBuilder.build();
+        resourceManagerService =
+                ResourceManagerServiceImpl.create(
+                        rmFactory,
+                        new Configuration(),
+                        rpcService,
+                        haService,
+                        heartbeatServices,
+                        fatalErrorHandler,
+                        clusterInformation,
+                        null,
+                        metricRegistry,
+                        "localhost",
+                        ForkJoinPool.commonPool());
+    }
+
+    @Test
+    public void grantLeadership_startRmAndConfirmLeaderSession() throws Exception {
+        final UUID leaderSessionId = UUID.randomUUID();
+        final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();
+
+        rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);
+
+        createAndStartResourceManager();
+
+        // grant leadership
+        leaderElectionService.isLeader(leaderSessionId);
+
+        // should start new RM and confirm leader session
+        assertThat(startRmFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit()), is(leaderSessionId));
+        assertThat(
+                leaderElectionService
+                        .getConfirmationFuture()
+                        .get(TIMEOUT.getSize(), TIMEOUT.getUnit())
+                        .getLeaderSessionId(),
+                is(leaderSessionId));
+    }
+
+    @Test
+    public void grantLeadership_confirmLeaderSessionAfterRmStarted() throws Exception {
+        final UUID leaderSessionId = UUID.randomUUID();
+        final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();
+
+        rmFactoryBuilder.setInitializeConsumer(
+                (ignore) -> blockOnFuture(finishRmInitializationFuture));
+
+        createAndStartResourceManager();
+
+        // grant leadership
+        leaderElectionService.isLeader(leaderSessionId);
+
+        // RM initialization not finished, should not confirm leader session
+        assertNotComplete(leaderElectionService.getConfirmationFuture());
+
+        // finish RM initialization
+        finishRmInitializationFuture.complete(null);
+
+        // should confirm leader session
+        assertThat(
+                leaderElectionService
+                        .getConfirmationFuture()
+                        .get(TIMEOUT.getSize(), TIMEOUT.getUnit())
+                        .getLeaderSessionId(),
+                is(leaderSessionId));
+    }
+
+    @Test
+    public void grantLeadership_withExistingLeader_stopExistLeader() throws Exception {
+        final UUID leaderSessionId1 = UUID.randomUUID();
+        final UUID leaderSessionId2 = UUID.randomUUID();
+        final CompletableFuture<UUID> startRmFuture1 = new CompletableFuture<>();
+        final CompletableFuture<UUID> startRmFuture2 = new CompletableFuture<>();
+        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();
+
+        rmFactoryBuilder
+                .setInitializeConsumer(
+                        uuid -> {
+                            if (!startRmFuture1.isDone()) {
+                                startRmFuture1.complete(uuid);
+                            } else {
+                                startRmFuture2.complete(uuid);
+                            }
+                        })
+                .setTerminateConsumer(terminateRmFuture::complete);
+
+        createAndStartResourceManager();
+
+        // first time grant leadership
+        leaderElectionService.isLeader(leaderSessionId1);
+
+        // make sure RM started, before proceeding the next step
+        assertRmStarted();
+
+        // second time grant leadership
+        leaderElectionService.isLeader(leaderSessionId2);
+
+        // should terminate first RM, start a new RM and confirm leader session
+        assertThat(
+                terminateRmFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit()), is(leaderSessionId1));
+        assertThat(startRmFuture2.get(TIMEOUT.getSize(), TIMEOUT.getUnit()), is(leaderSessionId2));
+        assertThat(
+                leaderElectionService
+                        .getConfirmationFuture()
+                        .get(TIMEOUT.getSize(), TIMEOUT.getUnit())
+                        .getLeaderSessionId(),
+                is(leaderSessionId2));
+    }
+
+    @Test
+    public void grantLeadership_withExistingLeader_waitTerminationOfExistingLeader()
+            throws Exception {
+        final UUID leaderSessionId1 = UUID.randomUUID();
+        final UUID leaderSessionId2 = UUID.randomUUID();
+        final CompletableFuture<UUID> startRmFuture1 = new CompletableFuture<>();
+        final CompletableFuture<UUID> startRmFuture2 = new CompletableFuture<>();
+        final CompletableFuture<Void> finishRmTerminationFuture = new CompletableFuture<>();
+
+        rmFactoryBuilder
+                .setInitializeConsumer(
+                        uuid -> {
+                            if (!startRmFuture1.isDone()) {
+                                startRmFuture1.complete(uuid);
+                            } else {
+                                startRmFuture2.complete(uuid);
+                            }
+                        })
+                .setTerminateConsumer((ignore) -> blockOnFuture(finishRmTerminationFuture));
+
+        createAndStartResourceManager();
+
+        // first time grant leadership
+        leaderElectionService.isLeader(leaderSessionId1);
+
+        // make sure RM started, before proceeding the next step
+        assertRmStarted();
+
+        // second time grant leadership
+        leaderElectionService.isLeader(leaderSessionId2);
+
+        // first RM termination not finished, should not start new RM
+        assertNotComplete(startRmFuture2);
+
+        // finish first RM termination
+        finishRmTerminationFuture.complete(null);
+
+        // should start new RM and confirm leader session
+        assertThat(startRmFuture2.get(TIMEOUT.getSize(), TIMEOUT.getUnit()), is(leaderSessionId2));
+        assertThat(
+                leaderElectionService
+                        .getConfirmationFuture()
+                        .get(TIMEOUT.getSize(), TIMEOUT.getUnit())
+                        .getLeaderSessionId(),
+                is(leaderSessionId2));
+    }
+
+    @Test
+    public void grantLeadership_notStarted_doesNotStartNewRm() throws Exception {
+        final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();
+
+        rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);
+
+        createResourceManager();
+
+        // grant leadership
+        leaderElectionService.isLeader(UUID.randomUUID());
+
+        // service not started, should not start new RM
+        assertNotComplete(startRmFuture);
+        assertNotComplete(leaderElectionService.getConfirmationFuture());
+    }
+
+    @Test
+    public void grantLeadership_stopped_doesNotStartNewRm() throws Exception {
+        final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();
+
+        rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);
+
+        createAndStartResourceManager();
+        resourceManagerService.close();
+
+        // grant leadership
+        leaderElectionService.isLeader(UUID.randomUUID());
+
+        // service stopped, should not start new RM
+        assertNotComplete(startRmFuture);
+        assertNotComplete(leaderElectionService.getConfirmationFuture());
+    }
+
+    @Test
+    public void revokeLeadership_stopExistLeader() throws Exception {
+        final UUID leaderSessionId = UUID.randomUUID();
+        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();
+
+        rmFactoryBuilder.setTerminateConsumer(terminateRmFuture::complete);
+
+        createAndStartResourceManager();
+
+        // grant leadership
+        leaderElectionService.isLeader(leaderSessionId);
+
+        // make sure RM started, before proceeding the next step
+        assertRmStarted();
+
+        // revoke leadership
+        leaderElectionService.notLeader();
+
+        // should terminate RM
+        assertThat(
+                terminateRmFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit()), is(leaderSessionId));
+    }
+
+    @Test
+    public void leaderRmTerminated_terminateService() throws Exception {
+        final UUID leaderSessionId = UUID.randomUUID();
+        final CompletableFuture<Void> rmTerminationFuture = new CompletableFuture<>();
+
+        rmFactoryBuilder.setGetTerminationFutureFunction((ignore1, ignore2) -> rmTerminationFuture);
+
+        createAndStartResourceManager();
+
+        // grant leadership
+        leaderElectionService.isLeader(leaderSessionId);
+
+        // make sure RM started, before proceeding the next step
+        assertRmStarted();
+
+        // terminate RM
+        rmTerminationFuture.complete(null);
+
+        // should terminate service
+        resourceManagerService.getTerminationFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+    }
+
+    @Test
+    public void nonLeaderRmTerminated_doseNotTerminateService() throws Exception {
+        final UUID leaderSessionId = UUID.randomUUID();
+        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> rmTerminationFuture = new CompletableFuture<>();
+
+        rmFactoryBuilder
+                .setTerminateConsumer(terminateRmFuture::complete)
+                .setGetTerminationFutureFunction((ignore1, ignore2) -> rmTerminationFuture);
+
+        createAndStartResourceManager();
+
+        // grant leadership
+        leaderElectionService.isLeader(leaderSessionId);
+
+        // make sure RM started, before proceeding the next step
+        assertRmStarted();
+
+        // revoke leadership
+        leaderElectionService.notLeader();
+        assertThat(
+                terminateRmFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit()), is(leaderSessionId));
+
+        // terminate RM
+        rmTerminationFuture.complete(null);
+
+        // should not terminate service
+        assertNotComplete(resourceManagerService.getTerminationFuture());
+    }
+
+    @Test
+    public void closeService_stopRmAndLeaderElection() throws Exception {
+        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();
+
+        rmFactoryBuilder.setTerminateConsumer(terminateRmFuture::complete);
+
+        createAndStartResourceManager();
+
+        // grant leadership
+        leaderElectionService.isLeader(UUID.randomUUID());
+
+        // make sure RM started, before proceeding the next step
+        assertRmStarted();
+
+        // close service
+        resourceManagerService.close();
+
+        // should stop RM and leader election
+        assertTrue(terminateRmFuture.isDone());
+        assertTrue(leaderElectionService.isStopped());

Review comment:
       Maybe we need to check that `leaderElectionService.isStopped()` is `false` before closing
`resourceManagerService`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
##########
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link ResourceManagerService}. */
+public class ResourceManagerServiceImpl implements ResourceManagerService, LeaderContender
{
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerServiceImpl.class);
+
+    private final ResourceManagerFactory<?> resourceManagerFactory;
+    private final ResourceManagerProcessContext rmProcessContext;
+
+    private final LeaderElectionService leaderElectionService;
+    private final FatalErrorHandler fatalErrorHandler;
+    private final Executor ioExecutor;
+
+    private final Executor handleLeaderEventExecutor;
+    private final CompletableFuture<Void> serviceTerminationFuture;
+
+    private final Object lock = new Object();
+
+    @GuardedBy("lock")
+    private boolean running;
+
+    @Nullable
+    @GuardedBy("lock")
+    private ResourceManager<?> leaderResourceManager;
+
+    @Nullable
+    @GuardedBy("lock")
+    private UUID leaderSessionID;
+
+    @GuardedBy("lock")
+    private CompletableFuture<Void> previousResourceManagerTerminationFuture;
+
+    private ResourceManagerServiceImpl(
+            ResourceManagerFactory<?> resourceManagerFactory,
+            ResourceManagerProcessContext rmProcessContext) {
+        this.resourceManagerFactory = checkNotNull(resourceManagerFactory);
+        this.rmProcessContext = checkNotNull(rmProcessContext);
+
+        this.leaderElectionService =
+                rmProcessContext
+                        .getHighAvailabilityServices()
+                        .getResourceManagerLeaderElectionService();
+        this.fatalErrorHandler = rmProcessContext.getFatalErrorHandler();
+        this.ioExecutor = rmProcessContext.getIoExecutor();
+
+        this.handleLeaderEventExecutor = Executors.newSingleThreadExecutor();
+        this.serviceTerminationFuture = new CompletableFuture<>();
+
+        this.running = false;
+        this.leaderResourceManager = null;
+        this.leaderSessionID = null;
+        this.previousResourceManagerTerminationFuture = FutureUtils.completedVoidFuture();
+    }
+
+    // ------------------------------------------------------------------------
+    //  ResourceManagerService
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void start() throws Exception {
+        synchronized (lock) {
+            if (running) {
+                LOG.debug("Resource manager service has already started.");
+                return;
+            }
+            running = true;
+        }
+
+        LOG.info("Starting resource manager service.");
+
+        leaderElectionService.start(this);
+    }
+
+    @Override
+    public CompletableFuture<Void> getTerminationFuture() {
+        return serviceTerminationFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> deregisterApplication(
+            final ApplicationStatus applicationStatus, final @Nullable String diagnostics)
{
+        synchronized (lock) {
+            if (running && leaderResourceManager != null) {
+                return leaderResourceManager
+                        .getSelfGateway(ResourceManagerGateway.class)
+                        .deregisterApplication(applicationStatus, diagnostics)
+                        .thenApply(ack -> null);
+            } else {
+                return FutureUtils.completedExceptionally(
+                        new FlinkException(
+                                "Cannot deregister application. Resource manager service
is not available."));
+            }
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        synchronized (lock) {
+            if (running) {
+                LOG.info("Stopping resource manager service.");
+                running = false;
+                stopLeaderElectionService();
+                stopLeaderResourceManager();
+            } else {
+                LOG.debug("Resource manager service is not running.");
+            }
+
+            FutureUtils.forward(previousResourceManagerTerminationFuture, serviceTerminationFuture);
+        }
+
+        return serviceTerminationFuture;
+    }
+
+    // ------------------------------------------------------------------------
+    //  LeaderContender
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void grantLeadership(UUID newLeaderSessionID) {
+        handleLeaderEventExecutor.execute(
+                () -> {
+                    synchronized (lock) {
+                        if (!running) {
+                            LOG.info(
+                                    "Resource manager service is not running. Ignore granting
leadership with session ID {}.",
+                                    newLeaderSessionID);
+                            return;
+                        }
+
+                        LOG.info(
+                                "Resource manager service is granted leadership with session
id {}.",
+                                newLeaderSessionID);
+
+                        try {
+                            startNewLeaderResourceManager(newLeaderSessionID);
+                        } catch (Throwable t) {
+                            fatalErrorHandler.onFatalError(
+                                    new FlinkException("Cannot start resource manager.",
t));
+                        }
+                    }
+                });
+    }
+
+    @Override
+    public void revokeLeadership() {
+        handleLeaderEventExecutor.execute(
+                () -> {
+                    synchronized (lock) {
+                        if (!running) {
+                            LOG.info(
+                                    "Resource manager service is not running. Ignore revoking
leadership.");
+                            return;
+                        }
+
+                        LOG.info(
+                                "Resource manager service is revoked leadership with session
id {}.",
+                                leaderSessionID);
+
+                        stopLeaderResourceManager();
+                    }
+                });
+    }
+
+    @Override
+    public void handleError(Exception exception) {
+        fatalErrorHandler.onFatalError(
+                new FlinkException(
+                        "Exception during leader election of resource manager occurred.",
+                        exception));
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal
+    // ------------------------------------------------------------------------
+
+    @GuardedBy("lock")
+    private void startNewLeaderResourceManager(UUID newLeaderSessionID) throws Exception
{
+        stopLeaderResourceManager();
+
+        this.leaderSessionID = newLeaderSessionID;
+        this.leaderResourceManager =
+                resourceManagerFactory.createResourceManager(
+                        rmProcessContext, newLeaderSessionID, ResourceID.generate());
+
+        final ResourceManager<?> newLeaderResourceManager = this.leaderResourceManager;
+
+        previousResourceManagerTerminationFuture
+                .thenComposeAsync(
+                        (ignore) -> {
+                            synchronized (lock) {
+                                return startResourceManagerIfIsLeader(newLeaderResourceManager);
+                            }
+                        },
+                        handleLeaderEventExecutor)
+                .thenAcceptAsync(
+                        (isStillLeader) -> {
+                            if (isStillLeader) {
+                                leaderElectionService.confirmLeadership(
+                                        newLeaderSessionID, newLeaderResourceManager.getAddress());
+                            }
+                        },
+                        ioExecutor);
+    }
+
+    /**
+     * Returns a future that completes as {@code true} if the resource manager is still leader
and
+     * started, and {@code false} if it's no longer leader.
+     */
+    @GuardedBy("lock")
+    private CompletableFuture<Boolean> startResourceManagerIfIsLeader(
+            ResourceManager<?> resourceManager) {
+        if (isLeader(resourceManager)) {
+            resourceManager.start();
+            forwardTerminationFuture(resourceManager);

Review comment:
       I do not see this as a problem, but it is a little confusing to me that completing the `ResourceManager`
will also complete the `serviceTerminationFuture`, which may cause the JobManager process
to exit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message