tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From go...@apache.org
Subject incubator-tephra git commit: TEPHRA-210 Get table specific properties from tableDescriptor and other properties from getConfiguration method, construct compactionState lazily
Date Thu, 26 Jan 2017 20:49:08 GMT
Repository: incubator-tephra
Updated Branches:
  refs/heads/master 92c61e7c4 -> 87cb21a0f


TEPHRA-210 Get table specific properties from tableDescriptor and other properties from getConfiguration method, construct compactionState lazily

This closes #28 from GitHub.

Signed-off-by: Gokul Gunasekaran <gokul@cask.co>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/87cb21a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/87cb21a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/87cb21a0

Branch: refs/heads/master
Commit: 87cb21a0fff5d1e5f39594c746788761c2341b21
Parents: 92c61e7
Author: Gokul Gunasekaran <gokul@cask.co>
Authored: Thu Jan 19 18:01:14 2017 -0800
Committer: Gokul Gunasekaran <gokul@cask.co>
Committed: Thu Jan 26 12:48:34 2017 -0800

----------------------------------------------------------------------
 .../hbase/coprocessor/TransactionProcessor.java | 103 ++++++++++++++-----
 .../tephra/hbase/txprune/CompactionState.java   |   4 +-
 .../hbase/coprocessor/TransactionProcessor.java | 103 ++++++++++++++-----
 .../tephra/hbase/txprune/CompactionState.java   |   4 +-
 .../hbase/coprocessor/TransactionProcessor.java | 103 ++++++++++++++-----
 .../tephra/hbase/txprune/CompactionState.java   |   4 +-
 .../hbase/coprocessor/TransactionProcessor.java | 103 ++++++++++++++-----
 .../tephra/hbase/txprune/CompactionState.java   |   4 +-
 .../hbase/coprocessor/TransactionProcessor.java | 100 +++++++++++++-----
 .../tephra/hbase/txprune/CompactionState.java   |   4 +-
 10 files changed, 384 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 6e89571..c42dd64 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -109,7 +110,8 @@ public class TransactionProcessor extends BaseRegionObserver {
   protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
   protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
   protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
-  protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+  protected Long txMaxLifetimeMillis;
+  private Boolean pruneEnable;
 
   public TransactionProcessor() {
     this.txCodec = new TransactionCodec();
@@ -140,29 +142,28 @@ public class TransactionProcessor extends BaseRegionObserver {
         ttlByFamily.put(columnDesc.getName(), ttl);
       }
 
-      this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY,
-                                                                TxConstants.ALLOW_EMPTY_VALUES_DEFAULT);
+      this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
       this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
       if (readNonTxnData) {
         LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
       }
-
-      this.txMaxLifetimeMillis =
-        TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
-                                                                TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
-
-      boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
-                                                               TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-      if (pruneEnabled) {
-        String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis);
-        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
-                    pruneTable);
-      }
     }
   }
 
+  /**
+   * Fetch the {@link Configuration} that contains the properties required by the coprocessor. By default,
+   * the HBase configuration is returned. This method will never return {@code null} in Tephra but the derived
+   * classes might do so if {@link Configuration} is not available temporarily (for example, if it is being fetched
+   * from a HBase Table.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @return {@link Configuration}, can be null if it is not available
+   */
+  @Nullable
+  protected Configuration getConfiguration(CoprocessorEnvironment env) {
+    return env.getConfiguration();
+  }
+
   protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
     return new TransactionStateCacheSupplier(env.getConfiguration());
   }
@@ -190,7 +191,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
     throws IOException {
     Transaction tx = getFromOperation(put);
-    ensureValidTxLifetime(tx);
+    ensureValidTxLifetime(e.getEnvironment(), tx);
   }
 
   @Override
@@ -207,7 +208,7 @@ public class TransactionProcessor extends BaseRegionObserver {
     }
 
     Transaction tx = getFromOperation(delete);
-    ensureValidTxLifetime(tx);
+    ensureValidTxLifetime(e.getEnvironment(), tx);
 
     // Other deletes are client-initiated and need to be translated into our own tombstones
     // TODO: this should delegate to the DeleteStrategy implementation.
