hbase-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mark Kerzner <markkerz...@gmail.com>
Subject Upgrading from HBase 0.20 to 0.89 code question
Date Wed, 02 Feb 2011 02:36:15 GMT
Hi,

Below is a textbook example of using HBase from a MapReduce job. I am trying
to rewrite it for the 0.89 API, but I have not succeeded yet. Can
anyone please give me some pointers?

Thank you very much. Sincerely,
Mark

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce tool that counts how often each distinct cell value occurs in the
 * requested columns of an HBase table, and writes the totals into the
 * {@code mailbox-status} table.
 *
 * <p>Written against the old {@code org.apache.hadoop.hbase.mapred} API
 * ({@link RowResult}, {@link Cell}, {@link BatchUpdate}).
 *
 * <p>Usage: {@code mailboxid-counter <outputdir> <tablename> <column1> [<column2>...]}
 */
public class MailboxIdCount extends Configured implements Tool {
    /** Name of this 'program'; used as the job name and in the usage text. */
    static final String NAME = "mailboxid-counter";

    /** Count emitted once per non-empty cell; final so it cannot be reassigned. */
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Mapper that emits {@code (cellValue, 1)} for every non-empty cell of each
     * scanned row.
     */
    static class RowCounterMapper
            implements TableMap<Text, IntWritable> {

        /** Job counters reported by this mapper. */
        private static enum Counters {

            /** Rows that contained at least one non-empty cell. */
            ROWS
        }

        /**
         * Emits one {@code (value, 1)} pair per non-null, non-empty cell in
         * {@code value}, then bumps {@link Counters#ROWS} if anything was emitted.
         *
         * @param row      row key of the scanned row (unused)
         * @param value    columns/cells of the scanned row
         * @param output   collector receiving {@code (cellValue, 1)} pairs
         * @param reporter used to report the {@link Counters#ROWS} counter
         * @throws IOException if the collector fails
         */
        @Override
        public void map(ImmutableBytesWritable row, RowResult value,
                OutputCollector<Text, IntWritable> output,
                Reporter reporter)
                throws IOException {
            boolean emitted = false;
            for (Map.Entry<byte[], Cell> e : value.entrySet()) {
                Cell cell = e.getValue();
                if (cell != null && cell.getValue().length > 0) {
                    output.collect(new Text(cell.getValue()), ONE);
                    emitted = true;
                }
            }
            // The ROWS counter was previously declared but never reported.
            if (emitted) {
                reporter.incrCounter(Counters.ROWS, 1);
            }
        }

        @Override
        public void configure(JobConf jc) {
            // Nothing to do.
        }

        @Override
        public void close() throws IOException {
            // Nothing to do.
        }
    }

    /**
     * Reducer that sums the counts for each distinct cell value. It writes the
     * total into column {@code stats:<value>} of the row keyed by that value.
     */
    public static class RowCounterReducer
            implements TableReduce<Text, IntWritable> {

        /** Family prefix, in the old "family:qualifier" column-name form. */
        private static final String FAMILY = "stats:";

        @Override
        public void configure(JobConf jc) {
            // do nothing
        }

        @Override
        public void close() throws IOException {
            // do nothing
        }

        /**
         * Sums all counts for {@code k2} and emits a single {@link BatchUpdate}.
         *
         * @param k2    distinct cell value; used as both row key and qualifier
         * @param itrtr counts emitted by the mappers for {@code k2}
         * @param oc    collector receiving {@code (rowKey, update)} pairs
         * @param rprtr task reporter (unused)
         * @throws IOException if encoding or collection fails
         */
        @Override
        public void reduce(Text k2, Iterator<IntWritable> itrtr,
                OutputCollector<ImmutableBytesWritable, BatchUpdate> oc,
                Reporter rprtr)
                throws IOException {
            int sum = 0;
            while (itrtr.hasNext()) {
                sum += itrtr.next().get();
            }
            String rowKey = k2.toString();
            BatchUpdate update = new BatchUpdate(rowKey);
            // Encode explicitly rather than relying on the platform charset.
            update.put(FAMILY + rowKey, String.valueOf(sum).getBytes("UTF-8"));
            // Key the output by the same row the update targets, so the pair is consistent.
            oc.collect(new ImmutableBytesWritable(update.getRow()), update);
        }
    }

    /**
     * Builds the job configuration from the command-line arguments.
     *
     * @param args {@code args[0]} output directory, {@code args[1]} source table,
     *             {@code args[2..]} columns to scan (joined space-delimited)
     * @return the configured job, ready to submit
     * @throws IOException if the table map/reduce setup fails
     */
    public JobConf createSubmittableJob(String[] args) throws IOException {
        JobConf c = new JobConf(getConf(), getClass());
        c.setJobName(NAME);
        // Columns are space delimited
        StringBuilder sb = new StringBuilder();
        final int columnoffset = 2;
        for (int i = columnoffset; i < args.length; i++) {
            if (i > columnoffset) {
                sb.append(" ");
            }
            sb.append(args[i]);
        }
        // Second argument is the table name.
        TableMapReduceUtil.initTableMapJob(args[1], sb.toString(),
                RowCounterMapper.class, Text.class, IntWritable.class, c);
        TableMapReduceUtil.initTableReduceJob("mailbox-status",
                RowCounterReducer.class, c);
        // First arg is the output directory.
        // NOTE(review): results go to the table via initTableReduceJob, so this
        // path may be unused by the output format -- confirm.
        FileOutputFormat.setOutputPath(c, new Path(args[0]));
        return c;
    }

    /**
     * Prints the usage line to stdout.
     *
     * @return {@code -1}, suitable as an error exit code
     */
    static int printUsage() {
        System.out.println(NAME
                + " <outputdir> <tablename> <column1> [<column2>...]");
        return -1;
    }

    /**
     * Validates the argument count, then runs the job synchronously.
     *
     * @param args see {@link #createSubmittableJob(String[])}
     * @return {@code 0} on success, {@code -1} on bad arguments
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(final String[] args) throws Exception {
        // Make sure there are at least 3 parameters
        if (args.length < 3) {
            System.err.println("ERROR: Wrong number of parameters: "
                    + args.length);
            return printUsage();
        }
        JobClient.runJob(createSubmittableJob(args));
        return 0;
    }

    /**
     * Entry point: runs the tool through {@link ToolRunner} with an HBase
     * configuration and exits with the tool's return code.
     */
    public static void main(String[] args) throws Exception {
        HBaseConfiguration c = new HBaseConfiguration();
        int errCode = ToolRunner.run(c, new MailboxIdCount(), args);
        System.exit(errCode);
    }
}

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message