2015-06-04 15:29 GMT+02:00 James Aley <james.aley@swiftkey.com>:

We have a load of Avro data coming into our data systems in the form of relatively small files, which we're merging into larger Parquet files with Spark. I've been following the docs, and the approach I'm taking seemed fairly obvious and pleasingly simple, but I'm wondering whether it's actually the most efficient way to do this.

I was wondering if anyone on this list might have some advice to make this job as efficient as possible. Here's some code:

DataFrame dfInput = sqlContext.load(inputPaths.get(0), "com.databricks.spark.avro");
long totalSize = getDirSize(inputPaths.get(0));

for (int i = 1; i < inputPaths.size(); ++i) {
    dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i), "com.databricks.spark.avro"));
    totalSize += getDirSize(inputPaths.get(i));
}

int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
DataFrame outputFrame;

// Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
// the synchronize block below. Suggestions welcome here too! :-)
synchronized (this) {
    RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
    outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
}

outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
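For what it's worth, the targetPartitions arithmetic above is just integer division with a floor of 2. A minimal standalone sketch (the 512 MB TARGET_SIZE_BYTES here is only an illustrative value, not something from your setup):

```java
public class PartitionMath {
    // Hypothetical target output file size; tune to your block size.
    static final long TARGET_SIZE_BYTES = 512L * 1024 * 1024;

    // Floor of 2 partitions, otherwise total input size / target size.
    static int targetPartitions(long totalSizeBytes) {
        return (int) Math.max(2L, totalSizeBytes / TARGET_SIZE_BYTES);
    }
}
```

Note the integer division rounds down, so partitions tend to come out slightly larger than the target, never smaller.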

Here are some things bothering me:
  • Conversion to an RDD and back again so that we can use coalesce() to reduce the number of partitions. This is because we read that repartition() is not as efficient as coalesce(), and local micro-benchmarks seemed to confirm that coalesce() was somewhat faster. Is this really a good idea, though? Should we be doing something else?
Repartition uses coalesce but with a forced shuffle step. It's just a shortcut for coalesce(xxx, true).
Doing a coalesce sounds correct, I'd do the same :) Note that if you add the shuffle step, then your partitions should be better balanced.
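To see why the shuffle variant balances better, here's a toy model in plain Java (this is an illustration of the idea, not Spark's actual implementation): without a shuffle, coalesce merges whole adjacent partitions, so skew in the inputs carries through; with a shuffle, individual records are redistributed, here modeled as a simple round-robin.

```java
import java.util.ArrayList;
import java.util.List;

public class CoalesceToy {
    // Models coalesce(n, shuffle = false): whole adjacent partitions
    // are merged together, so input skew is preserved.
    static List<List<Integer>> coalesceNoShuffle(List<List<Integer>> parts, int n) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) out.add(new ArrayList<>());
        for (int i = 0; i < parts.size(); i++)
            out.get(i * n / parts.size()).addAll(parts.get(i));
        return out;
    }

    // Models the shuffle path: every record is redistributed
    // individually, so output partitions come out balanced.
    static List<List<Integer>> coalesceShuffle(List<List<Integer>> parts, int n) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) out.add(new ArrayList<>());
        int k = 0;
        for (List<Integer> p : parts)
            for (int v : p) out.get(k++ % n).add(v);
        return out;
    }
}
```

With three skewed input partitions of sizes 6, 1 and 1 coalesced down to 2, the no-shuffle path produces partitions of 7 and 1 records, while the shuffle path produces 4 and 4. The price, of course, is moving every record over the network.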
  • Usage of unionAll() - this is the only way I could find to join the separate data sets into a single data frame to save as Parquet. Is there a better way?
When using the input formats directly you can call FileInputFormat.addInputPath once per path; it should perform at least as well as union.
  • Do I need to be using the DataFrame API at all? I'm not querying any data here, so the nice API for SQL-like transformations of the data isn't being used. The DataFrame API just seemed like the path of least resistance for working with Avro and Parquet. Would there be any advantage to using hadoopRDD() with the appropriate Input/Output formats?

Using the input/output formats directly sounds viable. But the snippet you show seems clean enough, and I'm not sure there would be much value in making something (maybe) slightly faster but harder to understand.


Any advice or tips greatly appreciated!