spark-user mailing list archives

From ayan guha <>
Subject Re: Identify bottleneck
Date Thu, 19 Dec 2019 21:55:47 GMT
Quick question: why is it better to use one SQL select rather than multiple withColumn calls?
Isn't everything eventually rewritten by Catalyst?

On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack <>

> How many withColumn statements do you have? Note that it is better to use
> a single select rather than lots of withColumn calls. This also makes the
> drops redundant.
> Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is
> really slow. Can you try this on a single machine, i.e. run with "local[*]"?
> Can you rule out the writing part by counting the rows? I presume this all
> happens in a single stage.
> Enrico
> On 18.12.19 at 10:56, Antoine DUBOIS wrote:
> Hello,
> I'm working on an ETL job that transforms CSV files describing file systems
> into Parquet, so that I can easily work with them to extract information.
> I'm using Mr. Powers' framework Daria to do so. I have quite varied inputs
> and a lot of transformations, and the framework helps organize the code.
> I have a standalone cluster (v2.3.2) of 4 nodes, each with 8 cores and
> 32 GB of memory.
> Storage is handled by a CephFS volume mounted on all nodes.
> First, a short description of my algorithm (it's quite simple):
>    - Use SparkContext to load the csv.bz2 file
>    - Chain a lot of withColumn() statements
>    - Drop all unnecessary columns
>    - Write the Parquet file to CephFS
> This processing can take several hours depending on how many lines the CSV
> has, and I wanted to find out whether bz2 or the network could be an issue,
> so I ran the following tests (several times, with consistent results),
> using 20 cores and 2 cores per task:
>    - Read the csv.bz2 from CephFS over a 1 Gb/s connection on each
>    node: ~5 minutes.
>    - Read the csv.bz2 from tmpfs (set up to look like a shared storage
>    space): ~5 minutes.
>    - From the two previous tests, I concluded that decompressing the file
>    was part of the bottleneck, so I decompressed the file and stored it
>    in tmpfs as well. Result: ~5.9 minutes.
> The test file has 25,833,369 lines and is 370 MB compressed and 3,700 MB
> uncompressed. Each of these results has been reproduced several times.
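Converting these figures into rates gives a back-of-the-envelope way to weigh Enrico's "really slow" remark. The sketch assumes the ~5-minute runs and 20 cores. It also assumes that "2 cores per task" means 10 tasks run concurrently. MB is used as reported.

```python
from typing import Dict


def throughput(lines: int, megabytes: float, seconds: float,
               cores: int, cores_per_task: int) -> Dict[str, float]:
    """Rates implied by one run: whole job, per core and per concurrent task."""
    tasks = cores // cores_per_task  # assumption: each task occupies cores_per_task CPUs
    mb_s = megabytes / seconds
    return {
        "lines_per_s": lines / seconds,
        "mb_per_s": mb_s,
        "mb_per_s_per_core": mb_s / cores,
        "mb_per_s_per_task": mb_s / tasks,
    }


# Figures from the message: 25,833,369 lines, 3,700 MB uncompressed, ~5 minutes.
rates = throughput(lines=25_833_369, megabytes=3700, seconds=5 * 60,
                   cores=20, cores_per_task=2)
for name, value in rates.items():
    print(f"{name:>18}: {value:,.2f}")
```

With these inputs it reports roughly 86,000 lines/s and 12.3 MB/s of uncompressed CSV for the whole job. That is about 0.6 MB/s per core, or 1.2 MB/s per concurrent task.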
> My question is: what is the bottleneck in this case?
> I thought that the uncompressed file in RAM would be the fastest. Is it
> possible that my program reads the CSV suboptimally?
> In the execution logs on the cluster, GC time is at most 5 to 10 seconds,
> and the timeline shows mainly CPU time (no shuffling, and no
> (de)serialization overhead either).
> I also noticed that storage memory is never used during execution. I
> know from several hours of research that bz2 is the only real compression
> format usable as Spark input for parallelization reasons (it is splittable).
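One way to check whether decompression alone could explain the runtime is to time bzip2 on a single core outside Spark and compare it with the job's rate. About 5 minutes for 3,700 MB on 20 cores works out to roughly 0.6 MB/s of uncompressed CSV per core. The sketch below uses Python's standard `bz2` module on synthetic rows; the row layout is made up. Spark reads `.bz2` through Hadoop's codec, which may use a different implementation, so the result is only a rough per-core reference point.

```python
import bz2
import time
from typing import Tuple


def synthetic_csv(n_lines: int) -> bytes:
    """Hypothetical file-system listing rows (path,size,mtime); the real schema is unknown."""
    return "".join(
        f"/ceph/vol{i % 97}/dir{i % 1013}/file{i}.dat,{i * 37 % 1_000_000},{1576700000 + i}\n"
        for i in range(n_lines)
    ).encode("ascii")


def bz2_single_core(raw: bytes, level: int = 9) -> Tuple[float, float]:
    """Return (compression ratio, decompression MB/s) for one single-threaded run."""
    packed = bz2.compress(raw, compresslevel=level)
    start = time.perf_counter()
    unpacked = bz2.decompress(packed)
    elapsed = max(time.perf_counter() - start, 1e-9)  # guard against a zero timer delta
    if unpacked != raw:
        raise ValueError("bz2 round trip changed the data")
    return len(raw) / len(packed), len(raw) / 1e6 / elapsed
```

Running `bz2_single_core(synthetic_csv(500_000))` gives a first figure. Passing the bytes of a real uncompressed sample of the input gives a more representative one. If a single core decompresses far faster than about 0.6 MB/s, decompression by itself is unlikely to account for the job's runtime.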
> Do you have any idea why this happens,
> and how I could improve this processing?
> Cheers
> Antoine
> --
Best Regards,
Ayan Guha
