spark-dev mailing list archives

From Steve Loughran <ste...@cloudera.com.INVALID>
Subject Re: [DISCUSS] writing structured streaming dataframe to custom S3 buckets?
Date Fri, 08 Nov 2019 12:37:13 GMT
> spark.sparkContext.hadoopConfiguration.set("spark.hadoop.fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem")

This is a bit of superstition which gets carried through Stack Overflow
answers. You do not need to declare the implementation class for s3a://
any more than you do for HDFS: it is defined in core-default.xml in
hadoop-common. Remove that line.

"fs.s3a.awsAccessKeyId" is not the correct config name either; the S3A
connector reads fs.s3a.access.key and fs.s3a.secret.key -the two settings
you have commented out.
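Putting the two fixes together, the configuration would look roughly like
this (endpoint and key values are placeholders, and the path-style-access
line is an assumption that usually applies to third-party S3 stores):

```scala
// Assumes an existing SparkSession `spark`; all values are placeholders.
val conf = spark.sparkContext.hadoopConfiguration

// No "spark.hadoop." prefix when setting hadoopConfiguration directly;
// that prefix is only for spark-defaults.conf / --conf options.
conf.set("fs.s3a.endpoint", "http://s3-region1.example.com:80")
conf.set("fs.s3a.access.key", accessKey)   // not fs.s3a.awsAccessKeyId
conf.set("fs.s3a.secret.key", secretKey)   // not fs.s3a.awsSecretAccessKey

// Third-party S3-compatible stores generally need path-style access:
conf.set("fs.s3a.path.style.access", "true")
```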

S3 is an object store: uploads do not become visible until the PUT
completes, which happens in close(). Is that what you're seeing here?

Otherwise, set the org.apache.hadoop.fs.s3a log level to DEBUG and see what
it says is going on.
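With the default log4j setup that is a single line in log4j.properties
(the exact file location depends on your deployment):

```
log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
```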

HTH

On Tue, Oct 29, 2019 at 10:43 PM Aniruddha P Tekade <atekade1@binghamton.edu>
wrote:

> Hello,
>
> I have a local S3 service that is writable and readable using AWS sdk
> APIs. I created the spark session and then set the hadoop configurations as
> follows -
>
> // Create Spark Session
> val spark = SparkSession
>   .builder()
>   .master("local[*]")
>   .appName("S3Loaders")
>   .config("spark.sql.streaming.checkpointLocation", "/Users/atekade/checkpoint-s3-loaders/")
>   .getOrCreate()
>
> // Take spark context from spark session
> val sc = spark.sparkContext
>
> // Configure spark context with S3 values
> val accessKey = "00cce9eb2c589b1b1b5b"
> val secretKey = "flmheKX9Gb1tTlImO6xR++9kvnUByfRKZfI7LJT8"
> val endpoint = "http://s3-region1.mycloudianhyperstore.com:80"
>
> spark.sparkContext.hadoopConfiguration.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
> spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", endpoint)
> //    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
> //    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)
>
> sc.hadoopConfiguration.set("fs.s3a.awsAccessKeyId", accessKey)
> sc.hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", secretKey)
>
> And then trying to write to the s3 as follows -
>
> val query = rawData
>   .writeStream
>   .format("csv")
>   .option("format", "append")
>   .option("path", "s3a://bucket0/")
>   .outputMode("append")
>   .start()
>
> But nothing is actually getting written. Since I am running this from my
> local machine, I have an entry for the IP address and S3 endpoint in the
> /etc/hosts file. As you can see, this is a streaming dataframe, so it
> cannot be written without the writeStream API. Can someone help with what
> I am missing here? Is there a better way to do this?
>
> Best,
> Aniruddha
> -----------
>
>
