kudu-commits mailing list archives

From granthe...@apache.org
Subject [kudu] 01/03: [backup] Support table name alterations between Kudu backups
Date Mon, 06 May 2019 18:09:23 GMT
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit aa0e52d2ac58bb900486a6bdd694eff1191881a9
Author: Grant Henke <granthenke@apache.org>
AuthorDate: Tue Apr 30 17:21:15 2019 -0500

    [backup] Support table name alterations between Kudu backups
    
    This patch enables table names to be altered between
    backups. It also handles newly added or renamed tables
    whose names overlap the names used by earlier backups.
    
    It does this by leveraging the table ID metadata,
    adding the table ID to the table directory, and building
    each BackupGraph by table ID instead of table name.
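
    For illustration, the directory naming round-trip looks roughly
    like this (a sketch mirroring the tableIdFromDirName and
    tableNameFromDirName helpers added to SessionIO.scala below;
    dirName is a hypothetical composition helper):

      import java.net.URLEncoder

      // Table directories are named "<tableId>-<URL-encoded tableName>".
      def dirName(tableId: String, tableName: String): String =
        s"$tableId-${URLEncoder.encode(tableName, "UTF-8")}"

      // The table ID is everything left of the first "-". Kudu table IDs
      // contain no "-", so splitting on the first "-" is unambiguous.
      def tableIdFromDirName(dirName: String): String =
        dirName.splitAt(dirName.indexOf("-"))._1

      // The (still URL-encoded) table name is everything to the right.
      def tableNameFromDirName(dirName: String): String =
        dirName.splitAt(dirName.indexOf("-") + 1)._2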
    
    Because we know that a table ID must always be unique
    and can never change, we can use the ID to link all
    related backups.
    
    The backup job uses the following high-level logic to find
    table backups:
      1. Look up all the current tableIds
      2. List the table directories, filtering by tableIds
      3. Build a BackupGraph for each tableId
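
    As a rough sketch of steps 1-3, using the new
    ListTablesResponse.TableInfo API introduced in this patch
    (client, tables, and io stand in for the job's actual fields):

      import scala.collection.JavaConverters._

      // 1. Map each requested table name to its immutable table ID.
      val nameToId = client.getTablesList.getTableInfosList.asScala
        .filter(info => tables.contains(info.getTableName))
        .map(info => (info.getTableName, info.getTableId))
        .toMap
      val tableIds = tables.flatMap(nameToId.get)
      // 2-3. List matching table directories and build one graph per ID.
      val graphs = io.readBackupGraphsByTableId(tableIds)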
    
    The restore job uses the following high-level logic to find
    table backups:
      1. List all the table directories
      2. Build a set of tableIds for the required tableNames
      3. Build a BackupGraph for each tableId
      4. Select the BackupGraph that used the tableName last
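
    Step 4 matters because a name can be reused: two different table
    IDs may have carried the same table name at different times. A
    sketch of the selection, mirroring the KuduRestore.scala change
    below (backupGraphs is the result of step 3):

      // Group graphs by the name their restore path ended with, then
      // keep the graph whose last backup under that name is most recent.
      val backupMap = backupGraphs
        .groupBy(_.restorePath.tableName)
        .mapValues(_.maxBy(_.restorePath.toMs))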
    
    Change-Id: Id08d1fa293d76538adc61fdfc7593b1900521e01
    Reviewed-on: http://gerrit.cloudera.org:8080/13210
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Mike Percy <mpercy@apache.org>
---
 .../scala/org/apache/kudu/backup/BackupGraph.scala |  11 +-
 .../scala/org/apache/kudu/backup/KuduBackup.scala  |  32 +++-
 .../scala/org/apache/kudu/backup/KuduRestore.scala |  16 +-
 .../scala/org/apache/kudu/backup/SessionIO.scala   | 181 +++++++++++++++------
 .../org/apache/kudu/backup/TestKuduBackup.scala    |  35 +++-
 .../org/apache/kudu/client/ListTablesRequest.java  |  12 +-
 .../org/apache/kudu/client/ListTablesResponse.java |  65 +++++++-
 7 files changed, 285 insertions(+), 67 deletions(-)

diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
index 2ed1b65..13bb9e3 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupGraph.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-class BackupGraph(val tableName: String) {
+class BackupGraph(val tableId: String) {
   // Index of backup.fromMs -> backup for use in chaining backups together.
   private val adjacencyList = mutable.Map[Long, mutable.ListBuffer[BackupNode]]()
 
@@ -136,7 +136,7 @@ class BackupGraph(val tableName: String) {
    */
   def restorePath: BackupPath = {
     if (backupPaths.isEmpty) {
-      throw new RuntimeException(s"No valid backups found for table: $tableName")
+      throw new RuntimeException(s"No valid backups found for table ID: $tableId")
     }
 
     //  1. Pick the path with the most recent backup.
@@ -154,7 +154,7 @@ class BackupGraph(val tableName: String) {
    * @return
    */
   def filterByTime(timeMs: Long): BackupGraph = {
-    val result = new BackupGraph(tableName)
+    val result = new BackupGraph(tableId)
     val distinctBackups = adjacencyList.values.flatten.toSet
     distinctBackups.filter(_.metadata.getToMs <= timeMs).foreach(result.addBackup)
     result
@@ -185,6 +185,11 @@ case class BackupPath(backups: Seq[BackupNode]) {
   def lastBackup: BackupNode = backups.last
 
   /**
+   * @return the tableName for the entire path.
+   */
+  def tableName: String = backups.last.metadata.getTableName
+
+  /**
    * @return the toMs for the entire path.
    */
   def toMs: Long = backups.last.metadata.getToMs
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index 5d498ce..2b0d84b 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -24,6 +24,8 @@ import org.apache.yetus.audience.InterfaceStability
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
+import scala.collection.JavaConverters._
+
 /**
  * The main class for a Kudu backup spark job.
  */
@@ -40,12 +42,35 @@ object KuduBackup {
         session.sparkContext
       )
     val io = new SessionIO(session, options)
+
+    // Read the required backup metadata.
+    val backupGraphs =
+      // Only read the backup metadata if it will be used.
+      if (!options.forceFull || options.fromMs != BackupOptions.DefaultFromMS) {
+        // Convert the input table names to be backed up into table IDs.
+        // This will allow us to link against old backup data by referencing
+        // the static table ID even when the table name changes between backups.
+        val nameToId = context.syncClient.getTablesList.getTableInfosList.asScala
+          .filter(info => options.tables.contains(info.getTableName))
+          .map(info => (info.getTableName, info.getTableId))
+          .toMap
+        val tableIds = options.tables.flatMap(nameToId.get)
+        io.readBackupGraphsByTableId(tableIds)
+      } else {
+        Seq[BackupGraph]()
+      }
+    // Key the backupMap by the table ID.
+    val backupMap = backupGraphs.map { graph =>
+      (graph.tableId, graph)
+    }.toMap
+
     // TODO (KUDU-2786): Make parallel so each table isn't processed serially.
     // TODO (KUDU-2787): Handle single table failures.
     options.tables.foreach { tableName =>
       var tableOptions = options.copy() // Copy the options so we can modify them for the table.
       val table = context.syncClient.openTable(tableName)
-      val backupPath = io.backupPath(tableName, tableOptions.toMs)
+      val tableId = table.getTableId
+      val backupPath = io.backupPath(table, tableOptions.toMs)
       val metadataPath = io.backupMetadataPath(backupPath)
       log.info(s"Backing up table $tableName to path: $backupPath")
 
@@ -59,9 +84,8 @@ object KuduBackup {
         incremental = true
       } else {
         log.info("Looking for a previous backup, forceFull or fromMs options are not set.")
-        val graph = io.readBackupGraph(tableName)
-        if (graph.hasFullBackup) {
-          val base = graph.backupBase
+        if (backupMap.contains(tableId) && backupMap(tableId).hasFullBackup) {
+          val base = backupMap(tableId).backupBase
           log.info(s"Setting fromMs to ${base.metadata.getToMs} from backup in path: ${base.path}")
           tableOptions = tableOptions.copy(fromMs = base.metadata.getToMs)
           incremental = true
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index d5fe12c..77bddeb 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -19,7 +19,6 @@ package org.apache.kudu.backup
 import org.apache.kudu.backup.Backup.TableMetadataPB
 import org.apache.kudu.client.AlterTableOptions
 import org.apache.kudu.client.KuduPartitioner
-import org.apache.kudu.client.NonCoveredRangeException
 import org.apache.kudu.client.Partition
 import org.apache.kudu.client.SessionConfiguration.FlushMode
 import org.apache.kudu.spark.kudu.KuduContext
@@ -50,17 +49,26 @@ object KuduRestore {
       )
     val io = new SessionIO(session, options)
 
+    // Read the required backup metadata.
+    val backupGraphs = io.readBackupGraphsByTableName(options.tables, options.timestampMs)
+    // Key the backupMap by the last table name.
+    val backupMap = backupGraphs
+      .groupBy(_.restorePath.tableName)
+      .mapValues(_.maxBy(_.restorePath.toMs))
+
     // TODO (KUDU-2786): Make parallel so each table isn't processed serially.
     // TODO (KUDU-2787): Handle single table failures.
     options.tables.foreach { tableName =>
-      val graph = io.readBackupGraph(tableName).filterByTime(options.timestampMs)
+      if (!backupMap.contains(tableName)) {
+        throw new RuntimeException(s"No valid backups found for table: $tableName")
+      }
+      val graph = backupMap(tableName)
       val lastMetadata = graph.restorePath.backups.last.metadata
+      val restoreName = s"${lastMetadata.getTableName}${options.tableSuffix}"
       graph.restorePath.backups.foreach { backup =>
         log.info(s"Restoring table $tableName from path: ${backup.path}")
         val metadata = backup.metadata
         val isFullRestore = metadata.getFromMs == 0
-        val restoreName = s"${metadata.getTableName}${options.tableSuffix}"
-
         // TODO (KUDU-2788): Store the full metadata to compare/validate for each applied partial.
 
         // On the full restore we may need to create the table.
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
index f49902e..82578ad 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
@@ -24,10 +24,12 @@ import com.google.common.io.CharStreams
 import com.google.protobuf.util.JsonFormat
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.LocatedFileStatus
 import org.apache.hadoop.fs.Path
 import org.apache.kudu.Schema
 import org.apache.kudu.backup.Backup.TableMetadataPB
 import org.apache.kudu.backup.SessionIO._
+import org.apache.kudu.client.KuduTable
 import org.apache.kudu.spark.kudu.SparkUtil
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.ByteType
@@ -45,14 +47,21 @@ import scala.collection.mutable
  * of metadata and data of the backup and restore jobs.
  *
  * The default backup directory structure is:
- * /<rootPath>/<tableName>/<backup-id>/
+ * /<rootPath>/<tableId>-<tableName>/<backup-id>/
  *   .kudu-metadata.json
  *   part-*.parquet
  *
- * In the above path the `/<rootPath>` can be used to distinguish separate backup groups.
- * The `<backup-id>` is currently the `toMs` time for the job.
- *
- * TODO (KUDU-2788): Should the tableName contain the table id?
+ * - rootPath: can be used to distinguish separate backup groups, jobs, or concerns.
+ * - tableId: the unique internal ID of the table being backed up.
+ * - tableName: the name of the table being backed up.
+ * - backup-id: A way to uniquely identify/group the data for a single backup run.
+ *   - Currently the `toMs` time for the job.
+ * - .kudu-metadata.json: Contains all of the metadata to support recreating the table,
+ *   linking backups by time, and handling data format changes.
+ *   - Written last so that failed backups will not have a metadata file and will not be
+ *     considered at restore time or backup linking time.
 + * - part-*.parquet: The data files containing the table's data.
 + *   - Incremental backups contain an additional "RowAction" byte column at the end.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -60,13 +69,12 @@ class SessionIO(val session: SparkSession, options: CommonOptions) {
   val log: Logger = LoggerFactory.getLogger(getClass)
 
   val conf: Configuration = session.sparkContext.hadoopConfiguration
-  val rootHPath: Path = new Path(options.rootPath)
-  val fs: FileSystem = rootHPath.getFileSystem(conf)
+  val rootPath: Path = new Path(options.rootPath)
+  val fs: FileSystem = rootPath.getFileSystem(conf)
 
   /**
    * Returns the Spark schema for backup data based on the Kudu Schema.
    * Additionally handles adding the RowAction column for incremental backup/restore.
-   * @return the Spark schema for backup data.
    */
   def dataSchema(schema: Schema, includeRowAction: Boolean = true): StructType = {
     var fields = SparkUtil.sparkSchema(schema).fields
@@ -91,21 +99,23 @@ class SessionIO(val session: SparkSession, options: CommonOptions) {
   }
 
   /**
-   * @return the path to the table directory.
+   * Return the path to the table directory.
    */
-  def tablePath(tableName: String): Path = {
-    new Path(options.rootPath, URLEncoder.encode(tableName, "UTF-8"))
+  def tablePath(table: KuduTable): Path = {
+    val tableName = URLEncoder.encode(table.getName, "UTF-8")
+    val dirName = s"${table.getTableId}-$tableName"
+    new Path(options.rootPath, dirName)
   }
 
   /**
-   * @return the backup path for a table and time.
+   * Return the backup path for a table and time.
    */
-  def backupPath(tableName: String, timestampMs: Long): Path = {
-    new Path(tablePath(tableName), timestampMs.toString)
+  def backupPath(table: KuduTable, timestampMs: Long): Path = {
+    new Path(tablePath(table), timestampMs.toString)
   }
 
   /**
-   * @return the path to the metadata file within a backup path.
+   * Return the path to the metadata file within a backup path.
    */
   def backupMetadataPath(backupPath: Path): Path = {
     new Path(backupPath, MetadataFileName)
@@ -113,11 +123,9 @@ class SessionIO(val session: SparkSession, options: CommonOptions) {
 
   /**
    * Serializes the table metadata to Json and writes it to the metadata path.
-   * @param tableMetadata the metadata to serialize.
-   * @param metadataPath the path to write the metadata file too.
    */
   def writeTableMetadata(tableMetadata: TableMetadataPB, metadataPath: Path): Unit = {
-    log.error(s"Writing metadata to $metadataPath")
+    log.info(s"Writing metadata to $metadataPath")
     val out = fs.create(metadataPath, /* overwrite= */ false)
     val json = JsonFormat.printer().print(tableMetadata)
     out.write(json.getBytes(StandardCharsets.UTF_8))
 @@ -126,51 +134,130 @@ class SessionIO(val session: SparkSession, options: CommonOptions) {
   }
 
   /**
-   * Reads an entire backup graph by reading all of the metadata files for the
-   * given table. See [[BackupGraph]] for more details.
-   * @param tableName the table to read a backup graph for.
-   * @return the full backup graph.
+   * Reads all of the backup graphs for a given list of table names and a time filter.
+   */
+  def readBackupGraphsByTableName(
+      tableNames: Seq[String],
+      timeMs: Long = System.currentTimeMillis()): Seq[BackupGraph] = {
+    // We also need to include the metadata from old table names.
+    // To handle this we list all directories, get the IDs for the tableNames,
+    // and then filter the directories by those IDs.
+    val allDirs = listAllTableDirs()
+    val encodedNames = tableNames.map(URLEncoder.encode(_, "UTF-8")).toSet
+    val tableIds =
+      allDirs.flatMap { dir =>
+        val dirName = dir.getName
+        val tableName = tableNameFromDirName(dirName)
+        if (encodedNames.contains(tableName)) {
+          Some(tableIdFromDirName(dirName))
+        } else {
+          None
+        }
+      }.toSet
+    val dirs = allDirs.filter(dir => tableIds.contains(tableIdFromDirName(dir.getName)))
+    buildBackupGraphs(dirs, timeMs)
+  }
+
+  /**
+   * Reads all of the backup graphs for a given list of table IDs and a time filter.
+   */
+  def readBackupGraphsByTableId(
+      tableIds: Seq[String],
+      timeMs: Long = System.currentTimeMillis()): Seq[BackupGraph] = {
+    val dirs = listTableIdDirs(tableIds)
+    buildBackupGraphs(dirs, timeMs)
+  }
+
+  /**
+   * Builds all of the backup graphs for a given list of directories by reading all of the
+   * metadata files and inserting them into a backup graph for each table id.
+   * See [[BackupGraph]] for more details.
+   */
+  private def buildBackupGraphs(dirs: Seq[Path], timeMs: Long): Seq[BackupGraph] = {
 +    // Read all the metadata and filter by timeMs.
+    val metadata = dirs.flatMap(readTableBackups).filter(_._2.getToMs <= timeMs)
+    // Group the metadata by the table ID and create a BackupGraph for each table ID.
+    metadata
+      .groupBy(_._2.getTableId)
+      .map {
+        case (tableId, pm) =>
+          val graph = new BackupGraph(tableId)
+          pm.foreach {
+            case (path, metadata) =>
+              graph.addBackup(BackupNode(path, metadata))
+          }
+          graph
+      }
+      .toList
+  }
+
+  /**
+   * Return all of the table directories.
+   */
+  private def listAllTableDirs(): Seq[Path] = {
+    listMatching(_ => true)
+  }
+
+  /**
+   * Return the table directories for a given list of table IDs.
    */
-  def readBackupGraph(tableName: String): BackupGraph = {
-    val backups = readTableBackups(tableName)
-    val graph = new BackupGraph(tableName)
-    backups.foreach {
-      case (path, metadata) =>
-        graph.addBackup(BackupNode(path, metadata))
+  private def listTableIdDirs(tableIds: Seq[String]): Seq[Path] = {
+    val idSet = tableIds.toSet
+    listMatching { file =>
+      val name = file.getPath.getName
+      file.isDirectory && idSet.contains(tableIdFromDirName(name))
     }
-    graph
+  }
+
+  private def tableIdFromDirName(dirName: String): String = {
+    // Split to the left of "-" and keep the first half to get the table ID.
+    dirName.splitAt(dirName.indexOf("-"))._1
+  }
+
+  private def tableNameFromDirName(dirName: String): String = {
+    // Split to the right of "-" and keep the second half to get the table name.
+    dirName.splitAt(dirName.indexOf("-") + 1)._2
   }
 
   /**
-   * Reads and returns all of the metadata for a given table.
-   * @param tableName the table to read the metadata for.
-   * @return a sequence of all the paths and metadata.
+   * List all the files in the root directory and return the files that match
+   * according to the passed function.
    */
-  // TODO: Also use table-id to find backups.
-  private def readTableBackups(tableName: String): Seq[(Path, TableMetadataPB)] = {
-    val hPath = new Path(tablePath(tableName).toString)
-    val results = new mutable.ListBuffer[(Path, TableMetadataPB)]()
-    if (fs.exists(hPath)) {
-      val iter = fs.listLocatedStatus(hPath)
+  private def listMatching(fn: LocatedFileStatus => Boolean): Seq[Path] = {
+    val results = new mutable.ListBuffer[Path]()
+    if (fs.exists(rootPath)) {
+      val iter = fs.listLocatedStatus(rootPath)
       while (iter.hasNext) {
         val file = iter.next()
-        if (file.isDirectory) {
-          val metadataHPath = new Path(file.getPath, MetadataFileName)
-          if (fs.exists(metadataHPath)) {
-            val metadata = readTableMetadata(metadataHPath)
-            results += ((file.getPath, metadata))
-          }
+        if (fn(file)) {
+          results += file.getPath
+        }
+      }
+    }
+    results
+  }
+
+  /**
+   * Reads and returns all of the metadata for a given table directory.
+   */
+  private def readTableBackups(tableDir: Path): Seq[(Path, TableMetadataPB)] = {
+    val results = new mutable.ListBuffer[(Path, TableMetadataPB)]()
+    val files = fs.listStatus(tableDir)
+    files.foreach { file =>
+      if (file.isDirectory) {
+        val metadataPath = new Path(file.getPath, MetadataFileName)
+        if (fs.exists(metadataPath)) {
+          val metadata = readTableMetadata(metadataPath)
+          results += ((file.getPath, metadata))
         }
       }
     }
-    log.error(s"Found ${results.size} paths in ${hPath.toString}")
+    log.info(s"Found ${results.size} paths in ${tableDir.toString}")
     results.toList
   }
 
   /**
    * Reads and deserializes the metadata file at the given path.
-   * @param metadataPath the path to the metadata file.
-   * @return the deserialized table metadata.
    */
   def readTableMetadata(metadataPath: Path): TableMetadataPB = {
     val in = new InputStreamReader(fs.open(metadataPath), StandardCharsets.UTF_8)
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index f253743..6de14d3 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -405,6 +405,37 @@ class TestKuduBackup extends KuduTestSuite {
     assertEquals(expectedKeys, rows)
   }
 
+  @Test
+  def testTableAlterHandling(): Unit = {
+    // Create the initial table and load it with data.
+    val tableName = "testTableAlterHandling"
+    var table = kuduClient.createTable(tableName, schema, tableOptions)
+    insertRows(table, 100)
+
+    // Run and validate initial backup.
+    backupAndValidateTable(tableName, 100, false)
+
+    // Rename the table and insert more rows
+    val newTableName = "impala::default.testTableAlterHandling"
+    kuduClient.alterTable(tableName, new AlterTableOptions().renameTable(newTableName))
+    table = kuduClient.openTable(newTableName)
+    insertRows(table, 100, 100)
+
+    // Run and validate an incremental backup.
+    backupAndValidateTable(newTableName, 100, true)
+
+    // Create a new table with the old name.
+    val tableWithOldName = kuduClient.createTable(tableName, schema, tableOptions)
+    insertRows(tableWithOldName, 50)
+
+    // Backup the table with the old name.
+    backupAndValidateTable(tableName, 50, false)
+
+    // Restore the tables and check the row counts.
+    restoreAndValidateTable(newTableName, 200)
+    restoreAndValidateTable(tableName, 50)
+  }
+
   def createPartitionRow(value: Int): PartialRow = {
     val row = schema.newPartialRow()
     row.addInt("key", value)
@@ -505,7 +536,8 @@ class TestKuduBackup extends KuduTestSuite {
       expectIncremental: Boolean): Unit = {
     val io = new SessionIO(ss, options)
     val tableName = options.tables.head
-    val backupPath = io.backupPath(tableName, options.toMs)
+    val table = harness.getClient.openTable(tableName)
+    val backupPath = io.backupPath(table, options.toMs)
     val metadataPath = io.backupMetadataPath(backupPath)
     val metadata = io.readTableMetadata(metadataPath)
 
@@ -517,7 +549,6 @@ class TestKuduBackup extends KuduTestSuite {
     }
 
     // Verify the output data.
-    val table = harness.getClient.openTable(tableName)
     val schema = io.dataSchema(table.getSchema, expectIncremental)
     val df = ss.sqlContext.read
       .format(metadata.getDataFormat)
 diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
index a0e43a7..bc77664 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import com.google.protobuf.Message;
+import org.apache.kudu.client.ListTablesResponse.TableInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.jboss.netty.util.Timer;
 
@@ -66,14 +67,13 @@ class ListTablesRequest extends KuduRpc<ListTablesResponse> {
     final Master.ListTablesResponsePB.Builder respBuilder =
         Master.ListTablesResponsePB.newBuilder();
     readProtobuf(callResponse.getPBMessage(), respBuilder);
-    int serversCount = respBuilder.getTablesCount();
-    List<String> tables = new ArrayList<String>(serversCount);
-    for (Master.ListTablesResponsePB.TableInfo info : respBuilder.getTablesList()) {
-      tables.add(info.getName());
+    int tablesCount = respBuilder.getTablesCount();
+    List<TableInfo> tableInfos = new ArrayList<>(tablesCount);
+    for (Master.ListTablesResponsePB.TableInfo infoPb : respBuilder.getTablesList()) {
+      tableInfos.add(new TableInfo(infoPb.getId().toStringUtf8(), infoPb.getName()));
     }
     ListTablesResponse response = new ListTablesResponse(timeoutTracker.getElapsedMillis(),
-                                                         tsUUID,
-                                                         tables);
+                                                         tsUUID, tableInfos);
     return new Pair<ListTablesResponse, Object>(
         response, respBuilder.hasError() ? respBuilder.getError() : null);
   }
 diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesResponse.java
index d7d14e2..4ddcf44 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesResponse.java
@@ -17,8 +17,11 @@
 
 package org.apache.kudu.client;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -26,10 +29,16 @@ import org.apache.yetus.audience.InterfaceStability;
 @InterfaceStability.Evolving
 public class ListTablesResponse extends KuduRpcResponse {
 
+  private final List<TableInfo> tableInfosList;
   private final List<String> tablesList;
 
-  ListTablesResponse(long elapsedMillis, String tsUUID, List<String> tablesList) {
 +  ListTablesResponse(long elapsedMillis, String tsUUID, List<TableInfo> tableInfosList) {
     super(elapsedMillis, tsUUID);
+    List<String> tablesList = new ArrayList<>();
+    for (TableInfo info : tableInfosList) {
+      tablesList.add(info.getTableName());
+    }
+    this.tableInfosList = tableInfosList;
     this.tablesList = tablesList;
   }
 
@@ -40,4 +49,58 @@ public class ListTablesResponse extends KuduRpcResponse {
   public List<String> getTablesList() {
     return tablesList;
   }
+
+  /**
+   * Get the list of tables as specified in the request.
+   * @return a list of TableInfo
+   */
+  public List<TableInfo> getTableInfosList() {
+    return tableInfosList;
+  }
+
+  public static class TableInfo {
+    private final String tableId;
+    private final String tableName;
+
+    TableInfo(String tableId, String tableName) {
+      this.tableId = tableId;
+      this.tableName = tableName;
+    }
+
+    /**
+     * @return the table id
+     */
+    public String getTableId() {
+      return tableId;
+    }
+
+    /**
+     * @return the table name
+     */
+    public String getTableName() {
+      return tableName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      TableInfo tableInfo = (TableInfo) o;
+      return Objects.equal(tableId, tableInfo.tableId) &&
+          Objects.equal(tableName, tableInfo.tableName);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(tableId, tableName);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("tableId", tableId)
+          .add("tableName", tableName)
+          .toString();
+    }
+  }
 }

