spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Sital Kedia (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-15074) Spark shuffle service bottlenecked while fetching large amount of intermediate data
Date Thu, 05 May 2016 22:33:12 GMT

    [ https://issues.apache.org/jira/browse/SPARK-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15273241#comment-15273241
] 

Sital Kedia commented on SPARK-15074:
-------------------------------------

Okay, I made a change to cache the index file and that made the shuffle read time twice as
fast. I am going to put out a PR for that change soon.

Now I see the shuffle service is spending most of the time in the FileChannelImpl.transferTo
method (Refer to the stack trace below). I wonder if there is a way to speed it up further?

<code>
java.lang.Thread.State: RUNNABLE
	at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
	at sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(FileChannelImpl.java:427)
	at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:492)
	at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:607)
	at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96)
	at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89)
	at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:254)
	at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:237)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:281)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:761)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:311)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:729)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1127)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:663)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:644)
	at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:663)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:644)
	at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:663)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:693)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:681)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:716)
	at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:954)
	at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:244)
	at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:184)
	at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:129)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:100)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:745)

</code>

> Spark shuffle service bottlenecked while fetching large amount of intermediate data
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-15074
>                 URL: https://issues.apache.org/jira/browse/SPARK-15074
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 1.6.1
>            Reporter: Sital Kedia
>
> While running a job which produces more than 90TB of intermediate data, we find that
about 10-15% of the reducer execution time is being spent in shuffle fetch. 
> Jstack of the shuffle service reveals that most of the time the shuffle service is reading
the index files generated by the mapper. 
> {code}
> java.lang.Thread.State: RUNNABLE
> 	at java.io.FileInputStream.readBytes(Native Method)
> 	at java.io.FileInputStream.read(FileInputStream.java:255)
> 	at java.io.DataInputStream.readFully(DataInputStream.java:195)
> 	at java.io.DataInputStream.readLong(DataInputStream.java:416)
> 	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:277)
> 	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:190)
> 	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:85)
> 	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:72)
> 	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149)
> 	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
> 	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
> 	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
> 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> 	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> 	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> 	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> 	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> 	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> The issue is that for each shuffle fetch, we reopen the same index file again and read
it. It would be much efficient, if we can avoid opening the same file multiple times and cache
the data. We can use an LRU cache to save the index file information. This way we can also
limit the number of entries in the cache so that we don't blow up the memory indefinitely.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message