flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8887) ClusterClient.getJobStatus can throw FencingTokenException
Date Wed, 07 Mar 2018 09:02:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389273#comment-16389273 ]

ASF GitHub Bot commented on FLINK-8887:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5648#discussion_r172775779
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -404,16 +404,25 @@ public void start() throws Exception {
     		final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
     
     		if (jobManagerRunner != null) {
    -			return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
    -		} else {
    -			final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
    -
    -			if (jobDetails != null) {
    -				return CompletableFuture.completedFuture(jobDetails.getStatus());
    -			} else {
    -				return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
    +			try {
    +				return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
    --- End diff --
    
    This call is asynchronous, so it does not throw the exception itself; a failure only
shows up in the returned `CompletableFuture`, and you have to add a handler to that future.
The change also only properly resolves one of the exceptions, and IMO it shouldn't catch
`Exception` but only the specific exceptions the workaround is meant for, so as not to hide
other issues.
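
To illustrate the point about handlers, here is a minimal, self-contained sketch. It is not
the Dispatcher code: `withFallback` is a hypothetical helper, and `FencingTokenException` is
a local stand-in so that the example compiles without Flink on the classpath. It only shows
that a failure of an asynchronous call has to be handled on the returned `CompletableFuture`,
and that only one specific exception type triggers the fallback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.function.Supplier;

final class AsyncFallbackSketch {

    /** Hypothetical stand-in for Flink's FencingTokenException. */
    static class FencingTokenException extends Exception {
        FencingTokenException(String message) {
            super(message);
        }
    }

    /** Strips the CompletionException wrapper that dependent stages may add. */
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    /**
     * Hypothetical helper: falls back to a second lookup only when the primary
     * future failed with a FencingTokenException. A try/catch around the call
     * that created 'primary' would never see this failure.
     */
    static <T> CompletableFuture<T> withFallback(
            CompletableFuture<T> primary,
            Supplier<CompletableFuture<T>> fallback) {
        return primary
            .<CompletableFuture<T>>handle((value, failure) -> {
                if (failure == null) {
                    return CompletableFuture.completedFuture(value);
                }
                Throwable cause = unwrap(failure);
                if (cause instanceof FencingTokenException) {
                    return fallback.get();
                }
                // Any other failure is passed on, so unrelated issues stay visible.
                CompletableFuture<T> failed = new CompletableFuture<>();
                failed.completeExceptionally(cause);
                return failed;
            })
            .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<String> fenced = new CompletableFuture<>();
        fenced.completeExceptionally(new FencingTokenException("fencing token is null"));

        // The fallback here stands in for a lookup in an archive of finished jobs.
        String status = withFallback(
            fenced, () -> CompletableFuture.completedFuture("FINISHED")).join();
        System.out.println(status);
    }
}
```

Using `handle` followed by `thenCompose` keeps the fallback asynchronous as well; the calling
thread never blocks on either future.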
    
    In any case, I'm not sure if adding workarounds to the Dispatcher is the right way to
go. These issues revealed that some scenarios are not properly handled, and I would prefer
waiting for @tillrohrmann to really fix this in the Dispatcher and related components.
    
    We can temporarily handle both exceptions in the `MiniClusterClient` by adding a *single*
retry (with a short sleep) if a **specific** exception occurs.
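
A rough sketch of such a client-side workaround, under stated assumptions: the helper
`callWithSingleRetry`, its parameters, and the local `FencingTokenException` stand-in are all
hypothetical and are not part of `MiniClusterClient`. The request is modelled as a plain
`Callable` so that the example runs without Flink.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

final class SingleRetrySketch {

    /** Hypothetical stand-in for Flink's FencingTokenException. */
    static class FencingTokenException extends Exception {
        FencingTokenException(String message) {
            super(message);
        }
    }

    /** Returns true if 't' or any exception in its cause chain is of the given type. */
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Hypothetical helper: runs the request and retries exactly once, after a
     * short sleep, if the first attempt failed because of 'retryOn'. Any other
     * failure, and any failure of the second attempt, is rethrown unchanged.
     */
    static <T> T callWithSingleRetry(
            Callable<T> request,
            Class<? extends Throwable> retryOn,
            long sleepMillis) throws Exception {
        try {
            return request.call();
        } catch (Exception first) {
            if (!hasCause(first, retryOn)) {
                throw first;
            }
            Thread.sleep(sleepMillis);
            return request.call();
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();

        // Simulated request: the first attempt fails the way a blocking get()
        // on a failed future would, the second attempt succeeds.
        String status = callWithSingleRetry(() -> {
            if (attempts.incrementAndGet() == 1) {
                throw new ExecutionException(new FencingTokenException("fencing token is null"));
            }
            return "FINISHED";
        }, FencingTokenException.class, 50L);

        System.out.println(status + " after " + attempts.get() + " attempts");
    }
}
```

Checking the whole cause chain matters here because a blocking client usually sees the
failure wrapped, for example in an `ExecutionException`, rather than as the bare exception.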


> ClusterClient.getJobStatus can throw FencingTokenException
> ----------------------------------------------------------
>
>                 Key: FLINK-8887
>                 URL: https://issues.apache.org/jira/browse/FLINK-8887
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Gary Yao
>            Assignee: vinoyang
>            Priority: Blocker
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> *Description*
> Calling {{RestClusterClient.getJobStatus}} or {{MiniClusterClient.getJobStatus}} can
> result in a {{FencingTokenException}}.
> *Analysis*
> {{Dispatcher.requestJobStatus}} first looks the {{JobManagerRunner}} up by job id. If
> a reference is found, {{requestJobStatus}} is called on the respective instance. If not, the
> {{ArchivedExecutionGraphStore}} is queried. However, between the lookup and the method call,
> the {{JobMaster}} of the respective job may have lost leadership already (job finished), and
> has set the fencing token to {{null}}.
> *Stacktrace*
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token mismatch: Ignoring message LocalFencedMessage(null, LocalRpcInvocation(requestJobStatus(Time))) because the fencing token null did not match the expected fencing token b8423c75bc6838244b8c93c8bd4a4f51.
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:73)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> 	at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> 	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(null, LocalRpcInvocation(requestJobStatus(Time))) because the fencing token is null.
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:56)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> 	at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> 	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
