Here's the use case:

We've a bunch of directories (over 1000) which contain tons of small files in each. Each directory is for a different customer so they are independent in that respect. We need to merge all the small files in each directory into one (or a few) compacted file(s) by using a 'coalesce' function.

Clearly we can do this on the Driver by doing something like:

list.par.foreach (dir =>compact(spark, dir))

This works but the problem here is that the parallelism happens on Driver which won't scale when we've 10,000 customers! At any given time there will be only as many compactions happening as the number of cores on the Driver, right?

We were hoping to do this:

val df = list.toDF()
df.foreach(dir => compact(spark,dir))

Our hope was, this will distribute the load amongst Spark Executors & will scale better.  But this throws the NullPointerException shown in the original email.

Is there a better way to do this?


On Tue, May 25, 2021 at 1:10 PM Silvio Fiorito <silvio.fiorito@granturing.com> wrote:

Why not just read from Spark as normal? Do these files have different or incompatible schemas?

 

val df = spark.read.option(“mergeSchema”, “true”).load(listOfPaths)

 

From: Eric Beabes <mailinglists19@gmail.com>
Date: Tuesday, May 25, 2021 at 1:24 PM
To: spark-user <user@spark.apache.org>
Subject: Reading parquet files in parallel on the cluster

 

I've a use case in which I need to read Parquet files in parallel from over 1000+ directories. I am doing something like this:

 

   val df = list.toList.toDF()

   
df.foreach(c => {
     
val config = getConfigs()
      doSomething(spark, config)
    })
 
In the doSomething method, when I try to do this:
val df1 = spark.read.parquet(pathToRead).collect()
 
I get a NullPointer exception given below. It seems the 'spark.read' only works on the Driver not on the cluster. How can I do what I want to do? Please let me know. Thank you.
 

21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID 9, ip-10-0-5-3.us-west-2.compute.internal, executor 11): java.lang.NullPointerException

 

        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)

 

        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)

 

        at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)

 

        at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)