spark-user mailing list archives

From Nicholas Chammas <nicholas.cham...@gmail.com>
Subject Re: Reading multiple S3 objects, transforming, writing back one
Date Sun, 04 May 2014 15:49:29 GMT
Chris,

To use s3distcp in this case, are you suggesting saving the RDD to
local/ephemeral HDFS and then copying it up to S3 using this tool?
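
For reference, here is a minimal sketch of that two-step idea (write the parts to HDFS in parallel, then push them to S3 as one object). It does not use s3distcp; it swaps in Hadoop's FileUtil.copyMerge, which exists in Hadoop 1.x and 2.x but was removed in 3.0. The object and method names (MergeParts, mergeParts) are made up for illustration, and the merge streams through a single process, so the upload itself is not parallel the way s3distcp's is.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

object MergeParts {
  /** Concatenate every file under srcDir into the single file dstFile.
    * srcDir and dstFile may live on different filesystems (e.g. HDFS and S3).
    * Hypothetical helper, not part of Spark or Hadoop. */
  def mergeParts(srcDir: String, dstFile: String, conf: Configuration): Boolean = {
    val src = new Path(srcDir)
    val dst = new Path(dstFile)
    val srcFs = src.getFileSystem(conf)
    val dstFs = dst.getFileSystem(conf)
    // deleteSource = true removes srcDir after a successful merge;
    // addString = null means nothing is appended between parts.
    FileUtil.copyMerge(srcFs, src, dstFs, dst, true, conf, null)
  }
}
```

Usage would look roughly like rdd.saveAsTextFile("hdfs:///tmp/staging") followed by MergeParts.mergeParts("hdfs:///tmp/staging", "s3n://bucket/concatted.csv", sc.hadoopConfiguration), where the staging path is an assumption.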


On Sat, May 3, 2014 at 7:14 PM, Chris Fregly <chris@fregly.com> wrote:

> not sure if this directly addresses your issue, peter, but it's worth
> mentioning a handy AWS EMR utility called s3distcp that can upload a single
> HDFS file - in parallel - to a single, concatenated S3 file once all the
> partitions are uploaded.  kinda cool.
>
> here's some info:
>
>
> http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html
>
>
> s3distcp is an extension of the familiar hadoop distcp, of course.
>
>
> On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas <
> nicholas.chammas@gmail.com> wrote:
>
>> The fastest way to save to S3 should be to leave the RDD with many
>> partitions, because all partitions will be written out in parallel.
>>
>> Then, once the various parts are in S3, somehow concatenate the files
>> together into one file.
>>
>> If this can be done within S3 (I don't know if this is possible), then
>> you get the best of both worlds: a highly parallelized write to S3, and a
>> single cleanly named output file.
>>
>>
>> On Thu, May 1, 2014 at 12:52 PM, Peter <thenephilim13@yahoo.com> wrote:
>>
>>> Thank you Patrick.
>>>
>>> I took a quick stab at it:
>>>
>>>     val s3Client = new AmazonS3Client(...)
>>>     val copyObjectResult = s3Client.copyObject(
>>>       "upload", outputPrefix + "/part-00000", "rolled-up-logs", "2014-04-28.csv")
>>>     val objectListing = s3Client.listObjects("upload", outputPrefix)
>>>     s3Client.deleteObjects(new DeleteObjectsRequest("upload").withKeys(
>>>       objectListing.getObjectSummaries.asScala.map(s => new KeyVersion(s.getKey)).asJava))
>>>
>>>  Using a 3GB object I achieved about 33MB/s between buckets in the same
>>> AZ.
>>>
>>> This is a workable solution for the short term but not ideal for the
>>> longer term as data size increases. I understand it's a limitation of the
>>> Hadoop API but ultimately it must be possible to dump a RDD to a single S3
>>> object :)
>>>
>>>   On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell <
>>> pwendell@gmail.com> wrote:
>>>  This is a consequence of the way the Hadoop files API works. However,
>>> you can (fairly easily) add code to just rename the file because it
>>> will always produce the same filename.
>>>
>>> (heavy use of pseudo code)
>>>
>>> dir = "/some/dir"
>>> rdd.coalesce(1).saveAsTextFile(dir)
>>> f = new File(dir + "/part-00000")
>>> f.moveTo("somewhere else")
>>> dir.remove()
>>>
>>> It might be cool to add a utility called `saveAsSingleFile` or
>>> something that does this for you. In fact probably we should have
>>> called saveAsTextFile "saveAsTextFiles" to make it clearer...
>>>
>>> On Wed, Apr 30, 2014 at 2:00 PM, Peter <thenephilim13@yahoo.com> wrote:
>>> > Thanks Nicholas, this is a bit of a shame, not very practical for log
>>> > roll up for example when every output needs to be in its own "directory".
>>> > On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
>>> > <nicholas.chammas@gmail.com> wrote:
>>> > Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
>>> > coalesce(1), you move everything in the RDD to a single partition, which
>>> > then gives you 1 output file.
>>> > It will still be called part-00000 or something like that because that's
>>> > defined by the Hadoop API that Spark uses for reading from/writing to S3. I
>>> > don't know of a way to change that.
>>> >
>>> >
>>> > On Wed, Apr 30, 2014 at 2:47 PM, Peter <thenephilim13@yahoo.com> wrote:
>>> >
>>> > Ah, looks like RDD.coalesce(1) solves one part of the problem.
>>> > On Wednesday, April 30, 2014 11:15 AM, Peter <thenephilim13@yahoo.com>
>>> > wrote:
>>> > Hi
>>> >
>>> > Playing around with Spark & S3, I'm opening multiple objects (CSV files)
>>> > with:
>>> >
>>> >    val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>>> >
>>> > so hfile is a RDD representing 10 objects that were "underneath" 2014-04-28.
>>> > After I've sorted and otherwise transformed the content, I'm trying to write
>>> > it back to a single object:
>>> >
>>> >    sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>>> >
>>> > unfortunately this results in a "folder" named concatted.csv with 10 objects
>>> > underneath, part-00000 .. part-00010, corresponding to the 10 original
>>> > objects loaded.
>>> >
>>> > How can I achieve the desired behaviour of putting a single object named
>>> > concatted.csv ?
>>> >
>>> > I've tried 0.9.1 and 1.0.0-RC3.
>>> >
>>> > Thanks!
>>> > Peter
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>>
>>>
>>>
>>
>
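
The rename idea sketched in pseudo code earlier in the thread could look something like the following with the Hadoop FileSystem API. This is only a sketch under assumptions: the names SingleFileOutput and promoteSinglePart are hypothetical, dir and dest are assumed to be on the same filesystem, and the RDD is assumed to have been written with coalesce(1) so that only part-00000 exists. On s3n a rename is carried out as a copy followed by a delete, so it is not a cheap metadata operation there.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object SingleFileOutput {
  /** Move dir/part-00000 to dest, then delete dir recursively.
    * Returns true if the move succeeded. Hypothetical helper. */
  def promoteSinglePart(dir: String, dest: String, conf: Configuration): Boolean = {
    val outDir = new Path(dir)
    val fs = outDir.getFileSystem(conf)
    val part = new Path(outDir, "part-00000")
    val moved = fs.rename(part, new Path(dest))
    // Only clean up the output directory once the part file is safely moved.
    if (moved) fs.delete(outDir, true)
    moved
  }
}
```

After rdd.coalesce(1).saveAsTextFile(dir), a call such as SingleFileOutput.promoteSinglePart(dir, "s3n://bucket/concatted.csv", sc.hadoopConfiguration) would leave a single named object, provided dir is in the same bucket.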
