tephra-commits mailing list archives

From go...@apache.org
Subject [1/2] incubator-tephra git commit: (TEPHRA-215) (TEPHRA-218) Use single thread across all regions in a region server to persist Prune Upper Bound info. Also don't refresh cache during startup of TransactionStateCache to avoid the possibility of Service s
Date Thu, 09 Feb 2017 04:39:08 GMT
Repository: incubator-tephra
Updated Branches:
  refs/heads/master aeeee00be -> 69a6cc6be


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 5a355e6..015077b 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -331,8 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
             conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
                          TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
           compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
-          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                      + pruneTable);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                        + pruneTable);
+          }
         }
       }
     }

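A note on the hunk above: wrapping the LOG.debug call in LOG.isDebugEnabled() avoids building the
message string at all when debug logging is disabled. A minimal standalone sketch of the same idiom
(the class and method below are illustrative, not part of this patch):

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class GuardedDebugLogSketch {
  private static final Log LOG = LogFactory.getLog(GuardedDebugLogSketch.class);

  void reportPruneTable(String pruneTable) {
    // Without the guard, the concatenated message is built even when debug logging is off;
    // with it, the string construction cost is paid only when the message will actually be emitted.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
                  + pruneTable);
    }
  }
}
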
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 58596be..db7880b 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,27 +38,25 @@ import javax.annotation.Nullable;
 public class CompactionState {
   private static final Log LOG = LogFactory.getLog(CompactionState.class);
 
-  private final TableName stateTable;
   private final byte[] regionName;
   private final String regionNameAsString;
-  private final DataJanitorState dataJanitorState;
-  private final long pruneFlushInterval;
-  private volatile long pruneUpperBound = -1;
+  private final PruneUpperBoundWriterSupplier pruneUpperBoundWriterSupplier;
+  private final PruneUpperBoundWriter pruneUpperBoundWriter;
 
-  private PruneUpperBoundWriter pruneUpperBoundWriter;
+  private volatile long pruneUpperBound = -1;
 
   public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
-    this.stateTable = stateTable;
     this.regionName = env.getRegionInfo().getRegionName();
     this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
-    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+    DataJanitorState dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
         return env.getTable(stateTable);
       }
     });
-    this.pruneFlushInterval = pruneFlushInterval;
-    this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
+    this.pruneUpperBoundWriterSupplier = new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState,
+                                                                           pruneFlushInterval);
+    this.pruneUpperBoundWriter = pruneUpperBoundWriterSupplier.get();
   }
 
   /**
@@ -71,38 +69,33 @@ public class CompactionState {
     if (request.isMajor() && snapshot != null) {
       Transaction tx = TxUtils.createDummyTransaction(snapshot);
       pruneUpperBound = TxUtils.getPruneUpperBound(tx);
-      LOG.debug(
-        String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
-                      pruneUpperBound, request, snapshot.getTimestamp()));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
+                        pruneUpperBound, request, snapshot.getTimestamp()));
+      }
     } else {
       pruneUpperBound = -1;
     }
   }
 
   /**
-   * Stops the current {@link PruneUpperBoundWriter}.
-   */
-  public void stop() {
-    if (pruneUpperBoundWriter != null) {
-      pruneUpperBoundWriter.stop();
-    }
-  }
-
-  /**
    * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
    * This method is called after the compaction has successfully completed.
    */
   public void persist() {
     if (pruneUpperBound != -1) {
-      if (!pruneUpperBoundWriter.isAlive()) {
-        pruneUpperBoundWriter = createPruneUpperBoundWriter();
+      pruneUpperBoundWriter.persistPruneEntry(regionName, pruneUpperBound);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
       }
-      pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
-      LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
     }
   }
 
-  private PruneUpperBoundWriter createPruneUpperBoundWriter() {
-    return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+  /**
+   * Releases the usage of the {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    pruneUpperBoundWriterSupplier.release();
   }
 }

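With this change CompactionState no longer owns a writer thread per region; it obtains the region
server's shared PruneUpperBoundWriter from the supplier in its constructor and gives the reference
back in stop(). A hedged sketch of the intended calling sequence, assuming the coprocessor drives
record() before compaction, persist() after it completes, and stop() on region close (the driver
class below is illustrative only, not part of this patch):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.tephra.hbase.txprune.CompactionState;
import org.apache.tephra.persist.TransactionVisibilityState;

public class CompactionStateLifecycleSketch {
  void driveOneCompaction(RegionCoprocessorEnvironment env, TableName stateTable, long pruneFlushInterval,
                          CompactionRequest request, TransactionVisibilityState snapshot) {
    CompactionState state = new CompactionState(env, stateTable, pruneFlushInterval);
    state.record(request, snapshot);  // before compaction: compute the prune upper bound from the snapshot
    state.persist();                  // after compaction: enqueue the bound on the shared writer
    state.stop();                     // on region close: release the reference to the shared writer
  }
}
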
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 7bceaff..7e9d1a3 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -18,57 +18,62 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Thread that will write the the prune upper bound
+ * Thread that will write the prune upper bound. An instance of this class should be obtained only
+ * through {@link PruneUpperBoundWriterSupplier} which will also handle the lifecycle of this instance.
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
+  private final TableName tableName;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+
+  private volatile Thread flushThread;
 
-  private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
-                               byte[] regionName, long pruneFlushInterval) {
-    this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
+    this.tableName = tableName;
     this.dataJanitorState = dataJanitorState;
-    this.regionName = regionName;
-    this.regionNameAsString = regionNameAsString;
     this.pruneFlushInterval = pruneFlushInterval;
-    this.pruneUpperBound = new AtomicLong();
-    this.shouldFlush = new AtomicBoolean(false);
-    startFlushThread();
+    this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+  }
+
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    pruneEntries.put(regionName, pruneUpperBound);
   }
 
   public boolean isAlive() {
-    return flushThread.isAlive();
+    return flushThread != null && flushThread.isAlive();
   }
 
-  public void persistPruneEntry(long pruneUpperBound) {
-    this.pruneUpperBound.set(pruneUpperBound);
-    this.shouldFlush.set(true);
+  @Override
+  protected void startUp() throws Exception {
+    LOG.info("Starting PruneUpperBoundWriter Thread.");
+    startFlushThread();
   }
 
-  public void stop() {
+  @Override
+  protected void shutDown() throws Exception {
+    LOG.info("Stopping PruneUpperBoundWriter Thread.");
     if (flushThread != null) {
       flushThread.interrupt();
+      flushThread.join(TimeUnit.SECONDS.toMillis(1));
     }
   }
 
@@ -76,19 +81,21 @@ public class PruneUpperBoundWriter {
     flushThread = new Thread("tephra-prune-upper-bound-writer") {
       @Override
       public void run() {
-        while (!isInterrupted()) {
+        while ((!isInterrupted()) && isRunning()) {
           long now = System.currentTimeMillis();
           if (now > (lastChecked + pruneFlushInterval)) {
-            if (shouldFlush.compareAndSet(true, false)) {
-              // should flush data
-              try {
-                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
-              } catch (IOException ex) {
-                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
-                           pruneStateTable.getNameWithNamespaceInclAsString() + " after compacting region.", ex);
-                // Retry again
-                shouldFlush.set(true);
+            // should flush data
+            try {
+              while (pruneEntries.firstEntry() != null) {
+                Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                // We can now remove the entry only if the key and value match with what we wrote since it is
+                // possible that a new pruneUpperBound for the same key has been added
+                pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
               }
+            } catch (IOException ex) {
+              LOG.warn("Cannot record prune upper bound for a region to table " +
+                         tableName.getNameWithNamespaceInclAsString(), ex);
             }
             lastChecked = now;
           }

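The flush loop above drains the shared map with ConcurrentSkipListMap.remove(key, value), so an entry
is dropped only if no newer prune upper bound was enqueued for that region while the write was in
flight. A self-contained sketch of that drain pattern, with a stubbed persist step standing in for
DataJanitorState.savePruneUpperBoundForRegion() (class and method names below are illustrative):

import org.apache.hadoop.hbase.util.Bytes;

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

public class PruneEntryDrainSketch {
  private final ConcurrentSkipListMap<byte[], Long> pruneEntries =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  // Called by compaction threads: the latest prune upper bound wins for a given region.
  void enqueue(byte[] regionName, long pruneUpperBound) {
    pruneEntries.put(regionName, pruneUpperBound);
  }

  // Called periodically by the single flush thread.
  void drainOnce() {
    Map.Entry<byte[], Long> entry;
    while ((entry = pruneEntries.firstEntry()) != null) {
      persist(entry.getKey(), entry.getValue());
      // Conditional remove: if enqueue() raced in a newer value for the same region, the key/value
      // pair no longer matches and the entry stays queued for the next pass.
      pruneEntries.remove(entry.getKey(), entry.getValue());
    }
  }

  private void persist(byte[] regionName, long pruneUpperBound) {
    System.out.println("persisting " + Bytes.toStringBinary(regionName) + " -> " + pruneUpperBound);
  }
}
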
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
new file mode 100644
index 0000000..98f3334
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * Supplies instances of {@link PruneUpperBoundWriter} implementations.
+ */
+public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+
+  private static volatile PruneUpperBoundWriter instance;
+  private static volatile int refCount = 0;
+  private static final Object lock = new Object();
+
+  private final TableName tableName;
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
+                                       long pruneFlushInterval) {
+    this.tableName = tableName;
+    this.dataJanitorState = dataJanitorState;
+    this.pruneFlushInterval = pruneFlushInterval;
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+    synchronized (lock) {
+      if (instance == null) {
+        instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+        instance.startAndWait();
+      }
+      refCount++;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
+      }
+      return instance;
+    }
+  }
+
+  public void release() {
+    synchronized (lock) {
+      refCount--;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
+      }
+
+      if (refCount == 0) {
+        try {
+          instance.stopAndWait();
+        } catch (Exception ex) {
+          LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
+        } finally {
+          // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
+          if (instance.isAlive()) {
+            try {
+              instance.shutDown();
+            } catch (Exception e) {
+              LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
+            }
+          }
+          instance = null;
+        }
+      }
+    }
+  }
+}

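Since get() and release() above reference-count a single static writer, every caller must pair them,
as the reworked CompactionState does. A hedged sketch of that pairing from a caller's point of view
(the wrapper class and method are illustrative, not part of this patch):

import org.apache.hadoop.hbase.TableName;
import org.apache.tephra.hbase.txprune.DataJanitorState;
import org.apache.tephra.hbase.txprune.PruneUpperBoundWriter;
import org.apache.tephra.hbase.txprune.PruneUpperBoundWriterSupplier;

public class SupplierPairingSketch {
  void enqueueOnce(TableName stateTable, DataJanitorState dataJanitorState, long pruneFlushInterval,
                   byte[] regionName, long pruneUpperBound) {
    PruneUpperBoundWriterSupplier supplier =
        new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState, pruneFlushInterval);
    PruneUpperBoundWriter writer = supplier.get();           // first get() in the JVM starts the flush thread
    try {
      writer.persistPruneEntry(regionName, pruneUpperBound); // enqueued; persisted asynchronously
    } finally {
      supplier.release();                                    // last release() stops the shared flush thread
    }
  }
}
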
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
new file mode 100644
index 0000000..08d3b49
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
+ */
+public class PruneUpperBoundWriterSupplierTest {
+  private static final Logger LOG = LoggerFactory.getLogger(PruneUpperBoundWriterSupplierTest.class);
+  private static final int NUM_OPS = 10000;
+  private static final int NUM_THREADS = 50;
+
+  @Test
+  public void testSupplier() throws Exception {
+    final PruneUpperBoundWriterSupplier supplier = new PruneUpperBoundWriterSupplier(null, null, 10L);
+    // Get one instance now, for later comparisons
+    final PruneUpperBoundWriter writer = supplier.get();
+    final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
+    final Random random = new Random(System.currentTimeMillis());
+
+    // Start threads that will 'get' PruneUpperBoundWriters
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+    List<Future> futureList = new ArrayList<>();
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // Perform NUM_OPS 'gets' of PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            PruneUpperBoundWriter newWriter = supplier.get();
+            Assert.assertTrue(newWriter == writer);
+            int waitTime = random.nextInt(10);
+            try {
+              TimeUnit.MICROSECONDS.sleep(waitTime);
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(5, TimeUnit.SECONDS);
+    }
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    futureList.clear();
+    numOps.set(NUM_OPS);
+    // Start threads that release PruneUpperBoundWriters
+    executor = Executors.newFixedThreadPool(NUM_THREADS);
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // We need to release all NUM_OPS 'gets' that were executed to trigger shutdown of the single instance of
+          // PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            supplier.release();
+            try {
+              TimeUnit.MICROSECONDS.sleep(random.nextInt(10));
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(1, TimeUnit.SECONDS);
+    }
+
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    // Verify that the PruneUpperBoundWriter is still running and the pruneThread is still alive.
+    Assert.assertTrue(writer.isRunning());
+    Assert.assertTrue(writer.isAlive());
+
+    // Since we got one instance in the beginning, we need to release it
+    supplier.release();
+
+    // Verify that the PruneUpperBoundWriter is shutdown and the pruneThread is not alive anymore.
+    Assert.assertFalse(writer.isRunning());
+    Assert.assertFalse(writer.isAlive());
+  }
+}