@@ -216,13 +217,11 @@ public class TransactionProcessor extends BaseRegionObserver {
       List<Cell> familyCells = delete.getFamilyCellMap().get(family);
       if (isFamilyDelete(familyCells)) {
         deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(),
-            HConstants.EMPTY_BYTE_ARRAY);
+                          HConstants.EMPTY_BYTE_ARRAY);
       } else {
-        int cellSize = familyCells.size();
-        for (int i = 0; i < cellSize; i++) {
-          Cell cell = familyCells.get(i);
+        for (Cell cell : familyCells) {
           deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
-                  HConstants.EMPTY_BYTE_ARRAY);
+                            HConstants.EMPTY_BYTE_ARRAY);
         }
       }
     }
@@ -234,6 +233,18 @@ public class TransactionProcessor extends BaseRegionObserver {
     e.bypass();
   }
 
+  private boolean getAllowEmptyValues(RegionCoprocessorEnvironment env, HTableDescriptor htd) {
+    String allowEmptyValuesFromTableDesc = htd.getValue(TxConstants.ALLOW_EMPTY_VALUES_KEY);
+    Configuration conf = getConfiguration(env);
+    boolean allowEmptyValuesFromConfig = (conf != null) ?
+      conf.getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, TxConstants.ALLOW_EMPTY_VALUES_DEFAULT) :
+      TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
+
+    // If the property is not present in the tableDescriptor, get it from the Configuration
+    return  (allowEmptyValuesFromTableDesc != null) ?
+      Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
+  }
+
   private boolean isFamilyDelete(List<Cell> familyCells) {
     return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
   }
@@ -304,10 +315,26 @@ public class TransactionProcessor extends BaseRegionObserver {
     // Get the latest tx snapshot state for the compaction
     TransactionVisibilityState snapshot = cache.getLatestState();
 
-    // Record tx state before the compaction
-    if (compactionState != null) {
+    if (pruneEnable == null) {
+      Configuration conf = getConfiguration(c.getEnvironment());
+      // Configuration won't be null in TransactionProcessor but the derived classes might return
+      // null if it is not available temporarily
+      if (conf != null) {
+        pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+                                      TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
+        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
+                    pruneTable);
+      }
+    }
+
+    if (Boolean.TRUE.equals(pruneEnable)) {
+      // Record tx state before the compaction
       compactionState.record(request, snapshot);
     }
+
     // Also make sure to use the same snapshot for the compaction
     return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs);
   }
@@ -359,11 +386,33 @@ public class TransactionProcessor extends BaseRegionObserver {
     return null;
   }
 
-  private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException {
+  /**
+   * Make sure that the transaction is within the max valid transaction lifetime.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @param tx {@link Transaction} supplied by the
+   * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
+   *         IOException throw if the value of max lifetime of transaction is unavailable
+   */
+  protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+                                       @Nullable Transaction tx) throws IOException {
     if (tx == null) {
       return;
     }
 
+    if (txMaxLifetimeMillis == null) {
+      Configuration conf = getConfiguration(env);
+      // Configuration won't be null in TransactionProcessor but the derived classes might return
+      // null if it is not available temporarily
+      if (conf != null) {
+        this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                                         TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+      } else {
+        throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
+                                              "unavailable. Please retry the operation."));
+      }
+    }
+
     boolean validLifetime =
       TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
     if (!validLifetime) {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 1df754b..733f636 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -41,15 +41,13 @@ public class CompactionState {
   private final byte[] regionName;
   private final String regionNameAsString;
   private final TableName stateTable;
-  private final long txMaxLifetimeMills;
   private final DataJanitorState dataJanitorState;
   private volatile long pruneUpperBound = -1;
 
-  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long txMaxLifetimeMills) {
+  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
     this.regionName = env.getRegion().getRegionName();
     this.regionNameAsString = env.getRegion().getRegionNameAsString();
     this.stateTable = stateTable;
-    this.txMaxLifetimeMills = txMaxLifetimeMills;
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public HTableInterface get() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 92862ad..19eb09e 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -109,7 +110,8 @@ public class TransactionProcessor extends BaseRegionObserver {
   protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
   protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
   protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
-  protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+  protected Long txMaxLifetimeMillis;
+  private Boolean pruneEnable;
 
   public TransactionProcessor() {
     this.txCodec = new TransactionCodec();
@@ -140,29 +142,28 @@ public class TransactionProcessor extends BaseRegionObserver {
         ttlByFamily.put(columnDesc.getName(), ttl);
       }
 
-      this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY,
-                                                                TxConstants.ALLOW_EMPTY_VALUES_DEFAULT);
+      this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
       this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
       if (readNonTxnData) {
         LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
       }
-
-      this.txMaxLifetimeMillis =
-        TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
-                                                                TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
-
-      boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
-                                                               TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-      if (pruneEnabled) {
-        String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis);
-        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
-                    pruneTable);
-      }
     }
   }
 
