TL;DR: Extend FileOutputCommitter to eliminate the temporary storage. Several implementations can be found online, typically called DirectOutputCommitter, e.g. this Spark pull request. Tell Spark to use your shiny new committer when writing to an object store and all will be well.

Aaron is on the right track, but this renaming is bottlenecked by the object storage system itself, irrespective of whether it is executed in the driver or the executors. Object stores (S3, Google, Azure, Amplidata :P...) do not have a native rename: it is implemented as a server-side copy operation, so its duration is proportional to the object size. By default, Hadoop (and thus also Spark) renames from a temporary to the final output path to enable both retrying and speculative execution, because HDFS is a single-writer system, so multiple "job attempts" cannot write to the final output concurrently. Object stores do allow multiple concurrent writes to the same output, which is exactly what makes a native rename nigh impossible. The solution is to enable this concurrent writing instead of renaming to the final output, by using a custom OutputCommitter which does not use a temporary location.
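
A minimal sketch of such a committer, for illustration only. It assumes Hadoop 2.x and the old mapred API (which saveAsTextFile goes through). The class name DirectOutputCommitter is just the conventional one, and the code is not taken from any particular pull request. Note that it extends the base OutputCommitter rather than FileOutputCommitter: to my understanding, the old-API FileOutputFormat only redirects task output into _temporary when the committer is a FileOutputCommitter, so any other committer makes tasks write straight to the final path. The bucket and path are placeholders.

```scala
import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical committer: every hook is a no-op, so nothing is staged in
// _temporary and nothing has to be renamed when a task or the job finishes.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  // Tasks write straight to the final path, so there is nothing to commit.
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  // Caveat: output left behind by a failed attempt is not cleaned up here.
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}

object DirectCommitExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("direct-commit-example")
      // spark.hadoop.* entries are copied into the Hadoop configuration;
      // mapred.output.committer.class selects the old-API committer.
      .set("spark.hadoop.mapred.output.committer.class",
        classOf[DirectOutputCommitter].getName)
    val sc = new SparkContext(conf)
    // Placeholder output location (an assumption, not from this thread).
    sc.parallelize(1 to 1000).saveAsTextFile("gs://some-bucket/output")
    sc.stop()
  }
}
```

Since every attempt now writes to the final location directly, I would leave spark.speculation at its default (false) when trying this, and keep in mind that a failed task can leave partial output behind.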

Thomas Demoor

On Wed, Jan 28, 2015 at 3:54 AM, Josh Walton <josh@openbookben.com> wrote:
I'm not sure how to confirm how the moving is happening. However, one of the jobs I was talking about just completed, with 9k files of 4 MB each. Spark UI showed the job as complete after ~2 hours. The last four hours of the job were spent just moving the files from _temporary to their final destination. The tasks for the write were definitely shown as complete, and no logging is happening on the master or workers. The last line of my Java code logs, but the job sits there while the files are moved.

On Tue, Jan 27, 2015 at 7:24 PM, Aaron Davidson <ilikerps@gmail.com> wrote:
This renaming from _temporary to the final location is actually done by executors, in parallel, for saveAsTextFile. It should be performed by each task individually before it returns.

I have seen an issue similar to what you mention dealing with Hive code which did the renaming serially on the driver, which is very slow for S3 (and possibly Google Storage as well), as it actually copies the data rather than doing a metadata-only operation during rename. However, this should not be an issue in this case.

Could you confirm how the moving is happening -- i.e., on the executors or the driver?

On Tue, Jan 27, 2015 at 4:31 PM, jwalton <josh@openbookben.com> wrote:
We are running Spark in Google Compute Engine using their One-Click Deploy.
By doing so, we get their Google Cloud Storage connector for Hadoop for free,
meaning we can specify gs:// paths for input and output.

We have jobs that take a couple of hours, end up with ~9k partitions which
means 9k output files. After the job is "complete" it then moves the output
files from our $output_path/_temporary to $output_path. That process can
take longer than the job itself depending on the circumstances. The job I
mentioned previously outputs ~4 MB files, and so far has copied 1/3 of the
files in 1.5 hours from _temporary to the final destination.

Is there a solution to this besides reducing the number of partitions?
Anyone else run into similar issues elsewhere? I don't remember this being
an issue with MapReduce jobs and Hadoop; however, I probably wasn't
tracking the transfer of the output files like I am with Spark.


