spark-user mailing list archives

From: Silvio Fiorito <silvio.fior...@granturing.com>
Subject: Re: Parquet 'bucketBy' creates a ton of files
Date: Wed, 10 Jul 2019 12:12:26 GMT
It really depends on the use case. Bucketing stores the data already hash-partitioned, so if
you frequently perform aggregations or joins on the bucketing column(s) it can save you a
shuffle. Keep in mind that for a join to completely avoid the shuffle, both tables need the
same bucketing (same bucket columns and number of buckets).
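
For a concrete illustration, here's a minimal Java sketch of a shuffle-free join, assuming both
tables were written with the same .bucketBy(500, "account_id") spec via saveAsTable(); the table
and column names here are made up:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("bucketed-join")
    .enableHiveSupport()
    .getOrCreate();

// Hypothetical tables, both previously written with the same
// .bucketBy(500, "account_id") spec via saveAsTable().
Dataset<Row> txns = spark.table("transactions_bucketed");
Dataset<Row> accounts = spark.table("accounts_bucketed");

// Both sides are already hash-partitioned on account_id, so the join
// can be planned without shuffling either table.
Dataset<Row> joined = txns.join(accounts, "account_id");
joined.explain();  // check the plan: no Exchange feeding the join
```

If only one side is bucketed (or bucketed differently), that side still has to be shuffled.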

Sorting the data may help with filtering assuming you’re using a file format like Parquet
(e.g. if you frequently filter by account id). If you look at slide 11 in this talk I gave
at Summit you can see a simple example: https://www.slideshare.net/databricks/lessons-from-the-field-episode-ii-applying-best-practices-to-your-apache-spark-applications-with-silvio-fiorito
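
For example, a hypothetical write like this (column names and path are made up, and df/spark are
assumed to already exist) keeps each bucket sorted, so the Parquet row-group min/max statistics
line up with the column you filter on:

```java
import org.apache.spark.sql.SaveMode;

// Hypothetical: bucket by account_id and sort within each bucket so that
// Parquet row-group statistics line up with the frequent filter column.
df.write()
    .format("parquet")
    .bucketBy(500, "account_id")
    .sortBy("account_id")
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket/accounts_bucketed")
    .saveAsTable("accounts_bucketed");

// A later query filtering on account_id can then skip most row groups:
spark.table("accounts_bucketed").filter("account_id = 42").show();
```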



From: Gourav Sengupta <gourav.sengupta@gmail.com>
Date: Wednesday, July 10, 2019 at 3:14 AM
To: Silvio Fiorito <silvio.fiorito@granturing.com>
Cc: Arwin Tio <arwin.tio@hotmail.com>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Parquet 'bucketBy' creates a ton of files

Yeah, makes sense. Also, is there any massive performance improvement from using bucketBy
compared to just sorting?

Regards,
Gourav

On Thu, Jul 4, 2019 at 1:34 PM Silvio Fiorito <silvio.fiorito@granturing.com> wrote:
You need to first repartition (at a minimum by bucketColumn1), since each task writes out its
own files for whatever buckets it holds. If the bucket keys are distributed randomly across
the RDD partitions, you will get multiple files per bucket.
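
Roughly, a sketch of that suggestion, reusing the bucket count and column names from the snippet
below; repartitioning on the bucket columns with the same count before the write means each
bucket's rows land in as few tasks as possible, so you end up with far fewer files:

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.SaveMode;

// Repartition on the bucket columns first (same count as the buckets),
// then do the bucketed write, so each task mostly writes a single bucket.
dataframe
    .repartition(500, col("bucketColumn1"), col("bucketColumn2"))
    .write()
    .format("parquet")
    .bucketBy(500, "bucketColumn1", "bucketColumn2")
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");
```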

From: Arwin Tio <arwin.tio@hotmail.com>
Date: Thursday, July 4, 2019 at 3:22 AM
To: "user@spark.apache.org<mailto:user@spark.apache.org>" <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Parquet 'bucketBy' creates a ton of files

I am trying to use Spark's **bucketBy** feature on a pretty large dataset.

```java
dataframe.write()
    .format("parquet")
    .bucketBy(500, "bucketColumn1", "bucketColumn2")
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");
```

The problem is that my Spark cluster has about 500 partitions/tasks/executors (not sure the
terminology), so I end up with files that look like:

```
part-00001-{UUID}_00001.c000.snappy.parquet
part-00001-{UUID}_00002.c000.snappy.parquet
...
part-00001-{UUID}_00500.c000.snappy.parquet

part-00002-{UUID}_00001.c000.snappy.parquet
part-00002-{UUID}_00002.c000.snappy.parquet
...
part-00002-{UUID}_00500.c000.snappy.parquet

part-00500-{UUID}_00001.c000.snappy.parquet
part-00500-{UUID}_00002.c000.snappy.parquet
...
part-00500-{UUID}_00500.c000.snappy.parquet
```

That's 500 × 500 = 250,000 bucketed Parquet files! It takes forever for the `FileOutputCommitter`
to commit that to S3.

Is there a way to generate **one file per bucket**, like in Hive? Or is there a better way to
deal with this problem? Right now it seems I have to choose between lowering the parallelism of
my cluster (fewer writers) and reducing the number of buckets (fewer files per writer), which
would lower the parallelism of my downstream jobs.

Thanks