tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From go...@apache.org
Subject incubator-tephra git commit: TEPHRA-152 Using ReferenceCounting for TransactionStateCache refresh thread, so that it can be stopped
Date Thu, 18 May 2017 00:16:40 GMT
Repository: incubator-tephra
Updated Branches:
  refs/heads/master 3af3da869 -> 9b63985fc


TEPHRA-152 Using ReferenceCounting for TransactionStateCache refresh thread, so that it can be stopped

This closes #41 from GitHub.

Signed-off-by: Gokul Gunasekaran <gokul@cask.co>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/9b63985f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/9b63985f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/9b63985f

Branch: refs/heads/master
Commit: 9b63985fcfbe25469270c77d2bfb17e21e27ce5c
Parents: 3af3da8
Author: Gokul Gunasekaran <gokul@cask.co>
Authored: Tue Mar 21 15:37:48 2017 -0700
Committer: Gokul Gunasekaran <gokul@cask.co>
Committed: Wed May 17 17:16:32 2017 -0700

----------------------------------------------------------------------
 .../tephra/coprocessor/CacheSupplier.java       | 43 +++++++++
 .../coprocessor/ReferenceCountedSupplier.java   | 94 ++++++++++++++++++++
 .../TransactionStateCacheSupplier.java          | 43 +++++----
 .../hbase/coprocessor/TransactionProcessor.java | 15 +++-
 .../hbase/txprune/PruneUpperBoundWriter.java    |  4 +
 .../txprune/PruneUpperBoundWriterSupplier.java  | 66 ++++----------
 .../hbase/txprune/InvalidListPruneTest.java     | 11 ++-
 .../hbase/coprocessor/TransactionProcessor.java | 12 ++-
 .../hbase/txprune/PruneUpperBoundWriter.java    |  4 +
 .../txprune/PruneUpperBoundWriterSupplier.java  | 66 ++++----------
 .../hbase/txprune/InvalidListPruneTest.java     | 11 ++-
 .../hbase/coprocessor/TransactionProcessor.java | 15 +++-
 .../hbase/txprune/PruneUpperBoundWriter.java    |  4 +
 .../txprune/PruneUpperBoundWriterSupplier.java  | 66 ++++----------
 .../hbase/txprune/InvalidListPruneTest.java     | 11 ++-
 .../hbase/coprocessor/TransactionProcessor.java | 15 +++-
 .../hbase/txprune/PruneUpperBoundWriter.java    |  4 +
 .../txprune/PruneUpperBoundWriterSupplier.java  | 66 ++++----------
 .../hbase/txprune/InvalidListPruneTest.java     | 11 ++-
 .../hbase/coprocessor/TransactionProcessor.java | 15 +++-
 .../hbase/txprune/PruneUpperBoundWriter.java    |  4 +
 .../txprune/PruneUpperBoundWriterSupplier.java  | 66 ++++----------
 .../hbase/txprune/InvalidListPruneTest.java     | 11 ++-
 23 files changed, 357 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-core/src/main/java/org/apache/tephra/coprocessor/CacheSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/coprocessor/CacheSupplier.java b/tephra-core/src/main/java/org/apache/tephra/coprocessor/CacheSupplier.java
