From commits-return-807-apmail-tephra-commits-archive=tephra.apache.org@tephra.incubator.apache.org Fri Sep 22 02:42:09 2017 Return-Path: X-Original-To: apmail-tephra-commits-archive@minotaur.apache.org Delivered-To: apmail-tephra-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D413ACAB0 for ; Fri, 22 Sep 2017 02:42:09 +0000 (UTC) Received: (qmail 41844 invoked by uid 500); 22 Sep 2017 02:42:09 -0000 Delivered-To: apmail-tephra-commits-archive@tephra.apache.org Received: (qmail 41820 invoked by uid 500); 22 Sep 2017 02:42:09 -0000 Mailing-List: contact commits-help@tephra.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tephra.incubator.apache.org Delivered-To: mailing list commits@tephra.incubator.apache.org Received: (qmail 41806 invoked by uid 99); 22 Sep 2017 02:42:09 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Sep 2017 02:42:09 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 04F14CB208 for ; Fri, 22 Sep 2017 02:42:09 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id Lh9lTyJQcKNv for ; Fri, 22 Sep 2017 02:42:03 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id E87B661260 for ; Fri, 22 Sep 2017 02:42:01 +0000 (UTC) Received: (qmail 40494 invoked by uid 99); 22 Sep 2017 02:42:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Sep 2017 02:42:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5DF42F5A72; Fri, 22 Sep 2017 02:42:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: poorna@apache.org To: commits@tephra.incubator.apache.org Date: Fri, 22 Sep 2017 02:42:02 -0000 Message-Id: In-Reply-To: <0ae43d087064442f94ebe8f37a0ab7da@git.apache.org> References: <0ae43d087064442f94ebe8f37a0ab7da@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/6] incubator-tephra git commit: TEPHRA-245 Improve prune debug tool 1. Improve documentation 2. Accept relative time as parameter 3. Print human readable dates 4. Sort the idle regions by prune upper bound 5. Fix warnings 6. Add test case http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java new file mode 100644 index 0000000..1476906 --- /dev/null +++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; +import com.google.common.collect.TreeMultimap; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.AbstractHBaseTableTest; +import org.apache.tephra.txprune.RegionPruneInfo; +import org.apache.tephra.txprune.hbase.RegionsAtTime; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Type; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * Test {@link InvalidListPruningDebugTool}. + */ +public class InvalidListPruningDebugTest extends AbstractHBaseTableTest { + private static final Gson GSON = new Gson(); + private static final boolean DEBUG_PRINT = true; + private static final Function PRUNE_INFO_TO_BYTES = + new Function() { + @Override + public byte[] apply(RegionPruneInfo input) { + return input.getRegionName(); + } + }; + private static final Function PRUNE_INFO_TO_STRING = + new Function() { + @Override + public String apply(RegionPruneInfo input) { + return input.getRegionNameAsString(); + } + }; + private static final Function STRING_TO_BYTES = + new Function() { + @Override + public byte[] apply(String input) { + return Bytes.toBytes(input); + } + }; + private static final Type PRUNE_INFO_LIST_TYPE = + new TypeToken>() { }.getType(); + + private static TableName pruneStateTable; + private static InvalidListPruningDebugTool pruningDebug; + + private static TreeMultimap compactedRegions = + TreeMultimap.create(Ordering.natural(), stringComparator()); + private static TreeMultimap emptyRegions = TreeMultimap.create(); + private static TreeMultimap notCompactedRegions = TreeMultimap.create(); + private static TreeMultimap deletedRegions = + TreeMultimap.create(Ordering.natural(), stringComparator()); + + + @BeforeClass + public static void addData() throws Exception { + pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false, + // Prune state table is a non-transactional table, hence no transaction co-processor + Collections.emptyList()); + table.close(); + + DataJanitorState dataJanitorState = + new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return testUtil.getConnection().getTable(pruneStateTable); + } + }); + + // Record prune upper bounds for 9 regions + long now = System.currentTimeMillis(); + int maxRegions = 9; + TableName compactedTable = TableName.valueOf("default", "compacted_table"); + TableName emptyTable = TableName.valueOf("default", "empty_table"); + TableName notCompactedTable = TableName.valueOf("default", "not_compacted_table"); + TableName deletedTable = TableName.valueOf("default", "deleted_table"); + for (long i = 0; i < maxRegions; ++i) { + // Compacted region + byte[] compactedRegion = HRegionInfo.createRegionName(compactedTable, null, i, true); + // The first three regions are recorded at one time, second set at another and the third set at a different time + long recordTime = now - 6000 + (i / 3) * 100; + long pruneUpperBound = (now - (i / 3) * 100000) * TxConstants.MAX_TX_PER_MS; + dataJanitorState.savePruneUpperBoundForRegion(compactedRegion, pruneUpperBound); + RegionPruneInfo pruneInfo = dataJanitorState.getPruneInfoForRegion(compactedRegion); + compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(pruneInfo)); + + // Empty region + byte[] emptyRegion = HRegionInfo.createRegionName(emptyTable, null, i, true); + dataJanitorState.saveEmptyRegionForTime(recordTime + 1, emptyRegion); + emptyRegions.put(recordTime, Bytes.toString(emptyRegion)); + + // Not compacted region + byte[] notCompactedRegion = HRegionInfo.createRegionName(notCompactedTable, null, i, true); + notCompactedRegions.put(recordTime, Bytes.toString(notCompactedRegion)); + + // Deleted region + byte[] deletedRegion = HRegionInfo.createRegionName(deletedTable, null, i, true); + dataJanitorState.savePruneUpperBoundForRegion(deletedRegion, pruneUpperBound - 1000); + RegionPruneInfo deletedPruneInfo = dataJanitorState.getPruneInfoForRegion(deletedRegion); + deletedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(deletedPruneInfo)); + } + + // Also record some common regions across all runs + byte[] commonCompactedRegion = + HRegionInfo.createRegionName(TableName.valueOf("default:common_compacted"), null, 100, true); + byte[] commonNotCompactedRegion = + HRegionInfo.createRegionName(TableName.valueOf("default:common_not_compacted"), null, 100, true); + byte[] commonEmptyRegion = + HRegionInfo.createRegionName(TableName.valueOf("default:common_empty"), null, 100, true); + // Create one region that is the latest deleted region, this region represents a region that gets recorded + // every prune run, but gets deleted just before the latest run. + byte[] newestDeletedRegion = + HRegionInfo.createRegionName(TableName.valueOf("default:newest_deleted"), null, 100, true); + + int runs = maxRegions / 3; + for (int i = 0; i < runs; ++i) { + long recordTime = now - 6000 + i * 100; + long pruneUpperBound = (now - i * 100000) * TxConstants.MAX_TX_PER_MS; + + dataJanitorState.savePruneUpperBoundForRegion(commonCompactedRegion, pruneUpperBound - 1000); + RegionPruneInfo c = dataJanitorState.getPruneInfoForRegion(commonCompactedRegion); + compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(c)); + + dataJanitorState.saveEmptyRegionForTime(recordTime + 1, commonEmptyRegion); + emptyRegions.put(recordTime, Bytes.toString(commonEmptyRegion)); + + notCompactedRegions.put(recordTime, Bytes.toString(commonNotCompactedRegion)); + + // Record the latest deleted region in all the runs except the last one + if (i < runs - 1) { + dataJanitorState.savePruneUpperBoundForRegion(newestDeletedRegion, pruneUpperBound - 1000); + RegionPruneInfo d = dataJanitorState.getPruneInfoForRegion(newestDeletedRegion); + compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(d)); + } + } + + // Record the regions present at various times + for (long time : compactedRegions.asMap().keySet()) { + Set allRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + Iterables.addAll(allRegions, Iterables.transform(compactedRegions.get(time), PRUNE_INFO_TO_BYTES)); + Iterables.addAll(allRegions, Iterables.transform(emptyRegions.get(time), STRING_TO_BYTES)); + Iterables.addAll(allRegions, Iterables.transform(notCompactedRegions.get(time), STRING_TO_BYTES)); + dataJanitorState.saveRegionsForTime(time, allRegions); + } + } + + @AfterClass + public static void cleanup() throws Exception { + pruningDebug.destroy(); + + hBaseAdmin.disableTable(pruneStateTable); + hBaseAdmin.deleteTable(pruneStateTable); + } + + @Before + public void before() throws Exception { + pruningDebug = new InvalidListPruningDebugTool(); + pruningDebug.initialize(conf); + } + + @After + public void after() throws Exception { + pruningDebug.destroy(); + } + + @Test + public void testUsage() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + Assert.assertFalse(pruningDebug.execute(new String[0], out)); + out.flush(); + readOutputStream(outputStream); + } + } + + @Test + public void testTimeRegions() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + // Get the latest regions for latest recorded time + Long latestRecordTime = compactedRegions.asMap().lastKey(); + Assert.assertTrue(pruningDebug.execute(new String[] {"time-region"}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream)); + + // Get the latest regions for latest recorded time by giving the timestamp + long now = System.currentTimeMillis(); + outputStream.reset(); + Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", Long.toString(now)}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream)); + + // Using relative time + outputStream.reset(); + Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", "now-1s"}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream)); + + // Get the regions for the oldest recorded time + Long oldestRecordTime = compactedRegions.asMap().firstKey(); + outputStream.reset(); + Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", Long.toString(oldestRecordTime)}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(expectedRegionsForTime(oldestRecordTime)), readOutputStream(outputStream)); + } + } + + @Test + public void testGetPruneInfo() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + Long recordTime = compactedRegions.asMap().lastKey(); + RegionPruneInfo pruneInfo = compactedRegions.get(recordTime).first(); + Assert.assertTrue(pruningDebug.execute(new String[]{"prune-info", pruneInfo.getRegionNameAsString()}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(new InvalidListPruningDebugTool.RegionPruneInfoPretty(pruneInfo)), + readOutputStream(outputStream)); + + // non-exising region + String nonExistingRegion = "non-existing-region"; + outputStream.reset(); + Assert.assertTrue(pruningDebug.execute(new String[]{"prune-info", nonExistingRegion}, out)); + out.flush(); + Assert.assertEquals(String.format("No prune info found for the region %s.", nonExistingRegion), + readOutputStream(outputStream)); + } + } + + @Test + public void testIdleRegions() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + // Get the list of regions that have the lowest prune upper bounds for the latest record time + Long latestRecordTime = compactedRegions.asMap().lastKey(); + SortedSet latestExpected = + ImmutableSortedSet.copyOf(pruneUpperBoundAndStringComparator(), compactedRegions.get(latestRecordTime)); + pruningDebug.execute(new String[]{"idle-regions", "-1"}, out); + out.flush(); + assertEquals(latestExpected, readOutputStream(outputStream)); + + // Same command with explicit time + outputStream.reset(); + pruningDebug.execute(new String[]{"idle-regions", "-1", String.valueOf(latestRecordTime)}, out); + out.flush(); + assertEquals(latestExpected, readOutputStream(outputStream)); + + // Same command with relative time + outputStream.reset(); + pruningDebug.execute(new String[]{"idle-regions", "-1", "now-2s"}, out); + out.flush(); + assertEquals(latestExpected, readOutputStream(outputStream)); + + // Same command with reduced number of regions + outputStream.reset(); + int limit = 2; + pruningDebug.execute(new String[]{"idle-regions", String.valueOf(limit), String.valueOf(latestRecordTime)}, out); + out.flush(); + Assert.assertEquals(GSON.toJson(subset(latestExpected, 0, limit)), readOutputStream(outputStream)); + + // For a different time, this time only live regions that are compacted are returned + outputStream.reset(); + Long secondLastRecordTime = Iterables.get(compactedRegions.keySet(), 1); + Set compactedRegionsTime = + Sets.newTreeSet(Iterables.transform(compactedRegions.get(secondLastRecordTime), PRUNE_INFO_TO_STRING)); + Set compactedRegionsLatest = + Sets.newTreeSet(Iterables.transform(compactedRegions.get(latestRecordTime), PRUNE_INFO_TO_STRING)); + Set liveExpected = new TreeSet<>(Sets.intersection(compactedRegionsTime, compactedRegionsLatest)); + pruningDebug.execute(new String[]{"idle-regions", "-1", String.valueOf(latestRecordTime - 1)}, out); + out.flush(); + List actual = GSON.fromJson(readOutputStream(outputStream), PRUNE_INFO_LIST_TYPE); + Assert.assertEquals(liveExpected, Sets.newTreeSet(Iterables.transform(actual, PRUNE_INFO_TO_STRING))); + } + } + + @Test + public void testToCompactRegions() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + // Get the regions that are not compacted for the latest time + Long latestRecordTime = compactedRegions.asMap().lastKey(); + SortedSet expected = notCompactedRegions.get(latestRecordTime); + pruningDebug.execute(new String[]{"to-compact-regions", "-1"}, out); + out.flush(); + Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), SortedSet.class)); + + // Same command with explicit time + outputStream.reset(); + pruningDebug.execute(new String[]{"to-compact-regions", "-1", String.valueOf(latestRecordTime)}, out); + out.flush(); + Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), SortedSet.class)); + + // Same command with relative time + outputStream.reset(); + pruningDebug.execute(new String[]{"to-compact-regions", "-1", "now+1h-3m"}, out); + out.flush(); + Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), SortedSet.class)); + + // Same command with reduced number of regions + int limit = 2; + outputStream.reset(); + pruningDebug.execute(new String[]{"to-compact-regions", String.valueOf(limit), String.valueOf(latestRecordTime)}, + out); + out.flush(); + // Assert that the actual set is a subset of expected, with size 2 (since the output is not sorted) + SortedSet actual = GSON.fromJson(readOutputStream(outputStream), + new TypeToken>() { }.getType()); + Assert.assertEquals(limit, actual.size()); + Assert.assertTrue(Sets.difference(actual, expected).isEmpty()); + + // For a different time, only live regions that are not compacted are returned + outputStream.reset(); + Long secondLastRecordTime = Iterables.get(compactedRegions.keySet(), 1); + Set compactedRegionsTime = notCompactedRegions.get(secondLastRecordTime); + Set compactedRegionsLatest = notCompactedRegions.get(latestRecordTime); + Set liveExpected = new TreeSet<>(Sets.intersection(compactedRegionsTime, compactedRegionsLatest)); + pruningDebug.execute(new String[]{"to-compact-regions", "-1", String.valueOf(secondLastRecordTime)}, out); + out.flush(); + Assert.assertEquals(GSON.toJson(liveExpected), readOutputStream(outputStream)); + } + } + + private static RegionsAtTime expectedRegionsForTime(long time) { + SortedSet regions = new TreeSet<>(); + regions.addAll(Sets.newTreeSet(Iterables.transform(compactedRegions.get(time), PRUNE_INFO_TO_STRING))); + regions.addAll(emptyRegions.get(time)); + regions.addAll(notCompactedRegions.get(time)); + return new RegionsAtTime(time, regions, new SimpleDateFormat(InvalidListPruningDebugTool.DATE_FORMAT)); + } + + private static Comparator stringComparator() { + return new Comparator() { + @Override + public int compare(InvalidListPruningDebugTool.RegionPruneInfoPretty o1, + InvalidListPruningDebugTool.RegionPruneInfoPretty o2) { + return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString()); + } + }; + } + + private static Comparator pruneUpperBoundAndStringComparator() { + return new Comparator() { + @Override + public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { + int result = Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound()); + if (result == 0) { + return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString()); + } + return result; + } + }; + } + + private String readOutputStream(ByteArrayOutputStream out) throws UnsupportedEncodingException { + String s = out.toString(Charsets.UTF_8.toString()); + if (DEBUG_PRINT) { + System.out.println(s); + } + // remove the last newline + return s.length() <= 1 ? "" : s.substring(0, s.length() - 1); + } + + private void assertEquals(Collection expectedSorted, String actualString) { + List actual = GSON.fromJson(actualString, PRUNE_INFO_LIST_TYPE); + List actualSorted = new ArrayList<>(actual); + Collections.sort(actualSorted, pruneUpperBoundAndStringComparator()); + + Assert.assertEquals(GSON.toJson(expectedSorted), GSON.toJson(actualSorted)); + } + + @SuppressWarnings("SameParameterValue") + private SortedSet subset(SortedSet set, int from, int to) { + SortedSet subset = new TreeSet<>(set.comparator()); + int i = from; + for (T e : set) { + if (i++ >= to) { + break; + } + subset.add(e); + } + return subset; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java deleted file mode 100644 index 443c998..0000000 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tephra.hbase.txprune; - -import com.google.common.collect.Iterables; -import com.google.common.collect.MinMaxPriorityQueue; -import com.google.common.collect.Sets; -import com.google.gson.Gson; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.tephra.TxConstants; -import org.apache.tephra.txprune.RegionPruneInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import javax.annotation.Nullable; - -/** - * Invalid List Pruning Debug Tool. - */ -public class InvalidListPruningDebug { - private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class); - private static final Gson GSON = new Gson(); - private DataJanitorState dataJanitorState; - private Connection connection; - private TableName tableName; - - /** - * Initialize the Invalid List Debug Tool. - * @param conf {@link Configuration} - * @throws IOException - */ - public void initialize(final Configuration conf) throws IOException { - LOG.debug("InvalidListPruningDebugMain : initialize method called"); - connection = ConnectionFactory.createConnection(conf); - tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); - dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { - @Override - public Table get() throws IOException { - return connection.getTable(tableName); - } - }); - } - - public void destroy() throws IOException { - if (connection != null) { - connection.close(); - } - } - - /** - * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions - * will stop the progress of pruning. - * - * @param numRegions number of regions - * @return {@link Set} of regions that needs to be compacted and flushed - */ - public Set getRegionsToBeCompacted(Integer numRegions) throws IOException { - // Fetch the live regions - Map> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); - if (latestTimeRegion.isEmpty()) { - return new HashSet<>(); - } - - Long timestamp = latestTimeRegion.keySet().iterator().next(); - SortedSet liveRegions = latestTimeRegion.get(timestamp); - - SortedSet emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null); - SortedSet emptyRegionNames = new TreeSet<>(); - Iterable regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN); - for (String regionString : regionStrings) { - emptyRegionNames.add(regionString); - } - - Set nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames)); - - // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are - // not empty and have not been registered prune upper bound - Queue prunedRegions = getIdleRegions(-1); - for (RegionPruneInfo prunedRegion : prunedRegions) { - if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) { - nonEmptyRegions.remove(prunedRegion.getRegionNameAsString()); - } - } - - if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) { - return nonEmptyRegions; - } - - Set subsetRegions = new HashSet<>(numRegions); - for (String regionName : nonEmptyRegions) { - if (subsetRegions.size() == numRegions) { - break; - } - subsetRegions.add(regionName); - } - return subsetRegions; - } - - /** - * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. - * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions - * that are known to be live will be returned. - * - * @param numRegions number of regions - * @return Map of region name and its prune upper bound - */ - public Queue getIdleRegions(Integer numRegions) throws IOException { - List regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null); - if (regionPruneInfos.isEmpty()) { - return new LinkedList<>(); - } - - // Create a set with region names - Set pruneRegionNameSet = new HashSet<>(); - for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { - pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString()); - } - - // Fetch the live regions - Map> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); - if (!latestTimeRegion.isEmpty()) { - SortedSet liveRegions = latestTimeRegion.values().iterator().next(); - Set liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet); - List liveRegionWithPruneInfoList = new ArrayList<>(); - for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { - if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) { - liveRegionWithPruneInfoList.add(regionPruneInfo); - } - } - - // Use the subset of live regions and prune regions - regionPruneInfos = liveRegionWithPruneInfoList; - } - - if (numRegions < 0) { - numRegions = regionPruneInfos.size(); - } - - Queue lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator() { - @Override - public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { - return (int) (o1.getPruneUpperBound() - o2.getPruneUpperBound()); - } - }).maximumSize(numRegions).create(); - - for (RegionPruneInfo pruneInfo : regionPruneInfos) { - lowestPrunes.add(pruneInfo); - } - return lowestPrunes; - } - - /** - * Return the prune upper bound value of a given region. If no prune upper bound has been written for this region yet, - * it will return a null. - * - * @param regionId region id - * @return {@link RegionPruneInfo} of the region - * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo} - */ - @Nullable - public RegionPruneInfo getRegionPruneInfo(String regionId) throws IOException { - return dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId)); - } - - /** - * - * @param time Given a time, provide the {@link TimeRegions} at or before that time - * @return transactional regions that are present at or before the given time - * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions} - */ - public Map> getRegionsOnOrBeforeTime(Long time) throws IOException { - Map> regionMap = new HashMap<>(); - TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time); - if (timeRegions == null) { - return regionMap; - } - SortedSet regionNames = new TreeSet<>(); - Iterable regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN); - for (String regionString : regionStrings) { - regionNames.add(regionString); - } - regionMap.put(timeRegions.getTime(), regionNames); - return regionMap; - } - - private void printUsage(PrintWriter pw) { - pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning "); - pw.println("Available commands, corresponding parameters are:"); - pw.println("****************************************************"); - pw.println("time-region ts"); - pw.println("Desc: Prints out the transactional regions present in HBase at time 'ts' (in milliseconds) " + - "or the latest time before time 'ts'."); - pw.println("idle-regions limit"); - pw.println("Desc: Prints out 'limit' number of regions which has the lowest prune upper bounds. If '-1' is " + - "provided as the limit, prune upper bounds of all regions are returned."); - pw.println("prune-info region-name-as-string"); - pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'"); - pw.println("to-compact-regions limit"); - pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " + - "and have not registered a prune upper bound."); - } - - private boolean execute(String[] args) throws IOException { - try (PrintWriter pw = new PrintWriter(System.out)) { - if (args.length != 2) { - printUsage(pw); - return false; - } - - String command = args[0]; - String parameter = args[1]; - if ("time-region".equals(command)) { - Long time = Long.parseLong(parameter); - Map> timeRegion = getRegionsOnOrBeforeTime(time); - pw.println(GSON.toJson(timeRegion)); - return true; - } else if ("idle-regions".equals(command)) { - Integer numRegions = Integer.parseInt(parameter); - Queue regionPruneInfos = getIdleRegions(numRegions); - pw.println(GSON.toJson(regionPruneInfos)); - return true; - } else if ("prune-info".equals(command)) { - RegionPruneInfo regionPruneInfo = getRegionPruneInfo(parameter); - if (regionPruneInfo != null) { - pw.println(GSON.toJson(regionPruneInfo)); - } else { - pw.println(String.format("No prune info found for the region %s.", parameter)); - } - return true; - } else if ("to-compact-regions".equals(command)) { - Integer numRegions = Integer.parseInt(parameter); - Set toBeCompactedRegions = getRegionsToBeCompacted(numRegions); - pw.println(GSON.toJson(toBeCompactedRegions)); - return true; - } else { - pw.println(String.format("%s is not a valid command.", command)); - printUsage(pw); - return false; - } - } - } - - public static void main(String[] args) { - Configuration hConf = HBaseConfiguration.create(); - InvalidListPruningDebug pruningDebug = new InvalidListPruningDebug(); - try { - pruningDebug.initialize(hConf); - boolean success = pruningDebug.execute(args); - pruningDebug.destroy(); - if (!success) { - System.exit(1); - } - } catch (IOException ex) { - LOG.error("Received an exception while trying to execute the debug tool. ", ex); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java new file mode 100644 index 0000000..5d7b871 --- /dev/null +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Sets; +import com.google.gson.Gson; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.txprune.RegionPruneInfo; +import org.apache.tephra.txprune.hbase.InvalidListPruningDebug; +import org.apache.tephra.txprune.hbase.RegionsAtTime; +import org.apache.tephra.util.TimeMathParser; +import org.apache.tephra.util.TxUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintWriter; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +/** + * Invalid List Pruning Debug Tool. + */ +public class InvalidListPruningDebugTool implements InvalidListPruningDebug { + private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebugTool.class); + private static final Gson GSON = new Gson(); + private static final String NOW = "now"; + @VisibleForTesting + static final String DATE_FORMAT = "d-MMM-yyyy HH:mm:ss z"; + + private DataJanitorState dataJanitorState; + private Connection connection; + private TableName tableName; + + /** + * Initialize the Invalid List Debug Tool. + * @param conf {@link Configuration} + * @throws IOException when not able to create an HBase connection + */ + @Override + @SuppressWarnings("WeakerAccess") + public void initialize(final Configuration conf) throws IOException { + LOG.debug("InvalidListPruningDebugMain : initialize method called"); + connection = ConnectionFactory.createConnection(conf); + tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return connection.getTable(tableName); + } + }); + } + + @Override + @SuppressWarnings("WeakerAccess") + public void destroy() throws IOException { + if (connection != null) { + connection.close(); + } + } + + /** + * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions + * will stop the progress of pruning. + *

+ * Note that this can return false positives in the following case - + * At time 't' empty regions were recorded, and time 't+1' prune iteration was invoked. + * Since a new set of regions was recorded at time 't+1', all regions recorded as empty before time 't + 1' will + * now be reported as blocking the pruning, even though they are empty. This is because we cannot tell if those + * regions got any new data between time 't' and 't + 1'. + * + * @param numRegions number of regions + * @param time time in milliseconds or relative time, regions recorded before the given time are returned + * @return {@link Set} of regions that needs to be compacted and flushed + */ + @Override + @SuppressWarnings("WeakerAccess") + public Set getRegionsToBeCompacted(Integer numRegions, String time) throws IOException { + // Fetch the live regions at the given time + RegionsAtTime timeRegion = getRegionsOnOrBeforeTime(time); + if (timeRegion.getRegions().isEmpty()) { + return Collections.emptySet(); + } + + Long timestamp = timeRegion.getTime(); + SortedSet regions = timeRegion.getRegions(); + + // Get the live regions + SortedSet liveRegions = getRegionsOnOrBeforeTime(NOW).getRegions(); + // Retain only the live regions + regions = Sets.newTreeSet(Sets.intersection(liveRegions, regions)); + + SortedSet emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null); + SortedSet emptyRegionNames = new TreeSet<>(); + Iterable regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + emptyRegionNames.add(regionString); + } + + Set nonEmptyRegions = Sets.newHashSet(Sets.difference(regions, emptyRegionNames)); + + // Get all pruned regions for the current time and remove them from the nonEmptyRegions, + // resulting in a set of regions that are not empty and have not been registered prune upper bound + List prunedRegions = dataJanitorState.getPruneInfoForRegions(null); + for (RegionPruneInfo prunedRegion : prunedRegions) { + if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) { + nonEmptyRegions.remove(prunedRegion.getRegionNameAsString()); + } + } + + if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) { + return nonEmptyRegions; + } + + Set subsetRegions = new HashSet<>(numRegions); + for (String regionName : nonEmptyRegions) { + if (subsetRegions.size() == numRegions) { + break; + } + subsetRegions.add(regionName); + } + return subsetRegions; + } + + /** + * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. + * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions + * that are known to be live will be returned. + * + * @param numRegions number of regions + * @param time time in milliseconds or relative time, regions recorded before the given time are returned + * @return Map of region name and its prune upper bound + */ + @Override + @SuppressWarnings("WeakerAccess") + public SortedSet getIdleRegions(Integer numRegions, String time) throws IOException { + List regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null); + if (regionPruneInfos.isEmpty()) { + return new TreeSet<>(); + } + + // Create a set with region names + Set pruneRegionNameSet = new HashSet<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString()); + } + + // Fetch the latest live regions + RegionsAtTime latestRegions = getRegionsOnOrBeforeTime(NOW); + + // Fetch the regions at the given time + RegionsAtTime timeRegions = getRegionsOnOrBeforeTime(time); + Set liveRegions = Sets.intersection(latestRegions.getRegions(), timeRegions.getRegions()); + Set liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet); + List liveRegionWithPruneInfoList = new ArrayList<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) { + liveRegionWithPruneInfoList.add(regionPruneInfo); + } + + // Use the subset of live regions and prune regions + regionPruneInfos = liveRegionWithPruneInfoList; + } + + if (numRegions < 0) { + numRegions = regionPruneInfos.size(); + } + + Comparator comparator = new Comparator() { + @Override + public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { + int result = Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound()); + if (result == 0) { + return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString()); + } + return result; + } + }; + MinMaxPriorityQueue lowestPrunes = + MinMaxPriorityQueue.orderedBy(comparator).maximumSize(numRegions).create(); + + for (RegionPruneInfo pruneInfo : regionPruneInfos) { + lowestPrunes.add(new RegionPruneInfoPretty(pruneInfo)); + } + + SortedSet regions = new TreeSet<>(comparator); + regions.addAll(lowestPrunes); + return regions; + } + + /** + * Return the prune upper bound value of a given region. If no prune upper bound has been written for this region yet, + * it will return a null. + * + * @param regionId region id + * @return {@link RegionPruneInfo} of the region + * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo} + */ + @Override + @SuppressWarnings("WeakerAccess") + @Nullable + public RegionPruneInfoPretty getRegionPruneInfo(String regionId) throws IOException { + RegionPruneInfo pruneInfo = dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId)); + return pruneInfo == null ? null : new RegionPruneInfoPretty(pruneInfo); + } + + /** + * + * @param timeString Given a time, provide the {@link TimeRegions} at or before that time. + * Time can be in milliseconds or relative time. + * @return transactional regions that are present at or before the given time + * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions} + */ + @Override + @SuppressWarnings("WeakerAccess") + public RegionsAtTime getRegionsOnOrBeforeTime(String timeString) throws IOException { + long time = TimeMathParser.parseTime(timeString, TimeUnit.MILLISECONDS); + SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time); + if (timeRegions == null) { + return new RegionsAtTime(time, new TreeSet(), dateFormat); + } + SortedSet regionNames = new TreeSet<>(); + Iterable regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + regionNames.add(regionString); + } + return new RegionsAtTime(timeRegions.getTime(), regionNames, dateFormat); + } + + private void printUsage(PrintWriter pw) { + pw.println(); + pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning "); + pw.println(); + pw.println("Available commands"); + pw.println("------------------"); + pw.println("to-compact-regions limit [time]"); + pw.println("Desc: Prints out the regions that are active, but not empty, " + + "and have not registered a prune upper bound."); + pw.println(); + pw.println("idle-regions limit [time]"); + pw.println("Desc: Prints out the regions that have the lowest prune upper bounds."); + pw.println(); + pw.println("prune-info region-name-as-string"); + pw.println("Desc: Prints the prune upper bound and the time it was recorded for the given region."); + pw.println(); + pw.println("time-region [time]"); + pw.println("Desc: Prints out the transactional regions present in HBase recorded at or before the given time."); + pw.println(); + pw.println("Parameters"); + pw.println("----------"); + pw.println(" * limit - used to limit the number of regions returned, -1 to apply no limit"); + pw.println(" * time - if time is not provided, the current time is used. "); + pw.println(" When provided, the data recorded on or before the given time is returned."); + pw.println(" Time can be provided in milliseconds, or can be provided as a relative time."); + pw.println(" Examples for relative time -"); + pw.println(" now = current time,"); + pw.println(" now-1d = current time - 1 day,"); + pw.println(" now-1d+4h = 20 hours before now,"); + pw.println(" now+5s = current time + 5 seconds"); + pw.println(); + } + + @VisibleForTesting + boolean execute(String[] args, PrintWriter out) throws IOException { + if (args.length < 1) { + printUsage(out); + return false; + } + + String command = args[0]; + switch (command) { + case "time-region": + if (args.length <= 2) { + String time = args.length == 2 ? args[1] : NOW; + RegionsAtTime timeRegion = getRegionsOnOrBeforeTime(time); + out.println(GSON.toJson(timeRegion)); + return true; + } + break; + case "idle-regions": + if (args.length <= 3) { + Integer numRegions = Integer.parseInt(args[1]); + String time = args.length == 3 ? args[2] : NOW; + SortedSet regionPruneInfos = getIdleRegions(numRegions, time); + out.println(GSON.toJson(regionPruneInfos)); + return true; + } + break; + case "prune-info": + if (args.length == 2) { + String regionName = args[1]; + RegionPruneInfo regionPruneInfo = getRegionPruneInfo(regionName); + if (regionPruneInfo != null) { + out.println(GSON.toJson(regionPruneInfo)); + } else { + out.println(String.format("No prune info found for the region %s.", regionName)); + } + return true; + } + break; + case "to-compact-regions": + if (args.length <= 3) { + Integer numRegions = Integer.parseInt(args[1]); + String time = args.length == 3 ? args[2] : NOW; + Set toBeCompactedRegions = getRegionsToBeCompacted(numRegions, time); + out.println(GSON.toJson(toBeCompactedRegions)); + return true; + } + break; + } + + printUsage(out); + return false; + } + + public static void main(String[] args) { + Configuration hConf = HBaseConfiguration.create(); + InvalidListPruningDebugTool pruningDebug = new InvalidListPruningDebugTool(); + try (PrintWriter out = new PrintWriter(System.out)) { + pruningDebug.initialize(hConf); + boolean success = pruningDebug.execute(args, out); + pruningDebug.destroy(); + if (!success) { + System.exit(1); + } + } catch (IOException ex) { + LOG.error("Received an exception while trying to execute the debug tool. ", ex); + } + } + + /** + * Wrapper class around {@link RegionPruneInfo} to print human readable dates for timestamps. + */ + @SuppressWarnings({"WeakerAccess", "unused"}) + public static class RegionPruneInfoPretty extends RegionPruneInfo { + private final transient SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + private final String pruneUpperBoundAsString; + private final String pruneRecordTimeAsString; + + public RegionPruneInfoPretty(RegionPruneInfo regionPruneInfo) { + this(regionPruneInfo.getRegionName(), regionPruneInfo.getRegionNameAsString(), + regionPruneInfo.getPruneUpperBound(), regionPruneInfo.getPruneRecordTime()); + } + + public RegionPruneInfoPretty(byte[] regionName, String regionNameAsString, + long pruneUpperBound, long pruneRecordTime) { + super(regionName, regionNameAsString, pruneUpperBound, pruneRecordTime); + pruneUpperBoundAsString = dateFormat.format(TxUtils.getTimestamp(pruneUpperBound)); + pruneRecordTimeAsString = dateFormat.format(pruneRecordTime); + } + + public String getPruneUpperBoundAsString() { + return pruneUpperBoundAsString; + } + + public String getPruneRecordTimeAsString() { + return pruneRecordTimeAsString; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + RegionPruneInfoPretty that = (RegionPruneInfoPretty) o; + return Objects.equals(pruneUpperBoundAsString, that.pruneUpperBoundAsString) && + Objects.equals(pruneRecordTimeAsString, that.pruneRecordTimeAsString); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), pruneUpperBoundAsString, pruneRecordTimeAsString); + } + + @Override + public String toString() { + return "RegionPruneInfoPretty{" + + ", pruneUpperBoundAsString='" + pruneUpperBoundAsString + '\'' + + ", pruneRecordTimeAsString='" + pruneRecordTimeAsString + '\'' + + "} " + super.toString(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java new file mode 100644 index 0000000..1476906 --- /dev/null +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; +import com.google.common.collect.TreeMultimap; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.AbstractHBaseTableTest; +import org.apache.tephra.txprune.RegionPruneInfo; +import org.apache.tephra.txprune.hbase.RegionsAtTime; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Type; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * Test {@link InvalidListPruningDebugTool}. + */ +public class InvalidListPruningDebugTest extends AbstractHBaseTableTest { + private static final Gson GSON = new Gson(); + private static final boolean DEBUG_PRINT = true; + private static final Function PRUNE_INFO_TO_BYTES = + new Function() { + @Override + public byte[] apply(RegionPruneInfo input) { + return input.getRegionName(); + } + }; + private static final Function PRUNE_INFO_TO_STRING = + new Function() { + @Override + public String apply(RegionPruneInfo input) { + return input.getRegionNameAsString(); + } + }; + private static final Function STRING_TO_BYTES = + new Function() { + @Override + public byte[] apply(String input) { + return Bytes.toBytes(input); + } + }; + private static final Type PRUNE_INFO_LIST_TYPE = + new TypeToken>() { }.getType(); + + private static TableName pruneStateTable; + private static InvalidListPruningDebugTool pruningDebug; + + private static TreeMultimap compactedRegions = + TreeMultimap.create(Ordering.natural(), stringComparator()); + private static TreeMultimap emptyRegions = TreeMultimap.create(); + private static TreeMultimap notCompactedRegions = TreeMultimap.create(); + private static TreeMultimap deletedRegions = + TreeMultimap.create(Ordering.natural(), stringComparator()); + + + @BeforeClass + public static void addData() throws Exception { + pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false, + // Prune state table is a non-transactional table, hence no transaction co-processor + Collections.emptyList()); + table.close(); + + DataJanitorState dataJanitorState = + new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return testUtil.getConnection().getTable(pruneStateTable); + } + }); + + // Record prune upper bounds for 9 regions + long now = System.currentTimeMillis(); + int maxRegions = 9; + TableName compactedTable = TableName.valueOf("default", "compacted_table"); + TableName emptyTable = TableName.valueOf("default", "empty_table"); + TableName notCompactedTable = TableName.valueOf("default", "not_compacted_table"); + TableName deletedTable = TableName.valueOf("default", "deleted_table"); + for (long i = 0; i < maxRegions; ++i) { + // Compacted region + byte[] compactedRegion = HRegionInfo.createRegionName(compactedTable, null, i, true); + // The first three regions are recorded at one time, second set at another and the third set at a different time + long recordTime = now - 6000 + (i / 3) * 100; + long pruneUpperBound = (now - (i / 3) * 100000) * TxConstants.MAX_TX_PER_MS; + dataJanitorState.savePruneUpperBoundForRegion(compactedRegion, pruneUpperBound); + RegionPruneInfo pruneInfo = dataJanitorState.getPruneInfoForRegion(compactedRegion); + compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(pruneInfo)); + + // Empty region + byte[] emptyRegion = HRegionInfo.createRegionName(emptyTable, null, i, true); + dataJanitorState.saveEmptyRegionForTime(recordTime + 1, emptyRegion); + emptyRegions.put(recordTime, Bytes.toString(emptyRegion)); + + // Not compacted region + byte[] notCompactedRegion = HRegionInfo.createRegionName(notCompactedTable, null, i, true); + notCompactedRegions.put(recordTime, Bytes.toString(notCompactedRegion)); + + // Deleted region + byte[] deletedRegion = HRegionInfo.createRegionName(deletedTable, null, i, true); + dataJanitorState.savePruneUpperBoundForRegion(deletedRegion, pruneUpperBound - 1000); + RegionPruneInfo deletedPruneInfo = dataJanitorState.getPruneInfoForRegion(deletedRegion); + deletedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(deletedPruneInfo)); + } + + // Also record some common regions across all runs + byte[] commonCompactedRegion = + HRegionInfo.createRegionName(TableName.valueOf("default:common_compacted"), null, 100, true); + byte[] commonNotCompactedRegion = + HRegionInfo.createRegionName(TableName.valueOf("default:common_not_compacted"), null, 100, true); + byte[] commonEmptyRegion = + HRegionInfo.createRegionName(TableName.valueOf("default:common_empty"), null, 100, true); + // Create one region that is the latest deleted region, this region represents a region that gets recorded + // every prune run, but gets deleted just before the latest run. + byte[] newestDeletedRegion = + HRegionInfo.createRegionName(TableName.valueOf("default:newest_deleted"), null, 100, true); + + int runs = maxRegions / 3; + for (int i = 0; i < runs; ++i) { + long recordTime = now - 6000 + i * 100; + long pruneUpperBound = (now - i * 100000) * TxConstants.MAX_TX_PER_MS; + + dataJanitorState.savePruneUpperBoundForRegion(commonCompactedRegion, pruneUpperBound - 1000); + RegionPruneInfo c = dataJanitorState.getPruneInfoForRegion(commonCompactedRegion); + compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(c)); + + dataJanitorState.saveEmptyRegionForTime(recordTime + 1, commonEmptyRegion); + emptyRegions.put(recordTime, Bytes.toString(commonEmptyRegion)); + + notCompactedRegions.put(recordTime, Bytes.toString(commonNotCompactedRegion)); + + // Record the latest deleted region in all the runs except the last one + if (i < runs - 1) { + dataJanitorState.savePruneUpperBoundForRegion(newestDeletedRegion, pruneUpperBound - 1000); + RegionPruneInfo d = dataJanitorState.getPruneInfoForRegion(newestDeletedRegion); + compactedRegions.put(recordTime, new InvalidListPruningDebugTool.RegionPruneInfoPretty(d)); + } + } + + // Record the regions present at various times + for (long time : compactedRegions.asMap().keySet()) { + Set allRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + Iterables.addAll(allRegions, Iterables.transform(compactedRegions.get(time), PRUNE_INFO_TO_BYTES)); + Iterables.addAll(allRegions, Iterables.transform(emptyRegions.get(time), STRING_TO_BYTES)); + Iterables.addAll(allRegions, Iterables.transform(notCompactedRegions.get(time), STRING_TO_BYTES)); + dataJanitorState.saveRegionsForTime(time, allRegions); + } + } + + @AfterClass + public static void cleanup() throws Exception { + pruningDebug.destroy(); + + hBaseAdmin.disableTable(pruneStateTable); + hBaseAdmin.deleteTable(pruneStateTable); + } + + @Before + public void before() throws Exception { + pruningDebug = new InvalidListPruningDebugTool(); + pruningDebug.initialize(conf); + } + + @After + public void after() throws Exception { + pruningDebug.destroy(); + } + + @Test + public void testUsage() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + Assert.assertFalse(pruningDebug.execute(new String[0], out)); + out.flush(); + readOutputStream(outputStream); + } + } + + @Test + public void testTimeRegions() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + // Get the latest regions for latest recorded time + Long latestRecordTime = compactedRegions.asMap().lastKey(); + Assert.assertTrue(pruningDebug.execute(new String[] {"time-region"}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream)); + + // Get the latest regions for latest recorded time by giving the timestamp + long now = System.currentTimeMillis(); + outputStream.reset(); + Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", Long.toString(now)}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream)); + + // Using relative time + outputStream.reset(); + Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", "now-1s"}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(expectedRegionsForTime(latestRecordTime)), readOutputStream(outputStream)); + + // Get the regions for the oldest recorded time + Long oldestRecordTime = compactedRegions.asMap().firstKey(); + outputStream.reset(); + Assert.assertTrue(pruningDebug.execute(new String[] {"time-region", Long.toString(oldestRecordTime)}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(expectedRegionsForTime(oldestRecordTime)), readOutputStream(outputStream)); + } + } + + @Test + public void testGetPruneInfo() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + Long recordTime = compactedRegions.asMap().lastKey(); + RegionPruneInfo pruneInfo = compactedRegions.get(recordTime).first(); + Assert.assertTrue(pruningDebug.execute(new String[]{"prune-info", pruneInfo.getRegionNameAsString()}, out)); + out.flush(); + Assert.assertEquals(GSON.toJson(new InvalidListPruningDebugTool.RegionPruneInfoPretty(pruneInfo)), + readOutputStream(outputStream)); + + // non-exising region + String nonExistingRegion = "non-existing-region"; + outputStream.reset(); + Assert.assertTrue(pruningDebug.execute(new String[]{"prune-info", nonExistingRegion}, out)); + out.flush(); + Assert.assertEquals(String.format("No prune info found for the region %s.", nonExistingRegion), + readOutputStream(outputStream)); + } + } + + @Test + public void testIdleRegions() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + // Get the list of regions that have the lowest prune upper bounds for the latest record time + Long latestRecordTime = compactedRegions.asMap().lastKey(); + SortedSet latestExpected = + ImmutableSortedSet.copyOf(pruneUpperBoundAndStringComparator(), compactedRegions.get(latestRecordTime)); + pruningDebug.execute(new String[]{"idle-regions", "-1"}, out); + out.flush(); + assertEquals(latestExpected, readOutputStream(outputStream)); + + // Same command with explicit time + outputStream.reset(); + pruningDebug.execute(new String[]{"idle-regions", "-1", String.valueOf(latestRecordTime)}, out); + out.flush(); + assertEquals(latestExpected, readOutputStream(outputStream)); + + // Same command with relative time + outputStream.reset(); + pruningDebug.execute(new String[]{"idle-regions", "-1", "now-2s"}, out); + out.flush(); + assertEquals(latestExpected, readOutputStream(outputStream)); + + // Same command with reduced number of regions + outputStream.reset(); + int limit = 2; + pruningDebug.execute(new String[]{"idle-regions", String.valueOf(limit), String.valueOf(latestRecordTime)}, out); + out.flush(); + Assert.assertEquals(GSON.toJson(subset(latestExpected, 0, limit)), readOutputStream(outputStream)); + + // For a different time, this time only live regions that are compacted are returned + outputStream.reset(); + Long secondLastRecordTime = Iterables.get(compactedRegions.keySet(), 1); + Set compactedRegionsTime = + Sets.newTreeSet(Iterables.transform(compactedRegions.get(secondLastRecordTime), PRUNE_INFO_TO_STRING)); + Set compactedRegionsLatest = + Sets.newTreeSet(Iterables.transform(compactedRegions.get(latestRecordTime), PRUNE_INFO_TO_STRING)); + Set liveExpected = new TreeSet<>(Sets.intersection(compactedRegionsTime, compactedRegionsLatest)); + pruningDebug.execute(new String[]{"idle-regions", "-1", String.valueOf(latestRecordTime - 1)}, out); + out.flush(); + List actual = GSON.fromJson(readOutputStream(outputStream), PRUNE_INFO_LIST_TYPE); + Assert.assertEquals(liveExpected, Sets.newTreeSet(Iterables.transform(actual, PRUNE_INFO_TO_STRING))); + } + } + + @Test + public void testToCompactRegions() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try (PrintWriter out = new PrintWriter(outputStream)) { + // Get the regions that are not compacted for the latest time + Long latestRecordTime = compactedRegions.asMap().lastKey(); + SortedSet expected = notCompactedRegions.get(latestRecordTime); + pruningDebug.execute(new String[]{"to-compact-regions", "-1"}, out); + out.flush(); + Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), SortedSet.class)); + + // Same command with explicit time + outputStream.reset(); + pruningDebug.execute(new String[]{"to-compact-regions", "-1", String.valueOf(latestRecordTime)}, out); + out.flush(); + Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), SortedSet.class)); + + // Same command with relative time + outputStream.reset(); + pruningDebug.execute(new String[]{"to-compact-regions", "-1", "now+1h-3m"}, out); + out.flush(); + Assert.assertEquals(expected, GSON.fromJson(readOutputStream(outputStream), SortedSet.class)); + + // Same command with reduced number of regions + int limit = 2; + outputStream.reset(); + pruningDebug.execute(new String[]{"to-compact-regions", String.valueOf(limit), String.valueOf(latestRecordTime)}, + out); + out.flush(); + // Assert that the actual set is a subset of expected, with size 2 (since the output is not sorted) + SortedSet actual = GSON.fromJson(readOutputStream(outputStream), + new TypeToken>() { }.getType()); + Assert.assertEquals(limit, actual.size()); + Assert.assertTrue(Sets.difference(actual, expected).isEmpty()); + + // For a different time, only live regions that are not compacted are returned + outputStream.reset(); + Long secondLastRecordTime = Iterables.get(compactedRegions.keySet(), 1); + Set compactedRegionsTime = notCompactedRegions.get(secondLastRecordTime); + Set compactedRegionsLatest = notCompactedRegions.get(latestRecordTime); + Set liveExpected = new TreeSet<>(Sets.intersection(compactedRegionsTime, compactedRegionsLatest)); + pruningDebug.execute(new String[]{"to-compact-regions", "-1", String.valueOf(secondLastRecordTime)}, out); + out.flush(); + Assert.assertEquals(GSON.toJson(liveExpected), readOutputStream(outputStream)); + } + } + + private static RegionsAtTime expectedRegionsForTime(long time) { + SortedSet regions = new TreeSet<>(); + regions.addAll(Sets.newTreeSet(Iterables.transform(compactedRegions.get(time), PRUNE_INFO_TO_STRING))); + regions.addAll(emptyRegions.get(time)); + regions.addAll(notCompactedRegions.get(time)); + return new RegionsAtTime(time, regions, new SimpleDateFormat(InvalidListPruningDebugTool.DATE_FORMAT)); + } + + private static Comparator stringComparator() { + return new Comparator() { + @Override + public int compare(InvalidListPruningDebugTool.RegionPruneInfoPretty o1, + InvalidListPruningDebugTool.RegionPruneInfoPretty o2) { + return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString()); + } + }; + } + + private static Comparator pruneUpperBoundAndStringComparator() { + return new Comparator() { + @Override + public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { + int result = Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound()); + if (result == 0) { + return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString()); + } + return result; + } + }; + } + + private String readOutputStream(ByteArrayOutputStream out) throws UnsupportedEncodingException { + String s = out.toString(Charsets.UTF_8.toString()); + if (DEBUG_PRINT) { + System.out.println(s); + } + // remove the last newline + return s.length() <= 1 ? "" : s.substring(0, s.length() - 1); + } + + private void assertEquals(Collection expectedSorted, String actualString) { + List actual = GSON.fromJson(actualString, PRUNE_INFO_LIST_TYPE); + List actualSorted = new ArrayList<>(actual); + Collections.sort(actualSorted, pruneUpperBoundAndStringComparator()); + + Assert.assertEquals(GSON.toJson(expectedSorted), GSON.toJson(actualSorted)); + } + + @SuppressWarnings("SameParameterValue") + private SortedSet subset(SortedSet set, int from, int to) { + SortedSet subset = new TreeSet<>(set.comparator()); + int i = from; + for (T e : set) { + if (i++ >= to) { + break; + } + subset.add(e); + } + return subset; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b841c68/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java deleted file mode 100644 index 443c998..0000000 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tephra.hbase.txprune; - -import com.google.common.collect.Iterables; -import com.google.common.collect.MinMaxPriorityQueue; -import com.google.common.collect.Sets; -import com.google.gson.Gson; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.tephra.TxConstants; -import org.apache.tephra.txprune.RegionPruneInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import javax.annotation.Nullable; - -/** - * Invalid List Pruning Debug Tool. - */ -public class InvalidListPruningDebug { - private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class); - private static final Gson GSON = new Gson(); - private DataJanitorState dataJanitorState; - private Connection connection; - private TableName tableName; - - /** - * Initialize the Invalid List Debug Tool. - * @param conf {@link Configuration} - * @throws IOException - */ - public void initialize(final Configuration conf) throws IOException { - LOG.debug("InvalidListPruningDebugMain : initialize method called"); - connection = ConnectionFactory.createConnection(conf); - tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); - dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { - @Override - public Table get() throws IOException { - return connection.getTable(tableName); - } - }); - } - - public void destroy() throws IOException { - if (connection != null) { - connection.close(); - } - } - - /** - * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions - * will stop the progress of pruning. - * - * @param numRegions number of regions - * @return {@link Set} of regions that needs to be compacted and flushed - */ - public Set getRegionsToBeCompacted(Integer numRegions) throws IOException { - // Fetch the live regions - Map> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); - if (latestTimeRegion.isEmpty()) { - return new HashSet<>(); - } - - Long timestamp = latestTimeRegion.keySet().iterator().next(); - SortedSet liveRegions = latestTimeRegion.get(timestamp); - - SortedSet emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null); - SortedSet emptyRegionNames = new TreeSet<>(); - Iterable regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN); - for (String regionString : regionStrings) { - emptyRegionNames.add(regionString); - } - - Set nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames)); - - // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are - // not empty and have not been registered prune upper bound - Queue prunedRegions = getIdleRegions(-1); - for (RegionPruneInfo prunedRegion : prunedRegions) { - if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) { - nonEmptyRegions.remove(prunedRegion.getRegionNameAsString()); - } - } - - if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) { - return nonEmptyRegions; - } - - Set subsetRegions = new HashSet<>(numRegions); - for (String regionName : nonEmptyRegions) { - if (subsetRegions.size() == numRegions) { - break; - } - subsetRegions.add(regionName); - } - return subsetRegions; - } - - /** - * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. - * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions - * that are known to be live will be returned. - * - * @param numRegions number of regions - * @return Map of region name and its prune upper bound - */ - public Queue getIdleRegions(Integer numRegions) throws IOException { - List regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null); - if (regionPruneInfos.isEmpty()) { - return new LinkedList<>(); - } - - // Create a set with region names - Set pruneRegionNameSet = new HashSet<>(); - for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { - pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString()); - } - - // Fetch the live regions - Map> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); - if (!latestTimeRegion.isEmpty()) { - SortedSet liveRegions = latestTimeRegion.values().iterator().next(); - Set liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet); - List liveRegionWithPruneInfoList = new ArrayList<>(); - for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { - if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) { - liveRegionWithPruneInfoList.add(regionPruneInfo); - } - } - - // Use the subset of live regions and prune regions - regionPruneInfos = liveRegionWithPruneInfoList; - } - - if (numRegions < 0) { - numRegions = regionPruneInfos.size(); - } - - Queue lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator() { - @Override - public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { - return (int) (o1.getPruneUpperBound() - o2.getPruneUpperBound()); - } - }).maximumSize(numRegions).create(); - - for (RegionPruneInfo pruneInfo : regionPruneInfos) { - lowestPrunes.add(pruneInfo); - } - return lowestPrunes; - } - - /** - * Return the prune upper bound value of a given region. If no prune upper bound has been written for this region yet, - * it will return a null. - * - * @param regionId region id - * @return {@link RegionPruneInfo} of the region - * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo} - */ - @Nullable - public RegionPruneInfo getRegionPruneInfo(String regionId) throws IOException { - return dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId)); - } - - /** - * - * @param time Given a time, provide the {@link TimeRegions} at or before that time - * @return transactional regions that are present at or before the given time - * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions} - */ - public Map> getRegionsOnOrBeforeTime(Long time) throws IOException { - Map> regionMap = new HashMap<>(); - TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time); - if (timeRegions == null) { - return regionMap; - } - SortedSet regionNames = new TreeSet<>(); - Iterable regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN); - for (String regionString : regionStrings) { - regionNames.add(regionString); - } - regionMap.put(timeRegions.getTime(), regionNames); - return regionMap; - } - - private void printUsage(PrintWriter pw) { - pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning "); - pw.println("Available commands, corresponding parameters are:"); - pw.println("****************************************************"); - pw.println("time-region ts"); - pw.println("Desc: Prints out the transactional regions present in HBase at time 'ts' (in milliseconds) " + - "or the latest time before time 'ts'."); - pw.println("idle-regions limit"); - pw.println("Desc: Prints out 'limit' number of regions which has the lowest prune upper bounds. If '-1' is " + - "provided as the limit, prune upper bounds of all regions are returned."); - pw.println("prune-info region-name-as-string"); - pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'"); - pw.println("to-compact-regions limit"); - pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " + - "and have not registered a prune upper bound."); - } - - private boolean execute(String[] args) throws IOException { - try (PrintWriter pw = new PrintWriter(System.out)) { - if (args.length != 2) { - printUsage(pw); - return false; - } - - String command = args[0]; - String parameter = args[1]; - if ("time-region".equals(command)) { - Long time = Long.parseLong(parameter); - Map> timeRegion = getRegionsOnOrBeforeTime(time); - pw.println(GSON.toJson(timeRegion)); - return true; - } else if ("idle-regions".equals(command)) { - Integer numRegions = Integer.parseInt(parameter); - Queue regionPruneInfos = getIdleRegions(numRegions); - pw.println(GSON.toJson(regionPruneInfos)); - return true; - } else if ("prune-info".equals(command)) { - RegionPruneInfo regionPruneInfo = getRegionPruneInfo(parameter); - if (regionPruneInfo != null) { - pw.println(GSON.toJson(regionPruneInfo)); - } else { - pw.println(String.format("No prune info found for the region %s.", parameter)); - } - return true; - } else if ("to-compact-regions".equals(command)) { - Integer numRegions = Integer.parseInt(parameter); - Set toBeCompactedRegions = getRegionsToBeCompacted(numRegions); - pw.println(GSON.toJson(toBeCompactedRegions)); - return true; - } else { - pw.println(String.format("%s is not a valid command.", command)); - printUsage(pw); - return false; - } - } - } - - public static void main(String[] args) { - Configuration hConf = HBaseConfiguration.create(); - InvalidListPruningDebug pruningDebug = new InvalidListPruningDebug(); - try { - pruningDebug.initialize(hConf); - boolean success = pruningDebug.execute(args); - pruningDebug.destroy(); - if (!success) { - System.exit(1); - } - } catch (IOException ex) { - LOG.error("Received an exception while trying to execute the debug tool. ", ex); - } - } -}