tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From go...@apache.org
Subject [6/9] incubator-tephra git commit: (TEPHRA-227) Add new command to get the set of regions that have not been compacted
Date Tue, 07 Mar 2017 00:34:19 GMT
(TEPHRA-227) Add new command to get the set of regions that have not been compacted

This closes #40 from GitHub.

Signed-off-by: Gokul Gunasekaran <gokul@cask.co>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/66a2fce7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/66a2fce7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/66a2fce7

Branch: refs/heads/master
Commit: 66a2fce795dec34da7b992aeda883345fbbf3083
Parents: 95c6bfb
Author: Gokul Gunasekaran <gokul@cask.co>
Authored: Wed Mar 1 14:54:40 2017 -0800
Committer: Gokul Gunasekaran <gokul@cask.co>
Committed: Mon Mar 6 13:47:14 2017 -0800

----------------------------------------------------------------------
 .../hbase/txprune/InvalidListPruningDebug.java  | 105 +++++++++++++++++--
 .../hbase/txprune/InvalidListPruningDebug.java  | 105 +++++++++++++++++--
 .../hbase/txprune/InvalidListPruningDebug.java  | 105 +++++++++++++++++--
 .../hbase/txprune/InvalidListPruningDebug.java  | 105 +++++++++++++++++--
 .../hbase/txprune/InvalidListPruningDebug.java  | 105 +++++++++++++++++--
 5 files changed, 495 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
index d48e48d..620885b 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
@@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
@@ -53,6 +57,8 @@ public class InvalidListPruningDebug {
   private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
   private static final Gson GSON = new Gson();
   private DataJanitorState dataJanitorState;
+  private HConnection connection;
+  private TableName tableName;
 
   /**
    * Initialize the Invalid List Debug Tool.
@@ -61,20 +67,76 @@ public class InvalidListPruningDebug {
    */
   public void initialize(final Configuration conf) throws IOException {
     LOG.debug("InvalidListPruningDebugMain : initialize method called");
-    final HConnection connection = new HBaseAdmin(conf).getConnection();
+    connection = new HBaseAdmin(conf).getConnection();
+    tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                           TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
     dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public HTableInterface get() throws IOException {
-        return connection.getTable(TableName.valueOf(
-          conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                   TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)));
+        return connection.getTable(tableName);
       }
     });
   }
 
