flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Congxian Qiu <qcx978132...@gmail.com>
Subject Re: Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream
Date Thu, 16 May 2019 09:15:11 GMT
Hi Pedro

Could you please share the audit log for file
`/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc`,
seems this did not exist cause this problem (maybe this file was created
and deleted for some reason)

Best,
Congxian


Andrey Zagrebin <andrey@ververica.com> 于2019年5月16日周四 下午3:47写道:

> Hi,
>
> could you also post job master logs? and ideally full task manager logs.
> This failure can be caused by some other previous failure.
>
> Best,
> Andrey
>
> On Wed, May 15, 2019 at 2:48 PM PedroMrChaves <pedro.mr.chaves@gmail.com>
> wrote:
>
>> Hello,
>>
>> Every once in a while our checkpoints fail with the following exception:
>>
>> /AsynchronousException{java.lang.Exception: Could not materialize
>> checkpoint
>> 65912 for operator AGGREGATION-FILTER (2/2).}
>>         at
>>
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>         at
>>
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>         at
>>
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>         at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 65912 for
>> operator AGGREGATION-FILTER (2/2).
>>         at
>>
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>         ... 6 more
>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
>> Could not flush and close the file system output stream to
>>
>> hdfs:/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc
>> in order to obtain the stream state handle
>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>         at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>>         at
>>
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>         at
>>
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>         ... 5 more
>> Caused by: java.io.IOException: Could not flush and close the file system
>> output stream to
>>
>> hdfs:/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc
>> in order to obtain the stream state handle
>>         at
>>
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
>>         at
>>
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:454)
>>         at
>>
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:359)
>>         at
>> org.apache.flink.runtime.io
>> .async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>>         ... 7 more
>> Caused by:
>> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException):
>> File
>> does not exist:
>>
>> /flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc
>> (inode 181723246) Holder DFSClient_NONMAPREDUCE_-10072319_1 does not have
>> any open files.
>>         at
>>
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2668)
>>         at
>>
>> org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFileInternal(FSDirWriteFileOp.java:621)
>>         at
>>
>> org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFile(FSDirWriteFileOp.java:601)
>>         at
>>
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:2713)
>>         at
>>
>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:882)
>>         at
>>
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:555)
>>         at
>>
>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>         at
>>
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
>>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
>>         at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:847)
>>         at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:790)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:422)
>>         at
>>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2486)
>>
>>         at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
>>         at org.apache.hadoop.ipc.Client.call(Client.java:1435)
>>         at org.apache.hadoop.ipc.Client.call(Client.java:1345)
>>         at
>>
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
>>         at
>>
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
>>         at com.sun.proxy.$Proxy12.complete(Unknown Source)
>>         at
>>
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:484)
>>         at sun.reflect.GeneratedMethodAccessor143.invoke(Unknown Source)
>>         at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at
>>
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
>>         at
>>
>> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
>>         at
>>
>> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
>>         at
>>
>> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>>         at
>>
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
>>         at com.sun.proxy.$Proxy13.complete(Unknown Source)
>>         at
>>
>> org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:861)
>>         at
>> org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:839)
>>         at
>> org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805)
>>         at
>>
>> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>>         at
>> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>>         at
>>
>> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>>         at
>>
>> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>>         at
>>
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:314)
>>         ... 12 more/
>>
>> What could be the problem?
>>
>> Flink version: 1.6.2
>> Checkpointing configuration:
>> Screenshot_2019-05-15_at_13.png
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/Screenshot_2019-05-15_at_13.png>
>>
>> Average state size: ~56 MB
>>
>>
>>
>>
>> -----
>> Best Regards,
>> Pedro Chaves
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>

Mime
View raw message