+  /**
+   * Fetch the {@link Configuration} that contains the properties required by the coprocessor. By default,
+   * the HBase configuration is returned. This method will never return {@code null} in Tephra but the derived
+   * classes might do so if {@link Configuration} is not available temporarily (for example, if it is being fetched
+   * from a HBase Table.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @return {@link Configuration}, can be null if it is not available
+   */
+  @Nullable
+  protected Configuration getConfiguration(CoprocessorEnvironment env) {
+    return env.getConfiguration();
+  }
+
   protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
     return new TransactionStateCacheSupplier(env.getConfiguration());
   }
@@ -190,7 +191,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
     throws IOException {
     Transaction tx = getFromOperation(put);
-    ensureValidTxLifetime(tx);
+    ensureValidTxLifetime(e.getEnvironment(), tx);
   }
 
   @Override
@@ -207,7 +208,7 @@ public class TransactionProcessor extends BaseRegionObserver {
     }
 
     Transaction tx = getFromOperation(delete);
-    ensureValidTxLifetime(tx);
+    ensureValidTxLifetime(e.getEnvironment(), tx);
 
     // Other deletes are client-initiated and need to be translated into our own tombstones
     // TODO: this should delegate to the DeleteStrategy implementation.
@@ -216,13 +217,11 @@ public class TransactionProcessor extends BaseRegionObserver {
       List<Cell> familyCells = delete.getFamilyCellMap().get(family);
       if (isFamilyDelete(familyCells)) {
         deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(),
-                   HConstants.EMPTY_BYTE_ARRAY);
+                          HConstants.EMPTY_BYTE_ARRAY);
       } else {
-        int cellSize = familyCells.size();
-        for (int i = 0; i < cellSize; i++) {
-          Cell cell = familyCells.get(i);
+        for (Cell cell : familyCells) {
           deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
-                  HConstants.EMPTY_BYTE_ARRAY);
+                            HConstants.EMPTY_BYTE_ARRAY);
         }
       }
     }
@@ -234,6 +233,18 @@ public class TransactionProcessor extends BaseRegionObserver {
     e.bypass();
   }
 
+  private boolean getAllowEmptyValues(RegionCoprocessorEnvironment env, HTableDescriptor htd) {
+    String allowEmptyValuesFromTableDesc = htd.getValue(TxConstants.ALLOW_EMPTY_VALUES_KEY);
+    Configuration conf = getConfiguration(env);
+    boolean allowEmptyValuesFromConfig = (conf != null) ?
+      conf.getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, TxConstants.ALLOW_EMPTY_VALUES_DEFAULT) :
+      TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
+
+    // If the property is not present in the tableDescriptor, get it from the Configuration
+    return  (allowEmptyValuesFromTableDesc != null) ?
+      Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
+  }
+
   private boolean isFamilyDelete(List<Cell> familyCells) {
     return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
   }