+  public void destroy() throws IOException {
+    if (connection != null) {
+      connection.close();
+    }
+  }
+
+  /**
+   * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions
+   * will stop the progress of pruning.
+   *
+   * @param numRegions number of regions
+   * @return {@link Set} of regions that needs to be compacted and flushed
+   */
+  public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (latestTimeRegion.isEmpty()) {
+      return new HashSet<>();
+    }
+
+    Long timestamp = latestTimeRegion.keySet().iterator().next();
+    SortedSet<String> liveRegions = latestTimeRegion.get(timestamp);
+
+    SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+    SortedSet<String> emptyRegionNames = new TreeSet<>();
+    Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN);
+    for (String regionString : regionStrings) {
+      emptyRegionNames.add(regionString);
+    }
+
+    Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
+
+    // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are
+    // not empty and have not been registered prune upper bound
+    Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1);
+    for (RegionPruneInfo prunedRegion : prunedRegions) {
+      if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) {
+        nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+      }
+    }
+
+    if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) {
+      return nonEmptyRegions;
+    }
+
+    Set<String> subsetRegions = new HashSet<>(numRegions);
+    for (String regionName : nonEmptyRegions) {
+      if (subsetRegions.size() == numRegions) {
+        break;
+      }
+      subsetRegions.add(regionName);
+    }
+    return subsetRegions;
+  }
+
   /**
    * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds.
-   * If -1 is passed in, all the regions and their prune upper bound will be returned.
+   * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
+   * that are known to be live will be returned.
    *
    * @param numRegions number of regions
    * @return Map of region name and its prune upper bound
@@ -85,10 +147,32 @@ public class InvalidListPruningDebug {
       return new LinkedList<>();
     }
 
+    // Create a set with region names
+    Set<String> pruneRegionNameSet = new HashSet<>();
+    for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+      pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
+    }
+
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (!latestTimeRegion.isEmpty()) {
+      SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
+      Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
+      List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
+      for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+        if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
+          liveRegionWithPruneInfoList.add(regionPruneInfo);
+        }
+      }
+
+      // Use the subset of live regions and prune regions
+      regionPruneInfos = liveRegionWithPruneInfoList;
+    }
+
     if (numRegions < 0) {
       numRegions = regionPruneInfos.size();
     }
-    
+
     Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
       @Override
       public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
@@ -148,6 +232,9 @@ public class InvalidListPruningDebug {
                  "provided as the limit, prune upper bounds of all regions are returned.");
     pw.println("prune-info region-name-as-string");
     pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
+    pw.println("to-compact-regions limit");
+    pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " +
+                 "and have not registered a prune upper bound.");
   }
 
   private boolean execute(String[] args) throws IOException {
@@ -177,6 +264,11 @@ public class InvalidListPruningDebug {
           pw.println(String.format("No prune info found for the region %s.", parameter));
         }
         return true;
+      } else if ("to-compact-regions".equals(command)) {
+        Integer numRegions = Integer.parseInt(parameter);
+        Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
+        pw.println(GSON.toJson(toBeCompactedRegions));
+        return true;
       } else {
         pw.println(String.format("%s is not a valid command.", command));
         printUsage(pw);
@@ -191,6 +283,7 @@ public class InvalidListPruningDebug {
     try {
       pruningDebug.initialize(hConf);
       boolean success = pruningDebug.execute(args);
+      pruningDebug.destroy();
       if (!success) {
         System.exit(1);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
index d48e48d..620885b 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
@@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
@@ -53,6 +57,8 @@ public class InvalidListPruningDebug {
   private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
   private static final Gson GSON = new Gson();
   private DataJanitorState dataJanitorState;
+  private HConnection connection;
+  private TableName tableName;
 
   /**
    * Initialize the Invalid List Debug Tool.
@@ -61,20 +67,76 @@ public class InvalidListPruningDebug {
    */
   public void initialize(final Configuration conf) throws IOException {
     LOG.debug("InvalidListPruningDebugMain : initialize method called");
-    final HConnection connection = new HBaseAdmin(conf).getConnection();
+    connection = new HBaseAdmin(conf).getConnection();
+    tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                           TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
     dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public HTableInterface get() throws IOException {
-        return connection.getTable(TableName.valueOf(
-          conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                   TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)));
+        return connection.getTable(tableName);
       }
     });
   }
 
