tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject [incubator-tephra] branch master updated: TEPHRA-299 Executing a large batch delete is very slow.
Date Sun, 09 Jun 2019 23:28:33 GMT
This is an automated email from the ASF dual-hosted git repository.

larsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tephra.git


The following commit(s) were added to refs/heads/master by this push:
     new 1caeea1  TEPHRA-299 Executing a large batch delete is very slow.
1caeea1 is described below

commit 1caeea197f39563dfa02219d887584150a449c59
Author: Lars Hofhansl <larsh@apache.org>
AuthorDate: Sun Jun 9 16:28:51 2019 -0700

    TEPHRA-299 Executing a large batch delete is very slow.
---
 .../tephra/hbase/TransactionAwareHTable.java       | 35 +++++++++++++---------
 .../tephra/hbase/TransactionAwareHTable.java       | 35 +++++++++++++---------
 .../tephra/hbase/TransactionAwareHTable.java       | 35 ++++++++++++----------
 3 files changed, 62 insertions(+), 43 deletions(-)

diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index 18886c7..347c1fe 100644
--- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
@@ -316,7 +317,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
     if (tx == null) {
       throw new IOException("Transaction not started");
     }
-    hTable.delete(transactionalizeAction(delete));
+    hTable.put(transactionalizeAction(delete));
   }
 
   @Override
@@ -324,12 +325,12 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
     if (tx == null) {
       throw new IOException("Transaction not started");
     }
-    List<Delete> transactionalizedDeletes = new ArrayList<>(deletes.size());
+    List<Put> transactionalizedDeletes = new ArrayList<>(deletes.size());
     for (Delete delete : deletes) {
-      Delete txDelete = transactionalizeAction(delete);
+      Put txDelete = transactionalizeAction(delete);
       transactionalizedDeletes.add(txDelete);
     }
-    hTable.delete(transactionalizedDeletes);
+    hTable.put(transactionalizedDeletes);
   }
 
   @Override
@@ -541,11 +542,11 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
     return txPut;
   }
 
-  private Delete transactionalizeAction(Delete delete) throws IOException {
+  private Put transactionalizeAction(Delete delete) throws IOException {
     long transactionTimestamp = tx.getWritePointer();
 
     byte[] deleteRow = delete.getRow();
-    Delete txDelete = new Delete(deleteRow, transactionTimestamp);
+    Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
 
     Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
     if (familyToDelete.isEmpty()) {
@@ -556,6 +557,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
         // Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet
         for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
           // no need to identify individual columns deleted
+          deleteMarkers.add(columnDescriptor.getName(), TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+            HConstants.EMPTY_BYTE_ARRAY);
           addToChangeSet(deleteRow, columnDescriptor.getName(), null);
         }
       } else {
@@ -565,7 +568,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
         for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
           NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
           for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-            txDelete.deleteColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
+            deleteMarkers.add(familyEntry.getKey(), column.getKey(), transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
           }
         }
@@ -583,31 +587,34 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
           if (conflictLevel == TxConstants.ConflictDetection.ROW ||
               conflictLevel == TxConstants.ConflictDetection.NONE) {
             // no need to identify individual columns deleted
-            txDelete.deleteFamily(family);
+            deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, family, null);
           } else {
             Result result = get(new Get(delete.getRow()).addFamily(family));
             // Delete entire family
             NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
             for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-              txDelete.deleteColumns(family, column.getKey(), transactionTimestamp);
+              deleteMarkers.add(family, column.getKey(), transactionTimestamp,
+                HConstants.EMPTY_BYTE_ARRAY);
               addToChangeSet(deleteRow, family, column.getKey());
             }
           }
         } else {
           for (Cell value : entries) {
-            txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp);
+            deleteMarkers.add(value.getFamily(), value.getQualifier(), transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, value.getFamily(), value.getQualifier());
           }
         }
       }
     }
     for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
-        txDelete.setAttribute(entry.getKey(), entry.getValue());
+      deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
     }
-    txDelete.setDurability(delete.getDurability());
-    addToOperation(txDelete, tx);
-    return txDelete;
+    deleteMarkers.setDurability(delete.getDurability());
+    addToOperation(deleteMarkers, tx);
+    return deleteMarkers;
   }
 
   private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
diff --git a/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index e3ef374..981143e 100644
--- a/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
@@ -317,7 +318,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
     if (tx == null) {
       throw new IOException("Transaction not started");
     }
-    hTable.delete(transactionalizeAction(delete));
+    hTable.put(transactionalizeAction(delete));
   }
 
   @Override
@@ -325,12 +326,12 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
     if (tx == null) {
       throw new IOException("Transaction not started");
     }
-    List<Delete> transactionalizedDeletes = new ArrayList<>(deletes.size());
+    List<Put> transactionalizedDeletes = new ArrayList<>(deletes.size());
     for (Delete delete : deletes) {
-      Delete txDelete = transactionalizeAction(delete);
+      Put txDelete = transactionalizeAction(delete);
       transactionalizedDeletes.add(txDelete);
     }
-    hTable.delete(transactionalizedDeletes);
+    hTable.put(transactionalizedDeletes);
   }
 
   @Override
@@ -564,11 +565,11 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
     return txPut;
   }
 
