concurrent writes with dynamic partition overwrite mode


Koert Kuipers
hi,
i am struggling to understand if concurrent writes to the same basedir but different partitions are safe with file sources such as parquet.

i tested this in spark 2.4 and spark 3.0.0-SNAPSHOT with real concurrent jobs on hdfs and it seemed to work fine. admittedly this was a rather limited test.
as the jobs are running i see on hdfs:

drwxr-xr-x   - koert koert          0 2019-08-29 18:18 out/.spark-staging-be40030e-8eef-4680-85ac-b55e6519df60/partition=2
Found 1 items
drwxr-xr-x   - koert koert          0 2019-08-29 18:18 out/.spark-staging-d25f16d3-8f2d-4cf4-89bd-09256469b5e5/partition=1
Found 1 items
drwxr-xr-x   - koert koert          0 2019-08-29 18:17 out/_temporary/0

it seems each job has its own temporary directory it writes to, set to ".spark-staging-" + jobId
this is consistent with the stagingDir i found in org.apache.spark.internal.io.HadoopMapReduceCommitProtocol, and i can find it being used specifically for dynamic partition overwrite mode.
so i thought with this i am all set.
i dont really know what this _temporary/0 is for but it did not seem to get in the way.
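the per-job staging layout described above can be sketched outside spark like this; a minimal illustration only, where `base`, `staging_dir`, and the fake job ids are hypothetical names, not spark internals:

```python
# Sketch of the observed directory layout: each job stages output under
# ".spark-staging-" + jobId (a UUID), so concurrent jobs never collide
# there, while _temporary/0 is a single shared attempt directory.
import os
import tempfile
import uuid

base = tempfile.mkdtemp()

def staging_dir(job_id):
    # mirrors the ".spark-staging-" + jobId naming seen in the listing above
    return os.path.join(base, ".spark-staging-" + job_id)

job1, job2 = str(uuid.uuid4()), str(uuid.uuid4())

# each job writes its own partition under its own staging dir
os.makedirs(os.path.join(staging_dir(job1), "partition=1"))
os.makedirs(os.path.join(staging_dir(job2), "partition=2"))

# the shared attempt dir both jobs also create
os.makedirs(os.path.join(base, "_temporary", "0"))

print(sorted(os.listdir(base)))
```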

but then i added some unit tests that also do concurrent writes to different partitions with dynamic partition overwrite mode (these tests are much more rigorous than my ad-hoc tests on hdfs), and now i see errors like this:

java.io.FileNotFoundException: File file:/some/base/dir/_temporary/0 does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:376)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:570)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:172)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:123)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:168)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:192)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:189)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:164)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:105)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:105)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:718)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:718)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:330)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:314)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
at com.tresata.spark.sql.source.ParquetSource.writeBatchImpl

so this seems to hint that the issue is with that _temporary/0. why is it trying to do a listStatus on this _temporary/0?
did i always have this issue and was just lucky enough to not run into it on hdfs, or is it specific to RawLocalFileSystem?
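the race behind that stack trace can be reproduced in miniature without spark at all; a hypothetical sketch, where `write_task` and `commit_job` are illustrative stand-ins for the real task/job commit, not actual FileOutputCommitter code:

```python
# Two concurrent "jobs" share base/_temporary/0. The first job's commit
# lists the attempt dir and then deletes _temporary; the second job's
# commit then fails its listStatus, mirroring the FileNotFoundException
# in the stack trace above.
import os
import shutil
import tempfile

base = tempfile.mkdtemp()
attempt_dir = os.path.join(base, "_temporary", "0")

def write_task(job, task):
    # each task writes its output under the *shared* attempt dir
    d = os.path.join(attempt_dir, "task_{}_{}".format(job, task))
    os.makedirs(d, exist_ok=True)
    with open(os.path.join(d, "part-00000"), "w") as f:
        f.write("data")

def commit_job(job):
    # like commitJob: enumerate committed tasks, then clean up _temporary
    tasks = os.listdir(attempt_dir)  # raises if _temporary/0 is gone
    shutil.rmtree(os.path.join(base, "_temporary"))
    return tasks

write_task("A", 0)
write_task("B", 0)
commit_job("A")           # job A commits and deletes _temporary
try:
    commit_job("B")       # job B's commit now fails, like the trace above
except FileNotFoundError as e:
    print("job B commit failed:", e)
```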

thanks!
koert

Re: concurrent writes with dynamic partition overwrite mode

Steve Loughran-2
In reply to this post by Koert Kuipers


On Sun, Sep 1, 2019 at 7:54 PM Koert Kuipers <[hidden email]> wrote:
i dont really know what this _temporary/0 is for but it did not seem to get in the way.


It's a core part of the hadoop MR commit protocols. The best (only!) documentation of these, other than the most confusing piece of co-recursive code I've ever had to step through while taking notes, is: https://github.com/steveloughran/zero-rename-committer/releases/tag/tag_draft_005


Every MR app attempt has its own attempt ID; when the hadoop MR engine restarts attempt N, it looks for the temp dir of attempt N-1 and uses it to recover from failure. Spark's solution to the app restart problem is "be faster and fix failures by restarting entirely", so the app attempt is always 0.

If you have two app attempts writing to the same destination path, their output under _temporary/0 is inevitably going to conflict: the first job commit deletes the attempt dir, so the second commit fails, as your stack trace shows.

1. You need to (somehow) get a different attempt ID for each job to avoid that clash. No idea how to do that; it may need code changes.
2. Set "mapreduce.fileoutputcommitter.cleanup.skipped" to true to avoid a full cleanup of _temporary on job commit.
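suggestion 2 might look roughly like this in spark-defaults.conf, assuming the usual `spark.hadoop.` pass-through prefix for Hadoop properties; a sketch, not verified against any particular deployment:

```properties
# leave _temporary in place at job commit instead of deleting it,
# so a concurrent job's commit does not find it missing
spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
```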