+  public void destroy() throws IOException {
+    if (connection != null) {
+      connection.close();
+    }
+  }
+
+  /**
+   * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions
+   * will stop the progress of pruning.
+   *
+   * @param numRegions number of regions
+   * @return {@link Set} of regions that needs to be compacted and flushed
+   */
+  public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (latestTimeRegion.isEmpty()) {
+      return new HashSet<>();
+    }
+
+    Long timestamp = latestTimeRegion.keySet().iterator().next();
+    SortedSet<String> liveRegions = latestTimeRegion.get(timestamp);
+
+    SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+    SortedSet<String> emptyRegionNames = new TreeSet<>();
+    Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN);
+    for (String regionString : regionStrings) {
+      emptyRegionNames.add(regionString);
+    }
+
+    Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
+
+    // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are
+    // not empty and have not been registered prune upper bound
+    Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1);
+    for (RegionPruneInfo prunedRegion : prunedRegions) {
+      if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) {
+        nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+      }
+    }
+
+    if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) {
+      return nonEmptyRegions;
+    }
+
+    Set<String> subsetRegions = new HashSet<>(numRegions);
+    for (String regionName : nonEmptyRegions) {
+      if (subsetRegions.size() == numRegions) {
+        break;
+      }
+      subsetRegions.add(regionName);
+    }
+    return subsetRegions;
+  }
+
   /**
    * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds.
-   * If -1 is passed in, all the regions and their prune upper bound will be returned.
+   * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
+   * that are known to be live will be returned.
    *
    * @param numRegions number of regions
    * @return Map of region name and its prune upper bound
@@ -85,10 +147,32 @@ public class InvalidListPruningDebug {
       return new LinkedList<>();
     }
 
+    // Create a set with region names
+    Set<String> pruneRegionNameSet = new HashSet<>();
+    for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+      pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
+    }
+
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (!latestTimeRegion.isEmpty()) {
+      SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
+      Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
+      List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
+      for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+        if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
+          liveRegionWithPruneInfoList.add(regionPruneInfo);
+        }
+      }
+
+      // Use the subset of live regions and prune regions
+      regionPruneInfos = liveRegionWithPruneInfoList;
+    }
+
     if (numRegions < 0) {
       numRegions = regionPruneInfos.size();
     }
-    
+
     Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
       @Override
       public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
@@ -148,6 +232,9 @@ public class InvalidListPruningDebug {
                  "provided as the limit, prune upper bounds of all regions are returned.");
     pw.println("prune-info region-name-as-string");
     pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
+    pw.println("to-compact-regions limit");
+    pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " +
+                 "and have not registered a prune upper bound.");
   }
 
   private boolean execute(String[] args) throws IOException {
@@ -177,6 +264,11 @@ public class InvalidListPruningDebug {
           pw.println(String.format("No prune info found for the region %s.", parameter));
         }
         return true;
+      } else if ("to-compact-regions".equals(command)) {
+        Integer numRegions = Integer.parseInt(parameter);
+        Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
+        pw.println(GSON.toJson(toBeCompactedRegions));
+        return true;
       } else {
         pw.println(String.format("%s is not a valid command.", command));
         printUsage(pw);
@@ -191,6 +283,7 @@ public class InvalidListPruningDebug {
     try {
       pruningDebug.initialize(hConf);
       boolean success = pruningDebug.execute(args);
+      pruningDebug.destroy();
       if (!success) {
         System.exit(1);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
index e748d90..443c998 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
@@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
@@ -53,6 +57,8 @@ public class InvalidListPruningDebug {
   private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
   private static final Gson GSON = new Gson();
   private DataJanitorState dataJanitorState;
+  private Connection connection;
+  private TableName tableName;
 
   /**
    * Initialize the Invalid List Debug Tool.
@@ -61,20 +67,76 @@ public class InvalidListPruningDebug {
    */
   public void initialize(final Configuration conf) throws IOException {
     LOG.debug("InvalidListPruningDebugMain : initialize method called");
-    final Connection connection = ConnectionFactory.createConnection(conf);
+    connection = ConnectionFactory.createConnection(conf);
+    tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                           TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
     dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
-        return connection.getTable(TableName.valueOf(
-          conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                   TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)));
+        return connection.getTable(tableName);
       }
     });
   }
 