new file mode 100644
index 0000000..db93965
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/CacheSupplier.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.coprocessor;
+
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.Service;
+
+/**
+ * Provides ability to get and release objects
+ *
+ * @param <T> type of the object supplied
+ */
+public interface CacheSupplier<T extends Service> extends Supplier<T> {
+
+  /**
+   * @return Get an instance of T and if it is the first call, then the service will be started. Subsequent calls
+   * will get a reference to the same instance
+   */
+  @Override
+  T get();
+
+  /**
+   * Release the object obtained through {code Supplier#get()}. If this is the last release call, then the service will
+   * be stopped.
+   */
+  void release();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-core/src/main/java/org/apache/tephra/coprocessor/ReferenceCountedSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/coprocessor/ReferenceCountedSupplier.java b/tephra-core/src/main/java/org/apache/tephra/coprocessor/ReferenceCountedSupplier.java
new file mode 100644
index 0000000..a0fa7ad
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/ReferenceCountedSupplier.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.coprocessor;
+
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Reference counts the {@link Service} and manages the lifecycle of the {@link Service} instance.
+ *
+ * @param <T> type of {@link Service} that is reference counted
+ */
+public class ReferenceCountedSupplier<T extends Service> {
+  private static final Log LOG = LogFactory.getLog(ReferenceCountedSupplier.class);
+
+  private final AtomicReference<T> instance = new AtomicReference<>(null);
+  private final AtomicInteger refCount = new AtomicInteger(0);
+  private final Object lock = new Object();
+
+  private final String instanceName;
+
+  public ReferenceCountedSupplier(String instanceName) {
+    this.instanceName = instanceName;
+  }
+
+  public T getOrCreate(Supplier<T> instanceSupplier) {
+    synchronized (lock) {
+      if (instance.get() == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Creating and starting Service %s.", instanceName));
+        }
+
+        // Instance has not been instantiated
+        T serviceInstance = instanceSupplier.get();
+        instance.set(serviceInstance);
+        serviceInstance.start();
+      }
+      int newCount = refCount.incrementAndGet();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Incrementing reference count for Service %s: %d", instanceName, newCount));
+      }
+      return instance.get();
+    }
+  }
+
+  public void release() {
+    synchronized (lock) {
+      if (refCount.get() <= 0) {
+        LOG.warn(String.format("Reference Count for Service %s is already zero.", instanceName));
+        return;
+      }
+
+      int newCount = refCount.decrementAndGet();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Decrementing reference count for Service %s: %d", instanceName, newCount));
+      }
+
+      if (newCount == 0) {
+        try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Reference Count for Service is 0. Stopping Service %s.", instanceName));
+          }
+
+          Service serviceInstance = instance.get();
+          serviceInstance.stopAndWait();
+          instance.set(null);
+        } catch (Exception ex) {
+          LOG.warn(String.format("Exception while trying to stop Service %s.", instanceName), ex);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java
index d19da36..db0ca50 100644
--- a/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java
+++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java
@@ -24,31 +24,40 @@ import org.apache.hadoop.conf.Configuration;
 /**
  * Supplies instances of {@link TransactionStateCache} implementations.
  */
-public class TransactionStateCacheSupplier implements Supplier<TransactionStateCache> {
-  protected static volatile TransactionStateCache instance;
-  protected static Object lock = new Object();
+public class TransactionStateCacheSupplier implements CacheSupplier<TransactionStateCache> {
 
-  protected final Configuration conf;
+  private static final ReferenceCountedSupplier<TransactionStateCache> referenceCountedSupplier =
+    new ReferenceCountedSupplier<>(TransactionStateCache.class.getSimpleName());
 
-  public TransactionStateCacheSupplier(Configuration conf) {
-    this.conf = conf;
+  private final Supplier<TransactionStateCache> supplier;
+
+  public TransactionStateCacheSupplier(Supplier<TransactionStateCache> supplier) {
+    this.supplier = supplier;
+  }
+
+  public TransactionStateCacheSupplier(final Configuration conf) {
+    this.supplier = new Supplier<TransactionStateCache>() {
+      @Override
+      public TransactionStateCache get() {
+        TransactionStateCache transactionStateCache = new TransactionStateCache();
+        transactionStateCache.setConf(conf);
+        return transactionStateCache;
+      }
+    };
   }
 
   /**
    * Returns a singleton instance of the transaction state cache, performing lazy initialization if necessary.
-   * @return A shared instance of the transaction state cache.
+   *
+   * @return A shared instance of the transaction state cache
    */
   @Override
   public TransactionStateCache get() {
-    if (instance == null) {
-      synchronized (lock) {
-        if (instance == null) {
-          instance = new TransactionStateCache();
-          instance.setConf(conf);
-          instance.start();
-        }
-      }
-    }
-    return instance;
+    return referenceCountedSupplier.getOrCreate(supplier);
+  }
+
+  @Override
+  public void release() {
+    referenceCountedSupplier.release();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index d2402a6..10ecfa4 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -18,7 +18,6 @@
 
 package org.apache.tephra.hbase.coprocessor;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
@@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionCodec;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
 import org.apache.tephra.hbase.txprune.CompactionState;
@@ -108,6 +108,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   private final TransactionCodec txCodec;
   private TransactionStateCache cache;
   private volatile CompactionState compactionState;
+  private CacheSupplier<TransactionStateCache> cacheSupplier;
 
   protected volatile Boolean pruneEnable;
   protected volatile Long txMaxLifetimeMillis;
@@ -125,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void start(CoprocessorEnvironment e) throws IOException {
     if (e instanceof RegionCoprocessorEnvironment) {
       RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
-      Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env);
+      this.cacheSupplier = getTransactionStateCacheSupplier(env);
       this.cache = cacheSupplier.get();
 
       HTableDescriptor tableDesc = env.getRegion().getTableDesc();
@@ -168,13 +169,19 @@ public class TransactionProcessor extends BaseRegionObserver {
     return env.getConfiguration();
   }
 
-  protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+  protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
     return new TransactionStateCacheSupplier(env.getConfiguration());
   }
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    resetPruneState();
+    try {
+      resetPruneState();
+    } finally {
+      if (cacheSupplier != null) {
+        cacheSupplier.release();
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 1c26ef1..5e0d435 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
     if (flushThread != null) {
       flushThread.interrupt();
       flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      if (flushThread.isAlive()) {
+        flushThread.interrupt();
+        flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
index 98f3334..cb93fab 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune;
 
 
 import com.google.common.base.Supplier;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
 
 /**
  * Supplies instances of {@link PruneUpperBoundWriter} implementations.
  */
-public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
-  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
 
-  private static volatile PruneUpperBoundWriter instance;
-  private static volatile int refCount = 0;
-  private static final Object lock = new Object();
+  private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier =
+    new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
 
-  private final TableName tableName;
-  private final DataJanitorState dataJanitorState;
-  private final long pruneFlushInterval;
+  private final Supplier<PruneUpperBoundWriter> supplier;
 
-  public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
-                                       long pruneFlushInterval) {
-    this.tableName = tableName;
-    this.dataJanitorState = dataJanitorState;
-    this.pruneFlushInterval = pruneFlushInterval;
+  public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+                                       final long pruneFlushInterval) {
+    this.supplier = new Supplier<PruneUpperBoundWriter>() {
+      @Override
+      public PruneUpperBoundWriter get() {
+        return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+      }
+    };
   }
 
   @Override
   public PruneUpperBoundWriter get() {
-    synchronized (lock) {
-      if (instance == null) {
-        instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
-        instance.startAndWait();
-      }
-      refCount++;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
-      }
-      return instance;
-    }
+    return referenceCountedSupplier.getOrCreate(supplier);
   }
 
   public void release() {
-    synchronized (lock) {
-      refCount--;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
-      }
-
-      if (refCount == 0) {
-        try {
-          instance.stopAndWait();
-        } catch (Exception ex) {
-          LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
-        } finally {
-          // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
-          if (instance.isAlive()) {
-            try {
-              instance.shutDown();
-            } catch (Exception e) {
-              LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
-            }
-          }
-          instance = null;
-        }
-      }
-    }
+    referenceCountedSupplier.release();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index e3f5c6b..91bbc1a 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.tephra.hbase.txprune;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.ImmutableSortedSet;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.hbase.AbstractHBaseTableTest;
 import org.apache.tephra.hbase.TransactionAwareHTable;
@@ -404,12 +404,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
 
     @Override
-    protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
-      return new Supplier<TransactionStateCache>() {
+    protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+      return new CacheSupplier<TransactionStateCache>() {
         @Override
         public TransactionStateCache get() {
           return new InMemoryTransactionStateCache();
         }
+
+        @Override
+        public void release() {
+          // no-op
+        }
       };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 84776cf..30b69a1 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionCodec;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
 import org.apache.tephra.hbase.txprune.CompactionState;
@@ -108,6 +109,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   private final TransactionCodec txCodec;
   private TransactionStateCache cache;
   private volatile CompactionState compactionState;
+  private CacheSupplier<TransactionStateCache> cacheSupplier;
 
   protected volatile Boolean pruneEnable;
   protected volatile Long txMaxLifetimeMillis;
@@ -168,13 +170,19 @@ public class TransactionProcessor extends BaseRegionObserver {
     return env.getConfiguration();
   }
 
-  protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+  protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
     return new TransactionStateCacheSupplier(env.getConfiguration());
   }
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    resetPruneState();
+    try {
+      resetPruneState();
+    } finally {
+      if (cacheSupplier != null) {
+        cacheSupplier.release();
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 1c26ef1..5e0d435 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
     if (flushThread != null) {
       flushThread.interrupt();
       flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      if (flushThread.isAlive()) {
+        flushThread.interrupt();
+        flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
index 98f3334..cb93fab 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune;
 
 
 import com.google.common.base.Supplier;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
 
 /**
  * Supplies instances of {@link PruneUpperBoundWriter} implementations.
  */
-public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
-  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
 
-  private static volatile PruneUpperBoundWriter instance;
-  private static volatile int refCount = 0;
-  private static final Object lock = new Object();
+  private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier =
+    new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
 
-  private final TableName tableName;
-  private final DataJanitorState dataJanitorState;
-  private final long pruneFlushInterval;
+  private final Supplier<PruneUpperBoundWriter> supplier;
 
-  public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
-                                       long pruneFlushInterval) {
-    this.tableName = tableName;
-    this.dataJanitorState = dataJanitorState;
-    this.pruneFlushInterval = pruneFlushInterval;
+  public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+                                       final long pruneFlushInterval) {
+    this.supplier = new Supplier<PruneUpperBoundWriter>() {
+      @Override
+      public PruneUpperBoundWriter get() {
+        return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+      }
+    };
   }
 
   @Override
   public PruneUpperBoundWriter get() {
-    synchronized (lock) {
-      if (instance == null) {
-        instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
-        instance.startAndWait();
-      }
-      refCount++;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
-      }
-      return instance;
-    }
+    return referenceCountedSupplier.getOrCreate(supplier);
   }
 
   public void release() {
-    synchronized (lock) {
-      refCount--;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
-      }
-
-      if (refCount == 0) {
-        try {
-          instance.stopAndWait();
-        } catch (Exception ex) {
-          LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
-        } finally {
-          // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
-          if (instance.isAlive()) {
-            try {
-              instance.shutDown();
-            } catch (Exception e) {
-              LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
-            }
-          }
-          instance = null;
-        }
-      }
-    }
+    referenceCountedSupplier.release();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index e3f5c6b..91bbc1a 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.tephra.hbase.txprune;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.ImmutableSortedSet;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.hbase.AbstractHBaseTableTest;
 import org.apache.tephra.hbase.TransactionAwareHTable;
@@ -404,12 +404,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
 
     @Override
-    protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
-      return new Supplier<TransactionStateCache>() {
+    protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+      return new CacheSupplier<TransactionStateCache>() {
         @Override
         public TransactionStateCache get() {
           return new InMemoryTransactionStateCache();
         }
+
+        @Override
+        public void release() {
+          // no-op
+        }
       };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index b73bdc1..ca96052 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -18,7 +18,6 @@
 
 package org.apache.tephra.hbase.coprocessor;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
@@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionCodec;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
 import org.apache.tephra.hbase.txprune.CompactionState;
@@ -108,6 +108,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   private final TransactionCodec txCodec;
   private TransactionStateCache cache;
   private volatile CompactionState compactionState;
+  private CacheSupplier<TransactionStateCache> cacheSupplier;
 
   protected volatile Boolean pruneEnable;
   protected volatile Long txMaxLifetimeMillis;
@@ -125,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void start(CoprocessorEnvironment e) throws IOException {
     if (e instanceof RegionCoprocessorEnvironment) {
       RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
-      Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env);
+      this.cacheSupplier = getTransactionStateCacheSupplier(env);
       this.cache = cacheSupplier.get();
 
       HTableDescriptor tableDesc = env.getRegion().getTableDesc();
@@ -168,13 +169,19 @@ public class TransactionProcessor extends BaseRegionObserver {
     return env.getConfiguration();
   }
 
-  protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+  protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
     return new TransactionStateCacheSupplier(env.getConfiguration());
   }
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    resetPruneState();
+    try {
+      resetPruneState();
+    } finally {
+      if (cacheSupplier != null) {
+        cacheSupplier.release();
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 1c26ef1..5e0d435 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
     if (flushThread != null) {
       flushThread.interrupt();
       flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      if (flushThread.isAlive()) {
+        flushThread.interrupt();
+        flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
index 98f3334..cb93fab 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune;
 
 
 import com.google.common.base.Supplier;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
 
 /**
  * Supplies instances of {@link PruneUpperBoundWriter} implementations.
  */
-public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
-  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
 
-  private static volatile PruneUpperBoundWriter instance;
-  private static volatile int refCount = 0;
-  private static final Object lock = new Object();
+  private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier =
+    new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
 
-  private final TableName tableName;
-  private final DataJanitorState dataJanitorState;
-  private final long pruneFlushInterval;
+  private final Supplier<PruneUpperBoundWriter> supplier;
 
-  public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
-                                       long pruneFlushInterval) {
-    this.tableName = tableName;
-    this.dataJanitorState = dataJanitorState;
-    this.pruneFlushInterval = pruneFlushInterval;
+  public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+                                       final long pruneFlushInterval) {
+    this.supplier = new Supplier<PruneUpperBoundWriter>() {
+      @Override
+      public PruneUpperBoundWriter get() {
+        return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+      }
+    };
   }
 
   @Override
   public PruneUpperBoundWriter get() {
-    synchronized (lock) {
-      if (instance == null) {
-        instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
-        instance.startAndWait();
-      }
-      refCount++;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
-      }
-      return instance;
-    }
+    return referenceCountedSupplier.getOrCreate(supplier);
   }
 
   public void release() {
-    synchronized (lock) {
-      refCount--;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
-      }
-
-      if (refCount == 0) {
-        try {
-          instance.stopAndWait();
-        } catch (Exception ex) {
-          LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
-        } finally {
-          // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
-          if (instance.isAlive()) {
-            try {
-              instance.shutDown();
-            } catch (Exception e) {
-              LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
-            }
-          }
-          instance = null;
-        }
-      }
-    }
+    referenceCountedSupplier.release();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index c99904b..f2c1abc 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.tephra.hbase.txprune;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.ImmutableSortedSet;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.hbase.AbstractHBaseTableTest;
 import org.apache.tephra.hbase.TransactionAwareHTable;
@@ -400,12 +400,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
 
     @Override
-    protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
-      return new Supplier<TransactionStateCache>() {
+    protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+      return new CacheSupplier<TransactionStateCache>() {
         @Override
         public TransactionStateCache get() {
           return new InMemoryTransactionStateCache();
         }
+
+        @Override
+        public void release() {
+          // no-op
+        }
       };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index f9bb35e..263ee98 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -18,7 +18,6 @@
 
 package org.apache.tephra.hbase.coprocessor;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
@@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionCodec;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
 import org.apache.tephra.hbase.txprune.CompactionState;
@@ -108,6 +108,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   private final TransactionCodec txCodec;
   private TransactionStateCache cache;
   private volatile CompactionState compactionState;
+  private CacheSupplier<TransactionStateCache> cacheSupplier;
 
   protected volatile Boolean pruneEnable;
   protected volatile Long txMaxLifetimeMillis;
@@ -125,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void start(CoprocessorEnvironment e) throws IOException {
     if (e instanceof RegionCoprocessorEnvironment) {
       RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
-      Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env);
+      this.cacheSupplier = getTransactionStateCacheSupplier(env);
       this.cache = cacheSupplier.get();
 
       HTableDescriptor tableDesc = env.getRegion().getTableDesc();
@@ -168,13 +169,19 @@ public class TransactionProcessor extends BaseRegionObserver {
     return env.getConfiguration();
   }
 
-  protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+  protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
     return new TransactionStateCacheSupplier(env.getConfiguration());
   }
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    resetPruneState();
+    try {
+      resetPruneState();
+    } finally {
+      if (cacheSupplier != null) {
+        cacheSupplier.release();
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 1c26ef1..5e0d435 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
     if (flushThread != null) {
       flushThread.interrupt();
       flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      if (flushThread.isAlive()) {
+        flushThread.interrupt();
+        flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
index 98f3334..cb93fab 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune;
 
 
 import com.google.common.base.Supplier;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
 
 /**
  * Supplies instances of {@link PruneUpperBoundWriter} implementations.
  */
-public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
-  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
 
-  private static volatile PruneUpperBoundWriter instance;
-  private static volatile int refCount = 0;
-  private static final Object lock = new Object();
+  private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier =
+    new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
 
-  private final TableName tableName;
-  private final DataJanitorState dataJanitorState;
-  private final long pruneFlushInterval;
+  private final Supplier<PruneUpperBoundWriter> supplier;
 
-  public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
-                                       long pruneFlushInterval) {
-    this.tableName = tableName;
-    this.dataJanitorState = dataJanitorState;
-    this.pruneFlushInterval = pruneFlushInterval;
+  public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+                                       final long pruneFlushInterval) {
+    this.supplier = new Supplier<PruneUpperBoundWriter>() {
+      @Override
+      public PruneUpperBoundWriter get() {
+        return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+      }
+    };
   }
 
   @Override
   public PruneUpperBoundWriter get() {
-    synchronized (lock) {
-      if (instance == null) {
-        instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
-        instance.startAndWait();
-      }
-      refCount++;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
-      }
-      return instance;
-    }
+    return referenceCountedSupplier.getOrCreate(supplier);
   }
 
   public void release() {
-    synchronized (lock) {
-      refCount--;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
-      }
-
-      if (refCount == 0) {
-        try {
-          instance.stopAndWait();
-        } catch (Exception ex) {
-          LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
-        } finally {
-          // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
-          if (instance.isAlive()) {
-            try {
-              instance.shutDown();
-            } catch (Exception e) {
-              LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
-            }
-          }
-          instance = null;
-        }
-      }
-    }
+    referenceCountedSupplier.release();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 07746d8..ac5e923 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.tephra.hbase.txprune;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.ImmutableSortedSet;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.hbase.AbstractHBaseTableTest;
 import org.apache.tephra.hbase.TransactionAwareHTable;
@@ -400,12 +400,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
 
     @Override
-    protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
-      return new Supplier<TransactionStateCache>() {
+    protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+      return new CacheSupplier<TransactionStateCache>() {
         @Override
         public TransactionStateCache get() {
           return new InMemoryTransactionStateCache();
         }
+
+        @Override
+        public void release() {
+          // no-op
+        }
       };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 02e2dac..553f598 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -18,7 +18,6 @@
 
 package org.apache.tephra.hbase.coprocessor;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
@@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionCodec;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
 import org.apache.tephra.hbase.txprune.CompactionState;
@@ -108,6 +108,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   private final TransactionCodec txCodec;
   private TransactionStateCache cache;
   private volatile CompactionState compactionState;
+  private CacheSupplier<TransactionStateCache> cacheSupplier;
 
   protected volatile Boolean pruneEnable;
   protected volatile Long txMaxLifetimeMillis;
@@ -125,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void start(CoprocessorEnvironment e) throws IOException {
     if (e instanceof RegionCoprocessorEnvironment) {
       RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
-      Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env);
+      this.cacheSupplier = getTransactionStateCacheSupplier(env);
       this.cache = cacheSupplier.get();
 
       HTableDescriptor tableDesc = env.getRegion().getTableDesc();
@@ -168,13 +169,19 @@ public class TransactionProcessor extends BaseRegionObserver {
     return env.getConfiguration();
   }
 
-  protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+  protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
     return new TransactionStateCacheSupplier(env.getConfiguration());
   }
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    resetPruneState();
+    try {
+      resetPruneState();
+    } finally {
+      if (cacheSupplier != null) {
+        cacheSupplier.release();
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 6bd8bab..677710b 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
     if (flushThread != null) {
       flushThread.interrupt();
       flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      if (flushThread.isAlive()) {
+        flushThread.interrupt();
+        flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
index 98f3334..cb93fab 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune;
 
 
 import com.google.common.base.Supplier;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
 
 /**
  * Supplies instances of {@link PruneUpperBoundWriter} implementations.
  */
-public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
-  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
 
-  private static volatile PruneUpperBoundWriter instance;
-  private static volatile int refCount = 0;
-  private static final Object lock = new Object();
+  private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier =
+    new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
 
-  private final TableName tableName;
-  private final DataJanitorState dataJanitorState;
-  private final long pruneFlushInterval;
+  private final Supplier<PruneUpperBoundWriter> supplier;
 
-  public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
-                                       long pruneFlushInterval) {
-    this.tableName = tableName;
-    this.dataJanitorState = dataJanitorState;
-    this.pruneFlushInterval = pruneFlushInterval;
+  public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+                                       final long pruneFlushInterval) {
+    this.supplier = new Supplier<PruneUpperBoundWriter>() {
+      @Override
+      public PruneUpperBoundWriter get() {
+        return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+      }
+    };
   }
 
   @Override
   public PruneUpperBoundWriter get() {
-    synchronized (lock) {
-      if (instance == null) {
-        instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
-        instance.startAndWait();
-      }
-      refCount++;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
-      }
-      return instance;
-    }
+    return referenceCountedSupplier.getOrCreate(supplier);
   }
 
   public void release() {
-    synchronized (lock) {
-      refCount--;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
-      }
-
-      if (refCount == 0) {
-        try {
-          instance.stopAndWait();
-        } catch (Exception ex) {
-          LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
-        } finally {
-          // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
-          if (instance.isAlive()) {
-            try {
-              instance.shutDown();
-            } catch (Exception e) {
-              LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
-            }
-          }
-          instance = null;
-        }
-      }
-    }
+    referenceCountedSupplier.release();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 07746d8..ac5e923 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.tephra.hbase.txprune;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.ImmutableSortedSet;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.hbase.AbstractHBaseTableTest;
 import org.apache.tephra.hbase.TransactionAwareHTable;
@@ -400,12 +400,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
 
     @Override
-    protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
-      return new Supplier<TransactionStateCache>() {
+    protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+      return new CacheSupplier<TransactionStateCache>() {
         @Override
         public TransactionStateCache get() {
           return new InMemoryTransactionStateCache();
         }
+
+        @Override
+        public void release() {
+          // no-op
+        }
       };
     }
 


Mime
View raw message