tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From poo...@apache.org
Subject [4/4] incubator-tephra git commit: TEPHRA-203 Invalid transaction pruning service
Date Wed, 28 Dec 2016 10:38:29 GMT
TEPHRA-203 Invalid transaction pruning service

Signed-off-by: poorna <poorna@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/79b97198
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/79b97198
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/79b97198

Branch: refs/heads/master
Commit: 79b97198ca92655b26f09d79304b798007a5dc45
Parents: 7c8267c
Author: poorna <poorna@cask.co>
Authored: Tue Dec 6 01:55:55 2016 -0800
Committer: poorna <poorna@apache.org>
Committed: Wed Dec 28 16:08:11 2016 +0530

----------------------------------------------------------------------
 .../java/org/apache/tephra/TxConstants.java     |  26 +-
 .../tephra/distributed/TransactionService.java  |  18 +
 .../janitor/TransactionPruningPlugin.java       |  91 -----
 .../txprune/TransactionPruningPlugin.java       |  88 +++++
 .../txprune/TransactionPruningRunnable.java     | 128 +++++++
 .../txprune/TransactionPruningService.java      | 144 ++++++++
 .../java/org/apache/tephra/util/TxUtils.java    |  12 +
 .../txprune/TransactionPruningServiceTest.java  | 337 +++++++++++++++++
 .../hbase/coprocessor/TransactionProcessor.java |  12 +-
 .../coprocessor/janitor/CompactionState.java    |  92 -----
 .../coprocessor/janitor/DataJanitorState.java   | 362 -------------------
 .../janitor/HBaseTransactionPruningPlugin.java  | 299 ---------------
 .../hbase/coprocessor/janitor/TimeRegions.java  |  85 -----
 .../tephra/hbase/txprune/CompactionState.java   |  94 +++++
 .../tephra/hbase/txprune/DataJanitorState.java  | 362 +++++++++++++++++++
 .../txprune/HBaseTransactionPruningPlugin.java  | 299 +++++++++++++++
 .../tephra/hbase/txprune/TimeRegions.java       |  85 +++++
 .../janitor/DataJanitorStateTest.java           | 205 -----------
 .../janitor/InvalidListPruneTest.java           | 361 ------------------
 .../hbase/txprune/DataJanitorStateTest.java     | 205 +++++++++++
 .../hbase/txprune/InvalidListPruneTest.java     | 361 ++++++++++++++++++
 21 files changed, 2163 insertions(+), 1503 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index bc02936..512e93c 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -354,13 +354,35 @@ public class TxConstants {
   }
 
   /**
-   * Configuration for data janitor
+   * Configuration for invalid transaction pruning
    */
