spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Pei-Lun Lee <pl...@appier.com>
Subject Re: Which OutputCommitter to use for S3?
Date Thu, 26 Mar 2015 05:54:28 GMT
I updated the PR for SPARK-6352 to be more like SPARK-3595.
I added a new setting "spark.sql.parquet.output.committer.class" in hadoop
configuration to allow custom implementation of ParquetOutputCommitter.
Can someone take a look at the PR?

On Mon, Mar 16, 2015 at 5:23 PM, Pei-Lun Lee <pllee@appier.com> wrote:

> Hi,
>
> I created a JIRA and PR for supporting a s3 friendly output committer for
> saveAsParquetFile:
> https://issues.apache.org/jira/browse/SPARK-6352
> https://github.com/apache/spark/pull/5042
>
> My approach is add a DirectParquetOutputCommitter class in spark-sql
> package and use a boolean config variable
> spark.sql.parquet.useDirectParquetOutputCommitter to choose between default
> output committer.
> This may not be the smartest solution but it works for me.
> Tested on spark 1.1, 1.3 with hadoop 1.0.4.
>
>
> On Thu, Mar 5, 2015 at 4:32 PM, Aaron Davidson <ilikerps@gmail.com> wrote:
>
>> Yes, unfortunately that direct dependency makes this injection much more
>> difficult for saveAsParquetFile.
>>
>> On Thu, Mar 5, 2015 at 12:28 AM, Pei-Lun Lee <pllee@appier.com> wrote:
>>
>>> Thanks for the DirectOutputCommitter example.
>>> However I found it only works for saveAsHadoopFile. What about
>>> saveAsParquetFile?
>>> It looks like SparkSQL is using ParquetOutputCommitter, which is subclass
>>> of FileOutputCommitter.
>>>
>>> On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor <
>>> thomas.demoor@amplidata.com>
>>> wrote:
>>>
>>> > FYI. We're currently addressing this at the Hadoop level in
>>> > https://issues.apache.org/jira/browse/HADOOP-9565
>>> >
>>> >
>>> > Thomas Demoor
>>> >
>>> > On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath <
>>> > ddmcbeath@yahoo.com.invalid> wrote:
>>> >
>>> >> Just to close the loop in case anyone runs into the same problem I
>>> had.
>>> >>
>>> >> By setting --hadoop-major-version=2 when using the ec2 scripts,
>>> >> everything worked fine.
>>> >>
>>> >> Darin.
>>> >>
>>> >>
>>> >> ----- Original Message -----
>>> >> From: Darin McBeath <ddmcbeath@yahoo.com.INVALID>
>>> >> To: Mingyu Kim <mkim@palantir.com>; Aaron Davidson <
>>> ilikerps@gmail.com>
>>> >> Cc: "user@spark.apache.org" <user@spark.apache.org>
>>> >> Sent: Monday, February 23, 2015 3:16 PM
>>> >> Subject: Re: Which OutputCommitter to use for S3?
>>> >>
>>> >> Thanks.  I think my problem might actually be the other way around.
>>> >>
>>> >> I'm compiling with hadoop 2,  but when I startup Spark, using the ec2
>>> >> scripts, I don't specify a
>>> >> -hadoop-major-version and the default is 1.   I'm guessing that if I
>>> make
>>> >> that a 2 that it might work correctly.  I'll try it and post a
>>> response.
>>> >>
>>> >>
>>> >> ----- Original Message -----
>>> >> From: Mingyu Kim <mkim@palantir.com>
>>> >> To: Darin McBeath <ddmcbeath@yahoo.com>; Aaron Davidson <
>>> >> ilikerps@gmail.com>
>>> >> Cc: "user@spark.apache.org" <user@spark.apache.org>
>>> >> Sent: Monday, February 23, 2015 3:06 PM
>>> >> Subject: Re: Which OutputCommitter to use for S3?
>>> >>
>>> >> Cool, we will start from there. Thanks Aaron and Josh!
>>> >>
>>> >> Darin, it¹s likely because the DirectOutputCommitter is compiled with
>>> >> Hadoop 1 classes and you¹re running it with Hadoop 2.
>>> >> org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1,
>>> and it
>>> >> became an interface in Hadoop 2.
>>> >>
>>> >> Mingyu
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On 2/23/15, 11:52 AM, "Darin McBeath" <ddmcbeath@yahoo.com.INVALID>
>>> >> wrote:
>>> >>
>>> >> >Aaron.  Thanks for the class. Since I'm currently writing Java based
>>> >> >Spark applications, I tried converting your class to Java (it seemed
>>> >> >pretty straightforward).
>>> >> >
>>> >> >I set up the use of the class as follows:
>>> >> >
>>> >> >SparkConf conf = new SparkConf()
>>> >> >.set("spark.hadoop.mapred.output.committer.class",
>>> >> >"com.elsevier.common.DirectOutputCommitter");
>>> >> >
>>> >> >And I then try and save a file to S3 (which I believe should use
the
>>> old
>>> >> >hadoop apis).
>>> >> >
>>> >> >JavaPairRDD<Text, Text> newBaselineRDDWritable =
>>> >> >reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
>>> >> >newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
>>> >> >Text.class, Text.class, SequenceFileOutputFormat.class,
>>> >> >org.apache.hadoop.io.compress.GzipCodec.class);
>>> >> >
>>> >> >But, I get the following error message.
>>> >> >
>>> >> >Exception in thread "main" java.lang.IncompatibleClassChangeError:
>>> Found
>>> >> >class org.apache.hadoop.mapred.JobContext, but interface was expected
>>> >> >at
>>> >>
>>> >>
>>> >com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
>>> >> >java:68)
>>> >> >at
>>> >>
>>> >org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
>>> >> >at
>>> >>
>>> >>
>>> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
>>> >> >.scala:1075)
>>> >> >at
>>> >>
>>> >>
>>> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>>> >> >ala:940)
>>> >> >at
>>> >>
>>> >>
>>> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>>> >> >ala:902)
>>> >> >at
>>> >>
>>> >>
>>> >org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
>>> >> >71)
>>> >> >at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
>>> >> >
>>> >> >In my class, JobContext is an interface of  type
>>> >> >org.apache.hadoop.mapred.JobContext.
>>> >> >
>>> >> >Is there something obvious that I might be doing wrong (or messed
up
>>> in
>>> >> >the translation from Scala to Java) or something I should look
>>> into?  I'm
>>> >> >using Spark 1.2 with hadoop 2.4.
>>> >> >
>>> >> >
>>> >> >Thanks.
>>> >> >
>>> >> >Darin.
>>> >> >
>>> >> >
>>> >> >________________________________
>>> >> >
>>> >> >
>>> >> >From: Aaron Davidson <ilikerps@gmail.com>
>>> >> >To: Andrew Ash <andrew@andrewash.com>
>>> >> >Cc: Josh Rosen <rosenville@gmail.com>; Mingyu Kim <mkim@palantir.com
>>> >;
>>> >> >"user@spark.apache.org" <user@spark.apache.org>; Aaron Davidson
>>> >> ><aaron@databricks.com>
>>> >> >Sent: Saturday, February 21, 2015 7:01 PM
>>> >> >Subject: Re: Which OutputCommitter to use for S3?
>>> >> >
>>> >> >
>>> >> >
>>> >> >Here is the class:
>>> >> >
>>> >>
>>> https://urldefense.proofpoint.com/v2/url?u=https-3A__gist.github.com_aaron
>>> >>
>>> >>
>>> >dav_c513916e72101bbe14ec&d=AwIFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6o
>>> >>
>>> >>
>>> >Onmz8&r=ennQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=_2YAVrYZtQmuKZRf6sFs
>>> >>
>>> >zOvl_-ZnxmkBPHo1K24TfGE&s=cwSCPKlJO-BJcz4UcGck3xOE2N-4V3eoNvgtFCdMLP8&e=
>>> >> >
>>> >> >You can use it by setting "mapred.output.committer.class" in the
>>> Hadoop
>>> >> >configuration (or "spark.hadoop.mapred.output.committer.class" in
the
>>> >> >Spark configuration). Note that this only works for the old Hadoop
>>> APIs,
>>> >> >I believe the new Hadoop APIs strongly tie committer to input format
>>> (so
>>> >> >FileInputFormat always uses FileOutputCommitter), which makes this
>>> fix
>>> >> >more difficult to apply.
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash <andrew@andrewash.com>
>>> >> wrote:
>>> >> >
>>> >> >Josh is that class something you guys would consider open sourcing,
>>> or
>>> >> >would you rather the community step up and create an OutputCommitter
>>> >> >implementation optimized for S3?
>>> >> >>
>>> >> >>
>>> >> >>On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen <rosenville@gmail.com>
>>> >> wrote:
>>> >> >>
>>> >> >>We (Databricks) use our own DirectOutputCommitter implementation,
>>> which
>>> >> >>is a couple tens of lines of Scala code.  The class would almost
>>> >> >>entirely be a no-op except we took some care to properly handle
the
>>> >> >>_SUCCESS file.
>>> >> >>>
>>> >> >>>
>>> >> >>>On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim <mkim@palantir.com>
>>> wrote:
>>> >> >>>
>>> >> >>>I didn¹t get any response. It¹d be really appreciated
if anyone
>>> using a
>>> >> >>>special OutputCommitter for S3 can comment on this!
>>> >> >>>>
>>> >> >>>>
>>> >> >>>>Thanks,
>>> >> >>>>Mingyu
>>> >> >>>>
>>> >> >>>>
>>> >> >>>>From: Mingyu Kim <mkim@palantir.com>
>>> >> >>>>Date: Monday, February 16, 2015 at 1:15 AM
>>> >> >>>>To: "user@spark.apache.org" <user@spark.apache.org>
>>> >> >>>>Subject: Which OutputCommitter to use for S3?
>>> >> >>>>
>>> >> >>>>
>>> >> >>>>
>>> >> >>>>HI all,
>>> >> >>>>
>>> >> >>>>
>>> >> >>>>The default OutputCommitter used by RDD, which is
>>> FileOutputCommitter,
>>> >> >>>>seems to require moving files at the commit step, which
is not a
>>> >> >>>>constant operation in S3, as discussed in
>>> >> >>>>
>>> >>
>>> https://urldefense.proofpoint.com/v2/url?u=http-3A__mail-2Darchives.apa
>>> >>
>>> >>
>>> >>>>che.org_mod-5Fmbox_spark-2Duser_201410.mbox_-253C543E33FA.2000802-40ent
>>> >>
>>> >>
>>> >>>>ropy.be-253E&d=AwIFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=e
>>> >>
>>> >>
>>> >>>>nnQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=_2YAVrYZtQmuKZRf6sFszOvl_-
>>> >>
>>> >>>>ZnxmkBPHo1K24TfGE&s=EQOZaHRANJupdjXCfHSXL2t5BZ9YgMt2pRc3pht4o7o&e=
.
>>> >>
>>> >> >>>>People seem to develop their own NullOutputCommitter
>>> implementation or
>>> >> >>>>use DirectFileOutputCommitter (as mentioned in SPARK-3595),
but I
>>> >> >>>>wanted to check if there is a de facto standard, publicly
>>> available
>>> >> >>>>OutputCommitter to use for S3 in conjunction with Spark.
>>> >> >>>>
>>> >> >>>>
>>> >> >>>>Thanks,
>>> >> >>>>Mingyu
>>> >> >>>
>>> >> >>
>>> >> >
>>> >> >---------------------------------------------------------------------
>>> >> >To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> >> >For additional commands, e-mail: user-help@spark.apache.org
>>> >>
>>> >> >
>>> >>
>>> >> ---------------------------------------------------------------------
>>> >> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> >> For additional commands, e-mail: user-help@spark.apache.org
>>> >>
>>> >> ---------------------------------------------------------------------
>>> >> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> >> For additional commands, e-mail: user-help@spark.apache.org
>>> >>
>>> >>
>>> >
>>>
>>
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message