hadoop-mapreduce-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Pedro Costa <psdc1...@gmail.com>
Subject Re: Understanding the MapOutput
Date Fri, 04 Nov 2011 18:10:37 GMT
I've looked to the MapOutputServlet class. But the problem is the following:

MapOutput can be compressed or not. When I'm talking about uncompressed
mapoutput, using the index mechanism of the MapOutputServlet, it works for
me. The map tasks generates digests for each partition, and it match with
the digests produce by the reduce.

Let me explain what I've updated in the code of MR at my own version. A map
task (MT) is producing a digest for each partition of data generated. So,
if MT1 produces 2 partitions, on uncompressed data, it produces Hash1 and
Hash2.

Now, when a reduce task (RT) fetch the map output, it will generate another
digest using the index mechanism of the MapOutputServlet and compares with
the respective digest generated by the map task.

As you can see in my explanation, when I'm talking about uncompressed map
output, the index mechanism is really useful.

But I've also tried to do the same with compressed map output. And it
doesn't work. That's the reason that I'm trying now with the IFile.Reader
class.

As you can see, I'm in a big dilemma and  I don't know what to do.

I will show you my code. This 2 methods are trying to generate digests from
the map and the reduce side. At the end, they give different results, and I
don't know why. These 2 methods  are my first tentative to generate digests
from compressed map output


[code]
// this method is trying to generate a digest from the compressed map
output on the map side.
public synchronized String generateHash(FileSystem fs, Path filename,
Decompressor decompressor, int offset, int mapOutputLength) {
 LOG.debug("Opening file2: " + filename);

MessageDigest md = null;
 String digest = null;
DecompressorStream decodec = null;
FSDataInputStream input = null;

try {
input = fs.open(filename);
decodec = new DecompressorStream(input, decompressor);
 md = MessageDigest.getInstance("SHA-1");
System.out.println("ABC");
 byte[] buffer;
int size;
while (mapOutputLength > 0) {
 // the case that the bytes read is small the the default size.
// We don't want that the message digest contains trash.
 size = mapOutputLength < (60 * 1024) ? mapOutputLength : (60*1024);
System.out.println("mapOutputLength: " + mapOutputLength + " Size: " +
size);

if(size == 0)
break;

buffer = new byte[size];
 size = decodec.read(buffer, offset, size);
System.out.println("read: " + size  + "\ndata: " + new String(buffer));
 mapOutputLength -= size;

if(size > 0)
md.update(buffer);
 else
if(size == -1)
break;
 }
System.out.println("DFG");
digest = hashIt(md);
 } catch (NoSuchAlgorithmException e) {
//TODO Auto-generated catch block
 e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
 e.printStackTrace();
} finally {
if(input!= null)
 try {
input.close();
} catch (IOException e) {
 // TODO Auto-generated catch block
e.printStackTrace();
}
 }

return digest;
}
[/code]



[code]
// this method is trying to generate the digest from the map output
compressed sent by the reduce
public synchronized String generateHash(byte[] data, Decompressor
decompressor, int offset, int mapOutputLength) {
 MessageDigest md = null;
String digest = null;
DecompressorStream decodec = null;
 ByteArrayInputStream bis = null;
try {
bis = new ByteArrayInputStream(data);
 decodec = new DecompressorStream(bis, decompressor);
md = MessageDigest.getInstance("SHA-1");

int size;
byte[] buffer;
while (mapOutputLength > 0) {
 // the case that the bytes read is small the the default size.
// We don't want that the message digest contains trash.
 size = mapOutputLength < (60 * 1024) ? mapOutputLength : (60*1024);

if(size == 0)
 break;

buffer = new byte[size];
decodec.read(buffer, offset, size);
 md.update(buffer);

mapOutputLength -= size;
}

digest = hashIt(md);
} catch (NoSuchAlgorithmException e) {
 // TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
 // TODO Auto-generated catch block
e.printStackTrace();
} finally {
 if(bis!= null)
try {
bis.close();
 } catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
 }
}

return digest;
 }
[/code]


2011/11/4 Todd Lipcon <todd@cloudera.com>

> On Fri, Nov 4, 2011 at 10:04 AM, Pedro Costa <psdc1978@gmail.com> wrote:
> > 1- I think that IFIle.reader can only read the whole map output file. I
> > want to read a partition of the map output. How can I do that? How do I
> set
> > the size of a partition in the I
>
> Look at the code for MapOutputServlet - it uses the index mechanism to
> find a particular partition.
>
> >
> > 2 - I know that map output is composed by blocks. What is the size of a
> > block? Is it 64MB by default?
>
> Nope, it doesn't use blocks. That's HDFS you're thinking of.
>
> -Todd
>
> > 2011/11/4 Todd Lipcon <todd@cloudera.com>
> >
> >> Hi Pedro,
> >>
> >> The format is called IFile. Check out the source for more info on the
> >> format - it's fairly simple. The partition starts are recorded in a
> >> separate index file next to the output file.
> >>
> >> I don't think you'll find significant docs on this format since it's
> >> MR-internal - the code is your best resource.
> >>
> >> -Todd
> >>
> >> On Fri, Nov 4, 2011 at 8:37 AM, Pedro Costa <psdc1978@gmail.com> wrote:
> >> > Hi,
> >> >
> >> > I'm trying to understand the structure of the map output file. Here's
> an
> >> > example of a mapoutput file that contains 2 partitions:
> >> >
> >> > [code]
> >> > <FF><FF><FF><FF>^@^@716banana banana apple banana
carrot carrot apple
> >> > banana 0apple carrot carrot carrot banana carrot carrot 5^N4carrot
> apple
> >> > carrot apple apple carrot banana apple ^Mbanana apple
> >> <FF><FF><DF>|<8E><B7>
> >> > [/code]
> >> >
> >> > 1 - I would like to understand what are the ASCII characters parts.
> What
> >> > they means?
> >> >
> >> > 2 - What type of file is a map output? Is it a
> SequenceFileOutputFormat,
> >> or
> >> > a TextOutputFormat?
> >> >
> >> > 3 - I've a small program that runs independently of the MR that has
> the
> >> > goal to digest each partition and give the correspondent hash. How do
> I
> >> > know where each partition starts?
> >> >
> >> >
> >> > --
> >> > Thanks,
> >> > PSC
> >> >
> >>
> >>
> >>
> >> --
> >> Todd Lipcon
> >> Software Engineer, Cloudera
> >>
> >
> >
> >
> > --
> > Thanks,
> >
>
>
>
> --
> Todd Lipcon
> Software Engineer, Cloudera
>



-- 
Thanks,

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message