http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
new file mode 100644
index 0000000..1476906
--- /dev/null
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.AbstractHBaseTableTest;
+import org.apache.tephra.txprune.RegionPruneInfo;
+import org.apache.tephra.txprune.hbase.RegionsAtTime;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Type;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Test {@link InvalidListPruningDebugTool}.
+ *
+ * <p>{@link #addData()} writes prune upper bounds, empty regions and regions-at-time records for three simulated
+ * prune runs into the prune state table. The expected values are kept in the static multimaps below, keyed by the
+ * record time, and are compared against the JSON printed by the tool.
+ */
+public class InvalidListPruningDebugTest extends AbstractHBaseTableTest {
+  private static final Gson GSON = new Gson();
+  // When true, every captured tool output is also echoed to stdout to help diagnose failures
+  private static final boolean DEBUG_PRINT = true;
+  private static final Function<RegionPruneInfo, byte[]> PRUNE_INFO_TO_BYTES =
+    new Function<RegionPruneInfo, byte[]>() {
+      @Override
+      public byte[] apply(RegionPruneInfo input) {
+        return input.getRegionName();
+      }
+    };
+  private static final Function<RegionPruneInfo, String> PRUNE_INFO_TO_STRING =
+    new Function<RegionPruneInfo, String>() {
+      @Override
+      public String apply(RegionPruneInfo input) {
+        return input.getRegionNameAsString();
+      }
+    };
+  private static final Function<String, byte[]> STRING_TO_BYTES =
+    new Function<String, byte[]>() {
+      @Override
+      public byte[] apply(String input) {
+        return Bytes.toBytes(input);
+      }
+    };
+  private static final Type PRUNE_INFO_LIST_TYPE =
+    new TypeToken<List<InvalidListPruningDebugTool.RegionPruneInfoPretty>>() { }.getType();
+  // Typed target for JSON arrays of region names, so Gson yields a SortedSet<String> instead of a raw set
+  private static final Type SORTED_STRING_SET_TYPE = new TypeToken<SortedSet<String>>() { }.getType();
+
+  private static TableName pruneStateTable;
+  private static InvalidListPruningDebugTool pruningDebug;
+
+  // Expected state of the prune state table, keyed by the time of the simulated prune run
+  private static TreeMultimap<Long, InvalidListPruningDebugTool.RegionPruneInfoPretty> compactedRegions =
+    TreeMultimap.create(Ordering.<Long>natural(), stringComparator());
+  private static TreeMultimap<Long, String> emptyRegions = TreeMultimap.create();
+  private static TreeMultimap<Long, String> notCompactedRegions = TreeMultimap.create();
+  // Regions with prune info that are never recorded as live; kept for reference, not asserted on directly
+  private static TreeMultimap<Long, InvalidListPruningDebugTool.RegionPruneInfoPretty> deletedRegions =
+    TreeMultimap.create(Ordering.<Long>natural(), stringComparator());
+
+
+  /**
+   * Creates the prune state table and records three prune runs' worth of compacted, empty, not compacted and
+   * deleted regions, plus the set of regions present at each run time.
+   */
+  @BeforeClass
+  public static void addData() throws Exception {
+    pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+        TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+        // Prune state table is a non-transactional table, hence no transaction co-processor
+        Collections.<String>emptyList());
+    table.close();
+
+    DataJanitorState dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public Table get() throws IOException {
+          return testUtil.getConnection().getTable(pruneStateTable);
+        }
+      });
+
+    // Record prune upper bounds for 9 regions
+    long now = System.currentTimeMillis();
+    int maxRegions = 9;
+    TableName compactedTable = TableName.valueOf("default", "compacted_table");
+    TableName emptyTable = TableName.valueOf("default", "empty_table");
+    TableName notCompactedTable = TableName.valueOf("default", "not_compacted_table");
+    TableName deletedTable = TableName.valueOf("default", "deleted_table");
+    for (long i = 0; i < maxRegions; ++i) {
+      // Compacted region
+      byte[] compactedRegion = HRegionInfo.createRegionName(compactedTable, null, i, true);
+      // The first three regions are recorded at one time, second set at another and the third set at a different time
+      long recordTime = now - 6000 + (i / 3) * 100;
+      long pruneUpperBound = (now - (i / 3) * 100000) * TxConstants.MAX_TX_PER_MS;
+      dataJanitorState.savePruneUpperBoundForRegion(compactedRegion, pruneUpperBound);
+      RegionPruneInfo pruneInfo = dataJanitorState.getPruneInfoForRegion(compactedRegion);
+      compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(pruneInfo));
+
+      // Empty region
+      byte[] emptyRegion = HRegionInfo.createRegionName(emptyTable, null, i, true);
+      dataJanitorState.saveEmptyRegionForTime(recordTime + 1, emptyRegion);
+      emptyRegions.put(recordTime, Bytes.toString(emptyRegion));
+
+      // Not compacted region
+      byte[] notCompactedRegion = HRegionInfo.createRegionName(notCompactedTable, null, i, true);
+      notCompactedRegions.put(recordTime, Bytes.toString(notCompactedRegion));
+
+      // Deleted region
+      byte[] deletedRegion = HRegionInfo.createRegionName(deletedTable, null, i, true);
+      dataJanitorState.savePruneUpperBoundForRegion(deletedRegion, pruneUpperBound - 1000);
+      RegionPruneInfo deletedPruneInfo = dataJanitorState.getPruneInfoForRegion(deletedRegion);
+      deletedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(deletedPruneInfo));
+    }
+
+    // Also record some common regions across all runs
+    byte[] commonCompactedRegion =
+      HRegionInfo.createRegionName(TableName.valueOf("default:common_compacted"), null, 100, true);
+    byte[] commonNotCompactedRegion =
+      HRegionInfo.createRegionName(TableName.valueOf("default:common_not_compacted"), null, 100, true);
+    byte[] commonEmptyRegion =
+      HRegionInfo.createRegionName(TableName.valueOf("default:common_empty"), null, 100, true);
+    // Create one region that is the latest deleted region, this region represents a region that gets recorded
+    // every prune run, but gets deleted just before the latest run.
+    byte[] newestDeletedRegion =
+      HRegionInfo.createRegionName(TableName.valueOf("default:newest_deleted"), null, 100, true);
+
+    int runs = maxRegions / 3;
+    for (int i = 0; i < runs; ++i) {
+      long recordTime = now - 6000 + i * 100;
+      long pruneUpperBound = (now - i * 100000) * TxConstants.MAX_TX_PER_MS;
+
+      dataJanitorState.savePruneUpperBoundForRegion(commonCompactedRegion, pruneUpperBound - 1000);
+      RegionPruneInfo c = dataJanitorState.getPruneInfoForRegion(commonCompactedRegion);
+      compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(c));
+
+      dataJanitorState.saveEmptyRegionForTime(recordTime + 1, commonEmptyRegion);
+      emptyRegions.put(recordTime, Bytes.toString(commonEmptyRegion));
+
+      notCompactedRegions.put(recordTime, Bytes.toString(commonNotCompactedRegion));
+
+      // Record the latest deleted region in all the runs except the last one
+      if (i < runs - 1) {
+        dataJanitorState.savePruneUpperBoundForRegion(newestDeletedRegion, pruneUpperBound - 1000);
+        RegionPruneInfo d = dataJanitorState.getPruneInfoForRegion(newestDeletedRegion);
+        compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(d));
+      }
+    }
+
+    // Record the regions present at various times
+    for (long time : compactedRegions.asMap().keySet()) {
+      Set<byte[]> allRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+      Iterables.addAll(allRegions, Iterables.transform(compactedRegions.get(time), PRUNE_INFO_TO_BYTES));
+      Iterables.addAll(allRegions, Iterables.transform(emptyRegions.get(time), STRING_TO_BYTES));
+      Iterables.addAll(allRegions, Iterables.transform(notCompactedRegions.get(time), STRING_TO_BYTES));
+      dataJanitorState.saveRegionsForTime(time, allRegions);
+    }
+  }
+
+  /**
+   * Drops the prune state table created in {@link #addData()}.
+   */
+  @AfterClass
+  public static void cleanup() throws Exception {
+    // The debug tool is created and destroyed around every test by before()/after(); destroying it here again
+    // would close it twice, or throw a NullPointerException (skipping the table cleanup) if no test ran.
+    hBaseAdmin.disableTable(pruneStateTable);
+    hBaseAdmin.deleteTable(pruneStateTable);
+  }
+
+  @Before
+  public void before() throws Exception {
+    pruningDebug = new InvalidListPruningDebugTool();
+    pruningDebug.initialize(conf);
+  }
+
+  @After
+  public void after() throws Exception {
+    // Null-safe in case before() failed; clearing the field ensures a tool instance is never destroyed twice
+    if (pruningDebug != null) {
+      pruningDebug.destroy();
+      pruningDebug = null;
+    }
+  }
+
+  @Test
+  public void testUsage() throws Exception {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try (PrintWriter out = new PrintWriter(outputStream)) {
+      // Running without any command must be reported as a failure
+      Assert.assertFalse(pruningDebug.execute(new String[0], out));
+      out.flush();
+      readOutputStream(outputStream);
+    }
+  }
+
+  @Test
+  public void testTimeRegions() throws Exception {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try (PrintWriter out = new PrintWriter(outputStream)) {
+      // Get the latest regions for latest recorded time
+      Long latestRecordTime = compactedRegions.asMap().lastKey();
+      Assert.assertTrue(pruningDebug.execute(new String[] {"time-region"}, out));
+      out.flush();
+      Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream));
+
+      // Get the latest regions for latest recorded time by giving the timestamp
+      long now = System.currentTimeMillis();
+      outputStream.reset();
+      Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", Long.toString(now)}, out));
+      out.flush();
+      Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream));
+
+      // Using relative time
+      outputStream.reset();
+      Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", "now-1s"}, out));
+      out.flush();
+      Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream));
+
+      // Get the regions for the oldest recorded time
+      Long oldestRecordTime = compactedRegions.asMap().firstKey();
+      outputStream.reset();
+      Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", Long.toString(oldestRecordTime)}, out));
+      out.flush();
+      Assert.assertEquals(GSON.toJson(expectedRegionsForTime(oldestRecordTime)), readOutputStream(outputStream));
+    }
+  }
+
+  @Test
+  public void testGetPruneInfo() throws Exception {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try (PrintWriter out = new PrintWriter(outputStream)) {
+      Long recordTime = compactedRegions.asMap().lastKey();
+      RegionPruneInfo pruneInfo = compactedRegions.get(recordTime).first();
+      Assert.assertTrue(pruningDebug.execute(new String[]{"prune-info", pruneInfo.getRegionNameAsString()}, out));
+      out.flush();
+      Assert.assertEquals(GSON.toJson(new InvalidListPruningDebugTool.RegionPruneInfoPretty(pruneInfo)),
+                          readOutputStream(outputStream));
+
+      // non-existing region
+      String nonExistingRegion = "non-existing-region";
+      outputStream.reset();
+      Assert.assertTrue(pruningDebug.execute(new String[]{"prune-info", nonExistingRegion}, out));
+      out.flush();
+      Assert.assertEquals(String.format("No prune info found for the region %s.", nonExistingRegion),
+                          readOutputStream(outputStream));
+    }
+  }
+
+  @Test
+  public void testIdleRegions() throws Exception {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try (PrintWriter out = new PrintWriter(outputStream)) {
+      // Get the list of regions that have the lowest prune upper bounds for the latest record time
+      Long latestRecordTime = compactedRegions.asMap().lastKey();
+      SortedSet<InvalidListPruningDebugTool.RegionPruneInfoPretty> latestExpected =
+        ImmutableSortedSet.copyOf(pruneUpperBoundAndStringComparator(), compactedRegions.get(latestRecordTime));
+      Assert.assertTrue(pruningDebug.execute(new String[]{"idle-regions", "-1"}, out));
+      out.flush();
+      assertSortedPruneInfoEquals(latestExpected, readOutputStream(outputStream));
+
+      // Same command with explicit time
+      outputStream.reset();
+      Assert.assertTrue(
+        pruningDebug.execute(new String[]{"idle-regions", "-1", String.valueOf(latestRecordTime)}, out));
+      out.flush();
+      assertSortedPruneInfoEquals(latestExpected, readOutputStream(outputStream));
+
+      // Same command with relative time
+      outputStream.reset();
+      Assert.assertTrue(pruningDebug.execute(new String[]{"idle-regions", "-1", "now-2s"}, out));
+      out.flush();
+      assertSortedPruneInfoEquals(latestExpected, readOutputStream(outputStream));
+
+      // Same command with reduced number of regions
+      outputStream.reset();
+      int limit = 2;
+      Assert.assertTrue(pruningDebug.execute(
+        new String[]{"idle-regions", String.valueOf(limit), String.valueOf(latestRecordTime)}, out));
+      out.flush();
+      Assert.assertEquals(GSON.toJson(subset(latestExpected, 0, limit)), readOutputStream(outputStream));
+
+      // For a different time, this time only live regions that are compacted are returned
+      outputStream.reset();
+      Long secondLastRecordTime = Iterables.get(compactedRegions.keySet(), 1);
+      Set<String> compactedRegionsTime =
+        Sets.newTreeSet(Iterables.transform(compactedRegions.get(secondLastRecordTime), PRUNE_INFO_TO_STRING));
+      Set<String> compactedRegionsLatest =
+        Sets.newTreeSet(Iterables.transform(compactedRegions.get(latestRecordTime), PRUNE_INFO_TO_STRING));
+      Set<String> liveExpected = new TreeSet<>(Sets.intersection(compactedRegionsTime, compactedRegionsLatest));
+      // One millisecond before the latest run resolves to the regions recorded at the previous run
+      Assert.assertTrue(
+        pruningDebug.execute(new String[]{"idle-regions", "-1", String.valueOf(latestRecordTime - 1)}, out));
+      out.flush();
+      List<RegionPruneInfo> actual = GSON.fromJson(readOutputStream(outputStream), PRUNE_INFO_LIST_TYPE);
+      Assert.assertEquals(liveExpected, Sets.newTreeSet(Iterables.transform(actual, PRUNE_INFO_TO_STRING)));
+    }
+  }
+
+  @Test
+  public void testToCompactRegions() throws Exception {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try (PrintWriter out = new PrintWriter(outputStream)) {
+      // Get the regions that are not compacted for the latest time
+      Long latestRecordTime = compactedRegions.asMap().lastKey();
+      SortedSet<String> expected = notCompactedRegions.get(latestRecordTime);
+      Assert.assertTrue(pruningDebug.execute(new String[]{"to-compact-regions", "-1"}, out));
+      out.flush();
+      Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), SORTED_STRING_SET_TYPE));
+
+      // Same command with explicit time
+      outputStream.reset();
+      Assert.assertTrue(
+        pruningDebug.execute(new String[]{"to-compact-regions", "-1", String.valueOf(latestRecordTime)}, out));
+      out.flush();
+      Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), SORTED_STRING_SET_TYPE));
+
+      // Same command with relative time
+      outputStream.reset();
+      Assert.assertTrue(pruningDebug.execute(new String[]{"to-compact-regions", "-1", "now+1h-3m"}, out));
+      out.flush();
+      Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), SORTED_STRING_SET_TYPE));
+
+      // Same command with reduced number of regions
+      int limit = 2;
+      outputStream.reset();
+      Assert.assertTrue(pruningDebug.execute(
+        new String[]{"to-compact-regions", String.valueOf(limit), String.valueOf(latestRecordTime)}, out));
+      out.flush();
+      // Assert that the actual set is a subset of expected, with size 2 (since the output is not sorted)
+      SortedSet<String> actual = GSON.fromJson(readOutputStream(outputStream), SORTED_STRING_SET_TYPE);
+      Assert.assertEquals(limit, actual.size());
+      Assert.assertTrue(Sets.difference(actual, expected).isEmpty());
+
+      // For a different time, only live regions that are not compacted are returned
+      outputStream.reset();
+      Long secondLastRecordTime = Iterables.get(compactedRegions.keySet(), 1);
+      Set<String> compactedRegionsTime = notCompactedRegions.get(secondLastRecordTime);
+      Set<String> compactedRegionsLatest = notCompactedRegions.get(latestRecordTime);
+      Set<String> liveExpected = new TreeSet<>(Sets.intersection(compactedRegionsTime, compactedRegionsLatest));
+      Assert.assertTrue(
+        pruningDebug.execute(new String[]{"to-compact-regions", "-1", String.valueOf(secondLastRecordTime)}, out));
+      out.flush();
+      Assert.assertEquals(GSON.toJson(liveExpected), readOutputStream(outputStream));
+    }
+  }
+
+  /**
+   * Builds the expected {@link RegionsAtTime} for a record time: the union of the compacted, empty and not
+   * compacted regions recorded at that time.
+   */
+  private static RegionsAtTime expectedRegionsForTime(long time) {
+    SortedSet<String> regions = new TreeSet<>();
+    regions.addAll(Sets.newTreeSet(Iterables.transform(compactedRegions.get(time), PRUNE_INFO_TO_STRING)));
+    regions.addAll(emptyRegions.get(time));
+    regions.addAll(notCompactedRegions.get(time));
+    return new RegionsAtTime(time, regions, new SimpleDateFormat(InvalidListPruningDebugTool.DATE_FORMAT));
+  }
+
+  /**
+   * Orders prune infos by region name only.
+   */
+  private static Comparator<InvalidListPruningDebugTool.RegionPruneInfoPretty> stringComparator() {
+    return new Comparator<InvalidListPruningDebugTool.RegionPruneInfoPretty>() {
+      @Override
+      public int compare(InvalidListPruningDebugTool.RegionPruneInfoPretty o1,
+                         InvalidListPruningDebugTool.RegionPruneInfoPretty o2) {
+        return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString());
+      }
+    };
+  }
+
+  /**
+   * Orders prune infos by ascending prune upper bound, breaking ties by region name.
+   */
+  private static Comparator<RegionPruneInfo> pruneUpperBoundAndStringComparator() {
+    return new Comparator<RegionPruneInfo>() {
+      @Override
+      public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
+        int result = Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound());
+        if (result == 0) {
+          return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString());
+        }
+        return result;
+      }
+    };
+  }
+
+  /**
+   * Returns the tool output captured so far, without the single trailing line separator written by println.
+   */
+  private String readOutputStream(ByteArrayOutputStream out) throws UnsupportedEncodingException {
+    String s = out.toString(Charsets.UTF_8.name());
+    if (DEBUG_PRINT) {
+      System.out.println(s);
+    }
+    // PrintWriter.println() terminates lines with the platform separator, which is "\r\n" on Windows
+    String lineSeparator = System.lineSeparator();
+    return s.endsWith(lineSeparator) ? s.substring(0, s.length() - lineSeparator.length()) : s;
+  }
+
+  /**
+   * Parses a JSON list of prune infos, sorts it like the expected collection and compares both as JSON.
+   *
+   * @param expectedSorted expected prune infos, already sorted by {@link #pruneUpperBoundAndStringComparator()}
+   * @param actualString JSON output of the tool
+   */
+  private void assertSortedPruneInfoEquals(Collection<? extends RegionPruneInfo> expectedSorted,
+                                           String actualString) {
+    List<? extends RegionPruneInfo> actual = GSON.fromJson(actualString, PRUNE_INFO_LIST_TYPE);
+    List<RegionPruneInfo> actualSorted = new ArrayList<>(actual);
+    Collections.sort(actualSorted, pruneUpperBoundAndStringComparator());
+
+    Assert.assertEquals(GSON.toJson(expectedSorted), GSON.toJson(actualSorted));
+  }
+
+  /**
+   * Returns the elements at positions [from, to) of {@code set}, in its iteration order, as a new sorted set
+   * using the same comparator.
+   */
+  @SuppressWarnings("SameParameterValue")
+  private <T> SortedSet<T> subset(SortedSet<T> set, int from, int to) {
+    SortedSet<T> subset = new TreeSet<>(set.comparator());
+    int index = 0;
+    for (T e : set) {
+      if (index >= to) {
+        break;
+      }
+      // Skip the first 'from' elements
+      if (index >= from) {
+        subset.add(e);
+      }
+      index++;
+    }
+    return subset;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
deleted file mode 100644
index 443c998..0000000
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tephra.hbase.txprune;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.MinMaxPriorityQueue;
-import com.google.common.collect.Sets;
-import com.google.gson.Gson;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.txprune.RegionPruneInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import javax.annotation.Nullable;
-
-/**
- * Invalid List Pruning Debug Tool.
- *
- * Command line tool that reads the transaction prune state table through {@link DataJanitorState} and prints, as
- * JSON, the regions recorded at a given time, the regions with the lowest prune upper bounds, the prune info of a
- * single region, or the live regions that still block pruning progress.
- */
-public class InvalidListPruningDebug {
- private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
- private static final Gson GSON = new Gson();
- private DataJanitorState dataJanitorState;
- private Connection connection;
- private TableName tableName;
-
- /**
- * Initialize the Invalid List Debug Tool: opens an HBase connection and resolves the prune state table name from
- * {@code TxConstants.TransactionPruning.PRUNE_STATE_TABLE}, falling back to the default prune state table.
- * @param conf {@link Configuration}
- * @throws IOException if the HBase connection cannot be created
- */
- public void initialize(final Configuration conf) throws IOException {
- LOG.debug("InvalidListPruningDebugMain : initialize method called");
- connection = ConnectionFactory.createConnection(conf);
- tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
- // DataJanitorState receives a supplier; each get() call obtains a Table from the shared connection
- dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
- @Override
- public Table get() throws IOException {
- return connection.getTable(tableName);
- }
- });
- }
-
- /**
- * Closes the HBase connection opened by {@link #initialize(Configuration)}, if any.
- * @throws IOException if closing the connection fails
- */
- public void destroy() throws IOException {
- if (connection != null) {
- connection.close();
- }
- }
-
- /**
- * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions
- * will stop the progress of pruning.
- *
- * Live regions are the regions recorded for the latest time on or before now; empty regions are those recorded as
- * empty after that time; regions with a prune upper bound are those returned by {@link #getIdleRegions(Integer)}.
- *
- * @param numRegions number of regions to return, or a negative value for all of them; must not be null
- * @return {@link Set} of regions that needs to be compacted and flushed; empty if no live regions were recorded.
- * When the result is limited, which regions are kept depends on HashSet iteration order.
- * @throws IOException if the prune state table cannot be read
- */
- public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
- // Fetch the live regions
- Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
- if (latestTimeRegion.isEmpty()) {
- return new HashSet<>();
- }
-
- // getRegionsOnOrBeforeTime returns at most one entry: the latest recorded time and its regions
- Long timestamp = latestTimeRegion.keySet().iterator().next();
- SortedSet<String> liveRegions = latestTimeRegion.get(timestamp);
-
- SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
- SortedSet<String> emptyRegionNames = new TreeSet<>();
- Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN);
- for (String regionString : regionStrings) {
- emptyRegionNames.add(regionString);
- }
-
- Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
-
- // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are
- // not empty and have not been registered prune upper bound
- Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1);
- for (RegionPruneInfo prunedRegion : prunedRegions) {
- if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) {
- nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
- }
- }
-
- if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) {
- return nonEmptyRegions;
- }
-
- Set<String> subsetRegions = new HashSet<>(numRegions);
- for (String regionName : nonEmptyRegions) {
- if (subsetRegions.size() == numRegions) {
- break;
- }
- subsetRegions.add(regionName);
- }
- return subsetRegions;
- }
-
- /**
- * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds.
- * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
- * that are known to be live will be returned.
- *
- * If no regions have been recorded as live yet, the prune info of every region is considered instead.
- *
- * @param numRegions number of regions, or a negative value for all regions; must not be null
- * @return {@link Queue} holding at most {@code numRegions} {@link RegionPruneInfo} with the lowest prune upper bounds
- * @throws IOException if the prune state table cannot be read
- */
- public Queue<RegionPruneInfo> getIdleRegions(Integer numRegions) throws IOException {
- List<RegionPruneInfo> regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null);
- if (regionPruneInfos.isEmpty()) {
- return new LinkedList<>();
- }
-
- // Create a set with region names
- Set<String> pruneRegionNameSet = new HashSet<>();
- for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
- pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
- }
-
- // Fetch the live regions
- Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
- if (!latestTimeRegion.isEmpty()) {
- SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
- Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
- List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
- for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
- if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
- liveRegionWithPruneInfoList.add(regionPruneInfo);
- }
- }
-
- // Use the subset of live regions and prune regions
- regionPruneInfos = liveRegionWithPruneInfoList;
- }
-
- if (numRegions < 0) {
- numRegions = regionPruneInfos.size();
- }
-
- // NOTE(review): the (int) cast of a long difference can overflow, so prune upper bounds that differ by more than
- // Integer.MAX_VALUE may be ordered incorrectly; Long.compare would be the safe comparison.
- // NOTE(review): Guava's MinMaxPriorityQueue.Builder#maximumSize appears to reject 0 — verify; that case is
- // reached when numRegions is 0, or negative while no live region has prune info.
- // NOTE(review): iterating the returned queue presumably follows heap order rather than sorted order — confirm;
- // poll() removes the element with the lowest prune upper bound.
- Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
- @Override
- public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
- return (int) (o1.getPruneUpperBound() - o2.getPruneUpperBound());
- }
- }).maximumSize(numRegions).create();
-
- for (RegionPruneInfo pruneInfo : regionPruneInfos) {
- lowestPrunes.add(pruneInfo);
- }
- return lowestPrunes;
- }
-
- /**
- * Return the prune upper bound value of a given region. If no prune upper bound has been written for this region yet,
- * it will return a null.
- *
- * @param regionId region id; converted to bytes with {@link Bytes#toBytesBinary(String)}
- * @return {@link RegionPruneInfo} of the region
- * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo}
- */
- @Nullable
- public RegionPruneInfo getRegionPruneInfo(String regionId) throws IOException {
- return dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId));
- }
-
- /**
- * Returns the transactional regions recorded at the latest time on or before the given time.
- *
- * @param time Given a time, provide the {@link TimeRegions} at or before that time
- * @return a map with a single entry from the recorded time to the sorted region names, or an empty map if no
- * regions were recorded on or before {@code time}
- * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions}
- */
- public Map<Long, SortedSet<String>> getRegionsOnOrBeforeTime(Long time) throws IOException {
- Map<Long, SortedSet<String>> regionMap = new HashMap<>();
- TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time);
- if (timeRegions == null) {
- return regionMap;
- }
- SortedSet<String> regionNames = new TreeSet<>();
- Iterable<String> regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN);
- for (String regionString : regionStrings) {
- regionNames.add(regionString);
- }
- regionMap.put(timeRegions.getTime(), regionNames);
- return regionMap;
- }
-
- /**
- * Prints the supported commands and their parameters.
- */
- private void printUsage(PrintWriter pw) {
- // NOTE(review): the usage line names InvalidListPruning rather than this class (InvalidListPruningDebug) — confirm
- pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning <command> <parameter>");
- pw.println("Available commands, corresponding parameters are:");
- pw.println("****************************************************");
- pw.println("time-region ts");
- pw.println("Desc: Prints out the transactional regions present in HBase at time 'ts' (in milliseconds) " +
- "or the latest time before time 'ts'.");
- pw.println("idle-regions limit");
- pw.println("Desc: Prints out 'limit' number of regions which has the lowest prune upper bounds. If '-1' is " +
- "provided as the limit, prune upper bounds of all regions are returned.");
- pw.println("prune-info region-name-as-string");
- pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
- pw.println("to-compact-regions limit");
- pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " +
- "and have not registered a prune upper bound.");
- }
-
- /**
- * Runs a single {@code <command> <parameter>} pair and prints its JSON result to standard output.
- *
- * @param args exactly two arguments: the command name and its parameter
- * @return true if the command was recognized and executed, false if the usage was printed instead
- * @throws IOException if the prune state table cannot be read
- */
- private boolean execute(String[] args) throws IOException {
- // NOTE(review): closing this PrintWriter at the end of the try block also closes System.out
- try (PrintWriter pw = new PrintWriter(System.out)) {
- if (args.length != 2) {
- printUsage(pw);
- return false;
- }
-
- String command = args[0];
- String parameter = args[1];
- // NOTE(review): a NumberFormatException from the parse calls below is not caught here
- if ("time-region".equals(command)) {
- Long time = Long.parseLong(parameter);
- Map<Long, SortedSet<String>> timeRegion = getRegionsOnOrBeforeTime(time);
- pw.println(GSON.toJson(timeRegion));
- return true;
- } else if ("idle-regions".equals(command)) {
- Integer numRegions = Integer.parseInt(parameter);
- Queue<RegionPruneInfo> regionPruneInfos = getIdleRegions(numRegions);
- pw.println(GSON.toJson(regionPruneInfos));
- return true;
- } else if ("prune-info".equals(command)) {
- RegionPruneInfo regionPruneInfo = getRegionPruneInfo(parameter);
- if (regionPruneInfo != null) {
- pw.println(GSON.toJson(regionPruneInfo));
- } else {
- pw.println(String.format("No prune info found for the region %s.", parameter));
- }
- return true;
- } else if ("to-compact-regions".equals(command)) {
- Integer numRegions = Integer.parseInt(parameter);
- Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
- pw.println(GSON.toJson(toBeCompactedRegions));
- return true;
- } else {
- pw.println(String.format("%s is not a valid command.", command));
- printUsage(pw);
- return false;
- }
- }
- }
-
- /**
- * Entry point: initializes the tool from the default HBase configuration and runs the command given in
- * {@code args}, exiting with status 1 when the command is not valid.
- */
- public static void main(String[] args) {
- Configuration hConf = HBaseConfiguration.create();
- InvalidListPruningDebug pruningDebug = new InvalidListPruningDebug();
- try {
- pruningDebug.initialize(hConf);
- boolean success = pruningDebug.execute(args);
- pruningDebug.destroy();
- if (!success) {
- System.exit(1);
- }
- } catch (IOException ex) {
- // NOTE(review): on IOException the error is only logged; destroy() is skipped and no failure exit status is set
- LOG.error("Received an exception while trying to execute the debug tool. ", ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java
new file mode 100644
index 0000000..5d7b871
--- /dev/null
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.txprune.RegionPruneInfo;
+import org.apache.tephra.txprune.hbase.InvalidListPruningDebug;
+import org.apache.tephra.txprune.hbase.RegionsAtTime;
+import org.apache.tephra.util.TimeMathParser;
+import org.apache.tephra.util.TxUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+/**
+ * Invalid List Pruning Debug Tool.
+ * <p/>
+ * Reads the state recorded in the prune state table to help find the regions that keep invalid list pruning
+ * from making progress. Run it without arguments to print the supported commands.
+ */
+public class InvalidListPruningDebugTool implements InvalidListPruningDebug {
+  private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebugTool.class);
+  private static final Gson GSON = new Gson();
+  private static final String NOW = "now";
+  @VisibleForTesting
+  static final String DATE_FORMAT = "d-MMM-yyyy HH:mm:ss z";
+
+  private DataJanitorState dataJanitorState;
+  private Connection connection;
+  private TableName tableName;
+
+  /**
+   * Initialize the Invalid List Debug Tool.
+   * <p/>
+   * Opens an HBase connection and resolves the prune state table from
+   * {@link TxConstants.TransactionPruning#PRUNE_STATE_TABLE}, falling back to
+   * {@link TxConstants.TransactionPruning#DEFAULT_PRUNE_STATE_TABLE}.
+   *
+   * @param conf {@link Configuration}
+   * @throws IOException when not able to create an HBase connection
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public void initialize(final Configuration conf) throws IOException {
+    LOG.debug("InvalidListPruningDebugTool : initialize method called");
+    connection = ConnectionFactory.createConnection(conf);
+    tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                           TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+      @Override
+      public Table get() throws IOException {
+        return connection.getTable(tableName);
+      }
+    });
+  }
+
+  /**
+   * Closes the HBase connection opened by {@link #initialize(Configuration)}.
+   * Calling it again, or before initialize, does nothing.
+   *
+   * @throws IOException if closing the connection fails
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public void destroy() throws IOException {
+    if (connection != null) {
+      try {
+        connection.close();
+      } finally {
+        // Drop the reference so that a repeated destroy() does not touch an already closed connection
+        connection = null;
+      }
+    }
+  }
+
+  /**
+   * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions
+   * will stop the progress of pruning.
+   * <p/>
+   * Note that this can return false positives in the following case -
+   * At time 't' empty regions were recorded, and time 't+1' prune iteration was invoked.
+   * Since a new set of regions was recorded at time 't+1', all regions recorded as empty before time 't + 1' will
+   * now be reported as blocking the pruning, even though they are empty. This is because we cannot tell if those
+   * regions got any new data between time 't' and 't + 1'.
+   *
+   * @param numRegions number of regions, a negative value returns all of them
+   * @param time time in milliseconds or relative time, regions recorded before the given time are returned
+   * @return {@link Set} of regions that needs to be compacted and flushed, in no particular order
+   * @throws IOException if there are any errors while reading the prune state
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public Set<String> getRegionsToBeCompacted(Integer numRegions, String time) throws IOException {
+    // Fetch the regions recorded at the given time
+    RegionsAtTime timeRegion = getRegionsOnOrBeforeTime(time);
+    if (timeRegion.getRegions().isEmpty()) {
+      return Collections.emptySet();
+    }
+
+    Long timestamp = timeRegion.getTime();
+
+    // Retain only the regions that are also present in the latest recording, i.e. the live regions
+    SortedSet<String> liveRegions = getRegionsOnOrBeforeTime(NOW).getRegions();
+    SortedSet<String> regions = Sets.newTreeSet(Sets.intersection(liveRegions, timeRegion.getRegions()));
+
+    // Remove the regions recorded as empty after the time of the region recording
+    SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+    SortedSet<String> emptyRegionNames =
+      Sets.newTreeSet(Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+    Set<String> nonEmptyRegions = new HashSet<>(Sets.difference(regions, emptyRegionNames));
+
+    // Remove the regions that have a prune upper bound recorded, resulting in a set of regions
+    // that are not empty and have not registered a prune upper bound
+    for (RegionPruneInfo prunedRegion : dataJanitorState.getPruneInfoForRegions(null)) {
+      nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+    }
+
+    if (numRegions < 0 || numRegions >= nonEmptyRegions.size()) {
+      return nonEmptyRegions;
+    }
+    // The result is unordered, so any numRegions of the regions are returned
+    return Sets.newHashSet(Iterables.limit(nonEmptyRegions, numRegions));
+  }
+
+  /**
+   * Return the RegionPruneInfo of the regions that have the lowest prune upper bounds.
+   * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
+   * that are known to be live (recorded both at the latest time and at the given time) will be returned.
+   *
+   * @param numRegions number of regions, a negative value returns all of them
+   * @param time time in milliseconds or relative time, regions recorded before the given time are returned
+   * @return {@link SortedSet} of {@link RegionPruneInfoPretty}, ordered by prune upper bound and then region name
+   * @throws IOException if there are any errors while reading the prune state
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public SortedSet<RegionPruneInfoPretty> getIdleRegions(Integer numRegions, String time) throws IOException {
+    List<RegionPruneInfo> regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null);
+    if (regionPruneInfos.isEmpty()) {
+      return new TreeSet<>();
+    }
+
+    // Live regions are the ones recorded both at the latest time and at the given time
+    RegionsAtTime latestRegions = getRegionsOnOrBeforeTime(NOW);
+    RegionsAtTime timeRegions = getRegionsOnOrBeforeTime(time);
+    Set<String> liveRegions = Sets.intersection(latestRegions.getRegions(), timeRegions.getRegions());
+
+    // Retain only the prune info of the live regions
+    List<RegionPruneInfo> livePruneInfos = new ArrayList<>();
+    for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+      if (liveRegions.contains(regionPruneInfo.getRegionNameAsString())) {
+        livePruneInfos.add(regionPruneInfo);
+      }
+    }
+
+    Comparator<RegionPruneInfo> comparator = new Comparator<RegionPruneInfo>() {
+      @Override
+      public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
+        int result = Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound());
+        if (result == 0) {
+          return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString());
+        }
+        return result;
+      }
+    };
+    SortedSet<RegionPruneInfoPretty> regions = new TreeSet<>(comparator);
+
+    int limit = numRegions < 0 ? livePruneInfos.size() : numRegions;
+    // MinMaxPriorityQueue rejects a maximum size of 0, which happens when a limit of 0 is requested
+    // or when none of the live regions has a prune upper bound recorded
+    if (limit == 0) {
+      return regions;
+    }
+
+    MinMaxPriorityQueue<RegionPruneInfoPretty> lowestPrunes =
+      MinMaxPriorityQueue.orderedBy(comparator).maximumSize(limit).create();
+    for (RegionPruneInfo pruneInfo : livePruneInfos) {
+      lowestPrunes.add(new RegionPruneInfoPretty(pruneInfo));
+    }
+
+    regions.addAll(lowestPrunes);
+    return regions;
+  }
+
+  /**
+   * Return the prune upper bound value of a given region. If no prune upper bound has been written for this region yet,
+   * it will return a null.
+   *
+   * @param regionId region id, decoded with {@link Bytes#toBytesBinary(String)}
+   * @return {@link RegionPruneInfo} of the region
+   * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo}
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  @Nullable
+  public RegionPruneInfoPretty getRegionPruneInfo(String regionId) throws IOException {
+    RegionPruneInfo pruneInfo = dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId));
+    return pruneInfo == null ? null : new RegionPruneInfoPretty(pruneInfo);
+  }
+
+  /**
+   * Returns the transactional regions recorded at or before the given time.
+   *
+   * @param timeString time in milliseconds or relative time (for example "now-1d")
+   * @return {@link RegionsAtTime} with the regions present at or before the given time; when nothing was recorded,
+   *         an empty set of regions stamped with the requested time
+   * @throws IOException if there are any errors while trying to fetch the regions
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public RegionsAtTime getRegionsOnOrBeforeTime(String timeString) throws IOException {
+    long time = TimeMathParser.parseTime(timeString, TimeUnit.MILLISECONDS);
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+    TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time);
+    if (timeRegions == null) {
+      return new RegionsAtTime(time, new TreeSet<String>(), dateFormat);
+    }
+    SortedSet<String> regionNames =
+      Sets.newTreeSet(Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN));
+    return new RegionsAtTime(timeRegions.getTime(), regionNames, dateFormat);
+  }
+
+  private void printUsage(PrintWriter pw) {
+    pw.println();
+    // Derive the name from the class so the usage stays correct if the tool is renamed or moved
+    pw.println("Usage : " + InvalidListPruningDebugTool.class.getName() + " <command> <parameters>");
+    pw.println();
+    pw.println("Available commands");
+    pw.println("------------------");
+    pw.println("to-compact-regions limit [time]");
+    pw.println("Desc: Prints out the regions that are active, but not empty, " +
+                 "and have not registered a prune upper bound.");
+    pw.println();
+    pw.println("idle-regions limit [time]");
+    pw.println("Desc: Prints out the regions that have the lowest prune upper bounds.");
+    pw.println();
+    pw.println("prune-info region-name-as-string");
+    pw.println("Desc: Prints the prune upper bound and the time it was recorded for the given region.");
+    pw.println();
+    pw.println("time-region [time]");
+    pw.println("Desc: Prints out the transactional regions present in HBase recorded at or before the given time.");
+    pw.println();
+    pw.println("Parameters");
+    pw.println("----------");
+    pw.println(" * limit - used to limit the number of regions returned, -1 to apply no limit");
+    pw.println(" * time - if time is not provided, the current time is used. ");
+    pw.println(" When provided, the data recorded on or before the given time is returned.");
+    pw.println(" Time can be provided in milliseconds, or can be provided as a relative time.");
+    pw.println(" Examples for relative time -");
+    pw.println(" now = current time,");
+    pw.println(" now-1d = current time - 1 day,");
+    pw.println(" now-1d+4h = 20 hours before now,");
+    pw.println(" now+5s = current time + 5 seconds");
+    pw.println();
+  }
+
+  /**
+   * Parses the mandatory {@code limit} parameter, printing an error to {@code out} when it is not an integer.
+   *
+   * @param limit the limit as given on the command line
+   * @param out where the error message is printed
+   * @return the parsed limit, or {@code null} if {@code limit} is not a valid integer
+   */
+  @Nullable
+  private static Integer parseLimit(String limit, PrintWriter out) {
+    try {
+      return Integer.parseInt(limit);
+    } catch (NumberFormatException e) {
+      out.println(String.format("%s is not a valid limit.", limit));
+      return null;
+    }
+  }
+
+  /**
+   * Runs a single debug command and prints its result as JSON to {@code out}.
+   *
+   * @param args the command followed by its parameters
+   * @param out where the result is printed; the usage is printed there when the arguments are invalid
+   * @return {@code true} if the command was executed, {@code false} if the arguments were invalid
+   * @throws IOException if there are any errors while reading the prune state
+   */
+  @VisibleForTesting
+  boolean execute(String[] args, PrintWriter out) throws IOException {
+    if (args.length < 1) {
+      printUsage(out);
+      return false;
+    }
+
+    String command = args[0];
+    switch (command) {
+      case "time-region":
+        if (args.length <= 2) {
+          String time = args.length == 2 ? args[1] : NOW;
+          RegionsAtTime timeRegion = getRegionsOnOrBeforeTime(time);
+          out.println(GSON.toJson(timeRegion));
+          return true;
+        }
+        break;
+      case "idle-regions":
+        // The limit is mandatory, the time is optional
+        if (args.length == 2 || args.length == 3) {
+          Integer numRegions = parseLimit(args[1], out);
+          if (numRegions == null) {
+            break;
+          }
+          String time = args.length == 3 ? args[2] : NOW;
+          SortedSet<RegionPruneInfoPretty> regionPruneInfos = getIdleRegions(numRegions, time);
+          out.println(GSON.toJson(regionPruneInfos));
+          return true;
+        }
+        break;
+      case "prune-info":
+        if (args.length == 2) {
+          String regionName = args[1];
+          RegionPruneInfo regionPruneInfo = getRegionPruneInfo(regionName);
+          if (regionPruneInfo != null) {
+            out.println(GSON.toJson(regionPruneInfo));
+          } else {
+            out.println(String.format("No prune info found for the region %s.", regionName));
+          }
+          return true;
+        }
+        break;
+      case "to-compact-regions":
+        // The limit is mandatory, the time is optional
+        if (args.length == 2 || args.length == 3) {
+          Integer numRegions = parseLimit(args[1], out);
+          if (numRegions == null) {
+            break;
+          }
+          String time = args.length == 3 ? args[2] : NOW;
+          Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions, time);
+          out.println(GSON.toJson(toBeCompactedRegions));
+          return true;
+        }
+        break;
+    }
+
+    printUsage(out);
+    return false;
+  }
+
+  /**
+   * Command line entry point. Exits with status 1 when the arguments are invalid or the command fails
+   * with an {@link IOException}.
+   *
+   * @param args the command followed by its parameters
+   */
+  public static void main(String[] args) {
+    Configuration hConf = HBaseConfiguration.create();
+    InvalidListPruningDebugTool pruningDebug = new InvalidListPruningDebugTool();
+    boolean success = false;
+    try (PrintWriter out = new PrintWriter(System.out)) {
+      try {
+        pruningDebug.initialize(hConf);
+        success = pruningDebug.execute(args, out);
+      } finally {
+        // Release the HBase connection even when the command fails
+        pruningDebug.destroy();
+      }
+    } catch (IOException ex) {
+      LOG.error("Received an exception while trying to execute the debug tool. ", ex);
+    }
+
+    // Exit only after the try-with-resources block has flushed and closed the writer: System.exit() does not
+    // run the pending close(), so calling it inside the block would drop buffered output such as the usage text.
+    if (!success) {
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Wrapper class around {@link RegionPruneInfo} to print human readable dates for timestamps.
+   */
+  @SuppressWarnings({"WeakerAccess", "unused"})
+  public static class RegionPruneInfoPretty extends RegionPruneInfo {
+    private final String pruneUpperBoundAsString;
+    private final String pruneRecordTimeAsString;
+
+    public RegionPruneInfoPretty(RegionPruneInfo regionPruneInfo) {
+      this(regionPruneInfo.getRegionName(), regionPruneInfo.getRegionNameAsString(),
+           regionPruneInfo.getPruneUpperBound(), regionPruneInfo.getPruneRecordTime());
+    }
+
+    public RegionPruneInfoPretty(byte[] regionName, String regionNameAsString,
+                                 long pruneUpperBound, long pruneRecordTime) {
+      super(regionName, regionNameAsString, pruneUpperBound, pruneRecordTime);
+      // The formatter is only needed here; SimpleDateFormat is not thread-safe, so it is not shared
+      SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+      pruneUpperBoundAsString = dateFormat.format(TxUtils.getTimestamp(pruneUpperBound));
+      pruneRecordTimeAsString = dateFormat.format(pruneRecordTime);
+    }
+
+    public String getPruneUpperBoundAsString() {
+      return pruneUpperBoundAsString;
+    }
+
+    public String getPruneRecordTimeAsString() {
+      return pruneRecordTimeAsString;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+      RegionPruneInfoPretty that = (RegionPruneInfoPretty) o;
+      return Objects.equals(pruneUpperBoundAsString, that.pruneUpperBoundAsString) &&
+        Objects.equals(pruneRecordTimeAsString, that.pruneRecordTimeAsString);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), pruneUpperBoundAsString, pruneRecordTimeAsString);
+    }
+
+    @Override
+    public String toString() {
+      return "RegionPruneInfoPretty{" +
+        "pruneUpperBoundAsString='" + pruneUpperBoundAsString + '\'' +
+        ", pruneRecordTimeAsString='" + pruneRecordTimeAsString + '\'' +
+        "} " + super.toString();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
new file mode 100644
index 0000000..1476906
--- /dev/null
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.AbstractHBaseTableTest;
+import org.apache.tephra.txprune.RegionPruneInfo;
+import org.apache.tephra.txprune.hbase.RegionsAtTime;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Type;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Test {@link InvalidListPruningDebugTool}.
+ */
+public class InvalidListPruningDebugTest extends AbstractHBaseTableTest {
+ private static final Gson GSON = new Gson();
+ // NOTE(review): presumably consumed by readOutputStream (defined outside this view) to echo tool output
+ private static final boolean DEBUG_PRINT = true;
+ // Extracts the raw region name, used to build the region sets saved with saveRegionsForTime
+ private static final Function<RegionPruneInfo, byte[]> PRUNE_INFO_TO_BYTES =
+ new Function<RegionPruneInfo, byte[]>() {
+ @Override
+ public byte[] apply(RegionPruneInfo input) {
+ return input.getRegionName();
+ }
+ };
+ // Extracts the printable region name, used to compare against the JSON output of the tool
+ private static final Function<RegionPruneInfo, String> PRUNE_INFO_TO_STRING =
+ new Function<RegionPruneInfo, String>() {
+ @Override
+ public String apply(RegionPruneInfo input) {
+ return input.getRegionNameAsString();
+ }
+ };
+ private static final Function<String, byte[]> STRING_TO_BYTES =
+ new Function<String, byte[]>() {
+ @Override
+ public byte[] apply(String input) {
+ return Bytes.toBytes(input);
+ }
+ };
+ // Gson type used to read back the JSON list printed by the "idle-regions" command
+ private static final Type PRUNE_INFO_LIST_TYPE =
+ new TypeToken<List<InvalidListPruningDebugTool.RegionPruneInfoPretty>>() { }.getType();
+
+ private static TableName pruneStateTable;
+ private static InvalidListPruningDebugTool pruningDebug;
+
+ // Expected test data, keyed by the time at which the regions are recorded (populated in addData)
+ private static TreeMultimap<Long, InvalidListPruningDebugTool.RegionPruneInfoPretty> compactedRegions =
+ TreeMultimap.create(Ordering.<Long>natural(), stringComparator());
+ private static TreeMultimap<Long, String> emptyRegions = TreeMultimap.create();
+ private static TreeMultimap<Long, String> notCompactedRegions = TreeMultimap.create();
+ private static TreeMultimap<Long, InvalidListPruningDebugTool.RegionPruneInfoPretty> deletedRegions =
+ TreeMultimap.create(Ordering.<Long>natural(), stringComparator());
+
+
+ /**
+ * Creates the prune state table and fills it with test data. Regions are recorded at three different
+ * times (recordTime groups of three) in four categories: compacted (prune upper bound saved), empty
+ * (saved as empty just after the record time), not compacted (no state at all) and deleted (prune upper
+ * bound saved, but never part of the recorded regions). The expected values are kept in the static
+ * multimaps for the tests to compare against.
+ */
+ @BeforeClass
+ public static void addData() throws Exception {
+ pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+ // Prune state table is a non-transactional table, hence no transaction co-processor
+ Collections.<String>emptyList());
+ table.close();
+
+ DataJanitorState dataJanitorState =
+ new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return testUtil.getConnection().getTable(pruneStateTable);
+ }
+ });
+
+ // Record prune upper bounds for 9 regions
+ long now = System.currentTimeMillis();
+ int maxRegions = 9;
+ TableName compactedTable = TableName.valueOf("default", "compacted_table");
+ TableName emptyTable = TableName.valueOf("default", "empty_table");
+ TableName notCompactedTable = TableName.valueOf("default", "not_compacted_table");
+ TableName deletedTable = TableName.valueOf("default", "deleted_table");
+ for (long i = 0; i < maxRegions; ++i) {
+ // Compacted region
+ byte[] compactedRegion = HRegionInfo.createRegionName(compactedTable, null, i, true);
+ // The first three regions are recorded at one time, second set at another and the third set at a different time
+ long recordTime = now - 6000 + (i / 3) * 100;
+ // Later record times get lower prune upper bounds (100 seconds apart), converted to transaction ids
+ long pruneUpperBound = (now - (i / 3) * 100000) * TxConstants.MAX_TX_PER_MS;
+ dataJanitorState.savePruneUpperBoundForRegion(compactedRegion, pruneUpperBound);
+ RegionPruneInfo pruneInfo = dataJanitorState.getPruneInfoForRegion(compactedRegion);
+ compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(pruneInfo));
+
+ // Empty region, recorded as empty just after the regions of this group are recorded
+ byte[] emptyRegion = HRegionInfo.createRegionName(emptyTable, null, i, true);
+ dataJanitorState.saveEmptyRegionForTime(recordTime + 1, emptyRegion);
+ emptyRegions.put(recordTime, Bytes.toString(emptyRegion));
+
+ // Not compacted region: neither a prune upper bound nor an empty marker is saved for it
+ byte[] notCompactedRegion = HRegionInfo.createRegionName(notCompactedTable, null, i, true);
+ notCompactedRegions.put(recordTime, Bytes.toString(notCompactedRegion));
+
+ // Deleted region: has a prune upper bound, but is never included in saveRegionsForTime below
+ byte[] deletedRegion = HRegionInfo.createRegionName(deletedTable, null, i, true);
+ dataJanitorState.savePruneUpperBoundForRegion(deletedRegion, pruneUpperBound - 1000);
+ RegionPruneInfo deletedPruneInfo = dataJanitorState.getPruneInfoForRegion(deletedRegion);
+ deletedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(deletedPruneInfo));
+ }
+
+ // Also record some common regions across all runs
+ byte[] commonCompactedRegion =
+ HRegionInfo.createRegionName(TableName.valueOf("default:common_compacted"), null, 100, true);
+ byte[] commonNotCompactedRegion =
+ HRegionInfo.createRegionName(TableName.valueOf("default:common_not_compacted"), null, 100, true);
+ byte[] commonEmptyRegion =
+ HRegionInfo.createRegionName(TableName.valueOf("default:common_empty"), null, 100, true);
+ // Create one region that is the latest deleted region, this region represents a region that gets recorded
+ // every prune run, but gets deleted just before the latest run.
+ byte[] newestDeletedRegion =
+ HRegionInfo.createRegionName(TableName.valueOf("default:newest_deleted"), null, 100, true);
+
+ // One run per record time used in the loop above (same recordTime / pruneUpperBound formulas)
+ int runs = maxRegions / 3;
+ for (int i = 0; i < runs; ++i) {
+ long recordTime = now - 6000 + i * 100;
+ long pruneUpperBound = (now - i * 100000) * TxConstants.MAX_TX_PER_MS;
+
+ dataJanitorState.savePruneUpperBoundForRegion(commonCompactedRegion, pruneUpperBound - 1000);
+ RegionPruneInfo c = dataJanitorState.getPruneInfoForRegion(commonCompactedRegion);
+ compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(c));
+
+ dataJanitorState.saveEmptyRegionForTime(recordTime + 1, commonEmptyRegion);
+ emptyRegions.put(recordTime, Bytes.toString(commonEmptyRegion));
+
+ notCompactedRegions.put(recordTime, Bytes.toString(commonNotCompactedRegion));
+
+ // Record the latest deleted region in all the runs except the last one
+ if (i < runs - 1) {
+ dataJanitorState.savePruneUpperBoundForRegion(newestDeletedRegion, pruneUpperBound - 1000);
+ RegionPruneInfo d = dataJanitorState.getPruneInfoForRegion(newestDeletedRegion);
+ compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(d));
+ }
+ }
+
+ // Record the regions present at various times; deletedRegions are intentionally left out
+ for (long time : compactedRegions.asMap().keySet()) {
+ Set<byte[]> allRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ Iterables.addAll(allRegions, Iterables.transform(compactedRegions.get(time), PRUNE_INFO_TO_BYTES));
+ Iterables.addAll(allRegions, Iterables.transform(emptyRegions.get(time), STRING_TO_BYTES));
+ Iterables.addAll(allRegions, Iterables.transform(notCompactedRegions.get(time), STRING_TO_BYTES));
+ dataJanitorState.saveRegionsForTime(time, allRegions);
+ }
+ }
+
+  /**
+   * Drops the prune state table created in {@link #addData()}.
+   * <p/>
+   * The debug tool is not destroyed here: {@link #after()} already destroys it after every test. Destroying it
+   * again would close an already closed connection, and would throw a NullPointerException if no test ran.
+   */
+  @AfterClass
+  public static void cleanup() throws Exception {
+    hBaseAdmin.disableTable(pruneStateTable);
+    hBaseAdmin.deleteTable(pruneStateTable);
+  }
+
+  /**
+   * Creates and initializes a fresh debug tool against the test cluster configuration for every test.
+   */
+  @Before
+  public void before() throws Exception {
+    InvalidListPruningDebugTool tool = new InvalidListPruningDebugTool();
+    pruningDebug = tool;
+    tool.initialize(conf);
+  }
+
+  /**
+   * Releases the HBase connection held by the debug tool created in {@link #before()}.
+   */
+  @After
+  public void after() throws Exception {
+    InvalidListPruningDebugTool tool = pruningDebug;
+    tool.destroy();
+  }
+
+  /**
+   * Verifies that missing or unknown commands fail and print the usage.
+   */
+  @Test
+  public void testUsage() throws Exception {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try (PrintWriter out = new PrintWriter(outputStream)) {
+      // Without a command the usage is printed and the execution is reported as failed
+      Assert.assertFalse(pruningDebug.execute(new String[0], out));
+      out.flush();
+      String usage = readOutputStream(outputStream);
+      Assert.assertTrue(usage, usage.contains("Available commands"));
+
+      // An unknown command also prints the usage
+      outputStream.reset();
+      Assert.assertFalse(pruningDebug.execute(new String[] {"unknown-command"}, out));
+      out.flush();
+      usage = readOutputStream(outputStream);
+      Assert.assertTrue(usage, usage.contains("Available commands"));
+    }
+  }
+
+ /**
+ * Verifies the "time-region" command with no time, an absolute time, a relative time and the oldest
+ * record time. Each step resets the shared output stream, runs the command, flushes and compares the JSON.
+ */
+ @Test
+ public void testTimeRegions() throws Exception {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ try (PrintWriter out = new PrintWriter(outputStream)) {
+ // Get the latest regions for latest recorded time (no time argument means "now")
+ Long latestRecordTime = compactedRegions.asMap().lastKey();
+ Assert.assertTrue(pruningDebug.execute(new String[] {"time-region"}, out));
+ out.flush();
+ Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream));
+
+ // Get the latest regions for latest recorded time by giving the timestamp
+ long now = System.currentTimeMillis();
+ outputStream.reset();
+ Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", Long.toString(now)}, out));
+ out.flush();
+ Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream));
+
+ // Using relative time; the latest regions were recorded more than one second ago
+ outputStream.reset();
+ Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", "now-1s"}, out));
+ out.flush();
+ Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream));
+
+ // Get the regions for the oldest recorded time
+ Long oldestRecordTime = compactedRegions.asMap().firstKey();
+ outputStream.reset();
+ Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", Long.toString(oldestRecordTime)}, out));
+ out.flush();
+ Assert.assertEquals(GSON.toJson(expectedRegionsForTime(oldestRecordTime)), readOutputStream(outputStream));
+ }
+ }
+
+ /**
+ * Verifies the "prune-info" command for a region that has prune info and for one that has none.
+ */
+ @Test
+ public void testGetPruneInfo() throws Exception {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ try (PrintWriter out = new PrintWriter(outputStream)) {
+ // Pick any compacted region from the latest record time
+ Long recordTime = compactedRegions.asMap().lastKey();
+ RegionPruneInfo pruneInfo = compactedRegions.get(recordTime).first();
+ Assert.assertTrue(pruningDebug.execute(new String[]{"prune-info", pruneInfo.getRegionNameAsString()}, out));
+ out.flush();
+ Assert.assertEquals(GSON.toJson(new InvalidListPruningDebugTool.RegionPruneInfoPretty(pruneInfo)),
+ readOutputStream(outputStream));
+
+ // Non-existing region: the command still succeeds but prints a "not found" message
+ String nonExistingRegion = "non-existing-region";
+ outputStream.reset();
+ Assert.assertTrue(pruningDebug.execute(new String[]{"prune-info", nonExistingRegion}, out));
+ out.flush();
+ Assert.assertEquals(String.format("No prune info found for the region %s.", nonExistingRegion),
+ readOutputStream(outputStream));
+ }
+ }
+
+  /**
+   * Verifies the "idle-regions" command with no time, an explicit time, a relative time, a limit,
+   * and a time between two recordings. Every invocation must report success.
+   */
+  @Test
+  public void testIdleRegions() throws Exception {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try (PrintWriter out = new PrintWriter(outputStream)) {
+      // Get the list of regions that have the lowest prune upper bounds for the latest record time
+      Long latestRecordTime = compactedRegions.asMap().lastKey();
+      SortedSet<InvalidListPruningDebugTool.RegionPruneInfoPretty> latestExpected =
+        ImmutableSortedSet.copyOf(pruneUpperBoundAndStringComparator(), compactedRegions.get(latestRecordTime));
+      Assert.assertTrue(pruningDebug.execute(new String[]{"idle-regions", "-1"}, out));
+      out.flush();
+      assertEquals(latestExpected, readOutputStream(outputStream));
+
+      // Same command with explicit time
+      outputStream.reset();
+      Assert.assertTrue(
+        pruningDebug.execute(new String[]{"idle-regions", "-1", String.valueOf(latestRecordTime)}, out));
+      out.flush();
+      assertEquals(latestExpected, readOutputStream(outputStream));
+
+      // Same command with relative time
+      outputStream.reset();
+      Assert.assertTrue(pruningDebug.execute(new String[]{"idle-regions", "-1", "now-2s"}, out));
+      out.flush();
+      assertEquals(latestExpected, readOutputStream(outputStream));
+
+      // Same command with reduced number of regions
+      outputStream.reset();
+      int limit = 2;
+      Assert.assertTrue(pruningDebug.execute(
+        new String[]{"idle-regions", String.valueOf(limit), String.valueOf(latestRecordTime)}, out));
+      out.flush();
+      Assert.assertEquals(GSON.toJson(subset(latestExpected, 0, limit)), readOutputStream(outputStream));
+
+      // For a different time, this time only live regions that are compacted are returned
+      outputStream.reset();
+      Long secondLastRecordTime = Iterables.get(compactedRegions.keySet(), 1);
+      Set<String> compactedRegionsTime =
+        Sets.newTreeSet(Iterables.transform(compactedRegions.get(secondLastRecordTime), PRUNE_INFO_TO_STRING));
+      Set<String> compactedRegionsLatest =
+        Sets.newTreeSet(Iterables.transform(compactedRegions.get(latestRecordTime), PRUNE_INFO_TO_STRING));
+      Set<String> liveExpected = new TreeSet<>(Sets.intersection(compactedRegionsTime, compactedRegionsLatest));
+      // Just before the latest record time, the regions of the second last recording are the ones returned
+      Assert.assertTrue(
+        pruningDebug.execute(new String[]{"idle-regions", "-1", String.valueOf(latestRecordTime - 1)}, out));
+      out.flush();
+      List<RegionPruneInfo> actual = GSON.fromJson(readOutputStream(outputStream), PRUNE_INFO_LIST_TYPE);
+      Assert.assertEquals(liveExpected, Sets.newTreeSet(Iterables.transform(actual, PRUNE_INFO_TO_STRING)));
+    }
+  }
+
+  /**
+   * Verifies the "to-compact-regions" command with no time, an explicit time, a relative time, a limit,
+   * and an older record time. Every invocation must report success.
+   */
+  @Test
+  public void testToCompactRegions() throws Exception {
+    // Typed target for Gson, so the JSON array is read back as a SortedSet<String> rather than a raw SortedSet
+    Type stringSortedSetType = new TypeToken<SortedSet<String>>() { }.getType();
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try (PrintWriter out = new PrintWriter(outputStream)) {
+      // Get the regions that are not compacted for the latest time
+      Long latestRecordTime = compactedRegions.asMap().lastKey();
+      SortedSet<String> expected = notCompactedRegions.get(latestRecordTime);
+      Assert.assertTrue(pruningDebug.execute(new String[]{"to-compact-regions", "-1"}, out));
+      out.flush();
+      Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), stringSortedSetType));
+
+      // Same command with explicit time
+      outputStream.reset();
+      Assert.assertTrue(
+        pruningDebug.execute(new String[]{"to-compact-regions", "-1", String.valueOf(latestRecordTime)}, out));
+      out.flush();
+      Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), stringSortedSetType));
+
+      // Same command with relative time
+      outputStream.reset();
+      Assert.assertTrue(pruningDebug.execute(new String[]{"to-compact-regions", "-1", "now+1h-3m"}, out));
+      out.flush();
+      Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), stringSortedSetType));
+
+      // Same command with reduced number of regions
+      int limit = 2;
+      outputStream.reset();
+      Assert.assertTrue(pruningDebug.execute(
+        new String[]{"to-compact-regions", String.valueOf(limit), String.valueOf(latestRecordTime)}, out));
+      out.flush();
+      // Assert that the actual set is a subset of expected, with size 2 (since the output is not sorted)
+      SortedSet<String> actual = GSON.fromJson(readOutputStream(outputStream), stringSortedSetType);
+      Assert.assertEquals(limit, actual.size());
+      Assert.assertTrue(Sets.difference(actual, expected).isEmpty());
+
+      // For a different time, only live regions that are not compacted are returned
+      outputStream.reset();
+      Long secondLastRecordTime = Iterables.get(compactedRegions.keySet(), 1);
+      Set<String> compactedRegionsTime = notCompactedRegions.get(secondLastRecordTime);
+      Set<String> compactedRegionsLatest = notCompactedRegions.get(latestRecordTime);
+      Set<String> liveExpected = new TreeSet<>(Sets.intersection(compactedRegionsTime, compactedRegionsLatest));
+      Assert.assertTrue(
+        pruningDebug.execute(new String[]{"to-compact-regions", "-1", String.valueOf(secondLastRecordTime)}, out));
+      out.flush();
+      Assert.assertEquals(GSON.toJson(liveExpected), readOutputStream(outputStream));
+    }
+  }
+
+   /**
+    * Builds the expected "time-region" result for the given record time: the union of the names of the
+    * compacted, empty and not-compacted regions recorded at that time, formatted with the tool's date format.
+    */
+   private static RegionsAtTime expectedRegionsForTime(long time) {
+     SortedSet<String> allRegions = new TreeSet<>();
+     // Compacted regions are stored as prune infos, so map them to their region-name strings first
+     Iterables.addAll(allRegions, Iterables.transform(compactedRegions.get(time), PRUNE_INFO_TO_STRING));
+     allRegions.addAll(emptyRegions.get(time));
+     allRegions.addAll(notCompactedRegions.get(time));
+     SimpleDateFormat dateFormat = new SimpleDateFormat(InvalidListPruningDebugTool.DATE_FORMAT);
+     return new RegionsAtTime(time, allRegions, dateFormat);
+   }
+
+ private static Comparator<InvalidListPruningDebugTool.RegionPruneInfoPretty> stringComparator() {
+ return new Comparator<InvalidListPruningDebugTool.RegionPruneInfoPretty>() {
+ @Override
+ public int compare(InvalidListPruningDebugTool.RegionPruneInfoPretty o1,
+ InvalidListPruningDebugTool.RegionPruneInfoPretty o2) {
+ return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString());
+ }
+ };
+ }
+
+   /**
+    * Returns a comparator ordering prune infos by ascending prune upper bound, breaking ties by region name.
+    */
+   private static Comparator<RegionPruneInfo> pruneUpperBoundAndStringComparator() {
+     return new Comparator<RegionPruneInfo>() {
+       @Override
+       public int compare(RegionPruneInfo first, RegionPruneInfo second) {
+         int byUpperBound = Long.compare(first.getPruneUpperBound(), second.getPruneUpperBound());
+         if (byUpperBound != 0) {
+           return byUpperBound;
+         }
+         // Same prune upper bound: fall back to the region name for a deterministic order
+         return first.getRegionNameAsString().compareTo(second.getRegionNameAsString());
+       }
+     };
+   }
+
+   /**
+    * Decodes the captured tool output as UTF-8 and strips the single trailing line terminator written by
+    * {@link PrintWriter#println}.
+    *
+    * <p>Only an actual trailing terminator ({@code "\r\n"} or {@code "\n"}) is removed; output that does not end
+    * with one is returned unchanged instead of losing its last character.
+    *
+    * @param out the stream the tool wrote to
+    * @return the output without its trailing line terminator
+    * @throws UnsupportedEncodingException if UTF-8 is not supported (never on a compliant JVM)
+    */
+   private String readOutputStream(ByteArrayOutputStream out) throws UnsupportedEncodingException {
+     String s = out.toString(Charsets.UTF_8.toString());
+     if (DEBUG_PRINT) {
+       System.out.println(s);
+     }
+     // println uses the platform line separator, which may be "\r\n"
+     if (s.endsWith("\r\n")) {
+       return s.substring(0, s.length() - 2);
+     }
+     if (s.endsWith("\n")) {
+       return s.substring(0, s.length() - 1);
+     }
+     return s;
+   }
+
+   /**
+    * Asserts that the JSON list of prune infos in {@code actualString} matches {@code expectedSorted}.
+    *
+    * <p>The actual list is sorted with {@link #pruneUpperBoundAndStringComparator()} before comparing, so the
+    * caller is expected to pass the expected collection already in that order. Both sides are compared as JSON.
+    */
+   private void assertEquals(Collection<? extends RegionPruneInfo> expectedSorted, String actualString) {
+     List<? extends RegionPruneInfo> parsed = GSON.fromJson(actualString, PRUNE_INFO_LIST_TYPE);
+     List<RegionPruneInfo> actualSorted = new ArrayList<>();
+     actualSorted.addAll(parsed);
+     Collections.sort(actualSorted, pruneUpperBoundAndStringComparator());
+
+     String expectedJson = GSON.toJson(expectedSorted);
+     String actualJson = GSON.toJson(actualSorted);
+     Assert.assertEquals(expectedJson, actualJson);
+   }
+
+   /**
+    * Returns the elements of {@code set} at iteration positions {@code [from, to)}, in a new sorted set that uses
+    * the same comparator as {@code set}.
+    *
+    * <p>The previous implementation never skipped the first {@code from} elements: it returned the first
+    * {@code to - from} elements, which was only correct when {@code from == 0}.
+    *
+    * @param set the source set
+    * @param from index of the first element to include (inclusive)
+    * @param to index of the first element to exclude (exclusive)
+    * @return the selected elements; empty if {@code from >= to}
+    */
+   @SuppressWarnings("SameParameterValue")
+   private <T> SortedSet<T> subset(SortedSet<T> set, int from, int to) {
+     SortedSet<T> result = new TreeSet<>(set.comparator());
+     int index = 0;
+     for (T element : set) {
+       if (index >= to) {
+         break;
+       }
+       if (index >= from) {
+         result.add(element);
+       }
+       index++;
+     }
+     return result;
+   }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
deleted file mode 100644
index 443c998..0000000
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tephra.hbase.txprune;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.MinMaxPriorityQueue;
-import com.google.common.collect.Sets;
-import com.google.gson.Gson;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.txprune.RegionPruneInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import javax.annotation.Nullable;
-
-/**
- * Invalid List Pruning Debug Tool.
- */
-public class InvalidListPruningDebug {
- private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
- private static final Gson GSON = new Gson();
- private DataJanitorState dataJanitorState;
- private Connection connection;
- private TableName tableName;
-
- /**
- * Initialize the Invalid List Debug Tool.
- * @param conf {@link Configuration}
- * @throws IOException
- */
- public void initialize(final Configuration conf) throws IOException {
- LOG.debug("InvalidListPruningDebugMain : initialize method called");
- connection = ConnectionFactory.createConnection(conf);
- tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
- dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
- @Override
- public Table get() throws IOException {
- return connection.getTable(tableName);
- }
- });
- }
-
- public void destroy() throws IOException {
- if (connection != null) {
- connection.close();
- }
- }
-
- /**
- * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions
- * will stop the progress of pruning.
- *
- * @param numRegions number of regions
- * @return {@link Set} of regions that needs to be compacted and flushed
- */
- public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
- // Fetch the live regions
- Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
- if (latestTimeRegion.isEmpty()) {
- return new HashSet<>();
- }
-
- Long timestamp = latestTimeRegion.keySet().iterator().next();
- SortedSet<String> liveRegions = latestTimeRegion.get(timestamp);
-
- SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
- SortedSet<String> emptyRegionNames = new TreeSet<>();
- Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN);
- for (String regionString : regionStrings) {
- emptyRegionNames.add(regionString);
- }
-
- Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
-
- // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are
- // not empty and have not been registered prune upper bound
- Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1);
- for (RegionPruneInfo prunedRegion : prunedRegions) {
- if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) {
- nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
- }
- }
-
- if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) {
- return nonEmptyRegions;
- }
-
- Set<String> subsetRegions = new HashSet<>(numRegions);
- for (String regionName : nonEmptyRegions) {
- if (subsetRegions.size() == numRegions) {
- break;
- }
- subsetRegions.add(regionName);
- }
- return subsetRegions;
- }
-
- /**
- * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds.
- * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions
- * that are known to be live will be returned.
- *
- * @param numRegions number of regions
- * @return Map of region name and its prune upper bound
- */
- public Queue<RegionPruneInfo> getIdleRegions(Integer numRegions) throws IOException {
- List<RegionPruneInfo> regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null);
- if (regionPruneInfos.isEmpty()) {
- return new LinkedList<>();
- }
-
- // Create a set with region names
- Set<String> pruneRegionNameSet = new HashSet<>();
- for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
- pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
- }
-
- // Fetch the live regions
- Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
- if (!latestTimeRegion.isEmpty()) {
- SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
- Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
- List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
- for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
- if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
- liveRegionWithPruneInfoList.add(regionPruneInfo);
- }
- }
-
- // Use the subset of live regions and prune regions
- regionPruneInfos = liveRegionWithPruneInfoList;
- }
-
- if (numRegions < 0) {
- numRegions = regionPruneInfos.size();
- }
-
- Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
- @Override
- public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
- return (int) (o1.getPruneUpperBound() - o2.getPruneUpperBound());
- }
- }).maximumSize(numRegions).create();
-
- for (RegionPruneInfo pruneInfo : regionPruneInfos) {
- lowestPrunes.add(pruneInfo);
- }
- return lowestPrunes;
- }
-
- /**
- * Return the prune upper bound value of a given region. If no prune upper bound has been written for this region yet,
- * it will return a null.
- *
- * @param regionId region id
- * @return {@link RegionPruneInfo} of the region
- * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo}
- */
- @Nullable
- public RegionPruneInfo getRegionPruneInfo(String regionId) throws IOException {
- return dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId));
- }
-
- /**
- *
- * @param time Given a time, provide the {@link TimeRegions} at or before that time
- * @return transactional regions that are present at or before the given time
- * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions}
- */
- public Map<Long, SortedSet<String>> getRegionsOnOrBeforeTime(Long time) throws IOException {
- Map<Long, SortedSet<String>> regionMap = new HashMap<>();
- TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time);
- if (timeRegions == null) {
- return regionMap;
- }
- SortedSet<String> regionNames = new TreeSet<>();
- Iterable<String> regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN);
- for (String regionString : regionStrings) {
- regionNames.add(regionString);
- }
- regionMap.put(timeRegions.getTime(), regionNames);
- return regionMap;
- }
-
- private void printUsage(PrintWriter pw) {
- pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning <command> <parameter>");
- pw.println("Available commands, corresponding parameters are:");
- pw.println("****************************************************");
- pw.println("time-region ts");
- pw.println("Desc: Prints out the transactional regions present in HBase at time 'ts' (in milliseconds) " +
- "or the latest time before time 'ts'.");
- pw.println("idle-regions limit");
- pw.println("Desc: Prints out 'limit' number of regions which has the lowest prune upper bounds. If '-1' is " +
- "provided as the limit, prune upper bounds of all regions are returned.");
- pw.println("prune-info region-name-as-string");
- pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
- pw.println("to-compact-regions limit");
- pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " +
- "and have not registered a prune upper bound.");
- }
-
- private boolean execute(String[] args) throws IOException {
- try (PrintWriter pw = new PrintWriter(System.out)) {
- if (args.length != 2) {
- printUsage(pw);
- return false;
- }
-
- String command = args[0];
- String parameter = args[1];
- if ("time-region".equals(command)) {
- Long time = Long.parseLong(parameter);
- Map<Long, SortedSet<String>> timeRegion = getRegionsOnOrBeforeTime(time);
- pw.println(GSON.toJson(timeRegion));
- return true;
- } else if ("idle-regions".equals(command)) {
- Integer numRegions = Integer.parseInt(parameter);
- Queue<RegionPruneInfo> regionPruneInfos = getIdleRegions(numRegions);
- pw.println(GSON.toJson(regionPruneInfos));
- return true;
- } else if ("prune-info".equals(command)) {
- RegionPruneInfo regionPruneInfo = getRegionPruneInfo(parameter);
- if (regionPruneInfo != null) {
- pw.println(GSON.toJson(regionPruneInfo));
- } else {
- pw.println(String.format("No prune info found for the region %s.", parameter));
- }
- return true;
- } else if ("to-compact-regions".equals(command)) {
- Integer numRegions = Integer.parseInt(parameter);
- Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
- pw.println(GSON.toJson(toBeCompactedRegions));
- return true;
- } else {
- pw.println(String.format("%s is not a valid command.", command));
- printUsage(pw);
- return false;
- }
- }
- }
-
- public static void main(String[] args) {
- Configuration hConf = HBaseConfiguration.create();
- InvalidListPruningDebug pruningDebug = new InvalidListPruningDebug();
- try {
- pruningDebug.initialize(hConf);
- boolean success = pruningDebug.execute(args);
- pruningDebug.destroy();
- if (!success) {
- System.exit(1);
- }
- } catch (IOException ex) {
- LOG.error("Received an exception while trying to execute the debug tool. ", ex);
- }
- }
-}
|