Spark SQL : Exception on concurrent insert due to lease over _SUCCESS


Ajith shetty

Hi all

 

I am using Spark 2.1 and I hit an exception when doing concurrent inserts into the same table. Here is my scenario and some analysis.

 

create table sample using csv options('path' '/tmp/f/')
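
 

For reference, here is a rough sketch of how the concurrent inserts are triggered (hypothetical: in my setup the two inserts actually come from two separate Spark applications, and the insert statement below is only a placeholder for whatever data is being loaded):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    // two inserts racing against the same table backed by /tmp/f/
    // (in practice these are two independent Spark applications/sessions)
    val inserts = (1 to 2).map { i =>
      Future { spark.sql(s"insert into sample select * from some_source_$i") }
    }
    Await.result(Future.sequence(inserts), 10.minutes)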

 

When concurrent inserts are executed, we see an exception like the one below:

 

2017-12-29 13:41:11,117 | ERROR | main | Aborting job null. | org.apache.spark.internal.Logging$class.logError(Logging.scala:91)

org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): No lease on /tmp/f/_SUCCESS (inode 1032508): File does not exist. Holder DFSClient_NONMAPREDUCE_8638078_1 does not have any open files.

        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3466)

        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3562)

        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3525)

        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:917)

        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:573)

        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)

        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:973)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2260)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2256)

        at java.security.AccessController.doPrivileged(Native Method)

        at javax.security.auth.Subject.doAs(Subject.java:422)

        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1778)

        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2254)

 

        at org.apache.hadoop.ipc.Client.call(Client.java:1524)

        at org.apache.hadoop.ipc.Client.call(Client.java:1460)

        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)

        at com.sun.proxy.$Proxy14.complete(Unknown Source)

        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:480)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)

        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:202)

        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)

        at com.sun.proxy.$Proxy15.complete(Unknown Source)

        at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:887)

        at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:861)

        at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:822)

        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)

        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)

        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:336)

        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128)

        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2$$anonfun$apply$mcV$sp$1.apply$mcV$sp(FileFormatWriter.scala:167)

        at org.apache.spark.util.Utils$.proxyOperate(Utils.scala:2706)

        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply$mcV$sp(FileFormatWriter.scala:166)

        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)

        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)

        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)

        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:144)

        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)

        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:59)

        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:57)

        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:75)

        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)

        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)

        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)

        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)

        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)

        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:125)

        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:125)

        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)

        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)

        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:600)

        at $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:24)

        at $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:29)

        at $line48.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:31)

        at $line48.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)

        at $line48.$read$$iw$$iw$$iw$$iw.<init>(<console>:35)

        at $line48.$read$$iw$$iw$$iw.<init>(<console>:37)

        at $line48.$read$$iw$$iw.<init>(<console>:39)

        at $line48.$read$$iw.<init>(<console>:41)

        at $line48.$read.<init>(<console>:43)

        at $line48.$read$.<init>(<console>:47)

        at $line48.$read$.<clinit>(<console>)

        at $line48.$eval$.$print$lzycompute(<console>:7)

        at $line48.$eval$.$print(<console>:6)

        at $line48.$eval.$print(<console>)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)

        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)

        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)

        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)

        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)

        at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)

        at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)

        at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)

        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)

        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)

        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)

        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)

        at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)

        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)

        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)

        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)

        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)

        at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)

        at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)

        at org.apache.spark.repl.Main$.doMain(Main.scala:69)

        at org.apache.spark.repl.Main$.main(Main.scala:52)

        at org.apache.spark.repl.Main.main(Main.scala)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)

        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:761)

        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:190)

        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:215)

        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:129)

        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

org.apache.spark.SparkException: Job aborted.

  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply$mcV$sp(FileFormatWriter.scala:179)

  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)

  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$2.apply(FileFormatWriter.scala:144)

  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)

  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:144)

  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)

  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:59)

  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:57)

  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:75)

  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)

  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)

  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)

  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)

  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)

  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:125)

  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:125)

  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)

  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)

  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:600)

  ... 48 elided

 

 

Basic analysis:

 

The _SUCCESS file is used by the MapReduce framework to mark successful jobs (controlled by mapreduce.fileoutputcommitter.marksuccessfuljobs, which defaults to true). It is written by org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#commitJob, invoked from org.apache.spark.sql.execution.datasources.FileFormatWriter#write:

 

    if (context.getConfiguration().getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
        Path markerPath = new Path(this.outputPath, "_SUCCESS");
        fs.create(markerPath).close();
    }

 

 

The _SUCCESS file is created by org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter under the output path specified via mapreduce.output.fileoutputformat.outputdir in the job configuration.

 

Let's take the example of

create table sample using csv options('path' '/tmp/f/')

 

so the data files created by every insert are placed under /tmp/f/

 

mapreduce.output.fileoutputformat.outputdir is set by Spark in org.apache.spark.sql.execution.datasources.FileFormatWriter#write and is used by the committer for two purposes (see the layout sketch after this list):

1. to create the part files with data under this folder (first in a _temporary folder, then moved to the output folder on commitJob)

2. to create the _SUCCESS file on job completion
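
For illustration, the layout under the configured output path looks roughly like this (directory and file names below are schematic; the exact attempt and part file names vary):

    /tmp/f/_temporary/0/_temporary/attempt_xxx/part-00000-xxx.csv   <- task output while the job is running
    /tmp/f/part-00000-xxx.csv                                       <- moved here by commitJob
    /tmp/f/_SUCCESS                                                 <- created by commitJob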

 

If two applications try to insert into the same table concurrently, then on job completion, when both try to commit, the creation of _SUCCESS results in a race condition (in our example the close() call on /tmp/f/_SUCCESS failed). HDFS grants the lease on a file to only one client at a time, so the other client fails.
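
To make the race concrete, here is a minimal sketch of the same create/close interleaving using two plain HDFS clients outside of Spark (the namenode URI is a placeholder, and this is not the exact code path Spark takes; depending on HDFS version and timing the error may surface on a different call):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    // two independent DFS clients, just like two concurrently committing jobs
    val fsA = FileSystem.newInstance(new URI("hdfs://namenode:8020"), conf)
    val fsB = FileSystem.newInstance(new URI("hdfs://namenode:8020"), conf)

    val marker = new Path("/tmp/f/_SUCCESS")

    val outA = fsA.create(marker)  // client A obtains the lease on _SUCCESS
    val outB = fsB.create(marker)  // client B re-creates the file (overwrite), A's lease is gone
    outB.close()                   // client B completes the file normally
    outA.close()                   // client A's complete() now fails: "No lease on /tmp/f/_SUCCESS"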

 

As mentioned, the _SUCCESS file is created by MapReduce code; this could be turned off by Spark setting mapreduce.fileoutputcommitter.marksuccessfuljobs = false in org.apache.spark.internal.io.HadoopMapReduceCommitProtocol#setupJob.
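
As a per-application workaround (separate from changing Spark itself), the same flag can, I believe, be set through the Hadoop configuration that Spark hands to the committer, for example:

    // before running the inserts, e.g. in spark-shell
    spark.sparkContext.hadoopConfiguration
      .set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

    // or at submit time (spark.hadoop.* properties are copied into the Hadoop Configuration):
    //   spark-submit --conf spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs=false ...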

 

As far as I can tell, _SUCCESS is only consumed by downstream frameworks such as Oozie to detect that a dataset is complete and ready for processing (refer https://books.google.co.in/books?id=HAY_CQAAQBAJ&pg=PA119&lpg=PA119&dq=Oozie+dependencies+on+_SUCCESS+file&source=bl&ots=RTr3hP0Cjj&sig=3B2yk24ebZt42SQo8O42eOX6OCI&hl=en&sa=X&ved=0ahUKEwjj053i5cDYAhVIr48KHUIGCZQQ6AEIYjAI#v=onepage&q=Oozie%20dependencies%20on%20_SUCCESS%20file&f=false)

 

So, would setting mapreduce.fileoutputcommitter.marksuccessfuljobs = false in org.apache.spark.internal.io.HadoopMapReduceCommitProtocol#setupJob be OK? Would it have any impact, given that I do not see _SUCCESS being used by Spark itself? I am new to Spark, so please correct me if any of this analysis is wrong :)

 

Regards

Ajith