tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From poo...@apache.org
Subject incubator-tephra git commit: TEPHRA-223 Encapsulate the two data structures used for invalid transactions to avoid update issues
Date Wed, 22 Feb 2017 01:52:17 GMT
Repository: incubator-tephra
Updated Branches:
  refs/heads/release/0.11.0-incubating 10e36e632 -> 872fb1090


TEPHRA-223 Encapsulate the two data structures used for invalid transactions to avoid update issues

This closes #37

Signed-off-by: poorna <poorna@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/872fb109
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/872fb109
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/872fb109

Branch: refs/heads/release/0.11.0-incubating
Commit: 872fb1090efadb3d62a75fa8f51b308511eea754
Parents: 10e36e6
Author: poorna <poorna@cask.co>
Authored: Mon Feb 20 17:13:39 2017 -0800
Committer: poorna <poorna@apache.org>
Committed: Tue Feb 21 17:46:33 2017 -0800

----------------------------------------------------------------------
 .../org/apache/tephra/TransactionManager.java   | 104 +++++++--------
 .../apache/tephra/manager/InvalidTxList.java    | 126 +++++++++++++++++++
 .../tephra/manager/InvalidTxListTest.java       | 110 ++++++++++++++++
 .../AbstractTransactionStateStorageTest.java    |  10 ++
 4 files changed, 294 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/872fb109/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
index 0b90d7f..f2060cd 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
@@ -29,7 +29,11 @@ import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractService;
 import com.google.inject.Inject;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongArraySet;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongSet;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.manager.InvalidTxList;
 import org.apache.tephra.metrics.DefaultMetricsCollector;
 import org.apache.tephra.metrics.MetricsCollector;
 import org.apache.tephra.persist.NoOpTransactionStateStorage;
