S3 Read / Write makes executors deadlocked


S3 Read / Write makes executors deadlocked

invkrh
Given the following code, which simply reads from S3 and then saves the files back to S3:

----------------------------
val inputFileName: String = "s3n://input/file/path"
val outputFileName: String = "s3n://output/file/path"
val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")
val sparkContext = new SparkContext(conf)

// Problem here: executor threads become deadlocked
sparkContext.textFile(inputFileName).saveAsTextFile(outputFileName)
// But this one works
sparkContext.textFile(inputFileName).count()
----------------------------

It blocks without raising any exceptions or errors. jstack shows that all executor threads are blocked. The thread dump is at the end of this post.

I am using spark-1.4.0 on my PC, which has 4 CPU cores.
There are 21 parquet files in the input directory, about 500 KB each.

In addition, if we change the final action to one that is not I/O-bound, for example count(), it works.
It seems that reading from and writing to S3 in the same stage deadlocks the executors.

I encountered the same problem when using DataFrame load/save operations; I have filed a JIRA: https://issues.apache.org/jira/browse/SPARK-8869

"Executor task launch worker-3" #69 daemon prio=5 os_prio=0 tid=0x00007f7bd4036800 nid=0x1296 in Object.wait() [0x00007f7c1099a000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.doGetConnection(MultiThreadedHttpConnectionManager.java:518)
	- locked <0x00000000e56745b8> (a org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool)
	at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.getConnectionWithTimeout(MultiThreadedHttpConnectionManager.java:416)
	at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:153)
	at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
	at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
	at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
	at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
	at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
	at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
	at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
	at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
	at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
	at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at org.apache.hadoop.fs.s3native.$Proxy15.retrieveMetadata(Unknown Source)
	at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
	at org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:341)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
	at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
	at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)


--
Hao Ren

Data Engineer @ leboncoin

Paris, France

Re: S3 Read / Write makes executors deadlocked

invkrh
I have tested on another PC, which has 8 CPU cores.
It hangs whenever the default parallelism level is 4 or more, e.g. sparkConf.setMaster("local[*]").
local[1] through local[3] work well.

4 is the mysterious boundary.
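The pool-exhaustion pattern visible in the thread dump can be illustrated with a small self-contained sketch (plain Scala, not Spark code; the worker count and pool size of 4 are hypothetical, chosen to mirror local[4]). Each "task" holds one connection from a bounded pool for its read and then asks the same pool for a second connection for its write; once the number of workers reaches the pool size, no permit is ever free and every worker waits forever:

```scala
import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PoolDeadlockDemo {
  // Simulate `workers` tasks sharing a bounded HTTP connection pool.
  // Each task holds a "read" connection for its whole duration and then
  // requests a second "write" connection from the same pool.
  def stuckWorkers(workers: Int, poolSize: Int): Int = {
    val pool = new Semaphore(poolSize)
    val allReading = new CountDownLatch(workers)
    val allTried = new CountDownLatch(workers)
    val stuck = new AtomicInteger(0)
    val threads = (1 to workers).map { _ =>
      new Thread(() => {
        pool.acquire()              // "read" connection, held while the task runs
        allReading.countDown()
        allReading.await()          // every worker now holds a read connection
        // Request a "write" connection; time out instead of blocking forever.
        if (pool.tryAcquire(500, TimeUnit.MILLISECONDS)) pool.release()
        else stuck.incrementAndGet()
        allTried.countDown()
        allTried.await()            // keep the read connection until everyone has tried
        pool.release()
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    stuck.get()
  }

  def main(args: Array[String]): Unit = {
    println(stuckWorkers(4, 4)) // every worker waits for a write connection: prints 4
    println(stuckWorkers(3, 4)) // one spare permit lets workers finish in turn: prints 0
  }
}
```

This matches the boundary behavior above: with fewer workers than pool slots there is always a spare permit to rotate through, but at the boundary every permit is pinned by a read and all write requests block.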

It seems that I am not the only one to encounter this problem: https://issues.apache.org/jira/browse/SPARK-8898

Here is Sean's answer on the JIRA above:
this is a jets3t problem. You will have to manage to update it in your build or get EC2 + Hadoop 2 to work, which I think can be done. At least, this is just a subset of "EC2 should support Hadoop 2" and/or that the EC2 support should move out of Spark anyway. I don't know there's another action to take in Spark.
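If updating jets3t is on the table, one knob that may be relevant is the HTTP connection pool size, since the thread dump blocks in MultiThreadedHttpConnectionManager.doGetConnection (i.e. waiting for a pooled connection). A sketch of the setting, with an arbitrary example value; the property name should be verified against the JetS3t configuration documentation for the version in use:

```properties
# jets3t.properties (placed on the classpath) -- illustrative, not a verified fix.
# Raises the maximum number of simultaneous HTTP connections, so a task holding
# a read connection has a better chance of obtaining a second one for its write.
httpclient.max-connections=100
```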

But I am just using sbt to pull the published Spark 1.4, and the problem occurs on my local PC, not on EC2.
Seriously, I do think something should be done in Spark, because S3 read/write is quite a common use case.

Any help on this issue is highly appreciated.
If you need more info, check out the JIRA I created: https://issues.apache.org/jira/browse/SPARK-8869

On Thu, Jul 16, 2015 at 11:39 AM, Hao Ren <[hidden email]> wrote:



--
Hao Ren

Data Engineer @ leboncoin

Paris, France