spark-user mailing list archives

From Grega Kešpret <gr...@celtra.com>
Subject Re: Large input file problem
Date Mon, 14 Oct 2013 11:45:27 GMT
Ah, I think I finally got this. Spark v0.8.0-incubating builds against
Hadoop 1.0.4 by default. I needed to compile it with "SPARK_HADOOP_VERSION=1.1.0
sbt/sbt assembly", since this fix only landed in Hadoop 1.1.0.

http://hadoop.apache.org/releases.html#13+October%2C+2012%3A+Release+1.1.0+available
lists "Splittable bzip2 files" under bug fixes.

Grega


On Mon, Oct 14, 2013 at 12:03 PM, Grega Kešpret <grega@celtra.com> wrote:

> I've tried using bzip2, but even then, when I call
> sc.textFile("s3n://.../input.bz2", minSplits) with any minSplits value
> greater than 1, it still doesn't read the file as more than one
> partition:
>
> scala> val logs = sc.textFile("s3n://.../input.bz2", 10)
> scala> logs.toDebugString
> 13/10/14 09:55:42 INFO mapred.FileInputFormat: Total input paths to
> process : 1
> res12: String =
> MappedRDD[287] at textFile at <console>:21 (1 partitions)
>   HadoopRDD[286] at textFile at <console>:21 (1 partitions)
>
> I'm using Spark v0.8.0-incubating.
>
>
>
> Grega
> --
> *Grega Kešpret*
> Analytics engineer
>
> Celtra — Rich Media Mobile Advertising
> celtra.com <http://www.celtra.com/> | @celtramobile <http://www.twitter.com/celtramobile>
>
>
> On Sun, Oct 13, 2013 at 2:10 AM, Grega Kešpret <grega@celtra.com> wrote:
>
>> Thanks a lot, I didn't know that. If I use another compression format
>> that supports splitting (like bzip2), do I get decompression for free
>> when I call sc.textFile (as with gzipped files)?
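>> i.e. something like this, with no explicit decompression step (the path
>> is a placeholder):
>>
>> // expecting the codec to be picked up from the .bz2 extension
>> val logs = sc.textFile("s3n://bucket/input.bz2")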
>>
>> Grega
>> --
>> *Grega Kešpret*
>> Analytics engineer
>>
>> Celtra — Rich Media Mobile Advertising
>> celtra.com <http://www.celtra.com/> | @celtramobile <http://www.twitter.com/celtramobile>
>>
>>
>> On Sun, Oct 13, 2013 at 2:07 AM, Mark Hamstra <mark@clearstorydata.com> wrote:
>>
>>> The basic problem you are running into is that a gzipped file is not
>>> splittable<https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-4/compression#8ca1fda1252b67145680b3a5e9d45b2a>.
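>>>
>>> A minimal illustration (the S3 path is a placeholder): a .gz input
>>> always comes back as a single partition no matter what minSplits asks
>>> for, and while an explicit shuffle can spread the later stages out, one
>>> task still has to read and decompress the whole file first:
>>>
>>> val gz = sc.textFile("s3n://bucket/input.gz", 10)
>>> gz.toDebugString  // 1 partition: a gzip stream cannot be split
>>> // downstream stages get 10 partitions, but the initial read is still one task
>>> val spread = gz.coalesce(10, shuffle = true)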
>>>
>>>
>>> On Sat, Oct 12, 2013 at 4:51 PM, Grega Kešpret <grega@celtra.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm getting Java OOMs (heap space, GC overhead limit exceeded), "Futures
>>>> timed out after [10000] milliseconds", BlockManagers being removed for
>>>> having no recent heartbeat, and so on. I've narrowed the cause down to a
>>>> large input file on S3. I'm trying to get Spark to split this file into
>>>> several smaller chunks so that each one fits in memory, but I'm out of
>>>> luck.
>>>>
>>>> I have tried:
>>>> - passing a minSplits value greater than 1 to sc.textFile
>>>> - increasing the numPartitions argument to groupByKey
>>>> - calling coalesce with numPartitions greater than 1 and shuffle = true
>>>>
>>>> Basically, my flow looks like this:
>>>> val input = sc.textFile("s3n://.../input.gz", minSplits)
>>>> input
>>>>   .mapPartitions(lines => lines.map(l => (key, l)))
>>>>   .groupByKey(numPartitions)
>>>>   .map(...)
>>>>   .saveAsTextFile(...)
>>>>
>>>> If I look at input.toDebugString, I always see 1 partition (even when
>>>> minSplits is greater than 1). It seems Spark is trying to ingest the
>>>> whole input at once. When I manually split the file into several smaller
>>>> ones, the job ran successfully, and input.toDebugString showed 10
>>>> partitions for 10 files.
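>>>>
>>>> For reference, the workaround looks like this: pointing textFile at a
>>>> glob of the smaller parts gives one partition per file (the path is a
>>>> placeholder):
>>>>
>>>> // each .gz part is still non-splittable, but Spark creates one
>>>> // partition per file, so 10 parts yield 10 partitions
>>>> val parts = sc.textFile("s3n://bucket/parts/part-*.gz")
>>>> parts.toDebugString  // 10 partitions for 10 files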
>>>>
>>>> Thanks,
>>>>
>>>> Grega
>>>>
>>>
>>>
>>
>