@@ -304,10 +315,26 @@ public class TransactionProcessor extends BaseRegionObserver {
     // Get the latest tx snapshot state for the compaction
     TransactionVisibilityState snapshot = cache.getLatestState();
 
-    // Record tx state before the compaction
-    if (compactionState != null) {
+    if (pruneEnable == null) {
+      Configuration conf = getConfiguration(c.getEnvironment());
+      // Configuration won't be null in TransactionProcessor but the derived classes might return
+      // null if it is not available temporarily
+      if (conf != null) {
+        pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+                                      TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
+        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
+                    pruneTable);
+      }
+    }
+
+    if (Boolean.TRUE.equals(pruneEnable)) {
+      // Record tx state before the compaction
       compactionState.record(request, snapshot);
     }
+
     // Also make sure to use the same snapshot for the compaction
     return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs);
   }
@@ -359,11 +386,33 @@ public class TransactionProcessor extends BaseRegionObserver {
     return null;
   }
 
-  private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException {
+  /**
+   * Make sure that the transaction is within the max valid transaction lifetime.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @param tx {@link Transaction} supplied by the
+   * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
+   *         IOException throw if the value of max lifetime of transaction is unavailable
+   */
+  protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+                                       @Nullable Transaction tx) throws IOException {
     if (tx == null) {
       return;
     }
 
+    if (txMaxLifetimeMillis == null) {
+      Configuration conf = getConfiguration(env);
+      // Configuration won't be null in TransactionProcessor but the derived classes might return
+      // null if it is not available temporarily
+      if (conf != null) {
+        this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                                         TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+      } else {
+        throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
+                                              "unavailable. Please retry the operation."));
+      }
+    }
+
     boolean validLifetime =
       TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
     if (!validLifetime) {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 53bd96d..38a79d6 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -41,15 +41,13 @@ public class CompactionState {
   private final byte[] regionName;
   private final String regionNameAsString;
   private final TableName stateTable;
-  private final long txMaxLifetimeMills;
   private final DataJanitorState dataJanitorState;
   private volatile long pruneUpperBound = -1;
 
-  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long txMaxLifetimeMills) {
+  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
     this.regionName = env.getRegionInfo().getRegionName();
     this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
     this.stateTable = stateTable;
-    this.txMaxLifetimeMills = txMaxLifetimeMills;
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public HTableInterface get() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 744fa4c..19eb09e 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -109,7 +110,8 @@ public class TransactionProcessor extends BaseRegionObserver {
   protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
   protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
   protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
-  protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+  protected Long txMaxLifetimeMillis;
+  private Boolean pruneEnable;
 
   public TransactionProcessor() {
     this.txCodec = new TransactionCodec();
@@ -140,29 +142,28 @@ public class TransactionProcessor extends BaseRegionObserver {
         ttlByFamily.put(columnDesc.getName(), ttl);
       }
 
-      this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY,
-                                                                TxConstants.ALLOW_EMPTY_VALUES_DEFAULT);
+      this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
       this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
       if (readNonTxnData) {
         LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
       }
-
-      this.txMaxLifetimeMillis =
-        TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
-                                                                TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
-
-      boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
-                                                               TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-      if (pruneEnabled) {
-        String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis);
-        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
-                    pruneTable);
-      }
     }
   }
 
+  /**
+   * Fetch the {@link Configuration} that contains the properties required by the coprocessor. By default,
+   * the HBase configuration is returned. This method will never return {@code null} in Tephra but the derived
+   * classes might do so if {@link Configuration} is not available temporarily (for example, if it is being fetched
+   * from a HBase Table.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @return {@link Configuration}, can be null if it is not available
+   */
+  @Nullable
+  protected Configuration getConfiguration(CoprocessorEnvironment env) {
+    return env.getConfiguration();
+  }
+
   protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
     return new TransactionStateCacheSupplier(env.getConfiguration());
   }
@@ -190,7 +191,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
     throws IOException {
     Transaction tx = getFromOperation(put);
-    ensureValidTxLifetime(tx);
+    ensureValidTxLifetime(e.getEnvironment(), tx);
   }
 
   @Override
@@ -207,7 +208,7 @@ public class TransactionProcessor extends BaseRegionObserver {
     }
 
     Transaction tx = getFromOperation(delete);
