spark-user mailing list archives

From Phillip Henry <londonjava...@gmail.com>
Subject Re: Spark Executor OOMs when writing Parquet
Date Tue, 21 Jan 2020 11:18:57 GMT
Hi, Arwin.

Did you establish that this was due to off-heap memory usage? If you're
running on Linux, you could poll pmap -x <PID> on an executor process to see
how much off-heap memory is being used.
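
For example, a minimal sketch (Linux only; the PID argument is a placeholder
for whichever executor process you want to watch):

```
import java.io.BufferedReader;
import java.io.InputStreamReader;

// Polls `pmap -x <pid>` every 10 seconds and prints the final "total" line
// (Kbytes / RSS / Dirty), so you can watch resident memory grow over time.
public class PmapPoller {
    public static void main(String[] args) throws Exception {
        String pid = args[0]; // executor PID, e.g. found via `jps` or `ps`
        while (true) {
            Process p = new ProcessBuilder("pmap", "-x", pid).start();
            try (BufferedReader r = new BufferedReader(
                    new InputStreamReader(p.getInputStream()))) {
                String line, last = null;
                while ((line = r.readLine()) != null) {
                    last = line;
                }
                System.out.println(last);
            }
            p.waitFor();
            Thread.sleep(10_000);
        }
    }
}
```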

Phillip

On Sat, Jan 18, 2020 at 2:13 AM Arwin Tio <arwin.tio@hotmail.com> wrote:

> Hi Vadim,
>
> Thank you for the help. It seems that my executor JVMs have plenty of heap
> space, so off-heap may be the issue here:
>
> [inline chart of executor JVM memory over time not included in the archive;
> green: committed, orange: used]
>
> Will simply reducing heap size do the trick?
>
> What about the following config options from
> https://spark.apache.org/docs/latest/configuration.html
>
>
>    - spark.executor.memoryOverhead
>    - spark.memory.offHeap.enabled
>    - spark.memory.offHeap.size
>
> I will experiment with these at once, but I'm curious to know your thoughts.
> Is the Snappy off-heap different from the Spark off-heap allocated by
> "spark.executor.memoryOverhead"?
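>
> In case it's useful, here is roughly how I plan to set them (a sketch only;
> the values are placeholders I haven't tuned):
>
> ```
> import org.apache.spark.sql.SparkSession;
>
> SparkSession spark = SparkSession.builder()
>     // Shrink the heap so the OS has room for native allocations:
>     .config("spark.executor.memory", "300g")
>     // Extra non-heap room per executor for direct buffers, Snappy, etc.:
>     .config("spark.executor.memoryOverhead", "48g")
>     // Optionally move Spark's own storage/execution memory off-heap:
>     .config("spark.memory.offHeap.enabled", "true")
>     .config("spark.memory.offHeap.size", "32g")
>     .getOrCreate();
> ```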
>
> Thanks,
>
> Arwin
> ------------------------------
> *From:* Vadim Semenov <vadim@datadoghq.com>
> *Sent:* January 17, 2020 12:53 PM
> *To:* Arwin Tio <arwin.tio@hotmail.com>
> *Cc:* user@spark.apache.org <user@spark.apache.org>
> *Subject:* Re: Spark Executor OOMs when writing Parquet
>
> Based on the error trace, it's likely that the system doesn't have enough
> off-heap memory: when Parquet does Snappy compression it uses native I/O,
> which works off-heap. You need to reduce the heap size, which will leave
> more memory for off-heap operations.
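>
> To put rough numbers on it (from your message below): an r5d.12xlarge has
> 384GB, and with -Xmx at ~350GB the heap alone can grow to nearly all of it,
> leaving only ~30GB for the OS, Spark's overhead, and the direct buffers
> Snappy allocates. Dropping the heap to, say, 300GB would leave roughly 80GB
> for off-heap allocations.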
>
> Caused by: java.lang.OutOfMemoryError
>         at sun.misc.Unsafe.allocateMemory(Native Method)
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>         at
> org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
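>
> (Note that the OOM is raised inside Unsafe.allocateMemory itself, i.e. the
> native allocation failed, rather than by the direct-memory limit check in
> ByteBuffer.allocateDirect; that points at genuine native memory exhaustion
> rather than -XX:MaxDirectMemorySize being set too low.)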
>
> On Fri, Jan 17, 2020 at 10:12 AM Arwin Tio <arwin.tio@hotmail.com> wrote:
>
> Hello,
>
> I have a fairly straightforward Spark job that converts CSV to Parquet:
>
> ```
> Dataset<Row> df = spark.read().csv(...); // CSV input, path elided
>
> df
>   .repartition(5000)
>   .write()
>   .format("parquet") // redundant with .parquet() below, but harmless
>   .parquet("s3://mypath/...");
> ```
>
> For context, there are about 5 billion rows, each with 2000 columns. The
> entire dataset is about 1 TB (compressed).
>
> The error looks like this:
>
> ```
>   20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID
> 24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13):
> org.apache.spark.SparkException: Task failed while writing rows.
>         at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>         at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>         at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>         at org.apache.spark.scheduler.Task.run(Task.scala:123)
>         at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>         at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError
>         at sun.misc.Unsafe.allocateMemory(Native Method)
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>         at
> org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
>         at
> org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
>         at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
>         at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
>         at
> org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
>         at
> org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
>         at
> org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
>         at
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
>         at
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>         at
> org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
>         at
> org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
>         at
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
>         at
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
>         at
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
>         at
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
>         at
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:113)
>         at
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:50)
>         at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
>         at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
>         at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
>         at
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
>         at
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
>         at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
>         at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>         at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>         at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>         ... 10 more
> ```
>
> Can anybody give me some pointers on how to tune Spark to avoid this? I am
> using giant machines (r5d.12xlarge) with 384GB of memory. My executors have
> about 350GB of heap (-Xmx350000m). Could it be that my heap is too large?
>
> Another issue I found was
> https://issues.apache.org/jira/browse/PARQUET-222, which suggests
> that too many columns can be a problem. I don't want to drop any columns,
> though.
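>
> If per-column buffering is the culprit, one mitigation I may try is
> shrinking Parquet's buffer sizes before the write. A sketch (these are
> standard Parquet-Hadoop keys, but the values are untested guesses):
>
> ```
> // Each open Parquet writer buffers a page per column, so with 2000 columns
> // these sizes multiply out quickly; smaller buffers mean a smaller footprint.
> org.apache.hadoop.conf.Configuration hadoopConf =
>     spark.sparkContext().hadoopConfiguration();
> hadoopConf.set("parquet.block.size",
>     String.valueOf(64 * 1024 * 1024)); // 64MB row groups (default 128MB)
> hadoopConf.set("parquet.page.size",
>     String.valueOf(512 * 1024));       // 512KB pages (default 1MB)
> ```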
>
> One more question: why does saving as Parquet create 4 different stages in
> Spark? You can see in the picture that there are 4 different stages, all at
> "save at LoglineParquetGenerator.java:241". I am not sure how to interpret
> these stages:
>
> [Spark UI screenshot not included in the archive]
>
> Thanks,
>
> Arwin
>
>
>
> --
> Sent from my iPhone
>
