spark-user mailing list archives

From Antoine DUBOIS <antoine.dub...@cc.in2p3.fr>
Subject Re: Identify bottleneck
Date Wed, 18 Dec 2019 13:59:12 GMT

I can confirm that the job is able to use multiple cores on multiple nodes at the same time,
and that I have several tasks running at the same time.
Depending on the CSV, the job uses from 5 partitions up to several hundred.
Running the job locally on one node took more than 20 minutes, and I didn't have
time to let it finish.




From: "Enrico Minack" <mail@Enrico.Minack.dev>
To: "Chris Teoh" <chris.teoh@gmail.com>, "Antoine DUBOIS" <antoine.dubois@cc.in2p3.fr>
Cc: "user @spark" <user@spark.apache.org>
Sent: Wednesday, 18 December 2019 14:29:07
Subject: Re: Identify bottleneck

Good points, but single-line CSV files are splittable (multi-line CSV is not), especially
at the mentioned size. And bz2 is also splittable, though reading it is much slower than
uncompressed CSV.

If your csv.bz2 files are not splittable, then repartitioning does not improve the situation
much, because the file is first read by a single worker before the repartitioning happens.

Besides checking the Spark UI SQL tab, you can check that your stage has multiple tasks:
ideally 200, and at least 32 to fully employ your cluster.
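As a quick check (a sketch, assuming a DataFrame named `df` has already been loaded from the CSV), the partition count behind those tasks can be inspected directly:

```scala
// Hypothetical check: how many partitions (and thus tasks per stage)
// the CSV read produced.
val numPartitions = df.rdd.getNumPartitions
println(s"Input DataFrame has $numPartitions partitions")
// Fewer partitions than cores means some of the 32 cores sit idle.
```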


On 18.12.19 at 13:33, Chris Teoh wrote:





Please look at the Spark UI and confirm you are indeed getting more than 1 partition in your
dataframe. Text files are usually not splittable, so you may just be doing all the work in
a single partition.

If that is the case, it may be worthwhile to call the repartition method to distribute
your data across multiple partitions so you get more parallelism.
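A minimal sketch of that suggestion, assuming a DataFrame `df` read from the CSV (the partition count of 64, roughly 2x the cluster's cores, is an illustrative choice, not a recommendation from this thread):

```scala
// Spread the data across enough partitions to keep all 32 cores busy.
// This triggers a shuffle, but the downstream transformations then run
// in parallel instead of in a single task.
val repartitioned = df.repartition(64)
println(repartitioned.rdd.getNumPartitions)  // now 64
```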

On Wed, 18 Dec 2019, 9:35 pm Antoine DUBOIS, <antoine.dubois@cc.in2p3.fr> wrote:


There are 15 withColumn statements and one drop at the end to remove the old columns.
I wish I could write it as a single SQL statement, but that is not reasonable for maintenance
purposes.
I will try on a local instance and let you know. 

Thanks for the help. 



From: "Enrico Minack" <mail@Enrico.Minack.dev>
To: user@spark.apache.org, "Antoine DUBOIS" <antoine.dubois@cc.in2p3.fr>
Sent: Wednesday, 18 December 2019 11:13:38
Subject: Re: Identify bottleneck

How many withColumn statements do you have? Note that it is better to use a single select
rather than lots of withColumn calls. This also makes the drops redundant.
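To illustrate the point (a sketch with made-up column names, not the actual job):

```scala
import org.apache.spark.sql.functions._

// Many withColumn calls, each adding a projection on top of the plan:
val viaWithColumn = df
  .withColumn("size_mb", col("size") / 1024 / 1024)
  .withColumn("ext", lower(col("extension")))
  .drop("size", "extension")

// Equivalent single select: one projection, and no drop needed,
// because only the wanted columns are listed.
val viaSelect = df.select(
  (col("size") / 1024 / 1024).as("size_mb"),
  lower(col("extension")).as("ext")
)
```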

Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is really slow. Can
you try this on a single machine, i.e. run with "local[*]"?

Can you rule out the writing part by counting the rows? I presume this all happens in a single
stage. 
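Both suggestions can be sketched together (the path and options are illustrative, not the actual job):

```scala
import org.apache.spark.sql.SparkSession

// Run on a single machine with all local cores, to compare against the cluster.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("bottleneck-test")
  .getOrCreate()

val df = spark.read
  .option("header", "true")
  .csv("/tmp/filesystem-dump.csv.bz2")  // hypothetical path

// Counting forces a full read and all transformations without writing
// Parquet, which isolates the read/transform cost from the write cost.
println(df.count())
```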

Enrico 


On 18.12.19 at 10:56, Antoine DUBOIS wrote:


Hello

I'm working on an ETL that transforms CSV files describing file systems into Parquet, so
I can work on them easily to extract information.
I'm using Mr. Powers' framework Daria to do so. I have quite different inputs and a lot of transformations,
and the framework helps organize the code.
I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8 cores and 32GB of memory each.
The storage is handled by a CephFS volume mounted on all nodes.
First, a small description of my algorithm (it's quite simple):


    * Use SparkContext to load the csv.bz2 file,
    * Chain a lot of withColumn() statements,
    * Drop all unnecessary columns,
    * Write the Parquet file to CephFS.
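The four steps above might look roughly like this (a sketch with illustrative paths and column names; the real job uses Daria helpers and around 15 withColumn statements):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("fs-etl").getOrCreate()

// 1. Load the csv.bz2 file (bz2 is decompressed on the fly).
val raw = spark.read.option("header", "true").csv("/ceph/input/fs.csv.bz2")

// 2. Chain of withColumn transformations (only one shown here).
val transformed = raw
  .withColumn("size_mb", col("size") / 1024 / 1024)

// 3. Drop the columns that are no longer needed.
val cleaned = transformed.drop("size")

// 4. Write the result as Parquet to CephFS.
cleaned.write.mode("overwrite").parquet("/ceph/output/fs.parquet")
```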

This treatment can take several hours depending on how many lines the CSV has, and I wanted
to identify whether bz2 or the network could be an issue,
so I ran the following test (several times, with consistent results).
I tried the following scenario with 20 cores and 2 cores per task:


    * Read the csv.bz2 from CephFS, with a 1Gb/s connection for each node: ~5 minutes.
    * Read the csv.bz2 from TMPFS (set up to look like a shared storage space): ~5 minutes.
    * From the 2 previous tests I concluded that uncompressing the file was part of the bottleneck,
so I uncompressed the file and stored it in TMPFS as well; result: ~5.9 minutes.

The test file has 25'833'369 lines and is 370MB compressed and 3700MB uncompressed. These
results have been reproduced several times each.
My question here is: what am I bottlenecked by in this case?

I thought that the uncompressed file in RAM would be the fastest. Is it possible that my program
reads the CSV suboptimally?
In the execution logs on the cluster I have 5 to 10 seconds of GC time at most, and the timeline shows
mainly CPU time (no shuffling, and no serialization overhead either).
I also noticed that memory storage is never used during the execution. I know from several
hours of research that bz2 is the only common compression codec usable as a splittable input in Spark,
for parallelization reasons.

Do you have any idea why it behaves this way,
and any idea how to improve this treatment?

Cheers 

Antoine 










