spark-user mailing list archives

From Arwin Tio <arwin....@hotmail.com>
Subject Re: Spark Executor OOMs when writing Parquet
Date Tue, 21 Jan 2020 13:35:00 GMT
Hi Phillip, All

Thanks for all the help. Empirically, giving the executors more native memory by reducing the
heap size from 90% to 80% solved the problem.

I did not look into the breakdown of the off-heap usage, but I would imagine it comes from Snappy's
off-heap buffers, as Vadim said, or perhaps from other Spark off-heap internals. I figure that whatever
it is, it's not really actionable other than by making more native memory available.

Other things I tried:


  *   Tried setting "-XX:MaxDirectMemorySize" to a high value, based on https://issues.apache.org/jira/browse/SPARK-4073.
This was a red herring, because it turns out that when that option is not set the JVM will already
try to use up all available native memory by default.
  *   Tried setting "spark.executor.memoryOverhead" to a higher number - but I got some complaints
about exceeding the maximum YARN memory allocation. Actually it is unclear to me the relationship
between memoryOverhead, the Spark JVM heap, off-heap, and YARN.
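
For reference, here is a minimal sketch of how I currently understand those knobs to fit together
on YARN; the class name and numbers are made up, not what I actually ran, so please correct me if
the mental model is wrong:

```
import org.apache.spark.sql.SparkSession;

public class MemoryOverheadSketch {
    public static void main(String[] args) {
        // My understanding: the YARN container request is roughly
        // spark.executor.memory (the JVM heap, -Xmx) + spark.executor.memoryOverhead
        // (headroom for native/off-heap allocations), and their sum must stay under
        // YARN's maximum allocation. Shrinking the heap while keeping the container
        // size fixed should leave more room for native memory.
        SparkSession spark = SparkSession.builder()
            .appName("memory-overhead-sketch")
            .config("spark.executor.memory", "300g")         // hypothetical heap size
            .config("spark.executor.memoryOverhead", "48g")  // hypothetical native headroom
            .getOrCreate();
        spark.stop();
    }
}
```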


Anyway, it seems that all I had to do was reduce my heap size. Does anyone here monitor the
direct memory/off-heap usage of their Spark executors?
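
In case it's useful, the only approach I've come up with so far is to read the JVM's buffer-pool
MBeans (the "direct" pool should cover the DirectByteBuffers that Snappy allocates). A standalone
sketch, not something from this job:

```
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectMemoryProbe {
    public static void main(String[] args) {
        // Prints the JVM's view of its "direct" and "mapped" buffer pools.
        // This covers DirectByteBuffer allocations (like the ones in the Snappy stack trace),
        // but not memory that native libraries allocate with plain malloc.
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("%s: used=%d bytes, capacity=%d bytes, buffers=%d%n",
                pool.getName(), pool.getMemoryUsed(), pool.getTotalCapacity(), pool.getCount());
        }
    }
}
```

The same MBeans are registered with the platform MBeanServer, so in principle the numbers could
also be scraped over JMX from running executors.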

Cheers,

Arwin

________________________________
From: Phillip Henry <londonjavaman@gmail.com>
Sent: January 21, 2020 6:18 AM
To: Arwin Tio <arwin.tio@hotmail.com>
Cc: Vadim Semenov <vadim@datadoghq.com>; user@spark.apache.org <user@spark.apache.org>
Subject: Re: Spark Executor OOMs when writing Parquet

Hi, Arwin.

Did you establish that this was due to off-heap memory usage? If you're running on Linux,
you could poll "pmap <PID>" to see how much off-heap memory is being used...

Phillip

On Sat, Jan 18, 2020 at 2:13 AM Arwin Tio <arwin.tio@hotmail.com> wrote:
Hi Vadim,

Thank you for the help. It seems that my executor JVMs have plenty of heap space, so off-heap
memory may be the issue here:

[Attached chart of executor JVM heap usage over time: green = committed, orange = used]

Will simply reducing heap size do the trick?

What about the following config options from https://spark.apache.org/docs/latest/configuration.html


  *   spark.executor.memoryOverhead
  *   spark.memory.offHeap.enabled
  *   spark.memory.offHeap.size

I will experiment with these right away, but I am curious to hear your thoughts. Is the Snappy
off-heap memory different from the Spark off-heap memory accounted for by "spark.executor.memoryOverhead"?
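
For concreteness, this is roughly how I was planning to try the off-heap options (placeholder
values). My possibly wrong understanding is that these only govern Spark's own off-heap (Tungsten)
memory, not whatever Snappy allocates natively:

```
import org.apache.spark.sql.SparkSession;

public class OffHeapSketch {
    public static void main(String[] args) {
        // Placeholder values: spark.memory.offHeap.* lets Spark put its own execution/storage
        // memory off-heap; it does not account for third-party native allocations.
        SparkSession spark = SparkSession.builder()
            .appName("offheap-sketch")
            .config("spark.memory.offHeap.enabled", "true")
            .config("spark.memory.offHeap.size", "16g")  // must be positive when off-heap is enabled
            .getOrCreate();
        spark.stop();
    }
}
```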

Thanks,

Arwin
________________________________
From: Vadim Semenov <vadim@datadoghq.com>
Sent: January 17, 2020 12:53 PM
To: Arwin Tio <arwin.tio@hotmail.com>
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: Spark Executor OOMs when writing Parquet

Based on the error trace, it's likely that the system doesn't have enough off-heap memory: when
Parquet does Snappy compression, it uses native I/O, which works outside the heap. You need to
reduce the heap size, which will leave more memory for off-heap operations.

Caused by: java.lang.OutOfMemoryError
        at sun.misc.Unsafe.allocateMemory(Native Method)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)

On Fri, Jan 17, 2020 at 10:12 AM Arwin Tio <arwin.tio@hotmail.com> wrote:
Hello,

I have a fairly straightforward Spark job that converts CSV to Parquet:

```
Dataset<Row> df = spark.read(...);  // CSV read details elided

df
  .repartition(5000)
  .write()
  .format("parquet")
  .parquet("s3://mypath/...");
```

For context, there are about 5 billion rows, each with 2000 columns. The entire dataset is
about 1 TB (compressed).

The error looks like this:

```
  20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID 24300, ip-172-24-107-37.us-west-2.compute.internal,
executor 13): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError
        at sun.misc.Unsafe.allocateMemory(Native Method)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
        at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
        at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
        at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
        at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
        at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
        at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
        at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
        at org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
        at org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:113)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:50)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
        at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
        at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 10 more
```

Can anybody give me some pointers on how to tune Spark to avoid this? I am using large machines
(r5d.12xlarge) with 384 GB of memory. My executors have about 350 GB of heap (-Xmx350000m).
Could it be that my heap is too large?
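
Just to make that question concrete, a rough budget for one of these nodes (my approximate
numbers, not measured):

```
public class NodeMemoryBudget {
    public static void main(String[] args) {
        // r5d.12xlarge: ~384 GB RAM. With a ~350 GB heap (-Xmx350000m), everything else --
        // OS, YARN daemons, metaspace, thread stacks, and direct/native buffers (e.g. Snappy) --
        // has to fit in the remainder.
        long totalGb = 384;
        long heapGb  = 350;
        System.out.println("Left for everything off-heap: ~" + (totalGb - heapGb) + " GB");
    }
}
```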

Another issue I found was https://issues.apache.org/jira/browse/PARQUET-222, which seems to
suggest that too many columns could be a problem. I don't want to throw away any columns, though.

One more question: why does saving as Parquet create 4 different stages in Spark? You can
see in the picture that there are 4 different stages, all labelled "save at LoglineParquetGenerator.java:241".
I am not sure how to interpret these stages:

[Attached screenshot of the Spark UI showing the four stages]
Thanks,

Arwin


--
Sent from my iPhone