-  private Delete transactionalizeAction(Delete delete) throws IOException {
+  private Put transactionalizeAction(Delete delete) throws IOException {
     long transactionTimestamp = tx.getWritePointer();
 
     byte[] deleteRow = delete.getRow();
-    Delete txDelete = new Delete(deleteRow, transactionTimestamp);
+    Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
 
     Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
     if (familyToDelete.isEmpty()) {
@@ -579,6 +580,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
         // Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet
         for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
           // no need to identify individual columns deleted
+          deleteMarkers.add(columnDescriptor.getName(), TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+            HConstants.EMPTY_BYTE_ARRAY);
           addToChangeSet(deleteRow, columnDescriptor.getName(), null);
         }
       } else {
@@ -588,7 +591,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
         for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
           NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
           for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-            txDelete.deleteColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
+            deleteMarkers.add(familyEntry.getKey(), column.getKey(), transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
           }
         }
@@ -606,31 +610,34 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
           if (conflictLevel == TxConstants.ConflictDetection.ROW ||
               conflictLevel == TxConstants.ConflictDetection.NONE) {
             // no need to identify individual columns deleted
-            txDelete.deleteFamily(family);
+            deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, family, null);
           } else {
             Result result = get(new Get(delete.getRow()).addFamily(family));
             // Delete entire family
             NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
             for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-              txDelete.deleteColumns(family, column.getKey(), transactionTimestamp);
+              deleteMarkers.add(family, column.getKey(), transactionTimestamp,
+                HConstants.EMPTY_BYTE_ARRAY);
               addToChangeSet(deleteRow, family, column.getKey());
             }
           }
         } else {
           for (Cell value : entries) {
-            txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp);
+            deleteMarkers.add(value.getFamily(), value.getQualifier(), transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, value.getFamily(), value.getQualifier());
           }
         }
       }
     }
     for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
-        txDelete.setAttribute(entry.getKey(), entry.getValue());
+      deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
     }
-    txDelete.setDurability(delete.getDurability());
-    addToOperation(txDelete, tx);
-    return txDelete;
+    deleteMarkers.setDurability(delete.getDurability());
+    addToOperation(deleteMarkers, tx);
+    return deleteMarkers;
   }
 
   private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
diff --git a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index b35c8aa..4a99378 100644
--- a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
@@ -293,7 +294,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
     if (tx == null) {
       throw new IOException("Transaction not started");
     }
-    hTable.delete(transactionalizeAction(delete));
+    hTable.put(transactionalizeAction(delete));
   }
 
   @Override
@@ -301,12 +302,12 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
     if (tx == null) {
       throw new IOException("Transaction not started");
     }
-    List<Delete> transactionalizedDeletes = new ArrayList<>(deletes.size());
+    List<Put> transactionalizedDeletes = new ArrayList<>(deletes.size());
     for (Delete delete : deletes) {
-      Delete txDelete = transactionalizeAction(delete);
+      Put txDelete = transactionalizeAction(delete);
       transactionalizedDeletes.add(txDelete);
     }
-    hTable.delete(transactionalizedDeletes);
+    hTable.put(transactionalizedDeletes);
   }
 
   @Override
@@ -631,11 +632,11 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
     return txPut;
   }
 
-  private Delete transactionalizeAction(Delete delete) throws IOException {
+  private Put transactionalizeAction(Delete delete) throws IOException {
     long transactionTimestamp = tx.getWritePointer();
 
     byte[] deleteRow = delete.getRow();
-    Delete txDelete = new Delete(deleteRow, transactionTimestamp);
+    Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
 
     Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
     if (familyToDelete.isEmpty()) {
@@ -648,6 +649,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
         // changeSet
         for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
           // no need to identify individual columns deleted
+          deleteMarkers.addColumn(columnDescriptor.getName(), TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+            HConstants.EMPTY_BYTE_ARRAY);
           addToChangeSet(deleteRow, columnDescriptor.getName(), null);
         }
       } else {
@@ -657,7 +660,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
         for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
           NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
           for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-            txDelete.addColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
+            deleteMarkers.addColumn(familyEntry.getKey(), column.getKey(), transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
           }
         }
@@ -675,32 +679,33 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
           if (conflictLevel == TxConstants.ConflictDetection.ROW
               || conflictLevel == TxConstants.ConflictDetection.NONE) {
             // no need to identify individual columns deleted
-            txDelete.addFamily(family);
+            deleteMarkers.addColumn(family, TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, family, null);
           } else {
             Result result = get(new Get(delete.getRow()).addFamily(family));
             // Delete entire family
             NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
             for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
-              txDelete.addColumns(family, column.getKey(), transactionTimestamp);
+              deleteMarkers.addColumn(family, column.getKey(), transactionTimestamp, HConstants.EMPTY_BYTE_ARRAY);
               addToChangeSet(deleteRow, family, column.getKey());
             }
           }
         } else {
           for (Cell value : entries) {
-            txDelete.addColumn(CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value),
-              transactionTimestamp);
+            deleteMarkers.addColumn(CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value), transactionTimestamp,
+              HConstants.EMPTY_BYTE_ARRAY);
             addToChangeSet(deleteRow, CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value));
           }
         }
       }
     }
     for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
-      txDelete.setAttribute(entry.getKey(), entry.getValue());
+      deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
     }
-    txDelete.setDurability(delete.getDurability());
-    addToOperation(txDelete, tx);
-    return txDelete;
+    deleteMarkers.setDurability(delete.getDurability());
+    addToOperation(deleteMarkers, tx);
+    return deleteMarkers;
   }
 
   private List<? extends Row> transactionalizeActions(List<? extends Row> actions)


Mime
View raw message