@@ -46,7 +50,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -115,15 +118,11 @@ public class TransactionManager extends AbstractService {
   //poll every 10 second to emit metrics
   private static final long METRICS_POLL_INTERVAL = 10000L;
 
-  private static final long[] NO_INVALID_TX = { };
-
   // Transactions that are in progress, with their info.
   private final NavigableMap<Long, InProgressTx> inProgress = new ConcurrentSkipListMap<Long,
InProgressTx>();
 
   // the list of transactions that are invalid (not properly committed/aborted, or timed out)
-  // TODO: explain usage of two arrays
-  private final LongArrayList invalid = new LongArrayList();
-  private long[] invalidArray = NO_INVALID_TX;
+  private final InvalidTxList invalidTxList = new InvalidTxList();
 
   // todo: use moving array instead (use Long2ObjectMap<byte[]> in fastutil)
   // todo: should this be consolidated with inProgress?
@@ -198,8 +197,7 @@ public class TransactionManager extends AbstractService {
   }
 
   private void clear() {
-    invalid.clear();
-    invalidArray = NO_INVALID_TX;
+    invalidTxList.clear();
     inProgress.clear();
     committedChangeSets.clear();
     committingChangeSets.clear();
@@ -318,7 +316,7 @@ public class TransactionManager extends AbstractService {
         txMetricsCollector.gauge("committing.size", committingChangeSets.size());
         txMetricsCollector.gauge("committed.size", committedChangeSets.size());
         txMetricsCollector.gauge("inprogress.size", inProgress.size());
-        txMetricsCollector.gauge("invalid.size", invalidArray.length);
+        txMetricsCollector.gauge("invalid.size", getInvalidSize());
       }
 
       @Override
@@ -327,7 +325,7 @@ public class TransactionManager extends AbstractService {
         txMetricsCollector.gauge("committing.size", committingChangeSets.size());
         txMetricsCollector.gauge("committed.size", committedChangeSets.size());
         txMetricsCollector.gauge("inprogress.size", inProgress.size());
-        txMetricsCollector.gauge("invalid.size", invalidArray.length);
+        txMetricsCollector.gauge("invalid.size", getInvalidSize());
       }
 
       @Override
@@ -363,7 +361,7 @@ public class TransactionManager extends AbstractService {
         }
         if (!timedOut.isEmpty()) {
           invalidEdits = Lists.newArrayListWithCapacity(timedOut.size());
-          invalid.addAll(timedOut.keySet());
+          invalidTxList.addAll(timedOut.keySet());
           for (Map.Entry<Long, InProgressType> tx : timedOut.entrySet()) {
             inProgress.remove(tx.getKey());
             // checkpoints never go into the committing change sets or the edits
@@ -373,9 +371,6 @@ public class TransactionManager extends AbstractService {
             }
           }
 
-          // todo: find a more efficient way to keep this sorted. Could it just be an array?
-          Collections.sort(invalid);
-          invalidArray = invalid.toLongArray();
           LOG.info("Invalidated {} transactions due to timeout.", timedOut.size());
         }
       }
@@ -468,7 +463,8 @@ public class TransactionManager extends AbstractService {
 
   public synchronized TransactionSnapshot getCurrentState() {
     return TransactionSnapshot.copyFrom(System.currentTimeMillis(), readPointer, lastWritePointer,
-                                        invalid, inProgress, committingChangeSets, committedChangeSets);
+                                        invalidTxList.toRawList(), inProgress, committingChangeSets,
+                                        committedChangeSets);
   }
 
   public synchronized void recoverState() {
@@ -497,7 +493,7 @@ public class TransactionManager extends AbstractService {
     Preconditions.checkState(lastSnapshotTime == 0, "lastSnapshotTime has been set!");
     Preconditions.checkState(readPointer == 0, "readPointer has been set!");
     Preconditions.checkState(lastWritePointer == 0, "lastWritePointer has been set!");
-    Preconditions.checkState(invalid.isEmpty(), "invalid list should be empty!");
+    Preconditions.checkState(invalidTxList.isEmpty(), "invalid list should be empty!");
     Preconditions.checkState(inProgress.isEmpty(), "inProgress map should be empty!");
     Preconditions.checkState(committingChangeSets.isEmpty(), "committingChangeSets should
be empty!");
     Preconditions.checkState(committedChangeSets.isEmpty(), "committedChangeSets should be
empty!");
@@ -506,7 +502,7 @@ public class TransactionManager extends AbstractService {
     lastSnapshotTime = snapshot.getTimestamp();
     readPointer = snapshot.getReadPointer();
     lastWritePointer = snapshot.getWritePointer();
-    invalid.addAll(snapshot.getInvalid());
+    invalidTxList.addAll(snapshot.getInvalid());
     inProgress.putAll(txnBackwardsCompatCheck(defaultLongTimeout, longTimeoutTolerance, snapshot.getInProgress()));
     committingChangeSets.putAll(snapshot.getCommittingChangeSets());
     committedChangeSets.putAll(snapshot.getCommittedChangeSets());
@@ -818,14 +814,17 @@ public class TransactionManager extends AbstractService {
     txMetricsCollector.rate("canCommit");
     Stopwatch timer = new Stopwatch().start();
     if (inProgress.get(tx.getTransactionId()) == null) {
-      // invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
-      if (invalid.contains(tx.getTransactionId())) {
-        throw new TransactionNotInProgressException(
-          String.format("canCommit() is called for transaction %d that is not in progress
(it is known to be invalid)",
-                        tx.getTransactionId()));
-      } else {
-        throw new TransactionNotInProgressException(
-          String.format("canCommit() is called for transaction %d that is not in progress",
tx.getTransactionId()));
+      synchronized (this) {
+        // invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
+        if (invalidTxList.contains(tx.getTransactionId())) {
+          throw new TransactionNotInProgressException(
+            String.format(
+              "canCommit() is called for transaction %d that is not in progress (it is known
to be invalid)",
+              tx.getTransactionId()));
+        } else {
+          throw new TransactionNotInProgressException(
+            String.format("canCommit() is called for transaction %d that is not in progress",
tx.getTransactionId()));
+        }
       }
     }
 
@@ -872,7 +871,7 @@ public class TransactionManager extends AbstractService {
         commitPointer = lastWritePointer + 1;
         if (inProgress.get(tx.getTransactionId()) == null) {
           // invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
-          if (invalid.contains(tx.getTransactionId())) {
+          if (invalidTxList.contains(tx.getTransactionId())) {
             throw new TransactionNotInProgressException(
               String.format("canCommit() is called for transaction %d that is not in progress
" +
                               "(it is known to be invalid)", tx.getTransactionId()));
@@ -928,8 +927,7 @@ public class TransactionManager extends AbstractService {
     InProgressTx previous = inProgress.remove(transactionId);
     if (previous == null) {
       // tx was not in progress! perhaps it timed out and is invalid? try to remove it there.
-      if (invalid.rem(transactionId)) {
-        invalidArray = invalid.toLongArray();
+      if (invalidTxList.remove(transactionId)) {
         LOG.info("Tx invalid list: removed committed tx {}", transactionId);
       }
     } else {
@@ -983,17 +981,16 @@ public class TransactionManager extends AbstractService {
     boolean removeInProgressCheckpoints = true;
     if (removed == null) {
       // tx was not in progress! perhaps it timed out and is invalid? try to remove it there.
-      if (invalid.rem(writePointer)) {
+      if (invalidTxList.remove(writePointer)) {
         // the tx and all its children were invalidated: no need to remove them from inProgress
         removeInProgressCheckpoints = false;
         // remove any invalidated checkpoint pointers
         // this will only be present if the parent write pointer was also invalidated
         if (checkpointWritePointers != null) {
           for (long checkpointWritePointer : checkpointWritePointers) {
-            invalid.rem(checkpointWritePointer);
+            invalidTxList.remove(checkpointWritePointer);
           }
         }
-        invalidArray = invalid.toLongArray();
         LOG.info("Tx invalid list: removed aborted tx {}", writePointer);
       }
     }
@@ -1032,21 +1029,18 @@ public class TransactionManager extends AbstractService {
     // This check is to prevent from invalidating committed transactions
     if (previous != null || previousChangeSet != null) {
       // add tx to invalids
-      invalid.add(writePointer);
+      invalidTxList.add(writePointer);
       if (previous == null) {
         LOG.debug("Invalidating tx {} in committing change sets but not in-progress", writePointer);
       } else {
         // invalidate any checkpoint write pointers
         LongArrayList childWritePointers = previous.getCheckpointWritePointers();
         if (!childWritePointers.isEmpty()) {
-          invalid.addAll(childWritePointers);
+          invalidTxList.addAll(childWritePointers);
           inProgress.keySet().removeAll(childWritePointers);
         }
       }
       LOG.info("Tx invalid list: added tx {} because of invalidate", writePointer);
-      // todo: find a more efficient way to keep this sorted. Could it just be an array?
-      Collections.sort(invalid);
-      invalidArray = invalid.toLongArray();
       if (previous != null && !previous.isLongRunning()) {
         // tx was short-running: must move read pointer
         moveReadPointerIfNeeded(writePointer);
@@ -1080,13 +1074,9 @@ public class TransactionManager extends AbstractService {
     }
   }
 
-  private boolean doTruncateInvalidTx(Set<Long> invalidTxIds) {
-    LOG.info("Removing tx ids {} from invalid list", invalidTxIds);
-    boolean success = invalid.removeAll(invalidTxIds);
-    if (success) {
-      invalidArray = invalid.toLongArray();
-    }
-    return success;
+  private boolean doTruncateInvalidTx(Set<Long> toRemove) {
+    LOG.info("Removing tx ids {} from invalid list", toRemove);
+    return invalidTxList.removeAll(toRemove);
   }
 
   /**
@@ -1123,15 +1113,16 @@ public class TransactionManager extends AbstractService {
     }
     
     // Find all invalid transactions earlier than truncateWp
-    Set<Long> toTruncate = Sets.newHashSet();
-    for (long wp : invalid) {
-      // invalid list is sorted, hence can stop as soon as we reach a wp >= truncateWp
-      if (wp >= truncateWp) {
-        break;
+    LongSet toTruncate = new LongArraySet();
+    LongIterator it = invalidTxList.toRawList().iterator();
+    while (it.hasNext()) {
+      long wp = it.nextLong();
+      if (wp < truncateWp) {
+        toTruncate.add(wp);
       }
-      toTruncate.add(wp);
     }
-    return doTruncateInvalidTx(toTruncate);
+    LOG.info("Removing tx ids {} from invalid list", toTruncate);
+    return invalidTxList.removeAll(toTruncate);
   }
 
   public Transaction checkpoint(Transaction originalTx) throws TransactionNotInProgressException
{
@@ -1149,7 +1140,7 @@ public class TransactionManager extends AbstractService {
         // check that the parent tx is in progress
         InProgressTx parentTx = inProgress.get(txId);
         if (parentTx == null) {
-          if (invalid.contains(txId)) {
+          if (invalidTxList.contains(txId)) {
             throw new TransactionNotInProgressException(
                 String.format("Transaction %d is not in progress because it was invalidated",
txId));
           } else {
@@ -1184,14 +1175,14 @@ public class TransactionManager extends AbstractService {
   
   // hack for exposing important metric
   public int getExcludedListSize() {
-    return invalid.size() + inProgress.size();
+    return getInvalidSize() + inProgress.size();
   }
 
   /**
    * @return the size of invalid list
    */
-  public int getInvalidSize() {
-    return this.invalid.size();
+  public synchronized int getInvalidSize() {
+    return this.invalidTxList.size();
   }
 
   int getCommittedSize() {
@@ -1254,7 +1245,8 @@ public class TransactionManager extends AbstractService {
         firstShortTx = txId;
       }
     }
-    return new Transaction(readPointer, writePointer, invalidArray, inProgressIds.toLongArray(),
firstShortTx, type);
+    return new Transaction(readPointer, writePointer, invalidTxList.toSortedArray(),
+                           inProgressIds.toLongArray(), firstShortTx, type);
   }
 
   private void appendToLog(TransactionEdit edit) {
@@ -1285,7 +1277,7 @@ public class TransactionManager extends AbstractService {
    */
   public void logStatistics() {
     LOG.info("Transaction Statistics: write pointer = " + lastWritePointer +
-               ", invalid = " + invalid.size() +
+               ", invalid = " + getInvalidSize() +
                ", in progress = " + inProgress.size() +
                ", committing = " + committingChangeSets.size() +
                ", committed = " + committedChangeSets.size());

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/872fb109/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
new file mode 100644
index 0000000..231196c
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.manager;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+import it.unimi.dsi.fastutil.longs.LongLists;
+import org.apache.tephra.TransactionManager;
+
+import java.util.Arrays;
+import java.util.Collection;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * This is an internal class used by the {@link TransactionManager} to store invalid transaction ids.
+ * This class uses both a list and an array to keep track of the invalid ids. The list is the primary
+ * data structure for storing the invalid ids. The array is populated lazily on changes to the list.
+ * The array is used to avoid creating a new array every time method {@link #toSortedArray()} is invoked.
+ *
+ * This class is not thread safe and relies on external synchronization. TransactionManager always
+ * accesses an instance of this class after synchronization.
+ */
+@NotThreadSafe
+public class InvalidTxList {
+  private static final long[] NO_INVALID_TX = { };
+
+  private final LongList invalid = new LongArrayList();
+  private long[] invalidArray = NO_INVALID_TX;
+
+  private boolean dirty = false; // used to track changes to the invalid list
+
+  public int size() {
+    return invalid.size();
+  }
+
+  public boolean isEmpty() {
+    return invalid.isEmpty();
+  }
+
+  public boolean add(long id) {
+    boolean changed = invalid.add(id);
+    dirty = dirty || changed;
+    return changed;
+  }
+
+  public boolean addAll(Collection<? extends Long> ids) {
+    boolean changed = invalid.addAll(ids);
+    dirty = dirty || changed;
+    return changed;
+  }
+
+  public boolean addAll(LongList ids) {
+    boolean changed = invalid.addAll(ids);
+    dirty = dirty || changed;
+    return changed;
+  }
+
+  public boolean contains(long id) {
+    return invalid.contains(id);
+  }
+
+  public boolean remove(long id) {
+    boolean changed = invalid.rem(id);
+    dirty = dirty || changed;
+    return changed;
+  }
+
+  public boolean removeAll(Collection<? extends Long> ids) {
+    boolean changed = invalid.removeAll(ids);
+    dirty = dirty || changed;
+    return changed;
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public boolean removeAll(LongList ids) {
+    boolean changed = invalid.removeAll(ids);
+    dirty = dirty || changed;
+    return changed;
+  }
+
+  public void clear() {
+    invalid.clear();
+    invalidArray = NO_INVALID_TX;
+    dirty = false;
+  }
+
+  /**
+   * @return sorted array of invalid transactions
+   */
+  public long[] toSortedArray() {
+    lazyUpdate();
+    return invalidArray;
+  }
+
+  /**
+   * @return list of invalid transactions. The list is not sorted.
+   */
+  public LongList toRawList() {
+    return LongLists.unmodifiable(invalid);
+  }
+
+  private void lazyUpdate() {
+    if (dirty) {
+      invalidArray = invalid.toLongArray();
+      Arrays.sort(invalidArray);
+      dirty = false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/872fb109/tephra-core/src/test/java/org/apache/tephra/manager/InvalidTxListTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/manager/InvalidTxListTest.java b/tephra-core/src/test/java/org/apache/tephra/manager/InvalidTxListTest.java
new file mode 100644
index 0000000..4f45072
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/manager/InvalidTxListTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.manager;
+
+import com.google.common.collect.ImmutableList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class InvalidTxListTest {
+
+  @Test
+  public void testInvalidTxList() {
+    InvalidTxList invalidTxList = new InvalidTxList();
+    // Assert that the list is empty at the beginning
+    Assert.assertTrue(invalidTxList.isEmpty());
+    Assert.assertEquals(0, invalidTxList.size());
+    Assert.assertEquals(ImmutableList.of(), invalidTxList.toRawList());
+    Assert.assertArrayEquals(new long[0], invalidTxList.toSortedArray());
+
+    // Try removing something from the empty list
+    Assert.assertFalse(invalidTxList.remove(3));
+    Assert.assertFalse(invalidTxList.removeAll(ImmutableList.of(5L, 9L)));
+
+    // verify contains
+    Assert.assertFalse(invalidTxList.contains(3));
+
+    // Add some elements to the list
+    invalidTxList.add(3);
+    invalidTxList.add(1);
+    invalidTxList.add(8);
+    invalidTxList.add(5);
+
+    // verify contains
+    Assert.assertTrue(invalidTxList.contains(3));
+
+    // Assert the newly added elements
+    Assert.assertFalse(invalidTxList.isEmpty());
+    Assert.assertEquals(4, invalidTxList.size());
+    Assert.assertEquals(ImmutableList.of(3L, 1L, 8L, 5L), invalidTxList.toRawList());
+    Assert.assertArrayEquals(new long[] {1, 3, 5, 8}, invalidTxList.toSortedArray());
+
+    // Add a collection of elements
+    invalidTxList.addAll(ImmutableList.of(7L, 10L, 4L, 2L));
+
+    // Assert the newly added elements
+    Assert.assertFalse(invalidTxList.isEmpty());
+    Assert.assertEquals(8, invalidTxList.size());
+    Assert.assertEquals(ImmutableList.of(3L, 1L, 8L, 5L, 7L, 10L, 4L, 2L), invalidTxList.toRawList());
+    Assert.assertArrayEquals(new long[] {1, 2, 3, 4, 5, 7, 8, 10}, invalidTxList.toSortedArray());
+
+    // Remove elements that are not present
+    Assert.assertFalse(invalidTxList.remove(6));
+    Assert.assertFalse(invalidTxList.removeAll(ImmutableList.of(9L, 11L)));
+
+    // Remove a collection of elements
+    Assert.assertTrue(invalidTxList.removeAll(ImmutableList.of(8L, 4L, 2L)));
+    // This time check the array first and then check the list
+    Assert.assertArrayEquals(new long[] {1, 3, 5, 7, 10}, invalidTxList.toSortedArray());
+    Assert.assertEquals(ImmutableList.of(3L, 1L, 5L, 7L, 10L), invalidTxList.toRawList());
+
+    // Remove a single element
+    Assert.assertTrue(invalidTxList.remove(5));
+    Assert.assertArrayEquals(new long[] {1, 3, 7, 10}, invalidTxList.toSortedArray());
+    Assert.assertEquals(ImmutableList.of(3L, 1L, 7L, 10L), invalidTxList.toRawList());
+
+    // Add a LongCollection
+    invalidTxList.addAll(new LongArrayList(new long[] {15, 12, 13}));
+
+    // Assert the newly added elements
+    Assert.assertEquals(7, invalidTxList.size());
+    Assert.assertArrayEquals(new long[] {1, 3, 7, 10, 12, 13, 15}, invalidTxList.toSortedArray());
+    Assert.assertEquals(ImmutableList.of(3L, 1L, 7L, 10L, 15L, 12L, 13L), invalidTxList.toRawList());
+
+    // Remove a LongCollection
+    invalidTxList.removeAll(new LongArrayList(new long[] {3, 7, 12}));
+
+    // Assert removals
+    Assert.assertEquals(4, invalidTxList.size());
+    Assert.assertArrayEquals(new long[] {1, 10, 13, 15}, invalidTxList.toSortedArray());
+    Assert.assertEquals(ImmutableList.of(1L, 10L, 15L, 13L), invalidTxList.toRawList());
+
+    // Clear the list
+    invalidTxList.clear();
+    Assert.assertTrue(invalidTxList.isEmpty());
+    Assert.assertEquals(0, invalidTxList.size());
+    Assert.assertArrayEquals(new long[0], invalidTxList.toSortedArray());
+    Assert.assertEquals(ImmutableList.of(), invalidTxList.toRawList());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/872fb109/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
index 21090c5..ec06528 100644
--- a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.ChangeId;
@@ -137,6 +138,9 @@ public abstract class AbstractTransactionStateStorageTest {
       // TODO: replace with new persistence tests
       final byte[] a = { 'a' };
       final byte[] b = { 'b' };
+      // Start and invalidate a transaction
+      Transaction invalid = txManager.startShort();
+      txManager.invalidate(invalid.getTransactionId());
       // start a tx1, add a change A and commit
       Transaction tx1 = txManager.startShort();
       Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
@@ -162,6 +166,12 @@ public abstract class AbstractTransactionStateStorageTest {
       LOG.info("New state: " + newState);
       assertEquals(origState, newState);
 
+      // Verify that the invalid transaction list matches
+      Transaction checkTx = txManager.startShort();
+      Assert.assertEquals(origState.getInvalid(), Longs.asList(checkTx.getInvalids()));
+      txManager.abort(checkTx);
+      txManager.abort(invalid);
+
       // commit tx2
       Assert.assertTrue(txManager.commit(tx2));
       // start another transaction, must be greater than tx3


Mime
View raw message