spark-dev mailing list archives

From chandan prakash <>
Subject Re: [Structured Streaming SPARK-23966] Why non-atomic rename is problem in State Store ?
Date Tue, 02 Oct 2018 17:48:24 GMT
Thanks a lot Steve and Jungtaek for your answers.
You explained it really well and in depth.

I understood that the existing implementation was not correct for
object stores like S3, and that the new implementation will address that.
For better performance we should choose a direct-write-based checkpoint
rather than a rename-based one (which we can implement using the new
CheckpointFileManager abstraction).
My confusion was because of this line in the PR:
*This is incorrect as rename is not atomic in HDFS FileSystem*
I thought the above line meant that the existing implementation is not
correct for the HDFS file system either.
So I wanted to understand whether there is something I am missing. The new
implementation addresses the issue for object stores like S3, not for HDFS.
Thanks again for your explanation; I am sure it will help a lot of other
code readers as well.
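
For other readers, here is a minimal sketch of the rename-based pattern discussed in this thread (write to a temp file, then rename into place). It uses plain java.nio on a local filesystem as a stand-in, not the Hadoop FileSystem API or Spark's actual State Store code; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical illustration only; not Spark's CheckpointFileManager.
final class RenameBasedCheckpoint {
    /** Writes data to a temp file next to target, then renames it into place. */
    static void write(Path target, byte[] data) throws IOException {
        // Create the temp file in the same directory so the rename stays
        // within one filesystem.
        Path dir = target.toAbsolutePath().getParent();
        Path temp = Files.createTempFile(dir, ".tmp-", ".delta");
        try {
            // Readers never see this partially written file under the final name.
            Files.write(temp, data);
            // ATOMIC_MOVE throws AtomicMoveNotSupportedException if the
            // filesystem cannot rename atomically. Whether an existing target
            // is replaced or an IOException is thrown is implementation specific.
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            // Clean up the temp file if the write or the rename failed.
            Files.deleteIfExists(temp);
        }
    }
}
```

On a store where rename is a copy plus delete (S3, for example), this pattern gives neither the atomicity nor the performance it gives on a local or HDFS-style filesystem, which is the case the direct-write approach is meant for.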


On Mon, Oct 1, 2018 at 5:37 PM Steve Loughran <> wrote:

> On 11 Aug 2018, at 17:33, chandan prakash <>
> wrote:
> Hi All,
> I was going through this pull request about the new CheckpointFileManager
> abstraction in Structured Streaming coming in 2.4 :
> I went through the code in detail and found that it will introduce a very nice
> abstraction which is much cleaner and more extensible for direct-write file
> systems like S3 (in addition to the current HDFS file system).
> *But I am unable to understand: is it really solving some problem in the
> existing State Store code in Spark 2.3?*
> *My questions related to the statements above in the State Store code:*
>  *PR description*:: "Checkpoint files must be written atomically such
> that *no partial files are generated*."
> *QUESTION*: When are partial files generated in the current code? I can see
> that data is first written to a temp-delta file and then renamed to
> file. If something bad happens, the task will fail due to the
> thrown exception and abort() will be called on the store to close and delete
> tempDeltaFileStream. I think it is quite clean; in what case might
> partial files be generated?
> I suspect the issue is that as files are written to a "classic" POSIX
> store, flush/sync operations can result in the intermediate data being
> visible to others. That is why the convention for checkpointing/commit
> operations is: write to temp & rename. That is not what you want for
> object stores, especially S3.
>  *PR description*:: "*State Store behavior is incorrect - HDFS FileSystem
> implementation does not have atomic rename*"
> *QUESTION*: The HDFS filesystem rename operation is atomic. I think the above
> line refers to checking whether the file exists and then
> taking appropriate action, which together make the file renaming operation
> multi-step and hence non-atomic. But why is this behaviour incorrect?
> Even if multiple executors try to write to the same file,
> only the first of them will succeed; the second one will see that the file
> exists and will delete its temp-delta file. Looks good.
> HDFS single file and dir rename is atomic; it grabs a lock on the
> metadata store, does the change, unlocks it. If you are doing any FS op
> which explicitly renames more than one file in your commit, you lose
> atomicity. If there's a check + rename then yes, it's two-step, unless you
> can use create(path, overwrite=false) to create some lease file where you
> know that the creation is exclusive & atomic. That holds for HDFS + POSIX,
> and generally not at all for the object stores, especially S3, which can
> actually cache the 404 in its load balancers for a few tens of milliseconds.
> For object stores, you are in a different world of pain:
> - S3: nope; O(files + data) + observable + partial failures. List
>   inconsistency + caching of negative GET/HEAD to defend against DoS.
> - wasb: no, except for bits of the tree where you enable leases, something
>   which increases the cost of operations. O(files), with the odd pause if
>   some shard movement has to take place.
> - google GCS: not sure, but it is O(files).
> - Azure abfs: not atomic yet. As the code says:
>     if (isAtomicRenameKey(source.getName())) {
>       LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
>               + " create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
>     }
> From my reading of the SPARK-23966 PR, it's the object store problem which
> is being addressed, both correctness and performance.
> Anything I am missing here?
> Really curious to know which corner cases we are trying to solve by this
> new pull request ?
> Object stores as the back end. For S3 in particular, where that rename is
> O(data) and a direct PUT to the destination gives you that atomicity.
> Someone needs to sit down and write that reference implementation.
> For whoever does want to do that:
> - I believe it can all be done with the normal Hadoop FS APIs, simply
> knowing that for the store OutputStream.close() is (a) atomic, (b)
> potentially really slow as the remaining data gets uploaded and (c) when it
> fails, can mean all your data just got lost.
> - I've got the TLA+ spec for the S3 API which they can use as the
> foundation for their proofs of correctness
> -Steve
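
As a rough illustration of the exclusive-create idea mentioned in the reply above (create(path, overwrite=false) for a lease file), here is a sketch using java.nio's CREATE_NEW on a local filesystem. It is a stand-in for the Hadoop call, not the same API, and the class and method names are hypothetical. The existence check and the creation happen as one atomic step, so exactly one of several competing writers wins.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration only; a local-filesystem analogue of an
// exclusive create, not Hadoop's FileSystem API.
final class ExclusiveCreate {
    /** Returns true if this caller created the file, false if it already existed. */
    static boolean tryCreate(Path path, byte[] data) throws IOException {
        // CREATE_NEW makes "check that the file is absent" and "create it"
        // a single atomic operation, so there is no check-then-act race.
        try (OutputStream out = Files.newOutputStream(
                path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            out.write(data);
            return true;
        } catch (FileAlreadyExistsException e) {
            // Another writer got there first; its file is left untouched.
            return false;
        }
    }
}
```

Only the creation is atomic here; the contents are written afterwards and can be observed half-written, which is why this suits a small lease or marker file rather than the checkpoint data itself.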

Chandan Prakash
