saveAsNewAPIHadoopDataset must not enable speculation for parquet file?


saveAsNewAPIHadoopDataset must not enable speculation for parquet file?

cane
If we use saveAsNewAPIHadoopDataset with speculation enabled, it may cause
data loss.
I checked the comment on this API:

   * We should make sure our tasks are idempotent when speculation is enabled,
   * i.e. do not use output committer that writes data directly.
   * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
   * result of using direct output committer with speculation enabled.
   */

But is this a rule we must follow?
For example, Parquet uses ParquetOutputCommitter.
In that case, must speculation be disabled for Parquet?

Does anyone know the history here?
Thanks very much!






Re: saveAsNewAPIHadoopDataset must not enable speculation for parquet file?

Steve Loughran


> On 3 Apr 2018, at 11:19, cane <[hidden email]> wrote:
>
> If we use saveAsNewAPIHadoopDataset with speculation enabled, it may cause
> data loss.
> I checked the comment on this API:
>
>  * We should make sure our tasks are idempotent when speculation is enabled,
>  * i.e. do not use output committer that writes data directly.
>  * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
>  * result of using direct output committer with speculation enabled.
>  */
>
> But is this a rule we must follow?
> For example, Parquet uses ParquetOutputCommitter.
> In that case, must speculation be disabled for Parquet?
>
> Does anyone know the history here?
> Thanks very much!


If you are writing to HDFS, or to object stores other than S3, and you make sure that you are using the FileOutputCommitter commit algorithm, you can use speculation without problems.

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1

If you use the version 2 algorithm then you are vulnerable to a failure during task commit, but only during task commit, and then only if speculative/repeated tasks generate output files with different names.

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

If you are using S3 as a direct destination of work, then, in the absence of a consistency layer (S3mper, EMR consistent S3, Hadoop 3.x + S3Guard) or an S3-specific committer, you are always at risk of data loss. Don't do that.
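
As a rough sketch of the safe setup above -- my own illustration, not code from this thread; the app name and output path are invented and it assumes the job is submitted to a cluster -- pinning the v1 algorithm from the Spark side and keeping speculation on looks something like:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.spark.{SparkConf, SparkContext}

object SpeculativeSaveExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("speculative-save-example")
      // v1 commit algorithm: task commit is a rename inside the job's
      // _temporary directory; job commit moves everything to the destination.
      .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
      // Speculation is safe to keep on with the v1 algorithm on a real FS.
      .set("spark.speculation", "true")
    val sc = new SparkContext(conf)

    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[Text])
    job.setOutputFormatClass(classOf[TextOutputFormat[Text, Text]])
    FileOutputFormat.setOutputPath(job, new Path("hdfs:///tmp/speculative-save-demo"))

    sc.parallelize(Seq("a" -> "1", "b" -> "2"))
      .map { case (k, v) => (new Text(k), new Text(v)) }
      .saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}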

Further reading

https://github.com/steveloughran/zero-rename-committer/releases/download/tag_draft_003/a_zero_rename_committer.pdf




Re: saveAsNewAPIHadoopDataset must not enable speculation for parquet file?

cane
I observe the following: commitJob runs on the driver while commitTask runs on the executors.
With speculation enabled, that can cause data loss, since commitJob calls listStatus while commitTask deletes the output file if it already exists and then renames the task output to its final location.
If listStatus is called after the delete and before the rename, the data is lost! A sketch of that window is below.
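
(This is only my own illustration with made-up paths; it imitates a committer that deletes and renames directly onto the final output, not Spark's actual committer code.)

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DeleteRenameWindow {
  // A "direct" style task commit: clear the destination, then rename into place.
  // The two filesystem calls are not atomic as a pair.
  def commitTaskDirectly(fs: FileSystem, taskOutput: Path, finalOutput: Path): Unit = {
    if (fs.exists(finalOutput)) {
      fs.delete(finalOutput, false)
    }
    // <-- a listStatus() issued by job commit right here sees neither the old
    //     nor the new file: that is the data-loss window
    fs.rename(taskOutput, finalOutput)
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    commitTaskDirectly(fs,
      new Path("/tmp/_temporary/attempt_0000_r_000000_0/part-r-00000"),
      new Path("/tmp/output/part-r-00000"))
  }
}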

Am I right?
Thanks, Steve

--
Best regards,
周康

Re: saveAsNewAPIHadoopDataset must not enable speculation for parquet file?

Steve Loughran
In reply to this post by cane

Sorry, I hadn't noticed this follow-up; I've been busy with other issues.

On 3 Apr 2018, at 11:19, cane <[hidden email]> wrote:

If we use saveAsNewAPIHadoopDataset with speculation enabled, it may cause
data loss.
I checked the comment on this API:

 * We should make sure our tasks are idempotent when speculation is enabled,
 * i.e. do not use output committer that writes data directly.
 * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
 * result of using direct output committer with speculation enabled.
 */

But is this a rule we must follow?
For example, Parquet uses ParquetOutputCommitter.
In that case, must speculation be disabled for Parquet?

ParquetOutputCommitter is a subclass of Hadoop's FileOutputCommitter, so you get the choice of its two algorithms, as set by spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version.
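
As a made-up sketch (assuming a Spark 2.x SparkSession and an invented output path), a Parquet write with the v1 algorithm pinned and speculation left on looks roughly like this:

import org.apache.spark.sql.SparkSession

object ParquetV1CommitExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("parquet-v1-commit")
      // ParquetOutputCommitter extends FileOutputCommitter, so this picks
      // which of its two commit algorithms gets used.
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
      .config("spark.speculation", "true")
      .getOrCreate()

    import spark.implicits._
    Seq((1, "a"), (2, "b")).toDF("id", "value")
      .write.mode("overwrite").parquet("hdfs:///tmp/parquet-demo")

    spark.stop()
  }
}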


Algorithm 1:
- tasks write to a _temporary/$jobId/_temporary/$taskId directory
- task commit: rename that directory to _temporary/$jobId/$taskId, which on a real FS is an O(1) atomic operation; speculation and retry are straightforward
- job commit: copy the contents of all the task ID directories to the destination and create the _SUCCESS file; job commit is non-atomic, so if a job fails during commit you need to delete the destination directory and try again

Algorithm 2:
- tasks write to a _temporary/$jobId/_temporary/$taskId directory
- task commit: merge that directory into the destination directory, potentially while other tasks are doing their own merge at the same time
- job commit does nothing but create the _SUCCESS file, and can be repeated

You can speculate with either, but if a task using algorithm 2 fails during task commit then there's a problem, as the store is in an unknown state. Neither MapReduce nor Spark worries about this: task commit is usually fast, so the window of failure is pretty small, though when you are working with object stores that doesn't hold. Really they should react to that failure by aborting the job, but as object stores tend to have their own issues, this is more of a detail than the underlying flaw.
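
To make the staging layout concrete, here is a small sketch of my own against Hadoop's public FileOutputCommitter API (the destination path, job identifier and attempt numbers are invented); it prints where a task attempt stages its output before commit:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}

object CommitterLayoutSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Flip between 1 and 2 to compare: the staging path is the same,
    // only what commitTask/commitJob then do with it differs.
    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1)

    val dest = new Path("file:///tmp/committer-demo")
    val attempt = new TaskAttemptID("demo", 1, TaskType.REDUCE, 0, 0)
    val taskContext = new TaskAttemptContextImpl(conf, attempt)
    val committer = new FileOutputCommitter(dest, taskContext)

    // Prints something like
    //   file:/tmp/committer-demo/_temporary/0/_temporary/attempt_demo_0001_r_000000_0
    // i.e. task output stays under _temporary until the commit phases move it.
    println(committer.getWorkPath)
  }
}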

As I said, you can read the zero-rename committer paper linked above, and a precursor attempt to document what goes on in the depths of FileOutputCommitter (which has an error in one of the code samples; I forget which. The paper fixes that).


+ an IBM paper on their Swift committer for Spark: "Stocator: A High Performance Object Store Connector for Spark", https://arxiv.org/pdf/1709.01812


I have some issues with that paper, but it's worthwhile looking at to see their focus on rollback over temp directories.



Does anyone know the history here?

If you check out Hadoop, you can get the history after the SVN -> Git migration, though the earlier history is lost in folklore, primarily stories of "what went wrong" at Yahoo!.


For Spark, look at the git logs of

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala

Once you start looking at the commit protocols, you end up in a fascinating world where things like proofs of correctness start to matter. Sadly, everyone is constrained not just by our lack of everyday use of the language and tools, but by the lack of a foundation of specs for the underlying storage systems (*). There is one for a model of a consistent S3 store, https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf, but I couldn't work out how to define an eventually consistent one in TLA+. Contributions welcome.

-Steve

(*) the Hadoop Filesystem spec is actually Z disguised as Python, but it doesn't integrate with any toolchain you can use for correctness proofs. But it is read, understood and maintained by developers, which I consider a success. It's just, we could do more. 

Re: saveAsNewAPIHadoopDataset must not enable speculation for parquet file?

cane
Thanks, Steve!
I will study the links you mentioned!


