spark-user mailing list archives

From Vadim Semenov <va...@datadoghq.com.INVALID>
Subject Re: Spark Executor OOMs when writing Parquet
Date Fri, 17 Jan 2020 17:53:54 GMT
Based on the error trace, it's likely that the system doesn't have enough
off-heap memory: when Parquet does Snappy compression it goes through native
I/O, which allocates direct (off-heap) buffers. You need to reduce the heap
size, which will leave more memory for off-heap operations.

Caused by: java.lang.OutOfMemoryError
        at sun.misc.Unsafe.allocateMemory(Native Method)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
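
Something along these lines would shift memory from the heap into the
executor overhead. This is just a sketch: the 300g / 48g split and the app
name are illustrative, on older Spark/YARN versions the overhead key is
spark.yarn.executor.memoryOverhead, and you can pass the same settings as
--conf flags to spark-submit instead of setting them in code.

```
import org.apache.spark.sql.SparkSession;

// Sketch only: shrink the JVM heap and reserve explicit overhead room so
// native allocations (e.g. Snappy's direct ByteBuffers) have space to land.
// The sizes below are illustrative, not a recommendation.
SparkSession spark = SparkSession.builder()
    .appName("csv-to-parquet")                       // hypothetical app name
    .config("spark.executor.memory", "300g")         // smaller heap than -Xmx350000m
    .config("spark.executor.memoryOverhead", "48g")  // off-heap / native headroom
    .getOrCreate();
```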

On Fri, Jan 17, 2020 at 10:12 AM Arwin Tio <arwin.tio@hotmail.com> wrote:

> Hello,
>
> I have a fairly straightforward Spark job that converts CSV to Parquet:
>
> ```
> Dataset<Row> df = spark.read(...);
>
> df
>   .repartition(5000)
>   .write()
>   .format("parquet")
>   .parquet("s3://mypath/...);
> ```
>
> For context, there are about 5 billion rows, each with 2000 columns. The
> entire dataset is about 1 TB (compressed).
>
> The error looks like this:
>
> ```
>   20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID 24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13): org.apache.spark.SparkException: Task failed while writing rows.
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>         at org.apache.spark.scheduler.Task.run(Task.scala:123)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError
>         at sun.misc.Unsafe.allocateMemory(Native Method)
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>         at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
>         at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
>         at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
>         at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
>         at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
>         at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
>         at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
>         at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
>         at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>         at org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
>         at org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
>         at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
>         at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
>         at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
>         at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
>         at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:113)
>         at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:50)
>         at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
>         at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
>         at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
>         at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
>         at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>         at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>         ... 10 more
> ```
>
> Can anybody give me some pointers on how to tune Spark to avoid this? I am
> using giant machines (r5d.12xlarge) with 384GB of memory. My executors have
> about 350GB of memory (-Xmx350000m). Could it be that my heap is too large?
>
> Another issue I found was
> https://issues.apache.org/jira/browse/PARQUET-222, which seems to suggest
> that too many columns could be a problem. I don't want to drop any columns,
> though.
>
> One more question: why does saving as Parquet create 4 different stages in
> Spark? You can see in the picture that there are 4 different stages, all at
> "save at LoglineParquetGenerator.java:241". I am not sure how to interpret
> these stages:
>
>
> Thanks,
>
> Arwin
>


-- 
Sent from my iPhone