-  public static final class DataJanitor {
+  public static final class TransactionPruning {
+    /**
+     * Flag to enable automatic invalid transaction pruning.
+     */
     public static final String PRUNE_ENABLE = "data.tx.prune.enable";
+    /**
+     * The table used to store intermediate state when pruning is enabled.
+     */
     public static final String PRUNE_STATE_TABLE = "data.tx.prune.state.table";
+    /**
+     * Interval in seconds to schedule prune run.
+     */
+    public static final String PRUNE_INTERVAL = "data.tx.prune.interval";
+    /**
+     * Comma separated list of invalid transaction pruning plugins to load
+     */
+    public static final String PLUGINS = "data.tx.prune.plugins";
+    /**
+     * The configuration key for a plugin's class name is the plugin name with the ".class" suffix appended.
+     */
+    public static final String PLUGIN_CLASS_SUFFIX = ".class";
 
     public static final boolean DEFAULT_PRUNE_ENABLE = false;
     public static final String DEFAULT_PRUNE_STATE_TABLE = "data_tx_janitor_state";
+    public static final long DEFAULT_PRUNE_INTERVAL = TimeUnit.HOURS.toSeconds(6);
+    public static final String DEFAULT_PLUGIN = "data.tx.prune.plugin.default";
+    public static final String DEFAULT_PLUGIN_CLASS =
+      "org.apache.tephra.hbase.txprune.HBaseTransactionPruningPlugin";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
index 4061c4d..d4a0f87 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
@@ -19,6 +19,8 @@
 package org.apache.tephra.distributed;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.inject.Inject;
@@ -28,6 +30,7 @@ import org.apache.tephra.TransactionManager;
 import org.apache.tephra.distributed.thrift.TTransactionServer;
 import org.apache.tephra.inmemory.InMemoryTransactionService;
 import org.apache.tephra.rpc.ThriftRPCServer;
+import org.apache.tephra.txprune.TransactionPruningService;
 import org.apache.twill.api.ElectionHandler;
 import org.apache.twill.discovery.DiscoveryService;
 import org.apache.twill.internal.ServiceListenerAdapter;
@@ -50,9 +53,11 @@ import javax.annotation.Nullable;
 public final class TransactionService extends InMemoryTransactionService {
   private static final Logger LOG = LoggerFactory.getLogger(TransactionService.class);
   private LeaderElection leaderElection;
+  private final Configuration conf;
   private final ZKClient zkClient;
 
   private ThriftRPCServer<TransactionServiceThriftHandler, TTransactionServer> server;
+  private TransactionPruningService pruningService;
 
   @Inject
   public TransactionService(Configuration conf,
@@ -60,6 +65,7 @@ public final class TransactionService extends InMemoryTransactionService {
                             DiscoveryService discoveryService,
                             Provider<TransactionManager> txManagerProvider) {
     super(conf, discoveryService, txManagerProvider);
+    this.conf = conf;
     this.zkClient = zkClient;
   }
 
@@ -91,6 +97,8 @@ public final class TransactionService extends InMemoryTransactionService {
           }
         }, MoreExecutors.sameThreadExecutor());
 
+        pruningService = new TransactionPruningService(conf, txManager);
+
         server = ThriftRPCServer.builder(TTransactionServer.class)
           .setHost(address)
           .setPort(port)
@@ -100,6 +108,7 @@ public final class TransactionService extends InMemoryTransactionService {
           .build(new TransactionServiceThriftHandler(txManager));
         try {
           server.startAndWait();
+          pruningService.startAndWait();
           doRegister();
           LOG.info("Transaction Thrift Service started successfully on " + getAddress());
         } catch (Throwable t) {
@@ -111,12 +120,21 @@ public final class TransactionService extends InMemoryTransactionService {
 
       @Override
       public void follower() {
+        ListenableFuture<State> stopFuture = null;
         // First stop the transaction server as un-registering from discovery can block sometimes.
         // That can lead to multiple transaction servers being active at the same time.
         if (server != null && server.isRunning()) {
           server.stopAndWait();
         }
+        if (pruningService != null && pruningService.isRunning()) {
+          // Wait for pruning service to stop after un-registering from discovery
+          stopFuture = pruningService.stop();
+        }
         undoRegister();
+
+        if (stopFuture != null) {
+          Futures.getUnchecked(stopFuture);
+        }
       }
     });
     leaderElection.start();

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java b/tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java
deleted file mode 100644
index 7ccceec..0000000
--- a/tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tephra.janitor;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-
-/**
- * Data janitor interface to manage the invalid transaction list.
- *
- * <p/>
- * An invalid transaction can only be removed from the invalid list after the data written
- * by the invalid transactions has been removed from all the data stores.
- * The term data store is used here to represent a set of tables in a database that have
- * the same data clean up policy, like all Apache Phoenix tables in an HBase instance.
- *
- * <p/>
- * Typically every data store will have a background job which cleans up the data written by invalid transactions.
- * Prune upper bound for a data store is defined as the largest invalid transaction whose data has been
- * cleaned up from that data store.
- * <pre>
- * prune-upper-bound = min(max(invalid list), min(in-progress list) - 1)
- * </pre>
- * where invalid list and in-progress list are from the transaction snapshot used to clean up the invalid data in the
- * data store.
- *
- * <p/>
- * There will be one such plugin per data store. The plugins will be executed as part of the Transaction Service.
- * Each plugin will be invoked periodically to fetch the prune upper bound for its data store.
- * Invalid transaction list can pruned up to the minimum of prune upper bounds returned by all the plugins.
- */
-public interface TransactionPruningPlugin {
-  /**
-   * Called once when the Transaction Service starts up.
-   *
-   * @param conf configuration for the plugin
-   */
-  void initialize(Configuration conf) throws IOException;
-
-  /**
-   * Called periodically to fetch prune upper bound for a data store. The plugin examines the state of data cleanup
-   * in the data store, and determines an upper bound for invalid transactions such that any invalid transaction
-   * smaller than or equal to this upper bound is guaranteed to have all its writes removed from the data store.
-   * It then returns this upper bound as the prune upper bound for this data store.
-   *
-   * @param time start time of this prune iteration in milliseconds
-   * @param inactiveTransactionBound the largest invalid transaction that can be possibly removed
-   *                                 from the invalid list for the given time. This is an upper bound determined
-   *                                 by the Transaction Service, based on its knowledge of in-progress and invalid
-   *                                 transactions that may still have active processes and therefore future writes.
-   *                                 The plugin will typically return a reduced upper bound based on the state of
-   *                                 the invalid transaction data clean up in the data store.
-   * @return prune upper bound for the data store
-   */
-  long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException;
-
-  /**
-   * Called after successfully pruning the invalid list using the prune upper bound returned by
-   * {@link #fetchPruneUpperBound(long, long)}.
-   * The largest invalid transaction that was removed from the invalid list is passed as a parameter in this call.
-   * The plugin can use this information to clean up its state.
-   *
-   * @param time start time of this prune iteration in milliseconds (same value as passed to
-   *             {@link #fetchPruneUpperBound(long, long)} in the same run)
-   * @param maxPrunedInvalid the largest invalid transaction that was removed from the invalid list
-   */
-  void pruneComplete(long time, long maxPrunedInvalid) throws IOException;
-
-  /**
-   * Called once during the shutdown of the Transaction Service.
-   */
-  void destroy();
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningPlugin.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningPlugin.java
new file mode 100644
index 0000000..2261331
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningPlugin.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.txprune;
+
+import com.google.common.annotations.Beta;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Interface to manage the invalid transaction list.
+ *
+ * <p/>
+ * An invalid transaction can only be removed from the invalid list after the data written
+ * by the invalid transactions has been removed from all the data stores.
+ * The term data store is used here to represent a set of tables in a database that have
+ * the same data clean up policy, like all Apache Phoenix tables in an HBase instance.
+ *
+ * <p/>
+ * Typically every data store will have a background job which cleans up the data written by invalid transactions.
+ * Prune upper bound for a data store is defined as the largest invalid transaction whose data has been
+ * cleaned up from that data store.
+ *
+ * <p/>
+ * There will be one such plugin per data store. The plugins will be executed as part of the Transaction Service.
+ * Each plugin will be invoked periodically to fetch the prune upper bound for its data store.
+ * Invalid transaction list can be pruned up to the minimum of prune upper bounds returned by all the plugins.
+ */
+@Beta
+public interface TransactionPruningPlugin {
+  /**
+   * Called once when the Transaction Service starts up.
+   *
+   * @param conf configuration for the plugin
+   */
+  void initialize(Configuration conf) throws IOException;
+
+  /**
+   * Called periodically to fetch prune upper bound for a data store. The plugin examines the state of data cleanup
+   * in the data store, and determines an upper bound for invalid transactions such that any invalid transaction
+   * smaller than or equal to this upper bound is guaranteed to have all its writes removed from the data store.
+   * It then returns this upper bound as the prune upper bound for this data store.
+   *
+   * @param time start time of this prune iteration in milliseconds
+   * @param inactiveTransactionBound the largest invalid transaction that can be possibly removed
+   *                                 from the invalid list for the given time. This is an upper bound determined
+   *                                 by the Transaction Service, based on its knowledge of in-progress and invalid
+   *                                 transactions that may still have active processes and therefore future writes.
+   *                                 The plugin will typically return a reduced upper bound based on the state of
+   *                                 the invalid transaction data clean up in the data store.
+   * @return prune upper bound for the data store
+   */
+  long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException;
+
+  /**
+   * Called after successfully pruning the invalid list using the prune upper bound returned by
+   * {@link #fetchPruneUpperBound(long, long)}.
+   * The largest invalid transaction that was removed from the invalid list is passed as a parameter in this call.
+   * The plugin can use this information to clean up its state.
+   *
+   * @param time start time of this prune iteration in milliseconds (same value as passed to
+   *             {@link #fetchPruneUpperBound(long, long)} in the same run)
+   * @param maxPrunedInvalid the largest invalid transaction that was removed from the invalid list
+   */
+  void pruneComplete(long time, long maxPrunedInvalid) throws IOException;
+
+  /**
+   * Called once during the shutdown of the Transaction Service.
+   */
+  void destroy();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
new file mode 100644
index 0000000..8ea5a11
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.txprune;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.util.TxUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+/**
+ * This class executes one run of transaction pruning every time it is invoked.
+ * Typically, this class will be scheduled to run periodically.
+ */
+public class TransactionPruningRunnable implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionPruningRunnable.class);
+
+  private final TransactionManager txManager;
+  private final Map<String, TransactionPruningPlugin> plugins;
+  private final long txMaxLifetimeMillis;
+
+  public TransactionPruningRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins,
+                                    long txMaxLifetimeMillis) {
+    this.txManager = txManager;
+    this.plugins = plugins;
+    this.txMaxLifetimeMillis = txMaxLifetimeMillis;
+  }
+
+  @Override
+  public void run() {
+    try {
+      // TODO: TEPHRA-159 Start a read only transaction here
+      Transaction tx = txManager.startShort();
+      txManager.abort(tx);
+
+      long now = getTime();
+      long inactiveTransactionBound = TxUtils.getInactiveTxBound(now, txMaxLifetimeMillis);
+      LOG.info("Starting invalid prune run for time {} and inactive transaction bound {}",
+               now, inactiveTransactionBound);
+
+      List<Long> pruneUpperBounds = new ArrayList<>();
+      for (Map.Entry<String, TransactionPruningPlugin> entry : plugins.entrySet()) {
+        String name = entry.getKey();
+        TransactionPruningPlugin plugin = entry.getValue();
+        try {
+          LOG.debug("Fetching prune upper bound using plugin {}", name);
+          long pruneUpperBound = plugin.fetchPruneUpperBound(now, inactiveTransactionBound);
+          LOG.debug("Got prune upper bound {} from plugin {}", pruneUpperBound, name);
+          pruneUpperBounds.add(pruneUpperBound);
+        } catch (Exception e) {
+          LOG.error("Aborting invalid prune run for time {} due to exception from plugin {}", now, name, e);
+          return;
+        }
+      }
+
+      long minPruneUpperBound = Collections.min(pruneUpperBounds);
+      LOG.info("Got minimum prune upper bound {} across all plugins", minPruneUpperBound);
+      if (minPruneUpperBound <= 0) {
+        LOG.info("Not pruning invalid list since minimum prune upper bound ({}) is less than 1", minPruneUpperBound);
+        return;
+      }
+
+      long[] invalids = tx.getInvalids();
+      TreeSet<Long> toTruncate = new TreeSet<>();
+      LOG.debug("Invalid list: {}", invalids);
+      for (long invalid : invalids) {
+        if (invalid <= minPruneUpperBound) {
+          toTruncate.add(invalid);
+        }
+      }
+      if (toTruncate.isEmpty()) {
+        LOG.info("Not pruning invalid list since no invalid id is less than or equal to the minimum prune upper bound");
+        return;
+      }
+
+      LOG.debug("Removing the following invalid ids from the invalid list: {}", toTruncate);
+      txManager.truncateInvalidTx(toTruncate);
+      LOG.info("Removed {} invalid ids from the invalid list", toTruncate.size());
+
+      // Call prune complete on all plugins
+      Long maxPrunedInvalid = toTruncate.last();
+      for (Map.Entry<String, TransactionPruningPlugin> entry : plugins.entrySet()) {
+        String name = entry.getKey();
+        TransactionPruningPlugin plugin = entry.getValue();
+        try {
+          LOG.debug("Calling prune complete on plugin {}", name);
+          plugin.pruneComplete(now, maxPrunedInvalid);
+        } catch (Exception e) {
+          // Ignore any exceptions and continue with other plugins
+          LOG.error("Got error while calling prune complete on plugin {}", name, e);
+        }
+      }
+
+      LOG.info("Invalid prune run for time {} is complete", now);
+    } catch (Exception e) {
+      LOG.error("Got exception during invalid list prune run", e);
+    }
+  }
+
+  @VisibleForTesting
+  long getTime() {
+    return System.currentTimeMillis();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
new file mode 100644
index 0000000..52d7279
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.txprune;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.apache.twill.internal.utils.Instances;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service to prune the invalid list periodically.
+ */
+public class TransactionPruningService extends AbstractIdleService {
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionPruningService.class);
+
+  private final Configuration conf;
+  private final TransactionManager txManager;
+  private final long scheduleInterval;
+  private final boolean pruneEnabled;
+  private ScheduledExecutorService scheduledExecutorService;
+
+  public TransactionPruningService(Configuration conf, TransactionManager txManager) {
+    this.conf = conf;
+    this.txManager = txManager;
+    this.pruneEnabled = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+                                        TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+    this.scheduleInterval = conf.getLong(TxConstants.TransactionPruning.PRUNE_INTERVAL,
+                                         TxConstants.TransactionPruning.DEFAULT_PRUNE_INTERVAL);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    if (!pruneEnabled) {
+      LOG.info("Transaction pruning is not enabled");
+      return;
+    }
+
+    LOG.info("Starting {}...", this.getClass().getSimpleName());
+    scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+
+    Map<String, TransactionPruningPlugin> plugins = initializePlugins();
+    long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                                     TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+    scheduledExecutorService.scheduleAtFixedRate(
+      getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis),
+      scheduleInterval, scheduleInterval, TimeUnit.SECONDS);
+    LOG.info("Scheduled {} plugins with interval {} seconds", plugins.size(), scheduleInterval);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    if (!pruneEnabled) {
+      return;
+    }
+
+    LOG.info("Stopping {}...", this.getClass().getSimpleName());
+    try {
+      scheduledExecutorService.shutdown();
+      scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      scheduledExecutorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+    LOG.info("Stopped {}", this.getClass().getSimpleName());
+  }
+
+  @VisibleForTesting
+  TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager,
+                                                Map<String, TransactionPruningPlugin> plugins,
+                                                long txMaxLifetimeMillis) {
+    return new TransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis);
+  }
+
+  private Map<String, TransactionPruningPlugin> initializePlugins()
+    throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException,
+    InstantiationException, IOException {
+    Map<String, TransactionPruningPlugin> initializedPlugins = new HashMap<>();
+
+    // Read set of plugin names from configuration
+    Set<String> plugins =
+      new HashSet<>(Arrays.asList(conf.getTrimmedStrings(TxConstants.TransactionPruning.PLUGINS,
+                                                         TxConstants.TransactionPruning.DEFAULT_PLUGIN)));
+
+    LOG.info("Initializing invalid list prune plugins {}", plugins);
+    for (String plugin : plugins) {
+      // Load the class for the plugin
+      // TODO: TEPHRA-205 classloader isolation for plugins
+      Class<? extends TransactionPruningPlugin> clazz = null;
+      if (TxConstants.TransactionPruning.DEFAULT_PLUGIN.equals(plugin)) {
+        Class<?> defaultClass = Class.forName(TxConstants.TransactionPruning.DEFAULT_PLUGIN_CLASS);
+        if (TransactionPruningPlugin.class.isAssignableFrom(defaultClass)) {
+          //noinspection unchecked
+          clazz = (Class<? extends TransactionPruningPlugin>) defaultClass;
+        }
+      } else {
+        clazz = conf.getClass(plugin + TxConstants.TransactionPruning.PLUGIN_CLASS_SUFFIX,
+                              null, TransactionPruningPlugin.class);
+      }
+      if (clazz == null) {
+        throw new IllegalStateException("No class specified in configuration for invalid pruning plugin " + plugin);
+      }
+      LOG.debug("Got class {} for plugin {}", clazz.getName(), plugin);
+
+      TransactionPruningPlugin instance = Instances.newInstance(clazz);
+      instance.initialize(conf);
+      LOG.debug("Plugin {} initialized", plugin);
+      initializedPlugins.put(plugin, instance);
+    }
+
+    return initializedPlugins;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
index 3a1a071..aaca23d 100644
--- a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
+++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
@@ -168,6 +168,18 @@ public class TxUtils {
   }
 
  /**
   * Returns the greatest transaction that has passed the maximum duration a transaction can be used for data writes.
   * In other words, at <code>timeMills</code> there can be no writes from transactions equal to or smaller
   * than the returned bound.
   *
   * @param timeMills time in milliseconds for which the inactive transaction bound needs to be determined
   * @param txMaxLifetimeMillis maximum duration a transaction can be used for data writes, in milliseconds
   * @return the largest transaction id that can no longer be used for data writes at <code>timeMills</code>
   */
  public static long getInactiveTxBound(long timeMills, long txMaxLifetimeMillis) {
    // Transaction ids scale with wall-clock milliseconds by a factor of MAX_TX_PER_MS, so the first id that
    // could be issued at time (timeMills - txMaxLifetimeMillis) is that time multiplied by MAX_TX_PER_MS;
    // subtracting one yields the largest id issued strictly before it, i.e. the inactive bound.
    return (timeMills - txMaxLifetimeMillis) * TxConstants.MAX_TX_PER_MS - 1;
  }
+
+  /**
    * Returns the timestamp at which the given transaction id was generated.
    *
    * @param txId transaction id

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
new file mode 100644
index 0000000..9c23ab7
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.txprune;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test {@link TransactionPruningService}.
+ */
public class TransactionPruningServiceTest {

  /**
   * Clears the capture lists on the mocks before each test. The mocks accumulate results in
   * static fields, so without this reset state would leak between test methods.
   */
  @Before
  public void resetData() {
    MockTxManager.getPrunedInvalidsList().clear();

    MockPlugin1.getInactiveTransactionBoundList().clear();
    MockPlugin1.getMaxPrunedInvalidList().clear();

    MockPlugin2.getInactiveTransactionBoundList().clear();
    MockPlugin2.getMaxPrunedInvalidList().clear();
  }

  @Test
  public void testTransactionPruningService() throws Exception {
    // Setup plugins
    Configuration conf = new Configuration();
    conf.set(TxConstants.TransactionPruning.PLUGINS,
             "data.tx.txprune.plugin.mockPlugin1, data.tx.txprune.plugin.mockPlugin2");
    conf.set("data.tx.txprune.plugin.mockPlugin1.class",
             "org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin1");
    conf.set("data.tx.txprune.plugin.mockPlugin2.class",
             "org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin2");
    // Setup schedule to run every second
    conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
    conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1);
    conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10);

    // Setup mock data
    // m converts seconds to milliseconds; n converts seconds to transaction ids
    // (transaction ids scale with time by TxConstants.MAX_TX_PER_MS)
    long m = 1000;
    long n = m * TxConstants.MAX_TX_PER_MS;
    // Current time to be returned: the first pruning run sees 120s, the second 220s (cycled thereafter)
    Iterator<Long> currentTime = Iterators.cycle(120L * m, 220L * m);
    // Transaction objects to be returned by mock tx manager
    Iterator<Transaction> txns =
      Iterators.cycle(new Transaction(100 * n, 110 * n, new long[]{40 * n, 50 * n, 60 * n, 70 * n},
                                      new long[]{80 * n, 90 * n}, 80 * n),
                      new Transaction(200 * n, 210 * n, new long[]{60 * n, 75 * n, 78 * n, 100 * n, 110 * n, 120 * n},
                                      new long[]{80 * n, 90 * n}, 80 * n));
    // Prune upper bounds to be returned by the mock plugins
    Iterator<Long> pruneUpperBoundsPlugin1 = Iterators.cycle(60L * n, 80L * n);
    Iterator<Long> pruneUpperBoundsPlugin2 = Iterators.cycle(70L * n, 77L * n);

    TestTransactionPruningRunnable.setCurrentTime(currentTime);
    MockTxManager.setTxIter(txns);
    MockPlugin1.setPruneUpperBoundIter(pruneUpperBoundsPlugin1);
    MockPlugin2.setPruneUpperBoundIter(pruneUpperBoundsPlugin2);

    MockTxManager mockTxManager = new MockTxManager(conf);
    TransactionPruningService pruningService = new TestTransactionPruningService(conf, mockTxManager);
    pruningService.startAndWait();
    // This will cause the pruning run to happen three times,
    // but we are interested in only first two runs for the assertions later
    TimeUnit.SECONDS.sleep(3);
    pruningService.stopAndWait();

    // Assert inactive transaction bound that the plugins receive.
    // Both the plugins should get the same inactive transaction bound since it is
    // computed and passed by the transaction service.
    // Expected value = (current time - tx max lifetime) scaled to a transaction id, minus one:
    // (120s - 10s) -> 110 * n - 1 for the first run, (220s - 10s) -> 210 * n - 1 for the second.
    Assert.assertEquals(ImmutableList.of(110L * n - 1, 210L * n - 1),
                        limitTwo(MockPlugin1.getInactiveTransactionBoundList()));
    Assert.assertEquals(ImmutableList.of(110L * n - 1, 210L * n - 1),
                        limitTwo(MockPlugin2.getInactiveTransactionBoundList()));

    // Assert invalid list entries that got pruned.
    // The min prune upper bound for the first run should be 60, and for the second run 77:
    // only invalid ids at or below that bound are expected to be truncated.
    Assert.assertEquals(ImmutableList.of(ImmutableSet.of(40L * n, 50L * n, 60L * n), ImmutableSet.of(60L * n, 75L * n)),
                        limitTwo(MockTxManager.getPrunedInvalidsList()));

    // Assert max invalid tx pruned that the plugins receive for the prune complete call
    // Both the plugins should get the same max invalid tx pruned value since it is
    // computed and passed by the transaction service
    Assert.assertEquals(ImmutableList.of(60L * n, 75L * n), limitTwo(MockPlugin1.getMaxPrunedInvalidList()));
    Assert.assertEquals(ImmutableList.of(60L * n, 75L * n), limitTwo(MockPlugin2.getMaxPrunedInvalidList()));
  }

  @Test
  public void testNoPruning() throws Exception {
    // Setup plugins
    Configuration conf = new Configuration();
    conf.set(TxConstants.TransactionPruning.PLUGINS,
             "data.tx.txprune.plugin.mockPlugin1, data.tx.txprune.plugin.mockPlugin2");
    conf.set("data.tx.txprune.plugin.mockPlugin1.class",
             "org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin1");
    conf.set("data.tx.txprune.plugin.mockPlugin2.class",
             "org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin2");
    // Setup schedule to run every second
    conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
    conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1);
    conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10);

    // Setup mock data
    long m = 1000;
    long n = m * TxConstants.MAX_TX_PER_MS;
    // Current time to be returned
    Iterator<Long> currentTime = Iterators.cycle(120L * m, 220L * m);
    // Transaction objects to be returned by mock tx manager
    Iterator<Transaction> txns =
      Iterators.cycle(new Transaction(100 * n, 110 * n, new long[]{40 * n, 50 * n, 60 * n, 70 * n},
                                      new long[]{80 * n, 90 * n}, 80 * n),
                      new Transaction(200 * n, 210 * n, new long[]{60 * n, 75 * n, 78 * n, 100 * n, 110 * n, 120 * n},
                                      new long[]{80 * n, 90 * n}, 80 * n));
    // Prune upper bounds to be returned by the mock plugins.
    // First run: the min bound (35 * n) is below the smallest invalid id (40 * n), so nothing qualifies.
    // Second run: plugin1 returns -1, presumably signalling no prune upper bound is available
    // (verify against TransactionPruningRunnable), so no pruning happens either.
    Iterator<Long> pruneUpperBoundsPlugin1 = Iterators.cycle(35L * n, -1L);
    Iterator<Long> pruneUpperBoundsPlugin2 = Iterators.cycle(70L * n, 100L * n);

    TestTransactionPruningRunnable.setCurrentTime(currentTime);
    MockTxManager.setTxIter(txns);
    MockPlugin1.setPruneUpperBoundIter(pruneUpperBoundsPlugin1);
    MockPlugin2.setPruneUpperBoundIter(pruneUpperBoundsPlugin2);

    MockTxManager mockTxManager = new MockTxManager(conf);
    TransactionPruningService pruningService = new TestTransactionPruningService(conf, mockTxManager);
    pruningService.startAndWait();
    // This will cause the pruning run to happen three times,
    // but we are interested in only first two runs for the assertions later
    TimeUnit.SECONDS.sleep(3);
    pruningService.stopAndWait();

    // Assert inactive transaction bound
    Assert.assertEquals(ImmutableList.of(110L * n - 1, 210L * n - 1),
                        limitTwo(MockPlugin1.getInactiveTransactionBoundList()));
    Assert.assertEquals(ImmutableList.of(110L * n - 1, 210L * n - 1),
                        limitTwo(MockPlugin2.getInactiveTransactionBoundList()));

    // Invalid entries should not be pruned in any run (whole list checked, not just the first two runs)
    Assert.assertEquals(ImmutableList.of(), MockTxManager.getPrunedInvalidsList());

    // No max invalid tx pruned for any run
    Assert.assertEquals(ImmutableList.of(), limitTwo(MockPlugin1.getMaxPrunedInvalidList()));
    Assert.assertEquals(ImmutableList.of(), limitTwo(MockPlugin2.getMaxPrunedInvalidList()));
  }

  /**
   * Mock transaction manager for testing
   */
  private static class MockTxManager extends TransactionManager {
    // Scripted transactions handed out by startShort(); static so tests can stub it before construction.
    private static Iterator<Transaction> txIter;
    // Captures each set of invalid tx ids the pruning service asks to truncate.
    private static List<Set<Long>> prunedInvalidsList = new ArrayList<>();

    MockTxManager(Configuration config) {
      super(config);
    }

    // Returns the next scripted transaction instead of starting a real one.
    @Override
    public Transaction startShort() {
      return txIter.next();
    }

    @Override
    public void abort(Transaction tx) {
      // do nothing
    }

    // Records the truncation request for later assertions instead of mutating any real invalid list.
    @Override
    public boolean truncateInvalidTx(Set<Long> invalidTxIds) {
      prunedInvalidsList.add(invalidTxIds);
      return true;
    }

    static void setTxIter(Iterator<Transaction> txIter) {
      MockTxManager.txIter = txIter;
    }

    static List<Set<Long>> getPrunedInvalidsList() {
      return prunedInvalidsList;
    }
  }

  /**
   * Extends {@link TransactionPruningService} to use mock time to help in testing.
   */
  private static class TestTransactionPruningService extends TransactionPruningService {
    TestTransactionPruningService(Configuration conf, TransactionManager txManager) {
      super(conf, txManager);
    }

    // Substitutes the runnable with one whose clock is scripted by the test.
    @Override
    TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager,
                                                  Map<String, TransactionPruningPlugin> plugins,
                                                  long txMaxLifetimeMillis) {
      return new TestTransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis);
    }
  }

  /**
   * Extends {@link TransactionPruningRunnable} to use mock time to help in testing.
   */
  private static class TestTransactionPruningRunnable extends TransactionPruningRunnable {
    // Scripted clock values; each pruning run consumes the next one.
    private static Iterator<Long> currentTime;
    TestTransactionPruningRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins,
                                   long txMaxLifetimeMillis) {
      super(txManager, plugins, txMaxLifetimeMillis);
    }

    @Override
    long getTime() {
      return currentTime.next();
    }

    static void setCurrentTime(Iterator<Long> currentTime) {
      TestTransactionPruningRunnable.currentTime = currentTime;
    }
  }

  /**
   * Mock transaction pruning plugin for testing.
   */
  private static class MockPlugin1 implements TransactionPruningPlugin {
    // Scripted prune upper bounds returned by fetchPruneUpperBound.
    private static Iterator<Long> pruneUpperBoundIter;
    // Captures the inactive transaction bound passed in on each fetch call.
    private static List<Long> inactiveTransactionBoundList = new ArrayList<>();
    // Captures the max pruned invalid id passed in on each pruneComplete call.
    private static List<Long> maxPrunedInvalidList = new ArrayList<>();

    @Override
    public void initialize(Configuration conf) throws IOException {
      // Nothing to do
    }

    @Override
    public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException {
      inactiveTransactionBoundList.add(inactiveTransactionBound);
      return pruneUpperBoundIter.next();
    }

    @Override
    public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
      maxPrunedInvalidList.add(maxPrunedInvalid);
    }

    @Override
    public void destroy() {
      // Nothing to do
    }

    static void setPruneUpperBoundIter(Iterator<Long> pruneUpperBoundIter) {
      MockPlugin1.pruneUpperBoundIter = pruneUpperBoundIter;
    }

    static List<Long> getInactiveTransactionBoundList() {
      return inactiveTransactionBoundList;
    }

    static List<Long> getMaxPrunedInvalidList() {
      return maxPrunedInvalidList;
    }
  }

  /**
   * Mock transaction pruning plugin for testing.
   * Duplicated (rather than shared with MockPlugin1) so each plugin name configured via
   * "data.tx.txprune.plugin.*.class" maps to a distinct class with its own static capture state.
   */
  private static class MockPlugin2 implements TransactionPruningPlugin {
    private static Iterator<Long> pruneUpperBoundIter;
    private static List<Long> inactiveTransactionBoundList = new ArrayList<>();
    private static List<Long> maxPrunedInvalidList = new ArrayList<>();

    @Override
    public void initialize(Configuration conf) throws IOException {
      // Nothing to do
    }

    @Override
    public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException {
      inactiveTransactionBoundList.add(inactiveTransactionBound);
      return pruneUpperBoundIter.next();
    }

    @Override
    public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
      maxPrunedInvalidList.add(maxPrunedInvalid);
    }

    @Override
    public void destroy() {
      // Nothing to do
    }

    static void setPruneUpperBoundIter(Iterator<Long> pruneUpperBoundIter) {
      MockPlugin2.pruneUpperBoundIter = pruneUpperBoundIter;
    }

    static List<Long> getInactiveTransactionBoundList() {
      return inactiveTransactionBoundList;
    }

    static List<Long> getMaxPrunedInvalidList() {
      return maxPrunedInvalidList;
    }
  }

  // Truncates the captured results to the first two pruning runs; the service may run a third
  // time during the 3-second sleep, and that run's results are not part of the assertions.
  private static <T> List<T> limitTwo(Iterable<T> iterable) {
    return ImmutableList.copyOf(Iterables.limit(iterable, 2));
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 132c157..e495692 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -57,7 +57,7 @@ import org.apache.tephra.TransactionCodec;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
-import org.apache.tephra.hbase.coprocessor.janitor.CompactionState;
+import org.apache.tephra.hbase.txprune.CompactionState;
 import org.apache.tephra.persist.TransactionVisibilityState;
 import org.apache.tephra.util.TxUtils;
 
@@ -151,12 +151,12 @@ public class TransactionProcessor extends BaseRegionObserver {
         TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
                                                                 TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
 
-      boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.DataJanitor.PRUNE_ENABLE,
-                                                               TxConstants.DataJanitor.DEFAULT_PRUNE_ENABLE);
+      boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+                                                               TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
       if (pruneEnabled) {
-        String pruneTable = env.getConfiguration().get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
-                                                       TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE);
-        compactionState = new CompactionState(env, TableName.valueOf(pruneTable));
+        String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+        compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis);
         LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
                     pruneTable);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java
