spark-dev mailing list archives

From Steve Loughran <>
Subject Re: [Structured Streaming SPARK-23966] Why non-atomic rename is problem in State Store ?
Date Mon, 01 Oct 2018 12:05:43 GMT

On 11 Aug 2018, at 17:33, chandan prakash <<>>

Hi All,
I was going through this pull request about the new CheckpointFileManager abstraction in Structured
Streaming, coming in 2.4:

I went through the code in detail and found that it introduces a very nice abstraction, which
is much cleaner and more extensible for direct-write file systems like S3 (in addition to the
current HDFS file system).

But I am unable to understand: is it really solving a problem in the existing State Store
code in Spark 2.3?

My questions about the PR's statements, in relation to the State Store code:
PR description: "Checkpoint files must be written atomically such that no partial files
are generated."
QUESTION: When are partial files generated in the current code? I can see that data is first
written to a temp-delta file and then renamed to the final file. If something bad happens,
the task fails due to the thrown exception, and abort() is called on the store to close and
delete tempDeltaFileStream. I think that is quite clean; in what case might partial files
be generated?

I suspect the issue is that when files are written to a "classic" POSIX store, flush/sync operations
can make the intermediate data visible to others. That is why the convention for
checkpoint/commit operations is: write to a temp file, then rename. Which is not what you want
for object stores, especially S3.
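The write-to-temp-then-rename convention can be sketched with plain java.nio.file on a local POSIX filesystem. This is a minimal illustration under that assumption, not Spark's state store code and not the Hadoop FileSystem API; the class and method names are hypothetical. Partial data only ever exists under the temporary name, so readers of the destination see either nothing or the complete file.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper: commit a file by writing it under a temp name, then renaming it. */
final class TempThenRename {

    static void commit(Path dest, byte[] data) throws IOException {
        Path dir = dest.toAbsolutePath().getParent();
        // Anything written (or flushed) here is only visible under the temp name.
        Path tmp = Files.createTempFile(dir, "." + dest.getFileName(), ".tmp");
        try {
            try (OutputStream out = Files.newOutputStream(tmp)) {
                out.write(data);
            }
            // Single-file rename in the same directory is atomic on POSIX filesystems;
            // ATOMIC_MOVE throws AtomicMoveNotSupportedException instead of silently copying.
            Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException | RuntimeException e) {
            // Like the abort() path: remove the temp file so nothing partial is left behind.
            Files.deleteIfExists(tmp);
            throw e;
        }
    }
}
```

On an object store the same pattern turns the rename into a copy of all the data, which is exactly the cost the reply says you don't want there.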

PR description: "State Store behavior is incorrect - HDFS FileSystem implementation does
not have atomic rename"
QUESTION: HDFS filesystem rename is atomic. I think the line above refers to checking whether
the file already exists and then taking the appropriate action, which together make the rename
a multi-step and hence non-atomic operation. But why is this behaviour incorrect? Even if
multiple executors try to write to the same file, only the first of them will succeed; the
second will see that the file exists and will delete its temp-delta file. That looks
good.
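The check-then-rename logic described in the question looks roughly like the following. This is a hypothetical sketch using local java.nio.file calls, not the actual Spark 2.3 source. Each call is atomic by itself, but the existence check and the rename are two separate operations, so another writer can slip in between them.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical "first writer wins" commit built from a check followed by a rename. */
final class CheckThenRename {

    /** Returns true if this writer's temp file became the destination. */
    static boolean commit(Path tmp, Path dest) throws IOException {
        if (Files.exists(dest)) {
            // Someone else already committed: discard our copy.
            Files.delete(tmp);
            return false;
        }
        // RACE WINDOW: another writer may rename its own temp file to dest right here.
        // With a POSIX rename(2) the move below would then silently replace that file,
        // and both writers would report success.
        Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE);
        return true;
    }
}
```

Run sequentially, this behaves exactly as the question expects; the problem only shows up when two writers interleave inside the window.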

HDFS single-file and directory rename is atomic: it grabs a lock on the metadata store, makes the
change, and unlocks it. If any FS operation in your commit explicitly renames more than one file,
you lose atomicity. If there's a check + rename, then yes, it's two steps, unless you can use
create(path, overwrite=false) to create some lease file, where you know that the creation is
exclusive and atomic. That holds for HDFS and POSIX, but generally not at all for the object
stores, especially S3, which can actually cache the 404 in its load balancers for a few tens
of milliseconds.
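The exclusive-create lease has a local analogue in Files.createFile: per its Javadoc, the existence check and the creation are a single operation that is atomic with respect to other filesystem activity, and it throws FileAlreadyExistsException if the path exists. In Hadoop terms this corresponds to FileSystem.create(path, false); as said above, that exclusivity holds on HDFS and POSIX but generally not on object stores. The class name here is hypothetical.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical lease: whoever creates the marker file first owns the commit. */
final class ExclusiveCreateLease {

    static boolean tryAcquire(Path lease) throws IOException {
        try {
            // Check + create in one atomic step (typically O_CREAT|O_EXCL on POSIX systems).
            Files.createFile(lease);
            return true;
        } catch (FileAlreadyExistsException e) {
            return false; // another writer got there first
        }
    }
}
```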

For object stores, you are in a different world of pain:

- S3: nope. Rename is O(files + data), observable, and subject to partial failures. On top of that,
there is list inconsistency and caching of negative GET/HEAD responses to defend against DoS.
- wasb: no, except for parts of the tree where you enable leases, which increases the cost
of operations. O(files), with the odd pause if some shard movement has to take place.
- Google GCS: not sure, but it is O(files).
- Azure ABFS: not atomic yet. As the code says:

    if (isAtomicRenameKey(source.getName())) {
      LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
          + " create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
    }
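To see why a directory rename on S3 is O(files + data), observable, and exposed to partial failure, here is a toy in-memory model in which "rename" must be emulated as copy-then-delete per object. It illustrates only the cost and failure model, not S3A's implementation; the class, the failAfter knob, and the byte counter are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy object store: a flat key -> bytes map with no directories and no rename primitive. */
final class ToyObjectStore {
    final Map<String, byte[]> objects = new TreeMap<>();
    long bytesCopied = 0;

    /**
     * Emulated "directory rename": copy every object under srcPrefix, then delete the originals.
     * failAfter simulates a crash after that many copies (-1 means no failure).
     */
    void renamePrefix(String srcPrefix, String dstPrefix, int failAfter) {
        List<String> keys = new ArrayList<>();
        for (String k : objects.keySet()) {
            if (k.startsWith(srcPrefix)) keys.add(k);
        }
        int copied = 0;
        for (String k : keys) {
            if (copied == failAfter) {
                throw new IllegalStateException("simulated failure after " + copied + " copies");
            }
            byte[] data = objects.get(k);
            // One copy per object: total work grows with both file count and bytes.
            objects.put(dstPrefix + k.substring(srcPrefix.length()), data);
            bytesCopied += data.length;
            copied++;
        }
        // Deletes only start after every copy; until then readers can see both trees.
        for (String k : keys) objects.remove(k);
    }
}
```

If the failure fires midway, the store is left holding the full source tree plus part of the destination, which is the "partial failure" case: nothing rolls it back.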

From my reading of the SPARK-23966 PR, it's the object store problem that is being addressed:
both correctness and performance.

Anything I am missing here?
I'm really curious to know which corner cases we are trying to solve with this new pull request.

Object stores as the back end. For S3 in particular, rename is O(data), whereas a direct
PUT to the destination gives you that atomicity.

Someone needs to sit down and write that reference implementation.

For whoever does want to do that:

- I believe it can all be done with the normal Hadoop FS APIs, simply knowing that for the
store, OutputStream.close() is (a) atomic, (b) potentially really slow as the remaining
data gets uploaded, and (c) when it fails, can mean all your data just got lost.
- I've got the TLA+ spec for the S3 API, which they can use as the foundation for their proofs
of correctness.
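The close()-as-commit semantics in points (a), (b) and (c) can be modelled with a stream that buffers everything and publishes the object only when close() succeeds. This is a hypothetical in-memory sketch of those semantics, not the S3A output stream; PutOnCloseStream, its Map "bucket", and the failOnUpload flag are invented for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

/** Hypothetical stream whose data becomes visible only when close() completes (single PUT). */
final class PutOnCloseStream extends OutputStream {
    private final Map<String, byte[]> bucket;
    private final String key;
    private final boolean failOnUpload; // simulates the upload in close() failing
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private boolean closed = false;

    PutOnCloseStream(Map<String, byte[]> bucket, String key, boolean failOnUpload) {
        this.bucket = bucket;
        this.key = key;
        this.failOnUpload = failOnUpload;
    }

    @Override
    public void write(int b) throws IOException {
        if (closed) throw new IOException("stream closed");
        buffer.write(b); // nothing is visible in the bucket yet, even after flush()
    }

    @Override
    public void close() throws IOException {
        if (closed) return;
        closed = true;
        // (b) the whole upload happens here, so close() can be slow for large data.
        if (failOnUpload) {
            // (c) a failed upload means the buffered data is simply lost.
            throw new IOException("upload of " + key + " failed");
        }
        // (a) the object appears all at once: readers see nothing or the complete data.
        bucket.put(key, buffer.toByteArray());
    }
}
```

A checkpoint writer built on this model would write straight to the final path and treat a successful close() as the commit, with no rename at all.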