+  public void destroy() throws IOException {
+    if (connection != null) {
+      connection.close();
+    }
+  }
+
+  /**
+   * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions
+   * will stop the progress of pruning.
+   *
+   * @param numRegions number of regions
+   * @return {@link Set} of regions that needs to be compacted and flushed
+   */
+  public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (latestTimeRegion.isEmpty()) {
+      return new HashSet<>();
+    }
+
+    Long timestamp = latestTimeRegion.keySet().iterator().next();
+    SortedSet<String> liveRegions = latestTimeRegion.get(timestamp);
+
+    SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+    SortedSet<String> emptyRegionNames = new TreeSet<>();
+    Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN);
+    for (String regionString : regionStrings) {
+      emptyRegionNames.add(regionString);
+    }
+
+    Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
+
+    // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are
+    // not empty and have not been registered prune upper bound
+    Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1);
+    for (RegionPruneInfo prunedRegion : prunedRegions) {
+      if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) {
+        nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+      }
+    }
+
+    if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) {
+      return nonEmptyRegions;
+    }
+
+    Set<String> subsetRegions = new HashSet<>(numRegions);
+    for (String regionName : nonEmptyRegions) {
+      if (subsetRegions.size() == numRegions) {
+        break;
+      }
+      subsetRegions.add(regionName);
+    }
+    return subsetRegions;
+  }
+
   /**
    * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds.
-   * If -1 is passed in, all the regions and their prune upper bound will be returned.
+   * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
+   * that are known to be live will be returned.
    *
    * @param numRegions number of regions
    * @return Map of region name and its prune upper bound
@@ -85,10 +147,32 @@ public class InvalidListPruningDebug {
       return new LinkedList<>();
     }
 
+    // Create a set with region names
+    Set<String> pruneRegionNameSet = new HashSet<>();
+    for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+      pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
+    }
+
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (!latestTimeRegion.isEmpty()) {
+      SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
+      Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
+      List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
+      for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+        if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
+          liveRegionWithPruneInfoList.add(regionPruneInfo);
+        }
+      }
+
+      // Use the subset of live regions and prune regions
+      regionPruneInfos = liveRegionWithPruneInfoList;
+    }
+
     if (numRegions < 0) {
       numRegions = regionPruneInfos.size();
     }
-    
+
     Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
       @Override
       public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
@@ -148,6 +232,9 @@ public class InvalidListPruningDebug {
                  "provided as the limit, prune upper bounds of all regions are returned.");
     pw.println("prune-info region-name-as-string");
     pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
+    pw.println("to-compact-regions limit");
+    pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " +
+                 "and have not registered a prune upper bound.");
   }
 
   private boolean execute(String[] args) throws IOException {
@@ -177,6 +264,11 @@ public class InvalidListPruningDebug {
           pw.println(String.format("No prune info found for the region %s.", parameter));
         }
         return true;
+      } else if ("to-compact-regions".equals(command)) {
+        Integer numRegions = Integer.parseInt(parameter);
+        Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
+        pw.println(GSON.toJson(toBeCompactedRegions));
+        return true;
       } else {
         pw.println(String.format("%s is not a valid command.", command));
         printUsage(pw);
@@ -191,6 +283,7 @@ public class InvalidListPruningDebug {
     try {
       pruningDebug.initialize(hConf);
       boolean success = pruningDebug.execute(args);
+      pruningDebug.destroy();
       if (!success) {
         System.exit(1);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
index e748d90..443c998 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
@@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
@@ -53,6 +57,8 @@ public class InvalidListPruningDebug {
   private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
   private static final Gson GSON = new Gson();
   private DataJanitorState dataJanitorState;
+  private Connection connection;
+  private TableName tableName;
 
   /**
    * Initialize the Invalid List Debug Tool.
@@ -61,20 +67,76 @@ public class InvalidListPruningDebug {
    */
   public void initialize(final Configuration conf) throws IOException {
     LOG.debug("InvalidListPruningDebugMain : initialize method called");
-    final Connection connection = ConnectionFactory.createConnection(conf);
+    connection = ConnectionFactory.createConnection(conf);
+    tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                           TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
     dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
-        return connection.getTable(TableName.valueOf(
-          conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                   TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)));
+        return connection.getTable(tableName);
       }
     });
   }
 
