From: kiran lonikar
To: James Aley
Cc: Eugen Cepoi, user
Date: Mon, 8 Jun 2015 12:30:00 +0530
Subject: Re: Optimisation advice for Avro->Parquet merge job

James,

As far as I can see, there are three distinct parts to your program:

- the for loop
- the synchronized block
- the final outputFrame.save statement

Can you do a separate timing measurement by putting a simple System.currentTimeMillis() around each of these blocks, to see how long each one takes, and then optimize wherever it takes longest? In the second block, you may want to measure the time for the two statements separately. Improving that block mostly boils down to playing with Spark settings.
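Something as simple as the following rough sketch should do (the variable names are just the ones from your snippet):

long t0 = System.currentTimeMillis();
// ... the for loop building dfInput ...
long t1 = System.currentTimeMillis();
// ... the synchronized block (coalesce + createDataFrame) ...
long t2 = System.currentTimeMillis();
outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
long t3 = System.currentTimeMillis();
System.out.println("loop: " + (t1 - t0) + " ms, coalesce: " + (t2 - t1) + " ms, save: " + (t3 - t2) + " ms");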
Now consider the first block: I think this is a classic case of merge sort, or a reduce tree. You already tried to improve it by submitting jobs in parallel using an executor pool/Callable etc.

To further improve the parallelization, I suggest you use a reduce-tree-like approach. For example, let's say you want to compute the sum of all elements of an array in parallel. The way this is solved on a GPU-like platform is to divide the input array into chunks of two, compute those n/2 sums in parallel on separate threads, and save each result in the first of its two elements. In the next iteration you compute n/4 sums, again in parallel, of the earlier partial sums, and so on, until you are left with only two elements whose sum gives you the final result.

You are performing many sequential unionAll operations over inputs.size() Avro files. Assuming the unionAll() on DataFrame is blocking (and not a simple transformation like on RDDs) and actually performs the union operation, you will certainly benefit from parallelizing this loop. You could change the loop to something like the following:

// pseudo code only -- assumes n is a power of two; exception handling omitted
final int n = inputs.size();
final ExecutorService executor = Executors.newFixedThreadPool(n / 2);
final DataFrame[] dfInput = new DataFrame[n / 2];
List<Future<?>> futures = new ArrayList<Future<?>>();

// first round: load input i and input i + n/2 and union them
for (int i = 0; i < n / 2; i++) {
    final int idx = i;
    futures.add(executor.submit(new Runnable() {
        public void run() {
            dfInput[idx] = sqlContext.load(inputPaths.get(idx), "com.databricks.spark.avro")
                .unionAll(sqlContext.load(inputPaths.get(idx + n / 2), "com.databricks.spark.avro"));
        }
    }));
}
for (Future<?> f : futures) f.get();   // wait for this round to finish
futures.clear();

// following rounds: keep halving until everything is merged into dfInput[0]
int steps = (int) (Math.log(n) / Math.log(2.0));
for (int s = 2; s <= steps; s++) {
    final int stride = n / (1 << s);   // number of unions in this round: n / 2^s
    for (int i = 0; i < stride; i++) {
        final int idx = i;
        futures.add(executor.submit(new Runnable() {
            public void run() {
                dfInput[idx] = dfInput[idx].unionAll(dfInput[idx + stride]);
            }
        }));
    }
    for (Future<?> f : futures) f.get();   // wait before starting the next round
    futures.clear();
}
executor.shutdown();
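After the loops finish, dfInput[0] holds the union of all the inputs, so it would take the place of dfInput in the rest of your snippet -- roughly like this (sketch only, reusing your variable names; totalSize would now have to be summed in a separate pass over inputPaths):

DataFrame merged = dfInput[0];
int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
DataFrame outputFrame;
synchronized (this) {
    RDD inputRdd = merged.rdd().coalesce(targetPartitions, false, null);
    outputFrame = sqlContext.createDataFrame(inputRdd, merged.schema());
}
outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);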
Let me know if it helped.

-Kiran

On Thu, Jun 4, 2015 at 8:57 PM, James Aley wrote:

> Thanks for the confirmation! We're quite new to Spark, so a little
> reassurance is a good thing to have sometimes :-)
>
> The thing that's concerning me at the moment is that my job doesn't seem
> to run any faster with more compute resources added to the cluster, and
> this is proving a little tricky to debug. There are a lot of variables, so
> here's what we've tried already and the apparent impact. If anyone has any
> further suggestions, we'd love to hear!
>
> * Increase the "minimum" number of output files (targetPartitions above),
> so that input groups smaller than our minimum chunk size can still be
> worked on by more than one executor. This does measurably speed things up,
> but obviously it's a trade-off, as the original goal for this job is to
> merge our data into fewer, larger files.
>
> * Submit many jobs in parallel, by running the above code in a Callable,
> on an executor pool. This seems to help, to some extent, but I'm not sure
> what else needs to be configured alongside it -- driver threads, scheduling
> policy, etc. We set scheduling to "FAIR" when doing this, as that seemed
> like the right approach, but we're not 100% confident. It seemed to help
> quite substantially anyway, so perhaps this just needs further tuning?
>
> * Increasing executors, RAM, etc. This doesn't make a difference by itself
> for this job, so I'm thinking we're already not fully utilising the
> resources we have in a smaller cluster.
>
> Again, any recommendations appreciated. Thanks for the help!
>
> James.
>
> On 4 June 2015 at 15:00, Eugen Cepoi wrote:
>
>> Hi
>>
>> 2015-06-04 15:29 GMT+02:00 James Aley:
>>
>>> Hi,
>>>
>>> We have a load of Avro data coming into our data systems in the form of
>>> relatively small files, which we're merging into larger Parquet files
>>> with Spark. I've been following the docs, and the approach I'm taking
>>> seemed fairly obvious and pleasingly simple, but I'm wondering if
>>> perhaps it's not the most optimal approach.
>>>
>>> I was wondering if anyone on this list might have some advice to make
>>> this job as efficient as possible. Here's some code:
>>>
>>> DataFrame dfInput = sqlContext.load(inputPaths.get(0), "com.databricks.spark.avro");
>>> long totalSize = getDirSize(inputPaths.get(0));
>>>
>>> for (int i = 1; i < inputs.size(); ++i) {
>>>     dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i), "com.databricks.spark.avro"));
>>>     totalSize += getDirSize(inputPaths.get(i));
>>> }
>>>
>>> int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
>>> DataFrame outputFrame;
>>>
>>> // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
>>> // the synchronized block below. Suggestions welcome here too! :-)
>>> synchronized (this) {
>>>     RDD inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
>>>     outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
>>> }
>>>
>>> outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
>>>
>>> Here are some things bothering me:
>>>
>>> - Conversion to an RDD and back again so that we can use coalesce()
>>> to reduce the number of partitions. This is because we read that
>>> repartition() is not as efficient as coalesce(), and local micro-benchmarks
>>> seemed to somewhat confirm that this was faster. Is this really a good idea
>>> though? Should we be doing something else?
>>>
>> Repartition uses coalesce but with a forced shuffle step. It's just a
>> shortcut for coalesce(xxx, true).
>> Doing a coalesce sounds correct, I'd do the same :) Note that if you add
>> the shuffle step, then your partitions should be better balanced.
>>
>>> - Usage of unionAll() - this is the only way I could find to join
>>> the separate data sets into a single data frame to save as Parquet. Is
>>> there a better way?
>>>
>> When using the input formats directly you can use
>> FileInputFormat.addInputPath; it should perform at least as well as the union.
>>
>>> - Do I need to be using the DataFrame API at all? I'm not querying
>>> any data here, so the nice API for SQL-like transformations of the data
>>> isn't being used. The DataFrame API just seemed like the path of least
>>> resistance for working with Avro and Parquet. Would there be any advantage
>>> to using hadoopRDD() with the appropriate Input/Output formats?
>>>
>> Using the input/output formats directly sounds viable. But the snippet you
>> show seems clean enough, and I am not sure there would be much value in
>> making something (maybe) slightly faster but harder to understand.
>>
>> Eugen
>>
>>> Any advice or tips greatly appreciated!
>>>
>>> James.
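For reference, the FileInputFormat.addInputPath route Eugen mentions above would look roughly like this untested sketch (it assumes avro-mapred's AvroKeyInputFormat, generic records, and a JavaSparkContext named sc -- adjust to your setup):

// all input directories feed a single RDD instead of a chain of unions
// classes: org.apache.hadoop.mapreduce.Job, org.apache.hadoop.mapreduce.lib.input.FileInputFormat,
//          org.apache.hadoop.fs.Path, org.apache.avro.mapreduce.AvroKeyInputFormat, org.apache.avro.mapred.AvroKey
Job job = Job.getInstance(new Configuration());
for (String p : inputPaths) {
    FileInputFormat.addInputPath(job, new Path(p));
}
JavaPairRDD<AvroKey<GenericRecord>, NullWritable> records =
    sc.newAPIHadoopRDD(job.getConfiguration(),
                       AvroKeyInputFormat.class, AvroKey.class, NullWritable.class);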