flink-issues mailing list archives

From "Elias Levy (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-9575) Potential race condition when removing JobGraph in HA
Date Wed, 01 Aug 2018 21:31:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Elias Levy updated FLINK-9575:
------------------------------
    Description: 
When the _JobGraph_ is removed from the _JobManager_, for example after invoking _cancel()_,
the following code is executed:
{noformat}
val futureOption = currentJobs.get(jobID) match {
  case Some((eg, _)) =>
    val result = if (removeJobFromStateBackend) {
      val futureOption = Some(future {
        try {
          // ...otherwise, we can have lingering resources when there is a concurrent shutdown
          // and the ZooKeeper client is closed. Not removing the job immediately allow the
          // shutdown to release all resources.
          submittedJobGraphs.removeJobGraph(jobID)
        } catch {
          case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
        }
      }(context.dispatcher))

      try {
        archive ! decorateMessage(
          ArchiveExecutionGraph(
            jobID,
            ArchivedExecutionGraph.createFrom(eg)))
      } catch {
        case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
      }

      futureOption
    } else {
      None
    }

    currentJobs.remove(jobID)

    result
  case None => None
}

// remove all job-related BLOBs from local and HA store
libraryCacheManager.unregisterJob(jobID)
blobServer.cleanupJob(jobID, removeJobFromStateBackend)

jobManagerMetricGroup.removeJob(jobID)

futureOption
}{noformat}
This code removes the job graph asynchronously but removes the blob files connected with it
synchronously. As far as I understand, this creates a potential problem: the removal of the
job graph from _submittedJobGraphs_ can fail after the blobs are already gone. If the
JobManager then fails and a new leader is elected, the new leader can try to recover that job,
but the recovery will fail with an exception because the assigned blob was already removed.
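
One way to picture the ordering problem is a small, self-contained sketch in which the blob cleanup runs only after the asynchronous job graph removal has succeeded. This is an illustration under stated assumptions, not the change made in Flink: the object `RemovalOrderSketch`, the in-memory maps, and the `failGraphRemoval` flag are all hypothetical stand-ins for the real HA job graph store and blob server.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical in-memory stand-ins; none of these names come from Flink itself.
object RemovalOrderSketch {
  val submittedJobGraphs = TrieMap[String, String]()
  val blobs = TrieMap[String, Array[Byte]]()

  // Stand-in for submittedJobGraphs.removeJobGraph(jobID); can be forced to fail.
  def removeJobGraph(jobId: String, fail: Boolean): Unit = {
    if (fail) throw new RuntimeException(s"Could not remove submitted job graph $jobId.")
    submittedJobGraphs.remove(jobId)
    ()
  }

  // Stand-in for blobServer.cleanupJob(jobID, ...).
  def cleanupBlobs(jobId: String): Unit = {
    blobs.remove(jobId)
    ()
  }

  // Remove the job graph asynchronously, then clean up blobs only if that removal succeeded.
  // The result is true when both steps ran, false when the graph removal failed.
  def removeJob(jobId: String, failGraphRemoval: Boolean): Future[Boolean] =
    Future(removeJobGraph(jobId, failGraphRemoval))
      .map { _ => cleanupBlobs(jobId); true }
      .recover { case NonFatal(_) => false }

  def main(args: Array[String]): Unit = {
    submittedJobGraphs.put("job-1", "graph")
    blobs.put("job-1", Array[Byte](1))

    // Graph removal fails: the blob is kept, so a new leader could still recover the job.
    val first = Await.result(removeJob("job-1", failGraphRemoval = true), 5.seconds)
    assert(!first && submittedJobGraphs.contains("job-1") && blobs.contains("job-1"))

    // Graph removal succeeds: only now is the blob removed.
    val second = Await.result(removeJob("job-1", failGraphRemoval = false), 5.seconds)
    assert(second && !submittedJobGraphs.contains("job-1") && !blobs.contains("job-1"))
  }
}
```

In the sketch, a failed graph removal leaves both the graph and its blob in place, so the two stores never disagree about whether the job is recoverable. Whether chaining the cleanup this way suits the real JobManager shutdown path is a separate question that the sketch does not address.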


> Potential race condition when removing JobGraph in HA
> -----------------------------------------------------
>
>                 Key: FLINK-9575
>                 URL: https://issues.apache.org/jira/browse/FLINK-9575
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.5.0
>            Reporter: Dominik Wosiński
>            Assignee: Dominik Wosiński
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
