tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tdsi...@apache.org
Subject incubator-tephra git commit: TEPHRA-208 Compaction removes rows incorrectly for family deletes with column level conflict detection
Date Thu, 02 Feb 2017 20:59:18 GMT
Repository: incubator-tephra
Updated Branches:
  refs/heads/master d3c0b6157 -> 6edfa091c


TEPHRA-208 Compaction removes rows incorrectly for family deletes with column level conflict detection

This closes #26 from GitHub.

Signed-off-by: Thomas D'Silva <tdsilva@salesforce.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/6edfa091
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/6edfa091
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/6edfa091

Branch: refs/heads/master
Commit: 6edfa091cbda59a8cee2613d8456b5a7ccad38a2
Parents: d3c0b61
Author: Thomas D'Silva <tdsilva@salesforce.com>
Authored: Mon Jan 9 15:02:00 2017 -0800
Committer: Thomas D'Silva <tdsilva@salesforce.com>
Committed: Thu Feb 2 12:58:38 2017 -0800

----------------------------------------------------------------------
 .../TransactionVisibilityFilter.java            |   6 +-
 .../hbase/TransactionAwareHTableTest.java       | 119 +++++++++++++++++--
 .../TransactionVisibilityFilter.java            |   6 +-
 .../hbase/TransactionAwareHTableTest.java       | 110 ++++++++++++++++-
 .../TransactionVisibilityFilter.java            |   9 +-
 .../hbase/TransactionAwareHTableTest.java       | 110 ++++++++++++++++-
 .../TransactionVisibilityFilter.java            |   9 +-
 .../hbase/TransactionAwareHTableTest.java       | 114 ++++++++++++++++--
 .../TransactionVisibilityFilter.java            |   7 +-
 .../hbase/TransactionAwareHTableTest.java       | 116 +++++++++++++++++-
 10 files changed, 572 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
index a986d1c..3675268 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -291,6 +291,7 @@ public class TransactionVisibilityFilter extends FilterBase {
 
   private static final class DeleteTracker {
     private long familyDeleteTs;
+    private byte[] rowKey;
 
     public static boolean isFamilyDelete(Cell cell) {
       return !TxUtils.isPreExistingVersion(cell.getTimestamp()) &&
@@ -300,14 +301,17 @@ public class TransactionVisibilityFilter extends FilterBase {
 
     public void addFamilyDelete(Cell delete) {
       this.familyDeleteTs = delete.getTimestamp();
+      this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength());
     }
 
     public boolean isDeleted(Cell cell) {
-      return cell.getTimestamp() <= familyDeleteTs;
+      return rowKey != null && Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(), 
+        cell.getRowLength(), rowKey, 0, rowKey.length) == 0 && cell.getTimestamp() <= familyDeleteTs;
     }
 
     public void reset() {
       this.familyDeleteTs = 0;
+      this.rowKey = null;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index afd7c01..9c5dca2 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -20,14 +20,18 @@ package org.apache.tephra.hbase;
  import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -43,23 +47,29 @@ import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.ValueFilter;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
- import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
- import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.TxConstants.ConflictDetection;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.InMemoryTransactionStateStorage;
 import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +80,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
- import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeUnit;
 
  import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -92,7 +102,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   private TransactionContext transactionContext;
   private TransactionAwareHTable transactionAwareHTable;
   private HTable hTable;
-
+  
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+  
+  private static MiniDFSCluster dfsCluster;
+  
   private static final class TestBytes {
     private static final byte[] table = Bytes.toBytes("testtable");
     private static final byte[] family = Bytes.toBytes("f1");
@@ -136,18 +151,33 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     }
   }
   
+  //override AbstractHBaseTableTest.startMiniCluster to setup configuration
   @BeforeClass
-  public static void setupBeforeClass() throws Exception {
-    txStateStorage = new InMemoryTransactionStateStorage();
-    txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
-    txManager.startAndWait();
-  }
+  public static void startMiniCluster() throws Exception {
+     // setup the configuration to use  HDFSTransactionStateStorage
+     conf = HBaseConfiguration.create();
+     conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
+     dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    
+     conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+     conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+     conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5);
+    
+     AbstractHBaseTableTest.startMiniCluster(); 
+
+     txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
+     txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
+     txManager.startAndWait();
+   }
 
   @AfterClass
   public static void shutdownAfterClass() throws Exception {
     if (txManager != null) {
       txManager.stopAndWait();
     }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
   }
 
   @Before
@@ -243,7 +273,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
       result = txTable.get(new Get(TestBytes.row));
       txContext.finish();
       assertTrue(result.isEmpty());
-
+      
       // test column delete
       // load 10 rows
       txContext.start();
@@ -395,6 +425,75 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
       }
     }
   
