http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
new file mode 100644
index 0000000..40e2c37
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Applies filtering of data based on transactional visibility (HBase 1.3 specific version).
+ * Note: this is intended for server-side use only, as additional properties need to be set on
+ * any {@code Scan} or {@code Get} operation performed.
+ */
+public class TransactionVisibilityFilter extends FilterBase {
+  private final Transaction tx;
+  // oldest visible timestamp by column family, used to apply TTL when reading
+  private final Map<ImmutableBytesWritable, Long> oldestTsByFamily;
+  // if false, empty values will be interpreted as deletes
+  private final boolean allowEmptyValues;
+  // whether or not we can remove delete markers
+  // these can only be safely removed when we are traversing all storefiles
+  private final boolean clearDeletes;
+  // optional sub-filter to apply to visible cells
+  private final Filter cellFilter;
+  // since we traverse KVs in order, cache the current oldest TS to avoid map lookups per KV
+  private final ImmutableBytesWritable currentFamily = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY);
+
+  private long currentOldestTs;
+
+  private DeleteTracker deleteTracker = new DeleteTracker();
+
+  /**
+   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
+   *
+   * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
+   * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+   *                         these will be interpreted as "delete" markers and the column will be filtered out
+   * @param scanType the type of scan operation being performed
+   */
+  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+                                     ScanType scanType) {
+    this(tx, ttlByFamily, allowEmptyValues, scanType, null);
+  }
+
+  /**
+   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
+   *
+   * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
+   * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+   *                         these will be interpreted as "delete" markers and the column will be filtered out
+   * @param scanType the type of scan operation being performed
+   * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
+   *                   calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
+   *                   {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
+   */
+  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+                                     ScanType scanType, @Nullable Filter cellFilter) {
+    this.tx = tx;
+    this.oldestTsByFamily = Maps.newTreeMap();
+    for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
+      long familyTTL = ttlEntry.getValue();
+      oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()),
+                           familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
+    }
+    this.allowEmptyValues = allowEmptyValues;
+    this.clearDeletes =
+      scanType == ScanType.COMPACT_DROP_DELETES ||
+        (scanType == ScanType.USER_SCAN && tx.getVisibilityLevel() != Transaction.VisibilityLevel.SNAPSHOT_ALL);
+    this.cellFilter = cellFilter;
+  }
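
An aside on the TTL arithmetic in this constructor: Tephra transaction timestamps are not plain milliseconds, so the per-family TTL (given in ms) is scaled by TxConstants.MAX_TX_PER_MS before being subtracted from the transaction's visibility upper bound. A minimal sketch of the same computation, assuming MAX_TX_PER_MS is the number of transaction ids per millisecond:

    // illustrative only, not part of the patch
    long familyTtlMillis = 24 * 60 * 60 * 1000L;  // a hypothetical one-day TTL
    // cells with a transaction-time timestamp older than this bound are past their TTL
    long oldestVisibleTs = tx.getVisibilityUpperBound() - familyTtlMillis * TxConstants.MAX_TX_PER_MS;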
+
+  @Override
+  public ReturnCode filterKeyValue(Cell cell) throws IOException {
+    if (!CellUtil.matchingFamily(cell, currentFamily.get(), currentFamily.getOffset(), currentFamily.getLength())) {
+      // column family changed
+      currentFamily.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+      Long familyOldestTs = oldestTsByFamily.get(currentFamily);
+      currentOldestTs = familyOldestTs != null ? familyOldestTs : 0;
+      deleteTracker.reset();
+    }
+    // need to apply TTL for the column family here
+    long kvTimestamp = cell.getTimestamp();
+    if (TxUtils.getTimestampForTTL(kvTimestamp) < currentOldestTs) {
+      // passed TTL for this column, seek to next
+      return ReturnCode.NEXT_COL;
+    } else if (tx.isVisible(kvTimestamp)) {
+      // Return all writes done by current transaction (including deletes) for VisibilityLevel.SNAPSHOT_ALL
+      if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL && tx.isCurrentWrite(kvTimestamp)) {
+        // cell is visible
+        // visibility SNAPSHOT_ALL needs all matches
+        return runSubFilter(ReturnCode.INCLUDE, cell);
+      }
+      if (DeleteTracker.isFamilyDelete(cell)) {
+        deleteTracker.addFamilyDelete(cell);
+        if (clearDeletes) {
+          return ReturnCode.NEXT_COL;
+        } else {
+          // cell is visible
+          // as soon as we find a KV to include we can move to the next column
+          return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+        }
+      }
+      // check if masked by family delete
+      if (deleteTracker.isDeleted(cell)) {
+        return ReturnCode.NEXT_COL;
+      }
+      // check for column delete
+      if (isColumnDelete(cell)) {
+        if (clearDeletes) {
+          // skip "deleted" cell
+          return ReturnCode.NEXT_COL;
+        } else {
+          // keep the marker but skip any remaining versions
+          return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+        }
+      }
+      // cell is visible
+      // as soon as we find a KV to include we can move to the next column
+      return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+    } else {
+      return ReturnCode.SKIP;
+    }
+  }
+
+  private ReturnCode runSubFilter(ReturnCode txFilterCode, Cell cell) throws IOException {
+    if (cellFilter != null) {
+      ReturnCode subFilterCode = cellFilter.filterKeyValue(cell);
+      return determineReturnCode(txFilterCode, subFilterCode);
+    }
+    return txFilterCode;
+  }
+
+  /**
+   * Determines the return code of TransactionVisibilityFilter based on sub-filter's return code.
+   * Sub-filter can only exclude cells included by TransactionVisibilityFilter, i.e., sub-filter's
+   * INCLUDE will be ignored. This behavior makes sure that sub-filter only sees cell versions valid for the
+   * given transaction. If sub-filter needs to see older versions of cell, then this method can be overridden.
+   *
+   * @param txFilterCode return code from TransactionVisibilityFilter
+   * @param subFilterCode return code from sub-filter
+   * @return final return code
+   */
+  protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) {
+    // Return the more restrictive of the two filter responses
+    switch (subFilterCode) {
+      case INCLUDE:
+        return txFilterCode;
+      case INCLUDE_AND_NEXT_COL:
+        return ReturnCode.INCLUDE_AND_NEXT_COL;
+      case SKIP:
+        return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL;
+      default:
+        return subFilterCode;
+    }
+  }
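
Concretely, the switch above resolves the two responses as follows (a worked illustration, not an exhaustive contract):

    tx filter INCLUDE,              sub-filter INCLUDE   ->  INCLUDE
    tx filter INCLUDE_AND_NEXT_COL, sub-filter INCLUDE   ->  INCLUDE_AND_NEXT_COL
    tx filter INCLUDE,              sub-filter SKIP      ->  SKIP
    tx filter INCLUDE_AND_NEXT_COL, sub-filter SKIP      ->  NEXT_COL
    tx filter (any include),        sub-filter NEXT_COL,
                                    NEXT_ROW, or a hint  ->  sub-filter's code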
+
+  @Override
+  public boolean filterRow() throws IOException {
+    if (cellFilter != null) {
+      return cellFilter.filterRow();
+    }
+    return super.filterRow();
+  }
+
+  @Override
+  public Cell transformCell(Cell cell) throws IOException {
+    // Convert Tephra deletes back into HBase deletes
+    if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL) {
+      if (DeleteTracker.isFamilyDelete(cell)) {
+        return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), null, cell.getTimestamp(),
+                            KeyValue.Type.DeleteFamily);
+      } else if (isColumnDelete(cell)) {
+        // Note: in some cases KeyValue.Type.Delete is used in Delete object,
+        // and in some other cases KeyValue.Type.DeleteColumn is used.
+        // Since Tephra cannot distinguish between the two, we return KeyValue.Type.DeleteColumn.
+        // KeyValue.Type.DeleteColumn makes both CellUtil.isDelete and CellUtil.isDeleteColumns return true, and will
+        // work in both cases.
+        return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
+                            cell.getTimestamp(), KeyValue.Type.DeleteColumn);
+      }
+    }
+    return cell;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    deleteTracker.reset();
+    if (cellFilter != null) {
+      cellFilter.reset();
+    }
+  }
+
+  @Override
+  public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+    if (cellFilter != null) {
+      return cellFilter.filterRowKey(buffer, offset, length);
+    }
+    return super.filterRowKey(buffer, offset, length);
+  }
+
+  @Override
+  public boolean filterAllRemaining() throws IOException {
+    if (cellFilter != null) {
+      return cellFilter.filterAllRemaining();
+    }
+    return super.filterAllRemaining();
+  }
+
+  @Override
+  public void filterRowCells(List<Cell> kvs) throws IOException {
+    if (cellFilter != null) {
+      cellFilter.filterRowCells(kvs);
+    } else {
+      super.filterRowCells(kvs);
+    }
+  }
+
+  @Override
+  public boolean hasFilterRow() {
+    if (cellFilter != null) {
+      return cellFilter.hasFilterRow();
+    }
+    return super.hasFilterRow();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
+    if (cellFilter != null) {
+      return cellFilter.getNextKeyHint(currentKV);
+    }
+    return super.getNextKeyHint(currentKV);
+  }
+
+  @Override
+  public Cell getNextCellHint(Cell currentKV) throws IOException {
+    if (cellFilter != null) {
+      return cellFilter.getNextCellHint(currentKV);
+    }
+    return super.getNextCellHint(currentKV);
+  }
+
+  @Override
+  public boolean isFamilyEssential(byte[] name) throws IOException {
+    if (cellFilter != null) {
+      return cellFilter.isFamilyEssential(name);
+    }
+    return super.isFamilyEssential(name);
+  }
+
+  private boolean isColumnDelete(Cell cell) {
+    return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && cell.getValueLength() == 0 && !allowEmptyValues;
+  }
+
+  private static final class DeleteTracker {
+    private long familyDeleteTs;
+    private byte[] rowKey;
+
+    public static boolean isFamilyDelete(Cell cell) {
+      return !TxUtils.isPreExistingVersion(cell.getTimestamp()) &&
+        CellUtil.matchingQualifier(cell, TxConstants.FAMILY_DELETE_QUALIFIER) &&
+        CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
+    }
+
+    public void addFamilyDelete(Cell delete) {
+      this.familyDeleteTs = delete.getTimestamp();
+      this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength());
+    }
+
+    public boolean isDeleted(Cell cell) {
+      return rowKey != null && Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(),
+                                               cell.getRowLength(), rowKey, 0, rowKey.length) == 0 &&
+        cell.getTimestamp() <= familyDeleteTs;
+    }
+
+    public void reset() {
+      this.familyDeleteTs = 0;
+      this.rowKey = null;
+    }
+  }
+}
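
Before the next file, a minimal sketch of how server-side code might attach this filter to a user scan. The family name, the TTL value, and the tx variable (the caller's current Transaction) are assumptions for illustration, not part of the patch:

    Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
    ttlByFamily.put(Bytes.toBytes("f"), 86400000L);  // assumed family "f" with a one-day TTL
    // allowEmptyValues = false: empty values are treated as delete markers;
    // USER_SCAN hides delete markers from results without dropping them from the store
    Scan scan = new Scan();
    scan.setFilter(new TransactionVisibilityFilter(tx, ttlByFamily, false, ScanType.USER_SCAN));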
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
new file mode 100644
index 0000000..9b856d9
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+
+/**
+ * Record compaction state for invalid list pruning.
+ */
+public class CompactionState {
+  private static final Log LOG = LogFactory.getLog(CompactionState.class);
+
+  private final byte[] regionName;
+  private final String regionNameAsString;
+  private final PruneUpperBoundWriterSupplier pruneUpperBoundWriterSupplier;
+  private final PruneUpperBoundWriter pruneUpperBoundWriter;
+
+  private volatile long pruneUpperBound = -1;
+
+  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
+    this.regionName = env.getRegionInfo().getRegionName();
+    this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
+    DataJanitorState dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+      @Override
+      public Table get() throws IOException {
+        return env.getTable(stateTable);
+      }
+    });
+    this.pruneUpperBoundWriterSupplier = new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState,
+                                                                           pruneFlushInterval);
+    this.pruneUpperBoundWriter = pruneUpperBoundWriterSupplier.get();
+  }
+
+  /**
+   * Records the transaction state used for a compaction. This method is called when the compaction starts.
+   *
+   * @param request {@link CompactionRequest} for the compaction
+   * @param snapshot transaction state that will be used for the compaction
+   */
+  public void record(CompactionRequest request, @Nullable TransactionVisibilityState snapshot) {
+    if (request.isMajor() && snapshot != null) {
+      Transaction tx = TxUtils.createDummyTransaction(snapshot);
+      pruneUpperBound = TxUtils.getPruneUpperBound(tx);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
+                        pruneUpperBound, request, snapshot.getTimestamp()));
+      }
+    } else {
+      pruneUpperBound = -1;
+    }
+  }
+
+  /**
+   * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
+   * This method is called after the compaction has successfully completed.
+   */
+  public void persist() {
+    if (pruneUpperBound != -1) {
+      pruneUpperBoundWriter.persistPruneEntry(regionName, pruneUpperBound);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
+      }
+    }
+  }
+
+  /**
+   * Persist that the given region is empty at the given time.
+   *
+   * @param time time in milliseconds
+   */
+  public void persistRegionEmpty(long time) {
+    pruneUpperBoundWriter.persistRegionEmpty(regionName, time);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time));
+    }
+  }
+
+  /**
+   * Releases the usage of the {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    pruneUpperBoundWriterSupplier.release();
+  }
+}
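
For orientation, the intended call sequence from a region observer (in Tephra this is TransactionProcessor) looks roughly like the following sketch; env, stateTableName, pruneFlushInterval, request, and snapshot are stand-ins for values the coprocessor already holds:

    CompactionState compactionState = new CompactionState(env, stateTableName, pruneFlushInterval);
    compactionState.record(request, snapshot);  // preCompact: capture the prune upper bound
    // ... compaction runs ...
    compactionState.persist();                  // postCompact: only after the compaction succeeded
    compactionState.stop();                     // coprocessor shutdown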
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
new file mode 100644
index 0000000..db59d7d
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.txprune.RegionPruneInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+
+/**
+ * Persist data janitor state into an HBase table.
+ * This is used by both {@link TransactionProcessor} and by the {@link HBaseTransactionPruningPlugin}
+ * to persist and read the compaction state.
+ */
+@SuppressWarnings("WeakerAccess")
+public class DataJanitorState {
+  private static final Log LOG = LogFactory.getLog(DataJanitorState.class);
+
+  public static final byte[] FAMILY = {'f'};
+  public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
+
+  private static final byte[] REGION_TIME_COL = {'r'};
+  private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+  private static final byte[] EMPTY_REGION_TIME_COL = {'e'};
+
+  private static final byte[] REGION_KEY_PREFIX = {0x1};
+  private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
+
+  private static final byte[] REGION_TIME_KEY_PREFIX = {0x2};
+  private static final byte[] REGION_TIME_KEY_PREFIX_STOP = {0x3};
+
+  private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
+  private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
+
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4};
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5};
+
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5};
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6};
+
+  private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+  // This value can be used when we don't care about the value we write in a column
+  private static final byte[] COL_VAL = Bytes.toBytes('1');
+
+  private final TableSupplier stateTableSupplier;
+
+
+  public DataJanitorState(TableSupplier stateTableSupplier) {
+    this.stateTableSupplier = stateTableSupplier;
+  }
+
+  // ----------------------------------------------------------------
+  // ------- Methods for prune upper bound for a given region -------
+  // ----------------------------------------------------------------
+  // The data is stored in the following format -
+  // Key: 0x1<region-id>
+  // Col 'u': <prune upper bound>
+  // ----------------------------------------------------------------
+
+  /**
+   * Persist the latest prune upper bound for a given region. This is called by {@link TransactionProcessor}
+   * after major compaction.
+   *
+   * @param regionId region id
+   * @param pruneUpperBound the latest prune upper bound for the region
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void savePruneUpperBoundForRegion(byte[] regionId, long pruneUpperBound) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Put put = new Put(makeRegionKey(regionId));
+      put.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL, Bytes.toBytes(pruneUpperBound));
+      stateTable.put(put);
+    }
+  }
+
+  /**
+   * Get latest prune upper bound for a given region. This indicates the largest invalid transaction that no
+   * longer has writes in this region.
+   *
+   * @param regionId region id
+   * @return latest prune upper bound for the region
+   * @throws IOException when not able to read the data from HBase
+   */
+  public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException {
+    RegionPruneInfo regionPruneInfo = getPruneInfoForRegion(regionId);
+    return (regionPruneInfo == null) ? -1 : regionPruneInfo.getPruneUpperBound();
+  }
+
+  /**
+   * Get the latest {@link RegionPruneInfo} for a given region.
+   *
+   * @param regionId region id
+   * @return {@link RegionPruneInfo} for the region
+   * @throws IOException when not able to read the data from HBase
+   */
+  @Nullable
+  public RegionPruneInfo getPruneInfoForRegion(byte[] regionId) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Get get = new Get(makeRegionKey(regionId));
+      get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+      Cell cell = stateTable.get(get).getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL);
+      if (cell == null) {
+        return null;
+      }
+      byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell);
+      long timestamp = cell.getTimestamp();
+      return new RegionPruneInfo(regionId, Bytes.toStringBinary(regionId),
+                                 Bytes.toLong(pruneUpperBoundBytes), timestamp);
+    }
+  }
+
+  /**
+   * Get latest prune upper bounds for given regions. This is a batch operation of method
+   * {@link #getPruneUpperBoundForRegion(byte[])}.
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
+    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    List<RegionPruneInfo> regionPruneInfos = getPruneInfoForRegions(regions);
+    for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+      resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound());
+    }
+    return Collections.unmodifiableMap(resultMap);
+  }
+
+  /**
+   * Gets a list of {@link RegionPruneInfo} for given regions. Returns all regions if the given regions set is null.
+   *
+   * @param regions a set of regions
+   * @return list of {@link RegionPruneInfo}s.
+   * @throws IOException when not able to read the data from HBase
+   */
+  public List<RegionPruneInfo> getPruneInfoForRegions(@Nullable SortedSet<byte[]> regions) throws IOException {
+    List<RegionPruneInfo> regionPruneInfos = new ArrayList<>();
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (regions == null || regions.contains(region)) {
+            Cell cell = next.getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (cell != null) {
+              byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell);
+              long timestamp = cell.getTimestamp();
+              regionPruneInfos.add(new RegionPruneInfo(region, Bytes.toStringBinary(region),
+                                                       Bytes.toLong(pruneUpperBoundBytes), timestamp));
+            }
+          }
+        }
+      }
+    }
+    return Collections.unmodifiableList(regionPruneInfos);
+  }
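
A minimal sketch of direct usage, assuming an open HBase Connection named connection and an assumed state table name (both hypothetical; callers normally reach this class through TransactionProcessor or the pruning plugin):

    DataJanitorState state = new DataJanitorState(new DataJanitorState.TableSupplier() {
      @Override
      public Table get() throws IOException {
        return connection.getTable(TableName.valueOf("tephra.janitor.state"));  // assumed name
      }
    });
    state.savePruneUpperBoundForRegion(regionId, pruneUpperBound);
    long bound = state.getPruneUpperBoundForRegion(regionId);  // -1 when nothing is recorded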
+
+  /**
+   * Delete the prune upper bound entries for any region that is not in the given exclude set and whose
+   * prune upper bound is less than the given value.
+   * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have
+   * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are
+   * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions.
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deletePruneUpperBounds(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+    throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (!excludeRegions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+                stateTable.delete(new Delete(next.getRow()));
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // ---------------------------------------------------
+  // ------- Methods for regions at a given time -------
+  // ---------------------------------------------------
+  // Key: 0x2<inverted time><region-id>
+  // Col 't': <empty>
+  // ---------------------------------------------------
+
+  /**
+   * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of
+   * transactional regions existing in the HBase instance periodically.
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      for (byte[] region : regions) {
+        Put put = new Put(makeTimeRegionKey(timeBytes, region));
+        put.addColumn(FAMILY, REGION_TIME_COL, COL_VAL);
+        stateTable.put(put);
+      }
+
+      // Save the count of regions as a checksum
+      saveRegionCountForTime(stateTable, timeBytes, regions.size());
+    }
+  }
+
+  @VisibleForTesting
+  void saveRegionCountForTime(Table stateTable, byte[] timeBytes, int count) throws IOException {
+    Put put = new Put(makeTimeRegionCountKey(timeBytes));
+    put.addColumn(FAMILY, REGION_TIME_COL, Bytes.toBytes(count));
+    stateTable.put(put);
+  }
+
+  /**
+   * Return the set of regions saved for the time at or before the given time. This method finds the greatest time
+   * that is less than or equal to the given time, and then returns all regions with that exact time, but none that
+   * are older than that.
+   *
+   * @param time timestamp in milliseconds
+   * @return set of regions and time at which they were recorded, or null if no regions found
+   * @throws IOException when not able to read the data from HBase
+   */
+  @Nullable
+  public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      TimeRegions timeRegions;
+      while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) {
+        int count = getRegionCountForTime(stateTable, timeRegions.getTime());
+        if (count != -1 && count == timeRegions.getRegions().size()) {
+          return timeRegions;
+        } else {
+          LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s",
+                                 timeRegions.getTime(), count, timeRegions.getRegions().size()));
+          time = timeRegions.getTime() - 1;
+        }
+      }
+      return null;
+    }
+  }
+
+  @Nullable
+  private TimeRegions getNextSetOfTimeRegions(Table stateTable, long time) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+    scan.addColumn(FAMILY, REGION_TIME_COL);
+
+
+    long currentRegionTime = -1;
+    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    Result next;
+    try (ResultScanner scanner = stateTable.getScanner(scan)) {
+      while ((next = scanner.next()) != null) {
+        Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+        // Stop if reached next time value
+        if (currentRegionTime == -1) {
+          currentRegionTime = timeRegion.getKey();
+        } else if (timeRegion.getKey() < currentRegionTime) {
+          break;
+        } else if (timeRegion.getKey() > currentRegionTime) {
+          throw new IllegalStateException(
+            String.format("Got out of order time %d when expecting time less than or equal to %d",
+                          timeRegion.getKey(), currentRegionTime));
+        }
+        regions.add(timeRegion.getValue());
+      }
+    }
+    return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions));
+  }
+
+  @VisibleForTesting
+  int getRegionCountForTime(Table stateTable, long time) throws IOException {
+    Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time))));
+    get.addColumn(FAMILY, REGION_TIME_COL);
+    Result result = stateTable.get(get);
+    byte[] value = result.getValue(FAMILY, REGION_TIME_COL);
+    return value == null ? -1 : Bytes.toInt(value);
+  }
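
The "inverted time" in the row keys above is what lets getNextSetOfTimeRegions find the most recent snapshot with a plain forward scan. getInvertedTime is defined later in this file; assuming it is the usual Long.MAX_VALUE - time inversion, the ordering works out as in this sketch:

    // assumption: getInvertedTime(t) == Long.MAX_VALUE - t
    // larger (newer) times invert to smaller values, so they sort first in a scan
    long t1 = 1000L, t2 = 2000L;             // t2 is newer
    byte[] k1 = Bytes.toBytes(Long.MAX_VALUE - t1);
    byte[] k2 = Bytes.toBytes(Long.MAX_VALUE - t2);
    assert Bytes.compareTo(k2, k1) < 0;      // the newer time sorts before the older one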
+
+  /**
+   * Delete all the regions that were recorded for all times at or before the given time.
+   *
+   * @param time timestamp in milliseconds
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      // Delete the regions
+      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
+
+      // Delete the count
+      scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  // ---------------------------------------------------------------------
+  // ------- Methods for inactive transaction bound for given time -------
+  // ---------------------------------------------------------------------
+  // Key: 0x3<inverted time>
+  // Col 'p': <inactive transaction bound>
+  // ---------------------------------------------------------------------
+
+  /**
+   * Persist inactive transaction bound for a given time. This is the smallest not in-progress transaction that
+   * will not have writes in any HBase regions that are created after the given time.
+   *
+   * @param time time in milliseconds
+   * @param inactiveTransactionBound inactive transaction bound for the given time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveInactiveTransactionBoundForTime(long time, long inactiveTransactionBound) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Put put = new Put(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
+      put.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL, Bytes.toBytes(inactiveTransactionBound));
+      stateTable.put(put);
+    }
+  }
+
+  /**
+   * Return inactive transaction bound for the given time.
+   *
+   * @param time time in milliseconds
+   * @return inactive transaction bound for the given time
+   * @throws IOException when not able to read the data from HBase
+   */
+  public long getInactiveTransactionBoundForTime(long time) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Get get = new Get(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
+      get.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+      byte[] result = stateTable.get(get).getValue(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+      return result == null ? -1 : Bytes.toLong(result);
+    }
+  }
+
+  /**
+   * Delete all inactive transaction bounds recorded for a time at or before the given time.
+   *
+   * @param time time in milliseconds
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteInactiveTransactionBoundsOnOrBeforeTime(long time) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
+                           INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  // --------------------------------------------------------
+  // ------- Methods for empty regions at a given time -------
+  // --------------------------------------------------------
+  // Key: 0x4