Setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 and Doc issue

Setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 and Doc issue

Waleed Fateem
Hello!

I noticed that the documentation, starting with 2.2.0, states that the parameter spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version defaults to 1:
https://issues.apache.org/jira/browse/SPARK-20107

I don't actually see this being set explicitly anywhere in the Spark code, so the documentation isn't entirely accurate if you run in an environment that has MAPREDUCE-6406 implemented (starting with Hadoop 3.0).

The default version was explicitly set to 2 in the FileOutputCommitter class, so any output committer that inherits from this class (ParquetOutputCommitter, for example) would use v2 in a Hadoop 3.0 environment and v1 in older Hadoop environments.

Would it make sense for us to consider setting v1 as the default in code in case the configuration was not set by a user?
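In the meantime it can of course be pinned explicitly, e.g. in spark-defaults.conf (the property name is exactly the one from the docs):

```properties
# Pin the safer v1 (rename-based) commit algorithm, regardless of the
# Hadoop default:
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1
```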

Regards,

Waleed
Re: Setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 and Doc issue

Sean Owen-2
I think this is a Hadoop property that is just passed through? If the
default is different in Hadoop 3 we could mention that in the docs. I
don't know that we want to always set it to 1 as a Spark default, even
in Hadoop 3, right?



Re: Setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 and Doc issue

Waleed Fateem
I was trying to keep my email short and concise, but the rationale for making 1 the default is that it's safer. With algorithm version 2 you run the risk of bad data when tasks fail, or even duplicate data if a task fails and then succeeds on a reattempt (I don't know whether this is true for all OutputCommitters that extend FileOutputCommitter or not).

Imran and Marcelo also discussed this here:
https://issues.apache.org/jira/browse/SPARK-20107?focusedCommentId=15945177&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15945177

I also discussed this a bit with Steve Loughran, and his opinion was that v2 should just be deprecated altogether. I believe he was going to bring that up with the Hadoop developers.


Re: Setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 and Doc issue

Steve Loughran-2
v2 does a file-by-file copy to the destination dir in task commit; v1 promotes a task attempt to the job attempt dir with a single directory rename, and job commit then lists those directories and moves their contents.
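In local-filesystem terms, a rough sketch of the two paths (a simplified hypothetical model, not the actual Hadoop code):

```python
import os
import shutil

def commit_task_v1(task_attempt_dir, job_attempt_dir, task_id):
    # v1 task commit: one directory rename into the job attempt dir.
    # Atomic and O(1) on HDFS; output stays invisible until job commit.
    os.rename(task_attempt_dir, os.path.join(job_attempt_dir, task_id))

def commit_job_v1(job_attempt_dir, dest_dir):
    # v1 job commit: list the committed task dirs, move their contents
    # into the destination.
    for task_id in os.listdir(job_attempt_dir):
        task_dir = os.path.join(job_attempt_dir, task_id)
        for name in os.listdir(task_dir):
            shutil.move(os.path.join(task_dir, name),
                        os.path.join(dest_dir, name))

def commit_task_v2(task_attempt_dir, dest_dir):
    # v2 task commit: file-by-file move straight into the destination,
    # so output becomes visible (and can be left half-written on
    # failure) before the job commits.
    for name in os.listdir(task_attempt_dir):
        shutil.move(os.path.join(task_attempt_dir, name),
                    os.path.join(dest_dir, name))
```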

If the worker fails during task commit, the next task attempt has to replace every file, so it had better use the same filenames.

The really scary issue is a network partition: if the first worker goes offline long enough for a second attempt to commit (with speculation enabled that may not be very long at all, since another attempt could already be waiting), then when the first worker comes back online it may continue with its commit and partially overwrite some, but not all, of the output.

So task commit is not atomic, even though Spark requires it to be. It is worse on Amazon S3, where rename is O(data), so the window for failure is much longer.

The S3A committers don't commit their work until job commit; while that is non-atomic (nor is MR v1, BTW), its duration is |files| / min(|threads|, max-http-pool-size).

The EMR Spark committer does actually commit its work in task commit, so it is also vulnerable. I wish they had copied more of our ASF-licensed code :). Or some of IBM's Stocator work.


Presumably their algorithm is:

- pre-task-commit, when reporting ready-to-commit: upload files from the local task attempt staging dir to the dest dir, without completing the uploads. You could actually do this with a scanning thread, uploading as you go along.
- task commit: POST to complete all the uploads
- job commit: touch _SUCCESS
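That deferred-completion pattern can be modelled with a toy in-memory "object store" (plain dicts below, not real S3 multipart-upload calls):

```python
class DeferredCommitTask:
    """Toy model of a task that stages uploads but defers completion."""

    def __init__(self, store):
        self.store = store    # shared "object store": key -> bytes
        self.pending = {}     # uploads started but not yet completed

    def upload(self, key, data):
        # Pre-commit phase: push the bytes, but keep them invisible to
        # readers of the store until the task actually commits.
        self.pending[key] = data

    def commit(self):
        # Task commit: "POST" to complete every pending upload, making
        # the objects visible in the store.
        for key, data in self.pending.items():
            self.store[key] = data
        self.pending.clear()

def commit_job(store):
    # Job commit is then just touching the _SUCCESS marker.
    store["_SUCCESS"] = b""
```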

That scales better (no need to load and commit the uploads in job commit), does not require a consistent cluster FS, and is faster.

But again: the failure semantics of task commit aren't what Spark expects.

Bonus fun: Google GCS directory rename is file-by-file, so non-atomic, while v1 task commit does expect an atomic dir rename. So you may as well use v2.

They could add a committer which didn't do that rename, and instead just wrote a manifest file to the job attempt dir pointing at the successful task attempt, committing that with GCS's atomic file rename. The committer plugin point in MR lets you declare a committer factory per filesystem, so it could be done without any further changes to Spark.
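A minimal sketch of that manifest idea (hypothetical names, assuming only that single-file rename is atomic):

```python
import json
import os

def commit_task_with_manifest(job_attempt_dir, task_id, successful_attempt_dir):
    # Instead of renaming the whole task attempt dir, write a small
    # manifest pointing at the successful attempt, then atomically
    # rename the manifest into place; job commit follows the pointer.
    manifest = {"task": task_id, "attempt_dir": successful_attempt_dir}
    tmp_path = os.path.join(job_attempt_dir, "." + task_id + ".manifest.tmp")
    final_path = os.path.join(job_attempt_dir, task_id + ".manifest")
    with open(tmp_path, "w") as f:
        json.dump(manifest, f)
    os.rename(tmp_path, final_path)  # single-file rename: the atomic step
    return final_path
```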

Re: Setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 and Doc issue

Steve Loughran-2
https://issues.apache.org/jira/browse/MAPREDUCE-7282

"MR v2 commit algorithm is dangerous, should be deprecated and not the default"

Someone do a PR to change the default, and if it doesn't break too much I'll merge it.