deleted file mode 100644
index 7412a18..0000000
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tephra.hbase.coprocessor.janitor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.persist.TransactionVisibilityState;
-import org.apache.tephra.util.TxUtils;
-
-import java.io.IOException;
-import javax.annotation.Nullable;
-
-/**
- * Record compaction state for invalid list pruning
- */
public class CompactionState {
  private static final Log LOG = LogFactory.getLog(CompactionState.class);

  // Identity of the region this coprocessor instance is attached to.
  private final byte[] regionName;
  private final String regionNameAsString;
  // HBase table in which the prune state is persisted.
  private final TableName stateTable;
  private final DataJanitorState dataJanitorState;
  // Prune upper bound computed when a major compaction starts; -1 means nothing to persist.
  // volatile: record() and persist() may run on different threads -- TODO confirm against coprocessor hooks.
  private volatile long pruneUpperBound = -1;

  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
    this.regionName = env.getRegionInfo().getRegionName();
    this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
    this.stateTable = stateTable;
    // Table is fetched lazily from the coprocessor environment each time state is written.
    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
      @Override
      public Table get() throws IOException {
        return env.getTable(stateTable);
      }
    });
  }

  /**
   * Records the transaction state used for a compaction. This method is called when the compaction starts.
   *
   * @param request {@link CompactionRequest} for the compaction
   * @param snapshot transaction state that will be used for the compaction
   */
  public void record(CompactionRequest request, @Nullable TransactionVisibilityState snapshot) {
    // Only major compactions with a known transaction snapshot yield a usable prune upper bound;
    // otherwise reset to -1 so persist() becomes a no-op.
    if (request.isMajor() && snapshot != null) {
      Transaction tx = TxUtils.createDummyTransaction(snapshot);
      pruneUpperBound = TxUtils.getPruneUpperBound(tx);
      LOG.debug(
        String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
                      pruneUpperBound, request, snapshot.getTimestamp()));
    } else {
      pruneUpperBound = -1;
    }
  }

  /**
   * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
   * This method is called after the compaction has successfully completed.
   */
  public void persist() {
    if (pruneUpperBound != -1) {
      try {
        dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound);
        LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
      } catch (IOException e) {
        // Best-effort: a failed save only delays pruning for this region, so log and carry on.
        LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",
                               stateTable, regionNameAsString), e);
      }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
