[Discuss][Spark staging dir] way to disable spark writing to _temporary

Yash Sharma
Hi All,
This is another issue I have been facing with Spark-S3 interoperability, and I wanted to ask the broader community whether anyone else has run into it.

I have a fairly simple aggregation query with a basic transformation. The output, however, has a lot of partitions (20K). The Spark job runs very fast and reaches the end without any failures. Up to that point the job has been writing to the staging dir, and that works fine.

As soon as Spark starts renaming these files it hits 2 issues:
1. S3 single-path renames are extremely slow, and the job spends a huge amount of time renaming these files.
2. Renames sometimes fail: Spark probably has checks after writing the file (not sure), and a few renames fail at random because of S3's eventual consistency, causing the job to fail intermittently. [added logs at end]

I was wondering what workarounds exist for this problem, or whether it is possible to override this behavior and write files directly to the expected paths (skipping the staging dir _temporary).

Cheers,
Yash

{logs}
java.io.IOException: Failed to rename FileStatus{path=s3://instances/_temporary/0/task_201704060437_0005_m_000052/utc_date=2012-06-19/product=obsolete; isDirectory=true; modification_time=0; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false} to s3://instances/utc_date=2012-06-19/product=obsolete
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.renameOrMerge(FileOutputCommitter.java:441)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:432)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
...
...
InsertIntoHadoopFsRelationCommand.scala:115)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
...
...
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)
17/04/06 04:41:54 ERROR DynamicPartitionWriterContainer: Job job_201704060436_0000 aborted.
17/04/06 04:41:54 ERROR ActiveInstances$: Exception in running ActiveInstances.
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:149)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)

{logs}


Ryan Blue
Yash,

We (Netflix) built a committer that uses the S3 multipart upload API to avoid the copy problem and still handle task failures. You can build and use the copy posted here:


You're probably interested in the S3PartitionedOutputCommitter.
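
To make the idea concrete, here is a toy in-memory model of committing via multipart uploads. It is only a sketch of the general approach as described above, not the committer's actual code: ToyMultipartStore, run_job and the file keys are hypothetical names made up for illustration, and no real S3 client is involved. The point it tries to show is that each task's data is uploaded straight to its final key but stays invisible until the upload is completed at job commit, so a failed task can simply be aborted and nothing needs to be copied or renamed.

```python
class ToyMultipartStore:
    """In-memory stand-in for an object store with multipart uploads (hypothetical)."""

    def __init__(self):
        self.visible = {}   # completed objects: key -> bytes
        self.pending = {}   # upload id -> (final key, list of uploaded parts)
        self._next_id = 0

    def initiate(self, key):
        # Start an upload aimed at the final key; nothing is visible yet.
        self._next_id += 1
        upload_id = f"upload-{self._next_id}"
        self.pending[upload_id] = (key, [])
        return upload_id

    def upload_part(self, upload_id, data):
        self.pending[upload_id][1].append(data)

    def complete(self, upload_id):
        # Only now does the object appear under its final key.
        key, parts = self.pending.pop(upload_id)
        self.visible[key] = b"".join(parts)

    def abort(self, upload_id):
        # Discard the uploaded parts; the final key is never touched.
        self.pending.pop(upload_id, None)


def run_job(store, task_outputs, failed_tasks=()):
    """task_outputs maps task id -> {final key: bytes}; failed_tasks are aborted."""
    to_commit = []
    for task_id, files in task_outputs.items():
        uploads = []
        for key, data in files.items():
            upload_id = store.initiate(key)
            store.upload_part(upload_id, data)
            uploads.append(upload_id)
        if task_id in failed_tasks:
            for upload_id in uploads:
                store.abort(upload_id)
        else:
            to_commit.extend(uploads)
    # Job commit: complete the uploads of every successful task.
    for upload_id in to_commit:
        store.complete(upload_id)


store = ToyMultipartStore()
run_job(
    store,
    {
        "task_0": {"utc_date=2012-06-19/part-0": b"a"},
        "task_1": {"utc_date=2012-06-19/part-1": b"b"},
    },
    failed_tasks={"task_1"},
)
print(sorted(store.visible))  # ['utc_date=2012-06-19/part-0']
```

A real job would retry the failed task rather than drop its output; the sketch leaves that out to keep the commit and abort paths easy to see.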

rb

On Thu, Apr 6, 2017 at 10:08 PM, Yash Sharma <[hidden email]> wrote:
[...]

--
Ryan Blue
Software Engineer
Netflix

Yash Sharma
Very interesting. I will give it a try. Thanks for pointing this out.
Also, are you planning to contribute it to Spark, and could it be a good default option for Spark S3 copies?
Do you have any benchmarks that show the improvement in the job?

Thanks,
Yash

On Sat, 8 Apr 2017 at 02:38 Ryan Blue <[hidden email]> wrote:
[...]

Steve Loughran

On 8 Apr 2017, at 10:36, Yash Sharma <[hidden email]> wrote:

> Very interesting. I will give it a try. Thanks for pointing this out.
> Also, are you planning to contribute it to Spark, and could it be a good default option for Spark S3 copies?


It's going into Hadoop core itself (HADOOP-13786), along with the relevant changes to FileOutputFormat to make it easier to implement similar committers for Azure, etc., where possible. There are matching changes going into S3A, and it will be implemented in the hadoop-aws JAR, so you'll need to keep everything in sync with whichever Hadoop release has it.

Spark is going to need some changes to the Parquet output code in ParquetFileFormat, as it's too fussy about committer type; either that code gets relaxed inside Spark, or someone implements a bridge class. Since I've seen almost no enthusiasm for the S3/cloud-related Spark PRs from me, I'll probably just do option 2 externally. Sorry.




> Do you have any benchmarks that show the improvement in the job?

Given that S3's internal copy runs at 6-10 MB/s, you can estimate the execution time of today's rename as bytes / (copy rate * number of parallel threads in the S3A rename); you'll still have the upload at the end of task commit, so whichever task(s) commit last will also have a measurable effect.
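
As a rough worked example of that estimate, here is a small sketch. The 6-10 MB/s range comes from the paragraph above; the 100 GB output size and the 10 rename threads are made-up inputs, 1 MB is assumed to mean 10^6 bytes, and the function name is hypothetical. It ignores per-request overhead, listing time and throttling, so treat the result as a lower bound rather than a prediction.

```python
def estimate_rename_seconds(total_bytes, copy_mb_per_s, parallel_threads):
    """Lower-bound estimate: bytes / (per-copy rate * number of parallel copies)."""
    if copy_mb_per_s <= 0 or parallel_threads < 1:
        raise ValueError("copy rate must be positive and threads must be >= 1")
    bytes_per_second = copy_mb_per_s * 1_000_000 * parallel_threads
    return total_bytes / bytes_per_second


# Hypothetical job: 100 GB of output renamed with 10 parallel copy threads.
output_bytes = 100 * 10**9
for rate in (6.0, 10.0):
    seconds = estimate_rename_seconds(output_bytes, rate, 10)
    print(f"{rate:.0f} MB/s per copy: about {seconds / 60:.1f} minutes")
# 6 MB/s per copy: about 27.8 minutes
# 10 MB/s per copy: about 16.7 minutes
```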

I'll have some TPC-DS numbers once I've got my variant of Ryan's committer ready for a colleague to test against. We've already got some good speedups with the read pipeline and recursive listing work in Hadoop 2.8; the commit one is the big one. If you grab Hadoop 2.8.0 right now you get all its read-time speedups, plus the option for fast buffered uploads, which will remove or reduce the delay at the end of IOStream.close() for the PUT.

In the meantime, don't commit directly to S3. Even with the classic "commit-by-rename" committer it's not safe: that committer assumes the rename is an atomic O(1) transaction, and in an object store, it isn't.
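
A toy illustration of why the rename assumption breaks down, using a plain dict as a stand-in for the object store. ToyObjectStore and its methods are hypothetical names for this sketch only, and the fail_after argument is an artificial way to simulate a crash partway through. The "rename" here is done the only way a flat key space allows, one object at a time, so an interruption leaves the output half moved.

```python
class ToyObjectStore:
    """In-memory stand-in for an object store: flat keys, no real directories."""

    def __init__(self):
        self.objects = {}

    def put(self, key, data):
        self.objects[key] = data

    def list_prefix(self, prefix):
        return sorted(k for k in self.objects if k.startswith(prefix))

    def rename_prefix(self, src, dst, fail_after=None):
        """Move every object under src to dst by copy-then-delete, one at a time.

        fail_after simulates a failure after that many objects have been moved.
        """
        moved = 0
        for key in self.list_prefix(src):
            if fail_after is not None and moved >= fail_after:
                raise IOError(f"simulated failure after {moved} objects")
            self.objects[dst + key[len(src):]] = self.objects[key]  # copy
            del self.objects[key]                                   # delete source
            moved += 1
        return moved


store = ToyObjectStore()
for i in range(4):
    store.put(f"out/_temporary/0/task_1/part-{i}", b"rows")

try:
    store.rename_prefix("out/_temporary/0/task_1/", "out/", fail_after=2)
except IOError as err:
    print(err)  # simulated failure after 2 objects

# Two files are already in the final location, two are still in the staging dir.
for key in store.list_prefix("out/"):
    print(key)
```

The work grows with the number of objects (and, on a real store, with their size), and any reader listing the destination in the middle sees a partial result, which is exactly what an atomic O(1) rename would rule out.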




