spark-user mailing list archives

From Grega Kešpret <gr...@celtra.com>
Subject Re: Large input file problem
Date Mon, 14 Oct 2013 10:03:46 GMT
I've tried bzip2 as well, but even then, when I call
sc.textFile("s3n://.../input.bz2", minSplits) with any value of minSplits
greater than 1, Spark still doesn't split the input into more than one
partition:

scala> val logs = sc.textFile("s3n://.../input.bz2", 10)
scala> logs.toDebugString
13/10/14 09:55:42 INFO mapred.FileInputFormat: Total input paths to process : 1
res12: String =
MappedRDD[287] at textFile at <console>:21 (1 partitions)
  HadoopRDD[286] at textFile at <console>:21 (1 partitions)

I'm using Spark v0.8.0-incubating.
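
As with the earlier coalesce attempt, the data can at least be spread out right
after the read (a sketch only; the partition count is arbitrary, and the initial
read of the single .bz2 is still one task):

val logs = sc.textFile("s3n://.../input.bz2")     // comes in as 1 partition
val spread = logs.coalesce(10, shuffle = true)    // shuffle into 10 partitions for later stages
spread.toDebugString                              // should now report 10 partitions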



Grega
--
*Grega Kešpret*
Analytics engineer

Celtra — Rich Media Mobile Advertising
celtra.com <http://www.celtra.com/> |
@celtramobile <http://www.twitter.com/celtramobile>


On Sun, Oct 13, 2013 at 2:10 AM, Grega Kešpret <grega@celtra.com> wrote:

> Thanks a lot, I didn't know this. If I use some other compression format that
> supports splitting (like bzip2), do I get decompression for free when I call
> sc.textFile (as with gzipped files)?
>
> Grega
> --
> *Grega Kešpret*
> Analytics engineer
>
> Celtra — Rich Media Mobile Advertising
> celtra.com <http://www.celtra.com/> | @celtramobile <http://www.twitter.com/celtramobile>
>
>
> On Sun, Oct 13, 2013 at 2:07 AM, Mark Hamstra <mark@clearstorydata.com> wrote:
>
>> The basic problem you are running into is that a gzipped file is not splittable
>> <https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-4/compression#8ca1fda1252b67145680b3a5e9d45b2a>.
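
A quick way to see the difference (sketch only; the uncompressed file name below
is a placeholder): minSplits is effectively ignored for a .gz, while the same data
stored as plain text honors it.

val gz = sc.textFile("s3n://.../input.gz", 10)
gz.toDebugString    // still 1 partition: the gzip codec cannot be split
val txt = sc.textFile("s3n://.../input.txt", 10)
txt.toDebugString   // at least 10 partitions, for a file large enough to split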
>>
>>
>> On Sat, Oct 12, 2013 at 4:51 PM, Grega Kešpret <grega@celtra.com> wrote:
>>
>>> Hi,
>>>
>>> I'm getting Java OOMs (heap space, GC overhead limit exceeded), "Futures timed
>>> out after [10000] milliseconds", "removing BlockManager with no recent heartbeat",
>>> etc. I have narrowed the cause down to a big input file from S3. I'm trying to
>>> get Spark to split this file into several smaller chunks, so that each chunk
>>> fits in memory, but I'm out of luck.
>>>
>>> I have tried:
>>> - passing a minSplits value greater than 1 to sc.textFile
>>> - increasing the numPartitions parameter of groupByKey
>>> - using coalesce with numPartitions greater than 1 and shuffle = true
>>>
>>> Basically, my flow looks like this:
>>> val input = sc.textFile("s3n://.../input.gz", minSplits)
>>> input
>>>   .mapPartitions(lines => lines.map(line => (key, line)))
>>>   .groupByKey(numPartitions)
>>>   .map(...)
>>>   .saveAsTextFile(...)
>>>
>>> If I call input.toDebugString, it always shows 1 partition (even when minSplits
>>> is greater than 1). It seems like Spark is trying to ingest the whole input at
>>> once. When I manually split the file into several smaller ones, the job ran
>>> successfully, and input.toDebugString showed 10 partitions for 10 files.
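
A minimal end-to-end sketch of the variant that worked, for reference (run in
spark-shell, so sc is already available; the file names, key logic and per-key
processing below are placeholders, not the real job):

// placeholder key logic: first tab-separated field of each line
def extractKey(line: String): String = line.split("\t")(0)

// reading the manually split pieces with a glob gives one partition per (gzipped) file
val input = sc.textFile("s3n://.../input-part-*.gz")

input
  .map(line => (extractKey(line), line))
  .groupByKey(10)
  .map { case (key, lines) => key + "\t" + lines.size }   // placeholder per-key processing
  .saveAsTextFile("s3n://.../output")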
>>>
>>> Thanks,
>>>
>>> Grega
>>>
>>
>>
>