deleted file mode 100644
index 7daac94..0000000
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tephra.hbase.coprocessor.janitor;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import javax.annotation.Nullable;
-
-/**
- * Persist data janitor state into an HBase table.
- * This is used by both {@link TransactionProcessor} and by the {@link HBaseTransactionPruningPlugin}
- * to persist and read the compaction state.
- */
-@SuppressWarnings("WeakerAccess")
-public class DataJanitorState {
-  public static final byte[] FAMILY = {'f'};
-
-  private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
-  private static final byte[] REGION_TIME_COL = {'r'};
-  private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
-
-  private static final byte[] REGION_KEY_PREFIX = {0x1};
-  private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
-
-  private static final byte[] REGION_TIME_KEY_PREFIX = {0x2};
-  private static final byte[] REGION_TIME_KEY_PREFIX_STOP = {0x3};
-
-  private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
-  private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
-
-  private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
-
-  private final TableSupplier stateTableSupplier;
-
-
-  public DataJanitorState(TableSupplier stateTableSupplier) {
-    this.stateTableSupplier = stateTableSupplier;
-  }
-
-  // ----------------------------------------------------------------
-  // ------- Methods for prune upper bound for a given region -------
-  // ----------------------------------------------------------------
-  // The data is stored in the following format -
-  // Key: 0x1<region-id>
-  // Col 'u': <prune upper bound>
-  // ----------------------------------------------------------------
-
-  /**
-   * Persist the latest prune upper bound for a given region. This is called by {@link TransactionProcessor}
-   * after major compaction.
-   *
-   * @param regionId region id
-   * @param pruneUpperBound the latest prune upper bound for the region
-   * @throws IOException when not able to persist the data to HBase
-   */
-  public void savePruneUpperBoundForRegion(byte[] regionId, long pruneUpperBound) throws IOException {
-    try (Table stateTable = stateTableSupplier.get()) {
-      Put put = new Put(makeRegionKey(regionId));
-      put.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL, Bytes.toBytes(pruneUpperBound));
-      stateTable.put(put);
-    }
-  }
-
-  /**
-   * Get latest prune upper bound for a given region. This indicates the largest invalid transaction that no
-   * longer has writes in this region.
-   *
-   * @param regionId region id
-   * @return latest prune upper bound for the region
-   * @throws IOException when not able to read the data from HBase
-   */
-  public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException {
-    try (Table stateTable = stateTableSupplier.get()) {
-      Get get = new Get(makeRegionKey(regionId));
-      get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
-      byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
-      return result == null ? -1 : Bytes.toLong(result);
-    }
-  }
-
-  /**
-   * Get latest prune upper bounds for given regions. This is a batch operation of method
-   * {@link #getPruneUpperBoundForRegion(byte[])}
-   *
-   * @param regions a set of regions
-   * @return a map containing region id and its latest prune upper bound value
-   * @throws IOException when not able to read the data from HBase
-   */
-  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
-    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    try (Table stateTable = stateTableSupplier.get()) {
-      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
-      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
-      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
-
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          byte[] region = getRegionFromKey(next.getRow());
-          if (regions.contains(region)) {
-            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
-            if (timeBytes != null) {
-              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
-              resultMap.put(region, pruneUpperBoundRegion);
-            }
-          }
-        }
-      }
-      return resultMap;
-    }
-  }
-
-  /**
-   * Delete prune upper bounds for the regions that are not in the given exclude set, and the
-   * prune upper bound is less than the given value.
-   * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have
-   * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are
-   * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions.
-   *
-   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
-   * @param excludeRegions set of regions that should not be deleted
-   * @throws IOException when not able to delete data in HBase
-   */
-  public void deletePruneUpperBounds(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
-    throws IOException {
-    try (Table stateTable = stateTableSupplier.get()) {
-      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
-      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
-      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
-
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          byte[] region = getRegionFromKey(next.getRow());
-          if (!excludeRegions.contains(region)) {
-            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
-            if (timeBytes != null) {
-              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
-              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
-                stateTable.delete(new Delete(next.getRow()));
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
-  // ---------------------------------------------------
-  // ------- Methods for regions at a given time -------
-  // ---------------------------------------------------
-  // Key: 0x2<time><region-id>
-  // Col 't': <empty byte array>
-  // ---------------------------------------------------
-
-  /**
-   * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of
-   * transactional regions existing in the HBase instance periodically.
-   *
-   * @param time timestamp in milliseconds
-   * @param regions set of regions at the time
-   * @throws IOException when not able to persist the data to HBase
-   */
-  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
-    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
-    try (Table stateTable = stateTableSupplier.get()) {
-      for (byte[] region : regions) {
-        Put put = new Put(makeTimeRegionKey(timeBytes, region));
-        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
-        stateTable.put(put);
-      }
-    }
-  }
-
-  /**
-   * Return the set of regions saved for the time at or before the given time. This method finds the greatest time
-   * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
-   * older than that.
-   *
-   * @param time timestamp in milliseconds
-   * @return set of regions and time at which they were recorded, or null if no regions found
-   * @throws IOException when not able to read the data from HBase
-   */
-  @Nullable
-  public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
-    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
-    try (Table stateTable = stateTableSupplier.get()) {
-      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
-      scan.addColumn(FAMILY, REGION_TIME_COL);
-
-      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
-      long currentRegionTime = -1;
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
-          // Stop if reached next time value
-          if (currentRegionTime == -1) {
-            currentRegionTime = timeRegion.getKey();
-          } else if (timeRegion.getKey() < currentRegionTime) {
-            break;
-          } else if (timeRegion.getKey() > currentRegionTime) {
-            throw new IllegalStateException(
-              String.format("Got out of order time %d when expecting time less than or equal to %d",
-                            timeRegion.getKey(), currentRegionTime));
-          }
-          regions.add(timeRegion.getValue());
-        }
-      }
-      return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
-    }
-  }
-
-  /**
-   * Delete all the regions that were recorded for all times equal or less than the given time.
-   *
-   * @param time timestamp in milliseconds
-   * @throws IOException when not able to delete data in HBase
-   */
-  public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
-    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
-    try (Table stateTable = stateTableSupplier.get()) {
-      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
-      scan.addColumn(FAMILY, REGION_TIME_COL);
-
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          stateTable.delete(new Delete(next.getRow()));
-        }
-      }
-    }
-  }
-
-  // ---------------------------------------------------------------------
-  // ------- Methods for inactive transaction bound for given time -------
-  // ---------------------------------------------------------------------
-  // Key: 0x3<inverted time>
-  // Col 'p': <inactive transaction bound>
-  // ---------------------------------------------------------------------
-
-  /**
-   * Persist inactive transaction bound for a given time. This is the smallest not in-progress transaction that
-   * will not have writes in any HBase regions that are created after the given time.
-   *
-   * @param time time in milliseconds
-   * @param inactiveTransactionBound inactive transaction bound for the given time
-   * @throws IOException when not able to persist the data to HBase
-   */
-  public void saveInactiveTransactionBoundForTime(long time, long inactiveTransactionBound) throws IOException {
-    try (Table stateTable = stateTableSupplier.get()) {
-      Put put = new Put(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
-      put.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL, Bytes.toBytes(inactiveTransactionBound));
-      stateTable.put(put);
-    }
-  }
-
-  /**
-   * Return inactive transaction bound for the given time.
-   *
-   * @param time time in milliseconds
-   * @return inactive transaction bound for the given time
-   * @throws IOException when not able to read the data from HBase
-   */
-  public long getInactiveTransactionBoundForTime(long time) throws IOException {
-    try (Table stateTable = stateTableSupplier.get()) {
-      Get get = new Get(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
-      get.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
-      byte[] result = stateTable.get(get).getValue(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
-      return result == null ? -1 : Bytes.toLong(result);
-    }
-  }
-
-  /**
-   * Delete all inactive transaction bounds recorded for a time less than the given time
-   *
-   * @param time time in milliseconds
-   * @throws IOException when not able to delete data in HBase
-   */
-  public void deleteInactiveTransactionBoundsOnOrBeforeTime(long time) throws IOException {
-    try (Table stateTable = stateTableSupplier.get()) {
-      Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
-                           INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
-      scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
-
-      try (ResultScanner scanner = stateTable.getScanner(scan)) {
-        Result next;
-        while ((next = scanner.next()) != null) {
-          stateTable.delete(new Delete(next.getRow()));
-        }
-      }
-    }
-  }
-
-  private byte[] makeRegionKey(byte[] regionId) {
-    return Bytes.add(REGION_KEY_PREFIX, regionId);
-  }
-
-  private byte[] getRegionFromKey(byte[] regionKey) {
-    int prefixLen = REGION_KEY_PREFIX.length;
-    return Bytes.copy(regionKey, prefixLen, regionKey.length - prefixLen);
-  }
-
-  private byte[] makeTimeRegionKey(byte[] time, byte[] regionId) {
-    return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
-  }
-
-  private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
-    return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
-  }
-
-  private Map.Entry<Long, byte[]> getTimeRegion(byte[] key) {
-    int offset = REGION_TIME_KEY_PREFIX.length;
-    long time = getInvertedTime(Bytes.toLong(key, offset));
-    offset += Bytes.SIZEOF_LONG;
-    byte[] regionName = Bytes.copy(key, offset, key.length - offset);
-    return Maps.immutableEntry(time, regionName);
-  }
-
-  private long getInvertedTime(long time) {
-    return Long.MAX_VALUE - time;
-  }
-
-  /**
-   * Supplies table for persisting state
-   */
-  public interface TableSupplier {
-    Table get() throws IOException;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
deleted file mode 100644
index f662e37..0000000
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tephra.hbase.coprocessor.janitor;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
-import org.apache.tephra.janitor.TransactionPruningPlugin;
-import org.apache.tephra.util.TxUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-/**
- * Default implementation of the {@link TransactionPruningPlugin} for HBase.
- *
- * This plugin determines the prune upper bound for transactional HBase tables that use
- * coprocessor {@link TransactionProcessor}.
- *
- * <h3>State storage:</h3>
- *
- * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
- * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
- * In addition, the plugin also persists the following information on a run at time <i>t</i>
- * <ul>
- *   <li>
- *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
- *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
- *     attached to them.
- *   </li>
- *   <li>
- *     <i>(t, inactive transaction bound)</i>: This is the smallest not in-progress transaction that
- *     will not have writes in any HBase regions that are created after time <i>t</i>.
- *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
- *     and passed on to the plugin.
- *   </li>
- * </ul>
- *
- * <h3>Computing prune upper bound:</h3>
- *
- * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
- * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
- * Since the prune upper bound will get recorded for a region only after a major compaction,
- * using only the latest set of regions we may not be able to find the
- * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time
- * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc.
- * to determine the prune upper bound.
- *
- * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
- * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
- * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>.
- * <br/>
- * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
- * <ul>
- *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
- *   <li>Inactive transaction bound from <i>(t1, inactive transaction bound)</i></li>
- * </ul>
- *
- * <p/>
- * Above, when we find <i>(t1, set of regions)</i>, there may a region that was created after time <i>t1</i>,
- * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
- * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
- * TransactionProcessor is always the latest prune upper bound for a region.
- * <br/>
- * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
- * inactive transaction bound at the time the region was created.
- * Since we limit the plugin prune upper bound using <i>(t1, inactive transaction bound)</i>,
- * there should be no invalid transactions smaller than the plugin prune upper bound with writes in any
- * transactional region of this HBase instance.
- *
- * <p/>
- * Note: If your tables uses a transactional coprocessor other than TransactionProcessor,
- * then you may need to write a new plugin to compute prune upper bound for those tables.
- */
-@SuppressWarnings("WeakerAccess")
-public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
-  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
-
-  protected Configuration conf;
-  protected Connection connection;
-  protected DataJanitorState dataJanitorState;
-
-  @Override
-  public void initialize(Configuration conf) throws IOException {
-    this.conf = conf;
-    this.connection = ConnectionFactory.createConnection(conf);
-
-    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
-                                                            TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
-    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
-    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
-      @Override
-      public Table get() throws IOException {
-        return connection.getTable(stateTable);
-      }
-    });
-  }
-
-  /**
-   * Determines prune upper bound for the data store as mentioned above.
-   */
-  @Override
-  public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException {
-    LOG.debug("Fetching prune upper bound for time {} and inactive transaction bound {}",
-              time, inactiveTransactionBound);
-    if (time < 0 || inactiveTransactionBound < 0) {
-      return -1;
-    }
-
-    // Get all the current transactional regions
-    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
-    if (!transactionalRegions.isEmpty()) {
-      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
-      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
-      // Save inactive transaction bound for time as the final step.
-      // We can then use its existence to make sure that the data for a given time is complete or not
-      LOG.debug("Saving inactive transaction bound {} for time {}", inactiveTransactionBound, time);
-      dataJanitorState.saveInactiveTransactionBoundForTime(time, inactiveTransactionBound);
-    }
-
-    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
-  }
-
-  /**
-   * After invalid list has been pruned, this cleans up state information that is no longer required.
-   * This includes -
-   * <ul>
-   *   <li>
-   *     <i>(region, prune upper bound)</i> - prune upper bound for regions that are older
-   *     than maxPrunedInvalid
-   *   </li>
-   *   <li>
-   *     <i>(t, set of regions) - Regions set that were recorded on or before the start time
-   *     of maxPrunedInvalid
-   *   </li>
-   *   <li>
-   *     (t, inactive transaction bound) - Smallest not in-progress transaction without any writes in new regions
-   *     information recorded on or before the start time of maxPrunedInvalid
-   *   </li>
-   * </ul>
-   */
-  @Override
-  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
-    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
-    if (time < 0 || maxPrunedInvalid < 0) {
-      return;
-    }
-
-    // Get regions for the current time, so as to not delete the prune upper bounds for them.
-    // The prune upper bounds for regions are recorded by TransactionProcessor and the deletion
-    // is done by this class. To avoid update/delete race condition, we only delete prune upper
-    // bounds for the stale regions.
-    TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
-    if (regionsToExclude != null) {
-      LOG.debug("Deleting prune upper bounds smaller than {} for stale regions", maxPrunedInvalid);
-      dataJanitorState.deletePruneUpperBounds(maxPrunedInvalid, regionsToExclude.getRegions());
-    } else {
-      LOG.warn("Cannot find saved regions on or before time {}", time);
-    }
-    long pruneTime = TxUtils.getTimestamp(maxPrunedInvalid);
-    LOG.debug("Deleting regions recorded before time {}", pruneTime);
-    dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
-    LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
-    dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
-  }
-
-  @Override
-  public void destroy() {
-    LOG.info("Stopping plugin...");
-    try {
-      connection.close();
-    } catch (IOException e) {
-      LOG.error("Got exception while closing HBase connection", e);
-    }
-  }
-
-  protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
-    return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
-  }
-
-  protected SortedSet<byte[]> getTransactionalRegions() throws IOException {
-    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
-    try (Admin admin = connection.getAdmin()) {
-      HTableDescriptor[] tableDescriptors = admin.listTables();
-      LOG.debug("Got {} tables to process", tableDescriptors == null ? 0 : tableDescriptors.length);
-      if (tableDescriptors != null) {
-        for (HTableDescriptor tableDescriptor : tableDescriptors) {
-          if (isTransactionalTable(tableDescriptor)) {
-            List<HRegionInfo> tableRegions = admin.getTableRegions(tableDescriptor.getTableName());
-            LOG.debug("Regions for table {}: {}", tableDescriptor.getTableName(), tableRegions);
-            if (tableRegions != null) {
-              for (HRegionInfo region : tableRegions) {
-                regions.add(region.getRegionName());
-              }
-            }
-          } else {
-            LOG.debug("{} is not a transactional table", tableDescriptor.getTableName());
-          }
-        }
-      }
-    }
-    return regions;
-  }
-
-  /**
-   * Try to find the latest set of regions in which all regions have been major compacted, and
-   * compute prune upper bound from them. Starting from newest to oldest, this looks into the
-   * region set that has been saved periodically, and joins it with the prune upper bound data
-   * for a region recorded after a major compaction.
-   *
-   * @param timeRegions the latest set of regions
-   * @return prune upper bound
-   * @throws IOException when not able to talk to HBase
-   */
-  private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
-    do {
-      LOG.debug("Computing prune upper bound for {}", timeRegions);
-      SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
-      long time = timeRegions.getTime();
-
-      Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
-      logPruneUpperBoundRegions(pruneUpperBoundRegions);
-      // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
-      // across all regions
-      if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
-        long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
-        LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time);
-        // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
-        if (inactiveTransactionBound != -1) {
-          Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
-          return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
-                        "and hence the data must be incomplete", time);
-          }
-        }
-      } else {
-        if (LOG.isDebugEnabled()) {
-          Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
-          LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
-                    time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
-        }
-      }
-
-      timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time - 1);
-    } while (timeRegions != null);
-    return -1;
-  }
-
-  private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Got region - prune upper bound map: {}",
-                Iterables.transform(pruneUpperBoundRegions.entrySet(),
-                                    new Function<Map.Entry<byte[], Long>, Map.Entry<String, Long>>() {
-                                      @Override
-                                      public Map.Entry<String, Long> apply(Map.Entry<byte[], Long> input) {
-                                        String regionName = TimeRegions.BYTE_ARR_TO_STRING_FN.apply(input.getKey());
-                                        return Maps.immutableEntry(regionName, input.getValue());
-                                      }
-                                    }));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/TimeRegions.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/TimeRegions.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/TimeRegions.java
deleted file mode 100644
index 813f5dd..0000000
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/TimeRegions.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tephra.hbase.coprocessor.janitor;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.util.Objects;
-import java.util.SortedSet;
-
-/**
- * Contains information on the set of transactional regions recorded at a given time
- */
-@SuppressWarnings("WeakerAccess")
-public class TimeRegions {
-  static final Function<byte[], String> BYTE_ARR_TO_STRING_FN =
-    new Function<byte[], String>() {
-      @Override
-      public String apply(byte[] input) {
-        return Bytes.toStringBinary(input);
-      }
-    };
-
-  private final long time;
-  private final SortedSet<byte[]> regions;
-
-  public TimeRegions(long time, SortedSet<byte[]> regions) {
-    this.time = time;
-    this.regions = regions;
-  }
-
-  public long getTime() {
-    return time;
-  }
-
-  public SortedSet<byte[]> getRegions() {
-    return regions;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    TimeRegions that = (TimeRegions) o;
-    return time == that.time &&
-      Objects.equals(regions, that.regions);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(time, regions);
-  }
-
-  @Override
-  public String toString() {
-    Iterable<String> regionStrings = Iterables.transform(regions, BYTE_ARR_TO_STRING_FN);
-    return "TimeRegions{" +
-      "time=" + time +
-      ", regions=[" + Joiner.on(" ").join(regionStrings) + "]" +
-      '}';
-  }
-}


Mime
View raw message