spark-dev mailing list archives

From: Steve Loughran <ste...@hortonworks.com>
Subject: Re: saveAsNewAPIHadoopDataset must not enable speculation for parquet file?
Date: Thu, 26 Apr 2018 11:23:56 GMT

Sorry, I hadn't noticed this followup. Been busy with other issues.

On 3 Apr 2018, at 11:19, cane <zhoukang199191@gmail.com> wrote:

Now, if we use saveAsNewAPIHadoopDataset with speculation enabled, it may cause data loss.
I checked the comment on this API:

We should make sure our tasks are idempotent when speculation is enabled, i.e. do not use
an output committer that writes data directly. There is an example in
https://issues.apache.org/jira/browse/SPARK-10063 to show the bad result of using a direct
output committer with speculation enabled.

But is this the rule we must follow?
For example, Parquet will get ParquetOutputCommitter.
In that case, must speculation be disabled for Parquet?

ParquetOutputCommitter is a subclass of Hadoop's FileOutputCommitter, so you get the choice
of its two algorithms, as set by spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version
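As a minimal sketch, assuming you set it when building the session (the config key is the
one above; the app name is a placeholder):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("committer-demo")  // placeholder name
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
      .getOrCreate()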


algorithm 1:
- tasks write to a _temporary/$jobId/_temporary/$taskId directory
- task commit: rename that directory to _temporary/$jobId/$taskId, which for a real FS is an
O(1) atomic operation; speculation and retry are straightforward
- job commit: copy the contents of all the task ID directories to the destination, then
create the _SUCCESS file
- job commit is non-atomic; if a job fails during commit you need to delete the dest dir
and try again

algorithm 2:
- tasks write to a _temporary/$jobId/_temporary/$taskId directory
- task commit: merge into the dest directory, potentially while other tasks are doing a
merge at the same time
- job commit does nothing but create the _SUCCESS file, and can be repeated
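To make the difference concrete, here is a rough sketch of the two commit paths as
local-filesystem moves. The helper names are mine, not Hadoop's, and it ignores job
attempts, nested directories and error handling:

    import java.nio.file.{Files, Path, StandardCopyOption}
    import scala.jdk.CollectionConverters._

    // v1 task commit: one atomic rename of the whole task attempt directory.
    def commitTaskV1(dest: Path, jobId: String, taskId: String): Unit = {
      val attempt   = dest.resolve(s"_temporary/$jobId/_temporary/$taskId")
      val committed = dest.resolve(s"_temporary/$jobId/$taskId")
      Files.move(attempt, committed, StandardCopyOption.ATOMIC_MOVE)
    }

    // v1 job commit: move every committed task's files into dest, then _SUCCESS.
    // Non-atomic: a crash part-way leaves a mix of moved and unmoved files.
    def commitJobV1(dest: Path, jobId: String): Unit = {
      val tasks = Files.list(dest.resolve(s"_temporary/$jobId")).iterator().asScala
        .filterNot(_.getFileName.toString == "_temporary")
      for (task <- tasks; f <- Files.list(task).iterator().asScala)
        Files.move(f, dest.resolve(f.getFileName))
      Files.createFile(dest.resolve("_SUCCESS"))
    }

    // v2 task commit: merge straight into dest, so a failure during this loop
    // leaves the destination in an unknown state; job commit is just _SUCCESS.
    def commitTaskV2(dest: Path, jobId: String, taskId: String): Unit = {
      val attempt = dest.resolve(s"_temporary/$jobId/_temporary/$taskId")
      for (f <- Files.list(attempt).iterator().asScala)
        Files.move(f, dest.resolve(f.getFileName))
    }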

You can speculate with either, but if a task using algorithm 2 fails during task commit then
there's a problem, as the store is in an unknown state. Neither MapReduce nor Spark worries
about this. Usually task commit is fast, so the window of failure is pretty small; when you
are working with object stores, that doesn't hold. Really they should react to that failure
by aborting the job, but as object stores tend to have their own issues, this is more of a
detail than the underlying flaw.

As I said, you can read

https://github.com/steveloughran/zero-rename-committer/releases/download/tag_draft_003/a_zero_rename_committer.pdf

and a precursor attempt to document what goes on in the depths of FileOutputCommitter (which
has an error in one of the code samples; I forget which. The paper fixes that):

http://hadoop.apache.org/docs/r3.1.0/hadoop-aws/tools/hadoop-aws/committer_architecture.html

+ an IBM paper on their Swift committer for Spark, "Stocator: A High Performance Object
Store Connector for Spark": https://arxiv.org/pdf/1709.01812


I have some issues with that paper, but it's worthwhile looking at to see their focus on
rollback over temp directories:
http://steveloughran.blogspot.co.uk/2017/09/stocator-high-performance-object-store.html



Is there someone who knows the history?

If you check out Hadoop, you can get the history after the svn -> git migration, though
the earlier history is lost in folklore, primarily stories of "what went wrong" at Yahoo!.

https://github.com/apache/hadoop/commits/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java

For Spark, look at
https://issues.apache.org/jira/browse/SPARK-4879

and the git logs of

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala

Once you start looking at the commit protocols, you end up in a fascinating world where
things like proofs of correctness start to matter. Sadly, everyone is constrained not just
by our lack of everyday use of the language and tools, but by the lack of a foundation of
specs for the underlying storage systems (*). There is one for a model of a consistent S3
store, https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf, but I
couldn't work out how to define an eventually consistent one in TLA+. Contributions welcome.

-Steve

(*) The Hadoop Filesystem spec is actually Z disguised as Python, but it doesn't integrate
with any toolchain you can use for correctness proofs. It is, though, read, understood and
maintained by developers, which I consider a success. It's just that we could do more.
