spark-user mailing list archives

From Enrico Minack <>
Subject Re: Identify bottleneck
Date Wed, 18 Dec 2019 10:13:38 GMT
How many withColumn statements do you have? Note that it is better to 
use a single select rather than many withColumn calls. This also makes 
the drops redundant.
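For example, a chain of withColumn calls like the one below can be collapsed into a single select. This is only a sketch; the column names and expressions are made up for illustration:

```scala
import org.apache.spark.sql.functions._

// Many withColumn calls: each one adds a new projection to the plan.
val slow = df
  .withColumn("size_mb", col("size") / 1048576)
  .withColumn("ext", regexp_extract(col("path"), "\\.([^.]+)$", 1))
  .drop("size")

// One select: the same result in a single projection. No drop is
// needed because only the wanted columns are listed.
val fast = df.select(
  col("path"),
  (col("size") / 1048576).as("size_mb"),
  regexp_extract(col("path"), "\\.([^.]+)$", 1).as("ext")
)
```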

Reading 25m CSV lines and writing them to Parquet in 5 minutes on 32 
cores is really slow. Can you try this on a single machine, i.e. run 
with "local[*]"?
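A minimal way to set up such a single-machine run (the app name here is a hypothetical placeholder):

```scala
import org.apache.spark.sql.SparkSession

// local[*] uses all cores of one machine, which takes the cluster
// and the network out of the equation for this test.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("etl-local-test")
  .getOrCreate()
```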

Can you rule out the writing part by counting the rows? I presume this 
all happens in a single stage.
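A sketch of how to time the read side alone, assuming a hypothetical input path; if the count alone already takes ~5 minutes, the Parquet write is not the bottleneck:

```scala
// Read + parse only, no write.
val df = spark.read
  .option("header", "true")
  .csv("/mnt/cephfs/fs-dump.csv.bz2")  // placeholder path

val t0 = System.nanoTime()
println(df.count())
println(s"count took ${(System.nanoTime() - t0) / 1e9} s")
```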


On 18.12.19 at 10:56, Antoine DUBOIS wrote:
> Hello
> I'm working on an ETL that transforms CSV descriptions of file 
> systems into Parquet so I can work on them easily to extract 
> information.
> I'm using Mr. Powers' framework Daria to do so. I have quite varied 
> inputs and a lot of transformations, and the framework helps organize 
> the code.
> I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8 cores 
> and 32GB of memory each.
> The storage is handled by a CephFS volume mounted on all nodes.
> First a small description of my algorithm (it's quite simple):
>     Use SparkContext to load the csv.bz2 file,
>     Chain a lot of withColumn() statement,
>     Drop all unnecessary columns,
>     Write parquet file to CephFS
> This treatment can take several hours depending on how many lines the 
> CSV has, and I wanted to identify whether bz2 or the network could be 
> an issue, so I ran the following test (several times, with consistent 
> results), using 20 cores and 2 cores per task:
>   * Read the csv.bz2 from CephFS with connection with 1Gb/s for each
>     node: ~5 minutes.
>   * Read the csv.bz2 from TMPFS(setup to look like a shared storage
>     space): ~5 minutes.
>   * From the 2 previous tests I concluded that uncompressing the file
>     was part of the bottleneck, so I uncompressed the file and stored
>     it in TMPFS as well; result: ~5.9 minutes.
> The test file has 25'833'369 lines and is 370MB compressed and 3700MB 
> uncompressed. Those results have been reproduced several times each.
> My question here is: what is the bottleneck in this case?
> I thought that reading the uncompressed file from RAM would be the 
> fastest. Is it possible that my program reads the CSV suboptimally?
> In the execution logs on the cluster I see 5 to 10 seconds of GC time 
> at most, and the timeline shows mainly CPU time (no shuffling, no 
> randomization overhead either).
> I also noticed that memory storage is never used during the execution. 
> I know from several hours of research that bz2 is the only real 
> compression algorithm usable as a Spark input, for parallelization 
> reasons.
> Do you have any idea why it behaves this way,
> and how to improve this treatment?
> Cheers
> Antoine
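Regarding the question about reading the CSV suboptimally: one common cost is schema inference, which scans the input an extra time when enabled. Supplying an explicit schema avoids that. A sketch under assumed, hypothetical column names and path:

```scala
import org.apache.spark.sql.types._

// Hypothetical schema for a file-system dump; adjust to your columns.
val schema = StructType(Seq(
  StructField("path", StringType),
  StructField("size", LongType),
  StructField("mtime", TimestampType)
))

// With an explicit schema, Spark parses the file only once.
val df = spark.read
  .schema(schema)
  .option("header", "true")
  .csv("/tmp/fs-dump.csv")  // placeholder path
```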