+  public void destroy() throws IOException {
+    if (connection != null) {
+      connection.close();
+    }
+  }
+
+  /**
+   * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions
+   * will stop the progress of pruning.
+   *
+   * @param numRegions number of regions
+   * @return {@link Set} of regions that needs to be compacted and flushed
+   */
+  public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (latestTimeRegion.isEmpty()) {
+      return new HashSet<>();
+    }
+
+    Long timestamp = latestTimeRegion.keySet().iterator().next();
+    SortedSet<String> liveRegions = latestTimeRegion.get(timestamp);
+
+    SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+    SortedSet<String> emptyRegionNames = new TreeSet<>();
+    Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN);
+    for (String regionString : regionStrings) {
+      emptyRegionNames.add(regionString);
+    }
+
+    Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
+
+    // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are
+    // not empty and have not been registered prune upper bound
+    Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1);
+    for (RegionPruneInfo prunedRegion : prunedRegions) {
+      if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) {
+        nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+      }
+    }
+
+    if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) {
+      return nonEmptyRegions;
+    }
+
+    Set<String> subsetRegions = new HashSet<>(numRegions);
+    for (String regionName : nonEmptyRegions) {
+      if (subsetRegions.size() == numRegions) {
+        break;
+      }
+      subsetRegions.add(regionName);
+    }
+    return subsetRegions;
+  }
+
   /**
    * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds.
-   * If -1 is passed in, all the regions and their prune upper bound will be returned.
+   * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
+   * that are known to be live will be returned.
    *
    * @param numRegions number of regions
    * @return Map of region name and its prune upper bound
@@ -85,10 +147,32 @@ public class InvalidListPruningDebug {
       return new LinkedList<>();
     }
 
+    // Create a set with region names
+    Set<String> pruneRegionNameSet = new HashSet<>();
+    for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+      pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
+    }
+
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (!latestTimeRegion.isEmpty()) {
+      SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
+      Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
+      List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
+      for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+        if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
+          liveRegionWithPruneInfoList.add(regionPruneInfo);
+        }
+      }
+
+      // Use the subset of live regions and prune regions
+      regionPruneInfos = liveRegionWithPruneInfoList;
+    }
+
     if (numRegions < 0) {
       numRegions = regionPruneInfos.size();
     }
-    
+
     Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>()
{
       @Override
       public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
@@ -148,6 +232,9 @@ public class InvalidListPruningDebug {
                  "provided as the limit, prune upper bounds of all regions are returned.");
     pw.println("prune-info region-name-as-string");
     pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
+    pw.println("to-compact-regions limit");
+    pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty,
" +
+                 "and have not registered a prune upper bound.");
   }
 
   private boolean execute(String[] args) throws IOException {
@@ -177,6 +264,11 @@ public class InvalidListPruningDebug {
           pw.println(String.format("No prune info found for the region %s.", parameter));
         }
         return true;
+      } else if ("to-compact-regions".equals(command)) {
+        Integer numRegions = Integer.parseInt(parameter);
+        Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
+        pw.println(GSON.toJson(toBeCompactedRegions));
+        return true;
       } else {
         pw.println(String.format("%s is not a valid command.", command));
         printUsage(pw);
@@ -191,6 +283,7 @@ public class InvalidListPruningDebug {
     try {
       pruningDebug.initialize(hConf);
       boolean success = pruningDebug.execute(args);
+      pruningDebug.destroy();
       if (!success) {
         System.exit(1);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
index e748d90..443c998 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
@@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
@@ -53,6 +57,8 @@ public class InvalidListPruningDebug {
   private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
   private static final Gson GSON = new Gson();
   private DataJanitorState dataJanitorState;
+  private Connection connection;
+  private TableName tableName;
 
   /**
    * Initialize the Invalid List Debug Tool.
@@ -61,20 +67,76 @@ public class InvalidListPruningDebug {
    */
   public void initialize(final Configuration conf) throws IOException {
     LOG.debug("InvalidListPruningDebugMain : initialize method called");
-    final Connection connection = ConnectionFactory.createConnection(conf);
+    connection = ConnectionFactory.createConnection(conf);
+    tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                           TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
     dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
-        return connection.getTable(TableName.valueOf(
-          conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                   TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)));
+        return connection.getTable(tableName);
       }
     });
   }
 
