spark-user mailing list archives

From Vadim Semenov <va...@datadoghq.com.INVALID>
Subject Re: Spark Executor OOMs when writing Parquet
Date Tue, 21 Jan 2020 17:59:36 GMT
>
> Does anyone here monitor the direct memory/off-heap usage of their Spark
> executors?


At Datadog we monitor whole-process memory usage (so off-heap is roughly
process memory minus heap used).

We forked Etsy's statsd-jvm-profiler, which reports heap metrics, and added
process memory usage:
https://github.com/DataDog/spark-jvm-profiler/pull/1

It then sends metrics via StatsD to Datadog; I think InfluxDB can be used as well.

Here's my talk about how we monitor memory and other things in Spark:
https://youtu.be/pz1gOj5grpA?t=355
https://www.slideshare.net/sedictor/using-apache-spark-for-processing-trillions-of-records-each-day-at-datadog/28?src=clipshare
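
If you want a quick way to eyeball this without a profiler, here's a minimal
sketch (my own illustration, assuming Linux and a HotSpot JVM; it is not code
from spark-jvm-profiler): read the process RSS from /proc/self/status and
subtract the heap usage the JVM reports about itself.

```
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class OffHeapEstimate {
  public static void main(String[] args) throws IOException {
    // VmRSS in /proc/self/status is this process's resident set size, in kB.
    long rssMb = Files.lines(Paths.get("/proc/self/status"))
        .filter(line -> line.startsWith("VmRSS:"))
        .mapToLong(line -> Long.parseLong(line.replaceAll("\\D", "")))
        .findFirst().orElse(0L) / 1024;

    // Heap usage as seen by the JVM itself.
    Runtime rt = Runtime.getRuntime();
    long heapUsedMb = (rt.totalMemory() - rt.freeMemory()) >> 20;

    // Everything resident but not heap: direct buffers, metaspace, thread
    // stacks, native allocations (e.g. Snappy), and so on.
    System.out.printf("rss=%dMB heapUsed=%dMB offheap~=%dMB%n",
        rssMb, heapUsedMb, rssMb - heapUsedMb);
  }
}
```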

On Tue, Jan 21, 2020 at 8:35 AM Arwin Tio <arwin.tio@hotmail.com> wrote:

> Hi Phillip, All
>
> Thanks for all the help. Empirically, increasing the amount of native
> memory by reducing the heap size from 90% to 80% solved the problem.
>
> I did not look into the breakdown of the off-heap usage, but I would
> imagine it comes from Snappy's off-heap buffers, as Vadim said, or perhaps
> other Spark off-heap internals. I figure whatever it is, it's not really
> actionable other than making more native memory available.
>
> Other things I tried:
>
>
>    - Tried setting a high value for "-XX:MaxDirectMemorySize" based on
>    https://issues.apache.org/jira/browse/SPARK-4073. This was a red
>    herring: it turns out that when this option is not set, the JVM will
>    try to use all available native memory by default.
>    - Tried setting "spark.executor.memoryOverhead" to a higher number, but
>    I got complaints about exceeding the maximum YARN memory allocation.
>    Honestly, the relationship between memoryOverhead, the Spark JVM heap,
>    off-heap memory, and YARN is still unclear to me. My rough mental model
>    is sketched below; corrections welcome.
>
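> As I understand it (the numbers below are made up for illustration), YARN
> sizes each executor container as roughly spark.executor.memory (the JVM
> heap) plus spark.executor.memoryOverhead (headroom for everything native),
> and rejects any request whose sum exceeds
> yarn.scheduler.maximum-allocation-mb, which would explain the complaints:
>
> ```
> import org.apache.spark.sql.SparkSession;
>
> public class SubmitSketch {
>   public static void main(String[] args) {
>     // Hypothetical sizing: YARN grants a container of roughly heap +
>     // overhead, and refuses the request if that sum exceeds
>     // yarn.scheduler.maximum-allocation-mb.
>     SparkSession spark = SparkSession.builder()
>         .appName("csv-to-parquet")
>         .config("spark.executor.memory", "40g")        // JVM heap (-Xmx)
>         .config("spark.executor.memoryOverhead", "8g") // native headroom
>         .getOrCreate();                                // container ~= 48g
>   }
> }
> ```
>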
> Anyways, it seems like all I had to do was reduce my heap size. Does
> anyone here monitor the direct memory/off-heap usage of their Spark
> executors?
>
> Cheers,
>
> Arwin
>
> ------------------------------
> *From:* Phillip Henry <londonjavaman@gmail.com>
> *Sent:* January 21, 2020 6:18 AM
> *To:* Arwin Tio <arwin.tio@hotmail.com>
> *Cc:* Vadim Semenov <vadim@datadoghq.com>; user@spark.apache.org <
> user@spark.apache.org>
> *Subject:* Re: Spark Executor OOMs when writing Parquet
>
> Hi, Arwin.
>
> Did you establish that this was due to off-heap memory usage? If you're
> running on Linux, you could poll pmap PID to see how much off-heap memory
> is being used...
>
> Phillip
>
> On Sat, Jan 18, 2020 at 2:13 AM Arwin Tio <arwin.tio@hotmail.com> wrote:
>
> Hi Vadim,
>
> Thank you for the help. It seems that my executor JVMs have plenty of
> heap space, so off-heap may be the issue here:
>
> [executor memory chart omitted: green = committed, orange = used]
>
> Will simply reducing heap size do the trick?
>
> What about the following config options from
> https://spark.apache.org/docs/latest/configuration.html
>
>
>    - spark.executor.memoryOverhead
>    - spark.memory.offHeap.enabled
>    - spark.memory.offHeap.size
>
> I will experiment with these at once, but I am curious to know your
> thoughts. Is the Snappy off-heap memory different from the Spark off-heap
> memory accounted for by "spark.executor.memoryOverhead"?
>
> Thanks,
>
> Arwin
> ------------------------------
> *From:* Vadim Semenov <vadim@datadoghq.com>
> *Sent:* January 17, 2020 12:53 PM
> *To:* Arwin Tio <arwin.tio@hotmail.com>
> *Cc:* user@spark.apache.org <user@spark.apache.org>
> *Subject:* Re: Spark Executor OOMs when writing Parquet
>
> Based on the error trace, it's likely that the system doesn't have enough
> off-heap memory: when Parquet does Snappy compression, it uses native I/O,
> which works outside the heap. You need to reduce the heap size, which will
> give more memory to off-heap operations.
>
> Caused by: java.lang.OutOfMemoryError
>         at sun.misc.Unsafe.allocateMemory(Native Method)
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>         at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
>
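> To see why a huge heap starves these allocations, here's a toy example (my
> own illustration, not Spark code): DirectByteBuffer grabs native memory
> through Unsafe.allocateMemory, entirely outside -Xmx, so with a heap sized
> close to physical memory it dies with the same plain
> java.lang.OutOfMemoryError as your trace while the heap sits almost empty.
>
> ```
> import java.nio.ByteBuffer;
> import java.util.ArrayList;
> import java.util.List;
>
> // Keeps 64MB direct buffers reachable so they can't be freed; on a box
> // where -Xmx is close to physical memory, native allocation fails first.
> public class DirectBufferOOM {
>   public static void main(String[] args) {
>     List<ByteBuffer> pinned = new ArrayList<>();
>     while (true) {
>       pinned.add(ByteBuffer.allocateDirect(64 * 1024 * 1024));
>     }
>   }
> }
> ```
>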
> On Fri, Jan 17, 2020 at 10:12 AM Arwin Tio <arwin.tio@hotmail.com> wrote:
>
> Hello,
>
> I have a fairly straightforward Spark job that converts CSV to Parquet:
>
> ```
> Dataset<Row> df = spark.read(...);
>
> df
>   .repartition(5000)   // rebalance into ~5000 output files
>   .write()
>   .format("parquet")
>   .parquet("s3://mypath/...");
> ```
>
> For context, there are about 5 billion rows, each with 2000 columns. The
> entire dataset is about 1 TB (compressed).
>
> The error looks like this:
>
> ```
>   20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID 24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13): org.apache.spark.SparkException: Task failed while writing rows.
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>         at org.apache.spark.scheduler.Task.run(Task.scala:123)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError
>         at sun.misc.Unsafe.allocateMemory(Native Method)
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>         at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
>         at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
>         at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
>         at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
>         at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
>         at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
>         at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
>         at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
>         at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>         at org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
>         at org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
>         at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
>         at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
>         at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
>         at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
>         at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:113)
>         at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:50)
>         at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
>         at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
>         at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
>         at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
>         at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>         at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>         ... 10 more
> ```
>
> Can anybody give me some pointers on how to tune Spark to avoid this? I am
> using giant machines (r5d.12xlarge) with 384GB of memory. My executors have
> about 350GB of heap (-Xmx350000m), which leaves only ~34GB for the OS and
> all native allocations. Could it be that my heap is too large?
>
> Another issue I found was
> https://issues.apache.org/jira/browse/PARQUET-222, which seems to suggest
> that too many columns could be a problem. I don't want to throw away any
> columns, though.
>
> One more question: why does saving as Parquet create 4 different stages in
> Spark? You can see in the picture that there are 4 different stages, all at
> "save at LoglineParquetGenerator.java:241". I am not sure how to interpret
> these stages:
>
> [screenshot of the Spark UI showing the four stages omitted]
>
> Thanks,
>
> Arwin
>
>
>
> --
> Sent from my iPhone
>
>

-- 
Sent from my iPhone
