spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Imran Rashid (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-5928) Remote Shuffle Blocks cannot be more than 2 GB
Date Fri, 20 Feb 2015 21:28:11 GMT

    [ https://issues.apache.org/jira/browse/SPARK-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14329584#comment-14329584
] 

Imran Rashid commented on SPARK-5928:
-------------------------------------

Here are some thoughts on we *might* fix this.  I am definitely not saying an eventual solution
has to do it this way, just figured that I should write down my thoughts in case its useful
to anyone who takes a look at this in the future.  There may be alternate solutions that are
worth considering as well.

Right now, we always try to send at least one full shuffle block in each message.  We could
change the implementation to break one shuffle block into mutliple messages.  The key locations
for that are https://github.com/apache/spark/blob/master/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java#L93
on the receiving end, which assumes each "chunk" corresponds to exactly one block.  And on
the sending side, that happens here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala#L55.
 The protocol there could change to allow for one block to get sent in multiple chunks.

Ok I know that isn't a lot of info, but maybe at least the pointers to the code are helpful
to somebody :)


> Remote Shuffle Blocks cannot be more than 2 GB
> ----------------------------------------------
>
>                 Key: SPARK-5928
>                 URL: https://issues.apache.org/jira/browse/SPARK-5928
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Imran Rashid
>
> If a shuffle block is over 2GB, the shuffle fails, with an uninformative exception. 
The tasks get retried a few times and then eventually the job fails.
> Here is an example program which can cause the exception:
> {code}
>     val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore =>
>       val n = 3e3.toInt
>       val arr = new Array[Byte](n)
>       //need to make sure the array doesn't compress to something small
>       scala.util.Random.nextBytes(arr)
>       arr
>     }
>     rdd.map { x => (1, x)}.groupByKey().count()
> {code}
> Note that you can't trigger this exception in local mode, it only happens on remote fetches.
  I triggered these exceptions running with {{MASTER=yarn-client spark-shell --num-executors
2 --executor-memory 4000m}}
> {noformat}
> 15/02/20 11:10:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, imran-3.ent.cloudera.com):
FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0,
message=
> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647:
3021252889 - discarded
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> 	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
> 	at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> 	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
> 	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:56)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds
2147483647: 3021252889 - discarded
> 	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:501)
> 	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:477)
> 	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:403)
> 	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
> 	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
> 	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> 	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
> 	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> 	... 1 more
> )
> {noformat}
> or if you use "spark.shuffle.blockTransferService=nio", then you get:
> {noformat}
> 15/02/20 12:48:07 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, imran-2.ent.cloudera.com):
FetchFailed(BlockManagerId(2, imran-3.ent.cloudera.com, 42827), shuffleId=0, mapId=0, reduceId=0,
message=
> org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed with ACK that
signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> 	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
> 	at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
> 	at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
> 	at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
> 	at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
> 	at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
> 	at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> 	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
> 	at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> 	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
> 	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:56)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: sendMessageReliably failed with ACK that signalled a
remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> 	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
> 	at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
> 	at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
> 	at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
> 	at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
> 	at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
> 	at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> 	at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:954)
> 	at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:940)
> 	at org.apache.spark.network.nio.ConnectionManager$MessageStatus.success(ConnectionManager.scala:67)
> 	at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:728)
> 	at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
> 	... 3 more
> )
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message