spark-dev mailing list archives

From Jungtaek Lim <kabh...@gmail.com>
Subject Re: [Structured Streaming SPARK-23966] Why non-atomic rename is problem in State Store ?
Date Sun, 30 Sep 2018 08:19:12 GMT
Removing user@, since cross-posting to multiple mailing lists is considered
bad practice.

My knowledge is based on the codebase after SPARK-23966, so I went back
through SPARK-23966 to explain what I can see in the patch. Please correct
me if I'm missing anything.

Note that in 2.3, abort() doesn't remove the final delta file. Assuming
rename is not an atomic operation, if the task fails in the middle of
renaming the temp file to the final delta file while committing, a partial
delta file could be left behind.
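To make this concrete, here is a toy Python sketch. It assumes a non-atomic rename modeled as a chunked copy to the destination followed by deleting the source. The in-memory "file system" (a dict) and the helpers non_atomic_rename() and abort() are hypothetical stand-ins, not Spark's or Hadoop's code. A crash partway through the copy leaves a truncated 1.delta behind, and an abort() that only cleans up the temp file never touches it.

```python
class CrashDuringRename(Exception):
    """Simulated task failure in the middle of a rename."""


def non_atomic_rename(fs, src, dst, fail_after_chunks=None, chunk_size=4):
    """Hypothetical non-atomic rename: copy src to dst chunk by chunk, then delete src.

    If fail_after_chunks is set, the simulated task dies after copying that
    many chunks, leaving a partially written dst behind.
    """
    data = fs[src]
    fs[dst] = b""
    for i in range(0, len(data), chunk_size):
        if fail_after_chunks is not None and i // chunk_size == fail_after_chunks:
            raise CrashDuringRename(f"crashed after {fail_after_chunks} chunk(s)")
        fs[dst] += data[i:i + chunk_size]
    del fs[src]


def abort(fs, temp_path):
    """Mimics the abort() behavior described above: only the temp file is cleaned up."""
    fs.pop(temp_path, None)


# Toy in-memory "file system": path -> file contents.
fs = {"temp-delta": b"key1=a;key2=b;key3=c"}
try:
    non_atomic_rename(fs, "temp-delta", "1.delta", fail_after_chunks=2)
except CrashDuringRename:
    abort(fs, "temp-delta")

print(fs)  # -> {'1.delta': b'key1=a;k'}  (only a prefix of the real delta)
```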

In addition, commitUpdates() skips renaming the temporary file to the delta
file when the delta file already exists. So when a partial file is left
behind, both a speculative task and the task in a retried batch could skip
the rename and mark themselves as successful, resulting in the partial delta
being treated as the correct delta file.
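A second toy sketch, under the same assumptions (an in-memory dict as the file system, and a hypothetical commit_updates() helper that only approximates the skip-if-exists behavior described above, not Spark's actual method), starts from the state a mid-rename crash left behind and shows the retried attempt reporting success while the partial delta stays in place.

```python
def commit_updates(fs, temp_path, final_path):
    """Simplified skip-if-exists commit (hypothetical, not Spark's actual code).

    If the final delta file already exists, assume an earlier attempt already
    committed identical output: drop our temp file and report success.
    """
    if final_path in fs:
        del fs[temp_path]
    else:
        fs[final_path] = fs.pop(temp_path)  # this toy rename is atomic
    return True  # the task is marked successful either way


# State left behind by an earlier attempt that crashed mid-rename:
# "1.delta" holds only a prefix of the real delta.
fs = {"1.delta": b"key1=a;k"}

# The retried (or speculative) attempt writes the full, correct delta to its own temp file...
fs["temp-delta-retry"] = b"key1=a;key2=b;key3=c"
ok = commit_updates(fs, "temp-delta-retry", "1.delta")

# ...but the commit skips the rename, so the partial file is what remains.
print(ok, fs)  # -> True {'1.delta': b'key1=a;k'}
```

This is why the PR description insists that checkpoint files be written atomically: once a partial final file can exist, the exists-check turns it into the accepted delta.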

Does it make sense?

Thanks,
Jungtaek Lim (HeartSaVioR)

On Sun, Sep 30, 2018 at 4:51 PM chandan prakash <chandanbaranwal@gmail.com> wrote:

> Can anyone clear up the doubts in the questions asked here?
>
> Regards,
> Chandan
>
>
> On Sat, Aug 11, 2018 at 10:03 PM chandan prakash <
> chandanbaranwal@gmail.com> wrote:
>
>> Hi All,
>> I was going through this pull request about the new CheckpointFileManager
>> abstraction in Structured Streaming coming in 2.4:
>> https://issues.apache.org/jira/browse/SPARK-23966
>> https://github.com/apache/spark/pull/21048
>>
>> I went through the code in detail and found that it introduces a very
>> nice abstraction, which is much cleaner and more extensible for
>> direct-write file systems like S3 (in addition to the current HDFS file
>> system).
>>
>> *But I am unable to understand: is it really solving a problem in the
>> existing State Store code in Spark 2.3?*
>>
>> *My questions about the statements above regarding the State Store code:*
>>  *PR description*: "Checkpoint files must be written atomically such
>> that *no partial files are generated*."
>> *QUESTION*: When are partial files generated in the current code? I can
>> see that data is first written to a temp-delta file and then renamed to
>> the version.delta file. If something bad happens, the task fails due to
>> the thrown exception, and abort() is called on the store to close and
>> delete tempDeltaFileStream. This seems quite clean to me, so in what case
>> might partial files be generated?
>>
>>  *PR description*: "*State Store behavior is incorrect - HDFS FileSystem
>> implementation does not have atomic rename*"
>> *QUESTION*: The HDFS file system rename operation is atomic. I think the
>> line above refers to first checking whether the file already exists and
>> then taking the appropriate action, which together make the rename a
>> multi-step, and hence non-atomic, operation. But why is this behavior
>> incorrect? Even if multiple executors try to write to the same
>> version.delta file, only the first of them will succeed; the second will
>> see that the file exists and delete its temp-delta file. That looks fine.
>>
>> Am I missing anything here?
>> I am really curious to know which corner cases this new pull request is
>> trying to solve.
>>
>> Regards,
>> Chandan
>>
>
> --
> Chandan Prakash
>
>
