Repository: incubator-tephra Updated Branches: refs/heads/master 8532076f8 -> ae6ce2b5e http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java index 9719bcc..9e57de8 100644 --- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java @@ -22,8 +22,10 @@ import com.google.inject.Inject; import org.apache.tephra.InvalidTruncateTimeException; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionCouldNotTakeSnapshotException; +import org.apache.tephra.TransactionFailureException; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionNotInProgressException; +import org.apache.tephra.TransactionSizeException; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; import org.slf4j.Logger; @@ -67,6 +69,15 @@ public class InMemoryTxSystemClient implements TransactionSystemClient { @Override public boolean canCommit(Transaction tx, Collection changeIds) throws TransactionNotInProgressException { + try { + return changeIds.isEmpty() || txManager.canCommit(tx, changeIds); + } catch (TransactionSizeException e) { + return false; + } + } + + @Override + public boolean canCommitOrThrow(Transaction tx, Collection changeIds) throws TransactionFailureException { return changeIds.isEmpty() || txManager.canCommit(tx, changeIds); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java index b54e57f..de46f27 100644 --- a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java @@ -61,6 +61,11 @@ public class MinimalTxSystemClient implements TransactionSystemClient { } @Override + public boolean canCommitOrThrow(Transaction tx, Collection changeIds) { + return true; + } + + @Override public boolean commit(Transaction tx) { return true; } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/thrift/transaction.thrift ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift index 0e05244..729e035 100644 --- a/tephra-core/src/main/thrift/transaction.thrift +++ b/tephra-core/src/main/thrift/transaction.thrift @@ -76,6 +76,8 @@ service TTransactionServer { TTransaction startShortWithClientIdAndTimeOut(1: string clientId, 2: i32 timeout) throws (1:TGenericException e), TTransaction startShortWithTimeout(1: i32 timeout) throws (1:TGenericException e), TBoolean canCommitTx(1: TTransaction tx, 2: set changes) throws (1:TTransactionNotInProgressException e), + TBoolean canCommitOrThrow(1: TTransaction tx, 2: set changes) throws (1:TTransactionNotInProgressException e, + 2:TGenericException g,), TBoolean commitTx(1: TTransaction tx) throws (1:TTransactionNotInProgressException e), void abortTx(1: TTransaction tx), bool invalidateTx(1: i64 tx), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java index f4437c2..0d7d9bf 100644 --- a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java @@ -52,7 +52,6 @@ import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ThriftTransactionSystemTest extends TransactionSystemTest { @@ -74,13 +73,12 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest { zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); zkServer.startAndWait(); - Configuration conf = new Configuration(); + Configuration conf = getCommonConfiguration(); conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr()); // we want to use a retry strategy that lets us query the number of times it retried: conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, CountingRetryStrategyProvider.class.getName()); conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 2); - conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5)); // very long limit Injector injector = Guice.createInjector( new ConfigModule(conf), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java index 56a9076..5f4675b 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java @@ -634,6 +634,11 @@ public class TransactionContextTest { } @Override + public boolean canCommitOrThrow(Transaction tx, Collection changeIds) throws TransactionFailureException { + return canCommit(tx, changeIds); + } + + @Override public boolean commit(Transaction tx) throws TransactionNotInProgressException { if (failCommits-- > 0) { return false; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java index b96b779..676774c 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java @@ -548,6 +548,11 @@ public class TransactionExecutorTest { } @Override + public boolean canCommitOrThrow(Transaction tx, Collection changeIds) throws TransactionFailureException { + return canCommit(tx, changeIds); + } + + @Override public boolean commit(Transaction tx) throws TransactionNotInProgressException { if (failCommits-- > 0) { return false; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java index 1fca773..819a981 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java @@ -26,11 +26,11 @@ import org.apache.tephra.metrics.TxMetricsCollector; import org.apache.tephra.persist.InMemoryTransactionStateStorage; import org.apache.tephra.persist.TransactionStateStorage; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; -import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -39,10 +39,10 @@ import java.util.concurrent.TimeUnit; */ public class TransactionManagerTest extends TransactionSystemTest { - static Configuration conf = new Configuration(); + private static Configuration conf; - TransactionManager txManager = null; - TransactionStateStorage txStateStorage = null; + private static TransactionManager txManager = null; + private static TransactionStateStorage txStateStorage = null; @Override protected TransactionSystemClient getClient() { @@ -54,10 +54,10 @@ public class TransactionManagerTest extends TransactionSystemTest { return txStateStorage; } - @Before - public void before() { + @BeforeClass + public static void beforeClass() { + conf = getCommonConfiguration(); conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread - conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5)); // very long limit // todo should create two sets of tests, one with LocalFileTxStateStorage and one with InMemoryTxStateStorage txStateStorage = new InMemoryTransactionStateStorage(); txManager = new TransactionManager @@ -65,104 +65,24 @@ public class TransactionManagerTest extends TransactionSystemTest { txManager.startAndWait(); } - @After - public void after() { + @AfterClass + public static void afterClass() { txManager.stopAndWait(); } - @Test - public void testCheckpointing() throws TransactionNotInProgressException { - // create a few transactions - Transaction tx1 = txManager.startShort(); - Transaction tx2 = txManager.startShort(); - Transaction tx3 = txManager.startShort(); - - // start and commit a few - for (int i = 0; i < 5; i++) { - Transaction tx = txManager.startShort(); - Assert.assertTrue(txManager.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); - Assert.assertTrue(txManager.commit(tx)); - } - - // checkpoint the transactions - Transaction tx3c = txManager.checkpoint(tx3); - Transaction tx2c = txManager.checkpoint(tx2); - Transaction tx1c = txManager.checkpoint(tx1); - - // start and commit a few (this moves the read pointer past all checkpoint write versions) - for (int i = 5; i < 10; i++) { - Transaction tx = txManager.startShort(); - Assert.assertTrue(txManager.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); - Assert.assertTrue(txManager.commit(tx)); - } - - // start new tx and validate all write pointers are excluded - Transaction tx = txManager.startShort(); - validateSorted(tx.getInProgress()); - validateSorted(tx.getInvalids()); - Assert.assertFalse(tx.isVisible(tx1.getWritePointer())); - Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); - Assert.assertFalse(tx.isVisible(tx3.getWritePointer())); - Assert.assertFalse(tx.isVisible(tx1c.getWritePointer())); - Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); - Assert.assertFalse(tx.isVisible(tx3c.getWritePointer())); - txManager.abort(tx); - - // abort one of the checkpoints - txManager.abort(tx1c); - - // start new tx and validate all write pointers are excluded - tx = txManager.startShort(); - validateSorted(tx.getInProgress()); - validateSorted(tx.getInvalids()); - Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); - Assert.assertFalse(tx.isVisible(tx3.getWritePointer())); - Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); - Assert.assertFalse(tx.isVisible(tx3c.getWritePointer())); - txManager.abort(tx); - - // invalidate one of the checkpoints - txManager.invalidate(tx2c.getTransactionId()); - - // start new tx and validate all write pointers are excluded - tx = txManager.startShort(); - validateSorted(tx.getInProgress()); - validateSorted(tx.getInvalids()); - Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); - Assert.assertFalse(tx.isVisible(tx3.getWritePointer())); - Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); - Assert.assertFalse(tx.isVisible(tx3c.getWritePointer())); - txManager.abort(tx); - - // commit the last checkpoint - Assert.assertTrue(txManager.canCommit(tx3, Collections.emptyList())); - Assert.assertTrue(txManager.commit(tx3c)); - - // start new tx and validate all write pointers are excluded - tx = txManager.startShort(); - validateSorted(tx.getInProgress()); - validateSorted(tx.getInvalids()); - Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); - Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); - txManager.abort(tx); - } - - private void validateSorted(long[] array) { - Long lastSeen = null; - for (long value : array) { - Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)), - lastSeen == null || lastSeen < value); - lastSeen = value; - } + @After + public void after() { + txManager.resetState(); } @Test public void testTransactionCleanup() throws Exception { - conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3); - conf.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2); + Configuration config = new Configuration(conf); + config.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3); + config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2); // using a new tx manager that cleans up TransactionManager txm = new TransactionManager - (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector()); + (config, new InMemoryTransactionStateStorage(), new TxMetricsCollector()); txm.startAndWait(); try { Assert.assertEquals(0, txm.getInvalidSize()); @@ -250,11 +170,12 @@ public class TransactionManagerTest extends TransactionSystemTest { @Test public void testLongTransactionCleanup() throws Exception { - conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3); - conf.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2); + Configuration config = new Configuration(conf); + config.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3); + config.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2); // using a new tx manager that cleans up TransactionManager txm = new TransactionManager - (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector()); + (config, new InMemoryTransactionStateStorage(), new TxMetricsCollector()); txm.startAndWait(); try { Assert.assertEquals(0, txm.getInvalidSize()); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java index 24798ca..5448052 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java @@ -19,17 +19,21 @@ package org.apache.tephra; import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; import org.apache.tephra.persist.TransactionSnapshot; import org.apache.tephra.persist.TransactionStateStorage; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; /** - * + * Base class for testing implementations of {@link TransactionSystemClient}. */ public abstract class TransactionSystemTest { @@ -38,6 +42,20 @@ public abstract class TransactionSystemTest { private static final byte[] C3 = new byte[] { 'c', '3' }; private static final byte[] C4 = new byte[] { 'c', '4' }; + /** + * Sets up the common properties required for the test cases defined here. + * Subclasses can call this and add more properties as needed. + */ + static Configuration getCommonConfiguration() { + Configuration conf = new Configuration(); + conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5)); // very long limit + conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_LIMIT, 50); + conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_WARN_THRESHOLD, 40); + conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_LIMIT, 2048); + conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_WARN_THRESHOLD, 1024); + return conf; + } + protected abstract TransactionSystemClient getClient() throws Exception; protected abstract TransactionStateStorage getStateStorage() throws Exception; @@ -63,6 +81,52 @@ public abstract class TransactionSystemTest { } @Test + public void testLargeChangeSet() throws Exception { + TransactionSystemClient client = getClient(); + // first try with 50 changes (the max allowed) + List fiftyChanges = new ArrayList<>(51); + for (byte i = 0; i < 50; i++) { + fiftyChanges.add(new byte[] { i }); + } + Transaction tx = client.startShort(); + client.canCommitOrThrow(tx, fiftyChanges); + client.commit(tx); + + // now try another transaction with 51 changes + fiftyChanges.add(new byte[] { 50 }); + tx = client.startShort(); + try { + client.canCommitOrThrow(tx, fiftyChanges); + Assert.fail("Expected " + TransactionSizeException.class.getName()); + } catch (TransactionSizeException e) { + // expected + } + client.abort(tx); + + // now try a change set that is just within the size limit + List changes2k = new ArrayList<>(51); + for (byte i = 0; i < 8; i++) { + byte[] change = new byte[256]; + change[0] = i; + changes2k.add(change); + } + tx = client.startShort(); + client.canCommitOrThrow(tx, changes2k); + client.commit(tx); + + // now add another byte to the change set to exceed the limit + changes2k.add(new byte[] { 0 }); + tx = client.startShort(); + try { + client.canCommitOrThrow(tx, changes2k); + Assert.fail("Expected " + TransactionSizeException.class.getName()); + } catch (TransactionSizeException e) { + // expected + } + client.abort(tx); + } + + @Test public void testCommitRaceHandling() throws Exception { TransactionSystemClient client1 = getClient(); TransactionSystemClient client2 = getClient(); @@ -70,9 +134,9 @@ public abstract class TransactionSystemTest { Transaction tx1 = client1.startShort(); Transaction tx2 = client2.startShort(); - Assert.assertTrue(client1.canCommit(tx1, asList(C1, C2))); + Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1, C2))); // second one also can commit even thought there are conflicts with first since first one hasn't committed yet - Assert.assertTrue(client2.canCommit(tx2, asList(C2, C3))); + Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2, C3))); Assert.assertTrue(client1.commit(tx1)); @@ -97,16 +161,16 @@ public abstract class TransactionSystemTest { Transaction tx4 = client4.startShort(); Transaction tx5 = client5.startShort(); - Assert.assertTrue(client1.canCommit(tx1, asList(C1))); + Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1))); Assert.assertTrue(client1.commit(tx1)); - Assert.assertTrue(client2.canCommit(tx2, asList(C2))); + Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2))); Assert.assertTrue(client2.commit(tx2)); // verifying conflicts detection - Assert.assertFalse(client3.canCommit(tx3, asList(C1))); - Assert.assertFalse(client4.canCommit(tx4, asList(C2))); - Assert.assertTrue(client5.canCommit(tx5, asList(C3))); + Assert.assertFalse(client3.canCommitOrThrow(tx3, asList(C1))); + Assert.assertFalse(client4.canCommitOrThrow(tx4, asList(C2))); + Assert.assertTrue(client5.canCommitOrThrow(tx5, asList(C3))); } @Test @@ -114,7 +178,7 @@ public abstract class TransactionSystemTest { TransactionSystemClient client = getClient(); Transaction tx = client.startShort(); - Assert.assertTrue(client.canCommit(tx, asList(C1, C2))); + Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2))); Assert.assertTrue(client.commit(tx)); // cannot commit twice same tx try { @@ -130,7 +194,7 @@ public abstract class TransactionSystemTest { TransactionSystemClient client = getClient(); Transaction tx = client.startShort(); - Assert.assertTrue(client.canCommit(tx, asList(C1, C2))); + Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2))); client.abort(tx); // abort of not active tx has no affect client.abort(tx); @@ -141,11 +205,11 @@ public abstract class TransactionSystemTest { TransactionSystemClient client = getClient(); Transaction tx = client.startShort(); - Assert.assertTrue(client.canCommit(tx, asList(C1, C2))); + Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2))); Assert.assertTrue(client.commit(tx)); // can't re-use same tx again try { - client.canCommit(tx, asList(C3, C4)); + client.canCommitOrThrow(tx, asList(C3, C4)); Assert.fail(); } catch (TransactionNotInProgressException e) { // expected @@ -172,7 +236,7 @@ public abstract class TransactionSystemTest { new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT); try { - Assert.assertFalse(client.canCommit(txOld, asList(C3, C4))); + Assert.assertFalse(client.canCommitOrThrow(txOld, asList(C3, C4))); Assert.fail(); } catch (TransactionNotInProgressException e) { // expected @@ -191,7 +255,7 @@ public abstract class TransactionSystemTest { new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT); try { - Assert.assertFalse(client.canCommit(txNew, asList(C3, C4))); + Assert.assertFalse(client.canCommitOrThrow(txNew, asList(C3, C4))); Assert.fail(); } catch (TransactionNotInProgressException e) { // expected @@ -211,7 +275,7 @@ public abstract class TransactionSystemTest { TransactionSystemClient client = getClient(); Transaction tx = client.startShort(); - Assert.assertTrue(client.canCommit(tx, asList(C1, C2))); + Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2))); Assert.assertTrue(client.commit(tx)); // abort of not active tx has no affect client.abort(tx); @@ -223,11 +287,11 @@ public abstract class TransactionSystemTest { TransactionSystemClient client = getClient(); // Invalidate an in-progress tx Transaction tx1 = client.startShort(); - client.canCommit(tx1, asList(C1, C2)); + client.canCommitOrThrow(tx1, asList(C1, C2)); Assert.assertTrue(client.invalidate(tx1.getTransactionId())); // Cannot invalidate a committed tx Transaction tx2 = client.startShort(); - client.canCommit(tx2, asList(C3, C4)); + client.canCommitOrThrow(tx2, asList(C3, C4)); client.commit(tx2); Assert.assertFalse(client.invalidate(tx2.getTransactionId())); } @@ -241,9 +305,9 @@ public abstract class TransactionSystemTest { Transaction tx1 = client.startShort(); Transaction tx2 = client.startShort(); - client.canCommit(tx1, asList(C1, C2)); + client.canCommitOrThrow(tx1, asList(C1, C2)); client.commit(tx1); - client.canCommit(tx2, asList(C3, C4)); + client.canCommitOrThrow(tx2, asList(C3, C4)); Transaction txPreReset = client.startShort(); long currentTs = System.currentTimeMillis(); @@ -334,6 +398,93 @@ public abstract class TransactionSystemTest { Assert.assertEquals(3, client.getInvalidSize()); } + @Test + public void testCheckpointing() throws Exception { + TransactionSystemClient client = getClient(); + // create a few transactions + Transaction tx1 = client.startShort(); + Transaction tx2 = client.startShort(); + Transaction tx3 = client.startShort(); + + // start and commit a few + for (int i = 0; i < 5; i++) { + Transaction tx = client.startShort(); + Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); + Assert.assertTrue(client.commit(tx)); + } + + // checkpoint the transactions + Transaction tx3c = client.checkpoint(tx3); + Transaction tx2c = client.checkpoint(tx2); + Transaction tx1c = client.checkpoint(tx1); + + // start and commit a few (this moves the read pointer past all checkpoint write versions) + for (int i = 5; i < 10; i++) { + Transaction tx = client.startShort(); + Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); + Assert.assertTrue(client.commit(tx)); + } + + // start new tx and validate all write pointers are excluded + Transaction tx = client.startShort(); + validateSorted(tx.getInProgress()); + validateSorted(tx.getInvalids()); + Assert.assertFalse(tx.isVisible(tx1.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx1c.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3c.getWritePointer())); + client.abort(tx); + + // abort one of the checkpoints + client.abort(tx1c); + + // start new tx and validate all write pointers are excluded + tx = client.startShort(); + validateSorted(tx.getInProgress()); + validateSorted(tx.getInvalids()); + Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3c.getWritePointer())); + client.abort(tx); + + // invalidate one of the checkpoints + client.invalidate(tx2c.getTransactionId()); + + // start new tx and validate all write pointers are excluded + tx = client.startShort(); + validateSorted(tx.getInProgress()); + validateSorted(tx.getInvalids()); + Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3c.getWritePointer())); + client.abort(tx); + + // commit the last checkpoint + Assert.assertTrue(client.canCommit(tx3, Collections.emptyList())); + Assert.assertTrue(client.commit(tx3c)); + + // start new tx and validate all write pointers are excluded + tx = client.startShort(); + validateSorted(tx.getInProgress()); + validateSorted(tx.getInvalids()); + Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); + client.abort(tx); + } + + private void validateSorted(long[] array) { + Long lastSeen = null; + for (long value : array) { + Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)), + lastSeen == null || lastSeen < value); + lastSeen = value; + } + } + private Collection asList(byte[]... val) { return Arrays.asList(val); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java index 18f81c8..9c565ba 100644 --- a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java @@ -20,8 +20,6 @@ package org.apache.tephra.snapshot; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; @@ -31,8 +29,8 @@ import com.google.inject.Injector; import org.apache.hadoop.conf.Configuration; import org.apache.tephra.ChangeId; import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionFailureException; import org.apache.tephra.TransactionManager; -import org.apache.tephra.TransactionNotInProgressException; import org.apache.tephra.TxConstants; import org.apache.tephra.persist.TransactionSnapshot; import org.apache.tephra.persist.TransactionStateStorage; @@ -49,7 +47,6 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; -import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -290,7 +287,7 @@ public class SnapshotCodecTest { } @Test - public void testSnapshotCodecV4() throws IOException, TransactionNotInProgressException { + public void testSnapshotCodecV4() throws IOException, TransactionFailureException { File testDir = tmpDir.newFolder("testSnapshotCodecV4"); Configuration conf = new Configuration(); conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());