phoenix-dev mailing list archives

From ramkrish86 <...@git.apache.org>
Subject [GitHub] phoenix pull request: Phoenix-180
Date Thu, 11 Sep 2014 11:32:36 GMT
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/12#discussion_r17415867
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java ---
    @@ -117,121 +125,59 @@ public boolean apply(HRegionLocation location) {
         }
     
         protected List<KeyRange> genKeyRanges(List<HRegionLocation> regions) {
    -        if (regions.isEmpty()) {
    -            return Collections.emptyList();
    +        if (regions.isEmpty()) { return Collections.emptyList(); }
    +        List<PColumnFamily> columnFamilies = this.tableRef.getTable().getColumnFamilies();
    +        // Collect all the guide posts across families. Sort them and then create a key range that starts
    +        // from [] to [].  Then intersect it with the region boundary
    +        List<KeyRange> regionStartEndKey = Lists.newArrayListWithExpectedSize(regions.size());
    +        for (HRegionLocation region : regions) {
    +            regionStartEndKey.add(KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo()
    +                    .getEndKey()));
             }
    -        
    -        StatsManager statsManager = context.getConnection().getQueryServices().getStatsManager();
    -        // the splits are computed as follows:
    -        //
    -        // let's suppose:
    -        // t = target concurrency
    -        // m = max concurrency
    -        // r = the number of regions we need to scan
    -        //
    -        // if r >= t:
    -        //    scan using regional boundaries
    -        // elif r > t/2:
    -        //    split each region in s splits such that:
    -        //    s = max(x) where s * x < m
    -        // else:
    -        //    split each region in s splits such that:
    -        //    s = max(x) where s * x < t
    -        //
    -        // The idea is to align splits with region boundaries. If rows are not evenly
    -        // distributed across regions, using this scheme compensates for regions that
    -        // have more rows than others, by applying tighter splits and therefore spawning
    -        // off more scans over the overloaded regions.
    -        int splitsPerRegion = getSplitsPerRegion(regions.size());
    -        // Create a multi-map of ServerName to List<KeyRange> which we'll use to round robin from to ensure
    -        // that we keep each region server busy for each query.
    -        ListMultimap<HRegionLocation,KeyRange> keyRangesPerRegion = ArrayListMultimap.create(regions.size(),regions.size() * splitsPerRegion);;
    -        if (splitsPerRegion == 1) {
    -            for (HRegionLocation region : regions) {
    -                keyRangesPerRegion.put(region, ParallelIterators.TO_KEY_RANGE.apply(region));
    -            }
    -        } else {
    -            // Maintain bucket for each server and then returns KeyRanges in round-robin
    -            // order to ensure all servers are utilized.
    -            for (HRegionLocation region : regions) {
    -                byte[] startKey = region.getRegionInfo().getStartKey();
    -                byte[] stopKey = region.getRegionInfo().getEndKey();
    -                boolean lowerUnbound = Bytes.compareTo(startKey, HConstants.EMPTY_START_ROW) == 0;
    -                boolean upperUnbound = Bytes.compareTo(stopKey, HConstants.EMPTY_END_ROW) == 0;
    -                /*
    -                 * If lower/upper unbound, get the min/max key from the stats manager.
    -                 * We use this as the boundary to split on, but we still use the empty
    -                 * byte as the boundary in the actual scan (in case our stats are out
    -                 * of date).
    -                 */
    -                if (lowerUnbound) {
    -                    startKey = statsManager.getMinKey(tableRef);
    -                    if (startKey == null) {
    -                        keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
    -                        continue;
    -                    }
    -                }
    -                if (upperUnbound) {
    -                    stopKey = statsManager.getMaxKey(tableRef);
    -                    if (stopKey == null) {
    -                        keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
    -                        continue;
    -                    }
    -                }
    -                
    -                byte[][] boundaries = null;
    -                // Both startKey and stopKey will be empty the first time
    -                if (Bytes.compareTo(startKey, stopKey) >= 0 || (boundaries = Bytes.split(startKey, stopKey, splitsPerRegion - 1)) == null) {
    -                    // Bytes.split may return null if the key space
    -                    // between start and end key is too small
    -                    keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
    -                } else {
    -                    keyRangesPerRegion.put(region,KeyRange.getKeyRange(lowerUnbound ? KeyRange.UNBOUND : boundaries[0], boundaries[1]));
    -                    if (boundaries.length > 1) {
    -                        for (int i = 1; i < boundaries.length-2; i++) {
    -                            keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[i], true, boundaries[i+1], false));
    +        List<KeyRange> guidePosts = Lists.newArrayListWithCapacity(regions.size());
    +        List<byte[]> guidePostsBytes = Lists.newArrayListWithCapacity(regions.size());
    +        for (PColumnFamily fam : columnFamilies) {
    +            List<byte[]> gps = fam.getGuidePosts();
    +            if (gps != null) {
    +                for (byte[] guidePost : gps) {
    +                    PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(guidePost);
    +                    if (array != null && array.getDimensions() != 0) {
    +                        for (int j = 0; j < array.getDimensions(); j++) {
    +                            guidePostsBytes.add(array.toBytes(j));
                             }
    -                        keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[boundaries.length-2], true, upperUnbound ? KeyRange.UNBOUND : boundaries[boundaries.length-1], false));
                         }
                     }
                 }
             }
    -        List<KeyRange> splits = Lists.newArrayListWithCapacity(regions.size() * splitsPerRegion);
    -        // as documented for ListMultimap
    -        Collection<Collection<KeyRange>> values = keyRangesPerRegion.asMap().values();
    -        List<Collection<KeyRange>> keyRangesList = Lists.newArrayList(values);
    -        // Randomize range order to ensure that we don't hit the region servers in the same order every time
    -        // thus helping to distribute the load more evenly
    -        Collections.shuffle(keyRangesList);
    -        // Transpose values in map to get regions in round-robin server order. This ensures that
    -        // all servers will be used to process the set of parallel threads available in our executor.
    -        int i = 0;
    -        boolean done;
    -        do {
    -            done = true;
    -            for (int j = 0; j < keyRangesList.size(); j++) {
    -                List<KeyRange> keyRanges = (List<KeyRange>)keyRangesList.get(j);
    -                if (i < keyRanges.size()) {
    -                    splits.add(keyRanges.get(i));
    -                    done = false;
    +        // If the guideposts are already sorted this may not be needed. But across family it is difficult to ensure
    +        // they are sorted
    +        Collections.sort(guidePostsBytes, Bytes.BYTES_COMPARATOR);
    +        int size = guidePostsBytes.size();
    +        if (size > 0) {
    +            if (size > 1) {
    +                guidePosts.add(KeyRange.getKeyRange(HConstants.EMPTY_BYTE_ARRAY, guidePostsBytes.get(0)));
    +                for (int i = 0; i < size - 2; i++) {
    +                    guidePosts.add(KeyRange.getKeyRange(guidePostsBytes.get(i), (guidePostsBytes.get(i + 1))));
                     }
    +                guidePosts.add(KeyRange.getKeyRange(guidePostsBytes.get(size - 2), (guidePostsBytes.get(size - 1))));
    +                guidePosts.add(KeyRange.getKeyRange(guidePostsBytes.get(size - 1), (HConstants.EMPTY_BYTE_ARRAY)));
    +            } else {
    +                byte[] gp = guidePostsBytes.get(0);
    +                guidePosts.add(KeyRange.getKeyRange(HConstants.EMPTY_BYTE_ARRAY, gp));
    +                guidePosts.add(KeyRange.getKeyRange(gp, HConstants.EMPTY_BYTE_ARRAY));
                 }
    -            i++;
    -        } while (!done);
    -        return splits;
    +
    +        }
    +        if (guidePosts.size() > 0) {
    +            List<KeyRange> intersect = KeyRange.intersect(guidePosts, regionStartEndKey);
    --- End diff --
    
    I can take a single column family and use only its guideposts, but the logic above
    (sorting the guideposts, creating key ranges from them, and then intersecting those
    ranges with the region boundaries) should still work fine, right?
    Assume there are two regions, (A - D) and (E - J), and the collected guideposts are
    A, C, E, G, I.
    We create key ranges from these: [] A, A C, C E, E G, G I, I [].
    Intersecting them with the region boundaries then gives:
    A C, C E, E G, G I, I J.
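
    The worked example in this message can be sketched in isolation. The code below
    is only an illustration, not Phoenix code: `GuidePostSplits`, `Range`,
    `fromGuidePosts`, and `intersect` are hypothetical names, keys are plain strings
    rather than byte arrays, ranges are assumed to be half-open [lower, upper), and
    an empty string stands in for an unbounded side. It also assumes the two regions
    are contiguous, (A - E) and (E - J); with a first region that ends at D, a strict
    intersection would clip the C E range to C D.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative sketch only; these types are hypothetical and do not mirror Phoenix's KeyRange.
final class GuidePostSplits {

    /** Half-open range [lower, upper); an empty string means "unbounded" on that side. */
    public static final class Range {
        public final String lower;
        public final String upper;

        public Range(String lower, String upper) {
            this.lower = lower;
            this.upper = upper;
        }

        @Override
        public String toString() {
            return "[" + lower + "," + upper + ")";
        }
    }

    /** Sorts the guideposts and chains them into ranges: [] g0, g0 g1, ..., gN []. */
    public static List<Range> fromGuidePosts(List<String> guidePosts) {
        List<String> sorted = new ArrayList<>(guidePosts);
        Collections.sort(sorted);
        List<Range> ranges = new ArrayList<>();
        String prev = ""; // unbounded lower end
        for (String gp : sorted) {
            ranges.add(new Range(prev, gp));
            prev = gp;
        }
        ranges.add(new Range(prev, "")); // unbounded upper end
        return ranges;
    }

    /** Returns the overlap of two ranges, or null if they do not overlap. */
    public static Range intersect(Range a, Range b) {
        // "" sorts before every other key, so max() handles an unbounded lower end.
        String lower = a.lower.compareTo(b.lower) >= 0 ? a.lower : b.lower;
        String upper;
        if (a.upper.isEmpty()) {
            upper = b.upper;
        } else if (b.upper.isEmpty()) {
            upper = a.upper;
        } else {
            upper = a.upper.compareTo(b.upper) <= 0 ? a.upper : b.upper;
        }
        if (!upper.isEmpty() && lower.compareTo(upper) >= 0) {
            return null; // empty overlap
        }
        return new Range(lower, upper);
    }

    /** Clips every guidepost range to every region, keeping the non-empty pieces in region order. */
    public static List<Range> intersect(List<Range> guidePostRanges, List<Range> regions) {
        List<Range> out = new ArrayList<>();
        for (Range region : regions) {
            for (Range gp : guidePostRanges) {
                Range clipped = intersect(gp, region);
                if (clipped != null) {
                    out.add(clipped);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Range> gps = fromGuidePosts(Arrays.asList("A", "C", "E", "G", "I"));
        List<Range> regions = Arrays.asList(new Range("A", "E"), new Range("E", "J"));
        // prints [[A,C), [C,E), [E,G), [G,I), [I,J)]
        System.out.println(intersect(gps, regions));
    }
}
```

    The nested loop is quadratic in the worst case; since both lists are sorted, a
    single merge-style pass would also work, but the simple form keeps the
    correspondence with the example easier to see.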


