hbase-user mailing list archives

From Enis Söztutar <e...@hortonworks.com>
Subject Re: Map Reduce with multiple scans
Date Thu, 28 Feb 2013 02:57:52 GMT
There is a MultiTableInputFormat that has been recently added to HBase. You
might want to take a look at it.

https://github.com/apache/hbase/blob/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
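
For illustration, a rough sketch of how it might be wired up, assuming the
initTableMapperJob overload that takes a list of scans (added alongside the
new input format); MyMapper is a placeholder, and ToyDataTable and Utils.md5
are from earlier in this thread:

import java.util.Arrays;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

// one Scan per date; each scan names its target table through the
// attribute that MultiTableInputFormat reads
byte[] jan = Utils.md5( "2013-01-01" );
Scan scan1 = new Scan( jan );
scan1.setFilter( new PrefixFilter( jan ) );
scan1.setAttribute( Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes( "ToyDataTable" ) );

byte[] feb = Utils.md5( "2013-02-01" );
Scan scan2 = new Scan( feb );
scan2.setFilter( new PrefixFilter( feb ) );
scan2.setAttribute( Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes( "ToyDataTable" ) );

Job job = new Job( HBaseConfiguration.create(), "MultiScanJob" );
TableMapReduceUtil.initTableMapperJob( Arrays.asList( scan1, scan2 ),
    MyMapper.class, null, null, job );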

Enis


On Wed, Feb 27, 2013 at 8:17 AM, Paul van Hoven <
paul.van.hoven@googlemail.com> wrote:

> Thanks for your answers. I ended up extending
> org.apache.hadoop.hbase.mapreduce.TableInputFormat, overriding its
> getSplits() method, and passing it to the map reduce job as follows:
>
> import java.io.IOException;
> import java.text.SimpleDateFormat;
> import java.util.ArrayList;
> import java.util.Date;
> import java.util.List;
>
> import org.apache.hadoop.hbase.client.Scan;
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
> import org.apache.hadoop.hbase.mapreduce.TableSplit;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.mapreduce.InputSplit;
> import org.apache.hadoop.mapreduce.JobContext;
> import org.apache.hadoop.util.ReflectionUtils;
>
> public class TimeRangeTableInputFormat extends TableInputFormat {
>
>     @Override
>     public List<InputSplit> getSplits( JobContext context ) throws IOException {
>         try {
>             List<InputSplit> splits = new ArrayList<InputSplit>();
>             Scan scan = getScan();
>
>             // start row and stop row must be date strings like "2013-01-28"
>             byte[] startRow = scan.getStartRow();
>             byte[] stopRow = scan.getStopRow();
>
>             // for each date in the span, create a new scan object
>             SimpleDateFormat dateFormatter = new SimpleDateFormat( "yyyy-MM-dd" );
>             Date startDate = dateFormatter.parse( Bytes.toString( startRow ) );
>             Date endDate = dateFormatter.parse( Bytes.toString( stopRow ) );
>
>             for ( Date iterDate = startDate;
>                   iterDate.compareTo( endDate ) <= 0;
>                   iterDate = Utils.addDays( iterDate, 1 ) ) {
>
>                 // the dates in the row keys are stored as md5 hashes
>                 byte[] md5Key = Utils.md5( dateFormatter.format( iterDate ) );
>                 int md5Length = 16;
>                 int longLength = 8;
>
>                 // start row: md5 prefix plus eight zero bytes; stop row:
>                 // same, with the last byte of the md5 prefix incremented
>                 byte[] subStartRow = Bytes.padTail( md5Key, longLength );
>                 byte[] subEndRow = Bytes.padTail( md5Key, longLength );
>                 subEndRow[md5Length - 1]++;
>
>                 scan.setStartRow( subStartRow );
>                 scan.setStopRow( subEndRow );
>                 setScan( scan );
>
>                 // copy each split, since the underlying scan keeps changing
>                 for ( InputSplit subSplit : super.getSplits( context ) )
>                     splits.add( (InputSplit) ReflectionUtils.copy(
>                         context.getConfiguration(),
>                         (TableSplit) subSplit, new TableSplit() ) );
>             }
>
>             return splits;
>
>         } catch ( Exception e ) {
>             // fail the job instead of returning null, which would NPE the framework
>             throw new IOException( e );
>         }
>     }
> }
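>
> For illustration, a rough sketch of how the class above might be plugged
> into a job (untested; MyMapper is a placeholder):
>
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.client.Scan;
> import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.mapreduce.Job;
>
> // the start/stop rows carry the plain date strings that the custom
> // getSplits() parses before hashing
> Scan scan = new Scan( Bytes.toBytes( "2013-01-01" ), Bytes.toBytes( "2013-02-01" ) );
> scan.setCaching( 500 );
> scan.setCacheBlocks( false );
>
> Job job = new Job( HBaseConfiguration.create(), "TimeRangeJob" );
> TableMapReduceUtil.initTableMapperJob( "ToyDataTable", scan, MyMapper.class,
>     null, null, job );
> // swap in the subclass so its per-day split logic replaces the default
> job.setInputFormatClass( TimeRangeTableInputFormat.class );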
>
> This way I get a new scan object for every day, and even though I'm using
> md5 hashes as the prefix of my row keys, I can still scan date ranges.
>
> Some questions remain:
> 1. What is your opinion about this approach?
> 2. @Nick: I've read somewhere that a filter list would be less
> efficient than overriding the split method. What do you think?
>
>
> 2013/2/26 Nick Dimiduk <ndimiduk@gmail.com>:
> > Hi Paul,
> >
> > You want to run multiple scans so that you can filter the previous scan
> > results? Am I correct in my understanding of your objective?
> >
> > First, I suggest you use the PrefixFilter [0] instead of constructing the
> > rowkey prefix manually. This looks something like:
> >
> > byte[] md5Key = Utils.md5( "2013-01-07" );
> > Scan scan = new Scan(md5Key);
> > scan.setFilter(new PrefixFilter(md5Key));
> >
> > Yes, that's a bit redundant, but setting the startkey explicitly will save
> > you some unnecessary processing.
> >
> >> This map reduce job works fine but this is just one scan job for this map
> >> reduce task. What do I have to do to pass multiple scans?
> >
> >
> > Do you mean processing multiple dates? In that case, what you really
> > want is a full (unbounded) table scan. Since the date is the first part of
> > your compound rowkey, there's no common prefix and no need for a filter;
> > just use new Scan().
> >
> > In general, you can use multiple filters in a given Scan (or Get). See the
> > FilterList [1] for details.
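> >
> > For several specific dates, one option is a FilterList in MUST_PASS_ONE
> > (OR) mode with one PrefixFilter per date; a rough sketch, reusing your
> > Utils.md5 helper:
> >
> > import org.apache.hadoop.hbase.client.Scan;
> > import org.apache.hadoop.hbase.filter.FilterList;
> > import org.apache.hadoop.hbase.filter.PrefixFilter;
> >
> > // a row passes if it matches any one of the date prefixes
> > FilterList dateFilters = new FilterList( FilterList.Operator.MUST_PASS_ONE );
> > dateFilters.addFilter( new PrefixFilter( Utils.md5( "2013-01-01" ) ) );
> > dateFilters.addFilter( new PrefixFilter( Utils.md5( "2013-02-01" ) ) );
> >
> > Scan scan = new Scan();
> > scan.setFilter( dateFilters );
> >
> > Note that such an unbounded scan still touches the whole table server-side;
> > the filters only cut down what comes back to the client.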
> >
> > Does this help?
> > Nick
> >
> > [0]: http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/PrefixFilter.html
> > [1]: http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/FilterList.html
> >
> > On Tue, Feb 26, 2013 at 5:41 AM, Paul van Hoven <
> > paul.van.hoven@googlemail.com> wrote:
> >
> >> My rowkeys look something like this:
> >>
> >> md5( date ) + md5( ip address )
> >>
> >> So an example would be
> >> md5( "2013-02-08") + md5( "192.168.187.2")
> >>
> >> For one particular date I get several rows. Now I'd like to query
> >> different dates, for example "2013-01-01" and "2013-02-01" and some
> >> others. Additionally I'd like to perform this scan (or these scans)
> >> in a map reduce job.
> >>
> >> Currently my map reduce job looks like this:
> >>
> >> Configuration config = HBaseConfiguration.create();
> >> Job job = new Job( config, "ToyJob" );
> >> job.setJarByClass( PlayWithMapReduce.class );
> >>
> >> byte[] md5Key = Utils.md5( "2013-01-07" );
> >> int md5Length = 16;
> >> int longLength = 8;
> >>
> >> // start row: md5 prefix plus eight zero bytes; end row: same, with
> >> // the last byte of the md5 prefix incremented
> >> byte[] startRow = Bytes.padTail( md5Key, longLength );
> >> byte[] endRow = Bytes.padTail( md5Key, longLength );
> >> endRow[md5Length - 1]++;
> >>
> >> Scan scan = new Scan( startRow, endRow );
> >> scan.setCaching( 500 );
> >> scan.setCacheBlocks( false );
> >>
> >> Filter f = new SingleColumnValueFilter( Bytes.toBytes( "CF" ),
> >>     Bytes.toBytes( "creativeId" ), CompareOp.EQUAL, Bytes.toBytes( "100" ) );
> >> scan.setFilter( f );
> >>
> >> String tableName = "ToyDataTable";
> >> TableMapReduceUtil.initTableMapperJob( tableName, scan, Mapper.class,
> >>     null, null, job );
> >>
> >> This map reduce job works fine, but it runs just one scan for the whole
> >> map reduce task. What do I have to do to pass multiple scans? Or do
> >> you have any other suggestions on how to achieve that goal? The
> >> constraint is that it must be possible to combine it with map reduce.
> >>
>
