tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From poo...@apache.org
Subject incubator-tephra git commit: TEPHRA-198 Compute global prune upper bound using compaction state of every region
Date Mon, 05 Dec 2016 05:29:07 GMT
Repository: incubator-tephra
Updated Branches:
  refs/heads/master f5f682af6 -> 0b256202f


TEPHRA-198 Compute global prune upper bound using compaction state of every region

This closes #20

Signed-off-by: poorna <poorna@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/0b256202
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/0b256202
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/0b256202

Branch: refs/heads/master
Commit: 0b256202fc97eaa0e7e462a4b6104cd0fbf82b1f
Parents: f5f682a
Author: poorna <poorna@cask.co>
Authored: Thu Nov 10 15:09:02 2016 -0800
Committer: poorna <poorna@apache.org>
Committed: Sun Dec 4 21:28:42 2016 -0800

----------------------------------------------------------------------
 .../janitor/TransactionPruningPlugin.java       |  91 +++++
 .../java/org/apache/tephra/util/TxUtils.java    |  10 +
 .../coprocessor/janitor/CompactionState.java    |   2 +-
 .../coprocessor/janitor/DataJanitorState.java   | 297 ++++++++++++++-
 .../janitor/HBaseTransactionPruningPlugin.java  | 299 +++++++++++++++
 .../hbase/coprocessor/janitor/TimeRegions.java  |  85 +++++
 .../tephra/hbase/AbstractHBaseTableTest.java    |  15 +-
 .../tephra/hbase/InvalidListPruneTest.java      | 210 -----------
 .../janitor/DataJanitorStateTest.java           | 205 +++++++++++
 .../janitor/InvalidListPruneTest.java           | 364 +++++++++++++++++++
 10 files changed, 1356 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0b256202/tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java b/tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java
