[Structured Streaming SPARK-23966] Why is non-atomic rename a problem in the State Store?

chandan prakash
Hi all,
I was going through this pull request about the new CheckpointFileManager abstraction in Structured Streaming, coming in 2.4:

I went through the code in detail and found that it will introduce a very nice abstraction, which is much cleaner and extensible to direct-write file systems like S3 (in addition to the current HDFS file system).
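As a rough illustration of what an "atomic create" abstraction of this kind offers, here is a minimal Python sketch on a local file system. It is loosely modeled on the idea of a stream that either publishes a complete file on close or leaves nothing behind on cancel; the class and method names (`AtomicOutputStream`, `RenameBasedOutputStream`, `LocalCheckpointFileManager`, `create_atomic`, `overwrite_if_possible`) are hypothetical and are not Spark's actual Scala API. The no-overwrite path assumes the file system supports hard links (`os.link`).

```python
import os
import tempfile
from abc import ABC, abstractmethod


class AtomicOutputStream(ABC):
    """Hypothetical stream: close() publishes a complete file, cancel() leaves nothing."""

    @abstractmethod
    def write(self, data: bytes) -> None: ...

    @abstractmethod
    def close(self) -> None: ...

    @abstractmethod
    def cancel(self) -> None: ...


class RenameBasedOutputStream(AtomicOutputStream):
    """Writes to a hidden temp file in the target directory, then moves it into place."""

    def __init__(self, path: str, overwrite_if_possible: bool):
        self._path = path
        self._overwrite = overwrite_if_possible
        directory = os.path.dirname(os.path.abspath(path))
        fd, self._tmp = tempfile.mkstemp(prefix=".tmp-", dir=directory)
        self._file = os.fdopen(fd, "wb")

    def write(self, data: bytes) -> None:
        self._file.write(data)

    def close(self) -> None:
        self._file.close()
        if self._overwrite:
            # os.replace is atomic on POSIX and replaces any existing file.
            os.replace(self._tmp, self._path)
        else:
            try:
                # os.link raises FileExistsError if the target already exists.
                os.link(self._tmp, self._path)
            finally:
                os.unlink(self._tmp)  # drop the temp name whether or not we published

    def cancel(self) -> None:
        # Discard everything written so far; the target path is never touched.
        self._file.close()
        os.unlink(self._tmp)


class LocalCheckpointFileManager:
    """Hypothetical manager; a direct-write store such as S3 would need a different stream."""

    def create_atomic(self, path: str, overwrite_if_possible: bool = False) -> AtomicOutputStream:
        return RenameBasedOutputStream(path, overwrite_if_possible)
```

A caller writes through `create_atomic(...)` and then calls either `close()` or `cancel()`; readers of the target path never observe a half-written file. The point of putting this behind an interface is that a store without a usable rename could supply its own stream with the same close/cancel contract.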

But I am unable to tell: is it really solving a problem in the existing State Store code in Spark 2.3?

My questions about the following statements from the PR description, regarding the State Store code:
PR description: "Checkpoint files must be written atomically such that no partial files are generated."
QUESTION: When are partial files generated in the current code? I can see that data is first written to a temp-delta file and then renamed to the version.delta file. If something goes wrong, the task fails with a thrown exception and abort() is called on the store, which closes and deletes tempDeltaFileStream. That seems quite clean to me, so in what case might partial files be generated?
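The commit/abort flow described in this question can be sketched as follows. This is a hypothetical Python model on a local file system, not Spark's code: the class `HypotheticalStateStoreWriter`, the function `run_task`, the `temp-<version>.delta` file name and the length-prefixed record encoding are all assumptions made for illustration. It shows that when a failure surfaces as an exception, `abort()` removes the temp file and no `version.delta` appears.

```python
import os


class HypotheticalStateStoreWriter:
    """Sketch of the write-temp / rename / abort flow (names are illustrative, not Spark's)."""

    def __init__(self, store_dir: str, version: int):
        self.final_path = os.path.join(store_dir, f"{version}.delta")
        self.temp_path = os.path.join(store_dir, f"temp-{version}.delta")
        self.stream = open(self.temp_path, "wb")

    def put(self, key: bytes, value: bytes) -> None:
        # Length-prefixed records; this encoding is an assumption for illustration.
        for part in (key, value):
            self.stream.write(len(part).to_bytes(4, "big"))
            self.stream.write(part)

    def commit(self) -> None:
        self.stream.close()
        # The "temp-delta -> version.delta" step. On POSIX, os.rename replaces an
        # existing target; other file systems may behave differently.
        os.rename(self.temp_path, self.final_path)

    def abort(self) -> None:
        # Called when the task throws: close and delete the temp-delta file.
        self.stream.close()
        if os.path.exists(self.temp_path):
            os.remove(self.temp_path)


def run_task(store_dir: str, version: int, rows) -> None:
    """Writes all rows, then commits; on any exception aborts and re-raises."""
    writer = HypotheticalStateStoreWriter(store_dir, version)
    try:
        for key, value in rows:
            writer.put(key, value)
        writer.commit()
    except Exception:
        writer.abort()
        raise
```

The sketch only models failures that surface as exceptions inside the writing process; it does not model a process that dies before `abort()` gets a chance to run.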

PR description: "State Store behavior is incorrect - HDFS FileSystem implementation does not have atomic rename."
QUESTION: The HDFS rename operation itself is atomic. I think the line above refers to first checking whether the destination file exists and then taking the appropriate action, which together make renaming a multi-step, and hence non-atomic, operation. But why is this behavior incorrect?
Even if multiple executors try to write the same version.delta file, only the first of them will succeed; the second will see that the file exists and delete its temp-delta file. That looks correct to me.
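The "only the first one succeeds" argument depends on nothing happening between the existence check and the rename. A minimal Python sketch on a local POSIX file system (whose `rename` replaces an existing target; HDFS semantics may differ) contrasts a two-step check-then-rename with a single-step publish using `os.link`, which fails with `FileExistsError` if the target already exists. The function names are hypothetical, and `simulate_interleaving` replays one unlucky interleaving of two writers by hand rather than using real threads.

```python
import os


def publish_check_then_rename(temp_path: str, final_path: str) -> bool:
    """Hypothetical two-step publish: another writer can act between the steps."""
    if os.path.exists(final_path):       # step 1: check
        os.remove(temp_path)             # someone else won: discard our temp file
        return False
    os.rename(temp_path, final_path)     # step 2: on POSIX, replaces a file that
    return True                          # appeared after step 1


def publish_no_overwrite(temp_path: str, final_path: str) -> bool:
    """Hypothetical one-step publish: os.link refuses an existing target atomically."""
    try:
        os.link(temp_path, final_path)
        return True
    except FileExistsError:
        return False
    finally:
        os.remove(temp_path)             # the temp name is no longer needed either way


def simulate_interleaving(directory: str) -> bytes:
    """Replays, step by step, two writers that both pass the check before either renames."""
    final = os.path.join(directory, "1.delta")
    temps = []
    for name, data in (("a", b"from-A"), ("b", b"from-B")):
        path = os.path.join(directory, f"temp-{name}.delta")
        with open(path, "wb") as f:
            f.write(data)
        temps.append(path)
    os.path.exists(final)                # writer A checks: target not there yet
    os.path.exists(final)                # writer B checks before A renames: also not there
    os.rename(temps[0], final)           # A publishes and believes it won
    os.rename(temps[1], final)           # B also believes it won and replaces A's file
    with open(final, "rb") as f:
        return f.read()
```

With writers that do not overlap, both versions behave the same; the sketch only shows that the two-step version's correctness relies on no second writer acting between the check and the rename.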

Am I missing anything here?
I'm really curious to know which corner cases this new pull request is trying to solve.

Regards,
Chandan

Re: [Structured Streaming SPARK-23966] Why is non-atomic rename a problem in the State Store?

chandan prakash
Can anyone clear up my doubts about the questions asked here?

Regards,
Chandan

On Sat, Aug 11, 2018 at 10:03 PM chandan prakash <[hidden email]> wrote:

--
Chandan Prakash