flink-issues mailing list archives

From "Elias Levy (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
Date Wed, 01 Aug 2018 21:44:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566032#comment-16566032 ]

Elias Levy commented on FLINK-10011:
------------------------------------

[~azagrebin] I don't think they are the same issue.  The issue I am observing is that the
new JM leader after a failover can't delete a job graph in ZK when it is canceled because
the previous JM leader still has the job graph locked in ZK via the child ephemeral node.
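
For context, here is a minimal sketch of the locking scheme as I understand it (plain Curator API,
made-up class and method names, not Flink's actual ZooKeeperStateHandleStore code): every JM that
references a job graph creates an ephemeral child node under the job graph znode, and removing the
job graph only succeeds once all such lock children are gone.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;

public class JobGraphLockSketch {

    // Lock: create an ephemeral child (named by a UUID) under the job graph znode.
    // It disappears only when deleted explicitly or when the owning ZK session expires.
    static String lock(CuratorFramework zk, String jobGraphPath, String ownerAddress) throws Exception {
        String lockPath = jobGraphPath + "/" + UUID.randomUUID();
        zk.create()
          .withMode(CreateMode.EPHEMERAL)
          .forPath(lockPath, ownerAddress.getBytes(StandardCharsets.UTF_8)); // node data = owner, as seen below
        return lockPath;
    }

    // Remove: drop our own lock child, then delete the job graph znode itself.
    // If another JM's ephemeral lock child is still present (because its session is
    // still alive), the parent delete fails with NotEmptyException -- the
    // "Directory not empty" error shown in the ZK log further down.
    static boolean release(CuratorFramework zk, String jobGraphPath, String ownLockPath) throws Exception {
        zk.delete().forPath(ownLockPath);
        try {
            zk.delete().forPath(jobGraphPath);
            return true;
        } catch (KeeperException.NotEmptyException e) {
            return false; // another JM still holds a lock child
        }
    }

    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();
        String jobGraphPath = "/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1";
        zk.create().creatingParentsIfNeeded().forPath(jobGraphPath); // demo only: normally holds the state handle
        String myLock = lock(zk, jobGraphPath, "10.210.22.167");
        System.out.println("removed: " + release(zk, jobGraphPath, myLock));
        zk.close();
    }
}
{code}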

This is the state in ZK:

[zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
[d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]

Job 2a4eff355aef849c5ca37dbac04f2ff1 was running before the failover, and we canceled it after the
failover.  The job is no longer running, but it is still in ZK.

In the logs we see that JM 1 (10.210.22.167), the one that became leader after the failover,
thinks it deleted the 2a4eff355aef849c5ca37dbac04f2ff1 job from ZK when it was canceled:

July 30th 2018, 15:32:27.231	Trying to cancel job with ID 2a4eff355aef849c5ca37dbac04f2ff1.
July 30th 2018, 15:32:27.232	Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from
state RESTARTING to CANCELED.
July 30th 2018, 15:32:27.232	Stopping checkpoint coordinator for job 2a4eff355aef849c5ca37dbac04f2ff1
July 30th 2018, 15:32:27.239	Removed job graph 2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper.
July 30th 2018, 15:32:27.245	Removing /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1
from ZooKeeper
July 30th 2018, 15:32:27.251	Removing /checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1
from ZooKeeper

Looking at the ZK logs I find the problem:

July 30th 2018, 15:32:27.241	Got user-level KeeperException when processing sessionid:0x2000001d2330001
type:delete cxid:0x434c zxid:0x60009dd94 txntype:-1 reqpath:n/a Error Path:/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1
Error:KeeperErrorCode = Directory not empty for /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1

Looking in ZK, we see:

[zk: localhost:2181(CONNECTED) 0] ls /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1
[d833418c-891a-4b5e-b983-080be803275c]

From the comments in ZooKeeperStateHandleStore.java I gather that this child node is used
as a deletion lock.  Looking at the contents of this ephemeral lock node:

[zk: localhost:2181(CONNECTED) 16] get /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1/d833418c-891a-4b5e-b983-080be803275c
10.210.42.62
cZxid = 0x60002ffa7
ctime = Tue Jun 12 20:01:26 UTC 2018
mZxid = 0x60002ffa7
mtime = Tue Jun 12 20:01:26 UTC 2018
pZxid = 0x60002ffa7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x30000003f4a0003
dataLength = 12
numChildren = 0

and compared to the ephemeral node lock of the currently running job:

[zk: localhost:2181(CONNECTED) 17] get /flink/cluster_1/jobgraphs/d77948df92813a68ea6dfd6783f40e7e/596a4add-9f5c-4113-99ec-9c942fe91172
10.210.22.167
cZxid = 0x60009df4b
ctime = Mon Jul 30 23:01:04 UTC 2018
mZxid = 0x60009df4b
mtime = Mon Jul 30 23:01:04 UTC 2018
pZxid = 0x60009df4b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2000001d2330001
dataLength = 13
numChildren = 0

Assuming the content of the nodes represents the owner, it seems the job graph for the old
canceled job, 2a4eff355aef849c5ca37dbac04f2ff1, is locked by the previous JM leader, JM 2 (10.210.42.62),
while the running job is locked by the current JM leader, JM 1 (10.210.22.167).

Somehow the previous leader, JM 2, did not give up the lock when leadership failed over to
JM 1.  Note that JM 2 never lost its ZK session, as it recovered the session when it connected to
another ZK node.  So some code in the JobManager needed to explicitly release the lock on
the job graph during failover and failed to do so.
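
To make that expectation concrete, here is a hedged sketch of what I would expect the revoked leader
to do (hypothetical class and method names, not Flink's actual JobManager code): because its ZK
session is still alive, its ephemeral lock nodes will not vanish on their own, so it has to delete
them explicitly when it gives up leadership.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.curator.framework.CuratorFramework;

/** Hypothetical sketch: explicit release of job graph locks on leadership revocation. */
class JobGraphLockRegistry {

    private final CuratorFramework zk;
    // jobId -> path of the ephemeral lock node *we* created under the job graph znode
    private final Map<String, String> heldLocks = new ConcurrentHashMap<>();

    JobGraphLockRegistry(CuratorFramework zk) {
        this.zk = zk;
    }

    void remember(String jobId, String lockPath) {
        heldLocks.put(jobId, lockPath);
    }

    /**
     * Called when this JobManager is revoked leadership. The ZK session may still be
     * alive (it was in our case), so the ephemeral lock nodes will NOT go away on
     * their own -- they must be deleted explicitly, otherwise the new leader cannot
     * remove the job graphs later.
     */
    void releaseAllOnRevocation() {
        for (Map.Entry<String, String> entry : heldLocks.entrySet()) {
            try {
                zk.delete().guaranteed().forPath(entry.getValue());
            } catch (Exception e) {
                // best effort: log and continue with the remaining locks
            }
        }
        heldLocks.clear();
    }
}
{code}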

[~till.rohrmann] and [~uce] I think you wrote the ZK HA code.  Any thoughts?


> Old job resurrected during HA failover
> --------------------------------------
>
>                 Key: FLINK-10011
>                 URL: https://issues.apache.org/jira/browse/FLINK-10011
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.4.2
>            Reporter: Elias Levy
>            Priority: Blocker
>
> For the second time we've observed Flink resurrect an old job during JobManager high-availability failover.
> h4. Configuration
>  * AWS environment
>  * Flink 1.4.2 standalone cluster in HA mode
>  * 2 JMs, 3 TMs
>  * 3 node ZK ensemble
>  * 1 job consuming from and producing to Kafka
>  * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline 
>  * 15:18:10 JM 2 completes checkpoint 69256.
>  * 15:19:10 JM 2 completes checkpoint 69257.
>  * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a SocketTimeoutException
>  * 15:19:57 ZK 1 closes connection to JM 2 (leader)
>  * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1
>  * 15:19:57 JM 2 reports it can't read data from ZK
>  ** {{Unable to read additional data from server sessionid 0x30000003f4a0003, likely
server has closed socket, closing socket connection and attempting reconnect)}}
>  ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
>  * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
>  ** {{ZooKeeper connection SUSPENDED. Changes to the submitted job graphs are not monitored (temporarily).}}
>  ** {{Connection to ZooKeeper suspended. The contender akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in the leader election}}
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
>  * 15:19:57 JM 2 gives up leadership
>  ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.}}
>  * 15:19:57 JM 2 changes job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status
to SUSPENDED
>  ** {{Stopping checkpoint coordinator for job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
>  * 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages because there is no leader
>  ** {{Discard message LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception:
TaskManager akka://flink/user/taskmanager is disassociating)) because there is currently no
valid leader id known.}}
>  * 15:19:57 JM 2 connects to ZK 2 and renews its session
>  ** {{Opening socket connection to server ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
>  ** {{Socket connection established to ip-10-210-43-221.ec2.internal/10.210.43.221:2181,
initiating session}}
>  ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.}}
>  ** {{Session establishment complete on server ip-10-210-43-221.ec2.internal/10.210.43.221:2181,
sessionid = 0x30000003f4a0003, negotiated timeout = 40000}}
>  ** {{Connection to ZooKeeper was reconnected. Leader election can be restarted.}}
>  ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are monitored
again.}}
>  ** {{State change: RECONNECTED}}
>  * 15:19:57 JM 1 reports it has been granted leadership:
>  ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted leadership
with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}
>  * 15:19:57 JM 2 reports the job has been suspended
>  ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting down.}}
>  ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}
>  * 15:19:57 JM 2 reports it has lost leadership:
>  ** {{Associated JobManager Actor[akka://flink/user/jobmanager#33755521] lost leader status}}
>  ** {{Received leader address but not running in leader ActorSystem. Cancelling registration.}}
>  * 15:19:57 TMs register with JM 1
>  * 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs:
>  ** {{Attempting to recover all jobs.}}
>  ** {{There are 2 jobs to recover. Starting the job recovery.}}
>  ** {{Attempting to recover job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}}
>  ** {{Attempting to recover job {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}}
>  * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
>  ** {{Got user-level KeeperException when processing sessionid:0x2000001d2330001 type:create
cxid:0x4211 zxid:0x60009dc70 txntype:-1 reqpath:n/a Error Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1
Error:KeeperErrorCode = NodeExists for /flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1}}
>  ** {{Got user-level KeeperException when processing sessionid:0x2000001d2330001 type:create
cxid:0x4230 zxid:0x60009dc78 txntype:-1 reqpath:n/a Error Path:/flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2
Error:KeeperErrorCode = NodeExists for /flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2}}
>  * 15:20:08 JM 1 starts to recover job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}
>  ** {{Recovered SubmittedJobGraph(2a4eff355aef849c5ca37dbac04f2ff1, JobInfo(clients:
Set((Actor[akka.tcp://flink@ip-10-210-42-62.ec2.internal:37564/temp/$c],DETACHED)), start:
1528833686265)).}}
>  ** {{Submitting recovered job 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Submitting job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job) (Recovery).}}
>  ** {{Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
delayBetweenRestartAttempts=30000) for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Successfully ran initialization on master in 0 ms.}}
>  ** {{Job recovers via failover strategy: full graph restart}}
>  ** {{Running initialization on master for job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).}}
>  ** {{Initialized in '/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1'.}}
>  ** {{Using application-defined state backend for checkpoint/savepoint metadata: File
State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
>  ** {{Persisting periodic checkpoints externally at s3://bucket/flink/cluster_1/checkpoints-external.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Found 3 checkpoints in ZooKeeper.}}
>  ** {{Trying to retrieve checkpoint 69255.}}
>  ** {{Trying to fetch 3 checkpoints from storage.}}
>  ** {{Trying to retrieve checkpoint 69256.}}
>  ** {{Trying to retrieve checkpoint 69257.}}
>  ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Scheduling job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job).}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state CREATED to
RUNNING.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RUNNING to
FAILING.}}{{ org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not
enough free slots available to run the job. You can decrease the operator parallelism or increase
the number of slots per TaskManager in the configuration.}}
>  * 15:20:09 JM 1 starts to recover {color:#d04437}61bca496065cd05e4263070a5e923a05{color}
>  ** {{Recovered SubmittedJobGraph(61bca496065cd05e4263070a5e923a05, JobInfo(clients:
Set((Actor[akka.tcp://flink@ip-10-210-22-167.ec2.internal:41001/temp/$c],DETACHED)), start:
1525728377900)).}}
>  ** {{Submitting recovered job 61bca496065cd05e4263070a5e923a05.}}
>  ** {{Submitting job 61bca496065cd05e4263070a5e923a05 (Some Job) (Recovery).}}
>  ** {{Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
delayBetweenRestartAttempts=30000) for 61bca496065cd05e4263070a5e923a05.}}
>  ** {{Job recovers via failover strategy: full graph restart}}
>  ** {{Successfully ran initialization on master in 0 ms.}}
>  ** {{Running initialization on master for job Some Job (61bca496065cd05e4263070a5e923a05).}}
>  ** {{Initialized in '/checkpoints/61bca496065cd05e4263070a5e923a05'.}}
>  ** {{Using application-defined state backend for checkpoint/savepoint metadata: File
State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
>  ** {{Persisting periodic checkpoints externally at s3://bucket/flink/cluster_1/checkpoints-external.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Scheduling job 61bca496065cd05e4263070a5e923a05 (Some Job).}}
>  ** {{Job Some Job (61bca496065cd05e4263070a5e923a05) switched from state CREATED to
RUNNING.}}
>  ** {{Trying to fetch 0 checkpoints from storage}}
>  ** {{Found 0 checkpoints in ZooKeeper.}}
>  * 15:20:09 JM 1 reports a bunch of metric collisions because of the two jobs:
>  ** {{Name collision: Group already contains a Metric with the name 'lastCheckpointSize'.
Metric will not be reported.[jobmanager, job]}}
>  ** {{Name collision: Group already contains a Metric with the name 'lastCheckpointAlignmentBuffered'.
Metric will not be reported.[jobmanager, job]}}
>  ** etc
>  * 15:20:39 JM 1 begins attempting to restart the {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}
job repeatedly
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state FAILING to
RESTARTING.}}
>  ** {{Restarting the job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING
to CREATED.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Found 3 checkpoints in ZooKeeper.}}
>  ** {{Trying to fetch 3 checkpoints from storage.}}
>  ** {{Trying to retrieve checkpoint 69255.}}
>  ** {{Trying to retrieve checkpoint 69256.}}
>  ** {{Trying to retrieve checkpoint 69257.}}
>  ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state CREATED to
RUNNING.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RUNNING to
FAILING.}}
>  * 15:35:39 ZK 1 reestablishes connection with ZK 2 and 3 and becomes a follower
>  
> h4. Analysis
>  
> The cluster was running job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.
 The JM HA leader was JM 2, which was connected to ZK 1.  ZK 1 was a follower in the ZK
ensemble.  The ZK leader was ZK 2.  
> ZK 1 lost network connectivity for about 16 minutes.  Upon loss of connectivity to ZK 1, JM 2 gave up leadership and shut down the {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} job.  JM 2 then immediately connected to ZK 2, without its ZK session having expired.  Nonetheless, because it had given up leadership, JM 1 was elected the new JM leader.
> JM 1 then erroneously decided there were two jobs to restore: the previously running job, {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, and {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.  JM 1 decided there were no checkpoints for {color:#d04437}61bca496065cd05e4263070a5e923a05{color}, so that job started right away.  JM 1 attempted to restore {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} from the latest checkpoint, but this failed because there were not enough task slots left in the cluster once the other job had started.  Thus, {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} entered a restart loop.
> We manually stopped both jobs and started a new job based on the last known checkpoint for {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.
>  
> Job {color:#d04437}61bca496065cd05e4263070a5e923a05{color}  is an old job that we ran
for a month between May 7th and June 5th.
> After our manual intervention, the HA state directory in S3 looks like this:
> {{s3cmd ls s3://bucket/flink/cluster_1/recovery/}}
> {{ DIR s3://bucket/flink/cluster_1/recovery/some_job/}}
> {{2018-07-31 17:33 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5}}
> {{2018-07-31 17:34 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb}}
> {{2018-07-31 17:32 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02}}
> {{2018-06-12 20:01 284626 s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec}}
> {{2018-07-30 23:01 285257 s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c}}
> submittedJobGraph7f627a661cec appears to be job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, the long-running job that failed during the ZK failover.
> submittedJobGraphf3767780c00c appears to be job {color:#205081}d77948df92813a68ea6dfd6783f40e7e{color}, the job we started from a checkpoint after shutting down the duplicate jobs.
>  
> A few questions come to mind.
> h5. Why does the JM terminate running jobs when it can immediately connect to another
ZK node and its ZK session has not expired?
> This seems to be a result of using the LeaderLatch recipe in Curator.  Its [docs|https://github.com/Netflix/curator/wiki/Leader-Latch] state:
> {quote}LeaderLatch instances add a ConnectionStateListener to watch for connection problems.
If SUSPENDED or LOST is reported, the LeaderLatch that is the *leader will report that it
is no longer the leader* (i.e. there will not be a leader until the connection is re-established).
If a LOST connection is RECONNECTED, the LeaderLatch *will delete its previous ZNode and create
a new one*.
> Users of LeaderLatch must take account that connection issues can cause leadership to
be lost. i.e. hasLeadership() returns true but some time later the connection is SUSPENDED
or LOST. At that point hasLeadership() will return false. It is highly recommended that LeaderLatch
users register a ConnectionStateListener.
> {quote}
> So not only is leadership lost while disconnected, it will likely stay lost upon reconnection as a result of the znode deletion and recreation.
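> For reference, the latch behavior can be reproduced with nothing but the standard Curator API (the
> sketch below is generic Curator usage, not Flink code): the latch registers its own connection state
> listener, and {{notLeader()}} fires as soon as the connection is SUSPENDED, even if the same session
> is later recovered on another ensemble member.
> {code:java}
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.CuratorFrameworkFactory;
> import org.apache.curator.framework.recipes.leader.LeaderLatch;
> import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
> import org.apache.curator.retry.ExponentialBackoffRetry;
> 
> public class LeaderLatchBehavior {
>     public static void main(String[] args) throws Exception {
>         CuratorFramework zk = CuratorFrameworkFactory.newClient(
>                 "localhost:2181", new ExponentialBackoffRetry(1000, 3));
>         zk.start();
> 
>         LeaderLatch latch = new LeaderLatch(zk, "/flink/cluster_1/leaderlatch");
>         latch.addListener(new LeaderLatchListener() {
>             @Override
>             public void isLeader() {
>                 System.out.println("granted leadership");
>             }
> 
>             @Override
>             public void notLeader() {
>                 // Fires on SUSPENDED/LOST as well as when another contender takes over,
>                 // i.e. a connection hiccup revokes leadership even though the ZK session
>                 // never expired.
>                 System.out.println("revoked leadership");
>             }
>         });
>         latch.start();
> 
>         Thread.sleep(Long.MAX_VALUE); // observe the callbacks while ZK connectivity flaps
>     }
> }
> {code}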
> This can probably be solved by using the Curator LeaderElection recipe instead, which
[states|https://github.com/Netflix/curator/wiki/Leader-Election]:
> {quote}The {{LeaderSelectorListener}} class extends {{ConnectionStateListener}}. When
the LeaderSelector is started, it adds the listener to the Curator instance. Users of the
{{LeaderSelector}} must pay attention to any connection state changes. If an instance becomes
the leader, it should respond to notification of being SUSPENDED or LOST.
> If the SUSPENDED state is reported, the instance must assume that it might no longer
be the leader until it receives a RECONNECTED state. If the LOST state is reported, the instance
is no longer the leader and its {{takeLeadership}} method should exit.
> {quote}
> So with LeaderElection it seems that what to do during the SUSPENDED state is left up
to the application, which may choose to continue being leader until the state becomes LOST.
> Obviously there are dangers with doing so, but at the very least you would expect the
JM not to give up leadership until it tried to connect to other ZK members within the remaining
ZK session timeout.
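> For contrast, here is a sketch of the policy I am describing (my own illustration, not a proposal
> for Flink's actual code), as a plain Curator {{ConnectionStateListener}} that stays leader through
> SUSPENDED and only steps down on LOST, i.e. once the session has actually expired:
> {code:java}
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.framework.state.ConnectionStateListener;
> 
> /** Hypothetical policy sketch: tolerate SUSPENDED, relinquish leadership only on LOST. */
> class TolerantLeadershipListener implements ConnectionStateListener {
> 
>     private final Runnable revokeLeadership; // whatever tears down the running jobs
> 
>     TolerantLeadershipListener(Runnable revokeLeadership) {
>         this.revokeLeadership = revokeLeadership;
>     }
> 
>     @Override
>     public void stateChanged(CuratorFramework client, ConnectionState newState) {
>         switch (newState) {
>             case SUSPENDED:
>                 // Connection to the current ZK server is gone, but the session is still
>                 // within its timeout. Keep leadership tentatively and let Curator try
>                 // the other ensemble members.
>                 break;
>             case LOST:
>                 // Session expired: our ephemeral nodes are gone and another contender
>                 // may already be leader. Now we must step down.
>                 revokeLeadership.run();
>                 break;
>             case RECONNECTED:
>                 // Same session recovered on another server; nothing to do.
>                 break;
>             default:
>                 break;
>         }
>     }
> }
> {code}
> Such a listener would be registered via {{client.getConnectionStateListenable().addListener(...)}};
> the trade-off is exactly the split-brain window described above, which is why the remaining ZK
> session timeout matters.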
> This problem has been [previously discussed|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Zookeeper-failure-handling-td19611.html] on the mailing list, which led to FLINK-6174 and this [PR|https://github.com/apache/flink/pull/3599], which appears to be a modification of the Curator LeaderLatch recipe.  It also led to FLINK-5703, which seems like an attempt to keep jobs running during JM failover.  While that is a valuable addition, I argue it is not required to avoid this failure, as the previous leader can remain leader so long as it connects to another ZK node before its ZK session expires.
>  
> h5. Why did JM 1 resurrect the old job?
> Something must have been off with the state stored in the S3 HA recovery directory.  The job must not have been fully removed.
> In fact, right now the recovery directory contains the file submittedJobGraph7f627a661cec, which appears to be job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.  Is that expected?  That job is no longer running.  Shouldn't that file have been removed from the recovery directory?
>  
>  
>  