+  @Test
+  public void testFamilyDeleteWithCompaction() throws Exception {
+    HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
+                                new byte[][]{TestBytes.family, TestBytes.family2});
+    try {
+      TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW);
+      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+            
+      txContext.start();
+      Put put = new Put(TestBytes.row);
+      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+      txTable.put(put);
+      
+      put = new Put(TestBytes.row2);
+      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+      txTable.put(put);
+      txContext.finish();
+      
+      txContext.start();
+      Result result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      assertFalse(result.isEmpty());
+      
+      txContext.start();
+      // test family delete with ConflictDetection.ROW (as ConflictDetection.COLUMN converts this to a column delete)
+      Delete delete = new Delete(TestBytes.row);
+      delete.deleteFamily(TestBytes.family);
+      txTable.delete(delete);
+      txContext.finish();
+      
+      txContext.start();
+      result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      assertTrue(result.isEmpty());
+      
+      boolean compactionDone = false;
+      int count = 0;
+      while (count++ < 12 && !compactionDone) {
+         // run major compaction and verify the row was removed
+         HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin();
+         hbaseAdmin.flush("TestFamilyDeleteWithCompaction");
+         hbaseAdmin.majorCompact("TestFamilyDeleteWithCompaction");
+         hbaseAdmin.close();
+         Thread.sleep(5000L);
+         
+         Scan scan = new Scan();
+         scan.setStartRow(TestBytes.row);
+         scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 }));
+         scan.setRaw(true);
+         
+         ResultScanner scanner = hTable.getScanner(scan);
+         compactionDone = scanner.next() == null;
+         scanner.close();
+      }
+      assertTrue("Compaction should have removed the row", compactionDone);
+      
+      Scan scan = new Scan();
+      scan.setStartRow(TestBytes.row2);
+      scan.setStopRow(Bytes.add(TestBytes.row2, new byte[] { 0 }));
+      scan.setRaw(true);
+      
+      ResultScanner scanner = hTable.getScanner(scan);
+      Result res = scanner.next();
+      assertNotNull(res);
+      } finally {
+        hTable.close();
+      }
+  }
+  
   /**
    * Test aborted transactional delete requests, that must be rolled back.
    *

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
index a3b902a..9a617a9 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -291,6 +291,7 @@ public class TransactionVisibilityFilter extends FilterBase {
 
   private static final class DeleteTracker {
     private long familyDeleteTs;
+    private byte[] rowKey;
 
     public static boolean isFamilyDelete(Cell cell) {
       return !TxUtils.isPreExistingVersion(cell.getTimestamp()) &&
@@ -300,14 +301,17 @@ public class TransactionVisibilityFilter extends FilterBase {
 
     public void addFamilyDelete(Cell delete) {
       this.familyDeleteTs = delete.getTimestamp();
+      this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength());
     }
 
     public boolean isDeleted(Cell cell) {
-      return cell.getTimestamp() <= familyDeleteTs;
+      return rowKey != null && Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(), 
+        cell.getRowLength(), rowKey, 0, rowKey.length) == 0 && cell.getTimestamp() <= familyDeleteTs;
     }
 
     public void reset() {
       this.familyDeleteTs = 0;
+      this.rowKey = null;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index 2922cf3..68324c9 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -19,14 +19,19 @@ package org.apache.tephra.hbase;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -42,23 +47,29 @@ import org.apache.hadoop.hbase.filter.LongComparator;
 import org.apache.hadoop.hbase.filter.ValueFilter;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.TxConstants.ConflictDetection;
 import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.InMemoryTransactionStateStorage;
 import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,7 +102,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   private TransactionContext transactionContext;
   private TransactionAwareHTable transactionAwareHTable;
   private HTable hTable;
-
+  
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+  
+  private static MiniDFSCluster dfsCluster;
+  
   private static final class TestBytes {
     private static final byte[] table = Bytes.toBytes("testtable");
     private static final byte[] family = Bytes.toBytes("f1");
@@ -135,9 +151,21 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     }
   }
   
-  @BeforeClass
-  public static void setupBeforeClass() throws Exception {
-    txStateStorage = new InMemoryTransactionStateStorage();
+ // override AbstractHBaseTableTest.startMiniCluster to setup configuration
+ @BeforeClass
+ public static void startMiniCluster() throws Exception {
+    // setup the configuration to use  HDFSTransactionStateStorage
+    conf = HBaseConfiguration.create();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
+    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    
+    conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+    conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5);
+    
+    AbstractHBaseTableTest.startMiniCluster(); 
+
+    txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
     txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
     txManager.startAndWait();
   }
@@ -147,6 +175,9 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     if (txManager != null) {
       txManager.stopAndWait();
     }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
   }
 
   @Before
@@ -242,7 +273,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
       result = txTable.get(new Get(TestBytes.row));
       txContext.finish();
       assertTrue(result.isEmpty());
-
+      
       // test column delete
       // load 10 rows
       txContext.start();
@@ -347,6 +378,75 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     }
   }
   
+  @Test
+  public void testFamilyDeleteWithCompaction() throws Exception {
+    HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
+                                new byte[][]{TestBytes.family, TestBytes.family2});
+    try {
+      TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW);
+      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+            
+      txContext.start();
+      Put put = new Put(TestBytes.row);
+      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+      txTable.put(put);
+      
+      put = new Put(TestBytes.row2);
+      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+      txTable.put(put);
+      txContext.finish();
+      
+      txContext.start();
+      Result result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      assertFalse(result.isEmpty());
+      
+      txContext.start();
+      // test family delete with ConflictDetection.ROW (as ConflictDetection.COLUMN converts this to a column delete)
+      Delete delete = new Delete(TestBytes.row);
+      delete.deleteFamily(TestBytes.family);
+      txTable.delete(delete);
+      txContext.finish();
+      
+      txContext.start();
+      result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      assertTrue(result.isEmpty());
+      
+      boolean compactionDone = false;
+      int count = 0;
+      while (count++ < 12 && !compactionDone) {
+         // run major compaction and verify the row was removed
+         HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin();
+         hbaseAdmin.flush("TestFamilyDeleteWithCompaction");
+         hbaseAdmin.majorCompact("TestFamilyDeleteWithCompaction");
+         hbaseAdmin.close();
+         Thread.sleep(5000L);
+         
+         Scan scan = new Scan();
+         scan.setStartRow(TestBytes.row);
+         scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 }));
+         scan.setRaw(true);
+         
+         ResultScanner scanner = hTable.getScanner(scan);
+         compactionDone = scanner.next() == null;
+         scanner.close();
+      }
+      assertTrue("Compaction should have removed the row", compactionDone);
+      
+      Scan scan = new Scan();
+      scan.setStartRow(TestBytes.row2);
+      scan.setStopRow(Bytes.add(TestBytes.row2, new byte[] { 0 }));
+      scan.setRaw(true);
+      
+      ResultScanner scanner = hTable.getScanner(scan);
+      Result res = scanner.next();
+      assertNotNull(res);
+      } finally {
+        hTable.close();
+      }
+  }
+  
   /**
    * Test that put and delete attributes are preserved
    *

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
index 541e019..9825ded 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -19,6 +19,7 @@
 package org.apache.tephra.hbase.coprocessor;
 
 import com.google.common.collect.Maps;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.util.TxUtils;
@@ -34,6 +36,7 @@ import org.apache.tephra.util.TxUtils;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+
 import javax.annotation.Nullable;
 
 /**
@@ -291,6 +294,7 @@ public class TransactionVisibilityFilter extends FilterBase {
 
   private static final class DeleteTracker {
     private long familyDeleteTs;
+    private byte[] rowKey;
 
     public static boolean isFamilyDelete(Cell cell) {
       return !TxUtils.isPreExistingVersion(cell.getTimestamp()) &&
@@ -300,14 +304,17 @@ public class TransactionVisibilityFilter extends FilterBase {
 
     public void addFamilyDelete(Cell delete) {
       this.familyDeleteTs = delete.getTimestamp();
+      this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength());
     }
 
     public boolean isDeleted(Cell cell) {
-      return cell.getTimestamp() <= familyDeleteTs;
+      return rowKey != null && Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(), 
+        cell.getRowLength(), rowKey, 0, rowKey.length) == 0 && cell.getTimestamp() <= familyDeleteTs;
     }
 
     public void reset() {
       this.familyDeleteTs = 0;
+      this.rowKey = null;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index 0dc483d..defad80 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -19,14 +19,19 @@ package org.apache.tephra.hbase;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -42,23 +47,29 @@ import org.apache.hadoop.hbase.filter.LongComparator;
 import org.apache.hadoop.hbase.filter.ValueFilter;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.TxConstants.ConflictDetection;
 import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.InMemoryTransactionStateStorage;
 import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,7 +102,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   private TransactionContext transactionContext;
   private TransactionAwareHTable transactionAwareHTable;
   private HTable hTable;
-
+  
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+  
+  private static MiniDFSCluster dfsCluster;
+  
   private static final class TestBytes {
     private static final byte[] table = Bytes.toBytes("testtable");
     private static final byte[] family = Bytes.toBytes("f1");
@@ -135,9 +151,21 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     }
   }
   
-  @BeforeClass
-  public static void setupBeforeClass() throws Exception {
-    txStateStorage = new InMemoryTransactionStateStorage();
+ //override AbstractHBaseTableTest.startMiniCluster to setup configuration
+ @BeforeClass
+ public static void startMiniCluster() throws Exception {
+    // setup the configuration to use  HDFSTransactionStateStorage
+    conf = HBaseConfiguration.create();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
+    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+   
+    conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+    conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5);
+   
+    AbstractHBaseTableTest.startMiniCluster(); 
+
+    txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
     txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
     txManager.startAndWait();
   }
@@ -147,6 +175,9 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     if (txManager != null) {
       txManager.stopAndWait();
     }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
   }
 
   @Before
@@ -242,7 +273,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
       result = txTable.get(new Get(TestBytes.row));
       txContext.finish();
       assertTrue(result.isEmpty());
-
+      
       // test column delete
       // load 10 rows
       txContext.start();
@@ -394,6 +425,75 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
       }
     }
   
+  @Test
+  public void testFamilyDeleteWithCompaction() throws Exception {
+    HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
+                                new byte[][]{TestBytes.family, TestBytes.family2});
+    try {
+      TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW);
+      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+            
+      txContext.start();
+      Put put = new Put(TestBytes.row);
+      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+      txTable.put(put);
+      
+      put = new Put(TestBytes.row2);
+      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+      txTable.put(put);
+      txContext.finish();
+      
+      txContext.start();
+      Result result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      assertFalse(result.isEmpty());
+      
+      txContext.start();
+      // test family delete with ConflictDetection.ROW (as ConflictDetection.COLUMN converts this to a column delete)
+      Delete delete = new Delete(TestBytes.row);
+      delete.deleteFamily(TestBytes.family);
+      txTable.delete(delete);
+      txContext.finish();
+      
+      txContext.start();
+      result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      assertTrue(result.isEmpty());
+      
+      boolean compactionDone = false;
+      int count = 0;
+      while (count++ < 12 && !compactionDone) {
+         // run major compaction and verify the row was removed
+         HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin();
+         hbaseAdmin.flush("TestFamilyDeleteWithCompaction");
+         hbaseAdmin.majorCompact("TestFamilyDeleteWithCompaction");
+         hbaseAdmin.close();
+         Thread.sleep(5000L);
+         
+         Scan scan = new Scan();
+         scan.setStartRow(TestBytes.row);
+         scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 }));
+         scan.setRaw(true);
+         
+         ResultScanner scanner = hTable.getScanner(scan);
+         compactionDone = scanner.next() == null;
+         scanner.close();
+      }
+      assertTrue("Compaction should have removed the row", compactionDone);
+      
+      Scan scan = new Scan();
+      scan.setStartRow(TestBytes.row2);
+      scan.setStopRow(Bytes.add(TestBytes.row2, new byte[] { 0 }));
+      scan.setRaw(true);
+      
+      ResultScanner scanner = hTable.getScanner(scan);
+      Result res = scanner.next();
+      assertNotNull(res);
+      } finally {
+        hTable.close();
+      }
+  }
+  
   /**
    * Test aborted transactional delete requests, that must be rolled back.
    *

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
index 541e019..9825ded 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -19,6 +19,7 @@
 package org.apache.tephra.hbase.coprocessor;
 
 import com.google.common.collect.Maps;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.util.TxUtils;
@@ -34,6 +36,7 @@ import org.apache.tephra.util.TxUtils;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+
 import javax.annotation.Nullable;
 
 /**
@@ -291,6 +294,7 @@ public class TransactionVisibilityFilter extends FilterBase {
 
   private static final class DeleteTracker {
     private long familyDeleteTs;
+    private byte[] rowKey;
 
     public static boolean isFamilyDelete(Cell cell) {
       return !TxUtils.isPreExistingVersion(cell.getTimestamp()) &&
@@ -300,14 +304,17 @@ public class TransactionVisibilityFilter extends FilterBase {
 
     public void addFamilyDelete(Cell delete) {
       this.familyDeleteTs = delete.getTimestamp();
+      this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength());
     }
 
     public boolean isDeleted(Cell cell) {
-      return cell.getTimestamp() <= familyDeleteTs;
+      return rowKey != null && Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(), 
+        cell.getRowLength(), rowKey, 0, rowKey.length) == 0 && cell.getTimestamp() <= familyDeleteTs;
     }
 
     public void reset() {
       this.familyDeleteTs = 0;
+      this.rowKey = null;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index e2fadbd..30fcd8a 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -19,14 +19,19 @@ package org.apache.tephra.hbase;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -42,23 +47,29 @@ import org.apache.hadoop.hbase.filter.LongComparator;
 import org.apache.hadoop.hbase.filter.ValueFilter;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.TxConstants.ConflictDetection;
 import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.InMemoryTransactionStateStorage;
 import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,7 +102,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   private TransactionContext transactionContext;
   private TransactionAwareHTable transactionAwareHTable;
   private HTable hTable;
-
+  
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+  
+  private static MiniDFSCluster dfsCluster;
+  
   private static final class TestBytes {
     private static final byte[] table = Bytes.toBytes("testtable");
     private static final byte[] family = Bytes.toBytes("f1");
@@ -135,18 +151,33 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     }
   }
 
+  //override AbstractHBaseTableTest.startMiniCluster to setup configuration
   @BeforeClass
-  public static void setupBeforeClass() throws Exception {
-    txStateStorage = new InMemoryTransactionStateStorage();
-    txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
-    txManager.startAndWait();
-  }
+  public static void startMiniCluster() throws Exception {
+     // setup the configuration to use  HDFSTransactionStateStorage
+     conf = HBaseConfiguration.create();
+     conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
+     dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    
+     conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+     conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+     conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5);
+    
+     AbstractHBaseTableTest.startMiniCluster(); 
+
+     txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
+     txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
+     txManager.startAndWait();
+   }
 
   @AfterClass
   public static void shutdownAfterClass() throws Exception {
     if (txManager != null) {
       txManager.stopAndWait();
     }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
   }
 
   @Before
@@ -242,7 +273,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
       result = txTable.get(new Get(TestBytes.row));
       txContext.finish();
       assertTrue(result.isEmpty());
-
+      
       // test column delete
       // load 10 rows
       txContext.start();
@@ -394,6 +425,75 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
       }
     }
   
+  @Test
+  public void testFamilyDeleteWithCompaction() throws Exception {
+    HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
+                                new byte[][]{TestBytes.family, TestBytes.family2});
+    try {
+      TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW);
+      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+            
+      txContext.start();
+      Put put = new Put(TestBytes.row);
+      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+      txTable.put(put);
+      
+      put = new Put(TestBytes.row2);
+      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+      txTable.put(put);
+      txContext.finish();
+      
+      txContext.start();
+      Result result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      assertFalse(result.isEmpty());
+      
+      txContext.start();
+      // test family delete with ConflictDetection.ROW (as ConflictDetection.COLUMN converts this to a column delete)
+      Delete delete = new Delete(TestBytes.row);
+      delete.deleteFamily(TestBytes.family);
+      txTable.delete(delete);
+      txContext.finish();
+      
+      txContext.start();
+      result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      assertTrue(result.isEmpty());
+      
+      boolean compactionDone = false;
+      int count = 0;
+      while (count++ < 12 && !compactionDone) {
+         // run major compaction and verify the row was removed
+         HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin();
+         hbaseAdmin.flush("TestFamilyDeleteWithCompaction");
+         hbaseAdmin.majorCompact("TestFamilyDeleteWithCompaction");
+         hbaseAdmin.close();
+         Thread.sleep(5000L);
+         
+         Scan scan = new Scan();
+         scan.setStartRow(TestBytes.row);
+         scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 }));
+         scan.setRaw(true);
+         
+         ResultScanner scanner = hTable.getScanner(scan);
+         compactionDone = scanner.next() == null;
+         scanner.close();
+      }
+      assertTrue("Compaction should have removed the row", compactionDone);
+      
+      Scan scan = new Scan();
+      scan.setStartRow(TestBytes.row2);
+      scan.setStopRow(Bytes.add(TestBytes.row2, new byte[] { 0 }));
+      scan.setRaw(true);
+      
+      ResultScanner scanner = hTable.getScanner(scan);
+      Result res = scanner.next();
+      assertNotNull(res);
+      } finally {
+        hTable.close();
+      }
+  }
+  
   /**
    * Test aborted transactional delete requests, that must be rolled back.
    *

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
index a258972..5ad7c29 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.util.TxUtils;
@@ -286,6 +287,7 @@ public class TransactionVisibilityFilter extends FilterBase {
 
   private static final class DeleteTracker {
     private long familyDeleteTs;
+    private byte[] rowKey;
 
     public static boolean isFamilyDelete(Cell cell) {
       return !TxUtils.isPreExistingVersion(cell.getTimestamp()) &&
@@ -295,14 +297,17 @@ public class TransactionVisibilityFilter extends FilterBase {
 
     public void addFamilyDelete(Cell delete) {
       this.familyDeleteTs = delete.getTimestamp();
+      this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength());
     }
 
     public boolean isDeleted(Cell cell) {
-      return cell.getTimestamp() <= familyDeleteTs;
+      return rowKey != null && Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(), 
+        cell.getRowLength(), rowKey, 0, rowKey.length) == 0 && cell.getTimestamp() <= familyDeleteTs;
     }
 
     public void reset() {
       this.familyDeleteTs = 0;
+      this.rowKey = null;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index 46ac384..11ffd1a 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -19,14 +19,17 @@ package org.apache.tephra.hbase;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -42,23 +45,29 @@ import org.apache.hadoop.hbase.filter.LongComparator;
 import org.apache.hadoop.hbase.filter.ValueFilter;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.TxConstants.ConflictDetection;
 import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.InMemoryTransactionStateStorage;
 import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,6 +100,15 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   private TransactionContext transactionContext;
   private TransactionAwareHTable transactionAwareHTable;
   private HTable hTable;
+  
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+  
+  private static MiniDFSCluster dfsCluster;
+  
+  public static void tearDownAfterClass() throws Exception {
+    dfsCluster.shutdown();
+  }
 
   private static final class TestBytes {
     private static final byte[] table = Bytes.toBytes("testtable");
@@ -137,7 +155,32 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
 
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
-    txStateStorage = new InMemoryTransactionStateStorage();
+    testUtil = new HBaseTestingUtility();
+    conf = testUtil.getConfiguration();
+
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
+    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+    conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+
+    conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5);
+    
+    // Tune down the connection thread pool size
+    conf.setInt("hbase.hconnection.threads.core", 5);
+    conf.setInt("hbase.hconnection.threads.max", 10);
+    // Tunn down handler threads in regionserver
+    conf.setInt("hbase.regionserver.handler.count", 10);
+
+    // Set to random port
+    conf.setInt("hbase.master.port", 0);
+    conf.setInt("hbase.master.info.port", 0);
+    conf.setInt("hbase.regionserver.port", 0);
+    conf.setInt("hbase.regionserver.info.port", 0);
+
+    testUtil.startMiniCluster();
+    hBaseAdmin = testUtil.getHBaseAdmin();
+    txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
     txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
     txManager.startAndWait();
   }
@@ -241,7 +284,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
       result = txTable.get(new Get(TestBytes.row));
       txContext.finish();
       assertTrue(result.isEmpty());
-
+      
       // test column delete
       // load 10 rows
       txContext.start();
@@ -391,6 +434,75 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
       }
     }
   
+  @Test
+  public void testFamilyDeleteWithCompaction() throws Exception {
+    HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
+                                new byte[][]{TestBytes.family, TestBytes.family2});
+    try {
+      TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW);
+      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+            
+      txContext.start();
+      Put put = new Put(TestBytes.row);
+      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+      txTable.put(put);
+      
+      put = new Put(TestBytes.row2);
+      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+      txTable.put(put);
+      txContext.finish();
+      
+      txContext.start();
+      Result result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      assertFalse(result.isEmpty());
+      
+      txContext.start();
+      // test family delete with ConflictDetection.ROW (as ConflictDetection.COLUMN converts this to a column delete)
+      Delete delete = new Delete(TestBytes.row);
+      delete.deleteFamily(TestBytes.family);
+      txTable.delete(delete);
+      txContext.finish();
+      
+      txContext.start();
+      result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      assertTrue(result.isEmpty());
+      
+      boolean compactionDone = false;
+      int count = 0;
+      while (count++ < 12 && !compactionDone) {
+         // run major compaction and verify the row was removed
+         HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin();
+         hbaseAdmin.flush("TestFamilyDeleteWithCompaction");
+         hbaseAdmin.majorCompact("TestFamilyDeleteWithCompaction");
+         hbaseAdmin.close();
+         Thread.sleep(5000L);
+         
+         Scan scan = new Scan();
+         scan.setStartRow(TestBytes.row);
+         scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 }));
+         scan.setRaw(true);
+         
+         ResultScanner scanner = hTable.getScanner(scan);
+         compactionDone = scanner.next() == null;
+         scanner.close();
+      }
+      assertTrue("Compaction should have removed the row", compactionDone);
+      
+      Scan scan = new Scan();
+      scan.setStartRow(TestBytes.row2);
+      scan.setStopRow(Bytes.add(TestBytes.row2, new byte[] { 0 }));
+      scan.setRaw(true);
+      
+      ResultScanner scanner = hTable.getScanner(scan);
+      Result res = scanner.next();
+      assertNotNull(res);
+      } finally {
+        hTable.close();
+      }
+  }
+  
   /**
    * Test aborted transactional delete requests, that must be rolled back.
    *


Mime
View raw message