Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/6322#discussion_r202013852
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
---
@@ -1759,11 +1759,22 @@ class JobManager(
case None => None
}
- // remove all job-related BLOBs from local and HA store
- libraryCacheManager.unregisterJob(jobID)
- blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+ // remove all job-related BLOBs from local and HA store, only if the job was removed
correctly
+ futureOption match {
+ case Some(future) => future.onComplete{
+ case scala.util.Success(_) => {
+ libraryCacheManager.unregisterJob(jobID)
+ blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+ jobManagerMetricGroup.removeJob(jobID)
+ }
+
+ case scala.util.Failure(_) =>
+
+ }(context.dispatcher)
+
+ case None => None
+ }
- jobManagerMetricGroup.removeJob(jobID)
--- End diff --
this line can also be removed
---
|