spark-user mailing list archives

From James Aley <james.a...@swiftkey.com>
Subject Optimisation advice for Avro->Parquet merge job
Date Thu, 04 Jun 2015 13:29:16 GMT
Hi,

We have a load of Avro data coming into our data systems in the form of
relatively small files, which we're merging into larger Parquet files with
Spark. I've been following the docs, and the approach I'm taking seemed
fairly obvious and pleasingly simple, but I'm wondering whether it's
really the most efficient one.

Does anyone on this list have advice on making this job as efficient as
possible? Here's the code:

DataFrame dfInput = sqlContext.load(inputPaths.get(0),
        "com.databricks.spark.avro");
long totalSize = getDirSize(inputPaths.get(0));

for (int i = 1; i < inputPaths.size(); ++i) {
    dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i),
            "com.databricks.spark.avro"));
    totalSize += getDirSize(inputPaths.get(i));
}

int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
DataFrame outputFrame;

// Note: HADOOP-10456 affects us, as we're stuck on Hadoop 2.4.0 in EMR,
// hence the synchronized block below. Suggestions welcome here too! :-)
synchronized (this) {
    // shuffle = false; the null is Scala's implicit Ordering argument.
    RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false,
            null);
    outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
}

outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
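
As a rough illustration of the partition-count arithmetic above, the sketch below isolates that one line so it can be checked without Spark. The 128 MB value for TARGET_SIZE_BYTES and the class name PartitionCount are assumptions made for the example; the real constant's value isn't given in this message.

```java
public class PartitionCount {
    // Assumed target output file size (128 MB); hypothetical value.
    static final long TARGET_SIZE_BYTES = 128L * 1024 * 1024;

    // Same arithmetic as in the job: integer division of the total input
    // size by the target size, with a floor of 2 partitions.
    static int targetPartitions(long totalSizeBytes) {
        return (int) Math.max(2L, totalSizeBytes / TARGET_SIZE_BYTES);
    }

    public static void main(String[] args) {
        long tenGiB = 10L * 1024 * 1024 * 1024;
        // 10 GiB of input at 128 MB per partition gives 80 partitions.
        System.out.println(targetPartitions(tenGiB));
        // Tiny inputs are clamped to the minimum of 2 partitions.
        System.out.println(targetPartitions(1024L));
    }
}
```

Because the division truncates, total input just under a multiple of the target size rounds down, so individual output files can come out somewhat larger than the target.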

Here are some things bothering me:

   - Conversion to an RDD and back again so that we can use coalesce() to
   reduce the number of partitions. This is because we read that repartition()
   is not as efficient as coalesce(), and local micro benchmarks seemed to
   somewhat confirm that this was faster. Is this really a good idea though?
   Should we be doing something else?
   - Usage of unionAll() - this is the only way I could find to combine the
   separate data sets into a single DataFrame to save as Parquet. Is there a
   better way?
   - Do I need to be using the DataFrame API at all? I'm not querying any
   data here, so the nice API for SQL-like transformations of the data isn't
   being used. The DataFrame API just seemed like the path of least resistance
   for working with Avro and Parquet. Would there be any advantage to using
   hadoopRDD() with the appropriate Input/Output formats?


Any advice or tips greatly appreciated!


James.
