spark-user mailing list archives

From "Lienhart, Pierre (DI IZ) - AF (ext)" <pilienhart-...@airfrance.fr>
Subject [SPARK-SQL] Writing partitioned parquet requires huge amounts of memory
Date Wed, 14 Nov 2018 15:56:32 GMT
Hi everyone,
The team I am working with is currently plagued by storing its data in hundreds of thousands
of tiny parquet files. I am trying 1) to reduce the number of files and 2) to reduce the number of
partitions. I wrote a very simple (Py)Spark application (Spark 2.1.1 packaged within HDP 2.6.2.0)
which aims at producing one file per partition:
(spark.read
     .option('basePath', old_dir_path)
     .parquet(*paths)                      # read the existing small files
     .repartition(*partition_cols)         # shuffle so all rows of a given partition key end up in one task
     .write
     .partitionBy(*partition_cols)         # write into key=value/ subdirectories
     .option('compression', 'snappy')
     .mode("append")
     .parquet(new_dir_path))
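For what it is worth, this is how I sanity-check how many output files the job should produce (untested sketch, assuming spark, paths, old_dir_path and partition_cols are defined as above):

# number of distinct partition-key combinations: with the repartition +
# partitionBy above this should roughly equal the number of output files
df = spark.read.option('basePath', old_dir_path).parquet(*paths)
n_keys = df.select(*partition_cols).distinct().count()
print("expected number of output files:", n_keys)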
My issue is that my executors keep getting killed by YARN because of memory overhead when
writing the new parquet files. The thing is that my executors (app settings described later
below) are rather big compared to the amount of data to be written: 23 GiB of heap vs 300 MB of
gzipped parquet (1.6 GB if cached in memory).
Spark UI description of the failure: Job aborted due to stage failure: Task 169 in stage 2.0
failed 1 times, most recent failure: Lost task 169.0 in stage 2.0 (TID 98, xxxxxxxxx.xxxxxx.airfrance.fr,
executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason:
Container killed by YARN for exceeding memory limits. 24.1 GB of 24 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
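If I understand the limit correctly, the 24 GB in that message is simply the YARN container size, i.e. heap plus off-heap overhead (back-of-the-envelope, using the 1 GiB overhead from the settings described further below):

# YARN container size = spark.executor.memory + spark.yarn.executor.memoryOverhead
executor_memory_gib = 23    # spark.executor.memory
memory_overhead_gib = 1     # spark.yarn.executor.memoryOverhead
print(executor_memory_gib + memory_overhead_gib)   # 24 -> the limit YARN enforces above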
Every time, I see the process trying to dump huge amounts of sort data to disk:
18/11/14 14:09:21 INFO UnsafeExternalSorter: Thread 68 spilling sort data of 15.4 GB to disk
(0  time so far)
I have already tried / made sure of many things:

- Data distribution is not skewed (see the check sketched right after this list).
- Partitions are rather even in size and not huge.
- Increased the memory available to each worker by increasing spark.executor.memory, increasing spark.memory.fraction, decreasing spark.executor.cores and increasing spark.executor.instances; but still, even when it works, each worker seems to need a huge amount of memory compared to the amount of data to process.
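For completeness, a minimal sketch of the kind of skew / partition-size check I mean (assuming df is the input DataFrame and partition_cols the partitioning columns used above):

from pyspark.sql import functions as F

# rows per partition-key combination: a roughly flat distribution means
# no obvious skew and no single huge output partition
(df.groupBy(*partition_cols)
   .count()
   .orderBy(F.col('count').desc())
   .show(20, truncate=False))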
The job always fails the same way: the container gets killed by YARN because of memory
overhead. I have seen a handful of mostly unanswered Stack Overflow posts about this: same logs, writing
parquet, and surprise that so little data OOMs "big" executors. I have also found a few discussions mentioning
that writing partitioned parquet uses a lot of memory. SPARK-12546 seems related, but that issue
should already be fixed in my version (it brought the default value of spark.sql.sources.maxConcurrentWrites
down from 5 to 1). The Spark 1.6.0 release notes mention this problem as a KNOWN ISSUE and advise
bringing spark.memory.fraction down (why?) to 0.4 for instance and setting spark.hadoop.parquet.memory.pool.ratio
to 0.3 (why?).
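Concretely, the workaround applied to my job looks roughly like this (sketch only; in practice I pass these through spark-submit --conf or spark-defaults.conf):

from pyspark.sql import SparkSession

# Spark 1.6.0 release-note workaround, applied to my 2.1.1 job:
# shrink the unified memory region and the parquet writer memory pool
spark = (SparkSession.builder
         .config('spark.memory.fraction', '0.4')
         .config('spark.hadoop.parquet.memory.pool.ratio', '0.3')
         .getOrCreate())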
The latter workaround seems to help, but memory consumption still seems high. For example:
with 2 executors with 2 cores each, 23 GiB of spark.executor.memory and 1 GiB of spark.yarn.executor.memoryOverhead,
spark.memory.fraction = 0.4 and spark.hadoop.parquet.memory.pool.ratio = 0.3, repartitioning
300 MB of gzipped parquet (1.6 GB if cached in memory) implies a high execution memory peak
(7.8 GB read in the Spark UI) and spilling huge quantities of data to disk (see the logs below; Spark
UI shuffle read size: 3.5 GB). What is going on here?
18/11/14 14:29:34 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.8 GB to disk
(0  time so far)
18/11/14 14:31:12 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.8 GB to disk
(1  time so far)
18/11/14 14:32:12 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.8 GB to disk
(2  times so far)
18/11/14 14:33:32 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.8 GB to disk
(3  times so far)
18/11/14 14:34:29 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.8 GB to disk
(4  times so far)
18/11/14 14:35:24 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.8 GB to disk
(5  times so far)
18/11/14 14:37:19 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 7.9 GB to disk
(0  time so far)
18/11/14 14:38:00 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 8.0 GB to disk
(1  time so far)
18/11/14 14:38:42 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 8.0 GB to disk
(2  times so far)
18/11/14 14:39:24 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 8.0 GB to disk
(3  times so far)
18/11/14 14:40:05 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 8.0 GB to disk
(4  times so far)
18/11/14 14:40:47 INFO UnsafeExternalSorter: Thread 61 spilling sort data of 8.0 GB to disk
(5  times so far)
18/11/14 14:41:08 INFO FileFormatWriter: Sorting complete. Writing out partition files one
at a time.
18/11/14 14:41:09 INFO CodecConfig: Compression: SNAPPY
18/11/14 14:41:09 INFO CodecConfig: Compression: SNAPPY
18/11/14 14:41:09 INFO ParquetOutputFormat: Parquet block size to 134217728
18/11/14 14:41:09 INFO ParquetOutputFormat: Parquet page size to 1048576
18/11/14 14:41:09 INFO ParquetOutputFormat: Parquet dictionary page size to 1048576
18/11/14 14:41:09 INFO ParquetOutputFormat: Dictionary is on
18/11/14 14:41:09 INFO ParquetOutputFormat: Validation is off
18/11/14 14:41:09 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
18/11/14 14:41:09 INFO ParquetOutputFormat: Maximum row group padding size is 0 bytes
18/11/14 14:41:09 INFO ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst
schema:
Still, I do not really understand what is going on and how the above-mentioned parameters
help in this situation. I really need a better understanding in order to design
my application: how much Spark execution memory per core (=> which values for spark.memory.fraction,
spark.executor.memory and spark.executor.cores?), and/or roughly what is the maximum amount
of data a given app can process?
Is there a particular issue when writing partitioned (?) parquet that should be better documented?
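For what it is worth, here is my back-of-the-envelope reading of the unified memory model (roughly 300 MB reserved, then spark.memory.fraction of the remaining heap shared between storage and execution, each of the N concurrently running tasks being granted between 1/(2N) and 1/N of that pool before it spills); please correct me if this is wrong:

# rough numbers for one executor with the settings of the example above
heap_mib     = 23 * 1024    # spark.executor.memory = 23 GiB (the JVM actually sees a bit less)
reserved_mib = 300          # Spark's reserved system memory
fraction     = 0.4          # spark.memory.fraction

unified_mib = (heap_mib - reserved_mib) * fraction
print(unified_mib)          # ~9300 MiB (~9.1 GiB) shared by storage + execution

# per-task execution memory, depending on how many tasks run concurrently
for n_tasks in (1, 2):
    print(n_tasks, unified_mib / (2 * n_tasks), unified_mib / n_tasks)
# 1 task: ~4650..9300 MiB, 2 tasks: ~2325..4650 MiB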
Many, many thanks in advance for any help; this issue has been, and still is, a real world of pain.
And sorry for the long post!
Best regards,
Pierre

