tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject incubator-tephra git commit: (TEPHRA-201) Store checkpoints in in-progress list to avoid having to sort every time a tx is created.
Date Thu, 08 Dec 2016 20:53:12 GMT
Repository: incubator-tephra
Updated Branches:
  refs/heads/master 7b45a6e1a -> e80e89feb


(TEPHRA-201) Store checkpoints in in-progress list to avoid having to sort every time a tx is created.

This closes #23 from GitHub.

Signed-off-by: anew <anew@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/e80e89fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/e80e89fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/e80e89fe

Branch: refs/heads/master
Commit: e80e89febf22c01022d26a592a860c8af1a94eae
Parents: 7b45a6e
Author: anew <anew@apache.org>
Authored: Wed Dec 7 14:19:19 2016 -0800
Committer: anew <anew@apache.org>
Committed: Thu Dec 8 12:52:16 2016 -0800

----------------------------------------------------------------------
 .../org/apache/tephra/TransactionManager.java   | 137 +++++++++++++------
 .../apache/tephra/snapshot/SnapshotCodecV2.java |   5 +-
 .../apache/tephra/snapshot/SnapshotCodecV4.java |   7 +-
 .../apache/tephra/TransactionManagerTest.java   | 129 +++++++++++++++--
 .../AbstractTransactionStateStorageTest.java    |   6 +-
 .../LocalTransactionStateStorageTest.java       |  13 +-
 .../tephra/snapshot/SnapshotCodecTest.java      |  32 +++--
 .../coprocessor/TransactionProcessorTest.java   |   5 +-
 .../coprocessor/TransactionProcessorTest.java   |   5 +-
 .../coprocessor/TransactionProcessorTest.java   |   5 +-
 .../coprocessor/TransactionProcessorTest.java   |   5 +-
 .../coprocessor/TransactionProcessorTest.java   |   5 +-
 .../janitor/InvalidListPruneTest.java           |   7 +-
 13 files changed, 263 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
