tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From poo...@apache.org
Subject [1/4] incubator-tephra git commit: TEPHRA-199 Transaction maximum lifetime
Date Wed, 28 Dec 2016 10:38:26 GMT
Repository: incubator-tephra
Updated Branches:
  refs/heads/master 36318c36d -> 79b97198c


TEPHRA-199 Transaction maximum lifetime

This closes #22

Signed-off-by: poorna <poorna@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/7c8267c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/7c8267c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/7c8267c4

Branch: refs/heads/master
Commit: 7c8267c4a348004d92479466037c1de69886a442
Parents: 36318c3
Author: poorna <poorna@cask.co>
Authored: Mon Dec 5 00:25:16 2016 -0800
Committer: poorna <poorna@apache.org>
Committed: Wed Dec 28 16:08:10 2016 +0530

----------------------------------------------------------------------
 .../java/org/apache/tephra/TxConstants.java     |  8 +++
 .../tephra/hbase/TransactionAwareHTable.java    |  1 +
 .../hbase/coprocessor/TransactionProcessor.java | 31 ++++++++++
 .../hbase/TransactionAwareHTableTest.java       | 61 ++++++++++++++++++++
 4 files changed, 101 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7c8267c4/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 25451b3..bc02936 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -158,6 +158,14 @@ public class TxConstants {
      * The default value for the transaction timeout limit, in seconds: unlimited.
      */
     public static final int DEFAULT_TX_MAX_TIMEOUT = Integer.MAX_VALUE;
+    /**
+     * The maximum time in seconds that a transaction can be used for data writes.
+     */
+    public static final String CFG_TX_MAX_LIFETIME = "data.tx.max.lifetime";
+    /**
+     * The default value for the maximum transaction lifetime.
+     */
+    public static final int DEFAULT_TX_MAX_LIFETIME = (int) TimeUnit.HOURS.toSeconds(25);
     /** The frequency (in seconds) to perform periodic snapshots, or 0 for no periodic snapshots. */
     public static final String CFG_TX_SNAPSHOT_INTERVAL = "data.tx.snapshot.interval";
     /** Default value for frequency of periodic snapshots of transaction state. */

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7c8267c4/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index bb7afff..531e010 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -649,6 +649,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
         txDelete.setAttribute(entry.getKey(), entry.getValue());
     }
     txDelete.setDurability(delete.getDurability());
+    addToOperation(txDelete, tx);
     return txDelete;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7c8267c4/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 9f723d6..132c157 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -67,6 +68,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 
 /**
  * {@code org.apache.hadoop.hbase.coprocessor.RegionObserver} coprocessor that handles server-side processing
@@ -106,6 +109,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
   protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
   protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
+  protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
 
   public TransactionProcessor() {
     this.txCodec = new TransactionCodec();
@@ -143,6 +147,10 @@ public class TransactionProcessor extends BaseRegionObserver {
         LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
       }
 
+      this.txMaxLifetimeMillis =
+        TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                                TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+
       boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.DataJanitor.PRUNE_ENABLE,
                                                                TxConstants.DataJanitor.DEFAULT_PRUNE_ENABLE);
       if (pruneEnabled) {
@@ -179,6 +187,13 @@ public class TransactionProcessor extends BaseRegionObserver {
   }
 
   @Override
+  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
+    throws IOException {
+    Transaction tx = getFromOperation(put);
+    ensureValidTxLifetime(tx);
+  }
+
+  @Override
   public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
                         Durability durability) throws IOException {
     // Translate deletes into our own delete tombstones
@@ -191,6 +206,9 @@ public class TransactionProcessor extends BaseRegionObserver {
       return;
     }
 
+    Transaction tx = getFromOperation(delete);
+    ensureValidTxLifetime(tx);
+
     // Other deletes are client-initiated and need to be translated into our own tombstones
     // TODO: this should delegate to the DeleteStrategy implementation.
     Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
@@ -341,6 +359,19 @@ public class TransactionProcessor extends BaseRegionObserver {
     return null;
   }
 
+  private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException {
+    if (tx == null) {
+      return;
+    }
+
+    boolean validLifetime =
+      TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
+    if (!validLifetime) {
+      throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms",
+                                                    tx.getTransactionId(), txMaxLifetimeMillis));
+    }
+  }
+
   private boolean isRollbackOperation(OperationWithAttributes op) throws IOException {
     return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
       // to support old clients

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7c8267c4/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index c336712..46ac384 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -69,6 +69,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -1490,6 +1491,66 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     transactionContext.finish();
   }
 
+  @Test
+  public void testTxLifetime() throws Exception {
+    // Add some initial values
+    transactionContext.start();
+    Put put = new Put(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    transactionAwareHTable.put(put);
+    put = new Put(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2, TestBytes.value);
+    transactionAwareHTable.put(put);
+    transactionContext.finish();
+
+    // Simulate writing with a transaction past its max lifetime
+    transactionContext.start();
+    Transaction currentTx = transactionContext.getCurrentTransaction();
+    Assert.assertNotNull(currentTx);
+
+    // Create a transaction that is past the max lifetime
+    long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                                     TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+    long oldTxId = currentTx.getTransactionId() - ((txMaxLifetimeMillis + 10000) * TxConstants.MAX_TX_PER_MS);
+    Transaction oldTx = new Transaction(currentTx.getReadPointer(), oldTxId,
+                                        currentTx.getInvalids(), currentTx.getInProgress(),
+                                        currentTx.getFirstShortInProgress());
+    transactionAwareHTable.updateTx(oldTx);
+    // Put with the old transaction should fail
+    put = new Put(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    try {
+      transactionAwareHTable.put(put);
+      Assert.fail("Excepted exception with old transaction!");
+    } catch (IOException e) {
+      // Expected exception
+    }
+
+    // Delete with the old transaction should also fail
+    Delete delete = new Delete(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier);
+    try {
+      transactionAwareHTable.delete(delete);
+      Assert.fail("Excepted exception with old transaction!");
+    } catch (IOException e) {
+      // Expected exception
+    }
+
+    // Now update the table to use the current transaction
+    transactionAwareHTable.updateTx(currentTx);
+    put = new Put(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2, TestBytes.value);
+    transactionAwareHTable.put(put);
+    delete = new Delete(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2);
+    transactionAwareHTable.delete(delete);
+
+    // Verify values with the same transaction since we cannot commit the old transaction
+    verifyRow(transactionAwareHTable,
+              new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), TestBytes.value);
+    verifyRow(transactionAwareHTable,
+              new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), null);
+    verifyRow(transactionAwareHTable,
+              new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), null);
+    verifyRow(transactionAwareHTable,
+              new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), TestBytes.value);
+    transactionContext.finish();
+  }
+
   /**
    * Tests that transaction co-processor works with older clients
    *


Mime
View raw message