spark-user mailing list archives

From Chris Fregly <ch...@fregly.com>
Subject Re: Reading multiple S3 objects, transforming, writing back one
Date Sat, 03 May 2014 23:14:59 GMT
not sure if this directly addresses your issue, peter, but it's worth
mentioning a handy AWS EMR utility called s3distcp that can upload the
partitions of an HDFS file - in parallel - and concatenate them into a
single S3 file once all the partitions are uploaded.  kinda cool.

here's some info:

http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html


s3distcp is an extension of the familiar hadoop distcp, of course.
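
for example, something along these lines (the jar path, bucket names, and
grouping regex here are illustrative - see the docs above for the exact
invocation on your cluster):

    hadoop jar /home/hadoop/lib/emr-s3distcp-1.0.jar \
      --src hdfs:///output/ \
      --dest s3n://rolled-up-logs/2014-04-28/ \
      --groupBy '.*(part)-.*' \
      --targetSize 2048

--groupBy concatenates all files whose names match on the same capture
group, and --targetSize (in MB) caps the size of each concatenated output
file.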


On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas <
nicholas.chammas@gmail.com> wrote:

> The fastest way to save to S3 should be to leave the RDD with many
> partitions, because all partitions will be written out in parallel.
>
> Then, once the various parts are in S3, somehow concatenate the files
> together into one file.
>
> If this can be done within S3 (I don't know if this is possible), then you
> get the best of both worlds: a highly parallelized write to S3, and a
> single cleanly named output file.
>
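> It turns out S3's multipart upload API does support server-side copy of
> existing objects as parts (each part except the last must be at least 5
> MB), so the part files can be concatenated without ever downloading them.
> A rough sketch with the AWS Java SDK - the bucket names, prefix, and keys
> here are hypothetical:
>
>     import scala.collection.JavaConverters._
>     import com.amazonaws.services.s3.AmazonS3Client
>     import com.amazonaws.services.s3.model._
>
>     val s3 = new AmazonS3Client()
>     // the part-NNNNN objects written by saveAsTextFile
>     val parts = s3.listObjects("upload", "output/").getObjectSummaries.asScala
>     val init = s3.initiateMultipartUpload(
>       new InitiateMultipartUploadRequest("rolled-up-logs", "2014-04-28.csv"))
>     // server-side copy of each part into the new multipart upload
>     val etags = parts.zipWithIndex.map { case (part, i) =>
>       s3.copyPart(new CopyPartRequest()
>         .withSourceBucketName("upload")
>         .withSourceKey(part.getKey)
>         .withDestinationBucketName("rolled-up-logs")
>         .withDestinationKey("2014-04-28.csv")
>         .withUploadId(init.getUploadId)
>         .withPartNumber(i + 1)).getPartETag
>     }
>     s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
>       "rolled-up-logs", "2014-04-28.csv", init.getUploadId, etags.asJava))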
>
> On Thu, May 1, 2014 at 12:52 PM, Peter <thenephilim13@yahoo.com> wrote:
>
>> Thank you Patrick.
>>
>> I took a quick stab at it:
>>
>>     import scala.collection.JavaConverters._
>>     import com.amazonaws.services.s3.AmazonS3Client
>>     import com.amazonaws.services.s3.model.DeleteObjectsRequest
>>     import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion
>>
>>     val s3Client = new AmazonS3Client(...)
>>     // server-side copy of the single part file to its final name
>>     val copyObjectResult = s3Client.copyObject(
>>       "upload", outputPrefix + "/part-00000", "rolled-up-logs", "2014-04-28.csv")
>>     // then delete the original part files under the prefix
>>     val objectListing = s3Client.listObjects("upload", outputPrefix)
>>     s3Client.deleteObjects(new DeleteObjectsRequest("upload").withKeys(
>>       objectListing.getObjectSummaries.asScala.map(s => new KeyVersion(s.getKey)).asJava))
>>
>>  Using a 3GB object I achieved about 33MB/s between buckets in the same
>> AZ.
>>
>> This is a workable solution for the short term but not ideal for the
>> longer term as data sizes increase. I understand it's a limitation of the
>> Hadoop API, but ultimately it must be possible to dump an RDD to a single
>> S3 object :)
>>
>>   On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell <
>> pwendell@gmail.com> wrote:
>>  This is a consequence of the way the Hadoop file API works. However,
>> you can (fairly easily) add code to just rename the file because it
>> will always produce the same filename.
>>
>> (heavy use of pseudo code)
>>
>> dir = "/some/dir"
>> rdd.coalesce(1).saveAsTextFile(dir)
>> f = new File(dir + "/part-00000")
>> f.moveTo("somewhere else")
>> dir.remove()
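>>
>> a more concrete (but untested) sketch of the same idea using the Hadoop
>> FileSystem API, so the rename also works for s3n:// paths - the paths
>> here are hypothetical:
>>
>>     import org.apache.hadoop.fs.{FileSystem, Path}
>>
>>     val dir = "s3n://bucket/tmp-output"
>>     rdd.coalesce(1).saveAsTextFile(dir)
>>     val fs = FileSystem.get(new java.net.URI(dir), sc.hadoopConfiguration)
>>     // the single partition is always written as part-00000
>>     fs.rename(new Path(dir + "/part-00000"), new Path("s3n://bucket/final.csv"))
>>     fs.delete(new Path(dir), true)  // recursively remove the leftover dir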
>>
>> It might be cool to add a utility called `saveAsSingleFile` or
>> something that does this for you. In fact, we probably should have
>> called saveAsTextFile "saveAsTextFiles" to make it clearer...
>>
>> On Wed, Apr 30, 2014 at 2:00 PM, Peter <thenephilim13@yahoo.com> wrote:
>> > Thanks Nicholas. This is a bit of a shame - not very practical for log
>> > roll-up, for example, when every output needs to be in its own
>> > "directory".
>> > On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
>> > <nicholas.chammas@gmail.com> wrote:
>> > Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
>> > coalesce(1), you move everything in the RDD to a single partition, which
>> > then gives you 1 output file.
>> > It will still be called part-00000 or something like that because that's
>> > defined by the Hadoop API that Spark uses for reading from/writing to
>> > S3. I don't know of a way to change that.
>> >
>> >
>> > On Wed, Apr 30, 2014 at 2:47 PM, Peter <thenephilim13@yahoo.com> wrote:
>> >
>> > Ah, looks like RDD.coalesce(1) solves one part of the problem.
>> > On Wednesday, April 30, 2014 11:15 AM, Peter <thenephilim13@yahoo.com>
>> > wrote:
>> > Hi
>> >
>> > Playing around with Spark & S3, I'm opening multiple objects (CSV files)
>> > with:
>> >
>> >    val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>> >
>> > so hfile is an RDD representing the 10 objects that were "underneath"
>> > 2014-04-28.
>> > After I've sorted and otherwise transformed the content, I'm trying to
>> > write it back to a single object:
>> >
>> >     sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>> >
>> > Unfortunately this results in a "folder" named concatted.csv with 10
>> > objects underneath, part-00000 .. part-00009, corresponding to the 10
>> > original objects loaded.
>> >
>> > How can I achieve the desired behaviour of putting a single object named
>> > concatted.csv?
>> >
>> > I've tried 0.9.1 and 1.0.0-RC3.
>> >
>> > Thanks!
>> > Peter
>> >
