flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] tillrohrmann commented on a change in pull request #15561: [FLINK-20695][ha] Clean up ha data for job if globally terminated
Date Tue, 11 May 2021 08:46:22 GMT

tillrohrmann commented on a change in pull request #15561:
URL: https://github.com/apache/flink/pull/15561#discussion_r629951907



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -770,6 +770,14 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
                         jobId,
                         e);
             }
+
+            try {
+                highAvailabilityServices.cleanupJobData(jobId);
+            } catch (Exception e) {
+                log.warn(
+                        "Could not properly clean data for job {} stored by ha services",
+                        jobId,
+                        e);
+            }

Review comment:
       I think we mustn't clean up the job data if we failed to delete the `JobGraph` from the
`jobGraphWriter`. The problem is that we might lose pointers to some checkpoints while still
trying to resume a `Job` in case of a failover.
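
A minimal sketch of the ordering the reviewer is asking for (names and interfaces here are hypothetical stand-ins for the Flink classes, not the actual `Dispatcher` code): the HA cleanup only runs once removing the `JobGraph` succeeded, so checkpoint pointers cannot be lost while a failover might still resume the job.

```java
import java.util.logging.Logger;

public class CleanupSketch {
    private static final Logger LOG = Logger.getLogger("CleanupSketch");

    // Stand-ins for Flink's JobGraphWriter and HighAvailabilityServices.
    interface JobGraphWriter { void removeJobGraph(String jobId) throws Exception; }
    interface HaServices { void cleanupJobData(String jobId) throws Exception; }

    static void cleanUpJobData(String jobId, JobGraphWriter writer, HaServices ha) {
        boolean jobGraphRemoved = false;
        try {
            writer.removeJobGraph(jobId);
            jobGraphRemoved = true;
        } catch (Exception e) {
            LOG.warning("Could not remove job graph for job " + jobId + ": " + e);
        }
        // Only touch the HA data after the JobGraph is gone; otherwise a
        // failover could still try to resume the job from stale pointers.
        if (jobGraphRemoved) {
            try {
                ha.cleanupJobData(jobId);
            } catch (Exception e) {
                LOG.warning("Could not clean HA data for job " + jobId + ": " + e);
            }
        }
    }
}
```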

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -313,6 +314,15 @@ public static ZooKeeperLeaderElectionDriverFactory createLeaderElectionDriverFac
         return new ZooKeeperLeaderElectionDriverFactory(client, latchPath, leaderPath);
     }
 
+    public static List<String> getLeaderPathsForJob(
+            final Configuration configuration, final String pathSuffix) {
+        return Arrays.asList(
+                configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH)
+                        + pathSuffix,
+                configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
+                        + pathSuffix);

Review comment:
       What about the checkpoints ZNodes under `HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH/jobId`?
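
To illustrate the reviewer's point, a hedged sketch of what an extended path list might look like if the checkpoints ZNode were included; the string keys stand in for the `HighAvailabilityOptions` constants, and a plain `Map` stands in for Flink's `Configuration`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class JobPathsSketch {
    // Collects all job-scoped ZooKeeper paths that would need cleanup.
    static List<String> getZkPathsForJob(Map<String, String> config, String jobId) {
        String suffix = "/" + jobId;
        return Arrays.asList(
                config.get("high-availability.zookeeper.path.latch") + suffix,
                config.get("high-availability.zookeeper.path.leader") + suffix,
                // The reviewer's question: the per-job checkpoint ZNodes live
                // under their own configured path and would be missed otherwise.
                config.get("high-availability.zookeeper.path.checkpoints") + suffix);
    }
}
```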

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -456,6 +459,31 @@ public void testDuplicateJobSubmissionDoesNotDeleteJobMetaData() throws Exception
         assertThatHABlobsHaveBeenRemoved();
     }
 
+    @Test
+    public void testHaDataCleanupWhenJobFinished() throws Exception {
+        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+        TestingJobManagerRunner jobManagerRunner =
+                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+        finishJob(jobManagerRunner);
+        JobID jobID = cleanupJobHADataFuture.get(2000, TimeUnit.MILLISECONDS);
+        assertThat(jobID, is(this.jobId));
+    }
+
+    @Test
+    public void testHaDataCleanupWhenJobNotFinished() throws Exception {
+        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+        TestingJobManagerRunner jobManagerRunner =
+                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+        jobManagerRunner.completeResultFutureExceptionally(new JobNotFinishedException(jobId));
+        try {
+            cleanupJobHADataFuture.get(50L, TimeUnit.MILLISECONDS);

Review comment:
       ```suggestion
               cleanupJobHADataFuture.get(10L, TimeUnit.MILLISECONDS);
       ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
##########
@@ -106,6 +112,29 @@ public void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails(
         assertThat(closeOperations, contains(CloseOperations.HA_CLOSE, CloseOperations.BLOB_CLOSE));
     }
 
+    @Test
+    public void testCleanupJobData() throws Exception {
+        final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
+        final TestingBlobStoreService testingBlobStoreService =
+                new TestingBlobStoreService(closeOperations);
+
+        JobID jobID = new JobID();
+        CompletableFuture<JobID> jobCleanupFuture = new CompletableFuture<>();
+
+        final TestingHaServices haServices =
+                new TestingHaServices(
+                        new Configuration(),
+                        Executors.directExecutor(),
+                        testingBlobStoreService,
+                        closeOperations,
+                        () -> {},
+                        jobCleanupFuture::complete);
+
+        haServices.cleanupJobData(jobID);
+        JobID jobIDCleaned = jobCleanupFuture.get(2000, TimeUnit.MILLISECONDS);

Review comment:
       ```suggestion
           JobID jobIDCleaned = jobCleanupFuture.get();
       ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
##########
@@ -148,6 +149,46 @@ public void testCloseAndCleanupAllDataWithUncle() throws Exception {
         assertThat(client.checkExists().forPath(unclePath), is(notNullValue()));
     }
 
+    /** Tests that the ZooKeeperHaServices cleans up paths for job manager. */
+    @Test
+    public void testCleanupJobData() throws Exception {
+        String rootPath = "/foo/bar/flink";
+        final Configuration configuration = createConfiguration(rootPath);
+        String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+        String latchFullPath =
+                rootPath
+                        + namespace
+                        + configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH);
+        String leaderFullPath =
+                rootPath
+                        + namespace
+                        + configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH);
+
+        final TestingBlobStoreService blobStoreService = new TestingBlobStoreService();
+
+        JobID jobID = new JobID();
+        runCleanupTestWithJob(
+                configuration,
+                blobStoreService,
+                jobID,
+                haServices -> {
+                    List<String> latchChildrenBefore = client.getChildren().forPath(latchFullPath);
+                    List<String> leaderChildrenBefore =
+                            client.getChildren().forPath(leaderFullPath);
+
+                    haServices.cleanupJobData(jobID);
+
+                    List<String> latchChildrenAfter = client.getChildren().forPath(latchFullPath);
+                    List<String> leaderChildrenAfter = client.getChildren().forPath(leaderFullPath);
+
+                    latchChildrenBefore.removeAll(latchChildrenAfter);
+                    leaderChildrenBefore.removeAll(leaderChildrenAfter);
+
+                    assertThat(latchChildrenBefore, contains(jobID.toString()));
+                    assertThat(leaderChildrenBefore, contains(jobID.toString()));

Review comment:
       I needed a bit of time to understand this logic. Could we make it more explicit, e.g.
by asserting that `*ChildrenBefore` contains the `JobID` and `*ChildrenAfter` does not?
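
A small sketch of the more explicit assertion style the reviewer suggests, checking membership directly instead of diffing the lists; plain `List`s stand in for the results of the ZooKeeper `getChildren()` calls:

```java
import java.util.List;

public class ExplicitAssertSketch {
    // Asserts that the job's ZNode existed before cleanup and is gone after.
    static void assertJobNodeRemoved(
            List<String> childrenBefore, List<String> childrenAfter, String jobId) {
        if (!childrenBefore.contains(jobId)) {
            throw new AssertionError("expected " + jobId + " to exist before cleanup");
        }
        if (childrenAfter.contains(jobId)) {
            throw new AssertionError("expected " + jobId + " to be removed by cleanup");
        }
    }
}
```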




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


