spark-user mailing list archives

From Vadim Semenov <va...@datadoghq.com.INVALID>
Subject Re: Stream is corrupted in ShuffleBlockFetcherIterator
Date Fri, 16 Aug 2019 17:14:36 GMT
This is what you're looking for:

Handle large corrupt shuffle blocks
https://issues.apache.org/jira/browse/SPARK-26089

So until 3.0 the only workaround I can think of is to reduce the size of
your shuffle blocks, i.e. split your job into many more, smaller partitions.
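
A minimal sketch of what I mean, assuming a Spark SQL job (the partition
count, paths, and column name are placeholders, not recommendations):

import org.apache.spark.sql.SparkSession

// Raise the shuffle partition count so each individual shuffle block
// gets smaller; 2000 is an arbitrary placeholder value.
val spark = SparkSession.builder()
  .appName("smaller-shuffle-blocks")
  .config("spark.sql.shuffle.partitions", "2000")
  .getOrCreate()

// Or repartition explicitly right before the wide operation:
spark.read.parquet("/path/to/input")
  .repartition(2000)
  .groupBy("some_key")
  .count()
  .write.parquet("/path/to/output")

More partitions means smaller individual shuffle blocks, which is the whole
point of the workaround.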

On Thu, Aug 15, 2019 at 4:47 PM Mikhail Pryakhin <m.pryahin@gmail.com>
wrote:

> Hello, Spark community!
>
> I've been struggling with a job that constantly fails due to its inability
> to decompress some previously compressed blocks while shuffling data.
> I use Spark 2.2.0 with all configuration settings left at their defaults
> (no specific compression codec is specified); I've ascertained that
> LZ4CompressionCodec is used as the default codec. The job fails as soon as
> the limit of attempts is exceeded, with the following message:
>
> Caused by: java.io.IOException: Stream is corrupted
> at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
> at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
> at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
> ... 28 more
> Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 14649 of input buffer
>
>
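> For reference, the codec can also be pinned explicitly instead of relying
> on the default. A minimal sketch (lz4, lzf, and snappy are the built-in
> values for spark.io.compression.codec in 2.2):
>
> import org.apache.spark.sql.SparkSession
>
> // Select the codec Spark uses for shuffle and other internal I/O
> // explicitly instead of relying on the lz4 default.
> val spark = SparkSession.builder()
>   .config("spark.io.compression.codec", "snappy")
>   .getOrCreate()
>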
> Actually, I've stumbled upon a bug [1] which is not fixed yet. Any clue on
> how to work around this issue? I've tried the Snappy codec (set as sketched
> above), but it fails likewise, with a slightly different message:
>
> org.apache.spark.shuffle.FetchFailedException: failed to uncompress the chunk: FAILED_TO_UNCOMPRESS(5)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: failed to uncompress the chunk: FAILED_TO_UNCOMPRESS(5)
> at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:361)
> at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:158)
> at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
> at java.io.InputStream.read(InputStream.java:101)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
> ... 27 more
>
>
> The option of using no compression at all seems to be the only feasible
> one for me at this point.
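>
> To be concrete, a minimal sketch of what I mean by no compression
> (spark.shuffle.compress and spark.shuffle.spill.compress both default
> to true):
>
> import org.apache.spark.sql.SparkSession
>
> // Turn off compression of shuffle map outputs and of shuffle spill
> // files; this trades extra network and disk volume for skipping the
> // failing decompression path entirely.
> val spark = SparkSession.builder()
>   .config("spark.shuffle.compress", "false")
>   .config("spark.shuffle.spill.compress", "false")
>   .getOrCreate()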
> I really need your expert assistance. Thank you very much in advance; any
> help is greatly appreciated!
>
>
> [1] https://issues.apache.org/jira/browse/SPARK-18105
>
>
> Cheers,
> Mike Pryakhin
>
>

-- 
Sent from my iPhone
