spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Steve Loughran <>
Subject Re: Output Committers for S3
Date Tue, 21 Feb 2017 14:15:48 GMT

On 21 Feb 2017, at 01:00, Ryan Blue <<>>

We just wrote a couple new committers for S3 that we're beginning to roll out to our Spark
users. I've uploaded a repo with it if you'd like to take a look:

The main problem with the UUID approach is that data is live as soon as the S3 upload completes.
That means that readers can get partial results while a job is running that may not be eventually
committed (since you will remove the UUID later). You may also have a problem with partitioned
task outputs.

You'd have to encode the task ID in the output file name to identify files to roll back in
the event you need to revert a task, but if you have partitioned output, you have to do a
lot of directory listing to find all the files that need to be removed. That, or you could
risk duplicate data by not rolling back tasks.

Bear in mind that recursive directory listing isn't so expensive once you have the O(1)-ish
listFiles(files, recursive) operation of HADOOP-13208.

The approach we took is to use the multi-part upload API to stage data from tasks without
issuing the final call to complete the upload and make the data live in S3. That way, we get
distributed uploads without any visible data until the job committer runs. The job committer
reads all of the pending uploads and commits them. If the job has failed, then it can roll
back the known uploads by aborting them instead, with the data never visible to readers.

Yes, that's what I've been doing too. I'm choreographing the task and committer via data serialized
to S3 itself. On a task failure that will allow us to roll back all completely written files,
without the need for any task-job communications. I'm still thinking about having an optional+async
scan for pending commits to the dest path, to identify problems and keep bills down.

The flaw in this approach is that you can still get partial writes if the driver fails while
running the job committer, but it covers the other cases.

There's a bit of that in both the FileOutputFormat and indeed, in HadoopMapReduceCommitProtocol.
It's just a small window, especially if you do those final PUTs in parallel

We're working on getting users moved over to the new committers, so now seems like a good
time to get a copy out to the community. Please let me know what you think.

I'll have a look at your code, see how it compares to mine. I'm able to take advantage of
the fact that we can tune the S3A FS, for example, by modifying the block output stream to
*not* commit its work in the final close()

This means that provided the output writer doesn't attempt to read the file it's just written,
we can do a write straight to the final destination

What your patch has made me realise is that I could also do a delayed-commit copy by reading
in a file, doing a multipart put to its final destination, and again, postponing the final
commit. this is something which tasks could do in their commit rather than a normal COPY+DELETE
 rename, passing the final pending commit information to the job committer. This'd make the
rename() slower as it will read and write the data again, rather than the 6-10 MB/s of in-S3
copies, but as these happen in-task-commit, rather than in-job-commit, they slow down the
overall job less. That could be used for the absolute path commit phase.



On Mon, Feb 20, 2017 at 10:14 AM, Matthew Schauer <<>>
I'm using Spark 1.5.2 and trying to append a data frame to partitioned
Parquet directory in S3.  It is known that the default
`ParquetOutputCommitter` performs poorly in S3 because move is implemented
as copy/delete, but the `DirectParquetOutputCommitter` is not safe to use
for append operations in case of failure.  I'm not very familiar with the
intricacies of job/task committing/aborting, but I've written a rough
replacement output committer that seems to work.  It writes the results
directly to their final locations and uses the write UUID to determine which
files to remove in the case of a job/task abort.  It seems to be a workable
concept in the simple tests that I've tried.  However, I can't make Spark
use this alternate output committer because the changes in SPARK-8578
categorically prohibit any custom output committer from being used, even if
it's safe for appending.  I have two questions: 1) Does anyone more familiar
with output committing have any feedback on my proposed "safe" append
strategy, and 2) is there any way to circumvent the restriction on append
committers without editing and recompiling Spark?  Discussion of solutions
in Spark 2.1 is also welcome.

View this message in context:
Sent from the Apache Spark Developers List mailing list archive at<>.

To unsubscribe e-mail:<>

Ryan Blue
Software Engineer

View raw message