+  /**
+   * Closes the connection created in {@code initialize}. Does nothing if no connection has been created.
+   *
+   * @throws IOException if closing the connection fails
+   */
+  public void destroy() throws IOException {
+    if (connection == null) {
+      return;
+    }
+    connection.close();
+  }
+
+  /**
+   * Returns the set of regions that are live, are not empty, and do not have a prune upper bound recorded.
+   * These regions will stop the progress of pruning.
+   *
+   * @param numRegions maximum number of regions to return; a negative value or {@code null} returns all of them
+   * @return {@link Set} of names of the regions that need to be compacted and flushed
+   * @throws IOException if a lookup of the recorded region state fails
+   */
+  public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (latestTimeRegion.isEmpty()) {
+      return new HashSet<>();
+    }
+
+    // Read key and value from the same entry instead of looking the key up a second time
+    Map.Entry<Long, SortedSet<String>> latest = latestTimeRegion.entrySet().iterator().next();
+    Long timestamp = latest.getKey();
+    SortedSet<String> liveRegions = latest.getValue();
+
+    SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+    SortedSet<String> emptyRegionNames = new TreeSet<>();
+    Iterables.addAll(emptyRegionNames, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+    // Copy the difference view into a mutable set, since regions are removed from it below
+    Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
+
+    // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are
+    // not empty and have not registered a prune upper bound. Set.remove is a no-op for absent elements, so no
+    // contains() check is needed.
+    for (RegionPruneInfo prunedRegion : getIdleRegions(-1)) {
+      nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+    }
+
+    // A null limit is treated like a negative one (no limit) rather than failing on auto-unboxing
+    if (numRegions == null || numRegions < 0 || numRegions >= nonEmptyRegions.size()) {
+      return nonEmptyRegions;
+    }
+
+    // nonEmptyRegions is a HashSet, so which regions make it into the subset is unspecified
+    return Sets.newHashSet(Iterables.limit(nonEmptyRegions, numRegions));
+  }
+
   /**
    * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds.
-   * If -1 is passed in, all the regions and their prune upper bound will be returned.
+   * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
+   * that are known to be live will be returned.
    *
    * @param numRegions number of regions
    * @return Map of region name and its prune upper bound
@@ -85,10 +147,32 @@ public class InvalidListPruningDebug {
       return new LinkedList<>();
     }
 
+    // Create a set with region names
+    Set<String> pruneRegionNameSet = new HashSet<>();
+    for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+      pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
+    }
+
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (!latestTimeRegion.isEmpty()) {
+      SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
+      Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
+      List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
+      for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+        if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
+          liveRegionWithPruneInfoList.add(regionPruneInfo);
+        }
+      }
+
+      // Use the subset of live regions and prune regions
+      regionPruneInfos = liveRegionWithPruneInfoList;
+    }
+
     if (numRegions < 0) {
       numRegions = regionPruneInfos.size();
     }
-    
+
     Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>()
{
       @Override
       public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
@@ -148,6 +232,9 @@ public class InvalidListPruningDebug {
                  "provided as the limit, prune upper bounds of all regions are returned.");
     pw.println("prune-info region-name-as-string");
     pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
+    pw.println("to-compact-regions limit");
+    pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty,
" +
+                 "and have not registered a prune upper bound.");
   }
 
   private boolean execute(String[] args) throws IOException {
@@ -177,6 +264,11 @@ public class InvalidListPruningDebug {
           pw.println(String.format("No prune info found for the region %s.", parameter));
         }
         return true;
+      } else if ("to-compact-regions".equals(command)) {
+        Integer numRegions = Integer.parseInt(parameter);
+        Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
+        pw.println(GSON.toJson(toBeCompactedRegions));
+        return true;
       } else {
         pw.println(String.format("%s is not a valid command.", command));
         printUsage(pw);
@@ -191,6 +283,7 @@ public class InvalidListPruningDebug {
     try {
       pruningDebug.initialize(hConf);
       boolean success = pruningDebug.execute(args);
+      pruningDebug.destroy();
       if (!success) {
         System.exit(1);
       }



Mime
View raw message