http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java new file mode 100644 index 0000000..5d7b871 --- /dev/null +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Sets; +import com.google.gson.Gson; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.txprune.RegionPruneInfo; +import org.apache.tephra.txprune.hbase.InvalidListPruningDebug; +import org.apache.tephra.txprune.hbase.RegionsAtTime; +import org.apache.tephra.util.TimeMathParser; +import org.apache.tephra.util.TxUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintWriter; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +/** + * Invalid List Pruning Debug Tool. + */ +public class InvalidListPruningDebugTool implements InvalidListPruningDebug { + private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebugTool.class); + private static final Gson GSON = new Gson(); + private static final String NOW = "now"; + @VisibleForTesting + static final String DATE_FORMAT = "d-MMM-yyyy HH:mm:ss z"; + + private DataJanitorState dataJanitorState; + private Connection connection; + private TableName tableName; + + /** + * Initialize the Invalid List Debug Tool. 
+ * @param conf {@link Configuration} + * @throws IOException when not able to create an HBase connection + */ + @Override + @SuppressWarnings("WeakerAccess") + public void initialize(final Configuration conf) throws IOException { + LOG.debug("InvalidListPruningDebugMain : initialize method called"); + connection = ConnectionFactory.createConnection(conf); + tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return connection.getTable(tableName); + } + }); + } + + @Override + @SuppressWarnings("WeakerAccess") + public void destroy() throws IOException { + if (connection != null) { + connection.close(); + } + } + + /** + * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions + * will stop the progress of pruning. + *
<p/>
+ * Note that this can return false positives in the following case - + * At time 't' empty regions were recorded, and time 't+1' prune iteration was invoked. + * Since a new set of regions was recorded at time 't+1', all regions recorded as empty before time 't + 1' will + * now be reported as blocking the pruning, even though they are empty. This is because we cannot tell if those + * regions got any new data between time 't' and 't + 1'. + * + * @param numRegions number of regions + * @param time time in milliseconds or relative time, regions recorded before the given time are returned + * @return {@link Set} of regions that needs to be compacted and flushed + */ + @Override + @SuppressWarnings("WeakerAccess") + public Set getRegionsToBeCompacted(Integer numRegions, String time) throws IOException { + // Fetch the live regions at the given time + RegionsAtTime timeRegion = getRegionsOnOrBeforeTime(time); + if (timeRegion.getRegions().isEmpty()) { + return Collections.emptySet(); + } + + Long timestamp = timeRegion.getTime(); + SortedSet regions = timeRegion.getRegions(); + + // Get the live regions + SortedSet liveRegions = getRegionsOnOrBeforeTime(NOW).getRegions(); + // Retain only the live regions + regions = Sets.newTreeSet(Sets.intersection(liveRegions, regions)); + + SortedSet emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null); + SortedSet emptyRegionNames = new TreeSet<>(); + Iterable regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + emptyRegionNames.add(regionString); + } + + Set nonEmptyRegions = Sets.newHashSet(Sets.difference(regions, emptyRegionNames)); + + // Get all pruned regions for the current time and remove them from the nonEmptyRegions, + // resulting in a set of regions that are not empty and have not been registered prune upper bound + List prunedRegions = dataJanitorState.getPruneInfoForRegions(null); + for (RegionPruneInfo prunedRegion : prunedRegions) { + if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) { + nonEmptyRegions.remove(prunedRegion.getRegionNameAsString()); + } + } + + if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) { + return nonEmptyRegions; + } + + Set subsetRegions = new HashSet<>(numRegions); + for (String regionName : nonEmptyRegions) { + if (subsetRegions.size() == numRegions) { + break; + } + subsetRegions.add(regionName); + } + return subsetRegions; + } + + /** + * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. + * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions + * that are known to be live will be returned. 
+ * + * @param numRegions number of regions + * @param time time in milliseconds or relative time, regions recorded before the given time are returned + * @return Map of region name and its prune upper bound + */ + @Override + @SuppressWarnings("WeakerAccess") + public SortedSet getIdleRegions(Integer numRegions, String time) throws IOException { + List regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null); + if (regionPruneInfos.isEmpty()) { + return new TreeSet<>(); + } + + // Create a set with region names + Set pruneRegionNameSet = new HashSet<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString()); + } + + // Fetch the latest live regions + RegionsAtTime latestRegions = getRegionsOnOrBeforeTime(NOW); + + // Fetch the regions at the given time + RegionsAtTime timeRegions = getRegionsOnOrBeforeTime(time); + Set liveRegions = Sets.intersection(latestRegions.getRegions(), timeRegions.getRegions()); + Set liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet); + List liveRegionWithPruneInfoList = new ArrayList<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) { + liveRegionWithPruneInfoList.add(regionPruneInfo); + } + + // Use the subset of live regions and prune regions + regionPruneInfos = liveRegionWithPruneInfoList; + } + + if (numRegions < 0) { + numRegions = regionPruneInfos.size(); + } + + Comparator comparator = new Comparator() { + @Override + public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { + int result = Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound()); + if (result == 0) { + return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString()); + } + return result; + } + }; + MinMaxPriorityQueue lowestPrunes = + MinMaxPriorityQueue.orderedBy(comparator).maximumSize(numRegions).create(); + + for (RegionPruneInfo pruneInfo : regionPruneInfos) { + lowestPrunes.add(new RegionPruneInfoPretty(pruneInfo)); + } + + SortedSet regions = new TreeSet<>(comparator); + regions.addAll(lowestPrunes); + return regions; + } + + /** + * Return the prune upper bound value of a given region. If no prune upper bound has been written for this region yet, + * it will return a null. + * + * @param regionId region id + * @return {@link RegionPruneInfo} of the region + * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo} + */ + @Override + @SuppressWarnings("WeakerAccess") + @Nullable + public RegionPruneInfoPretty getRegionPruneInfo(String regionId) throws IOException { + RegionPruneInfo pruneInfo = dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId)); + return pruneInfo == null ? null : new RegionPruneInfoPretty(pruneInfo); + } + + /** + * + * @param timeString Given a time, provide the {@link TimeRegions} at or before that time. + * Time can be in milliseconds or relative time. 
+ * @return transactional regions that are present at or before the given time + * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions} + */ + @Override + @SuppressWarnings("WeakerAccess") + public RegionsAtTime getRegionsOnOrBeforeTime(String timeString) throws IOException { + long time = TimeMathParser.parseTime(timeString, TimeUnit.MILLISECONDS); + SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time); + if (timeRegions == null) { + return new RegionsAtTime(time, new TreeSet(), dateFormat); + } + SortedSet regionNames = new TreeSet<>(); + Iterable regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + regionNames.add(regionString); + } + return new RegionsAtTime(timeRegions.getTime(), regionNames, dateFormat); + } + + private void printUsage(PrintWriter pw) { + pw.println(); + pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning "); + pw.println(); + pw.println("Available commands"); + pw.println("------------------"); + pw.println("to-compact-regions limit [time]"); + pw.println("Desc: Prints out the regions that are active, but not empty, " + + "and have not registered a prune upper bound."); + pw.println(); + pw.println("idle-regions limit [time]"); + pw.println("Desc: Prints out the regions that have the lowest prune upper bounds."); + pw.println(); + pw.println("prune-info region-name-as-string"); + pw.println("Desc: Prints the prune upper bound and the time it was recorded for the given region."); + pw.println(); + pw.println("time-region [time]"); + pw.println("Desc: Prints out the transactional regions present in HBase recorded at or before the given time."); + pw.println(); + pw.println("Parameters"); + pw.println("----------"); + pw.println(" * limit - used to limit the number of regions returned, -1 to apply no limit"); + pw.println(" * time - if time is not provided, the current time is used. "); + pw.println(" When provided, the data recorded on or before the given time is returned."); + pw.println(" Time can be provided in milliseconds, or can be provided as a relative time."); + pw.println(" Examples for relative time -"); + pw.println(" now = current time,"); + pw.println(" now-1d = current time - 1 day,"); + pw.println(" now-1d+4h = 20 hours before now,"); + pw.println(" now+5s = current time + 5 seconds"); + pw.println(); + } + + @VisibleForTesting + boolean execute(String[] args, PrintWriter out) throws IOException { + if (args.length < 1) { + printUsage(out); + return false; + } + + String command = args[0]; + switch (command) { + case "time-region": + if (args.length <= 2) { + String time = args.length == 2 ? args[1] : NOW; + RegionsAtTime timeRegion = getRegionsOnOrBeforeTime(time); + out.println(GSON.toJson(timeRegion)); + return true; + } + break; + case "idle-regions": + if (args.length <= 3) { + Integer numRegions = Integer.parseInt(args[1]); + String time = args.length == 3 ? 
args[2] : NOW; + SortedSet regionPruneInfos = getIdleRegions(numRegions, time); + out.println(GSON.toJson(regionPruneInfos)); + return true; + } + break; + case "prune-info": + if (args.length == 2) { + String regionName = args[1]; + RegionPruneInfo regionPruneInfo = getRegionPruneInfo(regionName); + if (regionPruneInfo != null) { + out.println(GSON.toJson(regionPruneInfo)); + } else { + out.println(String.format("No prune info found for the region %s.", regionName)); + } + return true; + } + break; + case "to-compact-regions": + if (args.length <= 3) { + Integer numRegions = Integer.parseInt(args[1]); + String time = args.length == 3 ? args[2] : NOW; + Set toBeCompactedRegions = getRegionsToBeCompacted(numRegions, time); + out.println(GSON.toJson(toBeCompactedRegions)); + return true; + } + break; + } + + printUsage(out); + return false; + } + + public static void main(String[] args) { + Configuration hConf = HBaseConfiguration.create(); + InvalidListPruningDebugTool pruningDebug = new InvalidListPruningDebugTool(); + try (PrintWriter out = new PrintWriter(System.out)) { + pruningDebug.initialize(hConf); + boolean success = pruningDebug.execute(args, out); + pruningDebug.destroy(); + if (!success) { + System.exit(1); + } + } catch (IOException ex) { + LOG.error("Received an exception while trying to execute the debug tool. ", ex); + } + } + + /** + * Wrapper class around {@link RegionPruneInfo} to print human readable dates for timestamps. + */ + @SuppressWarnings({"WeakerAccess", "unused"}) + public static class RegionPruneInfoPretty extends RegionPruneInfo { + private final transient SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + private final String pruneUpperBoundAsString; + private final String pruneRecordTimeAsString; + + public RegionPruneInfoPretty(RegionPruneInfo regionPruneInfo) { + this(regionPruneInfo.getRegionName(), regionPruneInfo.getRegionNameAsString(), + regionPruneInfo.getPruneUpperBound(), regionPruneInfo.getPruneRecordTime()); + } + + public RegionPruneInfoPretty(byte[] regionName, String regionNameAsString, + long pruneUpperBound, long pruneRecordTime) { + super(regionName, regionNameAsString, pruneUpperBound, pruneRecordTime); + pruneUpperBoundAsString = dateFormat.format(TxUtils.getTimestamp(pruneUpperBound)); + pruneRecordTimeAsString = dateFormat.format(pruneRecordTime); + } + + public String getPruneUpperBoundAsString() { + return pruneUpperBoundAsString; + } + + public String getPruneRecordTimeAsString() { + return pruneRecordTimeAsString; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + RegionPruneInfoPretty that = (RegionPruneInfoPretty) o; + return Objects.equals(pruneUpperBoundAsString, that.pruneUpperBoundAsString) && + Objects.equals(pruneRecordTimeAsString, that.pruneRecordTimeAsString); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), pruneUpperBoundAsString, pruneRecordTimeAsString); + } + + @Override + public String toString() { + return "RegionPruneInfoPretty{" + + ", pruneUpperBoundAsString='" + pruneUpperBoundAsString + '\'' + + ", pruneRecordTimeAsString='" + pruneRecordTimeAsString + '\'' + + "} " + super.toString(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java 
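A minimal sketch of driving the new tool programmatically rather than through main(). The class and method names (initialize, getIdleRegions, getRegionsToBeCompacted, destroy) come from the diff above; the generic type parameters are assumptions, since this rendering of the diff has stripped the angle brackets (the return types are assumed to be a SortedSet of RegionPruneInfo and a Set of String), and the driver class itself is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.tephra.hbase.txprune.InvalidListPruningDebugTool;
import org.apache.tephra.txprune.RegionPruneInfo;

import java.io.IOException;
import java.util.Set;
import java.util.SortedSet;

public class PruningDebugDriver {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    InvalidListPruningDebugTool tool = new InvalidListPruningDebugTool();
    try {
      tool.initialize(conf);
      // Regions with the 10 lowest prune upper bounds, as recorded a day ago ("now-1d").
      SortedSet<? extends RegionPruneInfo> idle = tool.getIdleRegions(10, "now-1d");
      // Live, non-empty regions with no prune upper bound recorded; -1 means no limit.
      Set<String> toCompact = tool.getRegionsToBeCompacted(-1, "now");
      System.out.println(idle.size() + " idle regions, " + toCompact.size() + " regions to compact");
    } finally {
      tool.destroy();
    }
  }
}

The same queries are available from the command line through the main() method above, for example with the arguments "idle-regions 10 now-1d" or "to-compact-regions -1 now-2h"; the relative time strings are parsed by TimeMathParser.parseTime, as in the code.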
---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java new file mode 100644 index 0000000..1476906 --- /dev/null +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; +import com.google.common.collect.TreeMultimap; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.AbstractHBaseTableTest; +import org.apache.tephra.txprune.RegionPruneInfo; +import org.apache.tephra.txprune.hbase.RegionsAtTime; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Type; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * Test {@link InvalidListPruningDebugTool}. 
+ */ +public class InvalidListPruningDebugTest extends AbstractHBaseTableTest { + private static final Gson GSON = new Gson(); + private static final boolean DEBUG_PRINT = true; + private static final Function PRUNE_INFO_TO_BYTES = + new Function() { + @Override + public byte[] apply(RegionPruneInfo input) { + return input.getRegionName(); + } + }; + private static final Function PRUNE_INFO_TO_STRING = + new Function() { + @Override + public String apply(RegionPruneInfo input) { + return input.getRegionNameAsString(); + } + }; + private static final Function STRING_TO_BYTES = + new Function() { + @Override + public byte[] apply(String input) { + return Bytes.toBytes(input); + } + }; + private static final Type PRUNE_INFO_LIST_TYPE = + new TypeToken>() { }.getType(); + + private static TableName pruneStateTable; + private static InvalidListPruningDebugTool pruningDebug; + + private static TreeMultimap compactedRegions = + TreeMultimap.create(Ordering.natural(), stringComparator()); + private static TreeMultimap emptyRegions = TreeMultimap.create(); + private static TreeMultimap notCompactedRegions = TreeMultimap.create(); + private static TreeMultimap deletedRegions = + TreeMultimap.create(Ordering.natural(), stringComparator()); + + + @BeforeClass + public static void addData() throws Exception { + pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false, + // Prune state table is a non-transactional table, hence no transaction co-processor + Collections.emptyList()); + table.close(); + + DataJanitorState dataJanitorState = + new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return testUtil.getConnection().getTable(pruneStateTable); + } + }); + + // Record prune upper bounds for 9 regions + long now = System.currentTimeMillis(); + int maxRegions = 9; + TableName compactedTable = TableName.valueOf("default", "compacted_table"); + TableName emptyTable = TableName.valueOf("default", "empty_table"); + TableName notCompactedTable = TableName.valueOf("default", "not_compacted_table"); + TableName deletedTable = TableName.valueOf("default", "deleted_table"); + for (long i = 0; i < maxRegions; ++i) { + // Compacted region + byte[] compactedRegion = HRegionInfo.createRegionName(compactedTable, null, i, true); + // The first three regions are recorded at one time, second set at another and the third set at a different time + long recordTime = now - 6000 + (i / 3) * 100; + long pruneUpperBound = (now - (i / 3) * 100000) * TxConstants.MAX_TX_PER_MS; + dataJanitorState.savePruneUpperBoundForRegion(compactedRegion, pruneUpperBound); + RegionPruneInfo pruneInfo = dataJanitorState.getPruneInfoForRegion(compactedRegion); + compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(pruneInfo)); + + // Empty region + byte[] emptyRegion = HRegionInfo.createRegionName(emptyTable, null, i, true); + dataJanitorState.saveEmptyRegionForTime(recordTime + 1, emptyRegion); + emptyRegions.put(recordTime, Bytes.toString(emptyRegion)); + + // Not compacted region + byte[] notCompactedRegion = HRegionInfo.createRegionName(notCompactedTable, null, i, true); + notCompactedRegions.put(recordTime, Bytes.toString(notCompactedRegion)); + + // Deleted region + byte[] deletedRegion = HRegionInfo.createRegionName(deletedTable, null, i, 
true); + dataJanitorState.savePruneUpperBoundForRegion(deletedRegion, pruneUpperBound - 1000); + RegionPruneInfo deletedPruneInfo = dataJanitorState.getPruneInfoForRegion(deletedRegion); + deletedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(deletedPruneInfo)); + } + + // Also record some common regions across all runs + byte[] commonCompactedRegion = + HRegionInfo.createRegionName(TableName.valueOf("default:common_compacted"), null, 100, true); + byte[] commonNotCompactedRegion = + HRegionInfo.createRegionName(TableName.valueOf("default:common_not_compacted"), null, 100, true); + byte[] commonEmptyRegion = + HRegionInfo.createRegionName(TableName.valueOf("default:common_empty"), null, 100, true); + // Create one region that is the latest deleted region, this region represents a region that gets recorded + // every prune run, but gets deleted just before the latest run. + byte[] newestDeletedRegion = + HRegionInfo.createRegionName(TableName.valueOf("default:newest_deleted"), null, 100, true); + + int runs = maxRegions / 3; + for (int i = 0; i < runs; ++i) { + long recordTime = now - 6000 + i * 100; + long pruneUpperBound = (now - i * 100000) * TxConstants.MAX_TX_PER_MS; + + dataJanitorState.savePruneUpperBoundForRegion(commonCompactedRegion, pruneUpperBound - 1000); + RegionPruneInfo c = dataJanitorState.getPruneInfoForRegion(commonCompactedRegion); + compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(c)); + + dataJanitorState.saveEmptyRegionForTime(recordTime + 1, commonEmptyRegion); + emptyRegions.put(recordTime, Bytes.toString(commonEmptyRegion)); + + notCompactedRegions.put(recordTime, Bytes.toString(commonNotCompactedRegion)); + + // Record the latest deleted region in all the runs except the last one + if (i < runs - 1) { + dataJanitorState.savePruneUpperBoundForRegion(newestDeletedRegion, pruneUpperBound - 1000); + RegionPruneInfo d = dataJanitorState.getPruneInfoForRegion(newestDeletedRegion); + compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(d)); + } + } + + // Record the regions present at various times + for (long time : compactedRegions.asMap().keySet()) { + Set allRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + Iterables.addAll(allRegions, Iterables.transform(compactedRegions.get(time), PRUNE_INFO_TO_BYTES)); + Iterables.addAll(allRegions, Iterables.transform(emptyRegions.get(time), STRING_TO_BYTES)); + Iterables.addAll(allRegions, Iterables.transform(notCompactedRegions.get(time), STRING_TO_BYTES)); + dataJanitorState.saveRegionsForTime(time, allRegions); + } + } + + @AfterClass + public static void cleanup() throws Exception { + pruningDebug.destroy(); + + hBaseAdmin.disableTable(pruneStateTable); + hBaseAdmin.deleteTable(pruneStateTable); + } + + @Before + public void before() throws Exception { + pruningDebug = new InvalidListPruningDebugTool(); + pruningDebug.initialize(conf); + } + + @After + public void after() throws Exception { + pruningDebug.destroy(); + } + + @Test + public void testUsage() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + Assert.assertFalse(pruningDebug.execute(new String[0], out)); + out.flush(); + readOutputStream(outputStream); + } + } + + @Test + public void testTimeRegions() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + // Get the latest 
regions for latest recorded time + Long latestRecordTime = compactedRegions.asMap().lastKey(); + Assert.assertTrue(pruningDebug.execute(new String[] {"time-region"}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream)); + + // Get the latest regions for latest recorded time by giving the timestamp + long now = System.currentTimeMillis(); + outputStream.reset(); + Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", Long.toString(now)}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream)); + + // Using relative time + outputStream.reset(); + Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", "now-1s"}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream)); + + // Get the regions for the oldest recorded time + Long oldestRecordTime = compactedRegions.asMap().firstKey(); + outputStream.reset(); + Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", Long.toString(oldestRecordTime)}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(expectedRegionsForTime(oldestRecordTime)), readOutputStream(outputStream)); + } + } + + @Test + public void testGetPruneInfo() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + Long recordTime = compactedRegions.asMap().lastKey(); + RegionPruneInfo pruneInfo = compactedRegions.get(recordTime).first(); + Assert.assertTrue(pruningDebug.execute(new String[]{"prune-info", pruneInfo.getRegionNameAsString()}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(new InvalidListPruningDebugTool.RegionPruneInfoPretty(pruneInfo)), + readOutputStream(outputStream)); + + // non-exising region + String nonExistingRegion = "non-existing-region"; + outputStream.reset(); + Assert.assertTrue(pruningDebug.execute(new String[]{"prune-info", nonExistingRegion}, out)); + out.flush(); + Assert.assertEquals(String.format("No prune info found for the region %s.", nonExistingRegion), + readOutputStream(outputStream)); + } + } + + @Test + public void testIdleRegions() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + // Get the list of regions that have the lowest prune upper bounds for the latest record time + Long latestRecordTime = compactedRegions.asMap().lastKey(); + SortedSet latestExpected = + ImmutableSortedSet.copyOf(pruneUpperBoundAndStringComparator(), compactedRegions.get(latestRecordTime)); + pruningDebug.execute(new String[]{"idle-regions", "-1"}, out); + out.flush(); + assertEquals(latestExpected, readOutputStream(outputStream)); + + // Same command with explicit time + outputStream.reset(); + pruningDebug.execute(new String[]{"idle-regions", "-1", String.valueOf(latestRecordTime)}, out); + out.flush(); + assertEquals(latestExpected, readOutputStream(outputStream)); + + // Same command with relative time + outputStream.reset(); + pruningDebug.execute(new String[]{"idle-regions", "-1", "now-2s"}, out); + out.flush(); + assertEquals(latestExpected, readOutputStream(outputStream)); + + // Same command with reduced number of regions + outputStream.reset(); + int limit = 2; + pruningDebug.execute(new String[]{"idle-regions", String.valueOf(limit), String.valueOf(latestRecordTime)}, out); + out.flush(); + 
Assert.assertEquals(GSON.toJson(subset(latestExpected, 0, limit)), readOutputStream(outputStream)); + + // For a different time, this time only live regions that are compacted are returned + outputStream.reset(); + Long secondLastRecordTime = Iterables.get(compactedRegions.keySet(), 1); + Set compactedRegionsTime = + Sets.newTreeSet(Iterables.transform(compactedRegions.get(secondLastRecordTime), PRUNE_INFO_TO_STRING)); + Set compactedRegionsLatest = + Sets.newTreeSet(Iterables.transform(compactedRegions.get(latestRecordTime), PRUNE_INFO_TO_STRING)); + Set liveExpected = new TreeSet<>(Sets.intersection(compactedRegionsTime, compactedRegionsLatest)); + pruningDebug.execute(new String[]{"idle-regions", "-1", String.valueOf(latestRecordTime - 1)}, out); + out.flush(); + List actual = GSON.fromJson(readOutputStream(outputStream), PRUNE_INFO_LIST_TYPE); + Assert.assertEquals(liveExpected, Sets.newTreeSet(Iterables.transform(actual, PRUNE_INFO_TO_STRING))); + } + } + + @Test + public void testToCompactRegions() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + // Get the regions that are not compacted for the latest time + Long latestRecordTime = compactedRegions.asMap().lastKey(); + SortedSet expected = notCompactedRegions.get(latestRecordTime); + pruningDebug.execute(new String[]{"to-compact-regions", "-1"}, out); + out.flush(); + Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), SortedSet.class)); + + // Same command with explicit time + outputStream.reset(); + pruningDebug.execute(new String[]{"to-compact-regions", "-1", String.valueOf(latestRecordTime)}, out); + out.flush(); + Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), SortedSet.class)); + + // Same command with relative time + outputStream.reset(); + pruningDebug.execute(new String[]{"to-compact-regions", "-1", "now+1h-3m"}, out); + out.flush(); + Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), SortedSet.class)); + + // Same command with reduced number of regions + int limit = 2; + outputStream.reset(); + pruningDebug.execute(new String[]{"to-compact-regions", String.valueOf(limit), String.valueOf(latestRecordTime)}, + out); + out.flush(); + // Assert that the actual set is a subset of expected, with size 2 (since the output is not sorted) + SortedSet actual = GSON.fromJson(readOutputStream(outputStream), + new TypeToken>() { }.getType()); + Assert.assertEquals(limit, actual.size()); + Assert.assertTrue(Sets.difference(actual, expected).isEmpty()); + + // For a different time, only live regions that are not compacted are returned + outputStream.reset(); + Long secondLastRecordTime = Iterables.get(compactedRegions.keySet(), 1); + Set compactedRegionsTime = notCompactedRegions.get(secondLastRecordTime); + Set compactedRegionsLatest = notCompactedRegions.get(latestRecordTime); + Set liveExpected = new TreeSet<>(Sets.intersection(compactedRegionsTime, compactedRegionsLatest)); + pruningDebug.execute(new String[]{"to-compact-regions", "-1", String.valueOf(secondLastRecordTime)}, out); + out.flush(); + Assert.assertEquals(GSON.toJson(liveExpected), readOutputStream(outputStream)); + } + } + + private static RegionsAtTime expectedRegionsForTime(long time) { + SortedSet regions = new TreeSet<>(); + regions.addAll(Sets.newTreeSet(Iterables.transform(compactedRegions.get(time), PRUNE_INFO_TO_STRING))); + regions.addAll(emptyRegions.get(time)); + 
regions.addAll(notCompactedRegions.get(time)); + return new RegionsAtTime(time, regions, new SimpleDateFormat(InvalidListPruningDebugTool.DATE_FORMAT)); + } + + private static Comparator stringComparator() { + return new Comparator() { + @Override + public int compare(InvalidListPruningDebugTool.RegionPruneInfoPretty o1, + InvalidListPruningDebugTool.RegionPruneInfoPretty o2) { + return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString()); + } + }; + } + + private static Comparator pruneUpperBoundAndStringComparator() { + return new Comparator() { + @Override + public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { + int result = Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound()); + if (result == 0) { + return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString()); + } + return result; + } + }; + } + + private String readOutputStream(ByteArrayOutputStream out) throws UnsupportedEncodingException { + String s = out.toString(Charsets.UTF_8.toString()); + if (DEBUG_PRINT) { + System.out.println(s); + } + // remove the last newline + return s.length() <= 1 ? "" : s.substring(0, s.length() - 1); + } + + private void assertEquals(Collection expectedSorted, String actualString) { + List actual = GSON.fromJson(actualString, PRUNE_INFO_LIST_TYPE); + List actualSorted = new ArrayList<>(actual); + Collections.sort(actualSorted, pruneUpperBoundAndStringComparator()); + + Assert.assertEquals(GSON.toJson(expectedSorted), GSON.toJson(actualSorted)); + } + + @SuppressWarnings("SameParameterValue") + private SortedSet subset(SortedSet set, int from, int to) { + SortedSet subset = new TreeSet<>(set.comparator()); + int i = from; + for (T e : set) { + if (i++ >= to) { + break; + } + subset.add(e); + } + return subset; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java deleted file mode 100644 index 443c998..0000000 --- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.tephra.hbase.txprune; - -import com.google.common.collect.Iterables; -import com.google.common.collect.MinMaxPriorityQueue; -import com.google.common.collect.Sets; -import com.google.gson.Gson; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.tephra.TxConstants; -import org.apache.tephra.txprune.RegionPruneInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import javax.annotation.Nullable; - -/** - * Invalid List Pruning Debug Tool. - */ -public class InvalidListPruningDebug { - private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class); - private static final Gson GSON = new Gson(); - private DataJanitorState dataJanitorState; - private Connection connection; - private TableName tableName; - - /** - * Initialize the Invalid List Debug Tool. - * @param conf {@link Configuration} - * @throws IOException - */ - public void initialize(final Configuration conf) throws IOException { - LOG.debug("InvalidListPruningDebugMain : initialize method called"); - connection = ConnectionFactory.createConnection(conf); - tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); - dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { - @Override - public Table get() throws IOException { - return connection.getTable(tableName); - } - }); - } - - public void destroy() throws IOException { - if (connection != null) { - connection.close(); - } - } - - /** - * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions - * will stop the progress of pruning. 
- * - * @param numRegions number of regions - * @return {@link Set} of regions that needs to be compacted and flushed - */ - public Set getRegionsToBeCompacted(Integer numRegions) throws IOException { - // Fetch the live regions - Map> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); - if (latestTimeRegion.isEmpty()) { - return new HashSet<>(); - } - - Long timestamp = latestTimeRegion.keySet().iterator().next(); - SortedSet liveRegions = latestTimeRegion.get(timestamp); - - SortedSet emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null); - SortedSet emptyRegionNames = new TreeSet<>(); - Iterable regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN); - for (String regionString : regionStrings) { - emptyRegionNames.add(regionString); - } - - Set nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames)); - - // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are - // not empty and have not been registered prune upper bound - Queue prunedRegions = getIdleRegions(-1); - for (RegionPruneInfo prunedRegion : prunedRegions) { - if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) { - nonEmptyRegions.remove(prunedRegion.getRegionNameAsString()); - } - } - - if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) { - return nonEmptyRegions; - } - - Set subsetRegions = new HashSet<>(numRegions); - for (String regionName : nonEmptyRegions) { - if (subsetRegions.size() == numRegions) { - break; - } - subsetRegions.add(regionName); - } - return subsetRegions; - } - - /** - * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. - * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions - * that are known to be live will be returned. 
- * - * @param numRegions number of regions - * @return Map of region name and its prune upper bound - */ - public Queue getIdleRegions(Integer numRegions) throws IOException { - List regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null); - if (regionPruneInfos.isEmpty()) { - return new LinkedList<>(); - } - - // Create a set with region names - Set pruneRegionNameSet = new HashSet<>(); - for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { - pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString()); - } - - // Fetch the live regions - Map> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); - if (!latestTimeRegion.isEmpty()) { - SortedSet liveRegions = latestTimeRegion.values().iterator().next(); - Set liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet); - List liveRegionWithPruneInfoList = new ArrayList<>(); - for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { - if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) { - liveRegionWithPruneInfoList.add(regionPruneInfo); - } - } - - // Use the subset of live regions and prune regions - regionPruneInfos = liveRegionWithPruneInfoList; - } - - if (numRegions < 0) { - numRegions = regionPruneInfos.size(); - } - - Queue lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator() { - @Override - public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { - return (int) (o1.getPruneUpperBound() - o2.getPruneUpperBound()); - } - }).maximumSize(numRegions).create(); - - for (RegionPruneInfo pruneInfo : regionPruneInfos) { - lowestPrunes.add(pruneInfo); - } - return lowestPrunes; - } - - /** - * Return the prune upper bound value of a given region. If no prune upper bound has been written for this region yet, - * it will return a null. 
- * - * @param regionId region id - * @return {@link RegionPruneInfo} of the region - * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo} - */ - @Nullable - public RegionPruneInfo getRegionPruneInfo(String regionId) throws IOException { - return dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId)); - } - - /** - * - * @param time Given a time, provide the {@link TimeRegions} at or before that time - * @return transactional regions that are present at or before the given time - * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions} - */ - public Map> getRegionsOnOrBeforeTime(Long time) throws IOException { - Map> regionMap = new HashMap<>(); - TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time); - if (timeRegions == null) { - return regionMap; - } - SortedSet regionNames = new TreeSet<>(); - Iterable regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN); - for (String regionString : regionStrings) { - regionNames.add(regionString); - } - regionMap.put(timeRegions.getTime(), regionNames); - return regionMap; - } - - private void printUsage(PrintWriter pw) { - pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning "); - pw.println("Available commands, corresponding parameters are:"); - pw.println("****************************************************"); - pw.println("time-region ts"); - pw.println("Desc: Prints out the transactional regions present in HBase at time 'ts' (in milliseconds) " + - "or the latest time before time 'ts'."); - pw.println("idle-regions limit"); - pw.println("Desc: Prints out 'limit' number of regions which has the lowest prune upper bounds. If '-1' is " + - "provided as the limit, prune upper bounds of all regions are returned."); - pw.println("prune-info region-name-as-string"); - pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'"); - pw.println("to-compact-regions limit"); - pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " + - "and have not registered a prune upper bound."); - } - - private boolean execute(String[] args) throws IOException { - try (PrintWriter pw = new PrintWriter(System.out)) { - if (args.length != 2) { - printUsage(pw); - return false; - } - - String command = args[0]; - String parameter = args[1]; - if ("time-region".equals(command)) { - Long time = Long.parseLong(parameter); - Map> timeRegion = getRegionsOnOrBeforeTime(time); - pw.println(GSON.toJson(timeRegion)); - return true; - } else if ("idle-regions".equals(command)) { - Integer numRegions = Integer.parseInt(parameter); - Queue regionPruneInfos = getIdleRegions(numRegions); - pw.println(GSON.toJson(regionPruneInfos)); - return true; - } else if ("prune-info".equals(command)) { - RegionPruneInfo regionPruneInfo = getRegionPruneInfo(parameter); - if (regionPruneInfo != null) { - pw.println(GSON.toJson(regionPruneInfo)); - } else { - pw.println(String.format("No prune info found for the region %s.", parameter)); - } - return true; - } else if ("to-compact-regions".equals(command)) { - Integer numRegions = Integer.parseInt(parameter); - Set toBeCompactedRegions = getRegionsToBeCompacted(numRegions); - pw.println(GSON.toJson(toBeCompactedRegions)); - return true; - } else { - pw.println(String.format("%s is not a valid command.", command)); - printUsage(pw); - return false; - } - } - } - - public static void main(String[] args) { - 
Configuration hConf = HBaseConfiguration.create(); - InvalidListPruningDebug pruningDebug = new InvalidListPruningDebug(); - try { - pruningDebug.initialize(hConf); - boolean success = pruningDebug.execute(args); - pruningDebug.destroy(); - if (!success) { - System.exit(1); - } - } catch (IOException ex) { - LOG.error("Received an exception while trying to execute the debug tool. ", ex); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java new file mode 100644 index 0000000..5d7b871 --- /dev/null +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Sets; +import com.google.gson.Gson; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.txprune.RegionPruneInfo; +import org.apache.tephra.txprune.hbase.InvalidListPruningDebug; +import org.apache.tephra.txprune.hbase.RegionsAtTime; +import org.apache.tephra.util.TimeMathParser; +import org.apache.tephra.util.TxUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintWriter; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +/** + * Invalid List Pruning Debug Tool. 
+ */ +public class InvalidListPruningDebugTool implements InvalidListPruningDebug { + private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebugTool.class); + private static final Gson GSON = new Gson(); + private static final String NOW = "now"; + @VisibleForTesting + static final String DATE_FORMAT = "d-MMM-yyyy HH:mm:ss z"; + + private DataJanitorState dataJanitorState; + private Connection connection; + private TableName tableName; + + /** + * Initialize the Invalid List Debug Tool. + * @param conf {@link Configuration} + * @throws IOException when not able to create an HBase connection + */ + @Override + @SuppressWarnings("WeakerAccess") + public void initialize(final Configuration conf) throws IOException { + LOG.debug("InvalidListPruningDebugMain : initialize method called"); + connection = ConnectionFactory.createConnection(conf); + tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return connection.getTable(tableName); + } + }); + } + + @Override + @SuppressWarnings("WeakerAccess") + public void destroy() throws IOException { + if (connection != null) { + connection.close(); + } + } + + /** + * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions + * will stop the progress of pruning. + *
<p/>
+ * Note that this can return false positives in the following case - + * At time 't' empty regions were recorded, and time 't+1' prune iteration was invoked. + * Since a new set of regions was recorded at time 't+1', all regions recorded as empty before time 't + 1' will + * now be reported as blocking the pruning, even though they are empty. This is because we cannot tell if those + * regions got any new data between time 't' and 't + 1'. + * + * @param numRegions number of regions + * @param time time in milliseconds or relative time, regions recorded before the given time are returned + * @return {@link Set} of regions that needs to be compacted and flushed + */ + @Override + @SuppressWarnings("WeakerAccess") + public Set getRegionsToBeCompacted(Integer numRegions, String time) throws IOException { + // Fetch the live regions at the given time + RegionsAtTime timeRegion = getRegionsOnOrBeforeTime(time); + if (timeRegion.getRegions().isEmpty()) { + return Collections.emptySet(); + } + + Long timestamp = timeRegion.getTime(); + SortedSet regions = timeRegion.getRegions(); + + // Get the live regions + SortedSet liveRegions = getRegionsOnOrBeforeTime(NOW).getRegions(); + // Retain only the live regions + regions = Sets.newTreeSet(Sets.intersection(liveRegions, regions)); + + SortedSet emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null); + SortedSet emptyRegionNames = new TreeSet<>(); + Iterable regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + emptyRegionNames.add(regionString); + } + + Set nonEmptyRegions = Sets.newHashSet(Sets.difference(regions, emptyRegionNames)); + + // Get all pruned regions for the current time and remove them from the nonEmptyRegions, + // resulting in a set of regions that are not empty and have not been registered prune upper bound + List prunedRegions = dataJanitorState.getPruneInfoForRegions(null); + for (RegionPruneInfo prunedRegion : prunedRegions) { + if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) { + nonEmptyRegions.remove(prunedRegion.getRegionNameAsString()); + } + } + + if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) { + return nonEmptyRegions; + } + + Set subsetRegions = new HashSet<>(numRegions); + for (String regionName : nonEmptyRegions) { + if (subsetRegions.size() == numRegions) { + break; + } + subsetRegions.add(regionName); + } + return subsetRegions; + } + + /** + * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. + * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions + * that are known to be live will be returned. 
+ * + * @param numRegions number of regions + * @param time time in milliseconds or relative time, regions recorded before the given time are returned + * @return Map of region name and its prune upper bound + */ + @Override + @SuppressWarnings("WeakerAccess") + public SortedSet getIdleRegions(Integer numRegions, String time) throws IOException { + List regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null); + if (regionPruneInfos.isEmpty()) { + return new TreeSet<>(); + } + + // Create a set with region names + Set pruneRegionNameSet = new HashSet<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString()); + } + + // Fetch the latest live regions + RegionsAtTime latestRegions = getRegionsOnOrBeforeTime(NOW); + + // Fetch the regions at the given time + RegionsAtTime timeRegions = getRegionsOnOrBeforeTime(time); + Set liveRegions = Sets.intersection(latestRegions.getRegions(), timeRegions.getRegions()); + Set liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet); + List liveRegionWithPruneInfoList = new ArrayList<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) { + liveRegionWithPruneInfoList.add(regionPruneInfo); + } + + // Use the subset of live regions and prune regions + regionPruneInfos = liveRegionWithPruneInfoList; + } + + if (numRegions < 0) { + numRegions = regionPruneInfos.size(); + } + + Comparator comparator = new Comparator() { + @Override + public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { + int result = Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound()); + if (result == 0) { + return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString()); + } + return result; + } + }; + MinMaxPriorityQueue lowestPrunes = + MinMaxPriorityQueue.orderedBy(comparator).maximumSize(numRegions).create(); + + for (RegionPruneInfo pruneInfo : regionPruneInfos) { + lowestPrunes.add(new RegionPruneInfoPretty(pruneInfo)); + } + + SortedSet regions = new TreeSet<>(comparator); + regions.addAll(lowestPrunes); + return regions; + } + + /** + * Return the prune upper bound value of a given region. If no prune upper bound has been written for this region yet, + * it will return a null. + * + * @param regionId region id + * @return {@link RegionPruneInfo} of the region + * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo} + */ + @Override + @SuppressWarnings("WeakerAccess") + @Nullable + public RegionPruneInfoPretty getRegionPruneInfo(String regionId) throws IOException { + RegionPruneInfo pruneInfo = dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId)); + return pruneInfo == null ? null : new RegionPruneInfoPretty(pruneInfo); + } + + /** + * + * @param timeString Given a time, provide the {@link TimeRegions} at or before that time. + * Time can be in milliseconds or relative time. 
+ * @return transactional regions that are present at or before the given time + * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions} + */ + @Override + @SuppressWarnings("WeakerAccess") + public RegionsAtTime getRegionsOnOrBeforeTime(String timeString) throws IOException { + long time = TimeMathParser.parseTime(timeString, TimeUnit.MILLISECONDS); + SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time); + if (timeRegions == null) { + return new RegionsAtTime(time, new TreeSet(), dateFormat); + } + SortedSet regionNames = new TreeSet<>(); + Iterable regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + regionNames.add(regionString); + } + return new RegionsAtTime(timeRegions.getTime(), regionNames, dateFormat); + } + + private void printUsage(PrintWriter pw) { + pw.println(); + pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning "); + pw.println(); + pw.println("Available commands"); + pw.println("------------------"); + pw.println("to-compact-regions limit [time]"); + pw.println("Desc: Prints out the regions that are active, but not empty, " + + "and have not registered a prune upper bound."); + pw.println(); + pw.println("idle-regions limit [time]"); + pw.println("Desc: Prints out the regions that have the lowest prune upper bounds."); + pw.println(); + pw.println("prune-info region-name-as-string"); + pw.println("Desc: Prints the prune upper bound and the time it was recorded for the given region."); + pw.println(); + pw.println("time-region [time]"); + pw.println("Desc: Prints out the transactional regions present in HBase recorded at or before the given time."); + pw.println(); + pw.println("Parameters"); + pw.println("----------"); + pw.println(" * limit - used to limit the number of regions returned, -1 to apply no limit"); + pw.println(" * time - if time is not provided, the current time is used. "); + pw.println(" When provided, the data recorded on or before the given time is returned."); + pw.println(" Time can be provided in milliseconds, or can be provided as a relative time."); + pw.println(" Examples for relative time -"); + pw.println(" now = current time,"); + pw.println(" now-1d = current time - 1 day,"); + pw.println(" now-1d+4h = 20 hours before now,"); + pw.println(" now+5s = current time + 5 seconds"); + pw.println(); + } + + @VisibleForTesting + boolean execute(String[] args, PrintWriter out) throws IOException { + if (args.length < 1) { + printUsage(out); + return false; + } + + String command = args[0]; + switch (command) { + case "time-region": + if (args.length <= 2) { + String time = args.length == 2 ? args[1] : NOW; + RegionsAtTime timeRegion = getRegionsOnOrBeforeTime(time); + out.println(GSON.toJson(timeRegion)); + return true; + } + break; + case "idle-regions": + if (args.length <= 3) { + Integer numRegions = Integer.parseInt(args[1]); + String time = args.length == 3 ? 
args[2] : NOW; + SortedSet regionPruneInfos = getIdleRegions(numRegions, time); + out.println(GSON.toJson(regionPruneInfos)); + return true; + } + break; + case "prune-info": + if (args.length == 2) { + String regionName = args[1]; + RegionPruneInfo regionPruneInfo = getRegionPruneInfo(regionName); + if (regionPruneInfo != null) { + out.println(GSON.toJson(regionPruneInfo)); + } else { + out.println(String.format("No prune info found for the region %s.", regionName)); + } + return true; + } + break; + case "to-compact-regions": + if (args.length <= 3) { + Integer numRegions = Integer.parseInt(args[1]); + String time = args.length == 3 ? args[2] : NOW; + Set toBeCompactedRegions = getRegionsToBeCompacted(numRegions, time); + out.println(GSON.toJson(toBeCompactedRegions)); + return true; + } + break; + } + + printUsage(out); + return false; + } + + public static void main(String[] args) { + Configuration hConf = HBaseConfiguration.create(); + InvalidListPruningDebugTool pruningDebug = new InvalidListPruningDebugTool(); + try (PrintWriter out = new PrintWriter(System.out)) { + pruningDebug.initialize(hConf); + boolean success = pruningDebug.execute(args, out); + pruningDebug.destroy(); + if (!success) { + System.exit(1); + } + } catch (IOException ex) { + LOG.error("Received an exception while trying to execute the debug tool. ", ex); + } + } + + /** + * Wrapper class around {@link RegionPruneInfo} to print human readable dates for timestamps. + */ + @SuppressWarnings({"WeakerAccess", "unused"}) + public static class RegionPruneInfoPretty extends RegionPruneInfo { + private final transient SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + private final String pruneUpperBoundAsString; + private final String pruneRecordTimeAsString; + + public RegionPruneInfoPretty(RegionPruneInfo regionPruneInfo) { + this(regionPruneInfo.getRegionName(), regionPruneInfo.getRegionNameAsString(), + regionPruneInfo.getPruneUpperBound(), regionPruneInfo.getPruneRecordTime()); + } + + public RegionPruneInfoPretty(byte[] regionName, String regionNameAsString, + long pruneUpperBound, long pruneRecordTime) { + super(regionName, regionNameAsString, pruneUpperBound, pruneRecordTime); + pruneUpperBoundAsString = dateFormat.format(TxUtils.getTimestamp(pruneUpperBound)); + pruneRecordTimeAsString = dateFormat.format(pruneRecordTime); + } + + public String getPruneUpperBoundAsString() { + return pruneUpperBoundAsString; + } + + public String getPruneRecordTimeAsString() { + return pruneRecordTimeAsString; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + RegionPruneInfoPretty that = (RegionPruneInfoPretty) o; + return Objects.equals(pruneUpperBoundAsString, that.pruneUpperBoundAsString) && + Objects.equals(pruneRecordTimeAsString, that.pruneRecordTimeAsString); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), pruneUpperBoundAsString, pruneRecordTimeAsString); + } + + @Override + public String toString() { + return "RegionPruneInfoPretty{" + + ", pruneUpperBoundAsString='" + pruneUpperBoundAsString + '\'' + + ", pruneRecordTimeAsString='" + pruneRecordTimeAsString + '\'' + + "} " + super.toString(); + } + } + +}
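For completeness, a condensed sketch of the output-capture pattern that the new InvalidListPruningDebugTest above relies on, in case it is useful elsewhere. Note that execute(String[], PrintWriter) is marked @VisibleForTesting and is package-private, so a caller like this would have to live in org.apache.tephra.hbase.txprune, exactly as the test does; the helper class and method names here are hypothetical.

package org.apache.tephra.hbase.txprune;

import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;

final class PruningDebugOutputCapture {

  // Runs one debug command and returns whatever the tool printed (normally a JSON document).
  static String runCommand(InvalidListPruningDebugTool tool, String... args) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (PrintWriter out = new PrintWriter(bytes)) {
      tool.execute(args, out);
      out.flush();
    }
    return bytes.toString(StandardCharsets.UTF_8.name()).trim();
  }
}

Typical use mirrors the tests: initialize the tool with the cluster Configuration, call for example runCommand(tool, "prune-info", regionNameAsString), and parse the returned JSON with Gson.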