tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [1/2] incubator-tephra git commit: (TEPHRA-241) Add a way to limit the size of a transaction
Date Sat, 09 Sep 2017 07:12:25 GMT
Repository: incubator-tephra
Updated Branches:
  refs/heads/master 8532076f8 -> ae6ce2b5e


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
index 9719bcc..9e57de8 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
@@ -22,8 +22,10 @@ import com.google.inject.Inject;
 import org.apache.tephra.InvalidTruncateTimeException;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.TransactionFailureException;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSizeException;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
 import org.slf4j.Logger;
@@ -67,6 +69,15 @@ public class InMemoryTxSystemClient implements TransactionSystemClient
{
 
   @Override
   public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException
{
+    try {
+      return changeIds.isEmpty() || txManager.canCommit(tx, changeIds);
+    } catch (TransactionSizeException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws
TransactionFailureException {
     return changeIds.isEmpty() || txManager.canCommit(tx, changeIds);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
index b54e57f..de46f27 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
@@ -61,6 +61,11 @@ public class MinimalTxSystemClient implements TransactionSystemClient {
   }
 
   @Override
+  public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) {
+    return true;
+  }
+
+  @Override
   public boolean commit(Transaction tx) {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/thrift/transaction.thrift
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift
index 0e05244..729e035 100644
--- a/tephra-core/src/main/thrift/transaction.thrift
+++ b/tephra-core/src/main/thrift/transaction.thrift
@@ -76,6 +76,8 @@ service TTransactionServer {
   TTransaction startShortWithClientIdAndTimeOut(1: string clientId, 2: i32 timeout) throws
(1:TGenericException e),
   TTransaction startShortWithTimeout(1: i32 timeout) throws (1:TGenericException e),
   TBoolean canCommitTx(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException
e),
+  TBoolean canCommitOrThrow(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException
e,
+                                                                                2:TGenericException
g,),
   TBoolean commitTx(1: TTransaction tx) throws (1:TTransactionNotInProgressException e),
   void abortTx(1: TTransaction tx),
   bool invalidateTx(1: i64 tx),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
index f4437c2..0d7d9bf 100644
--- a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
@@ -52,7 +52,6 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class ThriftTransactionSystemTest extends TransactionSystemTest {
@@ -74,13 +73,12 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest
{
     zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
     zkServer.startAndWait();
 
-    Configuration conf = new Configuration();
+    Configuration conf = getCommonConfiguration();
     conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
     conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
     // we want to use a retry strategy that lets us query the number of times it retried:
     conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, CountingRetryStrategyProvider.class.getName());
     conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 2);
-    conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5));
// very long limit
 
     Injector injector = Guice.createInjector(
       new ConfigModule(conf),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
index 56a9076..5f4675b 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
@@ -634,6 +634,11 @@ public class TransactionContextTest {
     }
 
     @Override
+    public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws
TransactionFailureException {
+      return canCommit(tx, changeIds);
+    }
+
+    @Override
     public boolean commit(Transaction tx) throws TransactionNotInProgressException {
       if (failCommits-- > 0) {
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
index b96b779..676774c 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
@@ -548,6 +548,11 @@ public class TransactionExecutorTest {
     }
 
     @Override
+    public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws
TransactionFailureException {
+      return canCommit(tx, changeIds);
+    }
+
+    @Override
     public boolean commit(Transaction tx) throws TransactionNotInProgressException {
       if (failCommits-- > 0) {
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
index 1fca773..819a981 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
@@ -26,11 +26,11 @@ import org.apache.tephra.metrics.TxMetricsCollector;
 import org.apache.tephra.persist.InMemoryTransactionStateStorage;
 import org.apache.tephra.persist.TransactionStateStorage;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
@@ -39,10 +39,10 @@ import java.util.concurrent.TimeUnit;
  */
 public class TransactionManagerTest extends TransactionSystemTest {
 
-  static Configuration conf = new Configuration();
+  private static Configuration conf;
 
-  TransactionManager txManager = null;
-  TransactionStateStorage txStateStorage = null;
+  private static TransactionManager txManager = null;
+  private static TransactionStateStorage txStateStorage = null;
 
   @Override
   protected TransactionSystemClient getClient() {
@@ -54,10 +54,10 @@ public class TransactionManagerTest extends TransactionSystemTest {
     return txStateStorage;
   }
 
-  @Before
-  public void before() {
+  @BeforeClass
+  public static void beforeClass() {
+    conf = getCommonConfiguration();
     conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
-    conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5));
// very long limit
     // todo should create two sets of tests, one with LocalFileTxStateStorage and one with
InMemoryTxStateStorage
     txStateStorage = new InMemoryTransactionStateStorage();
     txManager = new TransactionManager
@@ -65,104 +65,24 @@ public class TransactionManagerTest extends TransactionSystemTest {
     txManager.startAndWait();
   }
 
-  @After
-  public void after() {
+  @AfterClass
+  public static void afterClass() {
     txManager.stopAndWait();
   }
 
-  @Test
-  public void testCheckpointing() throws TransactionNotInProgressException {
-    // create a few transactions
-    Transaction tx1 = txManager.startShort();
-    Transaction tx2 = txManager.startShort();
-    Transaction tx3 = txManager.startShort();
-
-    // start and commit a few
-    for (int i = 0; i < 5; i++) {
-      Transaction tx = txManager.startShort();
-      Assert.assertTrue(txManager.canCommit(tx, Collections.singleton(new byte[] { (byte)
i })));
-      Assert.assertTrue(txManager.commit(tx));
-    }
-
-    // checkpoint the transactions
-    Transaction tx3c = txManager.checkpoint(tx3);
-    Transaction tx2c = txManager.checkpoint(tx2);
-    Transaction tx1c = txManager.checkpoint(tx1);
-
-    // start and commit a few (this moves the read pointer past all checkpoint write versions)
-    for (int i = 5; i < 10; i++) {
-      Transaction tx = txManager.startShort();
-      Assert.assertTrue(txManager.canCommit(tx, Collections.singleton(new byte[] { (byte)
i })));
-      Assert.assertTrue(txManager.commit(tx));
-    }
-
-    // start new tx and validate all write pointers are excluded
-    Transaction tx = txManager.startShort();
-    validateSorted(tx.getInProgress());
-    validateSorted(tx.getInvalids());
-    Assert.assertFalse(tx.isVisible(tx1.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx1c.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
-    txManager.abort(tx);
-
-    // abort one of the checkpoints
-    txManager.abort(tx1c);
-
-    // start new tx and validate all write pointers are excluded
-    tx = txManager.startShort();
-    validateSorted(tx.getInProgress());
-    validateSorted(tx.getInvalids());
-    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
-    txManager.abort(tx);
-
-    // invalidate one of the checkpoints
-    txManager.invalidate(tx2c.getTransactionId());
-
-    // start new tx and validate all write pointers are excluded
-    tx = txManager.startShort();
-    validateSorted(tx.getInProgress());
-    validateSorted(tx.getInvalids());
-    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
-    txManager.abort(tx);
-
-    // commit the last checkpoint
-    Assert.assertTrue(txManager.canCommit(tx3, Collections.<byte[]>emptyList()));
-    Assert.assertTrue(txManager.commit(tx3c));
-
-    // start new tx and validate all write pointers are excluded
-    tx = txManager.startShort();
-    validateSorted(tx.getInProgress());
-    validateSorted(tx.getInvalids());
-    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
-    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
-    txManager.abort(tx);
-  }
-
-  private void validateSorted(long[] array) {
-    Long lastSeen = null;
-    for (long value : array) {
-      Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)),
-                        lastSeen == null || lastSeen < value);
-      lastSeen = value;
-    }
+  @After
+  public void after() {
+    txManager.resetState();
   }
 
   @Test
   public void testTransactionCleanup() throws Exception {
-    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
-    conf.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2);
+    Configuration config = new Configuration(conf);
+    config.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
+    config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2);
     // using a new tx manager that cleans up
     TransactionManager txm = new TransactionManager
-      (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+      (config, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
     txm.startAndWait();
     try {
       Assert.assertEquals(0, txm.getInvalidSize());
@@ -250,11 +170,12 @@ public class TransactionManagerTest extends TransactionSystemTest {
 
   @Test
   public void testLongTransactionCleanup() throws Exception {
-    conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
-    conf.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2);
+    Configuration config = new Configuration(conf);
+    config.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
+    config.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2);
     // using a new tx manager that cleans up
     TransactionManager txm = new TransactionManager
-      (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+      (config, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
     txm.startAndWait();
     try {
       Assert.assertEquals(0, txm.getInvalidSize());

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
index 24798ca..5448052 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
@@ -19,17 +19,21 @@
 package org.apache.tephra;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.persist.TransactionSnapshot;
 import org.apache.tephra.persist.TransactionStateStorage;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
- *
+ * Base class for testing implementations of {@link TransactionSystemClient}.
  */
 public abstract class TransactionSystemTest {
 
@@ -38,6 +42,20 @@ public abstract class TransactionSystemTest {
   private static final byte[] C3 = new byte[] { 'c', '3' };
   private static final byte[] C4 = new byte[] { 'c', '4' };
 
+  /**
+   * Sets up the common properties required for the test cases defined here.
+   * Subclasses can call this and add more properties as needed.
+   */
+  static Configuration getCommonConfiguration() {
+    Configuration conf = new Configuration();
+    conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5));
// very long limit
+    conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_LIMIT, 50);
+    conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_WARN_THRESHOLD, 40);
+    conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_LIMIT, 2048);
+    conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_WARN_THRESHOLD, 1024);
+    return conf;
+  }
+
   protected abstract TransactionSystemClient getClient() throws Exception;
 
   protected abstract TransactionStateStorage getStateStorage() throws Exception;
@@ -63,6 +81,52 @@ public abstract class TransactionSystemTest {
   }
 
   @Test
+  public void testLargeChangeSet() throws Exception {
+    TransactionSystemClient client = getClient();
+    // first try with 50 changes (the max allowed)
+    List<byte[]> fiftyChanges = new ArrayList<>(51);
+    for (byte i = 0; i < 50; i++) {
+      fiftyChanges.add(new byte[] { i });
+    }
+    Transaction tx = client.startShort();
+    client.canCommitOrThrow(tx, fiftyChanges);
+    client.commit(tx);
+
+    // now try another transaction with 51 changes
+    fiftyChanges.add(new byte[] { 50 });
+    tx = client.startShort();
+    try {
+      client.canCommitOrThrow(tx, fiftyChanges);
+      Assert.fail("Expected " + TransactionSizeException.class.getName());
+    } catch (TransactionSizeException e) {
+      // expected
+    }
+    client.abort(tx);
+
+    // now try a change set that is just within the size limit
+    List<byte[]> changes2k = new ArrayList<>(51);
+    for (byte i = 0; i < 8; i++) {
+      byte[] change = new byte[256];
+      change[0] = i;
+      changes2k.add(change);
+    }
+    tx = client.startShort();
+    client.canCommitOrThrow(tx, changes2k);
+    client.commit(tx);
+
+    // now add another byte to the change set to exceed the limit
+    changes2k.add(new byte[] { 0 });
+    tx = client.startShort();
+    try {
+      client.canCommitOrThrow(tx, changes2k);
+      Assert.fail("Expected " + TransactionSizeException.class.getName());
+    } catch (TransactionSizeException e) {
+      // expected
+    }
+    client.abort(tx);
+  }
+
+  @Test
   public void testCommitRaceHandling() throws Exception {
     TransactionSystemClient client1 = getClient();
     TransactionSystemClient client2 = getClient();
@@ -70,9 +134,9 @@ public abstract class TransactionSystemTest {
     Transaction tx1 = client1.startShort();
     Transaction tx2 = client2.startShort();
 
-    Assert.assertTrue(client1.canCommit(tx1, asList(C1, C2)));
+    Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1, C2)));
     // second one also can commit even thought there are conflicts with first since first
one hasn't committed yet
-    Assert.assertTrue(client2.canCommit(tx2, asList(C2, C3)));
+    Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2, C3)));
 
     Assert.assertTrue(client1.commit(tx1));
 
@@ -97,16 +161,16 @@ public abstract class TransactionSystemTest {
     Transaction tx4 = client4.startShort();
     Transaction tx5 = client5.startShort();
 
-    Assert.assertTrue(client1.canCommit(tx1, asList(C1)));
+    Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1)));
     Assert.assertTrue(client1.commit(tx1));
 
-    Assert.assertTrue(client2.canCommit(tx2, asList(C2)));
+    Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2)));
     Assert.assertTrue(client2.commit(tx2));
 
     // verifying conflicts detection
-    Assert.assertFalse(client3.canCommit(tx3, asList(C1)));
-    Assert.assertFalse(client4.canCommit(tx4, asList(C2)));
-    Assert.assertTrue(client5.canCommit(tx5, asList(C3)));
+    Assert.assertFalse(client3.canCommitOrThrow(tx3, asList(C1)));
+    Assert.assertFalse(client4.canCommitOrThrow(tx4, asList(C2)));
+    Assert.assertTrue(client5.canCommitOrThrow(tx5, asList(C3)));
   }
 
   @Test
@@ -114,7 +178,7 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     Transaction tx = client.startShort();
 
-    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+    Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
     Assert.assertTrue(client.commit(tx));
     // cannot commit twice same tx
     try {
@@ -130,7 +194,7 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     Transaction tx = client.startShort();
 
-    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+    Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
     client.abort(tx);
     // abort of not active tx has no affect
     client.abort(tx);
@@ -141,11 +205,11 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     Transaction tx = client.startShort();
 
-    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+    Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
     Assert.assertTrue(client.commit(tx));
     // can't re-use same tx again
     try {
-      client.canCommit(tx, asList(C3, C4));
+      client.canCommitOrThrow(tx, asList(C3, C4));
       Assert.fail();
     } catch (TransactionNotInProgressException e) {
       // expected
@@ -172,7 +236,7 @@ public abstract class TransactionSystemTest {
                                         new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS,

                                         TransactionType.SHORT);
     try {
-      Assert.assertFalse(client.canCommit(txOld, asList(C3, C4)));
+      Assert.assertFalse(client.canCommitOrThrow(txOld, asList(C3, C4)));
       Assert.fail();
     } catch (TransactionNotInProgressException e) {
       // expected
@@ -191,7 +255,7 @@ public abstract class TransactionSystemTest {
                                         new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS,

                                         TransactionType.SHORT);
     try {
-      Assert.assertFalse(client.canCommit(txNew, asList(C3, C4)));
+      Assert.assertFalse(client.canCommitOrThrow(txNew, asList(C3, C4)));
       Assert.fail();
     } catch (TransactionNotInProgressException e) {
       // expected
@@ -211,7 +275,7 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     Transaction tx = client.startShort();
 
-    Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+    Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
     Assert.assertTrue(client.commit(tx));
     // abort of not active tx has no affect
     client.abort(tx);
@@ -223,11 +287,11 @@ public abstract class TransactionSystemTest {
     TransactionSystemClient client = getClient();
     // Invalidate an in-progress tx
     Transaction tx1 = client.startShort();
-    client.canCommit(tx1, asList(C1, C2));
+    client.canCommitOrThrow(tx1, asList(C1, C2));
     Assert.assertTrue(client.invalidate(tx1.getTransactionId()));
     // Cannot invalidate a committed tx
     Transaction tx2 = client.startShort();
-    client.canCommit(tx2, asList(C3, C4));
+    client.canCommitOrThrow(tx2, asList(C3, C4));
     client.commit(tx2);
     Assert.assertFalse(client.invalidate(tx2.getTransactionId()));
   }
@@ -241,9 +305,9 @@ public abstract class TransactionSystemTest {
 
     Transaction tx1 = client.startShort();
     Transaction tx2 = client.startShort();
-    client.canCommit(tx1, asList(C1, C2));
+    client.canCommitOrThrow(tx1, asList(C1, C2));
     client.commit(tx1);
-    client.canCommit(tx2, asList(C3, C4));
+    client.canCommitOrThrow(tx2, asList(C3, C4));
 
     Transaction txPreReset = client.startShort();
     long currentTs = System.currentTimeMillis();
@@ -334,6 +398,93 @@ public abstract class TransactionSystemTest {
     Assert.assertEquals(3, client.getInvalidSize());
   }
 
+  @Test
+  public void testCheckpointing() throws Exception {
+    TransactionSystemClient client = getClient();
+    // create a few transactions
+    Transaction tx1 = client.startShort();
+    Transaction tx2 = client.startShort();
+    Transaction tx3 = client.startShort();
+
+    // start and commit a few
+    for (int i = 0; i < 5; i++) {
+      Transaction tx = client.startShort();
+      Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i
})));
+      Assert.assertTrue(client.commit(tx));
+    }
+
+    // checkpoint the transactions
+    Transaction tx3c = client.checkpoint(tx3);
+    Transaction tx2c = client.checkpoint(tx2);
+    Transaction tx1c = client.checkpoint(tx1);
+
+    // start and commit a few (this moves the read pointer past all checkpoint write versions)
+    for (int i = 5; i < 10; i++) {
+      Transaction tx = client.startShort();
+      Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i
})));
+      Assert.assertTrue(client.commit(tx));
+    }
+
+    // start new tx and validate all write pointers are excluded
+    Transaction tx = client.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx1.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx1c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+    client.abort(tx);
+
+    // abort one of the checkpoints
+    client.abort(tx1c);
+
+    // start new tx and validate all write pointers are excluded
+    tx = client.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+    client.abort(tx);
+
+    // invalidate one of the checkpoints
+    client.invalidate(tx2c.getTransactionId());
+
+    // start new tx and validate all write pointers are excluded
+    tx = client.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+    client.abort(tx);
+
+    // commit the last checkpoint
+    Assert.assertTrue(client.canCommit(tx3, Collections.<byte[]>emptyList()));
+    Assert.assertTrue(client.commit(tx3c));
+
+    // start new tx and validate all write pointers are excluded
+    tx = client.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    client.abort(tx);
+  }
+
+  private void validateSorted(long[] array) {
+    Long lastSeen = null;
+    for (long value : array) {
+      Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)),
+                        lastSeen == null || lastSeen < value);
+      lastSeen = value;
+    }
+  }
+
   private Collection<byte[]> asList(byte[]... val) {
     return Arrays.asList(val);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
index 18f81c8..9c565ba 100644
--- a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
@@ -20,8 +20,6 @@ package org.apache.tephra.snapshot;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
@@ -31,8 +29,8 @@ import com.google.inject.Injector;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.ChangeId;
 import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionFailureException;
 import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionNotInProgressException;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.persist.TransactionSnapshot;
 import org.apache.tephra.persist.TransactionStateStorage;
@@ -49,7 +47,6 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -290,7 +287,7 @@ public class SnapshotCodecTest {
   }
 
   @Test
-  public void testSnapshotCodecV4() throws IOException, TransactionNotInProgressException
{
+  public void testSnapshotCodecV4() throws IOException, TransactionFailureException {
     File testDir = tmpDir.newFolder("testSnapshotCodecV4");
     Configuration conf = new Configuration();
     conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());


Mime
View raw message