spark-user mailing list archives

From Steve Loughran <>
Subject Re: Spark with S3 DirectOutputCommitter
Date Sun, 11 Sep 2016 18:34:43 GMT

> On 9 Sep 2016, at 21:54, Srikanth <> wrote:
> Hello,
> I'm trying to use DirectOutputCommitter for s3a in Spark 2.0. I've tried a few configs and none of them seem to work.
> Output always creates _temporary directory. Rename is killing performance.

> I read some notes about DirectOutputCommitter causing problems with speculation turned on. Was this option removed entirely?

Spark turns off any committer with the word "direct" in its name if speculation==true.

Even on non-speculative execution, the trouble with the direct options is that executor/job failures can leave incomplete or inconsistent work around, and the things downstream wouldn't even notice.

There's work underway to address this, work which requires a consistent metadata store alongside S3 (HADOOP-13345: S3Guard).

For now: stay with the file output committer.
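If the serial rename in job commit is the main pain, one option worth knowing about (an illustrative setting, not something from the original mail) is the v2 file output committer algorithm, which moves each task's output into place at task commit instead of in one big rename at job commit. It is faster against S3, at the cost of weaker guarantees: partial output can be visible if the job fails after some tasks have committed.

```properties
# Hypothetical spark-defaults.conf fragment: switch to the v2 commit algorithm.
# Faster job commit (per-task rename, no serial job-commit rename), but a
# failed job can leave already-committed task output visible at the destination.
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
```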


Even better: use HDFS as the intermediate store for work, only do a bulk upload at the end.
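As a sketch of that pattern (the paths, host names and bucket names below are hypothetical, assuming a cluster with HDFS and the s3a connector configured): commit the job's output against HDFS, then copy the finished data to S3 in one pass.

```scala
// Sketch only: write to HDFS first, then bulk-upload the completed output.
// "hdfs://nn:8020/..." and "s3a://my-bucket/..." are placeholder URIs.
val df = spark.read.parquet("s3a://my-bucket/input")

// 1. Commit the work against HDFS, where rename is a cheap metadata operation,
//    so the file output committer's _temporary/rename dance is no longer costly.
df.write.parquet("hdfs://nn:8020/tmp/merge-output")

// 2. Once the job has fully succeeded, upload the result in a single pass, e.g.:
//    hadoop distcp hdfs://nn:8020/tmp/merge-output s3a://my-bucket/output
```

Because the copy only happens after the whole job has succeeded, downstream readers of the S3 path never see partial or inconsistent output.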

>   val spark = SparkSession.builder()
>                 .appName("MergeEntities")
>                 .config("spark.sql.warehouse.dir", mergeConfig.getString("sparkSqlWarehouseDir"))
>                 .config("fs.s3a.buffer.dir", "/tmp")
>                 .config("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
>                 .config("mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
>                 .config("mapreduce.use.directfileoutputcommitter", "true")
>                 //.config("spark.sql.sources.outputCommitterClass", classOf[DirectOutputCommitter].getCanonicalName)
>                 .getOrCreate()
> Srikanth
