Instead of running a single job, you can process each file in its own job and run several of those jobs in parallel within one SparkContext.
Something like this should work for you: it submits N jobs from the driver, the jobs run independently, and the executors pick up tasks from whichever jobs are pending, so they stay fully utilized.

```
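import org.apache.spark.sql.SparkSession

import scala.collection.parallel.ForkJoinTaskSupport

val spark: SparkSession  // placeholder: your existing session
val files: Seq[String]   // placeholder: the input file paths
val filesParallelCollection = files.par
// Upper bound on the number of Spark jobs submitted concurrently from the driver
val howManyFilesToProcessInParallel = math.min(50, files.length)

// ForkJoinTaskSupport takes the pool as its constructor argument
filesParallelCollection.tasksupport = new ForkJoinTaskSupport(
  new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
)
// Each iteration runs on a pool thread and triggers its own Spark job
filesParallelCollection.foreach(file => {
  spark.read.text(file).filter(…)…
})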
```
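
For the body of that foreach, here is a minimal sketch of what a per-file job could look like for the read / regex filter / gzip write pipeline described in the quoted message. The names `PerFileFilter`, `processFile`, `outputPathFor`, `outputDir` and `pattern` are made up for illustration, and the output naming rule is only an assumption about what "similarly named" should mean.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PerFileFilter {
  // Hypothetical naming rule: "s3://bckt/file-1.json.gz" -> "<outputDir>/file-1"
  def outputPathFor(input: String, outputDir: String): String = {
    val baseName = input.substring(input.lastIndexOf('/') + 1).stripSuffix(".json.gz")
    s"${outputDir.stripSuffix("/")}/$baseName"
  }

  // One Spark job per call: read one gzipped text file, keep the lines
  // that match the regex, and write them back gzip-compressed.
  def processFile(spark: SparkSession, input: String, outputDir: String, pattern: String): Unit = {
    spark.read.text(input)                  // one row per line, in a column named "value"
      .filter(col("value").rlike(pattern))  // keeps lines where the regex finds a match
      .write
      .option("compression", "gzip")
      .text(outputPathFor(input, outputDir))
  }
}
```

You would then call `PerFileFilter.processFile(spark, file, outputDir, pattern)` inside the foreach. Two things to keep in mind: `.text(path)` writes a directory of part files rather than a single .gz file, and a .gz input is not splittable, so each file is read by a single task. That second point is why running many per-file jobs at once helps keep the executors busy.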

On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller <bluedasyatis@gmail.com> wrote:
More details on what I want to achieve. Maybe someone can suggest a
course of action.

My processing is extremely simple: reading <file_i>.json.gz text
files, filtering each line according to a regex, and saving the surviving
lines in a similarly named <result_i>.gz file.

Unfortunately changing the data format is impossible (we are dealing
with terabytes here) so I need to process .gz files.

Ideally, I would like to process a large number of such files in
parallel, that is, using n_e executors, each of which would take care of a
fraction 1/n_e of all the files. The trouble is that I don't know how
to process files in parallel without loading them in the driver first,
which would result in a major bottleneck.

Here was my naive approach in Scala-like pseudo-code:

//
// This happens on the driver
//
val files = List("s3://bckt/file-1.json.gz", ..., "s3://bckt/file-N.json.gz")
val files_rdd = sc.parallelize(files, num_partitions)
//
// Now files_rdd (which only holds file names) is distributed across
// several executors and/or nodes
//

files_rdd.foreach(
    //
    // It is my understanding that what is performed within the foreach method
    // will be parallelized on several executors / nodes
    //
    file => {
        //
        // This would happen on a given executor: a given input file
        // is processed entirely on a given executor
        //
        val lines = sc.read.text(file)
        val filtered_lines = lines.filter( /* filter based on regex */ )
        filtered_lines.write.option("compression", "gzip").text("a_filename_tbd")
    }
)

Unfortunately this is not possible since the Spark context sc is
defined in the driver and cannot be shared.

My problem would be entirely solved if I could manage to read files
not from the driver, but from a given executor.

Another possibility would be to read each .gz file in the driver
(about 2GB each), materializing the whole resulting RDD on the driver
(around 12GB) and then calling repartition on that RDD, but only the
regex part would be parallelized, and the data shuffling will probably
ruin the performance.

Any idea?