new file mode 100644
index 0000000..7ccceec
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.janitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Data janitor interface to manage the invalid transaction list.
+ *
+ * <p/>
+ * An invalid transaction can only be removed from the invalid list after the data written
+ * by the invalid transactions has been removed from all the data stores.
+ * The term data store is used here to represent a set of tables in a database that have
+ * the same data clean up policy, like all Apache Phoenix tables in an HBase instance.
+ *
+ * <p/>
+ * Typically every data store will have a background job which cleans up the data written by invalid transactions.
+ * Prune upper bound for a data store is defined as the largest invalid transaction whose data has been
+ * cleaned up from that data store.
+ * <pre>
+ * prune-upper-bound = min(max(invalid list), min(in-progress list) - 1)
+ * </pre>
+ * where invalid list and in-progress list are from the transaction snapshot used to clean up the invalid data in the
+ * data store.
+ *
+ * <p/>
+ * There will be one such plugin per data store. The plugins will be executed as part of the Transaction Service.
+ * Each plugin will be invoked periodically to fetch the prune upper bound for its data store.
+ * Invalid transaction list can pruned up to the minimum of prune upper bounds returned by all the plugins.
+ */
+public interface TransactionPruningPlugin {
+  /**
+   * Called once when the Transaction Service starts up.
+   *
+   * @param conf configuration for the plugin
+   */
+  void initialize(Configuration conf) throws IOException;
+
+  /**
+   * Called periodically to fetch prune upper bound for a data store. The plugin examines the state of data cleanup
+   * in the data store, and determines an upper bound for invalid transactions such that any invalid transaction
+   * smaller than or equal to this upper bound is guaranteed to have all its writes removed from the data store.
+   * It then returns this upper bound as the prune upper bound for this data store.
+   *
+   * @param time start time of this prune iteration in milliseconds
+   * @param inactiveTransactionBound the largest invalid transaction that can be possibly removed
+   *                                 from the invalid list for the given time. This is an upper bound determined
+   *                                 by the Transaction Service, based on its knowledge of in-progress and invalid
+   *                                 transactions that may still have active processes and therefore future writes.
+   *                                 The plugin will typically return a reduced upper bound based on the state of
+   *                                 the invalid transaction data clean up in the data store.
+   * @return prune upper bound for the data store
+   */
+  long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException;
+
+  /**
+   * Called after successfully pruning the invalid list using the prune upper bound returned by
+   * {@link #fetchPruneUpperBound(long, long)}.
+   * The largest invalid transaction that was removed from the invalid list is passed as a parameter in this call.
+   * The plugin can use this information to clean up its state.
+   *
+   * @param time start time of this prune iteration in milliseconds (same value as passed to
+   *             {@link #fetchPruneUpperBound(long, long)} in the same run)
+   * @param maxPrunedInvalid the largest invalid transaction that was removed from the invalid list
+   */
+  void pruneComplete(long time, long maxPrunedInvalid) throws IOException;
+
+  /**
+   * Called once during the shutdown of the Transaction Service.
+   */
+  void destroy();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0b256202/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
index b3d4ace..3a1a071 100644
--- a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
+++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
@@ -166,4 +166,14 @@ public class TxUtils {
     long firstInProgress = tx.getFirstInProgress();
     return Math.min(maxInvalidTx, firstInProgress - 1);
   }
+
+  /**
+   * Returns the timestamp at which the given transaction id was generated.
+   *
+   * @param txId transaction id
+   * @return timestamp in milliseconds
+   */
+  public static long getTimestamp(long txId) {
+    return txId / TxConstants.MAX_TX_PER_MS;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0b256202/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java
index d02456a..7412a18 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java
@@ -81,7 +81,7 @@ public class CompactionState {
   public void persist() {
     if (pruneUpperBound != -1) {
       try {
-        dataJanitorState.savePruneUpperBound(regionName, pruneUpperBound);
+        dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound);
         LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
       } catch (IOException e) {
         LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0b256202/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
index 9d4f279..7daac94 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
@@ -19,20 +19,48 @@
 
 package org.apache.tephra.hbase.coprocessor.janitor;
 
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
 
 /**
  * Persist data janitor state into an HBase table.
+ * This is used by both {@link TransactionProcessor} and by the {@link HBaseTransactionPruningPlugin}
+ * to persist and read the compaction state.
  */
+@SuppressWarnings("WeakerAccess")
 public class DataJanitorState {
   public static final byte[] FAMILY = {'f'};
-  private static final byte[] PRUNE_UPPER_BOUND_COL = {'u'};
+
+  private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
+  private static final byte[] REGION_TIME_COL = {'r'};
+  private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+
   private static final byte[] REGION_KEY_PREFIX = {0x1};
+  private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
+
+  private static final byte[] REGION_TIME_KEY_PREFIX = {0x2};
+  private static final byte[] REGION_TIME_KEY_PREFIX_STOP = {0x3};
+
+  private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
+  private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
+
+  private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
 
   private final TableSupplier stateTableSupplier;
 
@@ -41,7 +69,23 @@ public class DataJanitorState {
     this.stateTableSupplier = stateTableSupplier;
   }
 
-  public void savePruneUpperBound(byte[] regionId, long pruneUpperBound) throws IOException {
+  // ----------------------------------------------------------------
+  // ------- Methods for prune upper bound for a given region -------
+  // ----------------------------------------------------------------
+  // The data is stored in the following format -
+  // Key: 0x1<region-id>
+  // Col 'u': <prune upper bound>
+  // ----------------------------------------------------------------
+
+  /**
+   * Persist the latest prune upper bound for a given region. This is called by {@link TransactionProcessor}
+   * after major compaction.
+   *
+   * @param regionId region id
+   * @param pruneUpperBound the latest prune upper bound for the region
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void savePruneUpperBoundForRegion(byte[] regionId, long pruneUpperBound) throws IOException {
     try (Table stateTable = stateTableSupplier.get()) {
       Put put = new Put(makeRegionKey(regionId));
       put.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL, Bytes.toBytes(pruneUpperBound));
@@ -49,7 +93,15 @@ public class DataJanitorState {
     }
   }
 
-  public long getPruneUpperBound(byte[] regionId) throws IOException {
+  /**
+   * Get latest prune upper bound for a given region. This indicates the largest invalid transaction that no
+   * longer has writes in this region.
+   *
+   * @param regionId region id
+   * @return latest prune upper bound for the region
+   * @throws IOException when not able to read the data from HBase
+   */
+  public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException {
     try (Table stateTable = stateTableSupplier.get()) {
       Get get = new Get(makeRegionKey(regionId));
       get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
@@ -58,10 +110,249 @@ public class DataJanitorState {
     }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions. This is a batch operation of method
+   * {@link #getPruneUpperBoundForRegion(byte[])}
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
+    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (regions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              resultMap.put(region, pruneUpperBoundRegion);
+            }
+          }
+        }
+      }
+      return resultMap;
+    }
+  }
+
+  /**
+   * Delete prune upper bounds for the regions that are not in the given exclude set, and the
+   * prune upper bound is less than the given value.
+   * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have
+   * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are
+   * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions.
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deletePruneUpperBounds(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+    throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (!excludeRegions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+                stateTable.delete(new Delete(next.getRow()));
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // ---------------------------------------------------
+  // ------- Methods for regions at a given time -------
+  // ---------------------------------------------------
+  // Key: 0x2<time><region-id>
+  // Col 't': <empty byte array>
+  // ---------------------------------------------------
+
+  /**
+   * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of
+   * transactional regions existing in the HBase instance periodically.
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      for (byte[] region : regions) {
+        Put put = new Put(makeTimeRegionKey(timeBytes, region));
+        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+        stateTable.put(put);
+      }
+    }
+  }
+
+  /**
+   * Return the set of regions saved for the time at or before the given time. This method finds the greatest time
+   * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
+   * older than that.
+   *
+   * @param time timestamp in milliseconds
+   * @return set of regions and time at which they were recorded, or null if no regions found
+   * @throws IOException when not able to read the data from HBase
+   */
+  @Nullable
+  public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, REGION_TIME_COL);
+
+      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+      long currentRegionTime = -1;
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+          // Stop if reached next time value
+          if (currentRegionTime == -1) {
+            currentRegionTime = timeRegion.getKey();
+          } else if (timeRegion.getKey() < currentRegionTime) {
+            break;
+          } else if (timeRegion.getKey() > currentRegionTime) {
+            throw new IllegalStateException(
+              String.format("Got out of order time %d when expecting time less than or equal to %d",
+                            timeRegion.getKey(), currentRegionTime));
+          }
+          regions.add(timeRegion.getValue());
+        }
+      }
+      return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
+    }
+  }
+
+  /**
+   * Delete all the regions that were recorded for all times equal or less than the given time.
+   *
+   * @param time timestamp in milliseconds
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, REGION_TIME_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          stateTable.delete(new Delete(next.getRow()));
+        }
+      }
+    }
+  }
+
+  // ---------------------------------------------------------------------
+  // ------- Methods for inactive transaction bound for given time -------
+  // ---------------------------------------------------------------------
+  // Key: 0x3<inverted time>
+  // Col 'p': <inactive transaction bound>
+  // ---------------------------------------------------------------------
+
+  /**
+   * Persist inactive transaction bound for a given time. This is the smallest not in-progress transaction that
+   * will not have writes in any HBase regions that are created after the given time.
+   *
+   * @param time time in milliseconds
+   * @param inactiveTransactionBound inactive transaction bound for the given time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveInactiveTransactionBoundForTime(long time, long inactiveTransactionBound) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Put put = new Put(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
+      put.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL, Bytes.toBytes(inactiveTransactionBound));
+      stateTable.put(put);
+    }
+  }
+
+  /**
+   * Return inactive transaction bound for the given time.
+   *
+   * @param time time in milliseconds
+   * @return inactive transaction bound for the given time
+   * @throws IOException when not able to read the data from HBase
+   */
+  public long getInactiveTransactionBoundForTime(long time) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Get get = new Get(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
+      get.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+      byte[] result = stateTable.get(get).getValue(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+      return result == null ? -1 : Bytes.toLong(result);
+    }
+  }
+
+  /**
+   * Delete all inactive transaction bounds recorded for a time less than the given time
+   *
+   * @param time time in milliseconds
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteInactiveTransactionBoundsOnOrBeforeTime(long time) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
+                           INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          stateTable.delete(new Delete(next.getRow()));
+        }
+      }
+    }
+  }
+
   private byte[] makeRegionKey(byte[] regionId) {
     return Bytes.add(REGION_KEY_PREFIX, regionId);
   }
 
+  private byte[] getRegionFromKey(byte[] regionKey) {
+    int prefixLen = REGION_KEY_PREFIX.length;
+    return Bytes.copy(regionKey, prefixLen, regionKey.length - prefixLen);
+  }
+
+  private byte[] makeTimeRegionKey(byte[] time, byte[] regionId) {
+    return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
+  }
+
+  private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
+    return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
+  }
+
+  private Map.Entry<Long, byte[]> getTimeRegion(byte[] key) {
+    int offset = REGION_TIME_KEY_PREFIX.length;
+    long time = getInvertedTime(Bytes.toLong(key, offset));
+    offset += Bytes.SIZEOF_LONG;
+    byte[] regionName = Bytes.copy(key, offset, key.length - offset);
+    return Maps.immutableEntry(time, regionName);
+  }
+
+  private long getInvertedTime(long time) {
+    return Long.MAX_VALUE - time;
+  }
+
   /**
    * Supplies table for persisting state
    */

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0b256202/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
new file mode 100644
index 0000000..f662e37
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.janitor.TransactionPruningPlugin;
+import org.apache.tephra.util.TxUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link TransactionPruningPlugin} for HBase.
+ *
+ * This plugin determines the prune upper bound for transactional HBase tables that use
+ * coprocessor {@link TransactionProcessor}.
+ *
+ * <h3>State storage:</h3>
+ *
+ * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
+ * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
+ * In addition, the plugin also persists the following information on a run at time <i>t</i>
+ * <ul>
+ *   <li>
+ *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
+ *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
+ *     attached to them.
+ *   </li>
+ *   <li>
+ *     <i>(t, inactive transaction bound)</i>: This is the smallest not in-progress transaction that
+ *     will not have writes in any HBase regions that are created after time <i>t</i>.
+ *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
+ *     and passed on to the plugin.
+ *   </li>
+ * </ul>
+ *
+ * <h3>Computing prune upper bound:</h3>
+ *
+ * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
+ * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
+ * Since the prune upper bound will get recorded for a region only after a major compaction,
+ * using only the latest set of regions we may not be able to find the
+ * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time
+ * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc.
+ * to determine the prune upper bound.
+ *
+ * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
+ * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
+ * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>.
+ * <br/>
+ * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
+ * <ul>
+ *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
+ *   <li>Inactive transaction bound from <i>(t1, inactive transaction bound)</i></li>
+ * </ul>
+ *
+ * <p/>
+ * Above, when we find <i>(t1, set of regions)</i>, there may a region that was created after time <i>t1</i>,
+ * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
+ * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
+ * TransactionProcessor is always the latest prune upper bound for a region.
+ * <br/>
+ * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
+ * inactive transaction bound at the time the region was created.
+ * Since we limit the plugin prune upper bound using <i>(t1, inactive transaction bound)</i>,
+ * there should be no invalid transactions smaller than the plugin prune upper bound with writes in any
+ * transactional region of this HBase instance.
+ *
+ * <p/>
+ * Note: If your tables uses a transactional coprocessor other than TransactionProcessor,
+ * then you may need to write a new plugin to compute prune upper bound for those tables.
+ */
+@SuppressWarnings("WeakerAccess")
+public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
+  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
+
+  protected Configuration conf;
+  protected Connection connection;
+  protected DataJanitorState dataJanitorState;
+
+  @Override
+  public void initialize(Configuration conf) throws IOException {
+    this.conf = conf;
+    this.connection = ConnectionFactory.createConnection(conf);
+
+    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
+                                                            TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
+    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
+    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+      @Override
+      public Table get() throws IOException {
+        return connection.getTable(stateTable);
+      }
+    });
+  }
+
+  /**
+   * Determines prune upper bound for the data store as mentioned above.
+   */
+  @Override
+  public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException {
+    LOG.debug("Fetching prune upper bound for time {} and inactive transaction bound {}",
+              time, inactiveTransactionBound);
+    if (time < 0 || inactiveTransactionBound < 0) {
+      return -1;
+    }
+
+    // Get all the current transactional regions
+    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
+    if (!transactionalRegions.isEmpty()) {
+      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
+      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
+      // Save inactive transaction bound for time as the final step.
+      // We can then use its existence to make sure that the data for a given time is complete or not
+      LOG.debug("Saving inactive transaction bound {} for time {}", inactiveTransactionBound, time);
+      dataJanitorState.saveInactiveTransactionBoundForTime(time, inactiveTransactionBound);
+    }
+
+    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
+  }
+
+  /**
+   * After invalid list has been pruned, this cleans up state information that is no longer required.
+   * This includes -
+   * <ul>
+   *   <li>
+   *     <i>(region, prune upper bound)</i> - prune upper bound for regions that are older
+   *     than maxPrunedInvalid
+   *   </li>
+   *   <li>
+   *     <i>(t, set of regions) - Regions set that were recorded on or before the start time
+   *     of maxPrunedInvalid
+   *   </li>
+   *   <li>
+   *     (t, inactive transaction bound) - Smallest not in-progress transaction without any writes in new regions
+   *     information recorded on or before the start time of maxPrunedInvalid
+   *   </li>
+   * </ul>
+   */
+  @Override
+  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
+    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
+    if (time < 0 || maxPrunedInvalid < 0) {
+      return;
+    }
+
+    // Get regions for the current time, so as to not delete the prune upper bounds for them.
+    // The prune upper bounds for regions are recorded by TransactionProcessor and the deletion
+    // is done by this class. To avoid update/delete race condition, we only delete prune upper
+    // bounds for the stale regions.
+    TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
+    if (regionsToExclude != null) {
+      LOG.debug("Deleting prune upper bounds smaller than {} for stale regions", maxPrunedInvalid);
+      dataJanitorState.deletePruneUpperBounds(maxPrunedInvalid, regionsToExclude.getRegions());
+    } else {
+      LOG.warn("Cannot find saved regions on or before time {}", time);
+    }
+    long pruneTime = TxUtils.getTimestamp(maxPrunedInvalid);
+    LOG.debug("Deleting regions recorded before time {}", pruneTime);
+    dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
+    LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
+    dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+  }
+
+  @Override
+  public void destroy() {
+    LOG.info("Stopping plugin...");
+    try {
+      connection.close();
+    } catch (IOException e) {
+      LOG.error("Got exception while closing HBase connection", e);
+    }
+  }
+
+  protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
+    return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
+  }
+
+  protected SortedSet<byte[]> getTransactionalRegions() throws IOException {
+    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    try (Admin admin = connection.getAdmin()) {
+      HTableDescriptor[] tableDescriptors = admin.listTables();
+      LOG.debug("Got {} tables to process", tableDescriptors == null ? 0 : tableDescriptors.length);
+      if (tableDescriptors != null) {
+        for (HTableDescriptor tableDescriptor : tableDescriptors) {
+          if (isTransactionalTable(tableDescriptor)) {
+            List<HRegionInfo> tableRegions = admin.getTableRegions(tableDescriptor.getTableName());
+            LOG.debug("Regions for table {}: {}", tableDescriptor.getTableName(), tableRegions);
+            if (tableRegions != null) {
+              for (HRegionInfo region : tableRegions) {
+                regions.add(region.getRegionName());
+              }
+            }
+          } else {
+            LOG.debug("{} is not a transactional table", tableDescriptor.getTableName());
+          }
+        }
+      }
+    }
+    return regions;
+  }
+
+  /**
+   * Try to find the latest set of regions in which all regions have been major compacted, and
+   * compute prune upper bound from them. Starting from newest to oldest, this looks into the
+   * region set that has been saved periodically, and joins it with the prune upper bound data
+   * for a region recorded after a major compaction.
+   *
+   * @param timeRegions the latest set of regions
+   * @return prune upper bound
+   * @throws IOException when not able to talk to HBase
+   */
+  private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
+    do {
+      LOG.debug("Computing prune upper bound for {}", timeRegions);
+      SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
+      long time = timeRegions.getTime();
+
+      Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+      logPruneUpperBoundRegions(pruneUpperBoundRegions);
+      // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
+      // across all regions
+      if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+        long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+        LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time);
+        // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+        if (inactiveTransactionBound != -1) {
+          Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+          return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+                        "and hence the data must be incomplete", time);
+          }
+        }
+      } else {
+        if (LOG.isDebugEnabled()) {
+          Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+          LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
+                    time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
+        }
+      }
+
+      timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time - 1);
+    } while (timeRegions != null);
+    return -1;
+  }
+
+  private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got region - prune upper bound map: {}",
+                Iterables.transform(pruneUpperBoundRegions.entrySet(),
+                                    new Function<Map.Entry<byte[], Long>, Map.Entry<String, Long>>() {
+                                      @Override
+                                      public Map.Entry<String, Long> apply(Map.Entry<byte[], Long> input) {
+                                        String regionName = TimeRegions.BYTE_ARR_TO_STRING_FN.apply(input.getKey());
+                                        return Maps.immutableEntry(regionName, input.getValue());
+                                      }
+                                    }));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0b256202/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/TimeRegions.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/TimeRegions.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/TimeRegions.java
new file mode 100644
index 0000000..813f5dd
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/TimeRegions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * Contains information on the set of transactional regions recorded at a given time
+ */
+@SuppressWarnings("WeakerAccess")
+public class TimeRegions {
+  static final Function<byte[], String> BYTE_ARR_TO_STRING_FN =
+    new Function<byte[], String>() {
+      @Override
+      public String apply(byte[] input) {
+        return Bytes.toStringBinary(input);
+      }
+    };
+
+  private final long time;
+  private final SortedSet<byte[]> regions;
+
+  public TimeRegions(long time, SortedSet<byte[]> regions) {
+    this.time = time;
+    this.regions = regions;
+  }
+
+  public long getTime() {
+    return time;
+  }
+
+  public SortedSet<byte[]> getRegions() {
+    return regions;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TimeRegions that = (TimeRegions) o;
+    return time == that.time &&
+      Objects.equals(regions, that.regions);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(time, regions);
+  }
+
+  @Override
+  public String toString() {
+    Iterable<String> regionStrings = Iterables.transform(regions, BYTE_ARR_TO_STRING_FN);
+    return "TimeRegions{" +
+      "time=" + time +
+      ", regions=[" + Joiner.on(" ").join(regionStrings) + "]" +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0b256202/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
index cb8d695..560b0fe 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
@@ -40,9 +40,9 @@ import java.util.List;
  */
 @SuppressWarnings("WeakerAccess")
 public abstract class AbstractHBaseTableTest {
-  static HBaseTestingUtility testUtil;
-  static HBaseAdmin hBaseAdmin;
-  static Configuration conf;
+  protected static HBaseTestingUtility testUtil;
+  protected static HBaseAdmin hBaseAdmin;
+  protected static Configuration conf;
 
   @BeforeClass
   public static void startMiniCluster() throws Exception {
@@ -76,13 +76,13 @@ public abstract class AbstractHBaseTableTest {
     }
   }
 
-  static HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
+  protected static HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
     return createTable(tableName, columnFamilies, false,
                        Collections.singletonList(TransactionProcessor.class.getName()));
   }
 
-  static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
-                             List<String> coprocessors) throws Exception {
+  protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
+                                      List<String> coprocessors) throws Exception {
     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
     for (byte[] family : columnFamilies) {
       HColumnDescriptor columnDesc = new HColumnDescriptor(family);
@@ -94,7 +94,7 @@ public abstract class AbstractHBaseTableTest {
       desc.setValue(TxConstants.READ_NON_TX_DATA, "true");
     }
     // Divide individually to prevent any overflow
-    int priority  = Coprocessor.PRIORITY_USER;
+    int priority = Coprocessor.PRIORITY_USER;
     // order in list is the same order that coprocessors will be invoked
     for (String coprocessor : coprocessors) {
       desc.addCoprocessor(coprocessor, null, ++priority, null);
@@ -103,5 +103,4 @@ public abstract class AbstractHBaseTableTest {
     testUtil.waitTableAvailable(tableName, 5000);
     return new HTable(testUtil.getConfiguration(), tableName);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0b256202/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java
deleted file mode 100644
index ebf58eb..0000000
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tephra.hbase;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedMap;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tephra.TransactionContext;
-import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionType;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.coprocessor.TransactionStateCache;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
-import org.apache.tephra.hbase.coprocessor.janitor.DataJanitorState;
-import org.apache.tephra.inmemory.InMemoryTxSystemClient;
-import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.InMemoryTransactionStateStorage;
-import org.apache.tephra.persist.TransactionSnapshot;
-import org.apache.tephra.persist.TransactionStateStorage;
-import org.apache.tephra.persist.TransactionVisibilityState;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-
-/**
- * Test invalid list pruning
- */
-public class InvalidListPruneTest extends AbstractHBaseTableTest {
-  private static final byte[] family = Bytes.toBytes("f1");
-  private static final byte[] qualifier = Bytes.toBytes("col1");
-
-  private static TableName dataTable;
-  private static TableName pruneStateTable;
-
-  // Override AbstractHBaseTableTest.startMiniCluster to setup configuration
-  @BeforeClass
-  public static void startMiniCluster() throws Exception {
-    // Setup the configuration to start HBase cluster with the invalid list pruning enabled
-    conf = HBaseConfiguration.create();
-    conf.setBoolean(TxConstants.DataJanitor.PRUNE_ENABLE, true);
-    AbstractHBaseTableTest.startMiniCluster();
-
-    TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
-    TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
-    txManager.startAndWait();
-
-    // Do some transactional data operations
-    dataTable = TableName.valueOf("invalidListPruneTestTable");
-    HTable hTable = createTable(dataTable.getName(), new byte[][]{family}, false,
-                                Collections.singletonList(TestTransactionProcessor.class.getName()));
-    try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
-      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
-      txContext.start();
-      for (int i = 0; i < 10; ++i) {
-        txTable.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, Bytes.toBytes(i)));
-      }
-      txContext.finish();
-    }
-
-    testUtil.flush(dataTable);
-    txManager.stopAndWait();
-
-    pruneStateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
-                                                 TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
-  }
-
-  @AfterClass
-  public static void shutdownAfterClass() throws Exception {
-    hBaseAdmin.disableTable(dataTable);
-    hBaseAdmin.deleteTable(dataTable);
-  }
-
-  @Before
-  public void beforeTest() throws Exception {
-    HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
-                               // Prune state table is a non-transactional table, hence no transaction co-processor
-                               Collections.<String>emptyList());
-    table.close();
-  }
-
-  @After
-  public void afterTest() throws Exception {
-    hBaseAdmin.disableTable(pruneStateTable);
-    hBaseAdmin.deleteTable(pruneStateTable);
-  }
-
-  @Test
-  public void testRecordCompactionState() throws Exception {
-    DataJanitorState dataJanitorState =
-      new DataJanitorState(new DataJanitorState.TableSupplier() {
-        @Override
-        public Table get() throws IOException {
-          return testUtil.getConnection().getTable(pruneStateTable);
-        }
-      });
-
-    // No prune upper bound initially
-    Assert.assertEquals(-1, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
-
-    // Create a new transaction snapshot
-    InMemoryTransactionStateCache.setTransactionSnapshot(
-      new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
-                              ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
-    // Run minor compaction
-    testUtil.compact(dataTable, false);
-    // No prune upper bound after minor compaction too
-    Assert.assertEquals(-1, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
-
-    // Run major compaction, and verify prune upper bound
-    testUtil.compact(dataTable, true);
-    Assert.assertEquals(50, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
-
-    // Run major compaction again with same snapshot, prune upper bound should not change
-    testUtil.compact(dataTable, true);
-    Assert.assertEquals(50, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
-
-    // Create a new transaction snapshot
-    InMemoryTransactionStateCache.setTransactionSnapshot(
-      new TransactionSnapshot(110, 111, 112, ImmutableSet.of(150L),
-                              ImmutableSortedMap.of(
-                                105L, new TransactionManager.InProgressTx(100, 30, TransactionType.SHORT)
-                              )
-    ));
-    Assert.assertEquals(50, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
-
-    // Run major compaction again, now prune upper bound should change
-    testUtil.compact(dataTable, true);
-    Assert.assertEquals(104, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
-  }
-
-  private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
-    HRegionLocation regionLocation =
-      testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);
-    return regionLocation.getRegionInfo().getRegionName();
-  }
-
-  /**
-   * A transaction co-processor that uses in-memory {@link TransactionSnapshot} for testing
-   */
-  @SuppressWarnings("WeakerAccess")
-  public static class TestTransactionProcessor extends TransactionProcessor {
-    @Override
-    protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
-      return new Supplier<TransactionStateCache>() {
-        @Override
-        public TransactionStateCache get() {
-          return new InMemoryTransactionStateCache();
-        }
-      };
-    }
-  }
-
-  /**
-   * Used to supply in-memory {@link TransactionSnapshot} to {@link TestTransactionProcessor} for testing
-   */
-  @SuppressWarnings("WeakerAccess")
-  public static class InMemoryTransactionStateCache extends TransactionStateCache {
-    private static TransactionVisibilityState transactionSnapshot;
-
-    public static void setTransactionSnapshot(TransactionVisibilityState transactionSnapshot) {
-      InMemoryTransactionStateCache.transactionSnapshot = transactionSnapshot;
-    }
-
-    @Override
-    protected void startUp() throws Exception {
-      // Nothing to do
-    }
-
-    @Override
-    protected void shutDown() throws Exception {
-      // Nothing to do
-    }
-
-    @Override
-    public TransactionVisibilityState getLatestState() {
-      return transactionSnapshot;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0b256202/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorStateTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorStateTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorStateTest.java
new file mode 100644
index 0000000..f1cff82
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorStateTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.AbstractHBaseTableTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Test methods of {@link DataJanitorState}
+ */
+// TODO: Group all the tests that need HBase mini cluster into a suite, so that we start the mini-cluster only once
+public class DataJanitorStateTest extends AbstractHBaseTableTest {
+
+  private TableName pruneStateTable;
+  private DataJanitorState dataJanitorState;
+
+  @Before
+  public void beforeTest() throws Exception {
+    pruneStateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
+                                                 TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
+    HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+                               // Prune state table is a non-transactional table, hence no transaction co-processor
+                               Collections.<String>emptyList());
+    table.close();
+
+    dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public Table get() throws IOException {
+          return testUtil.getConnection().getTable(pruneStateTable);
+        }
+      });
+
+  }
+
+  @After
+  public void afterTest() throws Exception {
+    hBaseAdmin.disableTable(pruneStateTable);
+    hBaseAdmin.deleteTable(pruneStateTable);
+  }
+
+  @Test
+  public void testSavePruneUpperBound() throws Exception {
+    int max = 20;
+
+    // Nothing should be present in the beginning
+    Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L)));
+
+    // Save some region - prune upper bound values
+    // We should have values for regions 0, 2, 4, 6, ..., max-2 after this
+    for (long i = 0; i < max; i += 2) {
+      dataJanitorState.savePruneUpperBoundForRegion(Bytes.toBytes(i), i);
+    }
+
+    Assert.assertEquals(10L, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L)));
+
+    // Verify all the saved values
+    for (long i = 0; i < max; ++i) {
+      long expected = i % 2 == 0 ? i : -1;
+      Assert.assertEquals(expected, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(i)));
+    }
+    // Regions not present should give -1
+    Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(max + 50L)));
+    Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes((max + 10L) * -1)));
+    Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(3L)));
+
+    SortedSet<byte[]> allRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    Map<byte[], Long> expectedMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (long i = 0; i < max; ++i) {
+      allRegions.add(Bytes.toBytes(i));
+      if (i % 2 == 0) {
+        expectedMap.put(Bytes.toBytes(i), i);
+      }
+    }
+    Assert.assertEquals(max / 2, expectedMap.size());
+    Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions));
+
+    SortedSet<byte[]> regions = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+      .add(Bytes.toBytes((max + 20L) * -1))
+      .add(Bytes.toBytes(6L))
+      .add(Bytes.toBytes(15L))
+      .add(Bytes.toBytes(18L))
+      .add(Bytes.toBytes(max + 33L))
+      .build();
+    expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR)
+      .put(Bytes.toBytes(6L), 6L)
+      .put(Bytes.toBytes(18L), 18L)
+      .build();
+    Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(regions));
+
+    // Delete regions that have prune upper bound before 15 and not in set (4, 8)
+    ImmutableSortedSet<byte[]> excludeRegions =
+      ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR).add(Bytes.toBytes(4L)).add(Bytes.toBytes(8L)).build();
+    dataJanitorState.deletePruneUpperBounds(15, excludeRegions);
+    // Regions 0, 2, 6 and 10 should have been deleted now
+    expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR)
+      .put(Bytes.toBytes(4L), 4L)
+      .put(Bytes.toBytes(8L), 8L)
+      .put(Bytes.toBytes(16L), 16L)
+      .put(Bytes.toBytes(18L), 18L)
+      .build();
+    Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions));
+  }
+
+  @Test
+  public void testSaveRegionTime() throws Exception {
+    int maxTime = 100;
+
+    // Nothing should be present in the beginning
+    Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(maxTime));
+
+    // Save regions for time
+    Map<Long, SortedSet<byte[]>> regionsTime = new TreeMap<>();
+    for (long time = 0; time < maxTime; time += 10) {
+      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+      for (long region = 0; region < 10; region += 2) {
+        regions.add(Bytes.toBytes((time * 10) + region));
+      }
+      regionsTime.put(time, regions);
+      dataJanitorState.saveRegionsForTime(time, regions);
+    }
+
+    // Verify saved regions
+    Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+    Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
+    Assert.assertEquals(new TimeRegions(90, regionsTime.get(90L)),
+                        dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
+    Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
+
+    // Delete regions saved on or before time 30
+    dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
+    // Values on or before time 30 should be deleted
+    Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
+    Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+    // Values after time 30 should still exist
+    Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+  }
+
+  @Test
+  public void testSaveInactiveTransactionBoundTime() throws Exception {
+    int maxTime = 100;
+
+    // Nothing sould be present in the beginning
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
+
+    // Save inactive transaction bounds for various time values
+    for (long time = 0; time < maxTime; time += 10) {
+      dataJanitorState.saveInactiveTransactionBoundForTime(time, time + 2);
+    }
+
+    // Verify written values
+    Assert.assertEquals(2, dataJanitorState.getInactiveTransactionBoundForTime(0));
+    Assert.assertEquals(12, dataJanitorState.getInactiveTransactionBoundForTime(10));
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(15));
+    Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(maxTime + 100));
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime((maxTime + 55) * -1L));
+
+    // Delete values saved on or before time 20
+    dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(20);
+    // Values on or before time 20 should be deleted
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(0));
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(20));
+    // Values after time 20 should still exist
+    Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
+    Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0b256202/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java
new file mode 100644
index 0000000..955d2de
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.hbase.AbstractHBaseTableTest;
+import org.apache.tephra.hbase.TransactionAwareHTable;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.janitor.TransactionPruningPlugin;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Test invalid list pruning
+ */
+public class InvalidListPruneTest extends AbstractHBaseTableTest {
+  private static final byte[] family = Bytes.toBytes("f1");
+  private static final byte[] qualifier = Bytes.toBytes("col1");
+  private static final int MAX_ROWS = 1000;
+
+  private static TableName txDataTable1;
+  private static TableName pruneStateTable;
+
+  // Override AbstractHBaseTableTest.startMiniCluster to setup configuration
+  @BeforeClass
+  public static void startMiniCluster() throws Exception {
+    // Setup the configuration to start HBase cluster with the invalid list pruning enabled
+    conf = HBaseConfiguration.create();
+    conf.setBoolean(TxConstants.DataJanitor.PRUNE_ENABLE, true);
+    AbstractHBaseTableTest.startMiniCluster();
+
+    TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
+    TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
+    txManager.startAndWait();
+
+    // Do some transactional data operations
+    txDataTable1 = TableName.valueOf("invalidListPruneTestTable1");
+    HTable hTable = createTable(txDataTable1.getName(), new byte[][]{family}, false,
+                                Collections.singletonList(TestTransactionProcessor.class.getName()));
+    try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
+      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+      txContext.start();
+      for (int i = 0; i < MAX_ROWS; ++i) {
+        txTable.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, Bytes.toBytes(i)));
+      }
+      txContext.finish();
+    }
+
+    testUtil.flush(txDataTable1);
+    txManager.stopAndWait();
+
+    pruneStateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
+                                                 TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
+  }
+
+  @AfterClass
+  public static void shutdownAfterClass() throws Exception {
+    hBaseAdmin.disableTable(txDataTable1);
+    hBaseAdmin.deleteTable(txDataTable1);
+  }
+
+  @Before
+  public void beforeTest() throws Exception {
+    createPruneStateTable();
+    InMemoryTransactionStateCache.setTransactionSnapshot(null);
+  }
+
+  private void createPruneStateTable() throws Exception {
+    HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+                               // Prune state table is a non-transactional table, hence no transaction co-processor
+                               Collections.<String>emptyList());
+    table.close();
+  }
+
+  @After
+  public void afterTest() throws Exception {
+    deletePruneStateTable();
+  }
+
+  private void deletePruneStateTable() throws Exception {
+    if (hBaseAdmin.tableExists(pruneStateTable)) {
+      hBaseAdmin.disableTable(pruneStateTable);
+      hBaseAdmin.deleteTable(pruneStateTable);
+    }
+  }
+
+  @Test
+  public void testRecordCompactionState() throws Exception {
+    DataJanitorState dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public Table get() throws IOException {
+          return testUtil.getConnection().getTable(pruneStateTable);
+        }
+      });
+
+    // No prune upper bound initially
+    Assert.assertEquals(-1,
+                        dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+    // Create a new transaction snapshot
+    InMemoryTransactionStateCache.setTransactionSnapshot(
+      new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
+                              ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+    // Run minor compaction
+    testUtil.compact(txDataTable1, false);
+    // No prune upper bound after minor compaction too
+    Assert.assertEquals(-1,
+                        dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+    // Run major compaction, and verify prune upper bound
+    testUtil.compact(txDataTable1, true);
+    Assert.assertEquals(50,
+                        dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+    // Run major compaction again with same snapshot, prune upper bound should not change
+    testUtil.compact(txDataTable1, true);
+    Assert.assertEquals(50,
+                        dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+    // Create a new transaction snapshot
+    InMemoryTransactionStateCache.setTransactionSnapshot(
+      new TransactionSnapshot(110, 111, 112, ImmutableSet.of(150L),
+                              ImmutableSortedMap.of(
+                                105L, new TransactionManager.InProgressTx(100, 30, TransactionType.SHORT)
+                              )
+    ));
+    Assert.assertEquals(50,
+                        dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+    // Run major compaction again, now prune upper bound should change
+    testUtil.compact(txDataTable1, true);
+    Assert.assertEquals(104,
+                        dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+  }
+
+  @Test
+  public void testRecordCompactionStateNoTable() throws Exception {
+    // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table
+    // and make sure a major compaction succeeds
+    deletePruneStateTable();
+
+    // Create a new transaction snapshot
+    InMemoryTransactionStateCache.setTransactionSnapshot(
+      new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
+                              ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+    // Run major compaction, and verify it completes
+    long now = System.currentTimeMillis();
+    testUtil.compact(txDataTable1, true);
+    long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
+    Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
+                      lastMajorCompactionTime >= now);
+  }
+
+  @Test
+  public void testRecordCompactionStateNoTxSnapshot() throws Exception {
+    // Test recording state without having a transaction snapshot to make sure we don't disrupt
+    // major compaction in that case
+    InMemoryTransactionStateCache.setTransactionSnapshot(null);
+    // Run major compaction, and verify it completes
+    long now = System.currentTimeMillis();
+    testUtil.compact(txDataTable1, true);
+    long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
+    Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
+                      lastMajorCompactionTime >= now);
+  }
+
+  @Test
+  public void testPruneUpperBound() throws Exception {
+    DataJanitorState dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public Table get() throws IOException {
+          return testUtil.getConnection().getTable(pruneStateTable);
+        }
+      });
+
+    TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+    transactionPruningPlugin.initialize(conf);
+    try {
+      // Run without a transaction snapshot first
+      long now1 = 200;
+      long inactiveTxTimeNow1 = 150 * TxConstants.MAX_TX_PER_MS;
+      long expectedPruneUpperBound1 = -1;
+      // fetch prune upper bound
+      long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+
+      TimeRegions expectedRegions1 =
+        new TimeRegions(now1,
+                        ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+                          .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
+                          .build());
+      // Assert prune state is recorded correctly
+      Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+      Assert.assertEquals(expectedPruneUpperBound1,
+                          dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+      Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+      // Run prune complete
+      transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+      // Assert prune state was cleaned up correctly based on the prune time
+      Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+      Assert.assertEquals(expectedPruneUpperBound1,
+                          dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+      Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+      // Create a new transaction snapshot, and run major compaction on txDataTable1
+      // And run all assertions again
+      long now2 = 300;
+      long inactiveTxTimeNow2 = 250 * TxConstants.MAX_TX_PER_MS;
+      long expectedPruneUpperBound2 = 200 * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+                                ImmutableSet.of(expectedPruneUpperBound2),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      TimeRegions expectedRegions2 =
+        new TimeRegions(now2,
+                        ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+                          .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
+                          .build());
+      testUtil.compact(txDataTable1, true);
+      long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+
+      Assert.assertEquals(expectedRegions2, dataJanitorState.getRegionsOnOrBeforeTime(now2));
+      Assert.assertEquals(expectedPruneUpperBound2,
+                          dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+      Assert.assertEquals(inactiveTxTimeNow2, dataJanitorState.getInactiveTransactionBoundForTime(now2));
+      Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+      Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+      transactionPruningPlugin.pruneComplete(now2, pruneUpperBound2);
+      Assert.assertEquals(expectedRegions2, dataJanitorState.getRegionsOnOrBeforeTime(now2));
+      Assert.assertEquals(expectedPruneUpperBound2,
+                          dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+      Assert.assertEquals(inactiveTxTimeNow2, dataJanitorState.getInactiveTransactionBoundForTime(now2));
+      Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(now1));
+      Assert.assertEquals(expectedPruneUpperBound1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+    } finally {
+      transactionPruningPlugin.destroy();
+    }
+  }
+
+  private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
+    HRegionLocation regionLocation =
+      testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);
+    return regionLocation.getRegionInfo().getRegionName();
+  }
+
+  /**
+   * A transaction co-processor that uses in-memory {@link TransactionSnapshot} for testing
+   */
+  @SuppressWarnings("WeakerAccess")
+  public static class TestTransactionProcessor extends TransactionProcessor {
+    private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
+
+    @Override
+    protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+      return new Supplier<TransactionStateCache>() {
+        @Override
+        public TransactionStateCache get() {
+          return new InMemoryTransactionStateCache();
+        }
+      };
+    }
+
+    @Override
+    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
+                            CompactionRequest request) throws IOException {
+      super.postCompact(e, store, resultFile, request);
+      lastMajorCompactionTime.set(System.currentTimeMillis());
+    }
+  }
+
+  /**
+   * Used to supply in-memory {@link TransactionSnapshot} to {@link TestTransactionProcessor} for testing
+   */
+  @SuppressWarnings("WeakerAccess")
+  public static class InMemoryTransactionStateCache extends TransactionStateCache {
+    private static TransactionVisibilityState transactionSnapshot;
+
+    public static void setTransactionSnapshot(TransactionVisibilityState transactionSnapshot) {
+      InMemoryTransactionStateCache.transactionSnapshot = transactionSnapshot;
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      // Nothing to do
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      // Nothing to do
+    }
+
+    @Override
+    public TransactionVisibilityState getLatestState() {
+      return transactionSnapshot;
+    }
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public static class TestTransactionPruningPlugin extends HBaseTransactionPruningPlugin {
+    @Override
+    protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
+      return tableDescriptor.hasCoprocessor(TestTransactionProcessor.class.getName());
+    }
+  }
+}


Mime
View raw message