tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From go...@apache.org
Subject [1/5] incubator-tephra git commit: Porting Pruning changes to hbase-compat-0.96, hbase-compat-0.98, hbase-compat-1.0, hbase-compat-1.0-cdh [Forced Update!]
Date Thu, 19 Jan 2017 20:16:47 GMT
Repository: incubator-tephra
Updated Branches:
  refs/heads/master abf34e5f4 -> 92c61e7c4 (forced update)


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
new file mode 100644
index 0000000..560b0fe
--- /dev/null
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Base class for tests that need a HBase cluster
+ */
+@SuppressWarnings("WeakerAccess")
+public abstract class AbstractHBaseTableTest {
+  protected static HBaseTestingUtility testUtil;
+  protected static HBaseAdmin hBaseAdmin;
+  protected static Configuration conf;
+
+  @BeforeClass
+  public static void startMiniCluster() throws Exception {
+    testUtil = conf == null ? new HBaseTestingUtility() : new HBaseTestingUtility(conf);
+    conf = testUtil.getConfiguration();
+
+    // Tune down the connection thread pool size
+    conf.setInt("hbase.hconnection.threads.core", 5);
+    conf.setInt("hbase.hconnection.threads.max", 10);
+    // Tune down handler threads in regionserver
+    conf.setInt("hbase.regionserver.handler.count", 10);
+
+    // Set to random port
+    conf.setInt("hbase.master.port", 0);
+    conf.setInt("hbase.master.info.port", 0);
+    conf.setInt("hbase.regionserver.port", 0);
+    conf.setInt("hbase.regionserver.info.port", 0);
+
+    testUtil.startMiniCluster();
+    hBaseAdmin = testUtil.getHBaseAdmin();
+  }
+
+  @AfterClass
+  public static void shutdownMiniCluster() throws Exception {
+    try {
+      if (hBaseAdmin != null) {
+        hBaseAdmin.close();
+      }
+    } finally {
+      testUtil.shutdownMiniCluster();
+    }
+  }
+
+  protected static HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
+    return createTable(tableName, columnFamilies, false,
+                       Collections.singletonList(TransactionProcessor.class.getName()));
+  }
+
+  protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
+                                      List<String> coprocessors) throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+    for (byte[] family : columnFamilies) {
+      HColumnDescriptor columnDesc = new HColumnDescriptor(family);
+      columnDesc.setMaxVersions(Integer.MAX_VALUE);
+      columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
+      desc.addFamily(columnDesc);
+    }
+    if (existingData) {
+      desc.setValue(TxConstants.READ_NON_TX_DATA, "true");
+    }
+    // Divide individually to prevent any overflow
+    int priority = Coprocessor.PRIORITY_USER;
+    // order in list is the same order that coprocessors will be invoked
+    for (String coprocessor : coprocessors) {
+      desc.addCoprocessor(coprocessor, null, ++priority, null);
+    }
+    hBaseAdmin.createTable(desc);
+    testUtil.waitTableAvailable(tableName, 5000);
+    return new HTable(testUtil.getConfiguration(), tableName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index 6dc7c28..e2fadbd 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -19,21 +19,14 @@ package org.apache.tephra.hbase;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -76,6 +69,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -89,14 +83,11 @@ import static org.junit.Assert.fail;
 /**
  * Tests for TransactionAwareHTables.
  */
-public class TransactionAwareHTableTest {
+public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class);
 
-  private static HBaseTestingUtility testUtil;
-  private static HBaseAdmin hBaseAdmin;
-  private static TransactionStateStorage txStateStorage;
-  private static TransactionManager txManager;
-  private static Configuration conf;
+  static TransactionStateStorage txStateStorage;
+  static TransactionManager txManager;
   private TransactionContext transactionContext;
   private TransactionAwareHTable transactionAwareHTable;
   private HTable hTable;
@@ -146,23 +137,6 @@ public class TransactionAwareHTableTest {
 
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
-    testUtil = new HBaseTestingUtility();
-    conf = testUtil.getConfiguration();
-
-    // Tune down the connection thread pool size
-    conf.setInt("hbase.hconnection.threads.core", 5);
-    conf.setInt("hbase.hconnection.threads.max", 10);
-    // Tunn down handler threads in regionserver
-    conf.setInt("hbase.regionserver.handler.count", 10);
-
-    // Set to random port
-    conf.setInt("hbase.master.port", 0);
-    conf.setInt("hbase.master.info.port", 0);
-    conf.setInt("hbase.regionserver.port", 0);
-    conf.setInt("hbase.regionserver.info.port", 0);
-
-    testUtil.startMiniCluster();
-    hBaseAdmin = testUtil.getHBaseAdmin();
     txStateStorage = new InMemoryTransactionStateStorage();
     txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
     txManager.startAndWait();
@@ -170,8 +144,9 @@ public class TransactionAwareHTableTest {
 
   @AfterClass
   public static void shutdownAfterClass() throws Exception {
-    testUtil.shutdownMiniCluster();
-    hBaseAdmin.close();
+    if (txManager != null) {
+      txManager.stopAndWait();
+    }
   }
 
   @Before
@@ -186,34 +161,6 @@ public class TransactionAwareHTableTest {
     hBaseAdmin.disableTable(TestBytes.table);
     hBaseAdmin.deleteTable(TestBytes.table);
   }
-  
-  private HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
-    return createTable(tableName, columnFamilies, false, Collections.<String>emptyList());
-  }
-
-  private HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
-    List<String> coprocessors) throws Exception {
-    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-    for (byte[] family : columnFamilies) {
-      HColumnDescriptor columnDesc = new HColumnDescriptor(family);
-      columnDesc.setMaxVersions(Integer.MAX_VALUE);
-      columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
-      desc.addFamily(columnDesc);
-    }
-    if (existingData) {
-      desc.setValue(TxConstants.READ_NON_TX_DATA, "true");
-    }
-    // Divide individually to prevent any overflow
-    int priority  = Coprocessor.PRIORITY_USER; 
-    desc.addCoprocessor(TransactionProcessor.class.getName(), null, priority, null);
-    // order in list is the same order that coprocessors will be invoked  
-    for (String coprocessor : coprocessors) {
-      desc.addCoprocessor(coprocessor, null, ++priority, null);
-    }
-    hBaseAdmin.createTable(desc);
-    testUtil.waitTableAvailable(tableName, 5000);
-    return new HTable(testUtil.getConfiguration(), tableName);
-  }
 
   /**
    * Test transactional put and get requests.
@@ -409,7 +356,7 @@ public class TransactionAwareHTableTest {
   public void testAttributesPreserved() throws Exception {
     HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
         new byte[][]{TestBytes.family, TestBytes.family2}, false,
-        Lists.newArrayList(TestRegionObserver.class.getName()));
+        Lists.newArrayList(TransactionProcessor.class.getName(), TestRegionObserver.class.getName()));
     try {
       TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
       TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -1123,7 +1070,7 @@ public class TransactionAwareHTableTest {
 
     TransactionAwareHTable txTable =
       new TransactionAwareHTable(createTable(Bytes.toBytes("testExistingData"), new byte[][]{TestBytes.family}, true, 
-      Collections.<String>emptyList()));
+      Collections.singletonList(TransactionProcessor.class.getName())));
     TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
 
     // Add some pre-existing, non-transactional data
@@ -1272,8 +1219,9 @@ public class TransactionAwareHTableTest {
 
   @Test
   public void testVisibilityAll() throws Exception {
-    HTable nonTxTable = createTable(Bytes.toBytes("testVisibilityAll"),
-      new byte[][]{TestBytes.family, TestBytes.family2}, true, Collections.<String>emptyList());
+    HTable nonTxTable =
+      createTable(Bytes.toBytes("testVisibilityAll"), new byte[][]{TestBytes.family, TestBytes.family2},
+                  true, Collections.singletonList(TransactionProcessor.class.getName()));
     TransactionAwareHTable txTable =
       new TransactionAwareHTable(nonTxTable,
                                  TxConstants.ConflictDetection.ROW); // ROW conflict detection to verify family deletes
@@ -1549,6 +1497,66 @@ public class TransactionAwareHTableTest {
     transactionContext.finish();
   }
 
+  @Test
+  public void testTxLifetime() throws Exception {
+    // Add some initial values
+    transactionContext.start();
+    Put put = new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    transactionAwareHTable.put(put);
+    put = new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value);
+    transactionAwareHTable.put(put);
+    transactionContext.finish();
+
+    // Simulate writing with a transaction past its max lifetime
+    transactionContext.start();
+    Transaction currentTx = transactionContext.getCurrentTransaction();
+    Assert.assertNotNull(currentTx);
+
+    // Create a transaction that is past the max lifetime
+    long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                                     TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+    long oldTxId = currentTx.getTransactionId() - ((txMaxLifetimeMillis + 10000) * TxConstants.MAX_TX_PER_MS);
+    Transaction oldTx = new Transaction(currentTx.getReadPointer(), oldTxId,
+                                        currentTx.getInvalids(), currentTx.getInProgress(),
+                                        currentTx.getFirstShortInProgress());
+    transactionAwareHTable.updateTx(oldTx);
+    // Put with the old transaction should fail
+    put = new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    try {
+      transactionAwareHTable.put(put);
+      Assert.fail("Excepted exception with old transaction!");
+    } catch (IOException e) {
+      // Expected exception
+    }
+
+    // Delete with the old transaction should also fail
+    Delete delete = new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier);
+    try {
+      transactionAwareHTable.delete(delete);
+      Assert.fail("Excepted exception with old transaction!");
+    } catch (IOException e) {
+      // Expected exception
+    }
+
+    // Now update the table to use the current transaction
+    transactionAwareHTable.updateTx(currentTx);
+    put = new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value);
+    transactionAwareHTable.put(put);
+    delete = new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier2);
+    transactionAwareHTable.delete(delete);
+
+    // Verify values with the same transaction since we cannot commit the old transaction
+    verifyRow(transactionAwareHTable,
+              new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), TestBytes.value);
+    verifyRow(transactionAwareHTable,
+              new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), null);
+    verifyRow(transactionAwareHTable,
+              new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), null);
+    verifyRow(transactionAwareHTable,
+              new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), TestBytes.value);
+    transactionContext.finish();
+  }
+
   /**
    * Tests that transaction co-processor works with older clients
    *

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
new file mode 100644
index 0000000..402892f
--- /dev/null
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.AbstractHBaseTableTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Test methods of {@link DataJanitorState}
+ */
+// TODO: Group all the tests that need HBase mini cluster into a suite, so that we start the mini-cluster only once
+public class DataJanitorStateTest extends AbstractHBaseTableTest {
+
+  private TableName pruneStateTable;
+  private DataJanitorState dataJanitorState;
+
+  @Before
+  public void beforeTest() throws Exception {
+    pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                                 TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+                               // Prune state table is a non-transactional table, hence no transaction co-processor
+                               Collections.<String>emptyList());
+    table.close();
+
+    dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public Table get() throws IOException {
+          return testUtil.getConnection().getTable(pruneStateTable);
+        }
+      });
+
+  }
+
+  @After
+  public void afterTest() throws Exception {
+    hBaseAdmin.disableTable(pruneStateTable);
+    hBaseAdmin.deleteTable(pruneStateTable);
+  }
+
+  @Test
+  public void testSavePruneUpperBound() throws Exception {
+    int max = 20;
+
+    // Nothing should be present in the beginning
+    Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L)));
+
+    // Save some region - prune upper bound values
+    // We should have values for regions 0, 2, 4, 6, ..., max-2 after this
+    for (long i = 0; i < max; i += 2) {
+      dataJanitorState.savePruneUpperBoundForRegion(Bytes.toBytes(i), i);
+    }
+
+    Assert.assertEquals(10L, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L)));
+
+    // Verify all the saved values
+    for (long i = 0; i < max; ++i) {
+      long expected = i % 2 == 0 ? i : -1;
+      Assert.assertEquals(expected, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(i)));
+    }
+    // Regions not present should give -1
+    Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(max + 50L)));
+    Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes((max + 10L) * -1)));
+    Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(3L)));
+
+    SortedSet<byte[]> allRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    Map<byte[], Long> expectedMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (long i = 0; i < max; ++i) {
+      allRegions.add(Bytes.toBytes(i));
+      if (i % 2 == 0) {
+        expectedMap.put(Bytes.toBytes(i), i);
+      }
+    }
+    Assert.assertEquals(max / 2, expectedMap.size());
+    Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions));
+
+    SortedSet<byte[]> regions = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+      .add(Bytes.toBytes((max + 20L) * -1))
+      .add(Bytes.toBytes(6L))
+      .add(Bytes.toBytes(15L))
+      .add(Bytes.toBytes(18L))
+      .add(Bytes.toBytes(max + 33L))
+      .build();
+    expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR)
+      .put(Bytes.toBytes(6L), 6L)
+      .put(Bytes.toBytes(18L), 18L)
+      .build();
+    Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(regions));
+
+    // Delete regions that have prune upper bound before 15 and not in set (4, 8)
+    ImmutableSortedSet<byte[]> excludeRegions =
+      ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR).add(Bytes.toBytes(4L)).add(Bytes.toBytes(8L)).build();
+    dataJanitorState.deletePruneUpperBounds(15, excludeRegions);
+    // Regions 0, 2, 6 and 10 should have been deleted now
+    expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR)
+      .put(Bytes.toBytes(4L), 4L)
+      .put(Bytes.toBytes(8L), 8L)
+      .put(Bytes.toBytes(16L), 16L)
+      .put(Bytes.toBytes(18L), 18L)
+      .build();
+    Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions));
+  }
+
+  @Test
+  public void testSaveRegionTime() throws Exception {
+    int maxTime = 100;
+
+    // Nothing should be present in the beginning
+    Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(maxTime));
+
+    // Save regions for time
+    Map<Long, SortedSet<byte[]>> regionsTime = new TreeMap<>();
+    for (long time = 0; time < maxTime; time += 10) {
+      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+      for (long region = 0; region < 10; region += 2) {
+        regions.add(Bytes.toBytes((time * 10) + region));
+      }
+      regionsTime.put(time, regions);
+      dataJanitorState.saveRegionsForTime(time, regions);
+    }
+
+    // Verify saved regions
+    Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+    Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+    Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
+    Assert.assertEquals(new TimeRegions(90, regionsTime.get(90L)),
+                        dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
+    Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
+
+    // Delete regions saved on or before time 30
+    dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
+    // Values on or before time 30 should be deleted
+    Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
+    Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+    // Values after time 30 should still exist
+    Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+  }
+
+  @Test
+  public void testSaveInactiveTransactionBoundTime() throws Exception {
+    int maxTime = 100;
+
+    // Nothing should be present in the beginning
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
+
+    // Save inactive transaction bounds for various time values
+    for (long time = 0; time < maxTime; time += 10) {
+      dataJanitorState.saveInactiveTransactionBoundForTime(time, time + 2);
+    }
+
+    // Verify written values
+    Assert.assertEquals(2, dataJanitorState.getInactiveTransactionBoundForTime(0));
+    Assert.assertEquals(12, dataJanitorState.getInactiveTransactionBoundForTime(10));
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(15));
+    Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(maxTime + 100));
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime((maxTime + 55) * -1L));
+
+    // Delete values saved on or before time 20
+    dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(20);
+    // Values on or before time 20 should be deleted
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(0));
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
+    Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(20));
+    // Values after time 20 should still exist
+    Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
+    Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
new file mode 100644
index 0000000..310c710
--- /dev/null
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.hbase.AbstractHBaseTableTest;
+import org.apache.tephra.hbase.TransactionAwareHTable;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.txprune.TransactionPruningPlugin;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Test invalid list pruning
+ */
+public class InvalidListPruneTest extends AbstractHBaseTableTest {
+  private static final byte[] family = Bytes.toBytes("f1");
+  private static final byte[] qualifier = Bytes.toBytes("col1");
+  private static final int MAX_ROWS = 1000;
+
+  private static TableName txDataTable1;
+  private static TableName pruneStateTable;
+
+  /**
+   * Starts the HBase mini cluster with invalid list pruning enabled, then writes {@link #MAX_ROWS} rows into a
+   * transactional data table inside a single transaction and flushes that table.
+   * <p>
+   * This hides {@code AbstractHBaseTableTest.startMiniCluster} so that the configuration can be set up before the
+   * cluster is started.
+   */
+  @BeforeClass
+  public static void startMiniCluster() throws Exception {
+    // Setup the configuration to start HBase cluster with the invalid list pruning enabled
+    conf = HBaseConfiguration.create();
+    conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+    AbstractHBaseTableTest.startMiniCluster();
+
+    TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
+    TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
+    txManager.startAndWait();
+    try {
+      // Do some transactional data operations
+      txDataTable1 = TableName.valueOf("invalidListPruneTestTable1");
+      HTable hTable = createTable(txDataTable1.getName(), new byte[][]{family}, false,
+                                  Collections.singletonList(TestTransactionProcessor.class.getName()));
+      try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
+        TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+        txContext.start();
+        for (int i = 0; i < MAX_ROWS; ++i) {
+          txTable.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, Bytes.toBytes(i)));
+        }
+        txContext.finish();
+      }
+
+      testUtil.flush(txDataTable1);
+    } finally {
+      // Stop the transaction manager even when loading the data fails, so that it is not left running
+      txManager.stopAndWait();
+    }
+
+    pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                                 TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+  }
+
+  /**
+   * Drops the transactional data table created by {@link #startMiniCluster()}.
+   * <p>
+   * JUnit runs this method even when class setup failed part-way, so the table is only dropped when it was actually
+   * created; this mirrors the existence check done in {@code deletePruneStateTable()}.
+   */
+  @AfterClass
+  public static void shutdownAfterClass() throws Exception {
+    if (txDataTable1 == null || !hBaseAdmin.tableExists(txDataTable1)) {
+      // Nothing to clean up: setup did not get as far as creating the table
+      return;
+    }
+    hBaseAdmin.disableTable(txDataTable1);
+    hBaseAdmin.deleteTable(txDataTable1);
+  }
+
+  /**
+   * Per-test setup: creates the prune state table and clears the transaction snapshot held by
+   * {@link InMemoryTransactionStateCache}.
+   */
+  @Before
+  public void beforeTest() throws Exception {
+    this.createPruneStateTable();
+    // Every test starts out without a transaction snapshot
+    InMemoryTransactionStateCache.setTransactionSnapshot(null);
+  }
+
+  /**
+   * Creates the prune state table with the single column family {@code DataJanitorState.FAMILY} and closes the
+   * returned table handle right away.
+   */
+  private void createPruneStateTable() throws Exception {
+    byte[][] columnFamilies = {DataJanitorState.FAMILY};
+    // Prune state table is a non-transactional table, hence no transaction co-processor
+    createTable(pruneStateTable.getName(), columnFamilies, false, Collections.<String>emptyList()).close();
+  }
+
+  /**
+   * Per-test cleanup: removes the prune state table if it still exists.
+   */
+  @After
+  public void afterTest() throws Exception {
+    this.deletePruneStateTable();
+  }
+
+  /**
+   * Disables and deletes the prune state table. Does nothing when the table does not exist.
+   */
+  private void deletePruneStateTable() throws Exception {
+    boolean tablePresent = hBaseAdmin.tableExists(pruneStateTable);
+    if (!tablePresent) {
+      return;
+    }
+    hBaseAdmin.disableTable(pruneStateTable);
+    hBaseAdmin.deleteTable(pruneStateTable);
+  }
+
+  @Test
+  public void testRecordCompactionState() throws Exception {
+    DataJanitorState dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public Table get() throws IOException {
+          return testUtil.getConnection().getTable(pruneStateTable);
+        }
+      });
+
+    // No prune upper bound initially
+    Assert.assertEquals(-1,
+                        dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1,
Bytes.toBytes(0))));
+
+    // Create a new transaction snapshot
+    InMemoryTransactionStateCache.setTransactionSnapshot(
+      new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
+                              ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+    // Run minor compaction
+    testUtil.compact(txDataTable1, false);
+    // No prune upper bound after minor compaction too
+    Assert.assertEquals(-1,
+                        dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1,
Bytes.toBytes(0))));
+
+    // Run major compaction, and verify prune upper bound
+    testUtil.compact(txDataTable1, true);
+    Assert.assertEquals(50,
+                        dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1,
Bytes.toBytes(0))));
+
+    // Run major compaction again with same snapshot, prune upper bound should not change
+    testUtil.compact(txDataTable1, true);
+    Assert.assertEquals(50,
+                        dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1,
Bytes.toBytes(0))));
+
+    // Create a new transaction snapshot
+    InMemoryTransactionStateCache.setTransactionSnapshot(
+      new TransactionSnapshot(110, 111, 112, ImmutableSet.of(150L),
+                              ImmutableSortedMap.of(105L, new TransactionManager.InProgressTx(
+                                100, 30, TransactionManager.InProgressType.SHORT))));
+    Assert.assertEquals(50,
+                        dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1,
Bytes.toBytes(0))));
+
+    // Run major compaction again, now prune upper bound should change
+    testUtil.compact(txDataTable1, true);
+    Assert.assertEquals(104,
+                        dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1,
Bytes.toBytes(0))));
+  }
+
+  /**
+   * Verifies that a major compaction still completes when the prune state table does not exist.
+   */
+  @Test
+  public void testRecordCompactionStateNoTable() throws Exception {
+    // To make sure we don't disrupt major compaction when the prune state table is not present, delete the
+    // prune state table and make sure a major compaction succeeds
+    deletePruneStateTable();
+
+    // Create a new transaction snapshot
+    TransactionSnapshot snapshot =
+      new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
+                              ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of());
+    InMemoryTransactionStateCache.setTransactionSnapshot(snapshot);
+
+    // Run major compaction, and verify it completes
+    long startTime = System.currentTimeMillis();
+    testUtil.compact(txDataTable1, true);
+    long compactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
+    String failureMessage = String.format("Expected %d, but was %d", startTime, compactionTime);
+    Assert.assertTrue(failureMessage, compactionTime >= startTime);
+  }
+
+  @Test
+  public void testRecordCompactionStateNoTxSnapshot() throws Exception {
+    // Test recording state without having a transaction snapshot to make sure we don't disrupt
+    // major compaction in that case
+    InMemoryTransactionStateCache.setTransactionSnapshot(null);
+    // Run major compaction, and verify it completes
+    long now = System.currentTimeMillis();
+    testUtil.compact(txDataTable1, true);
+    long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
+    Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
+                      lastMajorCompactionTime >= now);
+  }
+
+  @Test
+  public void testPruneUpperBound() throws Exception {
+    DataJanitorState dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public Table get() throws IOException {
+          return testUtil.getConnection().getTable(pruneStateTable);
+        }
+      });
+
+    TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+    transactionPruningPlugin.initialize(conf);
+    try {
+      // Run without a transaction snapshot first
+      long now1 = 200;
+      long inactiveTxTimeNow1 = 150 * TxConstants.MAX_TX_PER_MS;
+      long expectedPruneUpperBound1 = -1;
+      // fetch prune upper bound
+      long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+
+      TimeRegions expectedRegions1 =
+        new TimeRegions(now1,
+                        ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+                          .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
+                          .build());
+      // Assert prune state is recorded correctly
+      Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+      Assert.assertEquals(expectedPruneUpperBound1,
+                          dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1,
Bytes.toBytes(0))));
+      Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+      // Run prune complete
+      transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+      // Assert prune state was cleaned up correctly based on the prune time
+      Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+      Assert.assertEquals(expectedPruneUpperBound1,
+                          dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1,
Bytes.toBytes(0))));
+      Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+      // Create a new transaction snapshot, and run major compaction on txDataTable1
+      // And run all assertions again
+      long now2 = 300;
+      long inactiveTxTimeNow2 = 250 * TxConstants.MAX_TX_PER_MS;
+      long expectedPruneUpperBound2 = 200 * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+                                ImmutableSet.of(expectedPruneUpperBound2),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      TimeRegions expectedRegions2 =
+        new TimeRegions(now2,
+                        ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+                          .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
+                          .build());
+      testUtil.compact(txDataTable1, true);
+      long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+
+      Assert.assertEquals(expectedRegions2, dataJanitorState.getRegionsOnOrBeforeTime(now2));
+      Assert.assertEquals(expectedPruneUpperBound2,
+                          dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1,
Bytes.toBytes(0))));
+      Assert.assertEquals(inactiveTxTimeNow2, dataJanitorState.getInactiveTransactionBoundForTime(now2));
+      Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+      Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+      transactionPruningPlugin.pruneComplete(now2, pruneUpperBound2);
+      Assert.assertEquals(expectedRegions2, dataJanitorState.getRegionsOnOrBeforeTime(now2));
+      Assert.assertEquals(expectedPruneUpperBound2,
+                          dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1,
Bytes.toBytes(0))));
+      Assert.assertEquals(inactiveTxTimeNow2, dataJanitorState.getInactiveTransactionBoundForTime(now2));
+      Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(now1));
+      Assert.assertEquals(expectedPruneUpperBound1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+    } finally {
+      transactionPruningPlugin.destroy();
+    }
+  }
+
+  /**
+   * Returns the name of the region of {@code dataTable} that contains {@code row}.
+   *
+   * @param dataTable table whose region is looked up
+   * @param row row key used to locate the region
+   * @return the region name taken from the located region's region info
+   * @throws IOException if the region location cannot be looked up
+   */
+  private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
+    // The caller is responsible for closing the RegionLocator handed out by the connection
+    try (RegionLocator regionLocator = testUtil.getConnection().getRegionLocator(dataTable)) {
+      HRegionLocation regionLocation = regionLocator.getRegionLocation(row);
+      return regionLocation.getRegionInfo().getRegionName();
+    }
+  }
+
+  /**
+   * A transaction co-processor that uses in-memory {@link TransactionSnapshot} for testing
+   */
+  @SuppressWarnings("WeakerAccess")
+  public static class TestTransactionProcessor extends TransactionProcessor {
+    private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
+
+    @Override
+    protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment
env) {
+      return new Supplier<TransactionStateCache>() {
+        @Override
+        public TransactionStateCache get() {
+          return new InMemoryTransactionStateCache();
+        }
+      };
+    }
+
+    @Override
+    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store
store, StoreFile resultFile,
+                            CompactionRequest request) throws IOException {
+      super.postCompact(e, store, resultFile, request);
+      lastMajorCompactionTime.set(System.currentTimeMillis());
+    }
+  }
+
+  /**
+   * Used to supply in-memory {@link TransactionSnapshot} to {@link TestTransactionProcessor} for testing.
+   * The snapshot is held in a static field, so it is shared by all instances of this cache.
+   */
+  @SuppressWarnings("WeakerAccess")
+  public static class InMemoryTransactionStateCache extends TransactionStateCache {
+    private static TransactionVisibilityState transactionSnapshot;
+
+    /**
+     * Replaces the snapshot returned by {@link #getLatestState()}; {@code null} clears it.
+     */
+    public static void setTransactionSnapshot(TransactionVisibilityState transactionSnapshot) {
+      InMemoryTransactionStateCache.transactionSnapshot = transactionSnapshot;
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      // Intentionally empty: there is nothing to start
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      // Intentionally empty: there is nothing to stop
+    }
+
+    @Override
+    public TransactionVisibilityState getLatestState() {
+      return InMemoryTransactionStateCache.transactionSnapshot;
+    }
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public static class TestTransactionPruningPlugin extends HBaseTransactionPruningPlugin
{
+    @Override
+    protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
+      return tableDescriptor.hasCoprocessor(TestTransactionProcessor.class.getName());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index e495692..ceffa4c 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -308,8 +308,7 @@ public class TransactionProcessor extends BaseRegionObserver {
       compactionState.record(request, snapshot);
     }
     // Also make sure to use the same snapshot for the compaction
-    return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners,
-                              scanType, earliestPutTs);
+    return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs);
   }
 
   @Override



Mime
View raw message