spark-issues mailing list archives

From "Li Yuanjian (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-24989) BlockFetcher should retry while getting OutOfDirectMemoryError
Date Wed, 01 Aug 2018 16:19:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-24989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Li Yuanjian updated SPARK-24989:
--------------------------------
    Attachment: FailedStage.png

> BlockFetcher should retry while getting OutOfDirectMemoryError
> --------------------------------------------------------------
>
>                 Key: SPARK-24989
>                 URL: https://issues.apache.org/jira/browse/SPARK-24989
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 2.2.0
>            Reporter: Li Yuanjian
>            Priority: Major
>         Attachments: FailedStage.png
>
>
> h3. Description
> In our practice, this problem reproduces reliably with a high-parallelism job migrated
> from MapReduce to Spark. Some metrics are listed below:
> ||Item||Value||
> |spark.executor.instances|1000|
> |spark.executor.cores|5|
> |number of tasks in shuffle writer stage|18038|
> |number of tasks in shuffle reader stage|80000|
> The shuffle writer stage finishes successfully, but the shuffle reader stage then starts
> and keeps failing with FetchFailedException. Each fetch request requires the Netty server
> to allocate a 16MB buffer (detailed stack trace attached below), so the huge number of
> fetch requests rapidly uses up the default maxDirectMemory, even after we bump
> io.netty.maxDirectMemory up to 50GB!
> {code:java}
> org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480)
> 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514)
> 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445)
> 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> 	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
> 	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119)
> 	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:108)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480)
> 	at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:530)
> 	at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:484)
> 	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:711)
> 	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:700)
> 	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
> 	at io.netty.buffer.PoolArena.allocate(PoolArena.java:221)
> 	at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
> 	at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:296)
> 	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
> 	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
> 	at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
> 	at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
> 	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
> 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
> 	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
> 	... 1 more
> {code}
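To put the numbers in the stack trace above in perspective, the following is a small arithmetic sketch. It only converts the byte counts reported in the exception message and divides one by the other; the class and method names are illustrative and are not part of Spark or Netty.

```java
// Illustrative arithmetic only: the two byte counts are copied from the
// exception message in the stack trace; all names here are hypothetical.
class DirectMemoryBudget {
    // How many allocations of chunkBytes fit under the direct memory limit.
    static long chunksThatFit(long maxDirectBytes, long chunkBytes) {
        return maxDirectBytes / chunkBytes;
    }

    public static void main(String[] args) {
        long chunkBytes = 16_777_216L;          // "failed to allocate 16777216 byte(s)"
        long maxDirectBytes = 21_474_836_480L;  // "max: 21474836480"

        System.out.println(chunkBytes / (1024L * 1024) + " MiB per allocation");
        System.out.println(maxDirectBytes / (1024L * 1024 * 1024) + " GiB limit");
        System.out.println(chunksThatFit(maxDirectBytes, chunkBytes) + " allocations fit");
    }
}
```

Under the limit reported in the trace, 1280 allocations of this size exhaust direct memory, which gives a rough sense of how quickly a burst of fetch requests could hit the ceiling.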
> h3. Solution
> Adding retry support, bumping up the Java option io.netty.maxDirectMemory, and trying a
> larger spark.shuffle.io.retryWait can help the job pass. I think we need more discussion
> about load balancing of fetch requests, but retry support is probably necessary first.
>   
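The retry idea proposed above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the change proposed for Spark's block fetcher: `DirectMemoryExhausted` is a hypothetical stand-in for Netty's `OutOfDirectMemoryError`, and `fetchWithRetry`, `maxRetries`, and `retryWaitMs` are names invented here to mirror the role of a retry count and a wait such as spark.shuffle.io.retryWait.

```java
import java.util.concurrent.Callable;

// Sketch of "retry after a fixed wait when direct memory is exhausted".
// All names are hypothetical; nothing here is Spark's or Netty's actual API.
class FetchRetrySketch {
    // Hypothetical stand-in for io.netty.util.internal.OutOfDirectMemoryError.
    static class DirectMemoryExhausted extends RuntimeException {
        DirectMemoryExhausted(String message) {
            super(message);
        }
    }

    // Runs the fetch, retrying up to maxRetries times with a fixed wait between
    // attempts. Only the memory-exhaustion failure is retried; any other
    // exception propagates immediately.
    static <T> T fetchWithRetry(Callable<T> fetch, int maxRetries, long retryWaitMs)
            throws Exception {
        int retries = 0;
        while (true) {
            try {
                return fetch.call();
            } catch (DirectMemoryExhausted e) {
                if (retries >= maxRetries) {
                    throw e; // retry budget used up: surface the failure
                }
                retries++;
                Thread.sleep(retryWaitMs); // give in-flight buffers time to be released
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated fetch that fails twice before succeeding.
        String result = fetchWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new DirectMemoryExhausted("failed to allocate 16777216 byte(s)");
            }
            return "block";
        }, 3, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

A fixed wait keeps the sketch simple. Whether a fixed or growing wait suits the real fetch path better, and how retries interact with load balancing of fetch requests, is left open here as it is in the issue.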



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

