Repository: incubator-tephra
Updated Branches:
refs/heads/master 8532076f8 -> ae6ce2b5e
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
index 9719bcc..9e57de8 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
@@ -22,8 +22,10 @@ import com.google.inject.Inject;
import org.apache.tephra.InvalidTruncateTimeException;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSizeException;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.TxConstants;
import org.slf4j.Logger;
@@ -67,6 +69,15 @@ public class InMemoryTxSystemClient implements TransactionSystemClient
{
@Override
public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException
{
+ try {
+ return changeIds.isEmpty() || txManager.canCommit(tx, changeIds); // an empty change set yields true without consulting txManager
+ } catch (TransactionSizeException e) { // a size violation is reported to callers of this method as false instead of being thrown
+ return false;
+ }
+ }
+
+ @Override
+ public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws
TransactionFailureException { // no try/catch in this variant: whatever txManager.canCommit throws reaches the caller
return changeIds.isEmpty() || txManager.canCommit(tx, changeIds);
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
index b54e57f..de46f27 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
@@ -61,6 +61,11 @@ public class MinimalTxSystemClient implements TransactionSystemClient {
}
@Override
+ public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) {
+ return true; // unconditionally true; neither tx nor changeIds is inspected
+ }
+
+ @Override
public boolean commit(Transaction tx) {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/main/thrift/transaction.thrift
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift
index 0e05244..729e035 100644
--- a/tephra-core/src/main/thrift/transaction.thrift
+++ b/tephra-core/src/main/thrift/transaction.thrift
@@ -76,6 +76,8 @@ service TTransactionServer {
TTransaction startShortWithClientIdAndTimeOut(1: string clientId, 2: i32 timeout) throws
(1:TGenericException e),
TTransaction startShortWithTimeout(1: i32 timeout) throws (1:TGenericException e),
TBoolean canCommitTx(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException
e),
+ TBoolean canCommitOrThrow(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException
e,
+ 2:TGenericException
g,),
TBoolean commitTx(1: TTransaction tx) throws (1:TTransactionNotInProgressException e),
void abortTx(1: TTransaction tx),
bool invalidateTx(1: i64 tx),
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
index f4437c2..0d7d9bf 100644
--- a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
@@ -52,7 +52,6 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ThriftTransactionSystemTest extends TransactionSystemTest {
@@ -74,13 +73,12 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest
{
zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
zkServer.startAndWait();
- Configuration conf = new Configuration();
+ Configuration conf = getCommonConfiguration();
conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
// we want to use a retry strategy that lets us query the number of times it retried:
conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, CountingRetryStrategyProvider.class.getName());
conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 2);
- conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5));
// very long limit
Injector injector = Guice.createInjector(
new ConfigModule(conf),
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
index 56a9076..5f4675b 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
@@ -634,6 +634,11 @@ public class TransactionContextTest {
}
@Override
+ public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws
TransactionFailureException {
+ return canCommit(tx, changeIds); // delegates to this class's canCommit, whose body lies outside this hunk
+ }
+
+ @Override
public boolean commit(Transaction tx) throws TransactionNotInProgressException {
if (failCommits-- > 0) {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
index b96b779..676774c 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
@@ -548,6 +548,11 @@ public class TransactionExecutorTest {
}
@Override
+ public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws
TransactionFailureException {
+ return canCommit(tx, changeIds); // delegates to this class's canCommit, whose body lies outside this hunk
+ }
+
+ @Override
public boolean commit(Transaction tx) throws TransactionNotInProgressException {
if (failCommits-- > 0) {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
index 1fca773..819a981 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
@@ -26,11 +26,11 @@ import org.apache.tephra.metrics.TxMetricsCollector;
import org.apache.tephra.persist.InMemoryTransactionStateStorage;
import org.apache.tephra.persist.TransactionStateStorage;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@@ -39,10 +39,10 @@ import java.util.concurrent.TimeUnit;
*/
public class TransactionManagerTest extends TransactionSystemTest {
- static Configuration conf = new Configuration();
+ private static Configuration conf;
- TransactionManager txManager = null;
- TransactionStateStorage txStateStorage = null;
+ private static TransactionManager txManager = null;
+ private static TransactionStateStorage txStateStorage = null;
@Override
protected TransactionSystemClient getClient() {
@@ -54,10 +54,10 @@ public class TransactionManagerTest extends TransactionSystemTest {
return txStateStorage;
}
- @Before
- public void before() {
+ @BeforeClass
+ public static void beforeClass() {
+ conf = getCommonConfiguration();
conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread
- conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5));
// very long limit
// todo should create two sets of tests, one with LocalFileTxStateStorage and one with
InMemoryTxStateStorage
txStateStorage = new InMemoryTransactionStateStorage();
txManager = new TransactionManager
@@ -65,104 +65,24 @@ public class TransactionManagerTest extends TransactionSystemTest {
txManager.startAndWait();
}
- @After
- public void after() {
+ @AfterClass
+ public static void afterClass() {
txManager.stopAndWait();
}
- @Test
- public void testCheckpointing() throws TransactionNotInProgressException {
- // create a few transactions
- Transaction tx1 = txManager.startShort();
- Transaction tx2 = txManager.startShort();
- Transaction tx3 = txManager.startShort();
-
- // start and commit a few
- for (int i = 0; i < 5; i++) {
- Transaction tx = txManager.startShort();
- Assert.assertTrue(txManager.canCommit(tx, Collections.singleton(new byte[] { (byte)
i })));
- Assert.assertTrue(txManager.commit(tx));
- }
-
- // checkpoint the transactions
- Transaction tx3c = txManager.checkpoint(tx3);
- Transaction tx2c = txManager.checkpoint(tx2);
- Transaction tx1c = txManager.checkpoint(tx1);
-
- // start and commit a few (this moves the read pointer past all checkpoint write versions)
- for (int i = 5; i < 10; i++) {
- Transaction tx = txManager.startShort();
- Assert.assertTrue(txManager.canCommit(tx, Collections.singleton(new byte[] { (byte)
i })));
- Assert.assertTrue(txManager.commit(tx));
- }
-
- // start new tx and validate all write pointers are excluded
- Transaction tx = txManager.startShort();
- validateSorted(tx.getInProgress());
- validateSorted(tx.getInvalids());
- Assert.assertFalse(tx.isVisible(tx1.getWritePointer()));
- Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
- Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
- Assert.assertFalse(tx.isVisible(tx1c.getWritePointer()));
- Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
- Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
- txManager.abort(tx);
-
- // abort one of the checkpoints
- txManager.abort(tx1c);
-
- // start new tx and validate all write pointers are excluded
- tx = txManager.startShort();
- validateSorted(tx.getInProgress());
- validateSorted(tx.getInvalids());
- Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
- Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
- Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
- Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
- txManager.abort(tx);
-
- // invalidate one of the checkpoints
- txManager.invalidate(tx2c.getTransactionId());
-
- // start new tx and validate all write pointers are excluded
- tx = txManager.startShort();
- validateSorted(tx.getInProgress());
- validateSorted(tx.getInvalids());
- Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
- Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
- Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
- Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
- txManager.abort(tx);
-
- // commit the last checkpoint
- Assert.assertTrue(txManager.canCommit(tx3, Collections.<byte[]>emptyList()));
- Assert.assertTrue(txManager.commit(tx3c));
-
- // start new tx and validate all write pointers are excluded
- tx = txManager.startShort();
- validateSorted(tx.getInProgress());
- validateSorted(tx.getInvalids());
- Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
- Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
- txManager.abort(tx);
- }
-
- private void validateSorted(long[] array) {
- Long lastSeen = null;
- for (long value : array) {
- Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)),
- lastSeen == null || lastSeen < value);
- lastSeen = value;
- }
+ @After
+ public void after() {
+ txManager.resetState();
}
@Test
public void testTransactionCleanup() throws Exception {
- conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
- conf.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2);
+ Configuration config = new Configuration(conf);
+ config.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
+ config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2);
// using a new tx manager that cleans up
TransactionManager txm = new TransactionManager
- (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+ (config, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
txm.startAndWait();
try {
Assert.assertEquals(0, txm.getInvalidSize());
@@ -250,11 +170,12 @@ public class TransactionManagerTest extends TransactionSystemTest {
@Test
public void testLongTransactionCleanup() throws Exception {
- conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
- conf.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2);
+ Configuration config = new Configuration(conf);
+ config.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
+ config.setInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, 2);
// using a new tx manager that cleans up
TransactionManager txm = new TransactionManager
- (conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+ (config, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
txm.startAndWait();
try {
Assert.assertEquals(0, txm.getInvalidSize());
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
index 24798ca..5448052 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionSystemTest.java
@@ -19,17 +19,21 @@
package org.apache.tephra;
import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.persist.TransactionSnapshot;
import org.apache.tephra.persist.TransactionStateStorage;
import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.TimeUnit;
/**
- *
+ * Base class for testing implementations of {@link TransactionSystemClient}.
*/
public abstract class TransactionSystemTest {
@@ -38,6 +42,20 @@ public abstract class TransactionSystemTest {
private static final byte[] C3 = new byte[] { 'c', '3' };
private static final byte[] C4 = new byte[] { 'c', '4' };
+ /**
+ * Sets up the common properties required for the test cases defined here.
+ * Subclasses can call this and add more properties as needed. A fresh Configuration is built and returned on every call.
+ */
+ static Configuration getCommonConfiguration() {
+ Configuration conf = new Configuration();
+ conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5));
// very long limit
+ conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_LIMIT, 50); // testLargeChangeSet expects 50 changes to pass and 51 to fail
+ conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_WARN_THRESHOLD, 40); // set below the count limit of 50
+ conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_LIMIT, 2048); // testLargeChangeSet expects 8 x 256 bytes to pass and one more byte to fail
+ conf.setInt(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_WARN_THRESHOLD, 1024); // set below the size limit of 2048
+ return conf;
+ }
+
protected abstract TransactionSystemClient getClient() throws Exception;
protected abstract TransactionStateStorage getStateStorage() throws Exception;
@@ -63,6 +81,52 @@ public abstract class TransactionSystemTest {
}
@Test
+ public void testLargeChangeSet() throws Exception {
+ TransactionSystemClient client = getClient();
+ // first try with 50 changes (the max allowed)
+ List<byte[]> fiftyChanges = new ArrayList<>(51); // capacity 51 because a 51st change is appended further down
+ for (byte i = 0; i < 50; i++) {
+ fiftyChanges.add(new byte[] { i }); // one distinct single-byte change per iteration
+ }
+ Transaction tx = client.startShort();
+ client.canCommitOrThrow(tx, fiftyChanges); // must not throw; the boolean result is not checked
+ client.commit(tx);
+
+ // now try another transaction with 51 changes
+ fiftyChanges.add(new byte[] { 50 }); // despite its name the list now holds 51 changes
+ tx = client.startShort();
+ try {
+ client.canCommitOrThrow(tx, fiftyChanges);
+ Assert.fail("Expected " + TransactionSizeException.class.getName());
+ } catch (TransactionSizeException e) {
+ // expected
+ }
+ client.abort(tx); // the rejected transaction is aborted explicitly
+
+ // now try a change set that is just within the size limit
+ List<byte[]> changes2k = new ArrayList<>(51);
+ for (byte i = 0; i < 8; i++) {
+ byte[] change = new byte[256]; // 8 changes x 256 bytes = 2048 bytes in total
+ change[0] = i; // distinct first byte keeps the eight changes different from each other
+ changes2k.add(change);
+ }
+ tx = client.startShort();
+ client.canCommitOrThrow(tx, changes2k); // must not throw; the boolean result is not checked
+ client.commit(tx);
+
+ // now add another byte to the change set to exceed the limit
+ changes2k.add(new byte[] { 0 }); // 9 changes, 2049 bytes in total
+ tx = client.startShort();
+ try {
+ client.canCommitOrThrow(tx, changes2k);
+ Assert.fail("Expected " + TransactionSizeException.class.getName());
+ } catch (TransactionSizeException e) {
+ // expected
+ }
+ client.abort(tx);
+ }
+
+ @Test
public void testCommitRaceHandling() throws Exception {
TransactionSystemClient client1 = getClient();
TransactionSystemClient client2 = getClient();
@@ -70,9 +134,9 @@ public abstract class TransactionSystemTest {
Transaction tx1 = client1.startShort();
Transaction tx2 = client2.startShort();
- Assert.assertTrue(client1.canCommit(tx1, asList(C1, C2)));
+ Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1, C2)));
// second one also can commit even though there are conflicts with first since first
one hasn't committed yet
- Assert.assertTrue(client2.canCommit(tx2, asList(C2, C3)));
+ Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2, C3)));
Assert.assertTrue(client1.commit(tx1));
@@ -97,16 +161,16 @@ public abstract class TransactionSystemTest {
Transaction tx4 = client4.startShort();
Transaction tx5 = client5.startShort();
- Assert.assertTrue(client1.canCommit(tx1, asList(C1)));
+ Assert.assertTrue(client1.canCommitOrThrow(tx1, asList(C1)));
Assert.assertTrue(client1.commit(tx1));
- Assert.assertTrue(client2.canCommit(tx2, asList(C2)));
+ Assert.assertTrue(client2.canCommitOrThrow(tx2, asList(C2)));
Assert.assertTrue(client2.commit(tx2));
// verifying conflicts detection
- Assert.assertFalse(client3.canCommit(tx3, asList(C1)));
- Assert.assertFalse(client4.canCommit(tx4, asList(C2)));
- Assert.assertTrue(client5.canCommit(tx5, asList(C3)));
+ Assert.assertFalse(client3.canCommitOrThrow(tx3, asList(C1)));
+ Assert.assertFalse(client4.canCommitOrThrow(tx4, asList(C2)));
+ Assert.assertTrue(client5.canCommitOrThrow(tx5, asList(C3)));
}
@Test
@@ -114,7 +178,7 @@ public abstract class TransactionSystemTest {
TransactionSystemClient client = getClient();
Transaction tx = client.startShort();
- Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+ Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
Assert.assertTrue(client.commit(tx));
// cannot commit twice same tx
try {
@@ -130,7 +194,7 @@ public abstract class TransactionSystemTest {
TransactionSystemClient client = getClient();
Transaction tx = client.startShort();
- Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+ Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
client.abort(tx);
// abort of not active tx has no effect
client.abort(tx);
@@ -141,11 +205,11 @@ public abstract class TransactionSystemTest {
TransactionSystemClient client = getClient();
Transaction tx = client.startShort();
- Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+ Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
Assert.assertTrue(client.commit(tx));
// can't re-use same tx again
try {
- client.canCommit(tx, asList(C3, C4));
+ client.canCommitOrThrow(tx, asList(C3, C4));
Assert.fail();
} catch (TransactionNotInProgressException e) {
// expected
@@ -172,7 +236,7 @@ public abstract class TransactionSystemTest {
new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS,
TransactionType.SHORT);
try {
- Assert.assertFalse(client.canCommit(txOld, asList(C3, C4)));
+ Assert.assertFalse(client.canCommitOrThrow(txOld, asList(C3, C4)));
Assert.fail();
} catch (TransactionNotInProgressException e) {
// expected
@@ -191,7 +255,7 @@ public abstract class TransactionSystemTest {
new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS,
TransactionType.SHORT);
try {
- Assert.assertFalse(client.canCommit(txNew, asList(C3, C4)));
+ Assert.assertFalse(client.canCommitOrThrow(txNew, asList(C3, C4)));
Assert.fail();
} catch (TransactionNotInProgressException e) {
// expected
@@ -211,7 +275,7 @@ public abstract class TransactionSystemTest {
TransactionSystemClient client = getClient();
Transaction tx = client.startShort();
- Assert.assertTrue(client.canCommit(tx, asList(C1, C2)));
+ Assert.assertTrue(client.canCommitOrThrow(tx, asList(C1, C2)));
Assert.assertTrue(client.commit(tx));
// abort of not active tx has no effect
client.abort(tx);
@@ -223,11 +287,11 @@ public abstract class TransactionSystemTest {
TransactionSystemClient client = getClient();
// Invalidate an in-progress tx
Transaction tx1 = client.startShort();
- client.canCommit(tx1, asList(C1, C2));
+ client.canCommitOrThrow(tx1, asList(C1, C2));
Assert.assertTrue(client.invalidate(tx1.getTransactionId()));
// Cannot invalidate a committed tx
Transaction tx2 = client.startShort();
- client.canCommit(tx2, asList(C3, C4));
+ client.canCommitOrThrow(tx2, asList(C3, C4));
client.commit(tx2);
Assert.assertFalse(client.invalidate(tx2.getTransactionId()));
}
@@ -241,9 +305,9 @@ public abstract class TransactionSystemTest {
Transaction tx1 = client.startShort();
Transaction tx2 = client.startShort();
- client.canCommit(tx1, asList(C1, C2));
+ client.canCommitOrThrow(tx1, asList(C1, C2));
client.commit(tx1);
- client.canCommit(tx2, asList(C3, C4));
+ client.canCommitOrThrow(tx2, asList(C3, C4));
Transaction txPreReset = client.startShort();
long currentTs = System.currentTimeMillis();
@@ -334,6 +398,93 @@ public abstract class TransactionSystemTest {
Assert.assertEquals(3, client.getInvalidSize());
}
+ @Test
+ public void testCheckpointing() throws Exception {
+ TransactionSystemClient client = getClient();
+ // create a few transactions
+ Transaction tx1 = client.startShort();
+ Transaction tx2 = client.startShort();
+ Transaction tx3 = client.startShort();
+
+ // start and commit a few
+ for (int i = 0; i < 5; i++) {
+ Transaction tx = client.startShort();
+ Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i
})));
+ Assert.assertTrue(client.commit(tx));
+ }
+
+ // checkpoint the transactions
+ Transaction tx3c = client.checkpoint(tx3); // checkpoints are requested in reverse start order: tx3, tx2, tx1
+ Transaction tx2c = client.checkpoint(tx2);
+ Transaction tx1c = client.checkpoint(tx1);
+
+ // start and commit a few (this moves the read pointer past all checkpoint write versions)
+ for (int i = 5; i < 10; i++) {
+ Transaction tx = client.startShort();
+ Assert.assertTrue(client.canCommit(tx, Collections.singleton(new byte[] { (byte) i
})));
+ Assert.assertTrue(client.commit(tx));
+ }
+
+ // start new tx and validate all write pointers are excluded
+ Transaction tx = client.startShort();
+ validateSorted(tx.getInProgress());
+ validateSorted(tx.getInvalids());
+ Assert.assertFalse(tx.isVisible(tx1.getWritePointer())); // original write pointers of tx1..tx3
+ Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx1c.getWritePointer())); // write pointers of the checkpointed copies
+ Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+ client.abort(tx);
+
+ // abort one of the checkpoints
+ client.abort(tx1c);
+
+ // start new tx and validate all write pointers are excluded
+ tx = client.startShort();
+ validateSorted(tx.getInProgress());
+ validateSorted(tx.getInvalids());
+ Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); // tx1/tx1c are no longer asserted once tx1c is aborted
+ Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+ client.abort(tx);
+
+ // invalidate one of the checkpoints
+ client.invalidate(tx2c.getTransactionId());
+
+ // start new tx and validate all write pointers are excluded
+ tx = client.startShort();
+ validateSorted(tx.getInProgress());
+ validateSorted(tx.getInvalids());
+ Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+ Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+ client.abort(tx);
+
+ // commit the last checkpoint
+ Assert.assertTrue(client.canCommit(tx3, Collections.<byte[]>emptyList())); // NOTE(review): canCommit is given tx3 while commit below is given tx3c - presumably intentional; confirm
+ Assert.assertTrue(client.commit(tx3c));
+
+ // start new tx and validate all write pointers are excluded
+ tx = client.startShort();
+ validateSorted(tx.getInProgress());
+ validateSorted(tx.getInvalids());
+ Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); // only the invalidated tx2/tx2c are still asserted after tx3c commits
+ Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+ client.abort(tx);
+ }
+
+ private void validateSorted(long[] array) { // asserts that array is in strictly ascending order; an empty array passes
+ Long lastSeen = null; // stays null until the first element has been visited
+ for (long value : array) {
+ Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)),
+ lastSeen == null || lastSeen < value); // strict '<': equal neighbours fail as well
+ lastSeen = value;
+ }
+ }
+
private Collection<byte[]> asList(byte[]... val) {
return Arrays.asList(val);
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae6ce2b5/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
index 18f81c8..9c565ba 100644
--- a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
@@ -20,8 +20,6 @@ package org.apache.tephra.snapshot;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
@@ -31,8 +29,8 @@ import com.google.inject.Injector;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.ChangeId;
import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionNotInProgressException;
import org.apache.tephra.TxConstants;
import org.apache.tephra.persist.TransactionSnapshot;
import org.apache.tephra.persist.TransactionStateStorage;
@@ -49,7 +47,6 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -290,7 +287,7 @@ public class SnapshotCodecTest {
}
@Test
- public void testSnapshotCodecV4() throws IOException, TransactionNotInProgressException
{
+ public void testSnapshotCodecV4() throws IOException, TransactionFailureException {
File testDir = tmpDir.newFolder("testSnapshotCodecV4");
Configuration conf = new Configuration();
conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
|