-    ensureValidTxLifetime(tx);
+    ensureValidTxLifetime(e.getEnvironment(), tx);
 
     // Other deletes are client-initiated and need to be translated into our own tombstones
     // TODO: this should delegate to the DeleteStrategy implementation.
@@ -216,13 +217,11 @@ public class TransactionProcessor extends BaseRegionObserver {
       List<Cell> familyCells = delete.getFamilyCellMap().get(family);
       if (isFamilyDelete(familyCells)) {
         deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(),
-            HConstants.EMPTY_BYTE_ARRAY);
+                          HConstants.EMPTY_BYTE_ARRAY);
       } else {
-        int cellSize = familyCells.size();
-        for (int i = 0; i < cellSize; i++) {
-          Cell cell = familyCells.get(i);
+        for (Cell cell : familyCells) {
           deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
-                  HConstants.EMPTY_BYTE_ARRAY);
+                            HConstants.EMPTY_BYTE_ARRAY);
         }
       }
     }
@@ -234,6 +233,18 @@ public class TransactionProcessor extends BaseRegionObserver {
     e.bypass();
   }
 
+  private boolean getAllowEmptyValues(RegionCoprocessorEnvironment env, HTableDescriptor htd) {
+    String allowEmptyValuesFromTableDesc = htd.getValue(TxConstants.ALLOW_EMPTY_VALUES_KEY);
+    Configuration conf = getConfiguration(env);
+    boolean allowEmptyValuesFromConfig = (conf != null) ?
+      conf.getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, TxConstants.ALLOW_EMPTY_VALUES_DEFAULT) :
+      TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
+
+    // If the property is not present in the tableDescriptor, get it from the Configuration
+    return  (allowEmptyValuesFromTableDesc != null) ?
+      Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
+  }
+
   private boolean isFamilyDelete(List<Cell> familyCells) {
     return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
   }
@@ -304,10 +315,26 @@ public class TransactionProcessor extends BaseRegionObserver {
     // Get the latest tx snapshot state for the compaction
     TransactionVisibilityState snapshot = cache.getLatestState();
 
-    // Record tx state before the compaction
-    if (compactionState != null) {
+    if (pruneEnable == null) {
+      Configuration conf = getConfiguration(c.getEnvironment());
+      // Configuration won't be null in TransactionProcessor but the derived classes might return
+      // null if it is not available temporarily
+      if (conf != null) {
+        pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+                                      TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
+        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
+                    pruneTable);
+      }
+    }
+
+    if (Boolean.TRUE.equals(pruneEnable)) {
+      // Record tx state before the compaction
       compactionState.record(request, snapshot);
     }
+
     // Also make sure to use the same snapshot for the compaction
     return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs);
   }
@@ -359,11 +386,33 @@ public class TransactionProcessor extends BaseRegionObserver {
     return null;
   }
 
-  private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException {
+  /**
+   * Make sure that the transaction is within the max valid transaction lifetime.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @param tx {@link Transaction} supplied by the
+   * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
+   *         IOException throw if the value of max lifetime of transaction is unavailable
+   */
+  protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+                                       @Nullable Transaction tx) throws IOException {
     if (tx == null) {
       return;
     }
 
+    if (txMaxLifetimeMillis == null) {
+      Configuration conf = getConfiguration(env);
+      // Configuration won't be null in TransactionProcessor but the derived classes might return
+      // null if it is not available temporarily
+      if (conf != null) {
+        this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                                         TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+      } else {
+        throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
+                                              "unavailable. Please retry the operation."));
+      }
+    }
+
     boolean validLifetime =
       TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
     if (!validLifetime) {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 2e61275..850f508 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -41,15 +41,13 @@ public class CompactionState {
   private final byte[] regionName;
   private final String regionNameAsString;
   private final TableName stateTable;
-  private final long txMaxLifetimeMills;
   private final DataJanitorState dataJanitorState;
   private volatile long pruneUpperBound = -1;
 
-  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long txMaxLifetimeMills) {
+  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
     this.regionName = env.getRegionInfo().getRegionName();
     this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
     this.stateTable = stateTable;
-    this.txMaxLifetimeMills = txMaxLifetimeMills;
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 744fa4c..19eb09e 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -109,7 +110,8 @@ public class TransactionProcessor extends BaseRegionObserver {
   protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
   protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
   protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
-  protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+  protected Long txMaxLifetimeMillis;
+  private Boolean pruneEnable;
 
   public TransactionProcessor() {
     this.txCodec = new TransactionCodec();
@@ -140,29 +142,28 @@ public class TransactionProcessor extends BaseRegionObserver {
         ttlByFamily.put(columnDesc.getName(), ttl);
       }
 
-      this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY,
-                                                                TxConstants.ALLOW_EMPTY_VALUES_DEFAULT);
+      this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
       this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
       if (readNonTxnData) {
         LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
       }
-
-      this.txMaxLifetimeMillis =
-        TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
-                                                                TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
-
-      boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
-                                                               TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-      if (pruneEnabled) {
-        String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis);
-        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
-                    pruneTable);
-      }
     }
   }
 
