crunch-user mailing list archives

From "Brush,Ryan" <RBR...@CERNER.COM>
Subject Re: Making crunch job output single file
Date Fri, 01 Nov 2013 18:57:55 GMT
Can you look at the output file itself to see if it is in fact compressed as expected -- (or
perhaps uncompressed or using an unexpected codec)? If it's too big to examine, you might
try recreating the issue with a significantly smaller file that can be easily inspected...whether
data is being written twice (which is hard to imagine) or if the file isn't of the expected

If all else fails, creating a very simple project (without additional dependencies) that exhibits
the problem and sharing that code could be in order.  This would simplify the problem to the
point where the root cause is much easier to identify.
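
One rough way to do that inspection on a small sample copied to local disk is to read the first
few kilobytes and see how much of it is printable text. Uncompressed csv should be almost entirely
printable, while Snappy (or any other codec) output should not. The sketch below is only a
heuristic under those assumptions; the class name, the 0.95 threshold, and the local-file access
are illustrative and not part of Crunch or Hadoop.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

// Hypothetical helper: guesses whether the head of a file is plain text.
final class OutputSniffer {

    /** Reads at most maxBytes bytes from the start of a local file. */
    static byte[] readHead(Path file, int maxBytes) throws IOException {
        byte[] buf = new byte[maxBytes];
        int filled = 0;
        try (InputStream in = Files.newInputStream(file)) {
            int n;
            while (filled < maxBytes
                    && (n = in.read(buf, filled, maxBytes - filled)) != -1) {
                filled += n;
            }
        }
        return Arrays.copyOf(buf, filled);
    }

    /** Fraction of bytes that are printable ASCII, tab, CR, or LF. */
    static double printableFraction(byte[] head) {
        if (head.length == 0) {
            return 1.0;
        }
        int printable = 0;
        for (byte b : head) {
            int c = b & 0xFF;
            if ((c >= 0x20 && c < 0x7F) || c == '\t' || c == '\n' || c == '\r') {
                printable++;
            }
        }
        return (double) printable / head.length;
    }

    /** Assumed threshold: more than 95% printable bytes counts as plain text. */
    static boolean looksLikePlainText(byte[] head) {
        return printableFraction(head) > 0.95;
    }
}
```

Calling looksLikePlainText(readHead(file, 4096)) on one part file from each job would show
whether one of the two outputs is, contrary to the job settings, uncompressed.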

On Nov 1, 2013, at 1:44 PM, Durfey,Stephen wrote:

Yes. mapred.output.compress = true for both jobs, and mapred.output.compression.codec is snappy
for both jobs.

Out of curiosity, in case something was going wrong in the combination process, or the directory
was being read twice, I added some additional job counters to keep track.
The first job, which wrote out the initial 14 files, wrote ~48 million records. The next
job, which combined the files, read in the same number of records. The number of bytes read by
the second job matches the number of bytes written by the first job. However, the number of
bytes written by the second job was essentially double what was read in. Here is the code
that I wrote to combine my csv output:

    protected void mergeJobInfoFiles() {
        // statPaths are all the directories that were written to
        final TextFileSource<String> source = new TextFileSource<String>(statPaths,

        final Pipeline infoPipeline = new MRPipeline(MyClass.class, hBaseConf);
        // statsPath is a Path object that is the base directory for all output.
        infoPipeline.writeTextFile(Shard.shard(, 1), statsPath.toString() + "-final");
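
As background on the block-level vs. record-level question raised further down the thread, the
sketch below shows how much the grouping of records before compression can change the output
size. It uses java.util.zip.Deflater purely as a stand-in for Snappy, and the sample records and
class name are made up for illustration, so the numbers say nothing about the actual job. Also
note that mapred.output.compression.type appears to apply to SequenceFile output rather than to
plain text output, so it may not be the relevant setting for csv files.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.Deflater;

// Hypothetical demo: same records, compressed one by one vs. as a single block.
final class CompressionGranularity {

    /** Compresses one buffer with DEFLATE and returns the compressed length in bytes. */
    static int deflatedSize(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        byte[] buf = new byte[4096];
        int total = 0;
        while (!deflater.finished()) {
            total += deflater.deflate(buf);
        }
        deflater.end();
        return total;
    }

    /** Total size when every record is compressed on its own (record-level). */
    static long perRecordSize(List<String> records) {
        long total = 0;
        for (String record : records) {
            total += deflatedSize(record.getBytes(StandardCharsets.UTF_8));
        }
        return total;
    }

    /** Size when all records are concatenated and compressed together (block-level). */
    static long blockSize(List<String> records) {
        ByteArrayOutputStream all = new ByteArrayOutputStream();
        for (String record : records) {
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            all.write(bytes, 0, bytes.length);
        }
        return deflatedSize(all.toByteArray());
    }

    /** Made-up csv rows, only to have something repetitive to compress. */
    static List<String> sampleRecords(int n) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            records.add("row-" + i + ",source-a,STATUS_OK,2013-11-01\n");
        }
        return records;
    }
}
```

For repetitive rows like these, blockSize should come out well below perRecordSize, which is why
a change in compression granularity between two jobs is a plausible thing to rule out.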

To further test if it was a compression issue I wrote some code to merge the files outside
of the crunch pipeline. This led to the single output file size being the same as the
size of the 14 files. So, it doesn't look like a compression issue.

If you're interested in looking at that code, I added it on pastebin, as it would be a little
lengthy to add to this email (our mapred.output.compression.codec is currently set to Snappy).
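
For a baseline without any Crunch or HDFS involvement, a plain byte-for-byte concatenation of the
part files gives the size a merged file should have when nothing is re-encoded. The sketch below
assumes uncompressed part files on the local filesystem named part-*; the class name and paths
are hypothetical, and it is not the pastebin code mentioned above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: concatenates local part files into one output file.
final class PartFileMerger {

    /**
     * Appends every file matching part-* in dir to target, in name order,
     * and returns the total number of bytes copied.
     */
    static long merge(Path dir, Path target) throws IOException {
        List<Path> parts = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "part-*")) {
            for (Path part : stream) {
                parts.add(part);
            }
        }
        Collections.sort(parts); // directory listing order is not guaranteed

        long copied = 0;
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path part : parts) {
                copied += Files.copy(part, out);
            }
        }
        return copied;
    }
}
```

merge(dir, target) returns the number of bytes copied, which should equal the sum of the part
file sizes; a merged file that is much larger than that points at something other than the data.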

Stephen Durfey
Software Engineer|The Record
816-201-2689

From: Josh Wills
Date: Friday, November 1, 2013 12:23 PM
Subject: Re: Making crunch job output single file

And the other settings look fine-- mapred.output.compress and mapred.output.compression.codec?

On Fri, Nov 1, 2013 at 7:30 AM, Durfey,Stephen wrote:
Checking the job.xml on the job tracker, mapred.output.compression.type
is set to BLOCK level compression for both the original output and the
combined output (a separate job).

Stephen Durfey
Software Engineer|The Record
816-201-2689

On 11/1/13 12:31 AM, "Gabriel Reid" wrote:

>It sounds like this could be down to block-level vs record-level
>compression -- could you check that mapred.output.compression.type was
>set to the same thing (should probably be BLOCK) in both cases?
>On Thu, Oct 31, 2013 at 7:57 PM, Josh Wills wrote:
>> That's surprising-- I know that the block size can matter for
>> files w/Snappy, but I don't know of any similar issues or settings that
>> need to be in place for text.
>> On Thu, Oct 31, 2013 at 11:38 AM, Durfey,Stephen
>> wrote:
>>> Coincidentally enough, yesterday I was also looking into a way to merge
>>> csv output files into one larger csv output file to prevent cluttering up
>>> the namenode with many smaller csv files.
>>> Background:
>>> In our crunch pipeline we are capturing context information about
>>> we encountered, and then writing them out to csv files. The csv files
>>> themselves are just a side effect of our processing and not the main
>>> and they are written out from our map tasks, before the data we did
>>> is bulk loaded into hbase. The output of these csv files is compressed
>>> as snappy.
>>> Problem:
>>> I ran the pipeline against one of our data sources and it produced 14
>>> different snappy compressed csv files, totaling 4.6GB. After the job
>>> finished I created a new TextFileSource that would point to the directory in
>>> hdfs that contained the 14 files, and using Shard, set the number of
>>> partitions to 1 to write everything out to one file. The new file size of
>>> the combination is 11.6GB, compressed as snappy.  It's not clear to me why
>>> the file size would almost triple.  Any ideas?
>>> Thanks,
>>> Stephen
>>> From: Som Satpathy
>>> Date: Wednesday, October 30, 2013 5:36 PM
>>> Subject: Re: Making crunch job output single file
>>> Thanks for the help Josh!
>>> On Wed, Oct 30, 2013 at 2:37 PM, Josh Wills wrote:
>>>> Best guess is that the input data is compressed, but the output data
>>>> is not-- Crunch does not turn it on by default.
>>>> On Oct 30, 2013 4:56 PM, "Som Satpathy" wrote:
>>>>> Maybe we can expect the csv to size up by that much compared to the
>>>>> input sequence file; I just wanted to confirm that I'm using shard()
>>>>> correctly.
>>>>> Thanks,
>>>>> Som
>>>>> On Wed, Oct 30, 2013 at 1:46 PM, Som Satpathy
>>>>> wrote:
>>>>>> Hi Josh,
>>>>>> Thank you for the input. I incorporated Shard in the mrpipeline, and this
>>>>>> time I get a single output csv part-r file, but interestingly the file size is
>>>>>> much bigger than the input sequence file size.
>>>>>> The input sequence file size is around 11GB and the final csv turns
>>>>>> out to be 65GB in size.
>>>>>> Let me explain what I'm trying to do. This is my mrpipeline:
>>>>>> PCollection<T> collection1 =
>>>>>> PCollection<T> collection2 = collection1.filter(filterFn1());
>>>>>> PCollection<T> collection3 = collection2.filter(filterFn2());
>>>>>> PCollection<T> collection4 = collection3.parallelDo(doFn3());
>>>>>> PCollection<T> finalShardedCollection = Shard.shard(collection4, 1);
>>>>>> pipeline.writeTextFile(finalShardedCollection, csvFilePath);
>>>>>> pipeline.done();
>>>>>> Am I using the shard correctly? It is weird that the output file
>>>>>> is much bigger than the input file.
>>>>>> Looking forward to hearing from you.
>>>>>> Thanks,
>>>>>> Som
>>>>>> On Wed, Oct 30, 2013 at 8:14 AM, Josh Wills
>>>>>> wrote:
>>>>>>> Hey Som,
>>>>>>> Check out org.apache.crunch.lib.Shard, it does what you want.
>>>>>>> J
>>>>>>> On Wed, Oct 30, 2013 at 8:05 AM, Som Satpathy
>>>>>>> wrote:
>>>>>>>> Hi all,
>>>>>>>> I have a crunch job that should process a big sequence file and
>>>>>>>> produce a single csv file. I am using
>>>>>>>> "pipeline.writeTextFile(transformedRecords, csvFilePath)" to write to a csv
>>>>>>>> (csvFilePath is like "/data/csv_directory"). The larger the
>>>>>>>> file is, the more mappers are created, and thus an equivalent number
>>>>>>>> of csv output files are created.
>>>>>>>> In classic mapreduce one could output a single file by setting the
>>>>>>>> #reducers to 1 while configuring the job. How could I achieve this with
>>>>>>>> crunch?
>>>>>>>> I would really appreciate any help here.
>>>>>>>> Thanks,
>>>>>>>> Som
>>>>>>> --
>>>>>>> Director of Data Science
>>>>>>> Cloudera
>>>>>>> Twitter: @josh_wills

Director of Data Science
Twitter: @josh_wills
