spark-user mailing list archives

From Arwin Tio <arwin....@hotmail.com>
Subject Re: Spark Executor OOMs when writing Parquet
Date Sat, 18 Jan 2020 02:06:52 GMT
Okay! I didn't realize you could pump those partition numbers up that high. 15000 partitions
still failed, so I am trying 30000 partitions now. There is still some disk spill, but it is
not that high.
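
For reference, the retry is essentially the same job with a bigger partition count. Roughly
(input path elided; output placeholder as in my original message below):

```
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();

// Same CSV-to-Parquet job, but with 30000 partitions so that each
// task writes a much smaller slice of the data.
Dataset<Row> df = spark.read().csv("...");  // input path elided

df.repartition(30000)
  .write()
  .parquet("s3://mypath/...");
```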

Thanks,

Arwin

________________________________
From: Chris Teoh <chris.teoh@gmail.com>
Sent: January 17, 2020 7:32 PM
To: Arwin Tio <arwin.tio@hotmail.com>
Cc: user @spark <user@spark.apache.org>
Subject: Re: Spark Executor OOMs when writing Parquet

You also have disk spill which is a performance hit.

Try multiplying the number of partitions by about 20x-40x and see if you can eliminate the
shuffle spill.
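
Something like this, as a sketch (the numbers are purely illustrative, starting from your
current 5000; df and spark as in your snippet below):

```
// Illustrative only: scale the current 5000 write partitions by ~20x-40x.
int target = 5000 * 20;  // try 20x first; go to 40x if spill remains

// If the spill is happening in shuffle stages (joins/aggregations),
// raise the shuffle partition count too; it defaults to 200.
spark.conf().set("spark.sql.shuffle.partitions", String.valueOf(target));

df.repartition(target)
  .write()
  .parquet("s3://mypath/...");
```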

On Fri, 17 Jan 2020, 10:37 pm Arwin Tio, <arwin.tio@hotmail.com> wrote:
Yes, mostly memory spills though (36.9 TiB memory, 895 GiB disk). I was under the impression
that memory spill is OK?

[inline screenshot: Spark UI spill metrics]
(If you're wondering, this is EMR).

________________________________
From: Chris Teoh <chris.teoh@gmail.com>
Sent: January 17, 2020 10:30 AM
To: Arwin Tio <arwin.tio@hotmail.com>
Cc: user @spark <user@spark.apache.org>
Subject: Re: Spark Executor OOMs when writing Parquet

Sounds like you don't have enough partitions. Try repartitioning to 14496 partitions. Are
your stages experiencing shuffle spill?

On Fri, 17 Jan 2020, 10:12 pm Arwin Tio, <arwin.tio@hotmail.com> wrote:
Hello,

I have a fairly straightforward Spark job that converts CSV to Parquet:

```
Dataset<Row> df = spark.read(...);

df
  .repartition(5000)
  .write()
  .format("parquet")  // redundant with .parquet(), which sets the format itself
  .parquet("s3://mypath/...");
```

For context, there are about 5 billion rows, each with 2000 columns. The entire dataset is
about 1 TB (compressed).
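
At 5000 partitions, that works out to roughly 1 million rows and about 200 MB of compressed
input per partition.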

The error looks like this:

```
  20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID 24300, ip-172-24-107-37.us-west-2.compute.internal,
executor 13): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError
        at sun.misc.Unsafe.allocateMemory(Native Method)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
        at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
        at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
        at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
        at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
        at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
        at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
        at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
        at org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
        at org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:113)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:50)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
        at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
        at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 10 more
```

Can anybody give me some pointers on how to tune Spark to avoid this? I am using giant machines
(r5d.12xlarge) with 384 GB of memory. My executors have about 350 GB of heap (-Xmx350000m).
Could it be that my heap is too large?
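
One thing I notice in the trace is that the failing allocation is a direct ByteBuffer
(ByteBuffer.allocateDirect in SnappyCompressor), i.e. off-heap memory rather than heap. If
that is the real limit, would something like this be the right direction? The numbers below
are guesses on my part, not tested:

```
// Guesses, not tested: shrink the heap a little and leave explicit
// headroom for off-heap direct buffers.
SparkSession spark = SparkSession.builder()
    .config("spark.executor.memory", "300g")
    .config("spark.executor.memoryOverhead", "32g")  // off-heap headroom on YARN
    .config("spark.executor.extraJavaOptions",
            "-XX:MaxDirectMemorySize=16g")           // cap direct-buffer usage
    .getOrCreate();
```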

Another issue I found was https://issues.apache.org/jira/browse/PARQUET-222, which seems to
suggest that too many columns could be a problem. I don't want to throw away any columns, though.
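
If I understand the Parquet writer correctly (this is back-of-envelope guessing on my part),
each column writer buffers up to a page, about 1 MB by default, per open file before flushing,
so the column count alone adds up:

```
2000 columns x ~1 MB default page size ≈ 2 GB buffered per open file,
multiplied by however many tasks run concurrently on one executor
(and before Snappy's direct buffers are counted)
```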

One more question: why does saving as Parquet create 4 different stages in Spark? You can
see in the picture that all four are at "save at LoglineParquetGenerator.java:241", and I am
not sure how to interpret them:

[inline screenshot: the four "save at LoglineParquetGenerator.java:241" stages in the Spark UI]
Thanks,

Arwin
