Steve is right that the S3 committer isn't a ParquetOutputCommitter. I think the reason that check exists is to make sure Parquet writes _metadata summary files to the output directory. But I think the **summary files are a bad idea**, so we bypass that logic and use the committer directly if the output path is in S3.

Why are summary files a bad idea? Because they can easily get out of sync with the real data files and cause correctness problems. There are two reasons for using them, both optimizations to avoid reading all of the file footers in a table. First, _metadata can be used to plan a job because it has the row group offsets. But planning no longer reads all of the footers; it uses regular Hadoop file splits instead. The second use is to get the schema of a table more quickly, but this should be handled by a metastore that tracks the latest schema. A metastore provides even faster access, a more reliable schema, and can support schema evolution.
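
If you want to be sure no summary files get written, something like the sketch below should do it. I'm assuming here that your Parquet version honors the `parquet.enable.summary-metadata` property and that Spark copies `spark.hadoop.*` settings into the Hadoop configuration; the application name is just a placeholder.

```scala
import org.apache.spark.sql.SparkSession

// Paste into spark-shell, or into the main method of your job.
val spark = SparkSession.builder()
  .appName("parquet-to-s3") // hypothetical application name
  // Assumption: this Parquet property turns off _metadata summary files.
  .config("spark.hadoop.parquet.enable.summary-metadata", "false")
  .getOrCreate()
```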

Even with the _metadata files, Spark has had to parallelize building a table from Parquet files in S3 without a metastore, so I think this requirement should be removed. In the meantime, you can probably just build a version of the S3 committer that inherits from ParquetOutputCommitter instead of FileOutputCommitter. That's probably the easiest solution. Be sure you run the tests!
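
To show why changing the parent class gets past the check, here is a minimal, self-contained sketch. Everything in it is a stand-in: the classes only copy the inheritance relationships discussed in this thread, `S3ParquetCommitter` is a hypothetical name, and `resolve` is my simplified model of the check that `Configuration.getClass` appears to perform, judging by the stack trace. It is not the real Hadoop, Parquet, or s3committer code.

```scala
// Stand-ins that only mirror the inheritance relationships from this thread.
// They are NOT the real Hadoop, Parquet, or s3committer classes.
class FileOutputCommitter
class ParquetOutputCommitter extends FileOutputCommitter
class S3Committer extends FileOutputCommitter           // how the S3 committer is built today
class S3ParquetCommitter extends ParquetOutputCommitter // hypothetical rebuilt variant

object CommitterCheck {
  // Simplified model (an assumption, not Hadoop's code) of
  // Configuration.getClass(name, defaultValue, xface): use the configured
  // class if there is one, else the default, and reject it unless it is a
  // subtype of the required class.
  def resolve[U](configured: Option[Class[_]],
                 defaultValue: Class[_ <: U],
                 xface: Class[U]): Class[_ <: U] = {
    val cls: Class[_] = configured.getOrElse(defaultValue)
    if (!xface.isAssignableFrom(cls))
      throw new RuntimeException(s"$cls not ${xface.getName}")
    cls.asSubclass(xface)
  }
}
```

With these stand-ins, passing `Some(classOf[S3Committer])` together with `classOf[ParquetOutputCommitter]` as both the default and the required type throws a RuntimeException, while `Some(classOf[S3ParquetCommitter])` is accepted. That is the whole effect of rebuilding the committer on top of ParquetOutputCommitter.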

rb

On Tue, Mar 28, 2017 at 3:17 AM, Steve Loughran <stevel@hortonworks.com> wrote:

> On 28 Mar 2017, at 05:20, sririshindra <sririshindra@gmail.com> wrote:
>
> Hi
>
> I have a job which saves a dataframe as parquet file to s3.
>
> I built a jar using your repository https://github.com/rdblue/s3committer.
>
> I added the following config to the Spark session
> config("spark.hadoop.spark.sql.parquet.output.committer.class",
> "com.netflix.bdp.s3.S3PartitionedOutputCommitter")
>
>
> I submitted the job to spark 2.0.2 as follows
>
> ./bin/spark-submit --master local[*] --driver-memory 4G --jars
> /home/rishi/Downloads/hadoop-aws-2.7.3.jar,/home/rishi/Downloads/aws-java-sdk-1.7.4.jar,/home/user/Documents/s3committer/build/libs/s3committer-0.5.5.jar
> --driver-library-path
> /home/user/Downloads/hadoop-aws-2.7.3.jar,/home/user/Downloads/aws-java-sdk-1.7.4.jar,/home/user/Documents/s3committer/build/libs/s3committer-0.5.5.jar
> --class main.streaming.scala.backupdatatos3.backupdatatos3Processorr
> --packages
> joda-time:joda-time:2.9.7,org.mongodb.mongo-hadoop:mongo-hadoop-core:1.5.2,org.mongodb:mongo-java-driver:3.3.0
> /home/user/projects/backupjob/target/Backup-1.0-SNAPSHOT.jar


The miracle of OSS is that you have the right to fix things; the curse is that only you get to fix your problems on a timescale that suits you.


>
>
> I am getting the following runtime exception.
> Exception in thread "main" java.lang.RuntimeException:
> java.lang.RuntimeException: class
> com.netflix.bdp.s3.S3PartitionedOutputCommitter not
> org.apache.parquet.hadoop.ParquetOutputCommitter
>        at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>        at
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.prepareWrite(ParquetFileFormat.scala:81)
>        at


here:
    val committerClass =
      conf.getClass(
        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
        classOf[ParquetOutputCommitter],
        classOf[ParquetOutputCommitter])


At a guess, Ryan's committer isn't a ParquetOutputCommitter.

workarounds

1. Subclass ParquetOutputCommitter
2. Modify ParquetFileFormat to only look for a classOf[FileOutputCommitter]; the ParquetOutputCommitter doesn't do anything other than optionally add a metadata file. As that is a performance killer on S3, you should have disabled that option already.

#2 is easiest, time to rebuild Spark being the only overhead.

HADOOP-13786 is sneaking in Ryan's work underneath things, but even there the ParquetFileFormat is going to have trouble. Which is odd, given my integration tests did appear to be writing things. I'll take that as a sign of coverage problems.




> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:108)
>        at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
>        at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
>        at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
>        at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>        at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>        at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>        at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
>        at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>        at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
>        at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
>        at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
>        at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
>        at
> org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
>        at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
>        at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
>        at
> main.streaming.scala.backupdatatos3.backupdatatos3Processorr$.main(backupdatatos3Processorr.scala:229)
>        at
> main.streaming.scala.backupdatatos3.backupdatatos3Processorr.main(backupdatatos3Processorr.scala)
>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>        at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>        at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>        at java.lang.reflect.Method.invoke(Method.java:498)
>        at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
>        at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
>        at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
>        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
>        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.RuntimeException: class
> com.netflix.bdp.s3.S3PartitionedOutputCommitter not
> org.apache.parquet.hadoop.ParquetOutputCommitter
>        at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2221)
>        ... 28 more
>
> can you please point out my mistake.
>
> If possible can you give a working example of saving a dataframe as a
> parquet file in s3.
>
>
>
>
>
>
>
> --
> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Output-Committers-for-S3-tp21033p21246.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
>
>


--
Ryan Blue
Software Engineer
Netflix