spark-user mailing list archives

From Daniel Stojanov <m...@danielstojanov.com>
Subject S3 read/write from PySpark
Date Thu, 06 Aug 2020 01:15:28 GMT
Hi,

I am trying to read/write files in S3 from PySpark. My procedure is to download Spark, then start PySpark with the hadoop-aws, guava, and aws-java-sdk-bundle packages, pinning each version explicitly by looking up the exact dependency on Maven (letting the versions be resolved automatically does not work). This procedure works for Spark 3.0.0 built against Hadoop 2.7, but not against Hadoop 3.2, where I get this exception:

java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider


Can somebody point to a procedure for doing this using Spark bundled with
Hadoop 3.2?
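A way to check which provider classes the downloaded hadoop-aws jar actually contains (the jar path below is an example; spark --packages caches jars under ~/.ivy2/jars/ by default):

```python
# Sketch: list the credential-provider classes inside a hadoop-aws jar.
# The jar path in the example is an assumption, not taken from a real run.
import zipfile

def provider_classes(jar_path):
    """Return the *.class entries in the jar whose names mention CredentialsProvider."""
    with zipfile.ZipFile(jar_path) as jar:
        return sorted(n for n in jar.namelist()
                      if n.endswith(".class") and "CredentialsProvider" in n)

# Example (expand ~ first with os.path.expanduser):
# provider_classes("/home/daniel/.ivy2/jars/org.apache.hadoop_hadoop-aws-3.3.0.jar")
```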

Regards,

To replicate:

Download Spark 3.0.0 with support for Hadoop 3.2.

Launch PySpark with:

./pyspark --packages org.apache.hadoop:hadoop-common:3.3.0,org.apache.hadoop:hadoop-client:3.3.0,org.apache.hadoop:hadoop-aws:3.3.0,com.amazonaws:aws-java-sdk-bundle:1.11.563,com.google.guava:guava:27.1-jre

Then run the following Python.


# Set these 4 parameters as appropriate.
aws_bucket = ""
aws_filename = ""
aws_access_key = ""
aws_secret_key = ""

spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", aws_access_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", aws_secret_key)

df = spark.read.option("header", "true").csv(f"s3a://{aws_bucket}/{aws_filename}")

Leads to this error message:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/pyspark/sql/readwriter.py", line 535, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/pyspark/sql/utils.py", line 131, in deco
    return f(*a, **kw)
  File "/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.csv.
: java.io.IOException: From option fs.s3a.aws.credentials.provider java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider not found
        at org.apache.hadoop.fs.s3a.S3AUtils.loadAWSProviderClasses(S3AUtils.java:645)
        at org.apache.hadoop.fs.s3a.S3AUtils.buildAWSProviderList(S3AUtils.java:668)
        at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:619)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.bindAWSClient(S3AFileSystem.java:636)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:390)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
        at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
        at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2499)
        at org.apache.hadoop.conf.Configuration.getClasses(Configuration.java:2570)
        at org.apache.hadoop.fs.s3a.S3AUtils.loadAWSProviderClasses(S3AUtils.java:642)
        ... 28 more
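
The missing class may point to a name change rather than a missing jar: in Hadoop 3.x the shipped provider appears to be org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider rather than BasicAWSCredentialsProvider (that class name is an assumption to verify against the hadoop-aws jar, not a tested fix). An untested sketch of the same configuration with that provider, collected into a dict so it can be applied in one loop:

```python
# Untested sketch: same S3A settings as in the snippet above, but with
# fs.s3a.aws.credentials.provider pointing at SimpleAWSCredentialsProvider,
# which is assumed (not verified here) to be the class shipped in hadoop-aws 3.x.
def s3a_conf(access_key, secret_key):
    """Return the fs.s3a.* settings to apply to the Hadoop configuration."""
    return {
        "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "com.amazonaws.services.s3.enableV4": "true",
        "fs.s3a.aws.credentials.provider":
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
        "fs.s3a.endpoint": "s3.amazonaws.com",
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
    }

# Applying it to a running PySpark session:
# hadoop_conf = spark._jsc.hadoopConfiguration()
# for key, value in s3a_conf(aws_access_key, aws_secret_key).items():
#     hadoop_conf.set(key, value)
```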
