Re: [Structured Streaming SPARK-23966] Why non-atomic rename is problem in State Store ?


Jungtaek Lim
Removing user@, since cross-posting to multiple mailing lists is considered bad practice.

My knowledge is based on the codebase after SPARK-23966, so I'm reading back through SPARK-23966 and trying to explain what I can see in the patch. Please correct me if I'm missing something.

Note that in 2.3 abort() doesn't remove the final delta file. Assuming rename is not an atomic operation, if a task is committing the file and fails in the middle of the rename, a partial delta file could be left behind.

And commitUpdates() skips writing the temporary file to the delta file when the final delta file already exists. So when a partial file is left, both a speculative task and a task in a retried batch could skip the write and be marked as successful, resulting in the partial delta being treated as the correct delta file.
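
To make the sequence concrete, here is a minimal sketch of that failure mode using local files. It is only a model under stated assumptions: the class and method names are hypothetical and are not Spark's, crashingRename is a made-up stand-in for a rename that is not atomic, and the commit logic is simplified to the "skip if the final file exists" behaviour described above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

/** Hypothetical model of the commit path discussed above; none of these names are Spark's. */
final class PartialDeltaSketch {

    /** Simulates a non-atomic rename that dies after copying only half of the bytes. */
    static void crashingRename(Path temp, Path dest) {
        try {
            byte[] data = Files.readAllBytes(temp);
            Files.write(dest, Arrays.copyOf(data, data.length / 2)); // partial file is now visible
            throw new IllegalStateException("simulated crash in the middle of rename");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Commit that trusts any existing final file, mirroring the skip described above. */
    static boolean commit(Path temp, Path dest) {
        try {
            if (Files.exists(dest)) {
                Files.deleteIfExists(temp); // assume someone else committed: drop our copy
                return true;                // reported as success without verifying dest
            }
            Files.move(temp, dest);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs attempt 1 (crashes mid-rename) and attempt 2 (retry); returns the final content. */
    static String run() {
        try {
            Path dir = Files.createTempDirectory("state-store-sketch");
            Path dest = dir.resolve("1.delta");
            byte[] full = "key1=value1;key2=value2".getBytes(StandardCharsets.UTF_8);

            Path temp1 = dir.resolve("temp-attempt-1");
            Files.write(temp1, full);
            try {
                crashingRename(temp1, dest);
            } catch (IllegalStateException expected) {
                // abort() in this model removes only the temp file, not the partial dest
                Files.deleteIfExists(temp1);
            }

            Path temp2 = dir.resolve("temp-attempt-2");
            Files.write(temp2, full);
            commit(temp2, dest); // succeeds, but the complete data is never written

            return new String(Files.readAllBytes(dest), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this model the retry reports success while 1.delta still holds only the first half of the data, which is the situation the patch is meant to rule out.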

Does it make sense?

Thanks,
Jungtaek Lim (HeartSaVioR)

On Sun, Sep 30, 2018 at 4:51 PM, chandan prakash <[hidden email]> wrote:
Can anyone clear up the doubts raised in the questions asked here?

Regards,
Chandan


On Sat, Aug 11, 2018 at 10:03 PM chandan prakash <[hidden email]> wrote:
Hi All,
I was going through this pull request about the new CheckpointFileManager abstraction in Structured Streaming coming in 2.4:

I went through the code in detail and found that it introduces a very nice abstraction which is much cleaner and more extensible for direct-write file systems like S3 (in addition to the current HDFS file system).

But I am unable to understand whether it really solves some problem in the existing State Store code in Spark 2.3.

My questions relate to the following statements about the State Store code:
PR description: "Checkpoint files must be written atomically such that no partial files are generated."
QUESTION: When are partial files generated in the current code? I can see that data is first written to a temp-delta file and then renamed to the version.delta file. If something bad happens, the task will fail due to the thrown exception and abort() will be called on the store to close and delete tempDeltaFileStream. I think it is quite clean; in what case might partial files be generated?

PR description: "State Store behavior is incorrect - HDFS FileSystem implementation does not have atomic rename"
QUESTION: The HDFS rename operation is atomic. I think the line above refers to checking whether the file exists and then taking the appropriate action, which together make the rename a multi-step and hence non-atomic operation. But why is this behaviour incorrect?
Even if multiple executors try to write to the same version.delta file, only the first of them will succeed; the second one will see that the file exists and will delete its temp-delta file. Looks good.

Anything I am missing here?
I am really curious to know which corner cases this new pull request is trying to solve.

Regards,
Chandan






--
Chandan Prakash


Steve Loughran


On 11 Aug 2018, at 17:33, chandan prakash <[hidden email]> wrote:

PR description: "Checkpoint files must be written atomically such that no partial files are generated."
QUESTION: When are partial files generated in the current code? I can see that data is first written to a temp-delta file and then renamed to the version.delta file. If something bad happens, the task will fail due to the thrown exception and abort() will be called on the store to close and delete tempDeltaFileStream. I think it is quite clean; in what case might partial files be generated?

I suspect the issue is that when files are written to a "classic" Posix store, flush/sync operations can make the intermediate data visible to others. That is why the convention for checkpointing/commit operations is to write to a temp file and then rename. That is not what you want for object stores, especially S3.
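
For reference, the write-to-temp-and-rename convention on a local Posix-style file system can be sketched roughly as below. This is an illustration with java.nio rather than the Hadoop FileSystem API, and the class and method names are hypothetical; it assumes the temp file and the destination live in the same directory so the move is a plain rename.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper; the names are illustrative and not taken from Spark or Hadoop. */
final class TempRenameSketch {

    /** Writes data to a temp file next to dest, then publishes it with a single rename. */
    static void writeThenRename(Path dest, byte[] data) {
        try {
            // Same directory as dest, so the move stays within one file system.
            Path temp = Files.createTempFile(dest.toAbsolutePath().getParent(), ".tmp-", ".part");
            try {
                Files.write(temp, data); // readers never look at the temp name
                Files.move(temp, dest, StandardCopyOption.ATOMIC_MOVE);
            } finally {
                Files.deleteIfExists(temp); // no-op after a successful move
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Small demo: returns the content a reader sees under the final name. */
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("checkpoint-sketch");
            Path dest = dir.resolve("1.delta");
            writeThenRename(dest, "state v1".getBytes(StandardCharsets.UTF_8));
            return new String(Files.readAllBytes(dest), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A reader of the final name sees either no file or the complete file. That guarantee rests entirely on the rename being atomic, which is the property object stores do not offer.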



PR description: "State Store behavior is incorrect - HDFS FileSystem implementation does not have atomic rename"
QUESTION: The HDFS rename operation is atomic. I think the line above refers to checking whether the file exists and then taking the appropriate action, which together make the rename a multi-step and hence non-atomic operation. But why is this behaviour incorrect?
Even if multiple executors try to write to the same version.delta file, only the first of them will succeed; the second one will see that the file exists and will delete its temp-delta file. Looks good.


HDFS single-file and directory rename is atomic: it grabs a lock on the metadata store, makes the change, and unlocks it. If you are doing any FS operation which explicitly renames more than one file in your commit, you lose atomicity. If there's a check + rename then yes, it's two steps, unless you can use create(path, overwrite=false) to create some lease file where you know that the creation is exclusive and atomic. That holds for HDFS and Posix, but generally not at all for the object stores, especially S3, which can actually cache the 404 in its load balancers for a few tens of milliseconds.
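
As a rough illustration of the exclusive-create idea on a local file system, here is a sketch using java.nio's Files.createFile, which fails if the file already exists and performs the check and the creation as one operation. It is a stand-in for create(path, overwrite=false), not the Hadoop call itself, and the class, method, and file names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical lease-file helper; a local-file-system analogue, not the Hadoop API. */
final class ExclusiveCreateSketch {

    /** Returns true only for the single caller that actually created the marker file. */
    static boolean tryAcquire(Path marker) {
        try {
            Files.createFile(marker); // existence check and creation happen as one step
            return true;
        } catch (FileAlreadyExistsException e) {
            return false; // somebody else holds the lease
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Two callers race for the same marker; returns their results in order. */
    static boolean[] demo() {
        try {
            Path dir = Files.createTempDirectory("lease-sketch");
            Path marker = dir.resolve("1.delta.lease");
            boolean first = tryAcquire(marker);
            boolean second = tryAcquire(marker);
            return new boolean[] {first, second};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Only the first caller wins, with no separate exists() check that could race. On an object store the same pattern cannot be relied on, for the reasons given above.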

For object stores, you are in a different world of pain when it comes to atomic rename:

- S3: nope. Rename is O(files + data), observable while in progress, and subject to partial failures. On top of that there is list inconsistency and caching of negative GET/HEAD responses to defend against DoS.
- wasb: no, except for parts of the tree where you enable leases, which increases the cost of operations. O(files), with the odd pause if some shard movement has to take place.
- Google GCS: not sure, but it is O(files).
- Azure abfs: not atomic yet. As the code says:

    if (isAtomicRenameKey(source.getName())) {
      LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
              +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
    }

From my reading of the SPARK-23966 PR, it's the object store problem which is being addressed, both correctness and performance.


Anything I am missing here?
I am really curious to know which corner cases this new pull request is trying to solve.


Object stores as the back end, S3 in particular, where that rename is O(data) and a direct PUT to the destination gives you that atomicity.


Someone needs to sit down and write that reference implementation.

For whoever does want to do that:

- I believe it can all be done with the normal Hadoop FS APIs, simply knowing that, for the store, OutputStream.close() is (a) atomic, (b) potentially really slow as the remaining data gets uploaded, and (c) when it fails, can mean all your data just got lost.
- I've got the TLA+ spec for the S3 API, which they can use as the foundation for their proofs of correctness: https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf
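
To illustrate point (a), here is a toy in-memory model of a store where the object only becomes visible when the stream is closed. It is purely a sketch under that assumption: the class and method names are hypothetical, it is not the S3 or Hadoop client API, and it does not model the slow upload or the failure case from (b) and (c).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of a store with direct writes; not a real client API. */
final class DirectWriteSketch {
    private final Map<String, byte[]> objects = new ConcurrentHashMap<>();

    /** Buffers bytes locally; nothing is visible under the key until close(). */
    final class PendingObject extends ByteArrayOutputStream {
        private final String key;
        private boolean closed;

        PendingObject(String key) {
            this.key = key;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                objects.put(key, toByteArray()); // the single PUT: whole object or nothing
            }
        }
    }

    PendingObject create(String key) {
        return new PendingObject(key);
    }

    boolean exists(String key) {
        return objects.containsKey(key);
    }

    /** Returns {visible before close, visible after close} for one direct write. */
    static boolean[] demo() {
        DirectWriteSketch store = new DirectWriteSketch();
        byte[] data = "delta".getBytes(StandardCharsets.UTF_8);
        PendingObject out = store.create("state/1.delta");
        out.write(data, 0, data.length);
        boolean before = store.exists("state/1.delta");
        out.close();
        boolean after = store.exists("state/1.delta");
        return new boolean[] {before, after};
    }
}
```

In this model a writer that dies before close() leaves nothing behind under the final key, so no temp file and no rename are needed to avoid partial files.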


-Steve

chandan prakash
Thanks a lot, Steve and Jungtaek, for your answers.
Steve,
You explained it really well and in depth.

I understand now that the old implementation was not correct for object stores like S3, and that the new implementation will address that. For better performance we should choose a direct-write based checkpoint rather than a rename-based one (which we can implement using the new CheckpointFileManager abstraction).
My confusion was because of this line in the PR:
"This is incorrect as rename is not atomic in HDFS FileSystem implementation"
I thought this line meant that the old implementation is not correct for the HDFS file system either, so I wanted to understand whether I was missing something. The new implementation addresses the issue for object stores like S3, not HDFS.
Thanks again for your explanation. I am sure it will help a lot of other code readers as well.

Regards,
Chandan





--
Chandan Prakash


Jungtaek Lim
Thanks, Steve, for answering in detail. I had the same impression as Chandan from that line: it went against my understanding, since the rename operation itself is atomic in HDFS, and I didn't imagine the change was meant to tackle object stores.

I learned a lot about object stores from your answer. Thanks again.

Jungtaek Lim (HeartSaVioR)