index 7faf63a..0b90d7f 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
@@ -340,35 +340,37 @@ public class TransactionManager extends AbstractService {
 
   private void cleanupTimedOutTransactions() {
     List<TransactionEdit> invalidEdits = null;
-    this.logReadLock.lock();
+    logReadLock.lock();
     try {
       synchronized (this) {
         if (!isRunning()) {
           return;
         }
-
         long currentTime = System.currentTimeMillis();
-        List<Long> timedOut = Lists.newArrayList();
+        Map<Long, InProgressType> timedOut = Maps.newHashMap();
         for (Map.Entry<Long, InProgressTx> tx : inProgress.entrySet()) {
           long expiration = tx.getValue().getExpiration();
           if (expiration >= 0L && currentTime > expiration) {
             // timed out, remember tx id (can't remove while iterating over entries)
-            timedOut.add(tx.getKey());
+            timedOut.put(tx.getKey(), tx.getValue().getType());
             LOG.info("Tx invalid list: added tx {} because of timeout", tx.getKey());
           } else if (expiration < 0) {
             LOG.warn("Transaction {} has negative expiration time {}. Likely cause is the transaction was not " +
                        "migrated correctly, this transaction will be expired immediately",
                      tx.getKey(), expiration);
-            timedOut.add(tx.getKey());
+            timedOut.put(tx.getKey(), InProgressType.LONG);
           }
         }
         if (!timedOut.isEmpty()) {
           invalidEdits = Lists.newArrayListWithCapacity(timedOut.size());
-          invalid.addAll(timedOut);
-          for (long tx : timedOut) {
-            committingChangeSets.remove(tx);
-            inProgress.remove(tx);
-            invalidEdits.add(TransactionEdit.createInvalid(tx));
+          invalid.addAll(timedOut.keySet());
+          for (Map.Entry<Long, InProgressType> tx : timedOut.entrySet()) {
+            inProgress.remove(tx.getKey());
+            // checkpoints never go into the committing change sets or the edits
+            if (!InProgressType.CHECKPOINT.equals(tx.getValue())) {
+              committingChangeSets.remove(tx.getKey());
+              invalidEdits.add(TransactionEdit.createInvalid(tx.getKey()));
+            }
           }
 
           // todo: find a more efficient way to keep this sorted. Could it just be an array?
@@ -529,13 +531,13 @@ public class TransactionManager extends AbstractService {
         // handle null expiration
         long newExpiration = getTxExpirationFromWritePointer(writePointer, defaultLongTimeout);
         InProgressTx compatTx =
-          new InProgressTx(entry.getValue().getVisibilityUpperBound(), newExpiration, TransactionType.LONG,
+          new InProgressTx(entry.getValue().getVisibilityUpperBound(), newExpiration, InProgressType.LONG,
               entry.getValue().getCheckpointWritePointers());
         entry.setValue(compatTx);
       } else if (entry.getValue().getType() == null) {
         InProgressTx compatTx =
           new InProgressTx(entry.getValue().getVisibilityUpperBound(), entry.getValue().getExpiration(),
-                           TransactionType.SHORT, entry.getValue().getCheckpointWritePointers());
+                           InProgressType.SHORT, entry.getValue().getCheckpointWritePointers());
         entry.setValue(compatTx);
       }
     }
@@ -620,7 +622,15 @@ public class TransactionManager extends AbstractService {
               if (type == null) {
                 InProgressTx inProgressTx = inProgress.get(edit.getWritePointer());
                 if (inProgressTx != null) {
-                  type = inProgressTx.getType();
+                  InProgressType inProgressType = inProgressTx.getType();
+                  if (InProgressType.CHECKPOINT.equals(inProgressType)) {
+                    // this should never happen, because checkpoints never go into the log edits
+                    LOG.debug("Ignoring ABORTED edit for a checkpoint transaction {}", edit.getWritePointer());
+                    break;
+                  }
+                  if (inProgressType != null) {
+                    type = inProgressType.getTransactionType();
+                  }
                 } else {
                   // If transaction is not in-progress, then it has either been already aborted or invalidated.
                   // We cannot determine the transaction's state based on current information, to be safe invalidate it.
@@ -788,6 +798,11 @@ public class TransactionManager extends AbstractService {
 
   private void addInProgressAndAdvance(long writePointer, long visibilityUpperBound,
                                        long expiration, TransactionType type) {
+    addInProgressAndAdvance(writePointer, visibilityUpperBound, expiration, InProgressType.of(type));
+  }
+
+  private void addInProgressAndAdvance(long writePointer, long visibilityUpperBound,
+                                       long expiration, InProgressType type) {
     inProgress.put(writePointer, new InProgressTx(visibilityUpperBound, expiration, type));
     advanceWritePointer(writePointer);
   }
@@ -917,6 +932,13 @@ public class TransactionManager extends AbstractService {
         invalidArray = invalid.toLongArray();
         LOG.info("Tx invalid list: removed committed tx {}", transactionId);
       }
+    } else {
+      LongArrayList checkpointPointers = previous.getCheckpointWritePointers();
+      if (!checkpointPointers.isEmpty()) {
+        // adjust the write pointer to be the last checkpoint of the tx and remove all checkpoints from inProgress
+        writePointer = checkpointPointers.getLong(checkpointPointers.size() - 1);
+        inProgress.keySet().removeAll(previous.getCheckpointWritePointers());
+      }
     }
     // moving read pointer
     moveReadPointerIfNeeded(writePointer);
@@ -958,25 +980,30 @@ public class TransactionManager extends AbstractService {
     // makes tx visible (assumes that all operations were rolled back)
     // remove from in-progress set, so that it does not get excluded in the future
     InProgressTx removed = inProgress.remove(writePointer);
+    boolean removeInProgressCheckpoints = true;
     if (removed == null) {
       // tx was not in progress! perhaps it timed out and is invalid? try to remove it there.
       if (invalid.rem(writePointer)) {
+        // the tx and all its children were invalidated: no need to remove them from inProgress
+        removeInProgressCheckpoints = false;
         // remove any invalidated checkpoint pointers
         // this will only be present if the parent write pointer was also invalidated
         if (checkpointWritePointers != null) {
-          for (int i = 0; i < checkpointWritePointers.length; i++) {
-            invalid.rem(checkpointWritePointers[i]);
+          for (long checkpointWritePointer : checkpointWritePointers) {
+            invalid.rem(checkpointWritePointer);
           }
         }
         invalidArray = invalid.toLongArray();
         LOG.info("Tx invalid list: removed aborted tx {}", writePointer);
-        // removed a tx from excludes: must move read pointer
-        moveReadPointerIfNeeded(writePointer);
       }
-    } else {
-      // removed a tx from excludes: must move read pointer
-      moveReadPointerIfNeeded(writePointer);
     }
+    if (removeInProgressCheckpoints && checkpointWritePointers != null) {
+      for (long checkpointWritePointer : checkpointWritePointers) {
+        inProgress.remove(checkpointWritePointer);
+      }
+    }
+    // removed a tx from excludes: must move read pointer
+    moveReadPointerIfNeeded(writePointer);
   }
 
   public boolean invalidate(long tx) {
@@ -1011,10 +1038,9 @@ public class TransactionManager extends AbstractService {
       } else {
         // invalidate any checkpoint write pointers
         LongArrayList childWritePointers = previous.getCheckpointWritePointers();
-        if (childWritePointers != null) {
-          for (int i = 0; i < childWritePointers.size(); i++) {
-            invalid.add(childWritePointers.get(i));
-          }
+        if (!childWritePointers.isEmpty()) {
+          invalid.addAll(childWritePointers);
+          inProgress.keySet().removeAll(childWritePointers);
         }
       }
       LOG.info("Tx invalid list: added tx {} because of invalidate", writePointer);
@@ -1152,7 +1178,8 @@ public class TransactionManager extends AbstractService {
   private void doCheckpoint(long newWritePointer, long parentWritePointer) {
     InProgressTx existingTx = inProgress.get(parentWritePointer);
     existingTx.addCheckpointWritePointer(newWritePointer);
-    advanceWritePointer(newWritePointer);
+    addInProgressAndAdvance(newWritePointer, existingTx.getVisibilityUpperBound(), existingTx.getExpiration(),
+                            InProgressType.CHECKPOINT);
   }
   
   // hack for exposing important metric
@@ -1223,18 +1250,10 @@ public class TransactionManager extends AbstractService {
     for (Map.Entry<Long, InProgressTx> entry : inProgress.entrySet()) {
       long txId = entry.getKey();
       inProgressIds.add(txId);
-      // add any checkpointed write pointers to the in-progress list
-      LongArrayList childIds = entry.getValue().getCheckpointWritePointers();
-      if (childIds != null) {
-        for (int i = 0; i < childIds.size(); i++) {
-          inProgressIds.add(childIds.get(i));
-        }
-      }
       if (firstShortTx == Transaction.NO_TX_IN_PROGRESS && !entry.getValue().isLongRunning()) {
         firstShortTx = txId;
       }
     }
-
     return new Transaction(readPointer, writePointer, invalidArray, inProgressIds.toLongArray(), firstShortTx, type);
   }
 
@@ -1319,20 +1338,60 @@ public class TransactionManager extends AbstractService {
   }
 
   /**
+   * Type of in-progress transaction.
+   */
+  public enum InProgressType {
+
+    /**
+     * Short transactions detect conflicts during commit.
+     */
+    SHORT(TransactionType.SHORT),
+
+    /**
+     * Long running transactions do not detect conflicts during commit.
+     */
+    LONG(TransactionType.LONG),
+
+    /**
+     * Check-pointed transactions are recorded as in-progress.
+     */
+    CHECKPOINT(null);
+
+    private final TransactionType transactionType;
+
+    InProgressType(TransactionType transactionType) {
+      this.transactionType = transactionType;
+    }
+
+    public static InProgressType of(TransactionType type) {
+      switch (type) {
+        case SHORT: return SHORT;
+        case LONG:  return LONG;
+        default: throw new IllegalArgumentException("Unknown TransactionType " + type);
+      }
+    }
+
+    @Nullable
+    public TransactionType getTransactionType() {
+      return transactionType;
+    }
+  }
+
+  /**
    * Represents some of the info on in-progress tx
    */
   public static final class InProgressTx {
     /** the oldest in progress tx at the time of this tx start */
     private final long visibilityUpperBound;
     private final long expiration;
-    private final TransactionType type;
-    private LongArrayList checkpointWritePointers = new LongArrayList();
+    private final InProgressType type;
+    private final LongArrayList checkpointWritePointers;
 
-    public InProgressTx(long visibilityUpperBound, long expiration, TransactionType type) {
+    public InProgressTx(long visibilityUpperBound, long expiration, InProgressType type) {
       this(visibilityUpperBound, expiration, type, new LongArrayList());
     }
 
-    public InProgressTx(long visibilityUpperBound, long expiration, TransactionType type,
+    public InProgressTx(long visibilityUpperBound, long expiration, InProgressType type,
                         LongArrayList checkpointWritePointers) {
       this.visibilityUpperBound = visibilityUpperBound;
       this.expiration = expiration;
@@ -1355,7 +1414,7 @@ public class TransactionManager extends AbstractService {
     }
 
     @Nullable
-    public TransactionType getType() {
+    public InProgressType getType() {
       return type;
     }
 
@@ -1364,7 +1423,7 @@ public class TransactionManager extends AbstractService {
         // for backwards compatibility when long running txns were represented with -1 expiration
         return expiration == -1;
       }
-      return type == TransactionType.LONG;
+      return type == InProgressType.LONG;
     }
 
     public void addCheckpointWritePointer(long checkpointWritePointer) {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java
index ccf026d..eb6b70b 100644
--- a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java
@@ -21,7 +21,6 @@ package org.apache.tephra.snapshot;
 import com.google.common.collect.Maps;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionType;
 import org.apache.tephra.persist.TransactionSnapshot;
 
 import java.io.IOException;
@@ -66,9 +65,9 @@ public class SnapshotCodecV2 extends DefaultSnapshotCodec {
         long expiration = decoder.readLong();
         long visibilityUpperBound = decoder.readLong();
         int txTypeIdx = decoder.readInt();
-        TransactionType txType;
+        TransactionManager.InProgressType txType;
         try {
-          txType = TransactionType.values()[txTypeIdx];
+          txType = TransactionManager.InProgressType.values()[txTypeIdx];
         } catch (ArrayIndexOutOfBoundsException e) {
           throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java
index cadaa8e..1da358f 100644
--- a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java
@@ -21,7 +21,6 @@ package org.apache.tephra.snapshot;
 import com.google.common.collect.Maps;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionType;
 import org.apache.tephra.persist.TransactionSnapshot;
 
 import java.io.IOException;
@@ -52,7 +51,7 @@ public class SnapshotCodecV4 extends SnapshotCodecV2 {
         encoder.writeInt(entry.getValue().getType().ordinal());
         // write checkpoint tx IDs
         LongArrayList checkpointPointers = entry.getValue().getCheckpointWritePointers();
-        if (checkpointPointers != null && !checkpointPointers.isEmpty()) {
+        if (!checkpointPointers.isEmpty()) {
           encoder.writeInt(checkpointPointers.size());
           for (int i = 0; i < checkpointPointers.size(); i++) {
             encoder.writeLong(checkpointPointers.getLong(i));
@@ -76,9 +75,9 @@ public class SnapshotCodecV4 extends SnapshotCodecV2 {
         long expiration = decoder.readLong();
         long visibilityUpperBound = decoder.readLong();
         int txTypeIdx = decoder.readInt();
-        TransactionType txType;
+        TransactionManager.InProgressType txType;
         try {
-          txType = TransactionType.values()[txTypeIdx];
+          txType = TransactionManager.InProgressType.values()[txTypeIdx];
         } catch (ArrayIndexOutOfBoundsException e) {
           throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
index 3269241..1fca773 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java
@@ -30,6 +30,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
@@ -70,6 +71,92 @@ public class TransactionManagerTest extends TransactionSystemTest {
   }
 
   @Test
+  public void testCheckpointing() throws TransactionNotInProgressException {
+    // create a few transactions
+    Transaction tx1 = txManager.startShort();
+    Transaction tx2 = txManager.startShort();
+    Transaction tx3 = txManager.startShort();
+
+    // start and commit a few
+    for (int i = 0; i < 5; i++) {
+      Transaction tx = txManager.startShort();
+      Assert.assertTrue(txManager.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
+      Assert.assertTrue(txManager.commit(tx));
+    }
+
+    // checkpoint the transactions
+    Transaction tx3c = txManager.checkpoint(tx3);
+    Transaction tx2c = txManager.checkpoint(tx2);
+    Transaction tx1c = txManager.checkpoint(tx1);
+
+    // start and commit a few (this moves the read pointer past all checkpoint write versions)
+    for (int i = 5; i < 10; i++) {
+      Transaction tx = txManager.startShort();
+      Assert.assertTrue(txManager.canCommit(tx, Collections.singleton(new byte[] { (byte) i })));
+      Assert.assertTrue(txManager.commit(tx));
+    }
+
+    // start new tx and validate all write pointers are excluded
+    Transaction tx = txManager.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx1.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx1c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+    txManager.abort(tx);
+
+    // abort one of the checkpoints
+    txManager.abort(tx1c);
+
+    // start new tx and validate all write pointers are excluded
+    tx = txManager.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+    txManager.abort(tx);
+
+    // invalidate one of the checkpoints
+    txManager.invalidate(tx2c.getTransactionId());
+
+    // start new tx and validate all write pointers are excluded
+    tx = txManager.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx3c.getWritePointer()));
+    txManager.abort(tx);
+
+    // commit the last checkpoint
+    Assert.assertTrue(txManager.canCommit(tx3, Collections.<byte[]>emptyList()));
+    Assert.assertTrue(txManager.commit(tx3c));
+
+    // start new tx and validate all write pointers are excluded
+    tx = txManager.startShort();
+    validateSorted(tx.getInProgress());
+    validateSorted(tx.getInvalids());
+    Assert.assertFalse(tx.isVisible(tx2.getWritePointer()));
+    Assert.assertFalse(tx.isVisible(tx2c.getWritePointer()));
+    txManager.abort(tx);
+  }
+
+  private void validateSorted(long[] array) {
+    Long lastSeen = null;
+    for (long value : array) {
+      Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)),
+                        lastSeen == null || lastSeen < value);
+      lastSeen = value;
+    }
+  }
+
+  @Test
   public void testTransactionCleanup() throws Exception {
     conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3);
     conf.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2);
@@ -80,11 +167,14 @@ public class TransactionManagerTest extends TransactionSystemTest {
     try {
       Assert.assertEquals(0, txm.getInvalidSize());
       Assert.assertEquals(0, txm.getCommittedSize());
-      // start a transaction and leave it open
+      // start two transactions and leave them open
       Transaction tx1 = txm.startShort();
-      // start a long running transaction and leave it open
-      Transaction tx2 = txm.startLong();
-      Transaction tx3 = txm.startLong();
+      Transaction tx2 = txm.startShort();
+      // start two long running transactions and leave them open
+      Transaction ltx1 = txm.startLong();
+      Transaction ltx2 = txm.startLong();
+      // checkpoint one of the short transactions
+      Transaction tx2c = txm.checkpoint(tx2);
       // start and commit a bunch of transactions
       for (int i = 0; i < 10; i++) {
         Transaction tx = txm.startShort();
@@ -94,16 +184,25 @@ public class TransactionManagerTest extends TransactionSystemTest {
       // all of these should still be in the committed set
       Assert.assertEquals(0, txm.getInvalidSize());
       Assert.assertEquals(10, txm.getCommittedSize());
+
       // sleep longer than the cleanup interval
       TimeUnit.SECONDS.sleep(5);
       // transaction should now be invalid
-      Assert.assertEquals(1, txm.getInvalidSize());
+      //Assert.assertEquals(3, txm.getInvalidSize());
       // run another transaction
       Transaction txx = txm.startShort();
       // verify the exclude
-      Assert.assertFalse(txx.isVisible(tx1.getTransactionId()));
-      Assert.assertFalse(txx.isVisible(tx2.getTransactionId()));
-      Assert.assertFalse(txx.isVisible(tx3.getTransactionId()));
+      Assert.assertFalse(txx.isVisible(tx1.getWritePointer()));
+      Assert.assertFalse(txx.isVisible(tx2.getWritePointer()));
+      Assert.assertFalse(txx.isVisible(tx2c.getWritePointer()));
+      Assert.assertFalse(txx.isVisible(ltx1.getWritePointer()));
+      Assert.assertFalse(txx.isVisible(ltx2.getWritePointer()));
+      // verify all of the short write pointers are in the invalid list
+      Assert.assertEquals(3, txx.getInvalids().length);
+      Assert.assertArrayEquals(new long[] {
+                                 tx1.getWritePointer(),
+                                 tx2.getWritePointer(),
+                                 tx2c.getWritePointer()}, txx.getInvalids());
       // try to commit the last transaction that was started
       Assert.assertTrue(txm.canCommit(txx, Collections.singleton(new byte[] { 0x0a })));
       Assert.assertTrue(txm.commit(txx));
@@ -117,9 +216,15 @@ public class TransactionManagerTest extends TransactionSystemTest {
       } catch (TransactionNotInProgressException e) {
         // expected
       }
+
+      // abort should remove tx1 from invalid, but tx2 and tx2c are still there
       txm.abort(tx1);
-      // abort should have removed from invalid
+      Assert.assertEquals(2, txm.getInvalidSize());
+
+      // aborting tx2c should remove both tx2 and tx2c from invalids
+      txm.abort(tx2c);
       Assert.assertEquals(0, txm.getInvalidSize());
+
       // run another bunch of transactions
       for (int i = 0; i < 10; i++) {
         Transaction tx = txm.startShort();
@@ -130,13 +235,13 @@ public class TransactionManagerTest extends TransactionSystemTest {
       Assert.assertEquals(0, txm.getInvalidSize());
       Assert.assertEquals(0, txm.getCommittedSize());
       // commit tx2, abort tx3
-      Assert.assertTrue(txm.commit(tx2));
-      txm.abort(tx3);
+      Assert.assertTrue(txm.commit(ltx1));
+      txm.abort(ltx2);
       // none of these should still be in the committed set (tx2 is long-running).
       // Only tx3 is invalid list as it was aborted and is long-running. tx1 is short one and it rolled back its changes
       // so it should NOT be in invalid list
       Assert.assertEquals(1, txm.getInvalidSize());
-      Assert.assertEquals(tx3.getTransactionId(), (long) txm.getCurrentState().getInvalid().iterator().next());
+      Assert.assertEquals(ltx2.getTransactionId(), (long) txm.getCurrentState().getInvalid().iterator().next());
       Assert.assertEquals(1, txm.getExcludedListSize());
     } finally {
       txm.stopAndWait();

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
index 628bced..21090c5 100644
--- a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
@@ -512,11 +512,11 @@ public abstract class AbstractTransactionStateStorageTest {
       if (i % 20 == 0) {
         inProgress.put(startPointer + i,
                        new TransactionManager.InProgressTx(startPointer - 1, currentTime + TimeUnit.DAYS.toSeconds(1),
-                                                           TransactionType.LONG));
+                                                           TransactionManager.InProgressType.LONG));
       } else {
         inProgress.put(startPointer + i,
-                       new TransactionManager.InProgressTx(startPointer - 1, currentTime + 300000L, 
-                                                           TransactionType.SHORT));
+                       new TransactionManager.InProgressTx(startPointer - 1, currentTime + 300000L,
+                                                           TransactionManager.InProgressType.SHORT));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java
index 9535102..38b1283 100644
--- a/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.ChangeId;
 import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionManager.InProgressType;
 import org.apache.tephra.TransactionType;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.metrics.TxMetricsCollector;
@@ -138,10 +139,10 @@ public class LocalTransactionStateStorageTest extends AbstractTransactionStateSt
         // There should be four in-progress transactions, and no invalid transactions
         TransactionSnapshot snapshot1 = txm.getCurrentState();
         Assert.assertEquals(ImmutableSortedSet.of(wp1, wp2, wp3, wp4), snapshot1.getInProgress().keySet());
-        verifyInProgress(snapshot1.getInProgress().get(wp1), TransactionType.LONG, time1 + longTimeout);
-        verifyInProgress(snapshot1.getInProgress().get(wp2), TransactionType.SHORT, time2 + 1000);
-        verifyInProgress(snapshot1.getInProgress().get(wp3), TransactionType.LONG, time3 + longTimeout);
-        verifyInProgress(snapshot1.getInProgress().get(wp4), TransactionType.SHORT, time4 + 1000);
+        verifyInProgress(snapshot1.getInProgress().get(wp1), InProgressType.LONG, time1 + longTimeout);
+        verifyInProgress(snapshot1.getInProgress().get(wp2), InProgressType.SHORT, time2 + 1000);
+        verifyInProgress(snapshot1.getInProgress().get(wp3), InProgressType.LONG, time3 + longTimeout);
+        verifyInProgress(snapshot1.getInProgress().get(wp4), InProgressType.SHORT, time4 + 1000);
         Assert.assertEquals(0, snapshot1.getInvalid().size());
       } finally {
         txm.stopAndWait();
@@ -212,8 +213,8 @@ public class LocalTransactionStateStorageTest extends AbstractTransactionStateSt
     }
   }
 
-  private void verifyInProgress(TransactionManager.InProgressTx inProgressTx, TransactionType type,
-                                long expiration) throws Exception {
+  private void verifyInProgress(TransactionManager.InProgressTx inProgressTx,
+                                InProgressType type, long expiration) throws Exception {
     Assert.assertEquals(type, inProgressTx.getType());
     Assert.assertTrue(inProgressTx.getExpiration() == expiration);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
index afdff5c..f67c58b 100644
--- a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
@@ -30,7 +30,6 @@ import org.apache.tephra.ChangeId;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionNotInProgressException;
-import org.apache.tephra.TransactionType;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.persist.TransactionSnapshot;
 import org.apache.tephra.persist.TransactionStateStorage;
@@ -66,7 +65,7 @@ public class SnapshotCodecTest {
   public static TemporaryFolder tmpDir = new TemporaryFolder();
 
   @Test
-  public void testMinimalDeserilization() throws Exception {
+  public void testMinimalDeserialization() throws Exception {
     long now = System.currentTimeMillis();
     long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
     /*
@@ -82,8 +81,8 @@ public class SnapshotCodecTest {
       tLong, new TransactionManager.InProgressTx(readPtr,
                                                  TransactionManager.getTxExpirationFromWritePointer(
                                                    tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
-                                                 TransactionType.LONG),
-      tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));
+                                                 TransactionManager.InProgressType.LONG),
+      tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionManager.InProgressType.SHORT)));
 
     TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
                                                            Lists.newArrayList(tInvalid), // invalid
@@ -144,10 +143,11 @@ public class SnapshotCodecTest {
     long tShort = nowWritePointer - 1;      // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)
 
     TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
-        tLong, new TransactionManager.InProgressTx(readPtr,
-            TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
-            TransactionType.LONG),
-        tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));
+        tLong, new TransactionManager.InProgressTx(
+          readPtr,
+          TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
+          TransactionManager.InProgressType.LONG),
+        tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionManager.InProgressType.SHORT)));
 
     TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
         Lists.newArrayList(tInvalid), // invalid
@@ -240,7 +240,7 @@ public class SnapshotCodecTest {
     assertEquals(1, snapshot2.getInProgress().size());
     Map.Entry<Long, TransactionManager.InProgressTx> inProgressTx =
         snapshot2.getInProgress().entrySet().iterator().next();
-    assertEquals(TransactionType.LONG, inProgressTx.getValue().getType());
+    assertEquals(TransactionManager.InProgressType.LONG, inProgressTx.getValue().getType());
 
     // save a new snapshot
     txManager2.stopAndWait();
@@ -315,13 +315,18 @@ public class SnapshotCodecTest {
     assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
 
     Map<Long, TransactionManager.InProgressTx> inProgress = snapshot.getInProgress();
-    Assert.assertEquals(1, inProgress.size());
+    Assert.assertEquals(2, inProgress.size());
 
     TransactionManager.InProgressTx inProgressTx = inProgress.get(transaction.getTransactionId());
     Assert.assertNotNull(inProgressTx);
     Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(),
                              inProgressTx.getCheckpointWritePointers().toLongArray());
 
+    inProgressTx = inProgress.get(checkpointTx.getWritePointer());
+    Assert.assertNotNull(inProgressTx);
+    Assert.assertEquals(TransactionManager.InProgressType.CHECKPOINT, inProgressTx.getType());
+    Assert.assertTrue(inProgressTx.getCheckpointWritePointers().isEmpty());
+
     txStorage.stopAndWait();
 
     // start a new Tx manager to see if the transaction is restored correctly.
@@ -335,13 +340,18 @@ public class SnapshotCodecTest {
     // state should be recovered
     snapshot = txManager.getCurrentState();
     inProgress = snapshot.getInProgress();
-    Assert.assertEquals(1, inProgress.size());
+    Assert.assertEquals(2, inProgress.size());
 
     inProgressTx = inProgress.get(transaction.getTransactionId());
     Assert.assertNotNull(inProgressTx);
     Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(),
                              inProgressTx.getCheckpointWritePointers().toLongArray());
 
+    inProgressTx = inProgress.get(checkpointTx.getWritePointer());
+    Assert.assertNotNull(inProgressTx);
+    Assert.assertEquals(TransactionManager.InProgressType.CHECKPOINT, inProgressTx.getType());
+    Assert.assertTrue(inProgressTx.getCheckpointWritePointers().isEmpty());
+
     // Should be able to commit the transaction
     Assert.assertTrue(txManager.canCommit(checkpointTx, Collections.<byte[]>emptyList()));
     Assert.assertTrue(txManager.commit(checkpointTx));

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index ae71e05..b25ae37 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -61,7 +61,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tephra.ChangeId;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionType;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
@@ -146,8 +145,8 @@ public class TransactionProcessorTest {
       TransactionSnapshot.copyFrom(
         System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
         // this will set visibility upper bound to V[6]
-        Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE,
-                                                                                        TransactionType.SHORT))),
+        Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
+          V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
         new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
     txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
                                                 txSnapshot.getWritePointer(), txSnapshot.getInvalid(),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index a1091ea..e612e2a 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tephra.ChangeId;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionType;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
@@ -151,8 +150,8 @@ public class TransactionProcessorTest {
     TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
       System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
       // this will set visibility upper bound to V[6]
-      Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE,
-                                                                                      TransactionType.SHORT))),
+      Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
+        V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
       new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
     txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
                                                 txSnapshot.getWritePointer(), txSnapshot.getInvalid(),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index d7c2af0..b92bb09 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tephra.ChangeId;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionType;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
@@ -131,8 +130,8 @@ public class TransactionProcessorTest {
     TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
       System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
       // this will set visibility upper bound to V[6]
-      Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE,
-                                                                                      TransactionType.SHORT))),
+      Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
+        V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
       new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
     txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
                                                 txSnapshot.getWritePointer(), txSnapshot.getInvalid(),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 7cbc010..4b236fc 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tephra.ChangeId;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionType;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
@@ -131,8 +130,8 @@ public class TransactionProcessorTest {
     TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
       System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
       // this will set visibility upper bound to V[6]
-      Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE,
-                                                                                      TransactionType.SHORT))),
+      Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
+        V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
       new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
     txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
                                                 txSnapshot.getWritePointer(), txSnapshot.getInvalid(),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 4a694eb..d21c987 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tephra.ChangeId;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionType;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
@@ -131,8 +130,8 @@ public class TransactionProcessorTest {
     TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
         System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
         // this will set visibility upper bound to V[6]
-        Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE,
-                                                                                        TransactionType.SHORT))),
+        Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
+          V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
         new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
     txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
                                                 txSnapshot.getWritePointer(), txSnapshot.getInvalid(),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java
index 955d2de..b91ee17 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionType;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.hbase.AbstractHBaseTableTest;
@@ -173,10 +172,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Create a new transaction snapshot
     InMemoryTransactionStateCache.setTransactionSnapshot(
       new TransactionSnapshot(110, 111, 112, ImmutableSet.of(150L),
-                              ImmutableSortedMap.of(
-                                105L, new TransactionManager.InProgressTx(100, 30, TransactionType.SHORT)
-                              )
-    ));
+                              ImmutableSortedMap.of(105L, new TransactionManager.InProgressTx(
+                                100, 30, TransactionManager.InProgressType.SHORT))));
     Assert.assertEquals(50,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 


Mime
View raw message