+  /**
+   * Fetch the {@link Configuration} that contains the properties required by the coprocessor. By default,
+   * the HBase configuration is returned. This method will never return {@code null} in Tephra but the derived
+   * classes might do so if {@link Configuration} is not available temporarily (for example, if it is being fetched
+   * from a HBase Table.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @return {@link Configuration}, can be null if it is not available
+   */
+  @Nullable
+  protected Configuration getConfiguration(CoprocessorEnvironment env) {
+    return env.getConfiguration();
+  }
+
   protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
     return new TransactionStateCacheSupplier(env.getConfiguration());
   }
@@ -190,7 +191,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
     throws IOException {
     Transaction tx = getFromOperation(put);
-    ensureValidTxLifetime(tx);
+    ensureValidTxLifetime(e.getEnvironment(), tx);
   }
 
   @Override
@@ -207,7 +208,7 @@ public class TransactionProcessor extends BaseRegionObserver {
     }
 
     Transaction tx = getFromOperation(delete);
-    ensureValidTxLifetime(tx);
+    ensureValidTxLifetime(e.getEnvironment(), tx);
 
     // Other deletes are client-initiated and need to be translated into our own tombstones
     // TODO: this should delegate to the DeleteStrategy implementation.
@@ -216,13 +217,11 @@ public class TransactionProcessor extends BaseRegionObserver {
       List<Cell> familyCells = delete.getFamilyCellMap().get(family);
       if (isFamilyDelete(familyCells)) {
         deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(),
-            HConstants.EMPTY_BYTE_ARRAY);
+                          HConstants.EMPTY_BYTE_ARRAY);
       } else {
-        int cellSize = familyCells.size();
-        for (int i = 0; i < cellSize; i++) {
-          Cell cell = familyCells.get(i);
+        for (Cell cell : familyCells) {
           deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
-                  HConstants.EMPTY_BYTE_ARRAY);
+                            HConstants.EMPTY_BYTE_ARRAY);
         }
       }
     }
@@ -234,6 +233,18 @@ public class TransactionProcessor extends BaseRegionObserver {
     e.bypass();
   }
 
+  private boolean getAllowEmptyValues(RegionCoprocessorEnvironment env, HTableDescriptor htd) {
+    String allowEmptyValuesFromTableDesc = htd.getValue(TxConstants.ALLOW_EMPTY_VALUES_KEY);
+    Configuration conf = getConfiguration(env);
+    boolean allowEmptyValuesFromConfig = (conf != null) ?
+      conf.getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, TxConstants.ALLOW_EMPTY_VALUES_DEFAULT) :
+      TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
+
+    // If the property is not present in the tableDescriptor, get it from the Configuration
+    return  (allowEmptyValuesFromTableDesc != null) ?
+      Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
+  }
+
   private boolean isFamilyDelete(List<Cell> familyCells) {
     return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
   }
