flink-issues mailing list archives

From "Stephan Ewen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4632) when yarn nodemanager lost, flink hung
Date Wed, 21 Sep 2016 10:12:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15509469#comment-15509469 ]

Stephan Ewen commented on FLINK-4632:
-------------------------------------

Do you know which of the following two situations is the case?

(1) A task is in "CANCELING" and then the TaskManager for that specific task is killed. For
some reason, Flink does not learn about the lost TaskManager and waits for the cancellation
to complete.

(2) Flink tries to cancel a Task, but the Task simply cannot cancel. Currently, Flink relies
on the cooperation of Tasks to cancel. If a Task does not cooperate, the cancellation may get stuck.


If it is (2), we are currently adding a safety net that kills the TaskManager if cancelling
takes too long (killing the process is the only guaranteed way in the JVM to terminate a thread).
In the YARN and Mesos cases, that should help the job recover.
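
To illustrate both points, here is a minimal sketch in plain Java of cooperative cancellation plus such a kill-switch watchdog. The class and method names are hypothetical illustrations, not Flink's actual internals:

{code:java}
// Minimal sketch -- all names here are hypothetical, not Flink's real classes.
public class CancellableTask implements Runnable {

    private volatile boolean canceled = false;

    @Override
    public void run() {
        // Cooperative cancellation: the loop only stops if the task
        // regularly reaches this flag check. If processNextRecord()
        // blocks forever (e.g. in a stuck network read), cancellation
        // hangs -- situation (2) above.
        while (!canceled) {
            processNextRecord();
        }
    }

    public void cancel(final long timeoutMillis) {
        canceled = true;

        // Safety net: if the task has not terminated after the timeout,
        // hard-kill the whole process. Halting the JVM is the only
        // guaranteed way to stop a thread that ignores the flag; the
        // resource manager (YARN/Mesos) can then restart the container.
        Thread watchdog = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(timeoutMillis);
                    Runtime.getRuntime().halt(-1); // no shutdown hooks run
                } catch (InterruptedException e) {
                    // interrupted: cancellation finished in time
                }
            }
        }, "cancellation-watchdog");
        watchdog.setDaemon(true);
        watchdog.start();
    }

    private void processNextRecord() {
        // application-specific work
    }
}
{code}

In a real implementation the watchdog would be interrupted once the task thread actually exits, so the process is only killed when cancellation genuinely hangs.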

> when yarn nodemanager lost,  flink hung
> ---------------------------------------
>
>                 Key: FLINK-4632
>                 URL: https://issues.apache.org/jira/browse/FLINK-4632
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, Streaming
>    Affects Versions: 1.2.0, 1.1.2
>         Environment: cdh5.5.1  jdk1.7 flink1.1.2  1.2-snapshot   kafka0.8.2
>            Reporter: 刘喆
>
> When running Flink streaming on YARN with Kafka as the source, it runs well at first. But after
> a long run, for example 8 hours and 60,000,000+ messages processed, it hangs: no messages are
> consumed, one TaskManager is CANCELING, and the exception shown is:
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: connection timeout
> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:152)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
> 	at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
> 	at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
> 	at io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:79)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
> 	at io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:835)
> 	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:87)
> 	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:162)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Connection timed out
> 	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> 	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> 	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> 	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
> 	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
> 	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:241)
> 	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> 	... 6 more
> After applying https://issues.apache.org/jira/browse/FLINK-4625, it shows:
> java.lang.Exception: TaskManager was lost/killed: ResourceID{resourceId='container_1471620986643_744852_01_001400'} @ 38.slave.adh (dataPort=45349)
> 	at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:162)
> 	at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
> 	at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:138)
> 	at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
> 	at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:224)
> 	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1054)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:458)
> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> 	at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
> 	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
> 	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



