spark-user mailing list archives

From Jörn Franke <jornfra...@gmail.com>
Subject Re: S3 token times out during data frame "write.csv"
Date Tue, 23 Jan 2018 23:03:25 GMT
 How large is the file?

If it is very large then you should in any case have several partitions for the output. This is also important in case you need to read from S3 again - having several files there enables parallel reading.
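
For illustration, a minimal Scala sketch of that (the partition count, bucket and paths are placeholders, not taken from this thread; "spark" is a SparkSession):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-write-example").getOrCreate()
val df = spark.read.option("header", "true").csv("s3a://some-bucket/input/")   // placeholder input

// Repartition before writing so the output lands as several part files;
// each task uploads its own file, and later reads can run in parallel too.
df.repartition(100)                        // placeholder partition count
  .write
  .mode("overwrite")
  .csv("s3a://some-bucket/output/")        // placeholder output location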

> On 23. Jan 2018, at 23:58, Vasyl Harasymiv <vasyl.harasymiv@gmail.com> wrote:
> 
> Hi Spark Community,
> 
> Saving a data frame into a file on S3 using:
> 
> df.write.csv(s3_location)
> 
> If the write runs for longer than 30 minutes, the following error occurs:
> 
> The provided token has expired. (Service: Amazon S3; Status Code: 400; Error Code: ExpiredToken;)
> 
> Potentially this is because there is a hardcoded session limit on the temporary S3 connection from Spark.
> 
> One can specify the session duration as described here:
> 
> https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html
> 
> One can, of course, chunk the data into sub-30-minute writes. However, is there a way to change the token expiry parameter directly in Spark before using "write.csv"?
> 
> Thanks a lot for any help!
> Vasyl
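
A rough sketch of one possible approach, not from this thread: the session duration is chosen when the temporary credentials are requested from STS (as on the AWS page above), and the resulting credentials are then handed to the s3a connector rather than changed inside Spark. This assumes the AWS Java SDK STS client and a hadoop-aws build whose s3a client understands session tokens (Hadoop 2.8+); "spark" and "df" are assumed to be in scope, and the output path is a placeholder.

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

// Request temporary credentials with a longer duration (sketch only; the
// maximum duration allowed depends on how the credentials are obtained).
val sts = AWSSecurityTokenServiceClientBuilder.defaultClient()
val creds = sts
  .getSessionToken(new GetSessionTokenRequest().withDurationSeconds(6 * 3600))
  .getCredentials

// Pass the session credentials to the s3a connector (Hadoop 2.8+ property names).
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.aws.credentials.provider",
  "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hadoopConf.set("fs.s3a.access.key", creds.getAccessKeyId)
hadoopConf.set("fs.s3a.secret.key", creds.getSecretAccessKey)
hadoopConf.set("fs.s3a.session.token", creds.getSessionToken)

df.write.csv("s3a://some-bucket/output/")   // placeholder location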
> 
>> On Tue, Jan 23, 2018 at 2:46 PM, Toy <noppanit.c@gmail.com> wrote:
>> Thanks, I get this error when I switched to s3a://
>> 
>> Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
>> 	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287)
>> 	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
>> 
>>> On Tue, 23 Jan 2018 at 15:05 Patrick Alwell <palwell@hortonworks.com> wrote:
>>> Spark cannot read locally from S3 without the s3a protocol; you'll more than likely need a local copy of the data, or you'll need to use the proper JARs to enable S3 communication from the edge node to the datacenter.
>>> 
>>> https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark
>>> 
>>> Here are the jars: https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
>>> 
>>> Looks like you already have them, in which case you'll have to make small configuration changes, e.g. s3 → s3a.
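
A minimal sketch of that change, assuming the fs.s3a.* credential properties from hadoop-aws ("sc", "df", the key/secret and the bucket are placeholders):

// Use the s3a:// scheme and the fs.s3a.* properties instead of s3:// / fs.s3.*.
sc.hadoopConfiguration.set("fs.s3a.access.key", "key")      // placeholder
sc.hadoopConfiguration.set("fs.s3a.secret.key", "secret")   // placeholder

df.write.mode("append").format("orc").option("compression", "zlib")
  .save("s3a://bucket/orc/dt=2018-01-23")                   // s3a:// instead of s3://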
>>> 
>>> Keep in mind: The Amazon JARs have proven very brittle: the version of the Amazon libraries must match the versions against which the Hadoop binaries were built.
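
For example, a possible sbt pairing, to be double-checked against the POM of the hadoop-aws version actually in use:

// hadoop-aws 2.7.x was built against aws-java-sdk 1.7.4, so pin the SDK to
// that version instead of pulling in a newer 1.11.x release alongside it.
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-aws"   % "2.7.3",
  "com.amazonaws"     % "aws-java-sdk" % "1.7.4"
)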
>>> 
>>> https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client
>>> 
>>> From: Toy <noppanit.c@gmail.com>
>>> Date: Tuesday, January 23, 2018 at 11:33 AM
>>> To: "user@spark.apache.org" <user@spark.apache.org>
>>> Subject: I can't save DataFrame from running Spark locally
>>> 
>>> Hi,
>>> 
>>> First of all, my Spark application runs fine in AWS EMR. However, I'm trying to run it locally to debug an issue. My application just parses log files, converts them to a DataFrame, then converts that to ORC and saves it to S3. When I run it locally, though, I get this error:
>>> 
>>> java.io.IOException: /orc/dt=2018-01-23 doesn't exist
>>> at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
>>> at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>> at com.sun.proxy.$Proxy22.retrieveINode(Unknown Source)
>>> at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
>>> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
>>> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:77)
>>> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
>>> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
>>> at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
>>> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
>>> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
>>> at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
>>> at Vivace$$anonfun$processStream$1.apply(vivace.scala:193)
>>> at Vivace$$anonfun$processStream$1.apply(vivace.scala:170)
>>> 
>>> Here's what I have in sbt:
>>> 
>>> scalaVersion := "2.11.8"
>>> 
>>> val sparkVersion = "2.1.0"
>>> val hadoopVersion = "2.7.3"
>>> val awsVersion = "1.11.155"
>>> 
>>> lazy val sparkAndDependencies = Seq(
>>>   "org.apache.spark" %% "spark-core" % sparkVersion,
>>>   "org.apache.spark" %% "spark-sql" % sparkVersion,
>>>   "org.apache.spark" %% "spark-hive" % sparkVersion,
>>>   "org.apache.spark" %% "spark-streaming" % sparkVersion,
>>> 
>>>   "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
>>>   "org.apache.hadoop" % "hadoop-common" % hadoopVersion
>>> )
>>> 
>>> And this is where the code failed:
>>> 
>>> val sparrowWriter = sparrowCastedDf.write.mode("append").format("orc").option("compression", "zlib")
>>> sparrowWriter.save(sparrowOutputPath)
>>> 
>>> sparrowOutputPath is something like s3://bucket/folder, and it exists; I checked it with the AWS command line.
>>> 
>>> I put a breakpoint there, and the full path looks like s3://bucket/orc/dt=2018-01-23, which exists.
>>> 
>>> I have also set up the credentials like this:
>>> 
>>> sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "key")
>>> sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "secret")
>>> 
>>> My confusion is that this code runs fine in the cluster, but I get this error when running locally.
>>> 
> 