@@ -304,10 +315,26 @@ public class TransactionProcessor extends BaseRegionObserver {
     // Get the latest tx snapshot state for the compaction
     TransactionVisibilityState snapshot = cache.getLatestState();
 
-    // Record tx state before the compaction
-    if (compactionState != null) {
+    if (pruneEnable == null) {
+      Configuration conf = getConfiguration(c.getEnvironment());
+      // Configuration won't be null in TransactionProcessor but the derived classes might return
+      // null if it is not available temporarily
+      if (conf != null) {
+        pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+                                      TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
+        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
+                    pruneTable);
+      }
+    }
+
+    if (Boolean.TRUE.equals(pruneEnable)) {
+      // Record tx state before the compaction
       compactionState.record(request, snapshot);
     }
+
     // Also make sure to use the same snapshot for the compaction
     return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs);
   }
@@ -359,11 +386,33 @@ public class TransactionProcessor extends BaseRegionObserver {
     return null;
   }
 
-  private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException {
+  /**
+   * Make sure that the transaction is within the max valid transaction lifetime.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @param tx {@link Transaction} supplied by the
+   * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
+   *         IOException throw if the value of max lifetime of transaction is unavailable
+   */
+  protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+                                       @Nullable Transaction tx) throws IOException {
     if (tx == null) {
       return;
     }
 
+    if (txMaxLifetimeMillis == null) {
+      Configuration conf = getConfiguration(env);
+      // Configuration won't be null in TransactionProcessor but the derived classes might return
+      // null if it is not available temporarily
+      if (conf != null) {
+        this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                                         TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+      } else {
+        throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
+                                              "unavailable. Please retry the operation."));
+      }
+    }
+
     boolean validLifetime =
       TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
     if (!validLifetime) {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 2e61275..850f508 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -41,15 +41,13 @@ public class CompactionState {
   private final byte[] regionName;
   private final String regionNameAsString;
   private final TableName stateTable;
-  private final long txMaxLifetimeMills;
   private final DataJanitorState dataJanitorState;
   private volatile long pruneUpperBound = -1;
 
-  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long txMaxLifetimeMills) {
+  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
     this.regionName = env.getRegionInfo().getRegionName();
     this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
     this.stateTable = stateTable;
-    this.txMaxLifetimeMills = txMaxLifetimeMills;
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index ceffa4c..45eed50 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -103,13 +104,14 @@ import javax.annotation.Nullable;
 public class TransactionProcessor extends BaseRegionObserver {
   private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
 
-  private TransactionStateCache cache;
   private final TransactionCodec txCodec;
+  private TransactionStateCache cache;
   private CompactionState compactionState;
   protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
   protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
   protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
-  protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+  protected Long txMaxLifetimeMillis;
+  private Boolean pruneEnable;
 
   public TransactionProcessor() {
     this.txCodec = new TransactionCodec();
@@ -140,29 +142,28 @@ public class TransactionProcessor extends BaseRegionObserver {
         ttlByFamily.put(columnDesc.getName(), ttl);
       }
 
-      this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY,
-                                                                TxConstants.ALLOW_EMPTY_VALUES_DEFAULT);
+      this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
       this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
       if (readNonTxnData) {
         LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
       }
-
-      this.txMaxLifetimeMillis =
-        TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
-                                                                TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
-
-      boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
-                                                               TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-      if (pruneEnabled) {
-        String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis);
-        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
-                    pruneTable);
-      }
     }
   }
 
+  /**
+   * Fetch the {@link Configuration} that contains the properties required by the coprocessor. By default,
+   * the HBase configuration is returned. This method will never return {@code null} in Tephra but the derived
+   * classes might do so if {@link Configuration} is not available temporarily (for example, if it is being fetched
+   * from a HBase Table.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @return {@link Configuration}, can be null if it is not available
+   */
+  @Nullable
+  protected Configuration getConfiguration(CoprocessorEnvironment env) {
+    return env.getConfiguration();
+  }
+
   protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
     return new TransactionStateCacheSupplier(env.getConfiguration());
   }
@@ -190,7 +191,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
     throws IOException {
     Transaction tx = getFromOperation(put);
-    ensureValidTxLifetime(tx);
+    ensureValidTxLifetime(e.getEnvironment(), tx);
   }
 
   @Override
