flink-issues mailing list archives

From "Till Rohrmann (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (FLINK-9575) Potential race condition when removing JobGraph in HA
Date Wed, 18 Jul 2018 14:39:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann resolved FLINK-9575.
----------------------------------
    Resolution: Fixed

Fixed via
master:
e984168e2eca59c08da90bd5feeac458eaa91bed
f6b2e8c5ff0304e4835d2dc8c792a0d055679603

1.6.0:
e2b4ffc016da822dda544b31fb3caf679f80a9d9
b9fe077d221bdb013ed57f2555405c9fe4a96aa1

1.5.2:
1bf77cfe17bc046772d02b22d6347388de359ff6
9c4b40dd0bbb22f8f312b0fc42f54a1a4619bf53

> Potential race condition when removing JobGraph in HA
> -----------------------------------------------------
>
>                 Key: FLINK-9575
>                 URL: https://issues.apache.org/jira/browse/FLINK-9575
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.5.0
>            Reporter: Dominik Wosiński
>            Assignee: Dominik Wosiński
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
>
> When the _JobGraph_ is removed from the _JobManager_, for example after invoking _cancel()_, the following code is executed:
> {noformat}
>  
> val futureOption = currentJobs.get(jobID) match {
>   case Some((eg, _)) =>
>     val result = if (removeJobFromStateBackend) {
>       val futureOption = Some(future {
>         try {
>           // ...otherwise, we can have lingering resources when there is a concurrent shutdown
>           // and the ZooKeeper client is closed. Not removing the job immediately allow the
>           // shutdown to release all resources.
>           submittedJobGraphs.removeJobGraph(jobID)
>         } catch {
>           case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
>         }
>       }(context.dispatcher))
>       try {
>         archive ! decorateMessage(
>           ArchiveExecutionGraph(
>             jobID,
>             ArchivedExecutionGraph.createFrom(eg)))
>       } catch {
>         case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
>       }
>       futureOption
>     } else {
>       None
>     }
>     currentJobs.remove(jobID)
>     result
>   case None => None
> }
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> }
> {noformat}
> This removes the job graph asynchronously but removes the BLOB files connected with the job synchronously. As far as I understand, this creates a potential problem: the removal of the job graph from _submittedJobGraphs_ can fail after the BLOBs have already been deleted. If the JobManager then fails and a new leader is elected, the new leader can try to recover such a job, but recovery will fail with an exception because the assigned BLOB was already removed.
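
The ordering problem described above can be illustrated with a minimal sketch in which the BLOBs are deleted only after the job graph removal has completed successfully. This is not necessarily how the commits listed above fix the issue. The names `InMemoryJobGraphStore`, `InMemoryBlobStore`, and `OrderedCleanup` are hypothetical stand-ins and not Flink classes, and job IDs are plain strings for brevity.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// Hypothetical in-memory stand-in for the HA job graph store (not a Flink class).
final class InMemoryJobGraphStore(failOnRemove: Boolean) {
  val graphs: TrieMap[String, String] = TrieMap.empty[String, String]

  def removeJobGraph(jobId: String): Unit = {
    if (failOnRemove) {
      throw new IllegalStateException(s"Could not remove submitted job graph $jobId.")
    }
    graphs.remove(jobId)
  }
}

// Hypothetical in-memory stand-in for the BLOB store (not a Flink class).
final class InMemoryBlobStore {
  val blobs: TrieMap[String, Array[Byte]] = TrieMap.empty[String, Array[Byte]]

  def cleanupJob(jobId: String): Unit = blobs.remove(jobId)
}

object OrderedCleanup {
  // Removes the job graph asynchronously and deletes the BLOBs only once
  // that removal has succeeded. The future yields true when both steps
  // completed and false when either step failed; if the job graph removal
  // fails, the BLOBs are left in place so the job stays recoverable.
  def removeJob(
      jobId: String,
      graphStore: InMemoryJobGraphStore,
      blobStore: InMemoryBlobStore)(implicit ec: ExecutionContext): Future[Boolean] =
    Future(graphStore.removeJobGraph(jobId))
      .map { _ =>
        blobStore.cleanupJob(jobId)
        true
      }
      .recover { case NonFatal(_) => false }
}
```

With this ordering, a failed `removeJobGraph` call leaves both the job graph and its BLOBs in place, so a newly elected leader would still find everything it needs to recover the job.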



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
