flink-issues mailing list archives

From "Gary Yao (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-8943) Jobs will not recover if DFS is temporarily unavailable
Date Wed, 14 Mar 2018 14:17:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Yao updated FLINK-8943:
----------------------------
    Description: 
*Description*
Job graphs are recovered from the DFS only once. If the DFS is unavailable during the recovery
attempt, the jobs simply do not run until the master is restarted.

*Steps to reproduce*
# Submit a job to a Flink cluster that uses HDFS as the HA storage directory.
# Trigger job recovery by killing the master.
# Stop the HDFS NameNode.
# Restart the HDFS NameNode after job recovery is over.
# Verify that the job is not running.

*Expected behavior*
The master should fail fast and exit.
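
The expected behavior can be illustrated with a minimal sketch. It is not Flink code: {{FailFastRecoverySketch}}, {{JobGraphSource}}, and {{FatalErrorHandler}} are hypothetical names chosen for illustration, and the only assumption is that a failed read from the DFS surfaces as an {{IOException}}. Instead of logging the failure and continuing with the job left unrecovered, the first failure is escalated to a handler that would terminate the master.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FailFastRecoverySketch {

    /** Hypothetical stand-in for a job graph store backed by a DFS. */
    public interface JobGraphSource {
        String recover(String jobId) throws IOException;
    }

    /** Hypothetical hook that would terminate the master process. */
    public interface FatalErrorHandler {
        void onFatalError(Throwable cause);
    }

    /**
     * Recovers the job graphs in order. On the first failure the handler is
     * invoked once and recovery stops, so the returned list is incomplete.
     */
    public static List<String> recoverAll(
            List<String> jobIds, JobGraphSource source, FatalErrorHandler handler) {
        List<String> recovered = new ArrayList<>();
        for (String jobId : jobIds) {
            try {
                recovered.add(source.recover(jobId));
            } catch (IOException e) {
                // Fail fast: escalate instead of logging and moving on.
                handler.onFatalError(e);
                return recovered;
            }
        }
        return recovered;
    }

    public static void main(String[] args) {
        // Simulates a DFS that refuses connections during recovery.
        JobGraphSource unavailableDfs = jobId -> {
            throw new ConnectException("Connection refused");
        };
        recoverAll(
                Collections.singletonList("a41d50b6f3ac16a730dd12792a847c97"),
                unavailableDfs,
                cause -> {
                    System.err.println("Fatal error during job recovery: " + cause);
                    // A real master would exit here (for example System.exit(1)),
                    // so that a restarted or standby master retries the recovery.
                });
    }
}
```

With this shape, a temporarily unavailable DFS leads to a restart and a new recovery attempt rather than to a running master without jobs.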

*Stacktrace*
{noformat}
2018-03-14 14:01:37,704 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher     - Could not recover the job graph for a41d50b6f3ac16a730dd12792a847c97.
org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under /a41d50b6f3ac16a730dd12792a847c97. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
	at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:192)
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$recoverJobs$5(Dispatcher.java:557)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.net.ConnectException: Call From ip-172-31-43-54/172.31.43.54 to ip-172-31-32-118.eu-central-1.compute.internal:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801)
	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493)
	at org.apache.hadoop.ipc.Client.call(Client.java:1435)
	at org.apache.hadoop.ipc.Client.call(Client.java:1345)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
	at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
	at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:843)
	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:832)
	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:821)
	at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:325)
	at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:285)
	at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:270)
	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1132)
	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:325)
	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:322)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
	at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:64)
	at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:57)
	at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:186)
	... 7 more
Caused by: java.net.ConnectException: Connection refused
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:685)
	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:788)
	at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:410)
	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1550)
	at org.apache.hadoop.ipc.Client.call(Client.java:1381)
	... 40 more
{noformat}
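
In the trace above, the top-level message blames a broken state handle, while the root cause is a {{java.net.ConnectException}}. A minimal sketch of how the two cases could be told apart by walking the cause chain follows. The class and method names are hypothetical and not part of Flink, and the depth limit of 64 is an arbitrary assumption.

```java
import java.net.ConnectException;

public class CauseChainSketch {

    /** Returns true if any exception in the cause chain is a ConnectException. */
    public static boolean causedByConnectFailure(Throwable error) {
        Throwable current = error;
        // The depth bound guards against cyclic cause chains.
        for (int depth = 0; current != null && depth < 64; depth++) {
            if (current instanceof ConnectException) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Exception root = new ConnectException("Connection refused");
        Exception wrapped = new Exception(
                "Could not retrieve submitted JobGraph from state handle", root);
        System.out.println(causedByConnectFailure(wrapped)); // prints "true"
    }
}
```

A check like this would let recovery code treat a connectivity failure as transient instead of reporting the state handle as broken.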

  was: The previous description had the same text, steps to reproduce, and stack trace, but no *Expected behavior* section.


> Jobs will not recover if DFS is temporarily unavailable
> -------------------------------------------------------
>
>                 Key: FLINK-8943
>                 URL: https://issues.apache.org/jira/browse/FLINK-8943
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0, 1.4.2, 1.6.0
>            Reporter: Gary Yao
>            Priority: Blocker
>              Labels: flip6
>             Fix For: 1.5.0, 1.6.0
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