@@ -207,7 +208,7 @@ public class TransactionProcessor extends BaseRegionObserver {
     }
 
     Transaction tx = getFromOperation(delete);
-    ensureValidTxLifetime(tx);
+    ensureValidTxLifetime(e.getEnvironment(), tx);
 
     // Other deletes are client-initiated and need to be translated into our own tombstones
     // TODO: this should delegate to the DeleteStrategy implementation.
@@ -216,9 +217,8 @@ public class TransactionProcessor extends BaseRegionObserver {
       List<Cell> familyCells = delete.getFamilyCellMap().get(family);
       if (isFamilyDelete(familyCells)) {
         deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(),
-            HConstants.EMPTY_BYTE_ARRAY);
+                          HConstants.EMPTY_BYTE_ARRAY);
       } else {
-        int cellSize = familyCells.size();
         for (Cell cell : familyCells) {
           deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
                             HConstants.EMPTY_BYTE_ARRAY);
@@ -233,6 +233,18 @@ public class TransactionProcessor extends BaseRegionObserver {
     e.bypass();
   }
 
+  private boolean getAllowEmptyValues(RegionCoprocessorEnvironment env, HTableDescriptor htd) {
+    String allowEmptyValuesFromTableDesc = htd.getValue(TxConstants.ALLOW_EMPTY_VALUES_KEY);
+    Configuration conf = getConfiguration(env);
+    boolean allowEmptyValuesFromConfig = (conf != null) ?
+      conf.getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, TxConstants.ALLOW_EMPTY_VALUES_DEFAULT) :
+      TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
+
+    // If the property is not present in the tableDescriptor, get it from the Configuration
+    return  (allowEmptyValuesFromTableDesc != null) ?
+      Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
+  }
+
   private boolean isFamilyDelete(List<Cell> familyCells) {
     return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
   }
@@ -303,10 +315,26 @@ public class TransactionProcessor extends BaseRegionObserver {
     // Get the latest tx snapshot state for the compaction
     TransactionVisibilityState snapshot = cache.getLatestState();
 
-    // Record tx state before the compaction
-    if (compactionState != null) {
+    if (pruneEnable == null) {
+      Configuration conf = getConfiguration(c.getEnvironment());
+      // Configuration won't be null in TransactionProcessor but the derived classes might return
+      // null if it is not available temporarily
+      if (conf != null) {
+        pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+                                      TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
+        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
+                    pruneTable);
+      }
+    }
+
+    if (Boolean.TRUE.equals(pruneEnable)) {
+      // Record tx state before the compaction
       compactionState.record(request, snapshot);
     }
+
     // Also make sure to use the same snapshot for the compaction
     return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs);
   }
@@ -358,11 +386,33 @@ public class TransactionProcessor extends BaseRegionObserver {
     return null;
   }
 
-  private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException {
+  /**
+   * Make sure that the transaction is within the max valid transaction lifetime.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @param tx {@link Transaction} supplied by the
+   * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
+   *         IOException throw if the value of max lifetime of transaction is unavailable
+   */
+  protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+                                       @Nullable Transaction tx) throws IOException {
     if (tx == null) {
       return;
     }
 
+    if (txMaxLifetimeMillis == null) {
+      Configuration conf = getConfiguration(env);
+      // Configuration won't be null in TransactionProcessor but the derived classes might return
+      // null if it is not available temporarily
+      if (conf != null) {
+        this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                                         TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+      } else {
+        throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
+                                              "unavailable. Please retry the operation."));
+      }
+    }
+
     boolean validLifetime =
       TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
     if (!validLifetime) {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 2e61275..850f508 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -41,15 +41,13 @@ public class CompactionState {
   private final byte[] regionName;
   private final String regionNameAsString;
   private final TableName stateTable;
-  private final long txMaxLifetimeMills;
   private final DataJanitorState dataJanitorState;
   private volatile long pruneUpperBound = -1;
 
-  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long txMaxLifetimeMills) {
+  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
     this.regionName = env.getRegionInfo().getRegionName();
     this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
     this.stateTable = stateTable;
-    this.txMaxLifetimeMills = txMaxLifetimeMills;
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {


Mime
View raw message