[DISCUSS] writing structured streaming dataframe to custom S3 buckets?


Aniruddha P Tekade
Hello,

I have a local S3 service that is writable and readable using the AWS SDK APIs. I created the Spark session and then set the Hadoop configuration as follows:

// Create Spark Session
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("S3Loaders")
  .config("spark.sql.streaming.checkpointLocation", "/Users/atekade/checkpoint-s3-loaders/")
  .getOrCreate()

// Take the Spark context from the Spark session
val sc = spark.sparkContext

// Configure the Spark context with S3 values
val accessKey = "00cce9eb2c589b1b1b5b"
val secretKey = "flmheKX9Gb1tTlImO6xR++9kvnUByfRKZfI7LJT8"
val endpoint = "http://s3-region1.mycloudianhyperstore.com:80"

spark.sparkContext.hadoopConfiguration.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", endpoint)
// spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
// spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)

sc.hadoopConfiguration.set("fs.s3a.awsAccessKeyId", accessKey)
sc.hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", secretKey)
Then I try to write to S3 as follows:

val query = rawData
  .writeStream
  .format("csv")
  .option("format", "append")
  .option("path", "s3a://bucket0/")
  .outputMode("append")
  .start()

But nothing is actually getting written. Since I am running this from my local machine, I have added an entry for the IP address and S3 endpoint to the /etc/hosts file. As you can see, this is a streaming DataFrame, so it cannot be written without the writeStream API. Can someone help me figure out what I am missing here? Is there a better way to do this?

Best,
Aniruddha
-----------

Re: [DISCUSS] writing structured streaming dataframe to custom S3 buckets?

Steve Loughran-2
> spark.sparkContext.hadoopConfiguration.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

This is some superstition that seems to get carried through Stack Overflow articles. You do not need to declare the implementation class for s3a:// any more than you do for HDFS; it's defined in core-default.xml in hadoop-common. Remove it.

"fs.s3a.awsAccessKeyId" is not the correct config name; the current keys are "fs.s3a.access.key" and "fs.s3a.secret.key" (the ones commented out in your snippet).
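Taken together, the Hadoop-side setup can be reduced to something like the following. This is only a sketch, reusing the `sc`, `accessKey`, and `secretKey` names from the original post; `fs.s3a.path.style.access` is an extra assumption that is often required for third-party S3 implementations, not something stated in this thread:

```scala
// Sketch: only the endpoint and credentials need to be set explicitly;
// the s3a:// filesystem class is already registered in core-default.xml.
sc.hadoopConfiguration.set("fs.s3a.endpoint", "http://s3-region1.mycloudianhyperstore.com:80")
sc.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
sc.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)
// Assumption: many non-AWS S3 stores answer path-style requests
// (http://endpoint/bucket/key) rather than virtual-host-style ones.
sc.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
```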

S3 is an object store, and uploads are not manifest until the PUT completes, which happens in close(). Is that what you're seeing here?

Otherwise, set the org.apache.hadoop.fs.s3a log level to DEBUG and see what it says is going on.
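For completeness, the write side could look like the sketch below, assuming `rawData` is the streaming DataFrame from the original post. One thing worth checking: a local driver has to block on the query, or the process can exit before the first micro-batch is committed to S3. The original `.option("format", "append")` is a no-op (outputMode already controls this) and is dropped here:

```scala
val query = rawData
  .writeStream
  .format("csv")
  .option("path", "s3a://bucket0/")
  .outputMode("append")
  .start()

// Keep the driver alive; otherwise a local run may terminate before
// anything is flushed and committed to S3.
query.awaitTermination()
```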

HTH

On Tue, Oct 29, 2019 at 10:43 PM Aniruddha P Tekade <[hidden email]> wrote: