spark-user mailing list archives

From Steve Loughran <>
Subject Re: S3A + EMR failure when writing Parquet?
Date Tue, 30 Aug 2016 18:49:49 GMT

On 29 Aug 2016, at 18:18, Everett Anderson <<>> wrote:

Okay, I don't think it's really just an S3A issue anymore. I can also run the job successfully with fs.s3.impl/spark.hadoop.fs.s3.impl
set to the S3A implementation as a --conf param from the EMR console.

The problem seems related to the fact that we're trying to spark-submit jobs to a YARN cluster
from outside the cluster itself.

The docs<> suggest one must
copy the Hadoop/YARN config XML outside of the cluster to do this, which feels gross, but
it's what we did. We had changed fs.s3.impl to use S3A in that config, and that seems to result
in the failure, though I still can't figure out why.

Interestingly, if I don't make that change to the XML, and leave it as the EMRFS implementation,
it will work, as long as I use s3a:// URIs for the jar, otherwise spark-submit won't be able
to ship them to the cluster since it won't have the EMRFS implementation locally.
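For what it's worth, the working arrangement described above can be sketched as a submit command; the bucket, paths, and class name below are made-up placeholders (not from this thread), and the cluster XML is assumed to be copied from the EMR master with fs.s3.impl left as EMRFS:

```shell
# Assumed setup: cluster config XML copied off the EMR master, unmodified.
export HADOOP_CONF_DIR=/path/to/copied-emr-conf

# Reference the jar with an s3a:// URI so the submitting machine can fetch
# and ship it using the S3A client it has locally (it has no EMRFS jars).
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.MyJob \
  s3a://example-bucket/jars/my-job.jar
```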

I see: you are trying to use EMR's "special" S3 in-cluster, but spark-submit is trying to
submit remotely.

1. Changing the value of fs.s3.impl to s3a works for the upload, but not at runtime.
2. Using s3a:// URIs for the upload and leaving fs.s3.impl alone works throughout.

I would just go with S3a, this is just the JARs being discussed here right —not the actual

When the JARs are needed, they'll be copied on EMR using the amazon S3A implementation —whatever
they've done there— to the local filesystem, where classloaders can pick them up and use.
It might be that s3a:// URLs are slower on EMR than s3:// URLs, but there's no fundamental
reason why it isn't going to work.

On Sun, Aug 28, 2016 at 4:19 PM, Everett Anderson <<>> wrote:
(Sorry, typo -- I was using spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
not 'hadooop', of course)

On Sun, Aug 28, 2016 at 12:51 PM, Everett Anderson <<>> wrote:

I'm having some trouble figuring out a failure when writing a DataFrame as Parquet with S3A
on EMR 4.7.2 (which is Hadoop 2.7.2 and Spark 1.6.2). It works when using EMRFS (s3://).

I'm using these extra conf params, though I've also tried with all of them removed except the
encryption one, with the same result:

--conf spark.hadooop.mapreduce.fileoutputcommitter.algorithm.version=2
--conf spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
--conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256
--conf spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter

It looks like it does actually write the parquet shards under

<output root S3>/_temporary/0/_temporary/<attempt>/

but then must hit that S3 exception when trying to copy/rename. I think the NullPointerException
deep down in Parquet is due to close() being called more than once, so it isn't the root cause,
but I'm not sure.

Given that the stack trace has abortTask() in it, I'd suspect that's a follow-on failure.

One possibility here may be related to how EMR handles your credentials (session credentials
served up over IAM HTTP) versus how Apache Hadoop 2.7's s3a auth works (IAM credentials aren't
supported until 2.8). That could trigger the problem. But I don't know.
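For reference, on Hadoop 2.8+ session credentials can be passed to S3A explicitly rather than relying on the instance metadata service; a sketch of the relevant spark-submit options (property names are from Hadoop 2.8's S3A, the values are placeholders):

```shell
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.access.key=<access-key> \
--conf spark.hadoop.fs.s3a.secret.key=<secret-key> \
--conf spark.hadoop.fs.s3a.session.token=<sts-session-token>
```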

I do know that I have dataframes writing back to s3a on Hadoop 2.7.3, *not on EMR*.

Anyone seen something like this?

16/08/28 19:46:28 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 1.0 failed
4 times, most recent failure: Lost task 9.3 in stage 1.0 (TID 54, <>-west-2.compute.internal):
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
        ... 8 more
        Suppressed: java.lang.NullPointerException
                at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(
                at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(
                at org.apache.parquet.hadoop.ParquetRecordWriter.close(
                at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetRelation.scala:101)
                at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:266)
                at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1286)
                ... 9 more

Probable bug in the Parquet cleanup path if the write never started; may want to report that to

Caused by: Access Denied (Service: Amazon
S3; Status Code: 403; Error Code: AccessDenied; Request ID: EA0E434768316935), S3 Extended
Request ID: fHtu7Q9VSi/8h0RAyfRiyK6uAJnajZBrwqZH3eBfF5kM13H6dDl006031NTwU/whyGu1uNqW1mI=
        at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(
        at com.amazonaws.http.AmazonHttpClient.executeOneRequest(
        at com.amazonaws.http.AmazonHttpClient.executeHelper(
        at com.amazonaws.http.AmazonHttpClient.doExecute(
        at com.amazonaws.http.AmazonHttpClient.executeWithTimer(
        at com.amazonaws.http.AmazonHttpClient.execute(
        ... 3 more

The ASF implementation of S3A only uses the transfer manager for async transfers when using
the fast output stream ("" = true), or on a rename. Sounds a bit like a
rename in the codepath — except I don't see why the 403 would wait until the rename(). I'd
have expected it to start on the attempt to create the temporary directory.
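To make the rename angle concrete: with the classic FileOutputCommitter (algorithm v1), task output is written under _temporary and renamed into place at commit, first at task commit and again at job commit; algorithm v2 renames straight to the destination at task commit. On a real filesystem a rename is cheap, but on S3 each "rename" is a copy+delete, which is where a 403 on the COPY request would surface. A purely illustrative local sketch of the v1 flow (not Spark/Hadoop code; all paths are placeholders):

```shell
# Illustrative only -- simulates the v1 commit dance with local mv,
# which on S3 would each be a copy+delete rather than a metadata op.
dest=$(mktemp -d)                                  # stands in for the S3 output root
attempt="$dest/_temporary/0/_temporary/attempt_0"  # task attempt dir, as in the log above
mkdir -p "$attempt"
echo data > "$attempt/part-00000.parquet"          # the task writes its shard here
mv "$attempt" "$dest/_temporary/0/task_0"          # task commit: rename the attempt dir
mv "$dest/_temporary/0/task_0/part-00000.parquet" "$dest/part-00000.parquet"  # job commit
rm -rf "$dest/_temporary"                          # cleanup of the temp tree
```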

Overall, it's not a problem I've seen before. I think trying to mix the two implementations is the root cause.

